Securing a Kafka Cluster in Kubernetes Using Strimzi

Key Takeaways

  • Strimzi simplifies the deployment of a Kafka cluster to a Kubernetes cluster.
  • Strimzi configuration lets you secure Kafka communications and provide user/topic RBAC management in a declarative way.
  • Debezium Server provides attributes to connect to a secured Kafka cluster.
  • Debezium Embedded can be used as Strimzi creates Kubernetes Secrets with the required credentials that any Kubernetes deployment can read.
  • By default, Kubernetes doesn’t encrypt Secrets at rest; you need to configure encryption to protect them against attacks.

In part 3 of this series, we learned about the dual writes problem and how to solve it using Change Data Capture patterns, specifically using Debezium to read changes made in the database (through the transaction log) and publish them to a Kafka topic.

In part 4 of this series, we took the example one step forward, moving the application from the local development machine to Kubernetes (production environment). We relied on Strimzi to deploy and configure Kafka and Debezium in the Kubernetes cluster.

But to keep things simple, we left out one very important topic: security. In particular, we didn’t cover:

  • How to secure the MySQL instance without having the username/password directly hardcoded in the deployment file.
  • How to add authentication to the Kafka cluster using Strimzi.
  • How to configure Debezium to authenticate against Kafka and MySQL instances securely.

In this article, we’ll answer all these questions by taking the application developed in the previous article (using the Debezium Server approach) and securing it.

Kubernetes

We need a Kubernetes cluster with Strimzi installed. We explained this in part 4 of this series; if you are reusing it, you first need to delete the application, the MySQL database, the Kafka cluster, and the Debezium instance.

IMPORTANT: You only need to run the following steps IF you still have the cluster from part 4. If you already deleted it, continue reading after the section that describes how to delete the cluster.

Run the following commands in a terminal window to delete them:

kubectl delete deployment movie-plays-producer-debezium-server -n kafka
kubectl delete service movie-plays-producer-debezium-server -n kafka
kubectl delete -f mysql-deployment.yaml -n kafka
kubectl delete -f debezium-kafka-connector.yaml -n kafka
kubectl delete -f debezium-kafka-connect.yaml -n kafka
kubectl delete -f kafka.yaml -n kafka

IMPORTANT: You only need to run the following step if you don’t have a Kubernetes cluster.

If you have already destroyed the cluster, follow the quick instructions to create a new one. In a terminal window, run these commands:

minikube start -p strimzi --kubernetes-version='v1.22.12' --vm-driver='virtualbox' --memory=12096 --cpus=3

kubectl create namespace kafka

kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

Validate the operator installation by running the following command:

kubectl get pods -n kafka

NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-597d67c7d6-ms987   1/1     Running   0          4m27s

Wait until the operator is running and ready.

At this point, we can start installing all the components with authentication and authorization instead of anonymous access.

MySQL

In the previous article, we deployed the MySQL instance, hardcoding the username/password in the deployment file as an environment variable:

env:
    - name: MYSQL_ROOT_PASSWORD
      value: alex
    - name: MYSQL_DATABASE
      value: moviesdb
    - name: MYSQL_USER
      value: alex
    - name: MYSQL_PASSWORD
      value: alex

Let’s create a Kubernetes Secret to store this sensitive data. Data in a Kubernetes Secret manifest must be encoded in base64 format. The alex string encoded in base64 is YWxleA==.

To generate this value, run the following command:

echo -n 'alex' | base64
YWxleA==
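
As a small sketch of why the -n flag matters and why base64 should not be mistaken for protection, the following snippet encodes the value with and without the trailing newline and then decodes it again. The variable names are only illustrative.

```shell
# Encode exactly the four characters of "alex" (printf '%s' adds no newline).
encoded=$(printf '%s' 'alex' | base64)
echo "$encoded"

# Without -n, echo appends a newline, which becomes part of the stored value.
with_newline=$(echo 'alex' | base64)
echo "$with_newline"

# base64 is an encoding, not encryption: decoding requires no key.
decoded=$(printf '%s' "$encoded" | base64 --decode)
echo "$decoded"
```

Anyone who can read the manifest or the Secret object can recover the original value the same way, which is why access to Secrets needs to be restricted.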

Create the mysql-secret.yaml file with the secrets set:

apiVersion: v1
kind: Secret
metadata:
 name: mysqlsecret
type: Opaque
data:
 mysqlrootpassword: YWxleA==
 mysqluser: YWxleA==
 mysqlpassword: YWxleA==

And apply it to the cluster:

kubectl apply -f mysql-secret.yaml -n kafka
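
To double-check what was stored, one option is to read a single key back and decode it. The helper below is a minimal sketch; read_secret_key is a hypothetical name, not part of kubectl, and it assumes key names without dots (as in mysqlsecret), since dots would need escaping in the JSONPath expression.

```shell
# read_secret_key SECRET KEY [NAMESPACE]
# Prints the decoded value of one key of a Kubernetes Secret.
# Assumption: KEY contains no dots (true for mysqluser, mysqlpassword, ...).
read_secret_key() {
  secret="$1"
  key="$2"
  namespace="${3:-kafka}"
  kubectl get secret "$secret" -n "$namespace" -o "jsonpath={.data.$key}" \
    | base64 --decode
}

# Example call (needs access to the cluster):
#   read_secret_key mysqlsecret mysqluser kafka
```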

Then update the MySQL deployment file to read the values from the secret created in the previous step using the secretKeyRef field in the valueFrom section:

apiVersion: v1
kind: Service
metadata:
 name: mysql
 labels:
   app: mysql
spec:
 ports:
   - port: 3306
 selector:
   app: mysql
 clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
 name: mysql
 labels:
   app: mysql
spec:
 selector:
   matchLabels:
     app: mysql
 strategy:
   type: Recreate
 template:
   metadata:
     labels:
       app: mysql
   spec:
     containers:
     - image: mysql:8.0.30
       name: mysql
       env:
       - name: MYSQL_ROOT_PASSWORD
         valueFrom:
           secretKeyRef:
             key: mysqlrootpassword
             name: mysqlsecret
       - name: MYSQL_DATABASE
         value: moviesdb
       - name: MYSQL_USER
         valueFrom:
           secretKeyRef:
             key: mysqluser
             name: mysqlsecret
       - name: MYSQL_PASSWORD
         valueFrom:
           secretKeyRef:
             key: mysqlpassword
             name: mysqlsecret
       ports:
       - containerPort: 3306
         name: mysql

In a secretKeyRef section, we specify the name of the Secret where the values are stored and the key to read; in this case, the Secret is the one we named mysqlsecret in the mysql-secret.yaml file.

Deploy the MySQL instance into the Kubernetes cluster:

kubectl apply -f mysql-deployment.yaml -n kafka

We can validate that the secrets are injected correctly by printing the environment variables. First, let’s get the Pod name:

kubectl get pods -n kafka
NAME                                        READY   STATUS    RESTARTS   AGE
mysql-7888f99967-4cj47                      1/1     Running   0          90s

Then open a shell inside the container by running the following command in a terminal window, and run the export command:

kubectl exec -n kafka -ti mysql-7888f99967-4cj47 -- /bin/bash

bash-4.4# export
declare -x GOSU_VERSION="1.14"
declare -x HOME="/root"
declare -x HOSTNAME="mysql-7888f99967-4cj47"
declare -x KUBERNETES_PORT="tcp://10.96.0.1:443"
declare -x KUBERNETES_PORT_443_TCP="tcp://10.96.0.1:443"
declare -x KUBERNETES_PORT_443_TCP_ADDR="10.96.0.1"
declare -x KUBERNETES_PORT_443_TCP_PORT="443"
declare -x KUBERNETES_PORT_443_TCP_PROTO="tcp"
declare -x KUBERNETES_SERVICE_HOST="10.96.0.1"
declare -x KUBERNETES_SERVICE_PORT="443"
declare -x KUBERNETES_SERVICE_PORT_HTTPS="443"
declare -x MYSQL_DATABASE="moviesdb"
declare -x MYSQL_MAJOR="8.0"
declare -x MYSQL_PASSWORD="alex"
declare -x MYSQL_ROOT_PASSWORD="alex"
declare -x MYSQL_SHELL_VERSION="8.0.30-1.el8"
declare -x MYSQL_USER="alex"
declare -x MYSQL_VERSION="8.0.30-1.el8"
declare -x OLDPWD
declare -x PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
declare -x PWD="/"
declare -x SHLVL="1"
declare -x TERM="xterm"

Now you can exit the container:

exit

The MySQL database credentials are now configured using a Kubernetes Secret, which is much better than setting them in the deployment file. The other part to update is the application as it now needs to read the credentials from the Secret instead of having them statically set in the configuration file.

Movie Plays Producer Debezium

The database username and password are hardcoded in the application.properties file. It would be better if the application could be configured automatically with username and password set in the Kubernetes Secret when deployed to Kubernetes.

One way to do this is to inject the secrets into the application Pod as environment variables, as we did in the MySQL deployment. For example, in the case of the password, the env part of the deployment file would be:

- name: MYSQL_PASSWORD
  valueFrom:
    secretKeyRef:
      key: mysqlpassword
      name: mysqlsecret

Now update the application.properties file to set the password value from the environment variable:

%prod.quarkus.datasource.password=${mysql-password}

This works, but storing secrets as environment variables isn’t the most secure way to do it, as anyone who can list the environment variables of the container can read them.
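
The following local sketch illustrates the problem without needing a cluster: once a value is exported, every child process inherits it and can print it. The value and variable are the demo ones used in this article.

```shell
# Simulate a container whose environment carries the injected secret.
export MYSQL_PASSWORD='alex'

# Any child process inherits the variable and can read it in plain text.
leaked=$(sh -c 'printenv MYSQL_PASSWORD')
echo "child process sees: $leaked"
```

Inside a Pod the effect is the same: any process started in the container, including a shell opened with kubectl exec, sees the value.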

Quarkus includes the kubernetes-config extension that allows the application to read Kubernetes ConfigMaps and Secrets directly from the Kubernetes API server. This way, secrets are securely transmitted from the Kubernetes cluster to the application memory without any middle step like materializing them as environment variables or mounting them as volumes.

Kubernetes Config Extension

The first thing to do is register the kubernetes-config extension. Open the pom.xml file and add the following dependency:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kubernetes-config</artifactId>
</dependency>

Then, enable the application to read Kubernetes Secrets directly from the Kubernetes API, and set the name of the Secret (in our case mysqlsecret) to read.

Open the src/main/resources/application.properties file and append the following lines:

%prod.quarkus.kubernetes-config.secrets.enabled=true
quarkus.kubernetes-config.secrets=mysqlsecret

Then update the quarkus.datasource.username and quarkus.datasource.password properties to read their values from the keys, mysqluser and mysqlpassword, from the mysqlsecret Secret.

In the application.properties file, update these properties accordingly:

%prod.quarkus.datasource.username=${mysqluser}
%prod.quarkus.datasource.password=${mysqlpassword}

Both values are assigned with the value obtained from the key set in the mysqlsecret Secret.

Since reading Kubernetes Secrets involves interacting with the Kubernetes API Server, when RBAC (role-based access control) is enabled on the cluster, the ServiceAccount used to run the application must have the proper permissions for such access.

Because we registered the Kubernetes extension in the previous article, all the necessary Kubernetes resources to make that happen are automatically generated, so we don’t need to do anything.

Let’s deploy the application running the following command in a terminal window:

./mvnw clean package -DskipTests -Dquarkus.kubernetes.deploy=true

[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Deploying to kubernetes server: https://192.168.59.104:8443/ in namespace: kafka.
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Applied: Service movie-plays-producer-debezium-server.
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Applied: Deployment movie-plays-producer-debezium-server.
[INFO] [io.quarkus.deployment.QuarkusAugmentor] Quarkus augmentation completed in 9537ms

To validate the deployment, inspect the Pod’s log and check that no error is shown and that the SQL statements are executed correctly:

kubectl get pods -n kafka

NAME                                                         READY   STATUS      RESTARTS   AGE
movie-plays-producer-debezium-server-auth-7cc69fb56c-nc8tx   1/1     Running     0          44s


kubectl logs movie-plays-producer-debezium-server-auth-7cc69fb56c-nc8tx -n kafka

__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2022-08-21 21:00:41,277 INFO  [io.deb.out.qua.int.AdditionalJaxbMappingProducerImpl] (main) Contributed XML mapping for entity: io.debezium.outbox.quarkus.internal.OutboxEvent

…
Hibernate:

    create table Movie (
       id bigint not null,
        director varchar(255),
        genre varchar(255),
        name varchar(255),
        primary key (id)
    ) engine=InnoDB
Hibernate:

    create table OutboxEvent (
       id binary(255) not null,
        aggregatetype varchar(255) not null,
        aggregateid varchar(255) not null,
        type varchar(255) not null,
        timestamp datetime(6) not null,
        payload varchar(8000),
        tracingspancontext varchar(256),
        primary key (id)
    ) engine=InnoDB

In the following illustration, you can see the part we correctly secured.


 
Now that the application is running, with the MySQL credentials correctly managed, let’s move on to securing the Kafka and Debezium parts.

Kafka

So far, we’ve deployed an open Kafka cluster; no authentication or authorization logic was enabled.

Strimzi allows deploying a Kafka cluster with the following authentication mechanisms:

  • SASL SCRAM-SHA-512
  • TLS client authentication
  • OAuth 2.0 token-based authentication

Since the Strimzi Operator is already installed on the Kubernetes cluster, we can use the Kafka custom resource. The Kafka resource configures a cluster deployment, in this case with TLS client authentication enabled.

Strimzi lets you configure each listener in the listeners block to encrypt traffic with TLS (tls: true) and to set the authentication method (authentication field). Combining TLS encryption with authentication of type tls results in mutual TLS (mTLS).

Create a new file named kafka.yaml with the following content to configure a secured Kafka:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
 name: my-cluster
 namespace: kafka
spec:
 kafka:
   version: 3.2.0
   replicas: 1
   listeners:
     - name: demo
       port: 9092
       type: internal
       tls: false
     - name: secure
       port: 9093
       type: internal
       tls: true
       authentication:
         type: tls
   authorization:
     type: simple
   config:
     offsets.topic.replication.factor: 1
     transaction.state.log.replication.factor: 1
     transaction.state.log.min.isr: 1
     default.replication.factor: 1
     min.insync.replicas: 1
     inter.broker.protocol.version: "3.2"
   storage:
     type: ephemeral
 zookeeper:
   replicas: 1
   storage:
     type: ephemeral
 entityOperator:
   topicOperator: {}
   userOperator: {}

And apply it to the Kubernetes cluster:

kubectl apply -f kafka.yaml -n kafka

kafka.kafka.strimzi.io/my-cluster created

Let’s validate the Kafka cluster is up and running:

kubectl get pods -n kafka

NAME                                         READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-d4db5ff58-rt96n   3/3     Running   0          2m26s
my-cluster-kafka-0                           1/1     Running   0          2m58s
my-cluster-zookeeper-0                       1/1     Running   0          3m31s
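
Instead of polling the Pod list by hand, a script could block until Strimzi reports the resource as ready. This is a minimal sketch; wait_ready is a hypothetical helper name, and the 300s default timeout is an arbitrary assumption.

```shell
# wait_ready RESOURCE [NAMESPACE] [TIMEOUT]
# Blocks until the given resource reports the Ready condition or the timeout expires.
wait_ready() {
  resource="$1"
  namespace="${2:-kafka}"
  timeout="${3:-300s}"
  kubectl wait "$resource" --for=condition=Ready --timeout="$timeout" -n "$namespace"
}

# Example call (needs access to the cluster):
#   wait_ready kafka/my-cluster kafka 300s
```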

Since we set the listener to use TLS, Strimzi has automatically created a Kubernetes Secret with the cluster CA certificate, a PKCS12 truststore, and the associated password as data.

kubectl get secrets -n kafka

my-cluster-clients-ca                        Opaque                                1      9m14s
my-cluster-clients-ca-cert                   Opaque                                3      9m14s
my-cluster-cluster-ca                        Opaque                                1      9m14s
my-cluster-cluster-ca-cert                   Opaque                                3      9m14s
my-cluster-cluster-operator-certs            Opaque                                4      9m14s
my-cluster-entity-operator-dockercfg-5wwb5   kubernetes.io/dockercfg               1      8m9s
my-cluster-entity-operator-token-h9xkq       kubernetes.io/service-account-token   4      8m9s
my-cluster-entity-operator-token-npvfc       kubernetes.io/service-account-token   4      8m9s
my-cluster-entity-topic-operator-certs       Opaque                                4      8m9s
my-cluster-entity-user-operator-certs        Opaque                                4      8m8s
my-cluster-kafka-brokers                     Opaque                                4      8m41s
my-cluster-kafka-dockercfg-fgpx2             kubernetes.io/dockercfg               1      8m41s
my-cluster-kafka-token-2x7s8                 kubernetes.io/service-account-token   4      8m41s
my-cluster-kafka-token-6qdgk                 kubernetes.io/service-account-token   4      8m41s
my-cluster-zookeeper-dockercfg-p296g         kubernetes.io/dockercfg               1      9m13s
my-cluster-zookeeper-nodes                   Opaque                                4      9m13s
my-cluster-zookeeper-token-dp9sc             kubernetes.io/service-account-token   4      9m13s
my-cluster-zookeeper-token-gbrxg             kubernetes.io/service-account-token   4      9m13s

The important secret here is the one named <clustername>-cluster-ca-cert (in this case my-cluster-cluster-ca-cert).

List the content of the secret by running the following command in a terminal window:

kubectl get secret my-cluster-cluster-ca-cert -o yaml -n kafka

apiVersion: v1
data:
  ca.crt: LS0tLS1CRUdJTiBDRVJU
  ca.p12: MIIGkwIBAzCCBk==
  ca.password: azJjY2tIMEs1c091
kind: Secret
metadata:
  annotations:
    strimzi.io/ca-cert-generation: "0"
  creationTimestamp: "2022-08-21T19:32:55Z"
  labels:
    app.kubernetes.io/instance: my-cluster
    app.kubernetes.io/managed-by: strimzi-cluster-operator
    app.kubernetes.io/name: strimzi
    app.kubernetes.io/part-of: strimzi-my-cluster
    strimzi.io/cluster: my-cluster
    strimzi.io/kind: Kafka
    strimzi.io/name: strimzi
  name: my-cluster-cluster-ca-cert
  namespace: kafka
  ownerReferences:
  - apiVersion: kafka.strimzi.io/v1beta2
    blockOwnerDeletion: false
    controller: false
    kind: Kafka
    name: my-cluster
    uid: 23c84dfb-bb33-47ed-bd41-b4e87e0a4c3a
  resourceVersion: "49424"
  uid: 6c2679a8-216f-421b-880a-de0e6a0879fa
type: Opaque
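
If you want to inspect the certificate itself, one option is to extract a key of the Secret into a local file. The helper below is a sketch under the assumption that kubectl and base64 are available locally; extract_secret_file is a hypothetical name. Key names such as ca.crt contain a dot, which must be escaped in the JSONPath expression.

```shell
# extract_secret_file SECRET KEY OUTFILE [NAMESPACE]
# Decodes one key of a Secret and writes it to OUTFILE.
extract_secret_file() {
  secret="$1"
  key="$2"
  outfile="$3"
  namespace="${4:-kafka}"
  # JSONPath treats "." as a path separator, so ca.crt must become ca\.crt.
  escaped_key=$(printf '%s' "$key" | sed 's/\./\\./g')
  kubectl get secret "$secret" -n "$namespace" -o "jsonpath={.data.$escaped_key}" \
    | base64 --decode > "$outfile"
}

# Example calls (need access to the cluster and openssl installed):
#   extract_secret_file my-cluster-cluster-ca-cert ca.crt ca.crt kafka
#   openssl x509 -in ca.crt -noout -subject -enddate
```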

Let’s create a user that authenticates using mTLS.

Security and Debezium

With Kafka secured, let’s create a KafkaUser resource that authenticates using mTLS and defines the ACLs, that is, the operations the user is allowed to perform on consumer groups and topics.

Create a new file named kafka-user-connect-all-topics.yaml with the following content:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
 name: my-connect
 namespace: kafka
 labels:
   # Cluster name set previously
   strimzi.io/cluster: my-cluster
spec:
 authentication:
   type: tls
 authorization:
   type: simple
   acls:
   # Kafka Connect's consumer groups and internal topics used to store configuration, offsets or status
   - resource:
       type: group
       name: outbox-viewer
     operation: Read
   - resource:
       type: group
       name: outbox-viewer
     operation: Describe
   - resource:
       type: group
       name: mysql-dbhistory
     operation: Read
   - resource:
       type: group
       name: mysql-dbhistory
     operation: Describe
   - resource:
       type: group
       name: connect-cluster
     operation: Read
   - resource:
       type: group
       name: connect-cluster
     operation: Describe
   - resource:
       type: topic
       name: connect-cluster-configs
     operation: Read
   - resource:
       type: topic
       name: connect-cluster-configs
     operation: Describe
   - resource:
       type: topic
       name: connect-cluster-configs
     operation: Write
   - resource:
       type: topic
       name: connect-cluster-configs
     operation: Create
   - resource:
       type: topic
       name: connect-cluster-status
     operation: Read
   - resource:
       type: topic
       name: connect-cluster-status
     operation: Describe
   - resource:
       type: topic
       name: connect-cluster-status
     operation: Write
   - resource:
       type: topic
       name: connect-cluster-status
     operation: Create
   - resource:
       type: topic
       name: connect-cluster-offsets
     operation: Read
   - resource:
       type: topic
       name: connect-cluster-offsets
     operation: Write
   - resource:
       type: topic
       name: connect-cluster-offsets
     operation: Describe
   - resource:
       type: topic
       name: connect-cluster-offsets
     operation: Create
   - resource:
       type: group
       name: connect-cluster
     operation: Read
   # Debezium topics
   - resource:
       type: topic
       name: "*"
     operation: Read
   - resource:
       type: topic
       name: "*"
     operation: Describe
   - resource:
       type: topic
       name: "*"
     operation: Write
   - resource:
       type: topic
       name: "*"
     operation: Create

Apply the resource in a terminal window:

kubectl apply -f kafka-user-connect-all-topics.yaml -n kafka
kafkauser.kafka.strimzi.io/my-connect created

After registering this Kafka user, Strimzi creates a new secret with the same name as the KafkaUser resource (my-connect) with the pkcs12 keystore holding the client's private key and the password to access it.

kubectl get secret my-connect -n kafka -o yaml

apiVersion: v1
data:
  ca.crt: LS0tLS1CK
  user.crt: LS0tLS1CRUdJTiB==
  user.key: LS0tLS1CRUdJTiBQUklWQVRK
  user.p12: MIILNAIBAzCAA==
  user.password: UUR4Nk5NemsxUVFF
kind: Secret
metadata:
  creationTimestamp: "2022-08-21T20:12:44Z"
  labels:
    app.kubernetes.io/instance: my-connect
    app.kubernetes.io/managed-by: strimzi-user-operator
    app.kubernetes.io/name: strimzi-user-operator
    app.kubernetes.io/part-of: strimzi-my-connect
    strimzi.io/cluster: my-cluster
    strimzi.io/kind: KafkaUser
  name: my-connect
  namespace: kafka
  ownerReferences:
  - apiVersion: kafka.strimzi.io/v1beta2
    blockOwnerDeletion: false
    controller: false
    kind: KafkaUser
    name: my-connect
    uid: 882447cc-7759-4884-9d2f-f57f8be92711
  resourceVersion: "60439"
  uid: 9313676f-3417-42d8-b3fb-a1b1fe1b3a39
type: Opaque

So now, we’ve got a new Kafka user with required permissions to use the required Kafka topics.
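
Any Kafka client that wants to connect as this user needs the keystore and truststore plus their passwords. As an illustration, the sketch below writes a client properties file using the standard Kafka SSL client settings; write_client_properties is a hypothetical helper, and the directory and passwords passed to it are placeholders you would take from the my-connect and my-cluster-cluster-ca-cert Secrets.

```shell
# write_client_properties OUTFILE CERT_DIR USER_PASSWORD CA_PASSWORD
# CERT_DIR is assumed to contain user.p12 (keystore) and ca.p12 (truststore).
write_client_properties() {
  outfile="$1"
  cert_dir="$2"
  user_password="$3"
  ca_password="$4"
  cat > "$outfile" <<EOF
security.protocol=SSL
ssl.keystore.type=PKCS12
ssl.keystore.location=$cert_dir/user.p12
ssl.keystore.password=$user_password
ssl.truststore.type=PKCS12
ssl.truststore.location=$cert_dir/ca.p12
ssl.truststore.password=$ca_password
EOF
}

# Example call with placeholder values:
#   write_client_properties client.properties /tmp/certs '<user.password>' '<ca.password>'
```

The same property names, prefixed for the history producer and consumer, are what the Debezium connector uses to reach the secured listener.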

Before deploying the Debezium Kafka Connector, let’s permit the Kafka Connector object to read MySQL secrets directly from the mysqlsecret Secret object using the Kubernetes API (like we did in the application) so the Connector can authenticate against the database to read the transaction log.

Create the kafka-role-binding.yaml file with the following content:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
 name: connector-configuration-role
 namespace: kafka
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["mysqlsecret", "my-connect", "my-cluster-cluster-ca-cert"]
  verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
 name: connector-configuration-role-binding
 namespace: kafka
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: kafka
roleRef:
 kind: Role
 name: connector-configuration-role
 apiGroup: rbac.authorization.k8s.io

Notice that the name under the subjects block is the service account running the Debezium Kafka Connect Pod. We’ve not deployed the Pod yet, but when deploying a Kafka Connect component, the service account created follows the format $KafkaConnectName-connect. Since the Debezium Kafka Connect will be named debezium-connect-cluster, the service account created will be debezium-connect-cluster-connect, and we give this account permission to read the Kubernetes Secrets directly.

Apply the kafka-role-binding.yaml before deploying the Debezium Kafka Connect:

kubectl apply -f kafka-role-binding.yaml -n kafka

role.rbac.authorization.k8s.io/connector-configuration-role created
rolebinding.rbac.authorization.k8s.io/connector-configuration-role-binding created
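
To confirm the binding behaves as intended, you could ask the API server whether the Kafka Connect service account is allowed to read a given Secret. The helpers below are a sketch; their names are hypothetical, and the service account name is derived from the $KafkaConnectName-connect convention described above.

```shell
# connect_service_account CONNECT_NAME
# Derives the service account name from the KafkaConnect resource name.
connect_service_account() {
  printf '%s-connect' "$1"
}

# can_connect_read_secret CONNECT_NAME SECRET [NAMESPACE]
# Asks the API server whether that service account may "get" the Secret.
can_connect_read_secret() {
  connect_name="$1"
  secret="$2"
  namespace="${3:-kafka}"
  sa=$(connect_service_account "$connect_name")
  kubectl auth can-i get "secret/$secret" \
    --as="system:serviceaccount:$namespace:$sa" -n "$namespace"
}

# Example call (needs access to the cluster and permission to impersonate):
#   can_connect_read_secret debezium-connect-cluster mysqlsecret kafka
```

kubectl auth can-i answers yes or no, so checking a Secret that is not listed in resourceNames is a quick way to see that the Role is scoped to the three Secrets only.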

The following illustration summarizes the current secured communications:

To deploy the Debezium Kafka Connect, we’ll again use the KafkaConnect object provided by Strimzi, but with some changes to authenticate against the Kafka cluster and to enable reading configuration parameters from Kubernetes Secrets (the main purpose is to read the MySQL credentials that Debezium uses to authenticate).

The following fields are configured:

  • The port is now 9093.
  • The cluster CA certificate is set to trust the cluster when communicating over TLS (tls field).
  • The user certificate and key are set to authenticate against the cluster (authentication field).
  • config.providers is set so that the MySQL connector can read configuration from Kubernetes Secrets.
  • The externalConfiguration section is used to materialize the truststores and keystores from Secrets as files. They are materialized in the /opt/kafka/external-configuration/<volumeName> directory, where the MySQL connector accesses them.

Create the kafka-connect.yaml file as shown in the following listing:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
 name: debezium-connect-cluster
 namespace: kafka
 annotations:
   strimzi.io/use-connector-resources: "true"
spec:
 version: 3.2.0
 image: quay.io/lordofthejars/debezium-connector-mysql:1.9.4
 replicas: 1
 bootstrapServers: my-cluster-kafka-bootstrap:9093
 logging:
   type: inline
   loggers:
     connect.root.logger.level: "INFO"
 tls:
   trustedCertificates:
     - secretName: my-cluster-cluster-ca-cert
       certificate: ca.crt
 authentication:
   type: tls
   certificateAndKey:
     secretName: my-connect
     certificate: user.crt
     key: user.key
 config:
   config.providers: secrets
   config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
   group.id: connect-cluster
   offset.storage.topic: connect-cluster-offsets
   offset.storage.replication.factor: 1
   config.storage.topic: connect-cluster-configs
   config.storage.replication.factor: 1
   status.storage.topic: connect-cluster-status
   status.storage.replication.factor: 1
 externalConfiguration:
   volumes:
     - name: cluster-ca
       secret:
         secretName: my-cluster-cluster-ca-cert
     - name: my-user
       secret:
         secretName: my-connect

The trustedCertificates are set from the secret created when the Kafka cluster was deployed using the Kafka object.

The certificateAndKey, under the authentication block, is set from the secret created when KafkaUser was registered.

Deploy the resource and validate it’s correctly deployed and configured:

kubectl apply -f kafka-connect.yaml -n kafka
kafkaconnect.kafka.strimzi.io/debezium-connect-cluster created

Create a new file named debezium-kafka-connector.yaml that configures Debezium to register the MySQL connector and access the transaction log of the MySQL instance. In this case, we are not putting the password in plain text in the connector configuration but referring to the Secret object we previously created with the MySQL credentials. The format to access a Secret is secrets:<namespace>/<secretname>:<key>. Moreover, the connector reads the truststores and keystores materialized when you applied the KafkaConnect definition.


apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
 name: debezium-connector-mysql
 namespace: kafka
 labels:
   strimzi.io/cluster: debezium-connect-cluster
spec:
 class: io.debezium.connector.mysql.MySqlConnector
 tasksMax: 1
 config:
   group.id: connect-cluster
   tasks.max: 1
   database.hostname: mysql
   database.port: 3306
   database.user: root
   database.password: ${secrets:kafka/mysqlsecret:mysqlrootpassword}
   database.server.id: 184054
   database.server.name: mysql
   database.include.list: moviesdb
   database.allowPublicKeyRetrieval: true
   table.include.list: moviesdb.OutboxEvent
   database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9093
   database.history.kafka.topic: schema-changes.movies
   database.history.producer.security.protocol: SSL
   database.history.producer.ssl.keystore.type: PKCS12
   database.history.producer.ssl.keystore.location: /opt/kafka/external-configuration/my-user/user.p12
   database.history.producer.ssl.keystore.password: ${secrets:kafka/my-connect:user.password}
   database.history.producer.ssl.truststore.type: PKCS12
   database.history.producer.ssl.truststore.location: /opt/kafka/external-configuration/cluster-ca/ca.p12
   database.history.producer.ssl.truststore.password: ${secrets:kafka/my-cluster-cluster-ca-cert:ca.password}

   database.history.consumer.security.protocol: SSL
   database.history.consumer.ssl.keystore.type: PKCS12
   database.history.consumer.ssl.keystore.location: /opt/kafka/external-configuration/my-user/user.p12
   database.history.consumer.ssl.keystore.password: ${secrets:kafka/my-connect:user.password}
   database.history.consumer.ssl.truststore.type: PKCS12
   database.history.consumer.ssl.truststore.location: /opt/kafka/external-configuration/cluster-ca/ca.p12
   database.history.consumer.ssl.truststore.password: ${secrets:kafka/my-cluster-cluster-ca-cert:ca.password}
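
The placeholder syntax is easy to get wrong by hand, so here is a tiny sketch that builds it from its three parts. secret_ref is a hypothetical helper, and the secrets prefix is assumed to match the alias declared in config.providers of the KafkaConnect resource.

```shell
# secret_ref NAMESPACE SECRET KEY
# Prints a config-provider placeholder of the form ${secrets:<namespace>/<secretname>:<key>}.
secret_ref() {
  # Single quotes keep ${...} literal; %s is filled with namespace, Secret name, and key.
  printf '${secrets:%s/%s:%s}\n' "$1" "$2" "$3"
}

secret_ref kafka my-connect user.password
secret_ref kafka my-cluster-cluster-ca-cert ca.password
```

Kafka Connect resolves each placeholder at runtime through the configured provider, so the decoded value never appears in the KafkaConnector resource.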

Apply the file to register the MySQL connector running the following command in a terminal window:

kubectl apply -f debezium-kafka-connector.yaml -n kafka
kafkaconnector.kafka.strimzi.io/debezium-connector-mysql created

Finally, all the communications are secured.

Demo

And that’s all: we’ve got the same example shown in the previous article, but now it’s secured.

To test it, there is a Quarkus application named outbox-viewer that prints all the content of the OutboxEvent topic to the console. Apply the following YAML file to deploy it:

---
apiVersion: v1
kind: ServiceAccount
metadata:
 annotations:
   app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca
   app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000
 labels:
   app.kubernetes.io/name: outbox-viewer
   app.kubernetes.io/version: 1.0.0-SNAPSHOT
 name: outbox-viewer
 namespace: kafka
---
apiVersion: v1
kind: Service
metadata:
 annotations:
   app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca
   app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000
 labels:
   app.kubernetes.io/name: outbox-viewer
   app.kubernetes.io/version: 1.0.0-SNAPSHOT
 name: outbox-viewer
 namespace: kafka
spec:
 ports:
   - name: http
     port: 80
     targetPort: 8080
 selector:
   app.kubernetes.io/name: outbox-viewer
   app.kubernetes.io/version: 1.0.0-SNAPSHOT
 type: ClusterIP
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
 name: view-secrets
 namespace: kafka
rules:
 - apiGroups:
     - ""
   resources:
     - secrets
   verbs:
     - get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
 name: outbox-viewer-view
 namespace: kafka
roleRef:
 kind: ClusterRole
 apiGroup: rbac.authorization.k8s.io
 name: view
subjects:
 - kind: ServiceAccount
   name: outbox-viewer
   namespace: kafka
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
 name: outbox-viewer-view-secrets
 namespace: kafka
roleRef:
 kind: Role
 apiGroup: rbac.authorization.k8s.io
 name: view-secrets
subjects:
 - kind: ServiceAccount
   name: outbox-viewer
   namespace: kafka
---
apiVersion: apps/v1
kind: Deployment
metadata:
 annotations:
   app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca
   app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000
 labels:
   app.kubernetes.io/name: outbox-viewer
   app.kubernetes.io/version: 1.0.0-SNAPSHOT
 name: outbox-viewer
 namespace: kafka
spec:
 replicas: 1
 selector:
   matchLabels:
     app.kubernetes.io/name: outbox-viewer
     app.kubernetes.io/version: 1.0.0-SNAPSHOT
 template:
   metadata:
     annotations:
       app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca
       app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000
     labels:
       app.kubernetes.io/name: outbox-viewer
       app.kubernetes.io/version: 1.0.0-SNAPSHOT
     namespace: kafka
   spec:
     containers:
       - env:
           - name: KUBERNETES_NAMESPACE
             valueFrom:
               fieldRef:
                 fieldPath: metadata.namespace
         image: quay.io/lordofthejars/outbox-viewer:1.0.0-SNAPSHOT
         imagePullPolicy: Always
         name: outbox-viewer
         ports:
           - containerPort: 8080
             name: http
             protocol: TCP
         volumeMounts:
           - mountPath: /home/jboss/cluster
             name: cluster-volume
             readOnly: false
           - mountPath: /home/jboss/user
             name: user-volume
             readOnly: false
     serviceAccountName: outbox-viewer
     volumes:
       - name: cluster-volume
         secret:
           optional: false
           secretName: my-cluster-cluster-ca-cert
       - name: user-volume
         secret:
           optional: false
           secretName: my-connect

Then in one terminal window, follow the logs of the application’s Pod.

kubectl logs outbox-viewer-684969f9f6-7snng -n kafka -f

Replace the Pod name with the name of your own Pod.

Find the IP and port of the Movie Plays Producer application by running the following commands in a terminal:

minikube ip -p strimzi

192.168.59.106

Get the exposed port of the movie-plays-producer-debezium service, which is the second port (32460 in the following snippet).

kubectl get services -n kafka

movie-plays-producer-debezium   LoadBalancer   10.100.117.203   <pending>     80:32460/TCP                 67m

Then, send a curl request to the Movie Plays Producer application:

curl -X 'POST' \
  'http://192.168.59.106:32460/movie' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "Minions: The Rise of Gru",
  "director": "Kyle Balda",
  "genre": "Animation"
}'

Adapt the IP and port to your case.

Finally, inspect the output of the outbox-viewer Pod to see the transmission of the data from the database to Kafka using the Debezium Server approach.

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"bytes","optional":false,"field"
…
,"aggregatetype":"Movie","aggregateid":"1","type":"MovieCreated","timestamp":1661339188708005,"payload":"{\"id\":1,\"name\":\"Minions: The Rise of Gru\",\"director\":\"Kyle Balda\",\"genre\":\"Animation\"}","tracingspancontext":null},"source":{"version":"1.9.4.Final","connector":"mysql","name":"mysql","ts_ms":1661339188000,"snapshot":"false","db":"moviesdb","sequence":null,"table":"OutboxEvent","server_id":1,"gtid":null,"file":"binlog.000002","pos":2967,"row":0,"thread":15,"query":null},"op":"c","ts_ms":1661339188768,"transaction":null}}

Debezium Embedded

So far, we’ve secured the interactions between the application and the MySQL database, between Debezium Server and MySQL, and between Debezium Server and Kafka.

But you might wonder: what happens if, instead of Debezium Server, you use Debezium Embedded deployed within the Quarkus application? How do you configure the Kafka connection to be secured with mTLS?

Quarkus offers two ways to connect to Kafka: the Kafka client or the Reactive Messaging client. Let’s see the properties required in both cases to authenticate to a Kafka cluster using the mTLS authentication method.

KeyStore and TrustStore

To configure mTLS on the client side, four elements are required:

  • Cluster TrustStore to make the mTLS connection
  • TrustStore password
  • Kafka User KeyStore to authenticate
  • KeyStore password

The first two elements are stored in the my-cluster-cluster-ca-cert Kubernetes Secret, which was created earlier when we applied the Strimzi resources. To get them, run the following commands in a terminal window:

kubectl get secret my-cluster-cluster-ca-cert -n kafka -o jsonpath='{.data.ca\.p12}' | base64 -d > mtls-cluster-ca.p12

And the password:

kubectl get secret my-cluster-cluster-ca-cert -n kafka -o jsonpath='{.data.ca\.password}' | base64 -d
k2cckH0K5sOu

The latter two elements are stored in the my-connect Kubernetes Secret. To get them, run the following commands in a terminal window:

kubectl get secret my-connect -n kafka -o jsonpath='{.data.user\.p12}' | base64 -d > mtls-user.p12

And the password:

kubectl get secret my-connect -n kafka -o jsonpath='{.data.user\.password}' | base64 -d
QDx6NMzk1QQE
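Before wiring the extracted files into the application, it can help to check that each one opens with its password. The following is a minimal sketch using only the JDK; the class name Pkcs12Check and its command-line usage are illustrative and not part of the original project.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: not part of the article's source code.
final class Pkcs12Check {

    // Loads a PKCS12 store and returns the aliases of its entries.
    // load() throws if the file cannot be parsed or the password does not match.
    static List<String> aliases(Path store, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        try (InputStream in = Files.newInputStream(store)) {
            keyStore.load(in, password);
        }
        return Collections.list(keyStore.aliases());
    }

    public static void main(String[] args) throws Exception {
        // Example: java Pkcs12Check.java mtls-user.p12 <password>
        if (args.length != 2) {
            System.err.println("usage: Pkcs12Check <file.p12> <password>");
            return;
        }
        System.out.println(aliases(Path.of(args[0]), args[1].toCharArray()));
    }
}
```

Run it once against mtls-cluster-ca.p12 and once against mtls-user.p12, each with its own password. An exception at this stage usually points to a truncated file or a mismatched password.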

Now, set the Quarkus Kafka configuration properties to authenticate to the Kafka cluster using the previous credentials:

%prod.kafka.ssl.truststore.location=mtls-cluster-ca.p12
%prod.kafka.ssl.truststore.password=k2cckH0K5sOu
%prod.kafka.ssl.truststore.type=PKCS12
%prod.kafka.ssl.keystore.location=mtls-user.p12
%prod.kafka.ssl.keystore.password=QDx6NMzk1QQE
%prod.kafka.ssl.keystore.type=PKCS12
%prod.kafka.security.protocol=SSL

%prod.mp.messaging.incoming.movies.ssl.truststore.location=mtls-cluster-ca.p12
%prod.mp.messaging.incoming.movies.ssl.truststore.password=k2cckH0K5sOu
%prod.mp.messaging.incoming.movies.ssl.truststore.type=PKCS12
%prod.mp.messaging.incoming.movies.ssl.keystore.location=mtls-user.p12
%prod.mp.messaging.incoming.movies.ssl.keystore.password=QDx6NMzk1QQE
%prod.mp.messaging.incoming.movies.ssl.keystore.type=PKCS12
%prod.mp.messaging.incoming.movies.security.protocol=SSL

We could inject the credentials directly with the Quarkus Kubernetes Config extension, as we did for the MySQL credentials, but we set them in the properties file here to keep the example simple.
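For comparison, the same settings can be expressed for a plain Kafka client outside of the Quarkus configuration. The sketch below only builds the java.util.Properties object using the standard Kafka client configuration keys; the class name, the environment variable names, and the bootstrap address my-cluster-kafka-bootstrap:9093 are assumptions for illustration, so adapt them to your listener.

```java
import java.util.Properties;

// Hypothetical helper: not part of the article's source code.
final class KafkaMtlsConfig {

    // Builds the client properties for an mTLS connection with PKCS12 stores.
    static Properties mtlsProperties(String bootstrapServers,
                                     String trustStorePath, String trustStorePassword,
                                     String keyStorePath, String keyStorePassword) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SSL");
        // TrustStore: lets the client verify the brokers' certificates.
        props.setProperty("ssl.truststore.location", trustStorePath);
        props.setProperty("ssl.truststore.password", trustStorePassword);
        props.setProperty("ssl.truststore.type", "PKCS12");
        // KeyStore: the user certificate the client presents to the brokers.
        props.setProperty("ssl.keystore.location", keyStorePath);
        props.setProperty("ssl.keystore.password", keyStorePassword);
        props.setProperty("ssl.keystore.type", "PKCS12");
        return props;
    }

    private static String env(String name, String fallback) {
        String value = System.getenv(name);
        return value != null ? value : fallback;
    }

    public static void main(String[] args) {
        // Environment variable names and the bootstrap address are assumptions.
        Properties props = mtlsProperties(
                env("KAFKA_BOOTSTRAP", "my-cluster-kafka-bootstrap:9093"),
                "mtls-cluster-ca.p12", env("TRUSTSTORE_PASSWORD", ""),
                "mtls-user.p12", env("KEYSTORE_PASSWORD", ""));
        // Print the configuration without leaking the passwords.
        props.stringPropertyNames().stream()
                .sorted()
                .filter(key -> !key.endsWith(".password"))
                .forEach(key -> System.out.println(key + "=" + props.getProperty(key)));
    }
}
```

The resulting Properties object could then be passed to a Kafka producer or consumer constructor. Reading the passwords from the environment rather than from the source keeps them out of version control.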

But in terms of security, one important point is still missing: how do we store secrets safely inside a YAML file, and how do we keep secrets secure at rest inside a Kubernetes cluster?

Encryption of Secrets

At the beginning of this article, we created a Kubernetes Secret object with the MySQL credentials, but it was a YAML file with the sensitive information merely encoded in Base64. Base64 is an encoding, not encryption, so anyone can decode it. This YAML file will probably end up in a Git repository, making the secrets available to anyone with access to the repo. In the following section, we’ll fix this.

Sealed Secrets

Sealed Secrets is a Kubernetes controller that lets you encrypt Kubernetes Secret resources on the client side (local machine) and decrypts them inside the Kubernetes cluster when they are applied.

The Sealed Secrets project has two components. The first one is the kubeseal CLI tool, which encrypts secrets.

To install kubeseal, download the package for your operating system from the following link.

The second one is the Sealed Secrets Kubernetes controller. To install it, run the following command:

kubectl apply -f https://github.com/bitnami-labs/sealed-secrets/releases/download/v0.18.1/controller.yaml -n kube-system

role.rbac.authorization.k8s.io/sealed-secrets-service-proxier created
clusterrole.rbac.authorization.k8s.io/secrets-unsealer created
deployment.apps/sealed-secrets-controller created
customresourcedefinition.apiextensions.k8s.io/sealedsecrets.bitnami.com created
service/sealed-secrets-controller created
role.rbac.authorization.k8s.io/sealed-secrets-key-admin created
clusterrolebinding.rbac.authorization.k8s.io/sealed-secrets-controller created
serviceaccount/sealed-secrets-controller created
rolebinding.rbac.authorization.k8s.io/sealed-secrets-service-proxier created
rolebinding.rbac.authorization.k8s.io/sealed-secrets-controller created

Verify that the controller is deployed and running with the following command:

kubectl get pods -n kube-system

sealed-secrets-controller-554d94cb68-xr6mw                                1/1     Running   0          8m46s

After that, we can take the mysql-secret.yaml file and use the kubeseal tool to automatically create a new Kubernetes resource of kind SealedSecret with the data field encrypted.

kubeseal -n kafka -o yaml <mysql-secret.yaml > mysql-secret-encrypted.yaml

The new file, named mysql-secret-encrypted.yaml, is of kind SealedSecret with the value of each key encrypted:

apiVersion: bitnami.com/v1alpha1
kind: SealedSecret
metadata:
 creationTimestamp: null
 name: mysqlsecret
 namespace: kafka
spec:
 encryptedData:
   mysqlpassword: AgBl721mnowwPlC35FfO26zP0
   mysqlrootpassword: AgAKl1tWV8hahn00yGS4ucs
   mysqluser: AgCWrWFl1/LcS
 template:
   data: null
   metadata:
     creationTimestamp: null
     name: mysqlsecret
     namespace: kafka
   type: Opaque

At this point, you can safely remove the mysql-secret.yaml file as it’s not required anymore.

Apply the encrypted resource like any other Kubernetes resource file, and the Sealed Secrets Kubernetes controller will decrypt it and store it inside Kubernetes as a normal Secret.

You can validate the Secret by running the following command:

kubectl get secret mysqlsecret -n kafka -o yaml

apiVersion: v1
data:
  mysqlpassword: YWxleA==
  mysqlrootpassword: YWxleA==
  mysqluser: YWxleA==
kind: Secret
metadata:
  creationTimestamp: "2022-08-21T19:05:21Z"
  name: mysqlsecret
  namespace: kafka
  ownerReferences:
  - apiVersion: bitnami.com/v1alpha1
    controller: true
    kind: SealedSecret
    name: mysqlsecret
    uid: 2a5ee74b-c2b2-49b3-9a9f-877e7a77b163
  resourceVersion: "41514"
  uid: 494cbe8b-7480-4ebd-9cc5-6fe396795eaa
type: Opaque

It’s important to note that this is a decrypted Kubernetes Secret with an owner reference to the SealedSecret responsible for its creation. In this way, the lifecycle of the Secret is tied to that of the SealedSecret.

We’ve fixed the problem of storing the YAML file without revealing sensitive data, but once the Secret is applied to the Kubernetes cluster, it’s stored in Base64-encoded form, so it’s not really secret.
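To see why Base64 offers no protection, here is a minimal sketch that decodes one of the values shown in the Secret above using only the JDK; the class name SecretDecode is illustrative.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: not part of the article's source code.
final class SecretDecode {

    // Base64 is a reversible encoding: no key is needed to recover the value.
    static String decode(String base64Value) {
        byte[] raw = Base64.getDecoder().decode(base64Value);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Value copied from the mysqlpassword field of the Secret.
        System.out.println(decode("YWxleA==")); // prints "alex"
    }
}
```

Anyone who can read the Secret object, or the etcd data behind it, can recover the credentials in the same way.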

Secrets at Rest

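By default, Kubernetes doesn’t store secrets encrypted at rest in the etcd database. Encrypting Secret Data at Rest is a huge topic that would deserve its own post (in fact, there is a book, Kubernetes Secret Management, dedicated to this topic). Every Kubernetes implementation might have a different way to enable encryption of secrets at rest, although in the end it comes down to a configuration file (EncryptionConfiguration) copied to every node that runs kube-apiserver.

This file takes the following form. Providers are listed in order of preference: the first one is used to encrypt new writes, and the remaining ones are only tried when decrypting existing data. With identity listed first, Secrets are still written unencrypted, so in a real configuration place the provider you want to use at the top: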

apiVersion: apiserver.config.k8s.io/v1
kind: EncryptionConfiguration
resources:
 - resources:
     - secrets
   providers:
     - identity: {}
     - aesgcm:
         keys:
           - name: key1
             secret: c2VjcmV0IGlzIHNlY3VyZQ==
           - name: key2
             secret: dGhpcyBpcyBwYXNzd29yZA==
     - aescbc:
         keys:
           - name: key1
             secret: c2VjcmV0IGlzIHNlY3VyZQ==
           - name: key2
             secret: dGhpcyBpcyBwYXNzd29yZA==
     - secretbox:
         keys:
           - name: key1
             secret: YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY=
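Each secret value in the configuration above is a Base64-encoded key. As a minimal sketch, assuming a 32-byte key (the length the aescbc and secretbox providers expect), a fresh random key could be generated with the JDK like this; the class name EncryptionKeyGenerator is illustrative.

```java
import java.security.SecureRandom;
import java.util.Base64;

// Hypothetical helper: not part of the article's source code.
final class EncryptionKeyGenerator {

    // Returns a Base64-encoded random key of the requested length in bytes.
    static String newKey(int lengthInBytes) {
        byte[] key = new byte[lengthInBytes];
        new SecureRandom().nextBytes(key); // cryptographically strong random bytes
        return Base64.getEncoder().encodeToString(key);
    }

    public static void main(String[] args) {
        // Paste the printed value into the secret field of a key entry.
        System.out.println(newKey(32));
    }
}
```

Treat the generated key like any other credential: whoever holds it can decrypt every Secret stored with that provider.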

In the following image, we can see the flow of a secret when an EncryptionConfiguration file is registered in the kube-apiserver.

Now, secrets are encrypted in the YAML file thanks to the SealedSecret object and also protected at rest using the EncryptionConfiguration file.

Conclusions

Securing the whole infrastructure is important, and in this article we’ve learned how to secure access to the database and to Kafka using Kubernetes Secrets.

With Strimzi, we can define not only the authentication part but also the authorization part, with rules on who can do what with Kafka topics.

Accessing these secrets is also an important part, and Quarkus and Debezium let you access them in an efficient yet secure way, without persisting the secret in the filesystem (or as an environment variable) but injecting it directly into memory.

Security is an important topic, and Strimzi is a strong match when it’s time to manage it in a Kafka cluster.

The source code is available on GitHub.
