
Kafka Security - First Steps
Kafka Security - First Steps
Do you really need Apache Kafka?
Think about this carefully before you start using Apache Kafka. Don't get me wrong: Apache Kafka is great in doing stuff it is intended to do but for some use-cases Apache Kafka tend to be a overkill. Going into this topic is outside the scope. I've found a good article about this topic: Apache Kafka Use Cases: When To Use It & When Not To. To sum it up in a few words:
Use Kafka for:
- Real-time data processing: Transmitting data from producers to data handlers and then to data storages.
- Application activity tracking: This is the use case Kafka was originally developed for, to be used in LinkedIn: Publishing events in applications to dedicated Kafka topics.
- Logging and/or monitoring: Publishing logs into Kafka topics.
Don't use Kafka iff:
- You need to process only a small amount of messages per day. Think about using traditional message queues like RabbitMQ.
- You need to apply on-the-fly data transformations or doing ETL jobs (extract, transform, load).
- You need a task queue: Again, RabbitMQ is designed for beeing a task queue.
- You need a database: There is no need for an explanation: Kafka is not designed for persistent data storage.
What do we have?
There are several ways how to setup Apache Kafka but the easiest way to install Apache Kafka is using Ansible and the official Kafka playjobs from Confluent: The Easiest Way to Install Apache Kafka and Confluent Platform – Using Ansible. This can also be used for production i.e. to setup a multi-node cluster including 3 nodes with an Kafka Broker and a Apache ZooKeeper instance on each node. The Kafka Broker is doing the work i.e. consuming and producing messages based on using topics. The Apache ZooKeeper instances are used for maintaining configuration information, naming, providing distributed synchronization, and providing group services. So you can say, that the Apache ZooKeeper is used to manage our Kafka Cluster.
Now the first challenge: You need authN/authZ and SSL encryption for all connections:
- Kafka clients (i.e. your software) are connecting to the Kafka Brokers.
- Each Kafka Broker is connecting to the other brokers.
- Each Kafka Broker is connecting to the ZooKeeper instances.
Each of these connections need SSL encryption, authentication and authorization. Since authorization is done via ACLs the focus of this article is on SSL encryption and authentication.
Kafka Broker Authentication
There are several methods to authenticate Kafka clients against Kafka Brokers. Note that for inter-node communication a Kafka Broker can also be a Kafka client that is connecting to another Kafka Broker.
Mutual TLS (mTLS)
Each Kafka Broker and each Kafka client gets a SSL certificate that supports client authentication. This is easy and bullet-proofed but you need a client certificate for each Kafka Broker and each Kafka client and this is out of scope what e.g. Let's Encrypt offers. So you need your own PKI infrastructure and probably an own certificate authority to do so.
SASL PLAIN
In SASL PLAIN, user and passwords are stored in each Kafka Broker using SASL PLAIN (not to mix up with no SSL encryption being called PLAINTEXT). So SSL encryption is a must or this doesn't make sense at all. Further think about what will happen if you add or change a user or a password: You must update and restart each Kafka Broker for each user or password change. So this is still simple to setup but this does not scale well with Kafka Brokers and clients.
SASL SCRAM
Salted Challenge Response Authentication Mechanism (SCRAM) is a family password-based challenge–response authentication mechanisms providing authentication of a user (here our Kafka client or Kafka Broker) to a server (Kafka Broker). If enabled, salted usernames and passwords are stored encrypted in Apache ZooKeeper. So scaling clients without restarting the Kafka Brokers should work. Further no passwords will be send unencrypted over the wire but SSL encryption is still required to prevent interception of SCRAM exchange.
SASL Delegation Token
Delegation based authentication can be used to complement existing SASL/SSL methods. Typically you authenticate with a Kafka cluster using SSL or SASL to obtain a delegation token. These tokens can be used by Kafka clients to securely connect to the Kafka Brokers. To do so, a primary secret key must be configured accross all Kafka Brokers. The generated delegation token are stored in Apache ZooKeeper.
For a simple and secure approach, there are two main drawbacks: You still need a SASL/SSL secured channel for token creation and you need to implement a token refresh in your client software.
SASL GSSAPI
This is thought to be used in corporate environments that are using Kerberos (i.e. Active Directory): Kafka Brokers and Kafka clients will use Kerberos for authentication. So you need a principal for each Kafka Broker and each Kafka client that will connect to the Kafka cluster. This might be the most secure method but you need a Kerberos server and your clients require to login to Kerberos and use this Kerberos ticket to authenticating to the Kafka Broker.
Apache ZooKeeper Authentication
Apache ZooKeeper is used to manage the Kafka cluster i.e. Kafka topics and their configuration, a list of Kafka Brokers that are part of the cluster and ACLs used for authoriation. To sum up: If you have access to the Apache ZooKeeper, you have access to the Kafka Cluster including all data. So you should consider to properly secure the Apache ZooKeeper as well for production use.
Mutual SSL (mTLS)
As of version 3.5.x, ZooKeeper supports mutual SSL client authentication. As of version 2.5 Kafka began shipping with ZooKeeper versions that support mTLS authentication. You can now enable ZooKeeper authentication with or without SASL authentication. Prior to the 2.5 release, only the DIGEST-MD5 and GSSAPI SASL mechanisms were supported.
Using MTLS is simple and bullet-proofed but similar to Kafka Broker authentication using mTLS, all other Zookeeper instances, Kafka Brokers and/or CLI Tools need a SSL certificate with client authentication enabled. Remember that this is out of scope from what Let's Encrypt offers. Further all SSL certificates must share the same Distinguished Name (DN) since this is included in the Zookeeper ACL that allows access to the Zookeeper node. So if you need to use different identities (i.e. different DNs) you should use MTLS and SASL authentication. Here you can choose between the two supported SASL Digest MD5 or SASL GSSAPI using Kerberos.
SASL Digest MD5
When using SASL Digest MD5 the username and password are stored in plaintext in Apache ZooKeeper and on the ZooKeeper clients (i.e. Kafka Brokers or other ZooKeeper instances). As the name says this sends MD5 hashed passwords over the wire. Further there are several security concerns regarding MD5 hashes: For production use, you should enable TLS encryption to secure MD5 hashed passwords. Further only the brokers will need access to ZooKeeper, so you should limit the access to the ZooKeeper nodes by a firewall.
SASL GSSAPI
Next to SASL Digest MD5 you can also use Kerberos for ZooKeeper authentication. This is the most secure way for a bullet-proofed authentication in an corporate environment: All Kafka and ZooKeeper clients including Kafka Broker and Apache Zookeeper instances have their own Kerberos principals e.g. from a corperate Active Directory and are using these Kerberos logins for authentication among each other.
Hands On
Let's try to setup a kind-of-production-ready Kafka cluster for testing purpose using Ansible Playbooks from Confluent. We use 3 nodes as follows:
| Hostname | Purpose |
|---|---|
| kafka-1 | ZooKeeper, Broker, Schema Registry |
| kafka-2 | ZooKeeper, Broker |
| kafka-3 | ZooKeeper, Broker |
Support operating systems for the Confluent Playbooks can be found here. If you plan to go into production, you should consider the hardware requirements here.
Prepare
Let's checkout the Confluent Playbook and create our hosts.yml file:
$ git clone https://github.com/confluentinc/cp-ansible
$ cp cp-ansible/hosts_example.yml cp-ansible/hosts.yml
/etc/hosts should be fine.Further update hosts.yml according to your needs. For now, we just enable a ZooKeeper and Kafka Broker instance on each node and a Schema Registry on kafka-1:
---
all:
vars:
ansible_connection: ssh
ansible_user: root
ansible_become: true
ansible_ssh_private_key_file: ~/.ssh/id_rsa
# Enable SSL
ssl_enabled: True
# Brokers: Enable SASL SCRAM
# https://docs.confluent.io/current/installation/cp-ansible/ansible-authenticate.html#configure-sasl-scram-authentication
sasl_protocol: scram
# ZooKeeper: Enable SASL DIGEST-MD5
# https://docs.confluent.io/current/installation/cp-ansible/ansible-authenticate.html
zookeeper_sasl_protocol: digest
zookeeper:
hosts:
kafka-1:
kafka-2:
kafka-3:
kafka_broker:
hosts:
kafka-1:
kafka-2:
kafka-3:
schema_registry:
hosts:
kafka-1:
As you can see, we further enable SSL encryption using self-signed certificates, SASL SCRAM for Kafka Brokers and SASL DIGEST-MD5 for the ZooKeepers. Next to Kerberos, this seems to be suitable mechanisms to secure a Kafka cluster.
Install
To run the playbooks you can use all.yml as follows:
ansible-playbook -i hosts.yml all.yml
For updates or testing-purpose you can run the steps one-by-one as follows:
- Generate the certificate authority if TLS encryption is enabled:
ansible-playbook -i hosts.yml all.yml --tags=certificate_authority - Install ZooKeeper and Kafka:
ansible-playbook -i hosts.yml all.yml --tags=zookeeper,kafka_broker
Testing
Let's try to create a topic with a given user and password from outside the cluster. For quick and dirty testing purpose we can use the confluentinc/cp-kafka docker image which already includes all tools we need.
First we need a admin.properties file including the security parameters and user credentials. Let's create and change into directory kafka where we will store the relevant files for testing-purpose:
mkdir kafka
cd kafkaLet's create admin.properties as follows:
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
ssl.truststore.location=truststore.jks
ssl.truststore.password=confluenttruststorepassSecond we need the Java truststore and keystore including SSL certificates to connect to the cluster. We shamelessly use on of the stores that Ansible has created on the server:
scp root@kafka-1:/var/ssl/private/kafka_broker.truststore.jks truststore.jks
scp root@kafka-1:/var/ssl/private/kafka_broker.keystore.jks keystore.jksadmin and admin-secret as well as confluenttruststorepass. Make sure to change these passwords for production use.Now let's run the docker image and change to our mounted working directory /home/appuser/kafka:
docker run --entrypoint=/bin/bash --user 0 --rm -t -i -v /etc/hosts:/etc/hosts -v $(pwd)/kafka:/home/appuser/kafka confluentinc/cp-kafka:6.0.0
cd /home/appuser/kafka
Testing Authentication
Let's try to list existing topics to test authentication. First let's try to use the prepared admin.properties as follows:
kafka-topics --bootstrap-server kafka-1:9091 --command-config admin.properties --list
_confluent-license
_confluent-telemetry-metrics
This seems to work, so let's try to connect with an invalid password and see if it fails:
sed 's/admin-secret/admin-secret-123/g' admin.properties > admin-invalid-password.properties
kafka-topics --bootstrap-server kafka-1:9091 --command-config admin-invalid-password.properties --list
[2020-11-17 17:04:17,243] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (kafka-1/168.119.186.234:9091) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512 (org.apache.kafka.clients.NetworkClient)
[2020-11-17 17:04:17,244] WARN [AdminClient clientId=adminclient-1] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
Error while executing topic command : org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
[2020-11-17 17:04:17,247] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.getTopics(TopicCommand.scala:352)
at kafka.admin.TopicCommand$AdminClientTopicService.listTopics(TopicCommand.scala:260)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
(kafka.admin.TopicCommand$)
This fails with Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512, so authentication seems to work.
Testing Authorization
Apache Kafka supports a pluggable authorizer. The default authorizer implementation coming with Apache Kafka is called AclAuthorizer and is using Apache ZooKeeper to store access control lists (ACLs).
Note that there are Kafka ACLs for controlling access to the Kafka Broker and there are ZooKeeper ACLs that control access to the ZooKeeper nodes. By defaults no ZooKeeper ACLs are created and thus the information in ZooKeeper will be writeable by everyone. So if you go to production, make sure that zookeeper.set.acl is enabled which is true by default if ZooKeeper is configured to use either Digest or Kerberos security:
zk_acls:
enabled: "{{ zookeeper_sasl_protocol in ['kerberos', 'digest'] }}"
properties:
zookeeper.set.acl: 'true'
Let's start and try to list Kafka ACLs using the Docker container and the admin admin.properites from above:
kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --list
Error while executing ACL command: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
No Kafka ACLs means everyone has access to everything. So let's enable ACLs as described here by changing hosts.yml as follows:
---
all:
vars:
ansible_connection: ssh
ansible_user: root
ansible_become: true
ansible_ssh_private_key_file: ~/.ssh/id_rsa
# Enable SSL
ssl_enabled: True
# Enable ACLs
kafka_broker_custom_properties:
authorizer.class.name: kafka.security.authorizer.AclAuthorizer
super.users: User:admin;User:schema_registry
# Implicitly set by "zookeeper_sasl_protocol: digest"
#zookeeper_custom_properties:
# zookeeper.set.acl: True
#...
As you see, we are adding User:admin and User:schema_registry to the list of super.users since by default, no one is allowed to execute any action and so without this, the Kafka Broker wouldn't be allowed to create required topics for initialization.
Let's re-reploy the playbook:
ansible-playbook -i hosts.yml all.yml
Let's try to list Kafka ACLs with enabled AclAuthorizer:
kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --list
No more error and we see the ACLs we have: None. So let's create a new user user1 and ACLs to allow the user to read and write the topic test.
kafka-configs --zookeeper kafka-1:2181 --alter --add-config 'SCRAM-SHA-512=[password=test123]' --entity-type users --entity-name user1
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'user1'.
Wait a minute ... did we just create a user without authentication? Well let's dive into Zookeepers ACL's and see who can write the config i.e. and create users:
zookeeper-shell kafka-1:2181 getAcl /config/users
Connecting to kafka-1:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
'world,'anyone
: cdrwa
This confirms that by default everyone is allowed to change ZooKeeper's config and thus by default everyone is allowed to access the Kafka cluster. We should change this for production use and limit the access to authorized users only. This is where zookeeper-security-migration comes into play: As describe here, this tools sets the required the ZooKeeper ACLs to secure ZooKeeper accordingly. A brief description about ZooKeeper ACLs can be found here: Basically an ACL has three parts:
- The first part is the authentication type. For example,
worldindicates all authentication types andsaslindicates the kerberos authentication type. - The second part is the account. For example,
anyoneindicates any user. - The third part is permission. For example,
cdrwaindicates all permissions.
Permission types are as follows:
| Description | Name | Details |
|---|---|---|
| Create permission | create(c) | Users with this permission can create znodes in the current znode. |
| Delete permission | delete(d) | Users with this permission can delete the current znode. |
| Read permission | read(r) | Users with this permission can obtain data of the current znode and list all the child znodes of the current znode. |
| Write permission | write(w) | Users with this permission can write data to the current znode and its child znodes. |
| Administration permission | admin(a) | Users with this permission can set permission for the current znode. |
So let's try to secure our ZooKeeper by running zookeeper-security-migration on one of our Kafka nodes as follows:
ssh root@kafka-1
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.conf" zookeeper-security-migration --zk-tls-config-file zookeeper.properties --zookeeper.acl secure
Now the access to the znodes should be limited to authenticated users only. Let's create a zookeeper_client_jaas.conf including the user we want to use for accessing the ZooKeeper shell.
Back in our Docker container, create zookeeper_client_jaas.conf as follows:
cat zookeeper_jaas_client.conf
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin-secret";
};
Using the JAAS file, we can connect and authorize with ZooKeeper as follows:
KAFKA_OPTS="-Djava.security.auth.login.config=zookeeper_jaas_client.conf" zookeeper-shell kafka-1:2181
Connecting to kafka-1:2181
Welcome to ZooKeeper!
Let's check the ACLs that were created by zookeeper-security-migration i.e. for /config/users:
KAFKA_OPTS="-Djava.security.auth.login.config=zookeeper_jaas_client.conf" zookeeper-shell kafka-1:2181 getAcl /config/users
Connecting to kafka-1:2181
'sasl,'admin
: cdrwa
We can see, that only SASL authenticated user admin is allowed to cdrwa in path /config/users.
So let's try again to create a user without authentication:
kafka-configs --zookeeper kafka-1:2181 --alter --add-config 'SCRAM-SHA-512=[password=test123]' --entity-type users --entity-name user1
Error while executing config command with args '--zookeeper kafka-1:2181 --alter --add-config SCRAM-SHA-512=[password=test123] --entity-type users --entity-name user2'
org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /config/users/user1
And now with enabled authentication using the created JAAS file:
KAFKA_OPTS="-Djava.security.auth.login.config=zookeeper_jaas_client.conf" kafka-configs --zookeeper kafka-1:2181 --alter --add-config 'SCRAM-SHA-512=[password=test123]' --entity-type users --entity-name user1
Completed updating config for entity: user-principal 'user1'.
We can list the users in ZooKeeper using the zookeeper-shell as follows:
KAFKA_OPTS="-Djava.security.auth.login.config=zookeeper_jaas_client.conf" zookeeper-shell kafka-1:2181 ls /config/users
Connecting to kafka-1:2181
[admin, client, control_center, kafka_connect, kafka_rest, ksql, schema_registry, user1]
After we've created a new user user1 let's create a topic test using admin.properties from above:
kafka-topics --bootstrap-server kafka-1:9092 --create --topic test --command-config admin.properties
Created topic test.
java.lang.OutOfMemoryError: Java heap space it doesn't necessarily mean that it's a memory problem. Some Kafka tools like kafka-topics have a known issue (or a feature?) that falsely report a OOM error if a client is using the SSL port without setting setting the security protocol i.e. by using --command-config admin.properties. Also see hereNext let's check the ACLs for the topic test:
kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --list --topic test
Create a new ACL with read/write access for the user user1 to the topic test as follows:
kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --add --allow-principal User:user1 --operation read --operation write --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
To test our topic, we use the kafka-console-producer and kafka-console-consumer scripts. To do so, we need to create a property file i.e. user.properties for the producer and consumer:
security.protocol=SASL_SSL
ssl.truststore.location=truststore.jks
ssl.truststore.password=confluenttruststorepass
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="test123";
We can use this to start the consumer as follows:
kafka-console-consumer --bootstrap-server=kafka-1:9092 --topic=test --consumer.config user.properties
[2020-11-30 13:55:25,295] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: console-consumer-64923
Processed a total of 0 messages
Well of course, this can't be the end. So we get another authorization error: Not authorized to access group: console-consumer-64923. As briefly mentioned here, we need to authorize the user user1 as producer and consumer for the topic test as follows:
kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --add --allow-principal User:user1 --producer --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=CREATE, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=CREATE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
--consumer and --produver are convenience option to add/remove ACLs for producer role. This will generate ACLs that allows WRITE, DESCRIBE and CREATE on topic.Note that we have to specify the consumer group as --group when using ACLs for consumer roles. So let's create the consumer ACLs for the group console-consumer as follows:
kafka-acls --bootstrap-server kafka-1:9091 --command-config admin.properties --add --allow-principal User:user1 --consumer --group console-consumer --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=console-consumer, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=console-consumer, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:user1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=READ, permissionType=ALLOW)
(principal=User:user1, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:user1, host=*, operation=CREATE, permissionType=ALLOW)
So let's try again to consume a topic:
kafka-console-consumer --bootstrap-server=kafka-1:9092 --topic=test --consumer.config user.properties --group console.consumer
And finally start the producer as follows:
kafka-console-producer --broker-list kafka-1:9092 --topic=test --producer.config user.properties
Type some words into the producer command line. After a short time, they should appear in the consumer console. What a journey!
Restart
In some (many) cases you might have to reset for testing instance and start from scratch. Be carefully if you play around with the ZooKeeper ACLs: I have managed to exclude myself more than once. If you messed it up (and believe me, you will), you can run the following commands to reset your Kafka nodes and to give Ansible a chance to start from scratch:
systemctl stop confluent-server confluent-zookeeper confluent-schema-registry
rm -rf /var/lib/kafka /var/lib/zookeeper /var/lib/confluent /etc/kafka* /etc/confluent-* /var/ssl/private
Conclusion
If you followed my steps above, there is not much more to say. If you managed to setup and secure this 3 node Kafka cluster, you can easily estimate the effort to maintain a Kafka in production. Compare this to the benefits of Kafka and carefuly think about my first question: "Do you really need Apache Kafka?".
Of course there are a lot of projects where Kafka like kubeless or jHipster is hidden "under the hood". So you could say that there is no effort for setup and configuration. Well, yes! And if you can give these things in hands of experts that can support you or if you are the expert and this is your job in this project: Perfect! But if not, this is fine as long things are running. But if your Kafka fails in production or you need to investigate issues and dive deeper into your Kafka setup: Things become complicated easily. Google is only partly your friend since the docs and issues you'll find in the web are also a mess including the documentation of Confluent and Apache Kafka itself.
So my best advice is to keep things as easy as possible. Now it is up to you to decide whether Kafka looks easy or not.
