Apache Kafka provides an unified, high-throughput, low-latency platform for handling real-time data feeds. Installing Apache Kafka, especially the right configuration of Kafka Security including authentication and encryption is kind of a challenge. This should give a brief summary about our experience and lessons learned when trying to install and configure Apache Kafka, the right way.
¶ Do you really need Apache Kafka?
Think about this carefully before you start using Apache Kafka. Don’t get me wrong: Apache Kafka is great in doing stuff it is intended to do but for some use-cases Apache Kafka tend to be a overkill. Going into this topic is outside the scope. I’ve found a good article about this topic: Apache Kafka Use Cases: When To Use It & When Not To. To sum it up in a few words:
Use Kafka for:
- Real-time data processing: Transmitting data from producers to data handlers and then to data storages.
- Application activity tracking: This is the use case Kafka was originally developed for, to be used in LinkedIn: Publishing events in applications to dedicated Kafka topics.
- Logging and/or monitoring: Publishing logs into Kafka topics.
Don’t use Kafka iff:
- You need to process only a small amount of messages per day. Think about using traditional message queues like RabbitMQ.
- You need to apply on-the-fly data transformations or doing ETL jobs (extract, transform, load).
- You need a task queue: Again, RabbitMQ is designed for beeing a task queue.
- You need a database: There is no need for an explanation: Kafka is not designed for persistent data storage.
¶ What do we have?
There are several ways how to setup Apache Kafka but the easiest way to install Apache Kafka is using Ansible and the official Kafka playjobs from Confluent: The Easiest Way to Install Apache Kafka and Confluent Platform – Using Ansible. This can also be used for production i.e. to setup a multi-node cluster including 3 nodes with an Kafka Broker and a Apache ZooKeeper instance on each node. The Kafka Broker is doing the work i.e. consuming and producing messages based on using topics. The Apache ZooKeeper instances are used for maintaining configuration information, naming, providing distributed synchronization, and providing group services. So you can say, that the Apache ZooKeeper is used to manage our Kafka Cluster.
Now the first challenge: You need authN/authZ and SSL encryption for all connections:
- Kafka clients (i.e. your software) are connecting to the Kafka Brokers.
- Each Kafka Broker is connecting to the other brokers.
- Each Kafka Broker is connecting to the ZooKeeper instances.
Each of these connections need SSL encryption, authentication and authorization. Since authorization is done via ACLs the focus of this article is on SSL encryption and authentication.
¶ Kafka Broker Authentication
There are several methods to authenticate Kafka clients against Kafka Brokers. Note that for inter-node communication a Kafka Broker can also be a Kafka client that is connecting to another Kafka Broker.
¶ Mutual TLS (mTLS)
Each Kafka Broker and each Kafka client gets a SSL certificate that supports client authentication. This is easy and bullet-proofed but you need a client certificate for each Kafka Broker and each Kafka client and this is out of scope what e.g. Let’s Encrypt offers. So you need your own PKI infrastructure and probably an own certificate authority to do so.
In SASL PLAIN, user and passwords are stored in each Kafka Broker using SASL PLAIN (not to mix up with no SSL encryption being called PLAINTEXT). So SSL encryption is a must or this doesn’t make sense at all. Further think about what will happen if you add or change a user or a password: You must update and restart each Kafka Broker for each user or password change. So this is still simple to setup but this does not scale well with Kafka Brokers and clients.
See https://docs.confluent.io/current/kafka/authentication_sasl/authentication_sasl_plain.html
Salted Challenge Response Authentication Mechanism (SCRAM) is a family password-based challenge–response authentication mechanisms providing authentication of a user (here our Kafka client or Kafka Broker) to a server (Kafka Broker). If enabled, salted usernames and passwords are stored encrypted in Apache ZooKeeper. So scaling clients without restarting the Kafka Brokers should work. Further no passwords will be send unencrypted over the wire but SSL encryption is still required to prevent interception of SCRAM exchange.
There exist many good articles about SASL/SCRAM in Kafka (i.e. Kafka authentication using SASL/SCRAM from Hussein Moussa). Just ask Google for more!
See https://docs.confluent.io/current/kafka/authentication_sasl/authentication_sasl_scram.html
¶ SASL Delegation Token
Delegation based authentication can be used to complement existing SASL/SSL methods. Typically you authenticate with a Kafka cluster using SSL or SASL to obtain a delegation token. These tokens can be used by Kafka clients to securely connect to the Kafka Brokers. To do so, a primary secret key must be configured accross all Kafka Brokers. The generated delegation token are stored in Apache ZooKeeper.
For a simple and secure approach, there are two main drawbacks: You still need a SASL/SSL secured channel for token creation and you need to implement a token refresh in your client software.
See https://docs.confluent.io/current/kafka/authentication_sasl/authentication_sasl_delegation.html
This is thought to be used in corporate environments that are using Kerberos (i.e. Active Directory): Kafka Brokers and Kafka clients will use Kerberos for authentication. So you need a principal for each Kafka Broker and each Kafka client that will connect to the Kafka cluster. This might be the most secure method but you need a Kerberos server and your clients require to login to Kerberos and use this Kerberos ticket to authenticating to the Kafka Broker.
See https://kafka.apache.org/documentation/#security_sasl_kerberos and https://docs.confluent.io/current/kafka/authentication_sasl/authentication_sasl_gssapi.html.
¶ Apache ZooKeeper Authentication
Apache ZooKeeper is used to manage the Kafka cluster i.e. Kafka topics and their configuration, a list of Kafka Brokers that are part of the cluster and ACLs used for authoriation. To sum up: If you have access to the Apache ZooKeeper, you have access to the Kafka Cluster including all data. So you should consider to properly secure the Apache ZooKeeper as well for production use.
¶ Mutual SSL (mTLS)
As of version 3.5.x, ZooKeeper supports mutual SSL client authentication. As of version 2.5 Kafka began shipping with ZooKeeper versions that support mTLS authentication. You can now enable ZooKeeper authentication with or without SASL authentication. Prior to the 2.5 release, only the DIGEST-MD5 and GSSAPI SASL mechanisms were supported.
See https://www.confluent.io/blog/secure-kafka-deployment-best-practices/
Using MTLS is simple and bullet-proofed but similar to Kafka Broker authentication using mTLS, all other Zookeeper instances, Kafka Brokers and/or CLI Tools need a SSL certificate with client authentication enabled. Remember that this is out of scope from what Let’s Encrypt offers. Further all SSL certificates must share the same Distinguished Name (DN) since this is included in the Zookeeper ACL that allows access to the Zookeeper node. So if you need to use different identities (i.e. different DNs) you should use MTLS and SASL authentication. Here you can choose between the two supported SASL Digest MD5 or SASL GSSAPI using Kerberos.
See https://docs.confluent.io/current/security/zk-security.html#mtls-authentication
¶ SASL Digest MD5
When using SASL Digest MD5 the username and password are stored in plaintext in Apache ZooKeeper and on the ZooKeeper clients (i.e. Kafka Brokers or other ZooKeeper instances). As the name says this sends MD5 hashed passwords over the wire. Further there are several security concerns regarding MD5 hashes: For production use, you should enable TLS encryption to secure MD5 hashed passwords. Further only the brokers will need access to ZooKeeper, so you should limit the access to the ZooKeeper nodes by a firewall.
See https://docs.confluent.io/current/security/zk-security.html#sasl-with-digest-md5
Next to SASL Digest MD5 you can also use Kerberos for ZooKeeper authentication. This is the most secure way for a bullet-proofed authentication in an corporate environment: All Kafka and ZooKeeper clients including Kafka Broker and Apache Zookeeper instances have their own Kerberos principals e.g. from a corperate Active Directory and are using these Kerberos logins for authentication among each other.
See https://docs.confluent.io/current/security/zk-security.html#sasl-with-kerberos
¶ Hands On
Let’s try to setup a kind-of-production-ready Kafka cluster for testing purpose using Ansible Playbooks from Confluent. We use 3 nodes as follows:
Hostname | Purpose |
kafka-1 | ZooKeeper, Broker, Schema Registry |
kafka-2 | ZooKeeper, Broker |
kafka-3 | ZooKeeper, Broker |
Support operating systems for the Confluent Playbooks can be found here. If you plan to go into production, you should consider the hardware requirements here.
¶ Prepare
Let’s checkout the Confluent Playbook and create our hosts.yml
$ git clone https://github.com/confluentinc/cp-ansible
$ cp cp-ansible/hosts_example.yml cp-ansible/hosts.yml
Make sure that the hostname properly resolves on each host and on your Ansible client. For testing-purpose adding these hosts to your
should be fine.
Further update hosts.yml
according to your needs. For now, we just enable a ZooKeeper and Kafka Broker instance on each node and a Schema Registry on kafka-1
ansible_connection: ssh
ansible_user: root
ansible_become: true
ansible_ssh_private_key_file: ~/.ssh/id_rsa
# Enable SSL
ssl_enabled: True
# Brokers: Enable SASL SCRAM
# https://docs.confluent.io/current/installation/cp-ansible/ansible-authenticate.html#configure-sasl-scram-authentication
sasl_protocol: scram
# ZooKeeper: Enable SASL DIGEST-MD5
# https://docs.confluent.io/current/installation/cp-ansible/ansible-authenticate.html
zookeeper_sasl_protocol: digest
As you can see, we further enable SSL encryption using self-signed certificates, SASL SCRAM for Kafka Brokers and SASL DIGEST-MD5 for the ZooKeepers. Next to Kerberos, this seems to be suitable mechanisms to secure a Kafka cluster.
This is for testing-purpose only. If you run a public Kafka cluster, consider to secure the Kafka cluster with additional layers i.e. firewalls and/or VPN.
¶ Install
To run the playbooks you can use all.yml
as follows:
ansible-playbook -i hosts.yml all.yml
For updates or testing-purpose you can run the steps one-by-one as follows:
- Generate the certificate authority if TLS encryption is enabled:
ansible-playbook -i hosts.yml all.yml --tags=certificate_authority
- Install ZooKeeper and Kafka:
ansible-playbook -i hosts.yml all.yml --tags=zookeeper,kafka_broker
¶ Testing
Let’s try to create a topic with a given user and password from outside the cluster. For quick and dirty testing purpose we can use the confluentinc/cp-kafka
docker image which already includes all tools we need.
First we need a admin.properties
file including the security parameters and user credentials. Let’s create and change into directory kafka
where we will store the relevant files for testing-purpose:
mkdir kafka
cd kafka
Let’s create admin.properties
as follows:
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
Second we need the Java truststore and keystore including SSL certificates to connect to the cluster. We shamelessly use on of the stores that Ansible has created on the server:
scp root@kafka-1:/var/ssl/private/kafka_broker.truststore.jks truststore.jks
scp root@kafka-1:/var/ssl/private/kafka_broker.keystore.jks keystore.jks
The Confluent Playbook creates some default users and passwords (i.e.
as well asconfluenttruststorepass
. Make sure to change these passwords for production use.
Now let’s run the docker image and change to our mounted working directory /home/appuser/kafka
docker run --entrypoint=/bin/bash --user 0 --rm -t -i -v /etc/hosts:/etc/hosts -v $(pwd)/kafka:/home/appuser/kafka confluentinc/cp-kafka:6.0.0
cd /home/appuser/kafka
¶ Testing Authentication
Let’s try to list existing topics to test authentication. First let’s try to use the prepared admin.properties
as follows:
kafka-topics --bootstrap-server kafka-1:9091 --command-config admin.properties --list
This seems to work, so let’s try to connect with an invalid password and see if it fails:
sed 's/admin-secret/admin-secret-123/g' admin.properties > admin-invalid-password.properties
kafka-topics --bootstrap-server kafka-1:9091 --command-config admin-invalid-password.properties --list
[2020-11-17 17:04:17,243] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (kafka-1/ failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512 (org.apache.kafka.clients.NetworkClient)
[2020-11-17 17:04:17,244] WARN [AdminClient clientId=adminclient-1] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
Error while executing topic command : org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
[2020-11-17 17:04:17,247] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.getTopics(TopicCommand.scala:352)
at kafka.admin.TopicCommand$AdminClientTopicService.listTopics(TopicCommand.scala:260)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
This fails with Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
, so authentication seems to work.
¶ Testing Authorization
Apache Kafka supports a pluggable authorizer. The default authorizer implementation coming with Apache Kafka is called AclAuthorizer
and is using Apache ZooKeeper to store access control lists (ACLs).
Note that there are Kafka ACLs for controlling access to the Kafka Broker and there are ZooKeeper ACLs that control access to the ZooKeeper nodes. By defaults no ZooKeeper ACLs are created and thus the information in ZooKeeper will be writeable by everyone. So if you go to production, make sure that zookeeper.set.acl
is enabled which is true by default if ZooKeeper is configured to use either Digest or Kerberos security:
enabled: "{{ zookeeper_sasl_protocol in ['kerberos', 'digest'] }}"
zookeeper.set.acl: 'true'
Let’s start and try to list Kafka ACLs using the Docker container and the admin admin.properites
from above:
kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --list
Error while executing ACL command: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
No Kafka ACLs means everyone has access to everything. So let’s enable ACLs as described here by changing hosts.yml
as follows:
ansible_connection: ssh
ansible_user: root
ansible_become: true
ansible_ssh_private_key_file: ~/.ssh/id_rsa
# Enable SSL
ssl_enabled: True
# Enable ACLs
authorizer.class.name: kafka.security.authorizer.AclAuthorizer
super.users: User:admin;User:schema_registry
# Implicitly set by "zookeeper_sasl_protocol: digest"
# zookeeper.set.acl: True
As you see, we are adding User:admin
and User:schema_registry
to the list of super.users
since by default, no one is allowed to execute any action and so without this, the Kafka Broker wouldn’t be allowed to create required topics for initialization.
Let’s re-reploy the playbook:
ansible-playbook -i hosts.yml all.yml
Let’s try to list Kafka ACLs with enabled AclAuthorizer
kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --list
No more error and we see the ACLs we have: None. So let’s create a new user user1
and ACLs to allow the user to read and write the topic test
kafka-configs --zookeeper kafka-1:2181 --alter --add-config 'SCRAM-SHA-512=[password=test123]' --entity-type users --entity-name user1
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'user1'.
Wait a minute … did we just create a user without authentication? Well let’s dive into Zookeepers ACL’s and see who can write the config i.e. and create users:
zookeeper-shell kafka-1:2181 getAcl /config/users
Connecting to kafka-1:2181
WatchedEvent state:SyncConnected type:None path:null
: cdrwa
This confirms that by default everyone is allowed to change ZooKeeper’s config and thus by default everyone is allowed to access the Kafka cluster. We should change this for production use and limit the access to authorized users only. This is where zookeeper-security-migration
comes into play: As describe here, this tools sets the required the ZooKeeper ACLs to secure ZooKeeper accordingly. A brief description about ZooKeeper ACLs can be found here: Basically an ACL has three parts:
- The first part is the authentication type. For example,
indicates all authentication types andsasl
indicates the kerberos authentication type. - The second part is the account. For example,
indicates any user. - The third part is permission. For example,
indicates all permissions.
Permission types are as follows:
Description | Name | Details |
Create permission | create(c) | Users with this permission can create znodes in the current znode. |
Delete permission | delete(d) | Users with this permission can delete the current znode. |
Read permission | read(r) | Users with this permission can obtain data of the current znode and list all the child znodes of the current znode. |
Write permission | write(w) | Users with this permission can write data to the current znode and its child znodes. |
Administration permission | admin(a) | Users with this permission can set permission for the current znode. |
So let’s try to secure our ZooKeeper by running zookeeper-security-migration
on one of our Kafka nodes as follows:
ssh root@kafka-1
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.conf" zookeeper-security-migration --zk-tls-config-file zookeeper.properties --zookeeper.acl secure
Now the access to the znodes
should be limited to authenticated users only. Let’s create a zookeeper_client_jaas.conf
including the user we want to use for accessing the ZooKeeper shell.
Back in our Docker container, create zookeeper_client_jaas.conf
as follows:
cat zookeeper_jaas_client.conf
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
Using the JAAS file, we can connect and authorize with ZooKeeper as follows:
KAFKA_OPTS="-Djava.security.auth.login.config=zookeeper_jaas_client.conf" zookeeper-shell kafka-1:2181
Connecting to kafka-1:2181
Welcome to ZooKeeper!
Let’s check the ACLs that were created by zookeeper-security-migration
i.e. for /config/users
KAFKA_OPTS="-Djava.security.auth.login.config=zookeeper_jaas_client.conf" zookeeper-shell kafka-1:2181 getAcl /config/users
Connecting to kafka-1:2181
: cdrwa
We can see, that only SASL authenticated user admin
is allowed to cdrwa
in path /config/users
So let’s try again to create a user without authentication:
kafka-configs --zookeeper kafka-1:2181 --alter --add-config 'SCRAM-SHA-512=[password=test123]' --entity-type users --entity-name user1
Error while executing config command with args '--zookeeper kafka-1:2181 --alter --add-config SCRAM-SHA-512=[password=test123] --entity-type users --entity-name user2'
org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /config/users/user1
And now with enabled authentication using the created JAAS file:
KAFKA_OPTS="-Djava.security.auth.login.config=zookeeper_jaas_client.conf" kafka-configs --zookeeper kafka-1:2181 --alter --add-config 'SCRAM-SHA-512=[password=test123]' --entity-type users --entity-name user1
Completed updating config for entity: user-principal 'user1'.
We can list the users in ZooKeeper using the zookeeper-shell
as follows:
KAFKA_OPTS="-Djava.security.auth.login.config=zookeeper_jaas_client.conf" zookeeper-shell kafka-1:2181 ls /config/users
Connecting to kafka-1:2181
[admin, client, control_center, kafka_connect, kafka_rest, ksql, schema_registry, user1]
After we’ve created a new user user1
let’s create a topic test
using admin.properties
from above:
kafka-topics --bootstrap-server kafka-1:9092 --create --topic test --command-config admin.properties
Created topic test.
If you hit a
java.lang.OutOfMemoryError: Java heap space
it doesn’t necessarily mean that it’s a memory problem. Some Kafka tools likekafka-topics
have a known issue (or a feature?) that falsely report a OOM error if a client is using the SSL port without setting setting the security protocol i.e. by using--command-config admin.properties
. Also see here
Next let’s check the ACLs for the topic test
kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --list --topic test
Create a new ACL with read/write access for the user user1
to the topic test
as follows:
kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --add --allow-principal User:user1 --operation read --operation write --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
To test our topic, we use the kafka-console-producer
and kafka-console-consumer
scripts. To do so, we need to create a property file i.e. user.properties
for the producer and consumer:
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="test123";
We can use this to start the consumer as follows:
kafka-console-consumer --bootstrap-server=kafka-1:9092 --topic=test --consumer.config user.properties
[2020-11-30 13:55:25,295] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: console-consumer-64923
Processed a total of 0 messages
Well of course, this can’t be the end. So we get another authorization error: Not authorized to access group: console-consumer-64923
. As briefly mentioned here, we need to authorize the user user1
as producer and consumer for the topic test
as follows:
kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --add --allow-principal User:user1 --producer --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=CREATE, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=CREATE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
The options
are convenience option to add/remove ACLs for producer role. This will generate ACLs that allows WRITE, DESCRIBE and CREATE on topic.
Note that we have to specify the consumer group as --group
when using ACLs for consumer roles. So let’s create the consumer ACLs for the group console-consumer
as follows:
kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --add --allow-principal User:user1 --consumer --group console-consumer --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=console-consumer, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=console-consumer, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
(principal=User:user1, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=CREATE, permissionType=ALLOW)
So let’s try again to consume a topic:
kafka-console-consumer --bootstrap-server=kafka-1:9092 --topic=test --consumer.config user.properties --group console.consumer
And finally start the producer as follows:
kafka-console-producer --broker-list kafka-1:9092 --topic=test --producer.config user.properties
Type some words into the producer command line. After a short time, they should appear in the consumer console. What a journey!
¶ Restart
In some (many) cases you might have to reset for testing instance and start from scratch. Be carefully if you play around with the ZooKeeper ACLs: I have managed to exclude myself more than once. If you messed it up (and believe me, you will), you can run the following commands to reset your Kafka nodes and to give Ansible a chance to start from scratch:
systemctl stop confluent-server confluent-zookeeper confluent-schema-registry
rm -rf /var/lib/kafka /var/lib/zookeeper /var/lib/confluent /etc/kafka* /etc/confluent-* /var/ssl/private
¶ Conclusion
If you followed my steps above, there is not much more to say. If you managed to setup and secure this 3 node Kafka cluster, you can easily estimate the effort to maintain a Kafka in production. Compare this to the benefits of Kafka and carefuly think about my first question: „Do you really need Apache Kafka?“.
Of course there are a lot of projects where Kafka like kubeless or jHipster is hidden „under the hood“. So you could say that there is no effort for setup and configuration. Well, yes! And if you can give these things in hands of experts that can support you or if you are the expert and this is your job in this project: Perfect! But if not, this is fine as long things are running. But if your Kafka fails in production or you need to investigate issues and dive deeper into your Kafka setup: Things become complicated easily. Google is only partly your friend since the docs and issues you’ll find in the web are also a mess including the documentation of Confluent and Apache Kafka itself.
So my best advice is to keep things as easy as possible. Now it is up to you to decide whether Kafka looks easy or not.