Apache Kafka provides a unified, high-throughput, low-latency platform for handling real-time data feeds. Installing Apache Kafka, and especially getting the security configuration right, including authentication and encryption, is quite a challenge. This article gives a brief summary of our experience and lessons learned while installing and configuring Apache Kafka the right way.


Do you really need Apache Kafka?

Think about this carefully before you start using Apache Kafka. Don’t get me wrong: Apache Kafka is great at what it is intended to do, but for some use cases it tends to be overkill. Going deeper into this topic is outside the scope of this article. I’ve found a good article about it: Apache Kafka Use Cases: When To Use It & When Not To. To sum it up in a few words:

Use Kafka for:

  • Real-time data processing: Transmitting data from producers to data handlers and then on to data stores.
  • Application activity tracking: This is the use case Kafka was originally developed for at LinkedIn: publishing application events to dedicated Kafka topics.
  • Logging and/or monitoring: Publishing logs into Kafka topics.

Don’t use Kafka if:

  • You need to process only a small number of messages per day. Think about using a traditional message queue like RabbitMQ instead.
  • You need to apply on-the-fly data transformations or run ETL jobs (extract, transform, load).
  • You need a task queue: Again, RabbitMQ is designed to be a task queue.
  • You need a database: There is no need for an explanation: Kafka is not designed for persistent data storage.

What do we have?

There are several ways to set up Apache Kafka, but the easiest is to use Ansible and the official Kafka playbooks from Confluent: The Easiest Way to Install Apache Kafka and Confluent Platform – Using Ansible. This approach can also be used for production, e.g. to set up a multi-node cluster of 3 nodes with a Kafka Broker and an Apache ZooKeeper instance on each node. The Kafka Brokers do the actual work, i.e. receiving messages from producers and serving them to consumers, organized in topics. The Apache ZooKeeper instances are used for maintaining configuration information, naming, providing distributed synchronization, and providing group services. So you could say that Apache ZooKeeper is used to manage our Kafka cluster.

Now the first challenge: You need authN/authZ and SSL encryption for all connections:

  • Kafka clients (i.e. your software) are connecting to the Kafka Brokers.
  • Each Kafka Broker is connecting to the other brokers.
  • Each Kafka Broker is connecting to the ZooKeeper instances.

Each of these connections needs SSL encryption, authentication and authorization. Authorization is handled via ACLs (covered later in this article), so the focus here is on SSL encryption and authentication.

Kafka Broker Authentication

There are several methods to authenticate Kafka clients against Kafka Brokers. Note that for inter-node communication a Kafka Broker can also be a Kafka client that is connecting to another Kafka Broker.

Mutual TLS (mTLS)

Each Kafka Broker and each Kafka client gets an SSL certificate that supports client authentication. This is simple and bullet-proof, but you need a client certificate for each Kafka Broker and each Kafka client, and that is beyond what e.g. Let’s Encrypt offers. So you need your own PKI infrastructure and probably your own certificate authority.
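To give you an idea of what this means on the broker side, here is a minimal sketch of the relevant server.properties settings (the listener port, keystore paths and passwords are just placeholders):

listeners=SSL://0.0.0.0:9093
security.inter.broker.protocol=SSL
# Require a client certificate from every connecting client (mTLS)
ssl.client.auth=required
ssl.keystore.location=/var/ssl/private/kafka_broker.keystore.jks
ssl.keystore.password=changeme
ssl.truststore.location=/var/ssl/private/kafka_broker.truststore.jks
ssl.truststore.password=changeme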

SASL PLAIN

With SASL PLAIN, usernames and passwords are stored in each Kafka Broker (not to be confused with PLAINTEXT, which means no SSL encryption at all). So SSL encryption is a must, otherwise this doesn’t make sense at all. Also think about what happens when you add or change a user or a password: You must update and restart every Kafka Broker for each change. So this is simple to set up, but it does not scale well with the number of Kafka Brokers and clients.
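To illustrate why this does not scale: with the default PlainLoginModule the users live in each broker’s JAAS configuration, roughly like this (user names and passwords are made-up examples):

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};

Every additional user_<name> entry means touching this file and restarting every single broker.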

See https://docs.confluent.io/current/kafka/authentication_sasl/authentication_sasl_plain.html

SASL SCRAM

Salted Challenge Response Authentication Mechanism (SCRAM) is a family of password-based challenge–response authentication mechanisms that authenticate a user (here our Kafka client or Kafka Broker) to a server (a Kafka Broker). If enabled, the salted and hashed credentials are stored in Apache ZooKeeper, so adding clients without restarting the Kafka Brokers works. Furthermore, no passwords are sent in clear text over the wire, but SSL encryption is still required to prevent interception of the SCRAM exchange.
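SCRAM credentials are created with the kafka-configs tool and end up as znodes below /config/users in ZooKeeper. A quick sketch (user name and password are placeholders; we will do exactly this in the hands-on part below):

kafka-configs --zookeeper kafka-1:2181 --alter \
  --add-config 'SCRAM-SHA-512=[iterations=8192,password=alice-secret]' \
  --entity-type users --entity-name alice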

There are many good articles about SASL/SCRAM in Kafka (e.g. Kafka authentication using SASL/SCRAM by Hussein Moussa). Just ask Google for more!

See https://docs.confluent.io/current/kafka/authentication_sasl/authentication_sasl_scram.html

SASL Delegation Token

Delegation-token-based authentication can be used to complement existing SASL/SSL methods. Typically you authenticate with the Kafka cluster using SSL or SASL to obtain a delegation token. This token can then be used by Kafka clients to securely connect to the Kafka Brokers. To make this work, a shared secret key must be configured across all Kafka Brokers. The generated delegation tokens are stored in Apache ZooKeeper.

As simple and secure as this approach is, there are two main drawbacks: you still need a SASL/SSL secured channel to create the tokens, and you need to implement a token refresh in your client software.
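For illustration, a delegation token is created with the kafka-delegation-tokens tool over an already authenticated SASL/SSL connection; a sketch (assuming an admin.properties that contains the SASL/SSL client settings):

kafka-delegation-tokens --bootstrap-server kafka-1:9092 --command-config admin.properties \
  --create --max-life-time-period -1 --renewer-principal User:user1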

See https://docs.confluent.io/current/kafka/authentication_sasl/authentication_sasl_delegation.html

SASL GSSAPI

This is intended for corporate environments that already use Kerberos (e.g. Active Directory): Kafka Brokers and Kafka clients use Kerberos for authentication. So you need a principal for each Kafka Broker and each Kafka client that will connect to the cluster. This might be the most secure method, but you need a Kerberos server, and your clients need to log in to Kerberos and use the resulting ticket to authenticate against the Kafka Broker.
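On the client side this typically boils down to a JAAS entry pointing at a keytab and a principal; a sketch (keytab path, principal and realm are placeholders):

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="kafka-client@EXAMPLE.COM";
};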

See https://kafka.apache.org/documentation/#security_sasl_kerberos and https://docs.confluent.io/current/kafka/authentication_sasl/authentication_sasl_gssapi.html.

Apache ZooKeeper Authentication

Apache ZooKeeper is used to manage the Kafka cluster, i.e. the Kafka topics and their configuration, the list of Kafka Brokers that are part of the cluster, and the ACLs used for authorization. To sum up: If you have access to Apache ZooKeeper, you have access to the Kafka cluster including all data. So you should properly secure Apache ZooKeeper as well for production use.

Mutual SSL (mTLS)

As of version 3.5.x, ZooKeeper supports mutual SSL client authentication. As of version 2.5 Kafka began shipping with ZooKeeper versions that support mTLS authentication. You can now enable ZooKeeper authentication with or without SASL authentication. Prior to the 2.5 release, only the DIGEST-MD5 and GSSAPI SASL mechanisms were supported.

See https://www.confluent.io/blog/secure-kafka-deployment-best-practices/

Using mTLS is simple and bullet-proof, but similar to Kafka Broker authentication with mTLS, all other ZooKeeper instances, Kafka Brokers and/or CLI tools need an SSL certificate with client authentication enabled. Remember that this is beyond what Let’s Encrypt offers. Furthermore, all SSL certificates must share the same Distinguished Name (DN), since the DN is included in the ZooKeeper ACL that grants access to the znode. So if you need different identities (i.e. different DNs), you should combine mTLS with SASL authentication. Here you can choose between the two supported mechanisms, SASL DIGEST-MD5 or SASL GSSAPI using Kerberos.

See https://docs.confluent.io/current/security/zk-security.html#mtls-authentication
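For reference, a minimal sketch of the ZooKeeper-side settings in zookeeper.properties that enable a TLS client port with X.509 authentication (port, keystore paths and passwords are placeholders):

secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/var/ssl/private/zookeeper.keystore.jks
ssl.keyStore.password=changeme
ssl.trustStore.location=/var/ssl/private/zookeeper.truststore.jks
ssl.trustStore.password=changeme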

SASL Digest MD5

When using SASL DIGEST-MD5, the username and password are stored in plain text in the JAAS configuration of the Apache ZooKeeper servers and of the ZooKeeper clients (i.e. Kafka Brokers or other ZooKeeper instances). As the name suggests, MD5-hashed passwords are sent over the wire, and there are several security concerns regarding MD5 hashes: For production use you should enable TLS encryption to protect the MD5-hashed passwords. Furthermore, only the brokers need access to ZooKeeper, so you should restrict access to the ZooKeeper nodes with a firewall.
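For illustration, the ZooKeeper server gets its users from a JAAS file roughly like this (file name, user names and passwords are made-up examples; the matching Client section is shown later in the hands-on part):

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_super="super-secret"
    user_kafka="kafka-secret";
};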

See https://docs.confluent.io/current/security/zk-security.html#sasl-with-digest-md5

SASL GSSAPI

Besides SASL DIGEST-MD5 you can also use Kerberos for ZooKeeper authentication. This is the most secure way to get bullet-proof authentication in a corporate environment: All Kafka and ZooKeeper clients, including the Kafka Brokers and Apache ZooKeeper instances, have their own Kerberos principals, e.g. from a corporate Active Directory, and use these Kerberos logins to authenticate with each other.

See https://docs.confluent.io/current/security/zk-security.html#sasl-with-kerberos

Hands On

Let’s try to set up a kind-of-production-ready Kafka cluster for testing purposes using the Ansible playbooks from Confluent. We use 3 nodes as follows:

| Hostname | Purpose |
|----------|---------|
| kafka-1  | ZooKeeper, Broker, Schema Registry |
| kafka-2  | ZooKeeper, Broker |
| kafka-3  | ZooKeeper, Broker |

Supported operating systems for the Confluent playbooks can be found here. If you plan to go into production, you should also consider the hardware requirements here.

Prepare

Let’s checkout the Confluent Playbook and create our hosts.yml file:

$ git clone https://github.com/confluentinc/cp-ansible
$ cp cp-ansible/hosts_example.yml cp-ansible/hosts.yml

Make sure that the hostnames resolve properly on each host and on your Ansible client. For testing purposes, adding these hosts to your /etc/hosts should be fine.
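For example, with made-up IP addresses, /etc/hosts could look like this:

10.0.0.11 kafka-1
10.0.0.12 kafka-2
10.0.0.13 kafka-3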

Further update hosts.yml according to your needs. For now, we just enable a ZooKeeper and Kafka Broker instance on each node and a Schema Registry on kafka-1:

---
all:
  vars:
    ansible_connection: ssh
    ansible_user: root
    ansible_become: true
    ansible_ssh_private_key_file: ~/.ssh/id_rsa

    # Enable SSL
    ssl_enabled: True

    # Brokers: Enable SASL SCRAM
    # https://docs.confluent.io/current/installation/cp-ansible/ansible-authenticate.html#configure-sasl-scram-authentication
    sasl_protocol: scram

    # ZooKeeper: Enable SASL DIGEST-MD5
    # https://docs.confluent.io/current/installation/cp-ansible/ansible-authenticate.html
    zookeeper_sasl_protocol: digest

zookeeper:
  hosts:
    kafka-1:
    kafka-2:
    kafka-3:

kafka_broker:
  hosts:
    kafka-1:
    kafka-2:
    kafka-3:

schema_registry:
  hosts:
    kafka-1:

As you can see, we further enable SSL encryption using self-signed certificates, SASL SCRAM for the Kafka Brokers and SASL DIGEST-MD5 for ZooKeeper. Besides Kerberos, these seem to be suitable mechanisms to secure a Kafka cluster.

This is for testing purposes only. If you run a public Kafka cluster, consider securing it with additional layers, i.e. firewalls and/or a VPN.

Install

To run the playbooks you can use all.yml as follows:

ansible-playbook -i hosts.yml all.yml

For updates or for testing purposes you can run the steps one by one as follows:

  1. Generate the certificate authority if TLS encryption is enabled:
    ansible-playbook -i hosts.yml all.yml --tags=certificate_authority
    
  2. Install ZooKeeper and Kafka:
    ansible-playbook -i hosts.yml all.yml --tags=zookeeper,kafka_broker
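After the playbook run has finished, a quick sanity check is to look at the systemd units on one of the nodes (these are the service names the Confluent playbooks use, see also the reset commands at the end of this article):

ssh root@kafka-1 systemctl status confluent-zookeeper confluent-server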
    

Testing

Let’s try to create a topic with a given user and password from outside the cluster. For quick and dirty testing purposes we can use the confluentinc/cp-kafka Docker image, which already includes all the tools we need.

First we need an admin.properties file including the security parameters and user credentials. Let’s create and change into a directory kafka where we will store the relevant files for testing:

mkdir kafka
cd kafka

Let’s create admin.properties as follows:

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
ssl.truststore.location=truststore.jks
ssl.truststore.password=confluenttruststorepass

Second we need the Java truststore and keystore including the SSL certificates to connect to the cluster. We shamelessly use one of the stores that Ansible has created on the server:

scp root@kafka-1:/var/ssl/private/kafka_broker.truststore.jks truststore.jks
scp root@kafka-1:/var/ssl/private/kafka_broker.keystore.jks keystore.jks

The Confluent playbooks create some default users and passwords (i.e. admin and admin-secret as well as confluenttruststorepass). Make sure to change these passwords for production use.

Now let’s run the docker image and change to our mounted working directory /home/appuser/kafka:

docker run --entrypoint=/bin/bash --user 0 --rm -t -i -v /etc/hosts:/etc/hosts -v $(pwd):/home/appuser/kafka confluentinc/cp-kafka:6.0.0
cd /home/appuser/kafka

Testing Authentication

Let’s try to list existing topics to test authentication. First let’s try to use the prepared admin.properties as follows:

kafka-topics --bootstrap-server kafka-1:9091 --command-config admin.properties --list
_confluent-license
_confluent-telemetry-metrics

This seems to work, so let’s try to connect with an invalid password and see if it fails:

sed 's/admin-secret/admin-secret-123/g' admin.properties > admin-invalid-password.properties
kafka-topics --bootstrap-server kafka-1:9091 --command-config admin-invalid-password.properties --list
[2020-11-17 17:04:17,243] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (kafka-1/168.119.186.234:9091) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512 (org.apache.kafka.clients.NetworkClient)
[2020-11-17 17:04:17,244] WARN [AdminClient clientId=adminclient-1] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
Error while executing topic command : org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
[2020-11-17 17:04:17,247] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.admin.TopicCommand$AdminClientTopicService.getTopics(TopicCommand.scala:352)
    at kafka.admin.TopicCommand$AdminClientTopicService.listTopics(TopicCommand.scala:260)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
 (kafka.admin.TopicCommand$)

This fails with Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512, so authentication seems to work.

Testing Authorization

Apache Kafka supports a pluggable authorizer. The default authorizer implementation coming with Apache Kafka is called AclAuthorizer and is using Apache ZooKeeper to store access control lists (ACLs).

Note that there are Kafka ACLs for controlling access to the Kafka Brokers and there are ZooKeeper ACLs that control access to the ZooKeeper znodes. By default no ZooKeeper ACLs are created, and thus the information in ZooKeeper is writable by everyone. So if you go to production, make sure that zookeeper.set.acl is enabled; the Confluent playbooks enable it automatically if ZooKeeper is configured to use either Digest or Kerberos authentication:

  zk_acls:
    enabled: "{{ zookeeper_sasl_protocol in ['kerberos', 'digest'] }}"
    properties:
      zookeeper.set.acl: 'true'

Let’s start and try to list the Kafka ACLs using the Docker container and the admin.properties from above:

kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --list
Error while executing ACL command: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker

No configured authorizer means everyone has access to everything. So let’s enable ACLs as described here by changing hosts.yml as follows:

---
all:
  vars:
    ansible_connection: ssh
    ansible_user: root
    ansible_become: true
    ansible_ssh_private_key_file: ~/.ssh/id_rsa

    # Enable SSL
    ssl_enabled: True
    
    # Enable ACLs
    kafka_broker_custom_properties:
      authorizer.class.name: kafka.security.authorizer.AclAuthorizer
      super.users: User:admin;User:schema_registry

    # Implicitly set by "zookeeper_sasl_protocol: digest"
    #zookeeper_custom_properties:
    #  zookeeper.set.acl: True
    ...

As you can see, we add User:admin and User:schema_registry to the list of super.users: by default no one is allowed to execute any action, and without this the Kafka Brokers would not be allowed to create the topics required for initialization.

Let’s re-deploy the playbook:

ansible-playbook -i hosts.yml all.yml

Let’s try to list Kafka ACLs with enabled AclAuthorizer:

kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --list


No more errors, and we see all the ACLs we have: none. So let’s create a new user user1 and ACLs that allow this user to read and write the topic test.

kafka-configs --zookeeper kafka-1:2181 --alter --add-config 'SCRAM-SHA-512=[password=test123]' --entity-type users --entity-name user1
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'user1'.

Wait a minute … did we just create a user without authentication? Well, let’s dive into ZooKeeper’s ACLs and see who is allowed to write the configuration, i.e. create users:

zookeeper-shell kafka-1:2181 getAcl /config/users
Connecting to kafka-1:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
'world,'anyone
: cdrwa

This confirms that by default everyone is allowed to change ZooKeeper’s configuration and thus, by default, everyone has access to the Kafka cluster. We should change this for production use and limit access to authorized users only. This is where zookeeper-security-migration comes into play: As described here, this tool sets the ZooKeeper ACLs required to secure ZooKeeper accordingly. A brief description of ZooKeeper ACLs can be found here: Basically an ACL has three parts:

  1. The first part is the authentication scheme. For example, world means any user regardless of authentication, and sasl means SASL authentication (e.g. Kerberos or DIGEST-MD5).
  2. The second part is the account. For example, anyone indicates any user.
  3. The third part is permission. For example, cdrwa indicates all permissions.

Permission types are as follows:

| Description | Name | Details |
|-------------|------|---------|
| Create permission | create(c) | Users with this permission can create znodes in the current znode. |
| Delete permission | delete(d) | Users with this permission can delete the current znode. |
| Read permission | read(r) | Users with this permission can obtain data of the current znode and list all the child znodes of the current znode. |
| Write permission | write(w) | Users with this permission can write data to the current znode and its child znodes. |
| Administration permission | admin(a) | Users with this permission can set permissions for the current znode. |
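For example, in zookeeper-shell an ACL that grants the SASL-authenticated user admin full permissions on a znode would be set like this (just an illustration of the syntax; zookeeper-security-migration will do this for us in the next step):

setAcl /config/users sasl:admin:cdrwa
getAcl /config/users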

So let’s try to secure our ZooKeeper by running zookeeper-security-migration on one of our Kafka nodes as follows:

ssh root@kafka-1
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.conf" zookeeper-security-migration --zk-tls-config-file zookeeper.properties --zookeeper.acl secure

Now access to the znodes should be limited to authenticated users only. Let’s create a zookeeper_jaas_client.conf containing the user we want to use for accessing the ZooKeeper shell.
Back in our Docker container, create zookeeper_jaas_client.conf as follows:

cat zookeeper_jaas_client.conf 
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="admin-secret";
};

Using the JAAS file, we can connect to and authenticate with ZooKeeper as follows:

KAFKA_OPTS="-Djava.security.auth.login.config=zookeeper_jaas_client.conf" zookeeper-shell kafka-1:2181
Connecting to kafka-1:2181
Welcome to ZooKeeper!

Let’s check the ACLs that were created by zookeeper-security-migration, e.g. for /config/users:

KAFKA_OPTS="-Djava.security.auth.login.config=zookeeper_jaas_client.conf" zookeeper-shell kafka-1:2181 getAcl /config/users
Connecting to kafka-1:2181
'sasl,'admin
: cdrwa

We can see that only the SASL-authenticated user admin is allowed to cdrwa on the path /config/users.

So let’s try again to create a user without authentication:

kafka-configs --zookeeper kafka-1:2181 --alter --add-config 'SCRAM-SHA-512=[password=test123]' --entity-type users --entity-name user1
Error while executing config command with args '--zookeeper kafka-1:2181 --alter --add-config SCRAM-SHA-512=[password=test123] --entity-type users --entity-name user2'
org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /config/users/user1

And now with enabled authentication using the created JAAS file:

KAFKA_OPTS="-Djava.security.auth.login.config=zookeeper_jaas_client.conf" kafka-configs --zookeeper kafka-1:2181 --alter --add-config 'SCRAM-SHA-512=[password=test123]' --entity-type users --entity-name user1
Completed updating config for entity: user-principal 'user1'.

We can list the users in ZooKeeper using the zookeeper-shell as follows:

KAFKA_OPTS="-Djava.security.auth.login.config=zookeeper_jaas_client.conf" zookeeper-shell kafka-1:2181 ls /config/users
Connecting to kafka-1:2181
[admin, client, control_center, kafka_connect, kafka_rest, ksql, schema_registry, user1]

Now that we have created the new user user1, let’s create a topic test using admin.properties from above:

kafka-topics --bootstrap-server kafka-1:9092 --create --topic test --command-config admin.properties
Created topic test.

If you hit a java.lang.OutOfMemoryError: Java heap space it doesn’t necessarily mean that there is a memory problem. Some Kafka tools like kafka-topics have a known issue (or a feature?) of falsely reporting an OOM error if a client uses the SSL port without setting the security protocol, i.e. without using --command-config admin.properties. Also see here.

Next let’s check the ACLs for the topic test:

kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --list --topic test

Create a new ACL with read/write access for the user user1 to the topic test as follows:

kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --add --allow-principal User:user1 --operation read --operation write --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
 	(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:user1, host=*, operation=READ, permissionType=ALLOW) 
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
 	(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:user1, host=*, operation=READ, permissionType=ALLOW) 

To test our topic, we use the kafka-console-producer and kafka-console-consumer tools. To do so, we need to create a properties file, i.e. user.properties, for the producer and consumer:

security.protocol=SASL_SSL
ssl.truststore.location=truststore.jks
ssl.truststore.password=confluenttruststorepass
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="test123";

We can use this to start the consumer as follows:

kafka-console-consumer --bootstrap-server=kafka-1:9092 --topic=test --consumer.config user.properties
[2020-11-30 13:55:25,295] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: console-consumer-64923
Processed a total of 0 messages

Well, of course this can’t be the end. We get another authorization error: Not authorized to access group: console-consumer-64923. As briefly mentioned here, we need to authorize the user user1 as producer and consumer for the topic test as follows:

kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties  --add --allow-principal User:user1 --producer --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
 	(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:user1, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:user1, host=*, operation=CREATE, permissionType=ALLOW) 
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
 	(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:user1, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:user1, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:user1, host=*, operation=READ, permissionType=ALLOW) 

The options --producer and --consumer are convenience options for adding/removing the ACLs a producer or consumer role needs. --producer generates ACLs that allow WRITE, DESCRIBE and CREATE on the topic.

Note that we have to specify the consumer group via --group when creating ACLs for the consumer role. So let’s create the consumer ACLs for the group console-consumer as follows:

kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties  --add --allow-principal User:user1 --consumer --group console-consumer --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
 	(principal=User:user1, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:user1, host=*, operation=READ, permissionType=ALLOW) 
Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=console-consumer, patternType=LITERAL)`: 
 	(principal=User:user1, host=*, operation=READ, permissionType=ALLOW) 
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=console-consumer, patternType=LITERAL)`: 
 	(principal=User:user1, host=*, operation=READ, permissionType=ALLOW) 
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
 	(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
    (principal=User:user1, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:user1, host=*, operation=CREATE, permissionType=ALLOW)

So let’s try again to consume a topic:

kafka-console-consumer --bootstrap-server=kafka-1:9092 --topic=test --consumer.config user.properties --group console-consumer

And finally start the producer as follows:

kafka-console-producer --broker-list kafka-1:9092 --topic=test --producer.config user.properties
>

Type some words into the producer command line. After a short time, they should appear in the consumer console. What a journey!

Restart

In some (many) cases you might have to reset your testing installation and start from scratch. Be careful when you play around with the ZooKeeper ACLs: I have managed to lock myself out more than once. If you mess it up (and believe me, you will), you can run the following commands on each node to reset your Kafka nodes and give Ansible a chance to start from scratch:

systemctl stop confluent-server confluent-zookeeper confluent-schema-registry
rm -rf /var/lib/kafka /var/lib/zookeeper /var/lib/confluent /etc/kafka* /etc/confluent-* /var/ssl/private

Conclusion

If you followed my steps above, there is not much more to say. If you managed to set up and secure this 3-node Kafka cluster, you can easily estimate the effort needed to maintain a Kafka cluster in production. Compare this to the benefits of Kafka and carefully think about my first question: „Do you really need Apache Kafka?“.

Of course there are a lot of projects, like kubeless or jHipster, where Kafka is hidden „under the hood“. So you could say that there is no effort for setup and configuration. Well, yes! If you can put these things in the hands of experts who can support you, or if you are the expert and this is your job in the project: Perfect! If not, this is fine as long as things are running. But when your Kafka fails in production, or you need to investigate issues and dive deeper into your Kafka setup, things get complicated quickly. Google is only partly your friend, since the docs and issues you’ll find on the web are also a mess, including the documentation of Confluent and Apache Kafka itself.

So my best advice is to keep things as easy as possible. Now it is up to you to decide whether Kafka looks easy or not.