Moving Kafka and Debezium to Kubernetes Using Strimzi - the GitOps Way

Key Takeaways

  • Strimzi simplifies the deployment of a Kafka cluster to a Kubernetes cluster.
  • Strimzi is a must to manage Kafka using GitOps methodology.
  • Debezium Embedded can be easily replaced with Debezium Server, leaving the application free of Debezium/Kafka dependencies.
  • It is recommended to use Debezium Server for brownfield projects as it requires no modification of the original code.
     

In part 1 and part 2 of this series, we learned about the integration between Apache Kafka, Apache Kafka Streams, and Quarkus, where we developed a simple application producing events to a Kafka topic and consuming and processing them in real-time with Kafka Streams.

In that example, we simulated a Movies streaming company. We stored Movies in one Kafka topic and, in another Kafka topic, held each occurrence when a user stopped watching a movie and captured the time it had been played. We post-processed these events in real-time to count the number of times a movie is played for more than 10 minutes.

The following figure shows the architecture of the application:

Then in part 3, we covered the Outbox pattern and Debezium to avoid the dual-writes problem that occurs when different systems need to synchronize the same data.

In all three previous articles, we learned these technologies from the developer's point of view, and we ended up deploying the application locally (in dev mode) on the developer’s machine.

In this article, we’ll explore how to put everything in production; more concretely, in Kubernetes, we’ll learn how to:

  • Install and manage an Apache Kafka cluster in a Kubernetes cluster.
  • Containerize a Quarkus application.
  • Configure a Quarkus application with production parameters.
  • Migrate from using Debezium Embedded to Debezium Server.

Kubernetes

Kubernetes is an open-source container orchestrator that has become the de facto platform for deploying microservices, whether on bare metal or in the cloud.

This article uses minikube as a Kubernetes cluster, but the same steps should be valid in any other implementation.

Starting the Cluster

Execute the following command in a terminal window to start the cluster in a VirtualBox machine with 8GB of memory and two vCPUs:

minikube start -p strimzi --kubernetes-version='v1.22.12' --vm-driver='virtualbox' --memory=8096

  [strimzi] minikube v1.24.0 on Darwin 12.5
  minikube 1.26.1 is available! Download it: https://github.com/kubernetes/minikube/releases/tag/v1.26.1
  To disable this notice, run: 'minikube config set WantUpdateNotification false'

✨  Using the virtualbox driver based on user configuration
  Starting control plane node strimzi in cluster strimzi
  Creating virtualbox VM (CPUs=2, Memory=8096MB, Disk=20000MB) ...
    > kubelet.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s
    > kubeadm.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s
    > kubectl.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s
    > kubeadm: 43.74 MiB / 43.74 MiB [-------------] 100.00% 13.98 MiB p/s 3.3s
    > kubectl: 44.77 MiB / 44.77 MiB [-------------] 100.00% 11.11 MiB p/s 4.2s
    > kubelet: 115.30 MiB / 115.30 MiB [-----------] 100.00% 20.16 MiB p/s 5.9s

    ▪ Generating certificates and keys ...
    ▪ Booting up control plane ...
    ▪ Configuring RBAC rules ...
    ▪ Using image gcr.io/k8s-minikube/storage-provisioner:v5
  Verifying Kubernetes components...
  Enabled addons: storage-provisioner, default-storageclass

❗  /usr/local/bin/kubectl is version 1.24.0, which may have incompatibilites with Kubernetes 1.22.12.
    ▪ Want kubectl v1.22.12? Try 'minikube kubectl -- get pods -A'
  Done! kubectl is now configured to use "strimzi" cluster and "default" namespace by default

To check if the Kubernetes cluster is up and running, run the following two commands in a terminal window:

kubectl get nodes

NAME      STATUS   ROLES                  AGE    VERSION
strimzi   Ready    control-plane,master   3m4s   v1.22.12

kubectl get pods
No resources found in default namespace.

Apache Kafka

In all past articles, we used the Quarkus dev mode to boot up all the external dependencies required to run the application (a Kafka cluster and a MySQL database). Although dev mode is fantastic from the development point of view, when it’s time to deploy to production, you might find things more complicated to manage. The first barrier might be installing and configuring a Kafka cluster in Kubernetes, because of the number of components a Kafka cluster involves.

Some of the questions you might ask are:

  • Which container image do I use for all Kafka components (Kafka, Zookeeper, etc.)?
  • How do I deploy all these components easily in Kubernetes?
  • How do I create users, topics, or HA in a Kubernetes way?
  • And what about security?

You might try to do all these things by hand, such as crafting long YAML files and using Kafka CLI tooling to configure the Kafka parts. However, there is another way that is Kubernetes-native, fully automated, and reproducible (perfect for CI/CD): using Strimzi.

Strimzi

Strimzi is a Kubernetes operator that installs a controller to create, configure and secure a Kafka cluster like any other Kubernetes resource such as a Pod, Deployment, ConfigMap, etc.

The Strimzi project comprises three operators: one for the Kafka cluster, another for the topics, and the last for user management.

With the Strimzi operators installed in the Kubernetes cluster, you only need to apply the following YAML file to have a Kafka cluster up and running with one replica of Kafka and three replicas of ZooKeeper, both with ephemeral storage (no persistent volume attached).

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
 name: my-cluster
spec:
 kafka:
   version: 3.2.0
   replicas: 1
   listeners:
     - name: plain
       port: 9092
       type: internal
       tls: false
     - name: tls
       port: 9093
       type: internal
       tls: true
   config:
     offsets.topic.replication.factor: 1
     transaction.state.log.replication.factor: 1
     transaction.state.log.min.isr: 1
     default.replication.factor: 1
     min.insync.replicas: 1
     inter.broker.protocol.version: "3.2"
   storage:
     type: ephemeral
 zookeeper:
   replicas: 3
   storage:
     type: ephemeral
 entityOperator:
   topicOperator: {}
   userOperator: {}

Let’s install Strimzi in the cluster we already started.

Installing Strimzi

The first thing is to create a namespace to install Strimzi operators; in this case, the kafka namespace is used. In a terminal window, run the following command:

kubectl create namespace kafka
namespace/kafka created

Next, apply the Strimzi install files, which include, among other things, the CRDs (CustomResourceDefinitions) used for declarative management of the Kafka cluster, Kafka topics, and users.

kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

Validate the operator installation by running the following command:

kubectl get pods -n kafka

NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-597d67c7d6-ms987   1/1     Running   0          4m27s

We are now ready to create the Kafka cluster with the movies topic. This is the topic where we store all movies to be processed later on by Kafka Streams, as we saw in part 2 of this series of articles about Kafka.

Creating the Kafka Cluster

Create a new file (e.g., kafka.yaml) to install a Kafka cluster with one replica, no TLS, and internal as the type of Kubernetes service to access the Kafka cluster.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
 name: my-cluster
spec:
 kafka:
   version: 3.2.0
   replicas: 1
   listeners:
     - name: plain
       port: 9092
       type: internal
       tls: false
   config:
     offsets.topic.replication.factor: 1
     transaction.state.log.replication.factor: 1
     transaction.state.log.min.isr: 1
     default.replication.factor: 1
     min.insync.replicas: 1
     inter.broker.protocol.version: "3.2"
   storage:
     type: ephemeral
 zookeeper:
   replicas: 1
   storage:
     type: ephemeral
 entityOperator:
   topicOperator: {}
   userOperator: {}

Then use the kubectl command in a terminal window to create this resource:

kubectl create -f kafka.yaml -n kafka
kafka.kafka.strimzi.io/my-cluster created

At this point, Strimzi starts installing a Kafka cluster in the kafka namespace.

Now, list the pods in the kafka namespace to check the cluster's creation.

kubectl get pods -n kafka

NAME                                          READY   STATUS    
my-cluster-entity-operator-755596449b-cw82g   3/3     Running   
my-cluster-kafka-0                            1/1     Running 
my-cluster-zookeeper-0                        1/1     Running
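
Rather than polling the pod list by hand, you can block until Strimzi reports the cluster as ready. The following is a minimal sketch, assuming the standard kubectl wait subcommand and the Ready condition that Strimzi sets on its custom resources; the function name wait_for_ready and the 300-second default timeout are illustrative choices, not part of Strimzi.

```shell
# Block until a Strimzi-managed resource reports the Ready condition.
# Usage: wait_for_ready <kind> <name> [namespace] [timeout]
wait_for_ready() (
  kind="$1"
  name="$2"
  namespace="${3:-kafka}"   # namespace used throughout this article
  timeout="${4:-300s}"      # illustrative default, tune it for your cluster
  kubectl wait "${kind}/${name}" --for=condition=Ready \
    --timeout="${timeout}" -n "${namespace}"
)

# Example (requires a running cluster):
#   wait_for_ready kafka my-cluster
```

The same helper should work for the other Strimzi resource kinds used in this article, which makes it handy in a CI/CD pipeline where later steps must not start before the cluster is usable.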

The Kafka cluster is up and running. Apart from installing it as any other Kubernetes resource, we can also query the resource or describe it. For example, run the following commands in a terminal window:

kubectl get kafka -n kafka

NAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
my-cluster   1                        1                     True    True


kubectl describe kafka my-cluster -n kafka

Name:         my-cluster
Namespace:    default
Labels:       <none>
Annotations:  <none>
API Version:  kafka.strimzi.io/v1beta2
Kind:         Kafka
Metadata:
  Creation Timestamp:  2022-08-09T10:57:39Z
…

And of course, you can delete it as any other Kubernetes resource. Moreover, four Kubernetes services are created to access the Kafka cluster:

kubectl get services -n kafka

NAME                          TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
my-cluster-kafka-bootstrap    ClusterIP   172.30.77.150   <none>        9091/TCP,9092/TCP            21m
my-cluster-kafka-brokers      ClusterIP   None            <none>        9090/TCP,9091/TCP,9092/TCP   21m
my-cluster-zookeeper-client   ClusterIP   172.30.5.186    <none>        2181/TCP                     21m
my-cluster-zookeeper-nodes    ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP   21m

The service that the application uses to access the cluster is my-cluster-kafka-bootstrap, which exposes the Kafka port 9092.

Before jumping to the application, let’s create and configure the movies topic using another YAML file.

Creating the Movies Topic

Strimzi has an operator for the creation and management of topics. To create a new topic, we create a Kubernetes resource file of the kind KafkaTopic, setting the topic's name and the name of the cluster (my-cluster in our case) in the strimzi.io/cluster label. Let’s create a new file named movies-topic.yaml with the following content:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
 name: movies
 labels:
   strimzi.io/cluster: my-cluster
spec:
 partitions: 1
 replicas: 1
 config:
   retention.ms: 7200000
   segment.bytes: 1073741824

And apply the resource:

kubectl apply -f movies-topic.yaml -n kafka
kafkatopic.kafka.strimzi.io/movies created
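
Because a topic is just another Kubernetes resource, additional topics can be generated from the same template instead of copying the file by hand. Here is a small sketch of that idea; the function name kafka_topic_manifest and the topic name playtimemovies in the usage comment are hypothetical, and the manifest only sets the fields shown in the listing above (without the config section).

```shell
# Print a KafkaTopic manifest for the given topic name.
# Usage: kafka_topic_manifest <topic> [partitions] [replicas] [cluster]
kafka_topic_manifest() (
  topic="$1"
  partitions="${2:-1}"
  replicas="${3:-1}"
  cluster="${4:-my-cluster}"   # must match the name of the Kafka resource
  cat <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: ${topic}
  labels:
    strimzi.io/cluster: ${cluster}
spec:
  partitions: ${partitions}
  replicas: ${replicas}
EOF
)

# Example (requires a running cluster; the topic name is hypothetical):
#   kafka_topic_manifest playtimemovies | kubectl apply -n kafka -f -
```

In a GitOps workflow you would more likely commit the rendered files to the repository than pipe them straight to kubectl, but the template is the same either way.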

And as with any other Kubernetes resource, we can query and describe it as well.

kubectl get kafkatopic -n kafka

NAME                                                                                               CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a                                        my-cluster   50           1                    True
movies                                                                                             my-cluster   1            1                    True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55                                     my-cluster   1            1                    True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b   my-cluster   1            1                    True

And describe the created topic:

kubectl describe kafkatopic movies -n kafka

Let’s check if the topic has been created by port forwarding the service to the local computer.

Run the following command in a terminal window:

kubectl port-forward -n kafka service/my-cluster-kafka-bootstrap 9092:9092

Forwarding from 127.0.0.1:9092 -> 9092
Forwarding from [::1]:9092 -> 9092

Open a new terminal window and use the kcat tool to list the elements of the Kafka cluster. We can use localhost as the hostname because of the port forwarding set up in the previous step.

kcat -b localhost:9092 -L

Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 1 brokers:
  broker 0 at my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092 (controller)
 4 topics:
  topic "movies" with 1 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0
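
If you want to script this check, the metadata listing can be reduced to topic names with a small filter. This is a sketch that assumes the kcat -L output keeps the format shown above (a line starting with the word topic followed by the quoted name); the function name topic_names is illustrative.

```shell
# Read `kcat -L` output on stdin and print one topic name per line.
topic_names() {
  sed -n 's/^ *topic "\([^"]*\)".*/\1/p'
}

# Example (requires the port forward from the previous step):
#   kcat -b localhost:9092 -L | topic_names | grep -x movies
```

The grep -x call in the usage comment exits with a non-zero status when the movies topic is missing, so the pipeline can serve as a simple smoke test.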

Finally, stop the port forwarding process. Next, let’s containerize the project we developed in part 3 of this series and configure it to connect to the Kafka cluster.

Movie Plays Producer Debezium

In the service we developed in part 3, we dealt with the dual-writes problem, fixing it using Debezium (concretely Debezium Embedded) by listening to the transaction logs from the MySQL server and producing a Kafka event with data content every time a new movie was inserted. But we ran the example on the local machine, using dev services to boot up the required services (MySQL and Kafka) and automatically configure the application to connect to them.

Things are a bit different now: the service must run in a Kubernetes cluster, alongside the Kafka cluster created in the previous steps and a MySQL database. Three changes are required to make it run in Kubernetes:

  • Configure the service with the new Kafka and MySQL parameters (hostname, port, username, password).
  • Containerize the application and push it to a container registry.
  • Create Kubernetes resource files to deploy the service.

Configure The Service

The first thing to configure is the Kafka broker hostname and port that point to the Kubernetes service created by Strimzi. Open the src/main/resources/application.properties file and append the following line:

%prod.kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092

INFO: The %prod prefix indicates that the property is only used when the application runs in prod mode (not in dev or test).

The second thing is to configure the database connection where movies are inserted. In the application.properties file, append the following lines:

quarkus.hibernate-orm.database.generation=drop-and-create
%prod.quarkus.datasource.username=alex
%prod.quarkus.datasource.password=alex
%prod.quarkus.datasource.jdbc.url=jdbc:mysql://mysql:3306/moviesdb

Later on, we’ll deploy a MySQL instance with these parameters; for now, just assume configuration parameters are correct.

Containerize

To create a container, Quarkus offers an integration with the Jib project, making the build and push of a container image as simple as running a Maven/Gradle task.

Open the pom.xml file and add in the dependencies section the following dependency:

<dependency>
     <groupId>io.quarkus</groupId>
     <artifactId>quarkus-container-image-jib</artifactId>
</dependency>

With the Jib extension registered, it will automatically containerize the application when packaged. Because some default values of Jib may not work in every case, you can override them in the src/main/resources/application.properties file. For this example, let’s override the group and the container registry host part of the generated container image name.

Open the application.properties file and add the following lines:

# Substitute the value with your account name
quarkus.container-image.group=lordofthejars

# Defaults to docker.io, overridden to Quay.
quarkus.container-image.registry=quay.io

IMPORTANT: You need to set the credentials to the container registry to push the container; you can run the docker login command before running the build. Maven will read credentials from there, or you can use the quarkus.container-image.username and quarkus.container-image.password properties.

Run the following command in the root directory of the project to build the application, construct a container and push it to the defined container registry:

./mvnw clean package -DskipTests -Dquarkus.container-image.push=true

[INFO] Scanning for projects...
[INFO]
[INFO] ---------------< org.acme:movie-plays-producer-debezium >---------------
[INFO] Building movie-plays-producer-debezium 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ movie-plays-producer-debezium ---
[INFO] Deleting /Users/asotobu/git/quarkus-integrating-kafka/strimzi/movie-plays-producer-debezium/target
…
[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Using base image with digest: sha256:1a2fddacdcda67494168749c7ab49243d06d8fbed34abab90566d81b94f5e1a5
[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Container entrypoint set to [java, -Djava.util.logging.manager=org.jboss.logmanager.LogManager, -jar, quarkus-run.jar]
[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Pushed container image quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT (sha256:73dfe42d53f8d7e3c268dbebc2e5f866596de33b8fcaf82c27bdd414d28bdb8a)

As you can see in the last log line, the container is created and pushed to the registry with the account name set in the application.properties file, but relying on defaults for the container name (artifactId value) and tag (version value).
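
To make the naming rule explicit, the image reference in the log above is the concatenation of four parts: registry, group, name, and tag. The helper below is only an illustration of that composition (the function name image_ref is made up); the name and tag can also be overridden with the quarkus.container-image.name and quarkus.container-image.tag properties.

```shell
# Compose a container image reference the way the log line above shows it:
# <registry>/<group>/<name>:<tag>
image_ref() (
  registry="$1"   # quarkus.container-image.registry
  group="$2"      # quarkus.container-image.group
  name="$3"       # defaults to the Maven artifactId
  tag="$4"        # defaults to the Maven version
  printf '%s/%s/%s:%s\n' "${registry}" "${group}" "${name}" "${tag}"
)

image_ref quay.io lordofthejars movie-plays-producer-debezium 1.0.0-SNAPSHOT
```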

Kubernetes

After pushing the container, we’re ready to deploy the service to Kubernetes. We could manually create the Kubernetes resource files, but it’s unnecessary as Quarkus has a Kubernetes extension.

Open the pom.xml file and add the following dependency in the dependencies section:

<dependency>
     <groupId>io.quarkus</groupId>
     <artifactId>quarkus-kubernetes</artifactId>
</dependency>

With the Kubernetes extension registered, every time Maven packages the application it also creates a kubernetes.yml file to deploy the application to a Kubernetes cluster. You can customize the generated file using the application.properties configuration file. Let’s set the type of the Kubernetes Service to LoadBalancer instead of ClusterIP, and the namespace to kafka.

Open application.properties and append the following lines:

quarkus.kubernetes.service-type=load-balancer
quarkus.kubernetes.namespace=kafka

After this change, run the Maven package goal to generate the deployment file.

./mvnw clean package -DskipTests

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

Inspect the generated file located at target/kubernetes/kubernetes.yml:

cat target/kubernetes/kubernetes.yml

And the output should be similar to the following listing:

---
apiVersion: v1
kind: Service
metadata:
 …
 name: movie-plays-producer-debezium
spec:
 ports:
   - name: http
     port: 80
     targetPort: 8080
 selector:
   app.kubernetes.io/name: movie-plays-producer-debezium
   app.kubernetes.io/version: 1.0.0-SNAPSHOT
 # Type is LoadBalancer as set in the application.properties file 
 type: LoadBalancer
---
apiVersion: apps/v1
kind: Deployment
metadata:
 …
 name: movie-plays-producer-debezium
spec:
 replicas: 1
 selector:
   matchLabels:
     app.kubernetes.io/name: movie-plays-producer-debezium
     app.kubernetes.io/version: 1.0.0-SNAPSHOT
 template:
   metadata:
     …
   spec:
     containers:
       - env:
           - name: KUBERNETES_NAMESPACE
             valueFrom:
               fieldRef:
                 fieldPath: metadata.namespace
         # The image is correctly set automatically
         image: quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT
         imagePullPolicy: Always
         name: movie-plays-producer-debezium
         ports:
           - containerPort: 8080
             name: http
             protocol: TCP

TIP: In this example, configuration parameters are hardcoded in the application.properties file, but you could override them with environment variables. To set an environment variable in the Kubernetes Deployment object, you could add the following line, which overrides the Kafka broker location:

quarkus.kubernetes.env.vars.kafka-bootstrap-servers=my-new-cluster:9092 

The resulting file will contain the env section with this new environment variable:

containers:
       - env:
           - name: KAFKA_BOOTSTRAP_SERVERS
             value: my-new-cluster:9092
         image: quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT
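
Judging from the generated output above, the key that follows quarkus.kubernetes.env.vars. is turned into the variable name by uppercasing it and replacing dashes with underscores. The following sketch reproduces that mapping so you can predict the variable name for other keys; the function name env_var_name is illustrative, and the rule is inferred from this example rather than taken from the extension's source.

```shell
# Convert a quarkus.kubernetes.env.vars.<key> suffix into the
# environment variable name seen in the generated Deployment.
env_var_name() {
  printf '%s\n' "$1" | tr '[:lower:]' '[:upper:]' | sed 's/-/_/g'
}

env_var_name kafka-bootstrap-servers
```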

Putting The Pieces Together

We’ve already deployed a Kafka cluster using Strimzi into the Kubernetes cluster. Apply the following file (mysql-deployment.yaml) to deploy the MySQL instance with the configuration parameters set in the application.properties file.

apiVersion: v1
kind: Service
metadata:
 name: mysql
 labels:
   app: mysql
spec:
 ports:
   - port: 3306
 selector:
   app: mysql
 clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
 name: mysql
 labels:
   app: mysql
spec:
 selector:
   matchLabels:
     app: mysql
 strategy:
   type: Recreate
 template:
   metadata:
     labels:
       app: mysql
   spec:
     containers:
     - image: mysql:8.0.30
       name: mysql
       env:
       - name: MYSQL_ROOT_PASSWORD
         value: alex
       - name: MYSQL_DATABASE
         value: moviesdb
       - name: MYSQL_USER
         value: alex
       - name: MYSQL_PASSWORD
         value: alex
       ports:
       - containerPort: 3306
         name: mysql

Deploy the MySQL instance into the Kubernetes cluster:

kubectl apply -f mysql-deployment.yaml -n kafka

The last element to deploy is the application itself. You have two options; the first one is applying the resource directly:

kubectl apply -f target/kubernetes/kubernetes.yml -n kafka

The second one is packaging the application with the quarkus.kubernetes.deploy flag set to true. When this flag is set, Maven will:

  1. Create the application JAR files.
  2. Build the container image.
  3. Push the container image to the registry.
  4. Apply the kubernetes.yml resource file automatically to the Kubernetes cluster you are logged in to.

./mvnw clean package -DskipTests -Dquarkus.kubernetes.deploy=true

…
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Deploying to kubernetes server: https://192.168.59.104:8443/ in namespace: kafka.
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Applied: Service movie-plays-producer-debezium-server.
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Applied: Deployment movie-plays-producer-debezium-server.
[INFO] [io.quarkus.deployment.QuarkusAugmentor] Quarkus augmentation completed in 9537ms

To test that everything works correctly together, let’s send a request to insert a new movie and validate the insertion of a new event in the Kafka topic.

In a terminal window, run the following commands to get the IP and port to access the service:

Get the IP to access the service:

minikube ip -p strimzi

192.168.59.104

Get the exposed port of the movie-plays-producer-debezium service, which is the second port in the PORT(S) column (30306 in the following snippet).

kubectl get services -n kafka

movie-plays-producer-debezium   LoadBalancer   10.100.117.203   <pending>     80:30306/TCP                 67m
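
The two lookups can be combined into a single helper that prints the base URL of the service. This is a sketch under the assumptions of this article (the minikube profile strimzi, the kafka namespace, and the service's first port being the HTTP one); the function name service_url is illustrative.

```shell
# Print the base URL of a service exposed through a node port on minikube.
# Usage: service_url <service> [namespace] [minikube-profile]
service_url() (
  service="$1"
  namespace="${2:-kafka}"
  profile="${3:-strimzi}"
  ip="$(minikube ip -p "${profile}")"
  # Read the node port of the first port declared by the service.
  port="$(kubectl get service "${service}" -n "${namespace}" \
    -o jsonpath='{.spec.ports[0].nodePort}')"
  printf 'http://%s:%s\n' "${ip}" "${port}"
)

# Example (requires a running cluster):
#   curl -X POST "$(service_url movie-plays-producer-debezium)/movie" ...
```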

Run the curl command to insert a new Movie.

curl -X 'POST' \
  'http://192.168.59.104:30306/movie' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "Minions: The Rise of Gru",
  "director": "Kyle Balda",
  "genre": "Animation"
}'

Inspect the Quarkus logs to see the SQL statements run against the database. First, list the pods to find the name of the application pod:

kubectl get pods -n kafka

NAME                                             READY   STATUS      RESTARTS   AGE
movie-plays-producer-debezium-56f644cb87-5cchk   1/1     Running     0          6m5s
my-cluster-entity-operator-755596449b-cw82g      3/3     Running     0          35h
my-cluster-kafka-0                               1/1     Running     0          35h
my-cluster-zookeeper-0                           1/1     Running     0          35h

Print the logs of the movie-plays-producer-debezium pod.

kubectl logs movie-plays-producer-debezium-56f644cb87-5cchk -n kafka

2022-08-11 07:44:25,658 INFO  [org.acm.MovieResource] (executor-thread-1) New Movie inserted Minions: The Rise of Gru
:)
Hibernate:
    select
        next_val as id_val
    from
        hibernate_sequence for update

Hibernate:
    update
        hibernate_sequence
    set
        next_val= ?
    where
        next_val=?
Hibernate:
    insert
    into
        Movie
        (director, genre, name, id)
    values
        (?, ?, ?, ?)
Hibernate:
    insert
    into
        OutboxEvent
        (aggregatetype, aggregateid, type, timestamp, payload, tracingspancontext, id)
    values
        (?, ?, ?, ?, ?, ?, ?)

# Debezium reacts to the change
2022-08-11 07:44:25,867 INFO  [io.deb.con.com.BaseSourceTask] (executor-thread-0) 1 records sent during previous 00:20:44.297, last recorded offset: {transaction_id=null, ts_sec=1660203865, file=binlog.000002, pos=14795, row=1, server_id=1, event=4}
Movie Created and Reacting

You can also inspect the content of the Kafka topic by using the kafka-console-consumer.sh script present inside the Kafka container. Get into the container and run the following command:

kubectl exec -ti my-cluster-kafka-0 -n kafka -- /bin/bash

./bin/kafka-console-consumer.sh --topic movies --from-beginning --bootstrap-server localhost:9092
{"id":1,"name":"Minions: The Rise of Gru","director":"Kyle Balda","genre":"Animation"}

To get back to the local terminal window, stop the kafka-console-consumer process by pressing Ctrl+C, and then execute the exit command.
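
If you prefer not to open an interactive shell, the same check can be run in one shot from the local terminal. The sketch below assumes that the container's working directory is the Kafka home (as the relative ./bin path used above suggests) and relies on the consumer's --timeout-ms option to exit on its own; the function name dump_topic and the 10-second timeout are illustrative.

```shell
# Print the messages of a topic from the beginning, then exit.
# Usage: dump_topic <topic> [broker-pod] [namespace]
dump_topic() (
  topic="$1"
  pod="${2:-my-cluster-kafka-0}"
  namespace="${3:-kafka}"
  # --timeout-ms makes the consumer exit once no message arrives for 10 seconds.
  kubectl exec "${pod}" -n "${namespace}" -- \
    bin/kafka-console-consumer.sh --topic "${topic}" --from-beginning \
    --bootstrap-server localhost:9092 --timeout-ms 10000
)

# Example (requires a running cluster):
#   dump_topic movies
```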

So far, so good; we’ve got the same application as in part 3 of this series, but now it’s running locally in a Kubernetes cluster.

Until now, we have used Debezium in Embedded mode, but Debezium can also run in Server mode.

Debezium Server

Debezium Server is a configurable, ready-to-use application that streams change events from a source database to messaging systems like Kafka. In this article, it runs as an Apache Kafka Connect component, integrated as a source connector.

You cannot use Debezium Server in every scenario, but in my opinion, there are two big advantages to this approach:

  • You get all the advantages of a Kafka connector (fault tolerance, scalability, reusability, etc.).
  • Since it is an external component, the application code requires no changes and needs neither Debezium Embedded code nor its dependencies. So any application can start using Debezium with no changes or redeployments.

So let’s see how to move from Debezium Embedded to Debezium Server.

Removing Debezium Embedded 

The first thing to do is remove any dependency on any Debezium Embedded artifact.

Open the pom.xml file and remove the following dependencies:

<dependency>
     <groupId>io.debezium</groupId>
     <artifactId>debezium-ddl-parser</artifactId>
</dependency>
<dependency>
     <groupId>io.debezium</groupId>
     <artifactId>debezium-embedded</artifactId>
</dependency>
<dependency>
     <groupId>io.debezium</groupId>
     <artifactId>debezium-connector-mysql</artifactId>
</dependency>

The next step is removing all the code related to Debezium Embedded configuration and listeners. Delete the following class files: DebeziumConfiguration.java, DebeziumListener.java, and MySqlJdbcParser.java

Since all the interaction with Kafka now goes through the Kafka Connect component, the application needs no Kafka code, so the last step is to remove the following dependency from pom.xml:

<dependency>
     <groupId>io.quarkus</groupId>
     <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

And the following line from the application.properties file is not required anymore:

%prod.kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092

At this point, no Kafka or Debezium Embedded dependencies remain in the project. Create a new container image containing these latest changes.

Delete the previous deployment by running the following commands in a terminal window:

kubectl delete deployment movie-plays-producer-debezium -n kafka
kubectl delete service movie-plays-producer-debezium -n kafka

To avoid overwriting the container image that contains Debezium Embedded, change the artifactId to movie-plays-producer-debezium-server.

Then deploy the new version with no Debezium code to the Kubernetes cluster as shown in the following command:

./mvnw clean package -DskipTests -Dquarkus.kubernetes.deploy=true

Validate the deployment of the new service by running the following command:

kubectl get pods -n kafka

NAME                                                    READY   STATUS    RESTARTS   AGE
movie-plays-producer-debezium-server-59db564b74-vhdmf   1/1     Running   0          73m

Deploying Debezium Kafka Connect

First, deploy a Kafka Connect component with the required MySQL connector plug-in; you can think of it as the logic we implemented in the DebeziumListener class, but packaged as a Kafka Connect element and reusable across projects. A container image for Kafka Connect with the plug-in has to be created, as Debezium doesn’t provide an “official” container image for each possible combination of database and Kafka version. For this example, we create a container image using the MySQL connector with Kafka 3.2.0 (as it’s the one used in production).

The container image of the MySQL connector for this article is found at quay.io/lordofthejars/debezium-connector-mysql:1.9.4, and if you are curious about how to build it, the Dockerfile is located at this GitHub repository. 

To deploy the Debezium Kafka Connect, we’ll use the KafkaConnect kind provided by Strimzi because it simplifies the whole process. In this Kubernetes resource file, we set the version of Kafka used, the location of the Kafka cluster (my-cluster-kafka-bootstrap:9092), the container image (quay.io/lordofthejars/debezium-connector-mysql:1.9.4), and some specific configuration parameters.

Create a file named debezium-kafka-connect.yaml with the following content:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
 name: debezium-connect-cluster
 annotations:
   strimzi.io/use-connector-resources: "true"
spec:
 version: 3.2.0
 image: quay.io/lordofthejars/debezium-connector-mysql:1.9.4
 replicas: 1
 bootstrapServers: my-cluster-kafka-bootstrap:9092
 config:
   group.id: connect-cluster
   key.converter: org.apache.kafka.connect.json.JsonConverter
   value.converter: org.apache.kafka.connect.json.JsonConverter
   key.converter.schemas.enable: false
   value.converter.schemas.enable: false
   offset.storage.topic: connect-offsets
   offset.storage.replication.factor: 1
   config.storage.topic: connect-configs
   config.storage.replication.factor: 1
   status.storage.topic: connect-status
   status.storage.replication.factor: 1

Then in the terminal window, apply this resource:

kubectl apply -f debezium-kafka-connect.yaml -n kafka

And validate that it’s correctly deployed by running the following command:

kubectl get pods -n kafka

debezium-connect-cluster-connect-546c8695c-lszn7        1/1     Running   0          91m

Keep in mind that this process might take a few minutes to get ready.

The Kafka Connect component is now connected to the Kafka cluster; the last step is configuring it to listen for changes in the MySQL instance.

To do so, we’ll use the KafkaConnector kind provided by Strimzi. It’s similar to what we did in the DebeziumConfiguration class, providing configuration parameters like database.hostname or table.include.list. In addition, set the strimzi.io/cluster label to the name of the KafkaConnect resource defined in the previous YAML file (debezium-connect-cluster).

Create a new file named debezium-kafka-connector.yaml with the following content:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
 name: debezium-connector-mysql
 labels:
   strimzi.io/cluster: debezium-connect-cluster
spec:
 class: io.debezium.connector.mysql.MySqlConnector
 tasksMax: 1
 config:
   tasks.max: 1
   database.hostname: mysql
   database.port: 3306
   database.user: root
   database.password: alex
   database.server.id: 184054
   database.server.name: mysql
   database.include.list: moviesdb
   database.allowPublicKeyRetrieval: true
   table.include.list: moviesdb.OutboxEvent
   database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
   database.history.kafka.topic: schema-changes.movies

Apply the resource to configure the Debezium Connector:

kubectl apply -f debezium-kafka-connector.yaml -n kafka
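
The two resources are linked only by metadata: the KafkaConnect resource carries the strimzi.io/use-connector-resources annotation, and the KafkaConnector resource carries a strimzi.io/cluster label that must equal the KafkaConnect name. A typo in either one leaves the connector unmanaged. The following Python snippet is a minimal sketch of that check; the dictionaries mirror only the relevant fields of the two YAML files, and the link_problems helper is a hypothetical name, not part of Strimzi.

```python
# The two resources, reduced to the fields that link them together.
kafka_connect = {
    "kind": "KafkaConnect",
    "metadata": {
        "name": "debezium-connect-cluster",
        "annotations": {"strimzi.io/use-connector-resources": "true"},
    },
}

kafka_connector = {
    "kind": "KafkaConnector",
    "metadata": {
        "name": "debezium-connector-mysql",
        "labels": {"strimzi.io/cluster": "debezium-connect-cluster"},
    },
}


def link_problems(connect: dict, connector: dict) -> list:
    """Return human-readable problems; an empty list means the link looks right."""
    problems = []
    connect_name = connect["metadata"]["name"]

    # The annotation tells the operator to manage connectors through resources.
    annotations = connect["metadata"].get("annotations", {})
    if annotations.get("strimzi.io/use-connector-resources") != "true":
        problems.append("KafkaConnect lacks the use-connector-resources annotation")

    # The label must name the KafkaConnect resource exactly.
    label = connector["metadata"].get("labels", {}).get("strimzi.io/cluster")
    if label != connect_name:
        problems.append(
            "strimzi.io/cluster is {!r}, expected {!r}".format(label, connect_name)
        )
    return problems


print(link_problems(kafka_connect, kafka_connector))
```

Both resources also need to live in the same namespace, which is why the two kubectl apply commands use -n kafka.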

To verify that everything works well, add a new movie and validate that inserting it into the database produces a new event in a Kafka topic.

Get the port of the new service; the IP address is the same as before:

kubectl get services -n kafka

movie-plays-producer-debezium-server   LoadBalancer   10.100.117.203   <pending>     80:30307/TCP                 67m


curl -X 'POST' \
  'http://192.168.59.104:30307/movie' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "Minions: The Rise of Gru",
  "director": "Kyle Balda",
  "genre": "Animation"
}'

And use the kafka-console-consumer.sh script to validate the insertion:

kubectl exec -ti my-cluster-kafka-0 -n kafka -- /bin/bash

And then, inside the container, run the script. Note that the Debezium Connector sends the event to a Kafka topic with the following name: <database.server.name>.<database.include.list>.<table>, so in this case it is mysql.moviesdb.OutboxEvent.

./bin/kafka-console-consumer.sh --topic mysql.moviesdb.OutboxEvent --from-beginning --bootstrap-server localhost:9092

{"before":null,"after":{"id":"Yxk0o5WwTvi0+nwBr2Y36wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","aggregatetype":"Movie","aggregateid":"5","type":"MovieCreated","timestamp":1660253420864918,"payload":"{\"id\":5,\"name\":\"Minions: The Rise of Gru\",\"director\":\"Kyle Balda\",\"genre\":\"Animation\"}","tracingspancontext":null},"source":{"version":"1.9.4.Final","connector":"mysql","name":"mysql","ts_ms":1660253420000,"snapshot":"false","db":"moviesdb","sequence":null,"table":"OutboxEvent","server_id":1,"gtid":null,"file":"binlog.000002","pos":8788,"row":0,"thread":41,"query":null},"op":"c","ts_ms":1660253420878,"transaction":null}

The before field is null because the event is an insert, so there is no previous value; the after field contains the movie content.
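
To make the structure of this event easier to follow, here is a minimal Python sketch that decodes it. It works on a shortened copy of the event above (the long id value and several source fields are left out for readability) and relies on two observations: the payload column is itself a JSON string, so it has to be decoded a second time, and in this example joining the name, db, and table fields of the source block reproduces the topic name. The summarize function is a hypothetical helper, not part of Debezium.

```python
import json

# Shortened copy of the event shown above; omitted fields are an
# editorial abbreviation, not a different message format.
raw_event = r'''
{"before": null,
 "after": {"aggregatetype": "Movie", "aggregateid": "5", "type": "MovieCreated",
           "payload": "{\"id\":5,\"name\":\"Minions: The Rise of Gru\",\"director\":\"Kyle Balda\",\"genre\":\"Animation\"}"},
 "source": {"connector": "mysql", "name": "mysql", "db": "moviesdb", "table": "OutboxEvent"},
 "op": "c"}
'''


def summarize(raw: str) -> dict:
    """Extract the parts of a change event that the article discusses."""
    event = json.loads(raw)
    source = event["source"]
    after = event["after"]
    return {
        # Same pattern as the topic name: server name, database, table.
        "topic": ".".join([source["name"], source["db"], source["table"]]),
        # "c" is the operation code for a create; inserts have no "before".
        "is_insert": event["op"] == "c" and event["before"] is None,
        "event_type": after["type"],
        # The payload is a JSON string inside the JSON event: decode it again.
        "movie": json.loads(after["payload"]),
    }


summary = summarize(raw_event)
print(summary["topic"])
print(summary["movie"]["name"])
```

A consumer of the outbox topic would typically route on the type and aggregatetype fields and hand the decoded payload to the business logic.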

Conclusions

We’ve taken our application one step forward, moving it from local development to a Kubernetes cluster.

Strimzi has proven to be a key element when deploying and managing an Apache Kafka cluster in Kubernetes. It lets you install and manage the cluster using Kubernetes resource files, enabling the GitOps methodology for Kafka.

Debezium Embedded works well in scenarios where you need some ad-hoc logic when a change is detected. In others (especially in brownfield projects or projects requiring high scalability and fault tolerance), Debezium Server might be a better fit.

Moving from local development to a Kubernetes cluster should not be difficult, thanks to Strimzi and the Jib and Kubernetes Quarkus extensions.

The source code is available on GitHub.

 
