In my last post I wrote about first steps and lessons learned when setting up Apache Kafka with encryption, SASL SCRAM/Digest authentication and ACL authorization using the Confluent Platform. That setup secures Kafka using SASL SCRAM between clients and Kafka brokers and SASL MD5 digest between Kafka brokers and ZooKeeper. This approach has some drawbacks: the passwords must be stored on the clients, and ZooKeeper sends MD5 hashes of the passwords over the wire. So we try another approach using Mutual TLS (mTLS) only, which seems a bit easier and also suitable for a corporate environment.


Running the Confluent Ansible Playbook

As described in Kafka Security – First Steps, we use the Confluent Ansible Playbook to deploy our Kafka cluster with the following hosts.yml file:

all:
  vars:
    ansible_connection: ssh
    ansible_user: root
    ansible_become: true    
    
    # Use kafka instead of confluent-server
    # https://docs.confluent.io/ansible/current/ansible-configure.html#deploy-cs-or-ak
    confluent_server_enabled: false

    # https://docs.confluent.io/ansible/current/ansible-authenticate.html#configure-mtls-authentication
    ssl_enabled: true
    ssl_mutual_auth_enabled: true

    # https://docs.confluent.io/ansible/current/ansible-authenticate.html#configure-mtls
    zookeeper_ssl_enabled: true
    zookeeper_ssl_mutual_auth_enabled: true

    # Enable ACLs
    # https://kafka.apache.org/documentation/#security_authz
    kafka_broker_custom_properties:
      authorizer.class.name: kafka.security.authorizer.AclAuthorizer
      super.users: User:kafka_broker;User:schema_registry;User:zookeeper;User:kafka
      ssl.principal.mapping.rules: RULE:^CN=([a-zA-Z.0-9@-]+).*$/$1/,DEFAULT

    zookeeper_custom_properties:
      zookeeper.set.acl: True
      
zookeeper:
  hosts:
    kafka-1:
      ansible_python_interpreter: /usr/bin/python3
    kafka-2:
      ansible_python_interpreter: /usr/bin/python3
    kafka-3:
      ansible_python_interpreter: /usr/bin/python3

kafka_broker:
  hosts:
    kafka-1:
      ansible_python_interpreter: /usr/bin/python3
    kafka-2:
      ansible_python_interpreter: /usr/bin/python3
    kafka-3:
      ansible_python_interpreter: /usr/bin/python3

The setting confluent_server_enabled: false disables the confluent-server package, which is part of the Confluent Enterprise plan, and installs the confluent-kafka package instead. See https://docs.confluent.io/ansible/current/ansible-configure.html#deploy-cs-or-ak.

Further, we enable SSL and mutual authentication (mTLS) as follows:

    # https://docs.confluent.io/ansible/current/ansible-authenticate.html#configure-mtls-authentication
    ssl_enabled: true
    ssl_mutual_auth_enabled: true

    # https://docs.confluent.io/ansible/current/ansible-authenticate.html#configure-mtls
    zookeeper_ssl_enabled: true
    zookeeper_ssl_mutual_auth_enabled: true

Finally, we enable ACLs for authorization by setting authorizer.class.name to kafka.security.authorizer.AclAuthorizer, which comes with Apache Kafka:

    # Enable ACLs
    # https://kafka.apache.org/documentation/#security_authz
    kafka_broker_custom_properties:
      authorizer.class.name: kafka.security.authorizer.AclAuthorizer
      super.users: User:kafka_broker;User:schema_registry;User:zookeeper;User:kafka
      ssl.principal.mapping.rules: RULE:^CN=([a-zA-Z.0-9@-]+).*$/$1/,DEFAULT

    zookeeper_custom_properties:
      zookeeper.set.acl: True

Note that we need to define the users kafka_broker, schema_registry, zookeeper and kafka as super.users or create appropriate ACLs so that they are authorized.

Note that the ssl.principal.mapping.rules setting maps the Distinguished Name (DN) from the certificates used for mTLS, e.g. CN=client1, to the username of the client, e.g. client1.
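To illustrate, here is roughly what the rule above yields for a few example DNs (a sketch; the exact result depends on the full DN in your certificates). Note that the character class does not include an underscore, which is presumably why super.users above lists both User:kafka_broker and User:kafka:

# DN in the client certificate            -> resulting Kafka principal
# CN=client1                              -> User:client1
# CN=client1,OU=TEST,O=CONFLUENT          -> User:client1
# CN=kafka_broker,OU=TEST,O=CONFLUENT     -> User:kafka   (match stops at the "_")

With the inventory complete, the deployment itself is started as usual; for the cp-ansible version of that era this is typically:

ansible-playbook -i hosts.yml all.yml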

mTLS Authentication

While a key store typically holds certificates that identify us (it contains our certificate and private key), a trust store holds certificates that identify others (it contains certificate authorities). See https://www.baeldung.com/java-keystore-truststore-difference for more info about Java key and trust stores.

Let's create a truststore and a keystore for a client client1 to authenticate against a Kafka cluster using mTLS. Let's assume that snakeoil-ca-1.crt and snakeoil-ca-1.key are used as the certificate authority to generate self-signed certificates for the cluster (this is the default when using the Ansible playbooks from Confluent).

First let’s create the keystore:

keytool -genkeypair -noprompt \
  -storetype pkcs12 \
  -keyalg RSA \
  -keysize 2048 \
  -alias client1 \
  -dname "CN=client1" \
  -keystore client1-keystore.jks \
  -storepass client-secret \
  -keypass client-secret

Create Certificate Signing Request:

keytool -keystore client1-keystore.jks \
  -storetype pkcs12 \
  -alias client1 \
  -certreq -file client1.csr \
  -storepass client-secret \
  -keypass client-secret

Create an OpenSSL config file openssl.conf that includes the extended key usage for client and server authentication:

[req]
distinguished_name = req_distinguished_name
x509_extensions = v3_req
prompt = no
[v3_req]
extendedKeyUsage = serverAuth, clientAuth

Sign the CSR with your authority:

openssl x509 -req \
  -CA snakeoil-ca-1.crt \
  -CAkey snakeoil-ca-1.key \
  -in client1.csr \
  -out client1.crt \
  -days 36500 \
  -passin pass:capassword123 \
  -extfile openssl.conf \
  -extensions v3_req \
  -CAcreateserial

Use -CAcreateserial on the first run; on subsequent runs, use -CAserial snakeoil-ca-1.srl instead.
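You can verify that the extended key usage actually made it into the signed certificate (the -ext option requires OpenSSL 1.1.1 or newer; with older versions use -text and grep):

openssl x509 -in client1.crt -noout -ext extendedKeyUsage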

Import the authority cert into the keystore:

keytool -noprompt \
  -keystore client1-keystore.jks \
  -storetype pkcs12 \
  -alias CARoot \
  -import -file snakeoil-ca-1.crt \
  -storepass client-secret \
  -keypass client-secret

Import the signed client certificate into the keystore:

keytool -noprompt \
  -keystore client1-keystore.jks \
  -storetype pkcs12 \
  -alias client1 \
  -import -file client1.crt \
  -storepass client-secret \
  -keypass client-secret
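At this point it does not hurt to verify that both the CARoot and client1 entries ended up in the keystore:

keytool -list \
  -keystore client1-keystore.jks \
  -storepass client-secret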

Next, create a trust store containing the root certificate of your authority. The CLI tools use it to verify that the certificate presented by the broker was signed by your authority:

keytool -noprompt \
  -keystore client1-truststore.jks \
  -storetype pkcs12 \
  -alias CARoot \
  -import -file snakeoil-ca-1.crt \
  -storepass client-secret \
  -keypass client-secret

Finally, let's create a user.properties file to use with our CLI tools:

security.protocol=SSL
ssl.keystore.location=client1-keystore.jks
ssl.keystore.password=client-secret
ssl.key.password=client-secret
ssl.truststore.location=client1-truststore.jks
ssl.truststore.password=client-secret  
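If the TLS handshake misbehaves later on, it can help to test it outside of Kafka with openssl s_client. A minimal sketch, assuming the keystore created above (since it was created with -storetype pkcs12, the private key can be extracted with openssl pkcs12; note that -nodes writes the key to disk unencrypted):

# extract the private key from the PKCS12 keystore (unencrypted!)
openssl pkcs12 -in client1-keystore.jks -nodes -nocerts \
  -out client1.key -passin pass:client-secret

# perform a full mTLS handshake against the broker
openssl s_client -connect kafka-1:9091 \
  -CAfile snakeoil-ca-1.crt \
  -cert client1.crt -key client1.key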

Let's test our user.properties using the pre-installed CLI tools in the Docker image confluentinc/cp-kafka:6.0.0. Run the image and bind-mount the created stores and configs as follows:

docker run --entrypoint=/bin/bash --user 0 --rm -t -i -v /etc/hosts:/etc/hosts -v $(pwd):/client confluentinc/cp-kafka:6.0.0

Now let's try to list existing topics:

cd /client
kafka-topics --bootstrap-server kafka-1:9091 --command-config user.properties --list

No topics are listed, since the new user does not have permission to see any topics. But there is also no authentication error, so mTLS authentication seems to work.

ACL Authorization

Creating a topic fails with an authorization error:

kafka-topics --bootstrap-server kafka-1:9091 --command-config user.properties --create --topic test
Error while executing topic command : Authorization failed.
[2020-12-03 10:51:41,617] ERROR org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.
 (kafka.admin.TopicCommand$)

That's fine, since the new user client1 does not have any ACLs yet. To create ACLs we need a super user. For testing purposes, we add client1 to the super.users attribute in server.properties and restart confluent-kafka.
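On each broker this boils down to something like the following (file location and service name assume the defaults of the Confluent Ansible playbook):

# /etc/kafka/server.properties: append client1 to the existing list
super.users=User:kafka_broker;User:schema_registry;User:zookeeper;User:kafka;User:client1

# restart the broker to pick up the change
systemctl restart confluent-kafka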

Now we can use client1 to create a topic test and ACLs so that client1 can produce to and consume from the new topic. First, let's create the topic:

kafka-topics --bootstrap-server kafka-1:9091 --command-config user.properties --create --topic test
Created topic test.

Create producer and consumer ACLs:

kafka-acls --bootstrap-server kafka-1:9091 --command-config user.properties --add --allow-principal User:client1 --producer --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
    (principal=User:client1, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:client1, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:client1, host=*, operation=DESCRIBE, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
    (principal=User:client1, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:client1, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:client1, host=*, operation=CREATE, permissionType=ALLOW)

kafka-acls --bootstrap-server kafka-1:9091 --command-config user.properties --add --allow-principal User:client1 --consumer --group tester --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
    (principal=User:client1, host=*, operation=READ, permissionType=ALLOW)
    (principal=User:client1, host=*, operation=DESCRIBE, permissionType=ALLOW)

Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=tester, patternType=LITERAL)`:
    (principal=User:client1, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=tester, patternType=LITERAL)`:
    (principal=User:client1, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
    (principal=User:client1, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:client1, host=*, operation=READ, permissionType=ALLOW)
    (principal=User:client1, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:client1, host=*, operation=CREATE, permissionType=ALLOW)

Note that we are using the consumer group tester, which can be any group, and it must be specified in the consumer command as you can see below.
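You can double-check the resulting ACLs at any time:

kafka-acls --bootstrap-server kafka-1:9091 --command-config user.properties --list --topic test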

Undo changes in server.properties by removing client1 from super.users and restart confluent-kafka.

Now let’s start a consumer of topic test with user client1 and consumer group tester:

kafka-console-consumer --bootstrap-server=kafka-1:9091 --topic=test --consumer.config user.properties --group tester

And finally start the producer as follows:

kafka-console-producer --broker-list kafka-1:9091 --topic=test --producer.config user.properties
>

Type some words into the producer command line. After a short time, they should appear in the consumer console.

ZooKeeper Security

As mentioned in Kafka Security – First Steps, Apache ZooKeeper is world-writable by default. And as explained here, we can use zookeeper-security-migration to make ZooKeeper secure.

To do so, let’s create a zookeeper-client.properties file on one of the ZooKeeper nodes so that we are able to connect to ZooKeeper:

zookeeper.ssl.client.enable=true
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.keystore.location=/var/ssl/private/zookeeper.keystore.jks
zookeeper.ssl.keystore.password=confluentkeystorestorepass
zookeeper.ssl.truststore.location=/var/ssl/private/zookeeper.truststore.jks
zookeeper.ssl.truststore.password=confluenttruststorepass

We run this on a ZooKeeper node since zookeeper.keystore.jks and zookeeper.truststore.jks already exist there and can be reused.
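Before running the migration, you can verify that the TLS connection to ZooKeeper works at all, e.g. by listing the root znode:

zookeeper-shell kafka-1:2182 -zk-tls-config-file zookeeper-client.properties ls /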

Next, run the migration tool on the ZooKeeper node and pass --zookeeper.acl secure as follows:

zookeeper-security-migration --zk-tls-config-file zookeeper-client.properties --zookeeper.connect=kafka-1:2182  --zookeeper.acl secure

Let’s see whether the migration tool created ACLs:

zookeeper-shell kafka-1:2182 -zk-tls-config-file /etc/kafka/zookeeper-client.properties getAcl /config/users
Connecting to kafka-1:2182

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
'x509,'CN=kafka_broker%2COU=TEST%2CO=CONFLUENT%2CL=PaloAlto%2CST=Ca%2CC=US
: cdrwa

We can see that there is an ACL for CN=kafka_broker,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US, which is the DN of the certificate we used when we ran zookeeper-security-migration. This is also the DN of the certificate that all Kafka brokers use. Otherwise they would not be able to connect to ZooKeeper and Kafka would not start.

If you plan to also run other Kafka components, e.g. Schema Registry, either make sure they use the same certificate including the DN CN=kafka_broker,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US, or add additional ZooKeeper ACLs for their DNs, too.

So let's try to connect to ZooKeeper without authentication and create a user (which should fail):

zookeeper-shell kafka-1:2181 create /config/users/foo bar
Connecting to kafka-1:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
Authentication is not valid : /config/users/foo

We get an "Authentication is not valid" error, which shows that the ACLs are working and effective.

Be careful: This is Kafka! If you create a topic, the default ZooKeeper ACLs for the newly created topic znodes are again cdrwa for world/anyone. So you should manually adjust these ACLs to cdrwa for CN=kafka_broker,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US. Otherwise an unauthorized client is able to modify or even delete your topic. Because of this, it is a good idea to block access to ZooKeeper from outside the ZooKeeper cluster even with ACLs enabled.
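To see whether a topic's znodes need adjusting, you can inspect their ACLs; Kafka keeps the topic metadata under /brokers/topics and the topic configuration under /config/topics:

zookeeper-shell kafka-1:2182 -zk-tls-config-file /etc/kafka/zookeeper-client.properties getAcl /brokers/topics/test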

Conclusion

The mTLS approach is bullet-proof since it uses TLS for authentication. A little tricky is mapping the Distinguished Name (DN) from the certificates to the user name, i.e. via the ssl.principal.mapping.rules setting in server.properties. We recommend using a regex tester to test the rules, since debugging this in Kafka seems to be hard.
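As a quick local approximation, the regex part of a rule can also be sanity-checked with sed (only a rough sketch; Kafka's rule engine is not identical to sed):

echo "CN=client1,OU=TEST,O=CONFLUENT" | sed -E 's/^CN=([a-zA-Z.0-9@-]+).*$/\1/'
# prints: client1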

For authorization, you still have to enable ACLs as described in the previous post, and you have to enable them for both the Kafka brokers (using authorizer.class.name: kafka.security.authorizer.AclAuthorizer) and ZooKeeper (using zookeeper-security-migration). Especially managing ZooKeeper ACLs is a mess, and they have a terrible syntax as well.

To sum up, this setup seems to be production-ready, with some traps that you should watch out for. This is still not a general recommendation to use Apache Kafka. If you just need a messaging system, consider using NATS or RabbitMQ.