
Kafka Security - mTLS & ACL Authorization
Running Confluent Ansible Playbook
As described in Kafka Security - First Steps, we use the Confluent Ansible Playbook to deploy our Kafka cluster with the following hosts.yml file:
all:
  vars:
    ansible_connection: ssh
    ansible_user: root
    ansible_become: true
    # Use kafka instead of confluent-server
    # https://docs.confluent.io/ansible/current/ansible-configure.html#deploy-cs-or-ak
    confluent_server_enabled: false
    # https://docs.confluent.io/ansible/current/ansible-authenticate.html#configure-mtls-authentication
    ssl_enabled: true
    ssl_mutual_auth_enabled: true
    # https://docs.confluent.io/ansible/current/ansible-authenticate.html#configure-mtls
    zookeeper_ssl_enabled: true
    zookeeper_ssl_mutual_auth_enabled: true
    # Enable ACLs
    # https://kafka.apache.org/documentation/#security_authz
    kafka_broker_custom_properties:
      authorizer.class.name: kafka.security.authorizer.AclAuthorizer
      super.users: User:kafka_broker;User:schema_registry;User:zookeeper;User:kafka
      ssl.principal.mapping.rules: RULE:^CN=([a-zA-Z\.0-9@\-]+).*$/$1/,DEFAULT
    zookeeper_custom_properties:
      zookeeper.set.acl: True

zookeeper:
  hosts:
    kafka-1:
      ansible_python_interpreter: /usr/bin/python3
    kafka-2:
      ansible_python_interpreter: /usr/bin/python3
    kafka-3:
      ansible_python_interpreter: /usr/bin/python3

kafka_broker:
  hosts:
    kafka-1:
      ansible_python_interpreter: /usr/bin/python3
    kafka-2:
      ansible_python_interpreter: /usr/bin/python3
    kafka-3:
      ansible_python_interpreter: /usr/bin/python3
The setting confluent_server_enabled: false disables the confluent-server package, which is part of the Confluent Enterprise plan, and installs the confluent-kafka package instead. See https://docs.confluent.io/ansible/current/ansible-configure.html#deploy-cs-or-ak.
Further we enable SSL and Mutual Authentication (mTLS) as follows:
# https://docs.confluent.io/ansible/current/ansible-authenticate.html#configure-mtls-authentication
ssl_enabled: true
ssl_mutual_auth_enabled: true
# https://docs.confluent.io/ansible/current/ansible-authenticate.html#configure-mtls
zookeeper_ssl_enabled: true
zookeeper_ssl_mutual_auth_enabled: true
Finally we enable ACLs for authorization by setting authorizer.class.name to kafka.security.authorizer.AclAuthorizer, which ships with Apache Kafka:
# Enable ACLs
# https://kafka.apache.org/documentation/#security_authz
kafka_broker_custom_properties:
  authorizer.class.name: kafka.security.authorizer.AclAuthorizer
  super.users: User:kafka_broker;User:schema_registry;User:zookeeper;User:kafka
  ssl.principal.mapping.rules: RULE:^CN=([a-zA-Z\.0-9@\-]+).*$/$1/,DEFAULT
zookeeper_custom_properties:
  zookeeper.set.acl: True
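For reference, the custom properties above should end up in the broker's server.properties (the exact path depends on your installation) roughly as:

```properties
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:kafka_broker;User:schema_registry;User:zookeeper;User:kafka
ssl.principal.mapping.rules=RULE:^CN=([a-zA-Z\.0-9@\-]+).*$/$1/,DEFAULT
```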
Note that we need to define the users kafka_broker, schema_registry, zookeeper and kafka as super.users or create appropriate ACLs so that they are authorized.
Note that the rule in ssl.principal.mapping.rules maps the Distinguished Name (DN) from the mTLS certificate, e.g. CN=client1, to the client's user name, e.g. client1.
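To get a feeling for what the mapping rule does, here is an illustrative approximation with sed. Note this is only a sketch: the broker applies the rule as a Java regex itself, and escaping inside the character class differs slightly between Java regex and POSIX ERE.

```shell
# Approximate RULE:^CN=([a-zA-Z\.0-9@\-]+).*$/$1/ with sed -E:
# capture everything after CN= up to the first character outside the class
# (e.g. the comma before OU=...) and drop the rest of the DN.
dn='CN=client1,OU=TEST,O=CONFLUENT'
principal=$(printf '%s' "$dn" | sed -E 's/^CN=([a-zA-Z.0-9@-]+).*$/\1/')
echo "$principal"   # client1
```

So a client presenting a certificate with DN CN=client1,OU=TEST,O=CONFLUENT is authenticated as user client1.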
mTLS Authentication
Let's create a truststore and a keystore for a client client1 to authenticate against a Kafka cluster using mTLS authentication. Let's assume that snakeoil-ca-1.crt and snakeoil-ca-1.key are used as the authority to generate self-signed certificates for the cluster (this is the default when using the Ansible playbooks from Confluent).
First let's create the keystore:
keytool -genkeypair -noprompt \
-storetype pkcs12 \
-keyalg RSA \
-keysize 2048 \
-alias client1 \
-dname "CN=client1" \
-keystore client1-keystore.jks \
-storepass client-secret \
-keypass client-secret
Create Certificate Signing Request:
keytool -keystore client1-keystore.jks \
-storetype pkcs12 \
-alias client1 \
-certreq -file client1.csr \
-storepass client-secret \
-keypass client-secret
Create an OpenSSL config file openssl.conf including extended key usage for client/server authentication:
[req]
distinguished_name = req_distinguished_name
x509_extensions = v3_req
prompt = no
[v3_req]
extendedKeyUsage = serverAuth, clientAuth
Sign the CSR with your authority:
openssl x509 -req \
-CA snakeoil-ca-1.crt \
-CAkey snakeoil-ca-1.key \
-in client1.csr \
-out client1.crt \
-days 36500 \
-passin pass:capassword123 \
-extfile openssl.conf \
-extensions v3_req \
-CAcreateserial
Use -CAcreateserial on the first run; on subsequent runs you can pass the existing serial file with -CAserial snakeoil-ca-1.srl instead.
Import the authority cert into the keystore:
keytool -noprompt \
-keystore client1-keystore.jks \
-storetype pkcs12 \
-alias CARoot \
-import -file snakeoil-ca-1.crt \
-storepass client-secret \
-keypass client-secret
Import the signed client certificate into the keystore:
keytool -noprompt \
-keystore client1-keystore.jks \
-storetype pkcs12 \
-alias client1 \
-import -file client1.crt \
-storepass client-secret \
-keypass client-secret
Next create a truststore containing the root certificate of your authority. It is used by the CLI tools to verify that the certificate presented by the broker was signed by your authority:
keytool -noprompt \
-keystore client1-truststore.jks \
-storetype pkcs12 \
-alias CARoot \
-import -file snakeoil-ca-1.crt \
-storepass client-secret \
-keypass client-secret
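If you want to sanity-check the signing flow without touching your real snakeoil CA, the whole CSR-and-sign round trip can be rehearsed with throwaway keys in a temporary directory (illustrative; file names here are made up and assume a local openssl):

```shell
# work in a throwaway directory
tmp=$(mktemp -d) && cd "$tmp"
# create a self-signed test CA (stand-in for snakeoil-ca-1)
openssl req -new -x509 -nodes -newkey rsa:2048 \
  -subj "/CN=test-ca" -keyout ca.key -out ca.crt -days 1
# create a client key and CSR (stand-in for client1.csr)
openssl req -new -nodes -newkey rsa:2048 \
  -subj "/CN=client1" -keyout client1.key -out client1.csr
# sign the CSR with the test CA
openssl x509 -req -CA ca.crt -CAkey ca.key -CAcreateserial \
  -in client1.csr -out client1.crt -days 1
# the signed certificate should verify against the CA
openssl verify -CAfile ca.crt client1.crt   # client1.crt: OK
```

The same `openssl verify -CAfile snakeoil-ca-1.crt client1.crt` check works for the real certificate created above.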
Finally let's create a user.properties file to use with our CLI tools:
security.protocol=SSL
ssl.keystore.location=client1-keystore.jks
ssl.keystore.password=client-secret
ssl.key.password=client-secret
ssl.truststore.location=client1-truststore.jks
ssl.truststore.password=client-secret
Let's test our user.properties using the pre-installed CLI tools in the docker image confluentinc/cp-kafka:6.0.0. Run the image and bind-mount the created stores and configs as follows:
docker run --entrypoint=/bin/bash --user 0 --rm -t -i -v /etc/hosts:/etc/hosts -v $(pwd):/client confluentinc/cp-kafka:6.0.0
Now let's try to list existing topics:
cd /client
kafka-topics --bootstrap-server kafka-1:9091 --command-config user.properties --list
No topics are listed since the new user does not have permission to see any. But there is also no authentication error, so mTLS authentication seems to work.
ACL Authorization
Creating a topic fails with authorization failed:
kafka-topics --bootstrap-server kafka-1:9091 --command-config user.properties --create --topic test
Error while executing topic command : Authorization failed.
[2020-12-03 10:51:41,617] ERROR org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.
 (kafka.admin.TopicCommand$)
That's fine since the new user client1 does not have any ACLs yet. To create ACLs we need to use a super user. For testing purposes, we add client1 to the super.users attribute in server.properties and restart confluent-kafka.
Now we can use client1 to create a topic test and ACLs so that client1 can produce to and consume from the new topic. First let's create the topic:
kafka-topics --bootstrap-server kafka-1:9091 --command-config user.properties --create --topic test
Created topic test.
Create producer and consumer ACLs:
kafka-acls --bootstrap-server kafka-1:9091 --command-config user.properties --add --allow-principal User:client1 --producer --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:client1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:client1, host=*, operation=CREATE, permissionType=ALLOW)
(principal=User:client1, host=*, operation=DESCRIBE, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:client1, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:client1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:client1, host=*, operation=CREATE, permissionType=ALLOW)
kafka-acls --bootstrap-server kafka-1:9091 --command-config user.properties --add --allow-principal User:client1 --consumer --group tester --topic test
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:client1, host=*, operation=READ, permissionType=ALLOW)
(principal=User:client1, host=*, operation=DESCRIBE, permissionType=ALLOW)
Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=tester, patternType=LITERAL)`:
(principal=User:client1, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=tester, patternType=LITERAL)`:
(principal=User:client1, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:client1, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:client1, host=*, operation=READ, permissionType=ALLOW)
(principal=User:client1, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:client1, host=*, operation=CREATE, permissionType=ALLOW)
Note that tester can be any group name; it must also be specified in the consumer command as you can see below. Now undo the changes in server.properties by removing client1 from super.users again and restart confluent-kafka.
Now let's start a consumer of topic test with user client1 and consumer group tester:
kafka-console-consumer --bootstrap-server=kafka-1:9091 --topic=test --consumer.config user.properties --group tester
And finally start the producer as follows:
kafka-console-producer --broker-list kafka-1:9091 --topic=test --producer.config user.properties
>
Type some words into the producer command line. After a short time, they should appear in the consumer console.
ZooKeeper Security
As mentioned in Kafka Security - First Steps, Apache ZooKeeper is world-writable by default. And as explained here, we can use zookeeper-security-migration to make ZooKeeper secure.
To do so, let's create a zookeeper-client.properties file on one of the ZooKeeper nodes so that we are able to connect to ZooKeeper:
zookeeper.ssl.client.enable=true
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.keystore.location=/var/ssl/private/zookeeper.keystore.jks
zookeeper.ssl.keystore.password=confluentkeystorestorepass
zookeeper.ssl.truststore.location=/var/ssl/private/zookeeper.truststore.jks
zookeeper.ssl.truststore.password=confluenttruststorepass
zookeeper.keystore.jks and zookeeper.truststore.jks were already created by the playbook and can be re-used. Next, run the migration tool on the ZooKeeper node and pass --zookeeper.acl secure as follows:
zookeeper-security-migration --zk-tls-config-file zookeeper-client.properties --zookeeper.connect=kafka-1:2182 --zookeeper.acl secure
Let's see whether the migration tool created ACLs:
zookeeper-shell kafka-1:2182 -zk-tls-config-file /etc/kafka/zookeeper-client.properties getAcl /config/users
Connecting to kafka-1:2182
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
'x509,'CN=kafka_broker%2COU=TEST%2CO=CONFLUENT%2CL=PaloAlto%2CST=Ca%2CC=US
: cdrwa
We can see that there is an ACL for CN=kafka_broker,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US, which is the DN of the certificate we used when we ran zookeeper-security-migration. This is also the DN of the certificate that all Kafka brokers use; otherwise they would not be able to connect to ZooKeeper and Kafka would not start.
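The %2C sequences in the getAcl output appear to be URL-encoded commas, presumably because commas separate ACL entries in ZooKeeper and therefore cannot appear literally inside the principal. A quick local decode makes the DN readable again (illustrative only):

```shell
# replace the %2C escapes with literal commas to recover the DN
printf 'CN=kafka_broker%%2COU=TEST%%2CO=CONFLUENT%%2CL=PaloAlto%%2CST=Ca%%2CC=US\n' \
  | sed 's/%2C/,/g'
# CN=kafka_broker,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US
```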
If other components such as schema_registry connect to ZooKeeper, either make sure they use the same certificate including the DN CN=kafka_broker,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US or add additional ZooKeeper ACLs for their DNs, too. So let's try to connect to ZooKeeper without authentication, i.e. to create a user (which should fail):
zookeeper-shell kafka-1:2181 create /config/users/foo bar
Connecting to kafka-1:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
Authentication is not valid : /config/users/foo
We get an "Authentication is not valid" error, which shows that the ACLs are working and effective.
Unfortunately, the znodes of newly created topics are still granted cdrwa (create, delete, read, write, admin) for world/anyone. So you should manually adjust these ACLs to cdrwa for CN=kafka_broker,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US only. Otherwise an unauthorized client is able to modify or even delete your topic. Because of this, it is a good idea to deny access to ZooKeeper from outside the ZooKeeper cluster even with ACLs enabled.
Conclusion
The mTLS approach is bullet-proof since it uses TLS for authentication. A little tricky is mapping the Distinguished Name (DN) from the certificates to the user name, i.e. the ssl.principal.mapping.rules setting in server.properties. We recommend testing the rules with a regex tester, e.g. this one, since debugging them in Kafka is hard.
For authorization, you still have to enable ACLs as described in the previous post, and you have to enable them for both the Kafka brokers (using authorizer.class.name: kafka.security.authorizer.AclAuthorizer) and ZooKeeper (using zookeeper-security-migration). Especially managing ZooKeeper ACLs is a mess, and their syntax is terrible as well.
To sum up, this setup seems to be production-ready, with some traps that you should watch out for. This is still no general recommendation to use Apache Kafka; if you just need a messaging system, consider NATS or RabbitMQ.
