In my last post I wrote about first steps and lessons learned when setting up Apache Kafka with encryption, SASL SCRAM/Digest authentication, and ACL authorization using the Confluent Platform. That setup secures Kafka using SASL SCRAM between clients and Kafka brokers and SASL MD5 digest between Kafka brokers and ZooKeeper. This approach has some drawbacks: the passwords must be stored on the clients, and ZooKeeper sends MD5 hashes of the passwords over the wire. So we try another approach using Mutual TLS (mTLS) only, which seems a bit easier and which also seems suitable for a corporate environment.
¶ Running Confluent Ansible Playbook
As described in Kafka Security – First Steps, we are using the Confluent Ansible Playbook to deploy our Kafka cluster. We are using the following hosts.yml file:
all:
  vars:
    ansible_connection: ssh
    ansible_user: root
    ansible_become: true
    # Use kafka instead of confluent-server
    # https://docs.confluent.io/ansible/current/ansible-configure.html#deploy-cs-or-ak
    confluent_server_enabled: false
    # https://docs.confluent.io/ansible/current/ansible-authenticate.html#configure-mtls-authentication
    ssl_enabled: true
    ssl_mutual_auth_enabled: true
    # https://docs.confluent.io/ansible/current/ansible-authenticate.html#configure-mtls
    zookeeper_ssl_enabled: true
    zookeeper_ssl_mutual_auth_enabled: true
    # Enable ACLs
    # https://kafka.apache.org/documentation/#security_authz
    kafka_broker_custom_properties:
      authorizer.class.name: kafka.security.authorizer.AclAuthorizer
      super.users: User:kafka_broker;User:schema_registry;User:zookeeper;User:kafka
      ssl.principal.mapping.rules: RULE:^CN=([a-zA-Z.0-9@-]+).*$/$1/,DEFAULT
    zookeeper_custom_properties:
      zookeeper.set.acl: True

zookeeper:
  hosts:
    kafka-1:
      ansible_python_interpreter: /usr/bin/python3
    kafka-2:
      ansible_python_interpreter: /usr/bin/python3
    kafka-3:
      ansible_python_interpreter: /usr/bin/python3

kafka_broker:
  hosts:
    kafka-1:
      ansible_python_interpreter: /usr/bin/python3
    kafka-2:
      ansible_python_interpreter: /usr/bin/python3
    kafka-3:
      ansible_python_interpreter: /usr/bin/python3
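With the inventory in place, provision the cluster by running the playbook. A minimal sketch, assuming cp-ansible 6.0.x cloned from GitHub and run from the repository root (newer, collection-based releases use ansible-playbook -i hosts.yml confluent.platform.all instead):
ansible-playbook -i hosts.yml all.yml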
The setting confluent_server_enabled: false disables the confluent-server package, which is part of the Confluent Enterprise plan, and installs the confluent-kafka package (Apache Kafka) instead. See https://docs.confluent.io/ansible/current/ansible-configure.html#deploy-cs-or-ak.
Further, we enable SSL and mutual authentication (mTLS) as follows:
# https://docs.confluent.io/ansible/current/ansible-authenticate.html#configure-mtls-authentication
ssl_enabled: true
ssl_mutual_auth_enabled: true
# https://docs.confluent.io/ansible/current/ansible-authenticate.html#configure-mtls
zookeeper_ssl_enabled: true
zookeeper_ssl_mutual_auth_enabled: true
Finally, we enable ACLs for authorization by setting authorizer.class.name to kafka.security.authorizer.AclAuthorizer, which ships with Apache Kafka:
# Enable ACLs
# https://kafka.apache.org/documentation/#security_authz
kafka_broker_custom_properties:
  authorizer.class.name: kafka.security.authorizer.AclAuthorizer
  super.users: User:kafka_broker;User:schema_registry;User:zookeeper;User:kafka
  ssl.principal.mapping.rules: RULE:^CN=([a-zA-Z.0-9@-]+).*$/$1/,DEFAULT
zookeeper_custom_properties:
  zookeeper.set.acl: True
Note that we need to define the users kafka_broker, schema_registry, zookeeper and kafka as super.users or create appropriate ACLs so that they are authorized. Also note that the rule in ssl.principal.mapping.rules maps the Distinguished Name (DN) from the certificates used for mTLS, e.g. CN=client1, to the client’s username, e.g. client1.
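To sanity-check a mapping rule outside of Kafka, you can roughly approximate it on the shell. Here is a sketch using sed with extended regular expressions; note that Kafka applies Java regex semantics, which differ slightly from POSIX ERE:
echo 'CN=client1,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US' | sed -E 's/^CN=([a-zA-Z.0-9@-]+).*$/\1/'
# prints: client1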
¶ mTLS Authentication
While a keystore typically holds certificates that identify us (it contains our certificate and key), a truststore holds certificates that identify others (it contains certificate authorities). See https://www.baeldung.com/java-keystore-truststore-difference for more info about Java key and trust stores.
Let’s create a truststore and a keystore for a client client1 to authenticate against the Kafka cluster using mTLS. Let’s assume that snakeoil-ca-1.crt and snakeoil-ca-1.key are used as the authority to generate self-signed certificates for the cluster (this is the default when using the Ansible playbooks from Confluent).
First let’s create the keystore:
keytool -genkeypair -noprompt \
  -storetype pkcs12 \
  -keyalg RSA \
  -keysize 2048 \
  -alias client1 \
  -dname "CN=client1" \
  -keystore client1-keystore.jks \
  -storepass client-secret \
  -keypass client-secret
Create a Certificate Signing Request (CSR):
keytool -keystore client1-keystore.jks \
  -storetype pkcs12 \
  -alias client1 \
  -certreq -file client1.csr \
  -storepass client-secret \
  -keypass client-secret
Create an OpenSSL config file openssl.conf including extended key usage for client/server authentication:
[req]
distinguished_name = req_distinguished_name
x509_extensions = v3_req
prompt = no
[v3_req]
extendedKeyUsage = serverAuth, clientAuth
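Before signing, it can’t hurt to confirm that the CSR carries the expected subject (a standard openssl command):
openssl req -in client1.csr -noout -subject
# should print the subject CN=client1 (the exact format varies by OpenSSL version)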
Sign the CSR with your authority:
openssl x509 -req \
  -CA snakeoil-ca-1.crt \
  -CAkey snakeoil-ca-1.key \
  -in client1.csr \
  -out client1.crt \
  -days 36500 \
  -passin pass:capassword123 \
  -extfile openssl.conf \
  -extensions v3_req \
  -CAcreateserial
Use -CAcreateserial on the first run to create the serial number file, or -CAserial snakeoil-ca-1.srl to re-use an existing one.
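After signing, you can verify the certificate against your authority and confirm that the extended key usage made it in (both standard openssl commands):
openssl verify -CAfile snakeoil-ca-1.crt client1.crt
# client1.crt: OK
openssl x509 -in client1.crt -noout -text | grep -A1 'Extended Key Usage'
# TLS Web Server Authentication, TLS Web Client Authentication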
Import the authority cert into the keystore:
keytool -noprompt \
  -keystore client1-keystore.jks \
  -storetype pkcs12 \
  -alias CARoot \
  -import -file snakeoil-ca-1.crt \
  -storepass client-secret \
  -keypass client-secret
Import the signed client certificate into the keystore:
keytool -noprompt \
  -keystore client1-keystore.jks \
  -storetype pkcs12 \
  -alias client1 \
  -import -file client1.crt \
  -storepass client-secret \
  -keypass client-secret
Next, create a truststore containing the root certificate of your authority. The CLI tools use it to verify that the certificate presented by the broker was signed by your authority:
keytool -noprompt \
  -keystore client1-truststore.jks \
  -storetype pkcs12 \
  -alias CARoot \
  -import -file snakeoil-ca-1.crt \
  -storepass client-secret \
  -keypass client-secret
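To double-check both stores, list their contents with keytool: the keystore should contain a PrivateKeyEntry for client1 plus a trustedCertEntry for CARoot, the truststore only the CARoot entry:
keytool -list -keystore client1-keystore.jks -storetype pkcs12 -storepass client-secret
keytool -list -keystore client1-truststore.jks -storetype pkcs12 -storepass client-secret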
Finally, let’s create a user.properties file to use with our CLI tools:
security.protocol=SSL
ssl.keystore.location=client1-keystore.jks
ssl.keystore.password=client-secret
ssl.key.password=client-secret
ssl.truststore.location=client1-truststore.jks
ssl.truststore.password=client-secret
Let’s test our user.properties using the pre-installed CLI tools in the Docker image confluentinc/cp-kafka:6.0.0. Run the image and bind-mount the created stores and configs as follows:
docker run --entrypoint=/bin/bash --user 0 --rm -t -i -v /etc/hosts:/etc/hosts -v $(pwd):/client confluentinc/cp-kafka:6.0.0
Now let’s try to list the existing topics:
cd /client
kafka-topics --bootstrap-server kafka-1:9091 --command-config user.properties --list
No topics are listed since the new user does not have permission to see any. But there is also no authentication error, so mTLS authentication seems to work.
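If the connection fails instead, a quick way to inspect the TLS layer is openssl s_client (a standard OpenSSL tool). The broker will typically abort the handshake when no client certificate is presented, but you can still see the server certificate chain:
openssl s_client -connect kafka-1:9091 -CAfile snakeoil-ca-1.crt </dev/null
# check the presented subject/issuer and 'Verify return code: 0 (ok)'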
¶ ACL Authorization
Creating a topic fails with Authorization failed:
kafka-topics --bootstrap-server kafka-1:9091 --command-config user.properties --create --topic test
Error while executing topic command : Authorization failed.
[2020-12-03 10:51:41,617] ERROR org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.
(kafka.admin.TopicCommand$)
That’s fine since the new user client1 does not have any ACLs yet. To create ACLs we need to use a super user. For testing purposes, we add client1 to the attribute super.users in server.properties and restart confluent-kafka.
Now we can use client1 to create a topic test and ACLs so that client1 can produce to and consume from the new topic. First, let’s create the topic:
kafka-topics --bootstrap-server kafka-1:9091 --command-config user.properties --create --topic test
Created topic test.
Create producer and consumer ACLs:
kafka-acls --bootstrap-server kafka-1:9091 --command-config user.properties --add --allow-principal User:client1 --producer --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:client1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:client1, host=*, operation=CREATE, permissionType=ALLOW)
(principal=User:client1, host=*, operation=DESCRIBE, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:client1, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:client1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:client1, host=*, operation=CREATE, permissionType=ALLOW)
kafka-acls --bootstrap-server kafka-1:9091 --command-config user.properties --add --allow-principal User:client1 --consumer --group tester --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:client1, host=*, operation=READ, permissionType=ALLOW)
(principal=User:client1, host=*, operation=DESCRIBE, permissionType=ALLOW)
Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=tester, patternType=LITERAL)`:
(principal=User:client1, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=tester, patternType=LITERAL)`:
(principal=User:client1, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:client1, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:client1, host=*, operation=READ, permissionType=ALLOW)
(principal=User:client1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:client1, host=*, operation=CREATE, permissionType=ALLOW)
Note that we are using the consumer group tester, which can be any group name and must be specified in the consumer command, as you can see below.
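You can review the resulting ACLs at any time with kafka-acls --list (run while client1 is still a super user, or as any principal with the necessary Describe permissions):
kafka-acls --bootstrap-server kafka-1:9091 --command-config user.properties --list --topic test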
Undo the changes in server.properties by removing client1 from super.users and restart confluent-kafka.
Now let’s start a consumer on topic test with user client1 and consumer group tester:
kafka-console-consumer --bootstrap-server=kafka-1:9091 --topic=test --consumer.config user.properties --group tester
And finally start the producer as follows:
kafka-console-producer --broker-list kafka-1:9091 --topic=test --producer.config user.properties
>
Type some words into the producer command line. After a short time, they should appear in the consumer console.
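For a non-interactive smoke test, you can pipe a message into the producer and read it back with a bounded consumer; --max-messages and --timeout-ms are standard kafka-console-consumer flags. Note that --from-beginning is ignored once the group tester has committed offsets:
echo 'hello mTLS' | kafka-console-producer --broker-list kafka-1:9091 --topic test --producer.config user.properties
kafka-console-consumer --bootstrap-server kafka-1:9091 --topic test --consumer.config user.properties --group tester --from-beginning --max-messages 1 --timeout-ms 10000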
¶ ZooKeeper Security
As mentioned in Kafka Security – First Steps, Apache ZooKeeper is world-writable by default. And as explained here, we can use zookeeper-security-migration to make ZooKeeper secure.
To do so, let’s create a zookeeper-client.properties file on one of the ZooKeeper nodes so that we are able to connect to ZooKeeper:
zookeeper.ssl.client.enable=true
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.keystore.location=/var/ssl/private/zookeeper.keystore.jks
zookeeper.ssl.keystore.password=confluentkeystorestorepass
zookeeper.ssl.truststore.location=/var/ssl/private/zookeeper.truststore.jks
zookeeper.ssl.truststore.password=confluenttruststorepass
We run this on a ZooKeeper node since zookeeper.keystore.jks and zookeeper.truststore.jks are already there and can be re-used.
Next, run the migration tool on the ZooKeeper node and pass --zookeeper.acl secure as follows:
zookeeper-security-migration --zk-tls-config-file zookeeper-client.properties --zookeeper.connect=kafka-1:2182 --zookeeper.acl secure
Let’s see whether the migration tool created ACLs:
zookeeper-shell kafka-1:2182 -zk-tls-config-file /etc/kafka/zookeeper-client.properties getAcl /config/users
Connecting to kafka-1:2182
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
'x509,'CN=kafka_broker%2COU=TEST%2CO=CONFLUENT%2CL=PaloAlto%2CST=Ca%2CC=US
: cdrwa
We can see that there is an ACL for CN=kafka_broker,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US, which is the DN of the certificate we used when we ran zookeeper-security-migration. This is also the DN of the certificate that all Kafka brokers will use. Otherwise they won’t be able to connect to ZooKeeper and Kafka won’t start.
If you plan to also run other Kafka components, e.g. schema_registry, either make sure they use the same certificate including the DN CN=kafka_broker,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US, or add additional ZooKeeper ACLs for the other DNs, too.
So let’s try to connect to ZooKeeper without authentication, e.g. to create a user (which should fail):
zookeeper-shell kafka-1:2181 create /config/users/foo bar
Connecting to kafka-1:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
Authentication is not valid : /config/users/foo
We get an Authentication is not valid error, which shows that the ACLs are working and effective.
Be careful: This is Kafka! If you create a topic, the default ZooKeeper ACLs on the newly created topic znode are cdrwa for world/anyone. So you should manually adjust these ACLs to cdrwa for CN=kafka_broker,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US. Otherwise an unauthorized client is able to modify or even delete your topic. Because of this, it is a good idea to deny access to ZooKeeper from outside the ZooKeeper cluster, even with ACLs enabled.
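You can inspect the ACLs of a topic znode with the same zookeeper-shell pattern as above. Be aware that zookeeper-shell’s setAcl parses comma-separated ACL lists, so adjusting an ACL whose id is a DN containing commas may not work from the shell and may have to be done programmatically:
zookeeper-shell kafka-1:2182 -zk-tls-config-file zookeeper-client.properties getAcl /brokers/topics/test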
¶ Conclusion
The mTLS approach is bullet-proof since it uses TLS for authentication. A little tricky is mapping the Distinguished Name (DN) from the certificates to the user name, e.g. using the ssl.principal.mapping.rules setting in server.properties. We recommend using a regex tester, e.g. this one, to test the rules since debugging them in Kafka seems to be hard.
For authorization, you still have to enable ACLs as described in the previous post, and you have to enable them for both the Kafka brokers (using authorizer.class.name: kafka.security.authorizer.AclAuthorizer) and ZooKeeper (using zookeeper-security-migration). Managing ZooKeeper ACLs in particular is a mess, and they have a terrible syntax as well.
To sum up, this setup seems to be production-ready, with some traps that you should watch out for. This is still not a recommendation to use Apache Kafka per se: if you just need a messaging system, consider using NATS or RabbitMQ.