In earlier posts (1, 2), we described the different use cases for Apache Kafka and the supported method for Kafka authentication based on the well-known Ansible playbooks for the Confluent Platform. Since that deployment method is heavily flavored by Confluent and deploys Confluent Server instead of Apache Kafka by default, it may be the better choice to deploy Apache Kafka from scratch.


In the following, we step through a manual deployment of a 3-node Apache Kafka cluster using KRaft and mTLS.

KRaft (Kafka Raft) is a mode of operation in Apache Kafka in which the cluster uses its own consensus mechanism based on the Raft algorithm to manage cluster metadata, instead of relying on Apache ZooKeeper.

Mutual TLS (mTLS) authentication in Apache Kafka is a security feature that ensures that both the client and the server authenticate each other’s identities using Transport Layer Security (TLS) certificates. In this post, we build a self-signed Certificate Authority (CA) using Cloudflare’s PKI and TLS toolkit CFSSL and create mTLS certificates to introduce authentication and authorization between Kafka clients, Kafka brokers and the KRaft controllers.

Certificate Authority

In order to create a CA, we install the CFSSL toolkit from https://github.com/cloudflare/cfssl as follows:

export VERSION=1.6.4 # See https://github.com/cloudflare/cfssl/releases
mkdir -p ~/.local/bin
curl -L https://github.com/cloudflare/cfssl/releases/download/v${VERSION}/cfssl_${VERSION}_linux_amd64 -o ~/.local/bin/cfssl
curl -L https://github.com/cloudflare/cfssl/releases/download/v${VERSION}/cfssljson_${VERSION}_linux_amd64 -o ~/.local/bin/cfssl-json
chmod +x ~/.local/bin/cfssl*

As a best practice, we recommend creating a multi-tiered CA structure, including a root certificate and an intermediate certificate. While the root certificate can be kept offline and used only to sign or revoke intermediate certificates, the latter is used for day-to-day operations, i.e. to create our client and server certificates for mTLS. If an intermediate certificate has a security issue, it is easier and less disruptive to revoke an intermediate certificate than a root certificate.

So let’s create the following configs for certificate creation and signing (see https://blog.cloudflare.com/introducing-cfssl for details about these configs) and generate the root and intermediate certificates as follows:

Root Certificate

cat root-ca-csr.json
{
    "CN": "Kafka Root CA",
    "names": [{
        "C": "DE",
        "L": "Karlsruhe",
        "O": "awesome information technology GmbH",
        "OU": "IT",
        "ST": "Baden-Wuerttemberg"
    }],
    "key": {
        "algo": "rsa",
        "size": 4096
    },
    "ca": {
        "expiry": "87600h"
    }
}
cat root-ca-config.json
{
  "signing": {
    "profiles": {
      "intermediate_ca":
      {
          "ca_constraint": {
              "is_ca": true,
              "max_path_len": 0,
              "max_path_len_zero": true
          },
          "expiry": "87600h",
          "usages": [
              "signing",
              "digital signature",
              "key encipherment",
              "cert sign",
              "crl sign",
              "server auth",
              "client auth"
          ]
      }
    }
  }
}

Generate the root certificate in out/root-ca.pem as:

mkdir out
cfssl gencert -initca root-ca-csr.json | cfssl-json -bare out/root-ca

Intermediate Certificate

cat intermediate-ca-csr.json
{
    "CN": "Kafka Intermediate CA",
    "names": [{
        "C": "DE",
        "L": "Karlsruhe",
        "O": "awesome information technology GmbH",
        "OU": "IT",
        "ST": "Baden-Wuerttemberg"
    }],
    "key": {
        "algo": "rsa",
        "size": 4096
    },
    "ca": {
        "expiry": "87600h"
    }
}
cat intermediate-ca-config.json
{
  "signing": {
    "profiles": {
      "client":
      {
          "expiry": "87600h",
          "usages": [
              "signing",
              "key encipherment",
              "client auth"
          ]
      },
      "server":
      {
          "expiry": "87600h",
          "usages": [
              "signing",
              "key encipherment",
              "server auth",
              "client auth"
          ]
      }
    }
  }
}

Generate and sign the intermediate certificate in out/intermediate-ca.pem as:

mkdir -p out
cfssl gencert -initca intermediate-ca-csr.json | cfssl-json -bare out/intermediate-ca
cfssl sign -ca out/root-ca.pem -ca-key out/root-ca-key.pem -config root-ca-config.json -profile intermediate_ca out/intermediate-ca.csr 2> intermediate-ca-sign.log | cfssl-json -bare out/intermediate-ca

As it’s recommended to always include the full certificate chain, we create intermediate-full-chain.pem, containing both the root and the intermediate certificate, as follows:

cat out/root-ca.pem out/intermediate-ca.pem > out/intermediate-full-chain.pem

Kafka Server Certificates

In the next step, we create and sign server certificates for all Kafka nodes (i.e. kafka-1, kafka-2 and kafka-3). Make sure that you include the node name as the common name (CN) and add any other hostnames that could be used for communication between your Kafka nodes to the "Subject Alternative Name" (SAN) of the certificate. In the CFSSL config, these hostnames go into the hosts property.

So let’s create the following CSR config for the kafka-1 host:

cat kafka-1-csr.json
{
    "CN": "kafka-1",
    "names": [{
        "C": "DE",
        "L": "Karlsruhe",
        "O": "awesome information technology GmbH",
        "OU": "IT",
        "ST": "Baden-Wuerttemberg"
    }],
    "key": {
        "algo": "rsa",
        "size": 4096
    },
    "hosts": [
        "kafka-1.example.com","localhost"    
    ]
}

Now let’s create a server certificate for kafka-1:

cfssl gencert -ca out/intermediate-ca.pem -ca-key out/intermediate-ca-key.pem -config intermediate-ca-config.json -profile server kafka-1-csr.json | cfssl-json -bare out/kafka-1

This must be repeated for all Kafka nodes i.e. kafka-1, kafka-2 and kafka-3.
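If your CSR configs follow the same naming scheme, a small shell loop can do this in one go (a sketch; adjust paths and file names to your setup):

# Sketch: generate and sign a server certificate for each Kafka node.
# Assumes kafka-2-csr.json and kafka-3-csr.json exist analogous to kafka-1-csr.json.
for node in kafka-1 kafka-2 kafka-3; do
  cfssl gencert \
    -ca out/intermediate-ca.pem \
    -ca-key out/intermediate-ca-key.pem \
    -config intermediate-ca-config.json \
    -profile server \
    "${node}-csr.json" | cfssl-json -bare "out/${node}"
done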

Kafka Client Certificates

Now we can create client certificates for the Kafka clients. For testing purposes, we will create two users: admin-1 and client-1. While we grant full access to the administrative user, the client will get limited access to a specific topic.

Let’s create the CSR config for admin-1. Make sure that you include the username as common name (CN) in the certificate:

cat admin-1-csr.json
{
    "CN": "admin-1",
    "names": [{
        "C": "DE",
        "L": "Karlsruhe",
        "O": "awesome information technology GmbH",
        "OU": "IT",
        "ST": "Baden-Wuerttemberg"
    }],
    "key": {
        "algo": "rsa",
        "size": 4096
    },
    "hosts": []
}

Create a client certificate for admin-1:

cfssl gencert -ca out/intermediate-ca.pem -ca-key out/intermediate-ca-key.pem -config intermediate-ca-config.json -profile client admin-1-csr.json | cfssl-json -bare out/admin-1

This must be repeated for all users i.e. admin-1 and client-1.

Java Trust- & KeyStores

Because it is Java and to increase the level of complexity (because it’s Java), we need to create Java Trust- and KeyStores for our CA certificate and for each of our certificates.

TrustStore

Let’s start with the TrustStore including a full-chain certificate of your authority as follows:

keytool -import -noprompt -keystore out/intermediate-full-chain.truststore.jks -alias intermediate-ca -trustcacerts -storepass a-very-secret-secret -file out/intermediate-full-chain.pem

KeyStores

We need to create a KeyStore for each certificate. To do so, we need to create a PKCS#12 archive first, including the certificate and its key as well as the full-chain certificate of the authority. So for kafka-1, we do as follows:

openssl pkcs12 -export -inkey out/kafka-1-key.pem -in out/kafka-1.pem -certfile out/intermediate-full-chain.pem -passout pass: -out out/kafka-1.p12

Note that you could set a password for this archive via -passout. But since the Java KeyStore also supports password protection, we skip setting a password for the PKCS#12 archive.

Now we can create the Java KeyStore as follows:

keytool -importkeystore -noprompt -srckeystore out/kafka-1.p12 -srcstoretype pkcs12 -srcstorepass "" -destkeystore out/kafka-1.keystore.jks -deststorepass a-very-secret-secret 

This must be repeated for all server certificates i.e. kafka-1, kafka-2, kafka-3 and for all client certificates i.e. admin-1 and client-1.
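Again, a small shell loop can take care of this, assuming all certificates and keys live in out/ and follow the naming scheme used above (a sketch):

# Sketch: build a PKCS#12 archive and a Java KeyStore for every certificate.
for name in kafka-1 kafka-2 kafka-3 admin-1 client-1; do
  openssl pkcs12 -export \
    -inkey "out/${name}-key.pem" -in "out/${name}.pem" \
    -certfile out/intermediate-full-chain.pem \
    -passout pass: -out "out/${name}.p12"
  keytool -importkeystore -noprompt \
    -srckeystore "out/${name}.p12" -srcstoretype pkcs12 -srcstorepass "" \
    -destkeystore "out/${name}.keystore.jks" -deststorepass a-very-secret-secret
done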

Kafka & KRaft Setup

After we’ve generated the certificate authority and the Key- & TrustStores including the required server and client certificates, we can proceed to set up the Kafka cluster nodes. Since the following must be deployed on all Kafka nodes, it is recommended to use a configuration management tool like SaltStack or Ansible.

For demonstration purposes, we describe a manual deployment for the Kafka node kafka-1 on Ubuntu 22.04 LTS. Further, we assume that all nodes can reach each other as kafka-1, kafka-2 and kafka-3, e.g. by creating corresponding DNS entries or by adding the hosts to /etc/hosts.

Please keep in mind that the following steps must be repeated on all cluster nodes accordingly.

Installation

First of all, we install the default Java runtime environment and the Apache Kafka package from https://kafka.apache.org/downloads as follows:

apt update
apt install default-jre
curl -L https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz -o /tmp/kafka.tgz
cd /opt
tar -xvzf /tmp/kafka.tgz

Note that we will not run Kafka as root but as a dedicated user kafka, so let’s create the user as follows:

adduser --system --group kafka

Let’s create the KRaft log directories (this is where Kafka stores the data) and grant access for the kafka user:

mkdir -p /var/kafka/kraft-combined-logs
chown kafka:kafka /var/kafka/kraft-combined-logs

Configuration

Now it’s time to copy the previously generated Trust- & KeyStores to the node, e.g. to /etc/ssl/kafka. The result for the Kafka node kafka-1 should look like this:

ls /etc/ssl/kafka/
kafka-1.keystore.jks  kafka-1.truststore.jks

Note that we renamed intermediate-full-chain.truststore.jks to kafka-1.truststore.jks.
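Assuming the out/ directory from the machine where the certificates were generated has been copied to the node, this could look as follows (a sketch; adjust the source path to your environment):

# Sketch: place the stores for kafka-1 under /etc/ssl/kafka (run on kafka-1).
mkdir -p /etc/ssl/kafka
cp out/kafka-1.keystore.jks /etc/ssl/kafka/kafka-1.keystore.jks
cp out/intermediate-full-chain.truststore.jks /etc/ssl/kafka/kafka-1.truststore.jks
chown -R kafka:kafka /etc/ssl/kafka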

Since we want to use KRaft, the relevant configuration files are located in /opt/kafka_*/config/kraft. To configure the Kafka server (i.e. Broker and Controller), we need to modify the following parts of server.properties:

Server Basics

# The node id associated with this instance's roles. Just increment this for each node
node.id=0

# The connect string for the controller quorum: <node.id>@<host>:<port>
controller.quorum.voters=0@kafka-1:9093,1@kafka-2:9093,2@kafka-3:9093

Socket Server Settings

# The address the socket server listens on.
listeners=SSL://kafka-1:9092,CONTROLLER://kafka-1:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=SSL

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=SSL://kafka-1:9092

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:SSL,SSL:SSL

Further, include the paths to the Trust- and KeyStores and their passwords as follows:

ssl.keystore.location=/etc/ssl/kafka/kafka-1.keystore.jks
ssl.keystore.password=a-very-secret-secret
ssl.truststore.location=/etc/ssl/kafka/kafka-1.truststore.jks
ssl.truststore.password=a-very-secret-secret

Enforce that clients connecting to the Kafka brokers must provide a valid client certificate for authentication:

ssl.client.auth=required

Configure the StandardAuthorizer to enable ACL-based authorization. Further, grant super user permissions to the Kafka nodes and the admin-1 user:

authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
super.users=User:kafka-1;User:kafka-2;User:kafka-3;User:admin-1

As additional hardening, it is advisable not to allow access for everyone when no matching ACL is found:

allow.everyone.if.no.acl.found=false

Further, you need to map the common name (CN) from the mTLS certificates to the Kafka user name using ssl.principal.mapping.rules, so that you can use the common name, e.g. client-1, in your ACLs.
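A rule along the following lines should work; this is a sketch based on the Kafka documentation, so verify it against the distinguished names of your certificates:

# Extract the CN from the certificate's distinguished name and use it as the principal,
# e.g. "CN=client-1,OU=IT,O=...,L=Karlsruhe,ST=Baden-Wuerttemberg,C=DE" becomes "client-1".
ssl.principal.mapping.rules=RULE:^CN=([^,]+),.*$/$1/,DEFAULT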

Log Basics

Make sure that you set the correct log dir path in log.dirs:

# A comma separated list of directories under which to store log files
log.dirs=/var/kafka/kraft-combined-logs

Note that these steps must be repeated on each Kafka node, i.e. kafka-1, kafka-2 and kafka-3, adjusting the node-specific values (see the sketch below).
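For example, on kafka-2 the node-specific parts of server.properties would differ roughly like this (a sketch):

# kafka-2: node-specific settings (everything else stays the same)
node.id=1
listeners=SSL://kafka-2:9092,CONTROLLER://kafka-2:9093
advertised.listeners=SSL://kafka-2:9092
ssl.keystore.location=/etc/ssl/kafka/kafka-2.keystore.jks
ssl.truststore.location=/etc/ssl/kafka/kafka-2.truststore.jks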

Starting Kafka

After finishing the configuration, we can start our Kafka cluster. To do so, we first generate a unique cluster ID as follows:

/opt/kafka_*/bin/kafka-storage.sh random-uuid
CAXzjOdiS3mESGUi-yFxNQ

Now we need to format the Kafka storage and start the Kafka server on each node:

sudo -u kafka /opt/kafka_*/bin/kafka-storage.sh format -t "CAXzjOdiS3mESGUi-yFxNQ" -c /opt/kafka_*/config/kraft/server.properties
sudo -u kafka /opt/kafka_*/bin/kafka-server-start.sh /opt/kafka_*/config/kraft/server.properties

Note that it is recommended to introduce a systemd service unit so that Kafka runs as a permanent service. Further, you might want to tweak log4j.properties to redirect all your Kafka logs to systemd-journald.
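A minimal unit file could look like this (a sketch; adjust the paths to your extracted Kafka version and extend it to your operational needs):

# /etc/systemd/system/kafka.service (sketch)
[Unit]
Description=Apache Kafka (KRaft)
After=network-online.target
Wants=network-online.target

[Service]
User=kafka
Group=kafka
ExecStart=/opt/kafka_2.13-3.6.1/bin/kafka-server-start.sh /opt/kafka_2.13-3.6.1/config/kraft/server.properties
ExecStop=/opt/kafka_2.13-3.6.1/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

Reload systemd and enable the service with systemctl daemon-reload && systemctl enable --now kafka.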

Testing Kafka

To test Kafka, we create the following user config referencing the client certificate for client-1 that we’ve created earlier:

cat client-1.properties

security.protocol=SSL
ssl.keystore.location=client-1.keystore.jks
ssl.keystore.password=a-very-secret-secret
ssl.truststore.location=intermediate-full-chain.truststore.jks
ssl.truststore.password=a-very-secret-secret

Let’s try to create a topic without authentication:

./bin/kafka-topics.sh --bootstrap-server kafka-1:9092 --create --topic my-test
Error while executing topic command : The AdminClient thread has exited. Call: createTopics
[2024-01-31 17:36:52,394] ERROR org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: createTopics
 (kafka.admin.TopicCommand$)
[2024-01-31 17:36:52,395] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
    at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
    at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:571)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1381)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1312)
    at java.base/java.lang.Thread.run(Thread.java:829)

Well, this is Kafka’s way of telling us that we are not authenticated and thus topic creation is not possible. So it works as "expected"!

Now let’s try to create a topic as the user client-1 by passing --command-config with the client certificate configuration:

./bin/kafka-topics.sh --bootstrap-server kafka-1:9092 --command-config client-1.properties --create --topic my-test
Error while executing topic command : Authorization failed.
[2024-01-31 17:35:54,185] ERROR org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.

It seems that the authentication worked and Kafka correctly denies topic creation since client-1 does not have permission for this operation.

So let’s create a topic and grant consumer permission for client-1 using the user admin-1 which we’ve defined as a super user:

./bin/kafka-topics.sh --bootstrap-server kafka-1:9092 --command-config admin-1.properties --create --topic my-test
Created topic my-test.
./bin/kafka-acls.sh --bootstrap-server kafka-1:9092 --command-config admin-1.properties --add --consumer --allow-principal User:client-1 --group '*' --topic my-test
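
You can verify the resulting ACLs with --list (the exact output varies by Kafka version):

./bin/kafka-acls.sh --bootstrap-server kafka-1:9092 --command-config admin-1.properties --list --topic my-test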

Let’s use client-1 to consume messages from the topic my-test on node kafka-1, while we use admin-1 to produce messages for the topic on another node, e.g. kafka-3:

./bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --consumer.config client-1.properties --topic my-test

In another terminal, start the producer as follows:

cowsay "Hello World" | ./bin/kafka-console-producer.sh --bootstrap-server kafka-3:9092 --producer.config admin-1.properties --topic my-test

We should now see the message on the consumer console:

./bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --consumer.config client-1.properties --topic my-test
 _____________
< Hello World >
 -------------
           ^__^
           (oo)_______
            (__)       )/
                ||----w |
                ||     ||

Conclusion

We set up a 3-node Apache Kafka cluster using KRaft and mTLS for authentication and authorization. We’ve created a super user admin-1 and a user client-1, to which we’ve granted consumer access for the topic my-test. We’ve shown that anonymous and unauthorized access was denied, while the user client-1 was able to consume messages from my-test.

Please keep in mind that this describes a manual deployment for demonstration purposes. Even though we use mTLS to secure Kafka in this deployment, this alone is not sufficient for production use. For example, you should consider preventing external access to the Kafka KRaft controller via a firewall, as sketched below. You should also think about backup and monitoring to ensure data security and identify problems at an early stage.
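
A minimal sketch using ufw, with placeholder node addresses (adjust to your network):

# Sketch: restrict the KRaft controller port 9093 to the Kafka nodes themselves.
for ip in 192.0.2.11 192.0.2.12 192.0.2.13; do
  ufw allow from "$ip" to any port 9093 proto tcp
done
ufw deny 9093/tcp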