Key Takeaways
- Strimzi simplifies the deployment of a Kafka cluster to a Kubernetes cluster.
- Strimzi is a must for managing Kafka using a GitOps methodology.
- Debezium Embedded can easily be replaced with Debezium Server, keeping the application free of Debezium/Kafka dependencies.
- It is recommended to use Debezium Server for brownfield projects as it requires no modification of the original code.
In part 1 and part 2 of this series, we learned about the integration between Apache Kafka, Apache Kafka Streams, and Quarkus, where we developed a simple application producing events to a Kafka topic and consuming and processing them in real-time with Kafka Streams.
In that example, we simulated a movie streaming company. We stored movies in one Kafka topic, and in another Kafka topic we held an event for each occurrence of a user stopping a movie, capturing the time it had been played. We post-processed these events in real time to count the number of times each movie was played for more than 10 minutes.
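The counting logic itself is simple; the following plain-Python sketch (a simplified stand-in for the Kafka Streams topology from part 2, with illustrative field names, not the exact ones used in the application) shows the idea:

```python
from collections import Counter

# Each event records that a user stopped watching a movie and for how long
# it had been played (duration in minutes). Field names are illustrative.
events = [
    {"movie_id": 1, "duration_min": 25},
    {"movie_id": 1, "duration_min": 3},
    {"movie_id": 2, "duration_min": 90},
    {"movie_id": 1, "duration_min": 12},
]

# Count, per movie, how many times it was played for more than 10 minutes.
play_counts = Counter(
    e["movie_id"] for e in events if e["duration_min"] > 10
)

print(play_counts)  # Counter({1: 2, 2: 1})
```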
The following figure shows the architecture of the application:
Then in part 3, we covered the Outbox pattern and Debezium to avoid the dual-writes problem that occurs when different systems need to synchronize the same data.
In all three previous articles, we’ve been learning these technologies from the developer's point of view, and we ended up deploying the application locally (in dev mode) on the developer’s machine.
In this article, we’ll explore how to put everything into production; more concretely, into Kubernetes. We’ll learn how to:
- Install and manage an Apache Kafka cluster in a Kubernetes cluster.
- Containerize a Quarkus application.
- Configure a Quarkus application with production parameters.
- Migrate from using Debezium Embedded to Debezium Server.
Kubernetes
Kubernetes is an open-source container orchestrator that has become the de facto platform for deploying microservices, running either on bare metal or in the cloud.
This article uses minikube as a Kubernetes cluster, but the same steps should be valid in any other implementation.
Starting the Cluster
Execute the following command in a terminal window to start the cluster in a VirtualBox machine with 8GB of memory and two vCPUs:
minikube start -p strimzi --kubernetes-version='v1.22.12' --vm-driver='virtualbox' --memory=8096
[strimzi] minikube v1.24.0 on Darwin 12.5
minikube 1.26.1 is available! Download it: https://github.com/kubernetes/minikube/releases/tag/v1.26.1
To disable this notice, run: 'minikube config set WantUpdateNotification false'
✨ Using the virtualbox driver based on user configuration
Starting control plane node strimzi in cluster strimzi
Creating virtualbox VM (CPUs=2, Memory=8096MB, Disk=20000MB) ...
> kubelet.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s
> kubeadm.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s
> kubectl.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s
> kubeadm: 43.74 MiB / 43.74 MiB [-------------] 100.00% 13.98 MiB p/s 3.3s
> kubectl: 44.77 MiB / 44.77 MiB [-------------] 100.00% 11.11 MiB p/s 4.2s
> kubelet: 115.30 MiB / 115.30 MiB [-----------] 100.00% 20.16 MiB p/s 5.9s
▪ Generating certificates and keys ...
▪ Booting up control plane ...
▪ Configuring RBAC rules ...
▪ Using image gcr.io/k8s-minikube/storage-provisioner:v5
Verifying Kubernetes components...
Enabled addons: storage-provisioner, default-storageclass
❗ /usr/local/bin/kubectl is version 1.24.0, which may have incompatibilities with Kubernetes 1.22.12.
▪ Want kubectl v1.22.12? Try 'minikube kubectl -- get pods -A'
Done! kubectl is now configured to use "strimzi" cluster and "default" namespace by default
To check if the Kubernetes cluster is up and running, run the following two commands in a terminal window:
kubectl get nodes
NAME STATUS ROLES AGE VERSION
strimzi Ready control-plane,master 3m4s v1.22.12
kubectl get pods
No resources found in default namespace.
Apache Kafka
In all past articles, we used the Quarkus dev mode, leveraging Quarkus to boot up all the external dependencies required to run the application (a Kafka cluster and a MySQL database). Although dev mode is fantastic from the development point of view, when it’s time to deploy to production you might find things more complicated to manage. The first barrier might be installing and configuring a Kafka cluster in Kubernetes, because of the distributed nature of a Kafka cluster.
Some of the questions you might wonder are:
- Which container image do I use for all Kafka components (Kafka, Zookeeper, etc.)?
- How do I deploy all these components easily in Kubernetes?
- How do I create users, topics, or HA in a Kubernetes way?
- And what about security?
You might try to do all these things by hand, crafting long YAML files and using the Kafka CLI tooling to configure the Kafka pieces. However, there is another way that is Kubernetes-native, fully automated, and reproducible (perfect for CI/CD): using Strimzi.
Strimzi
Strimzi is a Kubernetes operator that installs a controller to create, configure, and secure a Kafka cluster like any other Kubernetes resource, such as a Pod, Deployment, or ConfigMap.
The Strimzi project comprises three operators: one for the Kafka cluster, another for the topics, and the last for user management.
With the Strimzi operators installed in the Kubernetes cluster, you only need to apply the following YAML file to have a Kafka cluster up and running with one Kafka replica and three ZooKeeper replicas, using ephemeral storage (no persistent volume attached).
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.2.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.2"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
Let’s install Strimzi in the cluster we already started.
Installing Strimzi
The first thing is to create a namespace to install Strimzi operators; in this case, the kafka namespace is used. In a terminal window, run the following command:
kubectl create namespace kafka
namespace/kafka created
Next, apply the Strimzi install files, which include, among other things, the CRDs (CustomResourceDefinitions) used for declarative management of the Kafka cluster, Kafka topics, and users.
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
Validate the operator installation by running the following command:
kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
strimzi-cluster-operator-597d67c7d6-ms987 1/1 Running 0 4m27s
We are now ready to create the Kafka cluster with the movies topic. This is the topic where we store all movies to be processed later by Kafka Streams, as we saw in part 2 of this series of articles about Kafka.
Creating the Kafka Cluster
Create a new file (i.e., kafka.yaml) to install a Kafka cluster with one replica, no TLS, and internal as the type of Kubernetes service used to access the Kafka cluster.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.2.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.2"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
Then use the kubectl command in a terminal window to create this resource:
kubectl create -f kafka.yaml -n kafka
kafka.kafka.strimzi.io/my-cluster created
At this point, Strimzi starts installing a Kafka cluster in the kafka namespace.
Now, get the pods of the kafka namespace to check the cluster's creation.
kubectl get pods -n kafka
NAME READY STATUS
my-cluster-entity-operator-755596449b-cw82g 3/3 Running
my-cluster-kafka-0 1/1 Running
my-cluster-zookeeper-0 1/1 Running
The Kafka cluster is up and running. Apart from installing it like any other Kubernetes resource, we can also query or describe it. For example, run the following commands in a terminal window:
kubectl get kafka -n kafka
NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS READY WARNINGS
my-cluster 1 1 True True
kubectl describe kafka my-cluster -n kafka
Name: my-cluster
Namespace: default
Labels: <none>
Annotations: <none>
API Version: kafka.strimzi.io/v1beta2
Kind: Kafka
Metadata:
Creation Timestamp: 2022-08-09T10:57:39Z
…
And of course, you can delete it like any other Kubernetes resource. Moreover, four Kubernetes services are created to access the Kafka cluster:
kubectl get services -n kafka
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
my-cluster-kafka-bootstrap ClusterIP 172.30.77.150 <none> 9091/TCP,9092/TCP 21m
my-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP 21m
my-cluster-zookeeper-client ClusterIP 172.30.5.186 <none> 2181/TCP 21m
my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 21m
The important service for the application to access the cluster is my-cluster-kafka-bootstrap, which exposes the 9092 Kafka port.
Before jumping to the application, let’s create and configure the movies topic using another YAML file.
Creating the Movie's Topic
Strimzi has an operator for the creation and management of topics. To create a new topic, we create a Kubernetes resource file of the kind KafkaTopic, setting the topic's name and the name of the cluster (my-cluster in our case) in the strimzi.io/cluster label. Let’s create a new file named movies-topic.yaml with the following content:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: movies
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
And apply the resource:
kubectl apply -f movies-topic.yaml -n kafka
kafkatopic.kafka.strimzi.io/movies created
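Note that retention.ms is expressed in milliseconds, so the value configured above corresponds to two hours; a quick sanity check:

```python
# retention.ms value from the KafkaTopic resource above (milliseconds)
retention_ms = 7_200_000

# Milliseconds -> seconds -> minutes -> hours
retention_hours = retention_ms / 1000 / 60 / 60

print(retention_hours)  # 2.0
```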
And as with any other Kubernetes resource, we can query and describe it as well.
kubectl get kafkatopic -n kafka
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a my-cluster 50 1 True
movies my-cluster 1 1 True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 my-cluster 1 1 True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b my-cluster 1 1 True
Let’s check if the topic has been created by port forwarding the service to the local computer.
Run the following command in a terminal window:
kubectl port-forward -n kafka service/my-cluster-kafka-bootstrap 9092:9092
Forwarding from 127.0.0.1:9092 -> 9092
Forwarding from [::1]:9092 -> 9092
Open a new terminal window and use the kcat tool to list the metadata of the Kafka cluster. We can use localhost as the hostname thanks to the port forwarding done in the previous step.
kcat -b localhost:9092 -L
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
1 brokers:
broker 0 at my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092 (controller)
4 topics:
topic "movies" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0
Finally, stop the port forwarding process. Now let’s containerize the project, as we did in part 3 of this series, and configure it to connect to the Kafka cluster.
Movie Plays Producer Debezium
In the service we developed in part 3, we dealt with the dual-writes problem, fixing it with Debezium (concretely, Debezium Embedded) by listening to the transaction logs of the MySQL server and producing a Kafka event with the data content every time a new movie was inserted. But we ran the example on the local machine, using dev services to boot up the required services (MySQL and Kafka) and automatically configure the application to connect to them.
Things are a bit different now: the service must run in a Kubernetes cluster, together with the Kafka cluster created in the previous steps and a MySQL database. Three changes are required to make it run in Kubernetes:
- Configure the service with the new Kafka and MySQL parameters (hostname, port, username, password).
- Containerize the application and push it to a container registry.
- Create Kubernetes resource files to deploy the service.
Configure The Service
The first thing to configure is the Kafka broker hostname and port, pointing to the Kubernetes service created by Strimzi. Open the src/main/resources/application.properties file and append the following line:
%prod.kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
INFO: The %prod prefix indicates that the property is only used when the application runs in prod mode (not in dev or test).
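For illustration, the same property can take a different value per profile, while an unprefixed property applies to all profiles (the values below are hypothetical, not taken from the application):

```properties
# Applies to every profile unless overridden
quarkus.log.level=INFO
# Applies only when running in prod mode
%prod.kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
# Applies only when running in dev mode
%dev.quarkus.log.level=DEBUG
```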
The second thing is to configure the database connection where movies are inserted. In the application.properties file, append the following lines:
quarkus.hibernate-orm.database.generation=drop-and-create
%prod.quarkus.datasource.username=alex
%prod.quarkus.datasource.password=alex
%prod.quarkus.datasource.jdbc.url=jdbc:mysql://mysql:3306/moviesdb
Later on, we’ll deploy a MySQL instance with these parameters; for now, just assume configuration parameters are correct.
Containerize
To create a container, Quarkus offers an integration with the Jib project, making the build and push of a container image as simple as running a Maven/Gradle task.
Open the pom.xml file and add the following dependency in the dependencies section:
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-container-image-jib</artifactId>
</dependency>
With the Jib extension registered, the application is automatically containerized when it is packaged. Because some default values of Jib may not work in every case, you can override them in the src/main/resources/application.properties file. For this example, let’s override the group and the container registry host parts of the generated container image name.
Open the application.properties file and add the following lines:
# Substitute the value with your account name
quarkus.container-image.group=lordofthejars
# Defaults to docker.io, overridden to Quay
quarkus.container-image.registry=quay.io
IMPORTANT: You need to set the credentials for the container registry to push the container image. You can run the docker login command before running the build (Maven will read the credentials from there), or use the quarkus.container-image.username and quarkus.container-image.password properties.
Run the following command in the root directory of the project to build the application, construct a container and push it to the defined container registry:
./mvnw clean package -DskipTests -Dquarkus.container-image.push=true
[INFO] Scanning for projects...
[INFO]
[INFO] ---------------< org.acme:movie-plays-producer-debezium >---------------
[INFO] Building movie-plays-producer-debezium 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ movie-plays-producer-debezium ---
[INFO] Deleting /Users/asotobu/git/quarkus-integrating-kafka/strimzi/movie-plays-producer-debezium/target
…
[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Using base image with digest: sha256:1a2fddacdcda67494168749c7ab49243d06d8fbed34abab90566d81b94f5e1a5
[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Container entrypoint set to [java, -Djava.util.logging.manager=org.jboss.logmanager.LogManager, -jar, quarkus-run.jar]
[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Pushed container image quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT (sha256:73dfe42d53f8d7e3c268dbebc2e5f866596de33b8fcaf82c27bdd414d28bdb8a)
As you can see in the last log line, the container is created and pushed to the registry with the account name set in the application.properties file, relying on defaults for the container name (the artifactId value) and tag (the version value).
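If those defaults don’t fit your naming scheme, they can also be overridden with the quarkus.container-image.name and quarkus.container-image.tag properties (the values below are hypothetical):

```properties
# Override the container name (defaults to the artifactId)
quarkus.container-image.name=movie-plays-producer
# Override the tag (defaults to the project version)
quarkus.container-image.tag=1.0.0
```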
Kubernetes
After pushing the container image, we’re ready to deploy the service to Kubernetes. We could create the Kubernetes resource files manually, but it’s unnecessary since Quarkus has a Kubernetes extension.
Open the pom.xml file and add the following dependency in the dependencies section:
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kubernetes</artifactId>
</dependency>
The Kubernetes extension runs every time Maven packages the application and creates a kubernetes.yml file to deploy the application to a Kubernetes cluster. You can introduce changes to the generated file using the application.properties configuration file. Let’s set the kind of the Kubernetes Service to LoadBalancer instead of ClusterIP, and the namespace to kafka.
Open application.properties and append the following lines:
quarkus.kubernetes.service-type=load-balancer
quarkus.kubernetes.namespace=kafka
After this change, run the Maven package goal to generate the deployment file.
./mvnw clean package -DskipTests
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
Inspect the generated file located at target/kubernetes/kubernetes.yml:
cat target/kubernetes/kubernetes.yml
And the output should be similar to the following listing:
---
apiVersion: v1
kind: Service
metadata:
  …
  name: movie-plays-producer-debezium
spec:
  ports:
    - name: http
      port: 80
      targetPort: 8080
  selector:
    app.kubernetes.io/name: movie-plays-producer-debezium
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
  # Type is LoadBalancer as set in the application.properties file
  type: LoadBalancer
---
apiVersion: apps/v1
kind: Deployment
metadata:
  …
  name: movie-plays-producer-debezium
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: movie-plays-producer-debezium
      app.kubernetes.io/version: 1.0.0-SNAPSHOT
  template:
    metadata:
      …
    spec:
      containers:
        - env:
            - name: KUBERNETES_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          # The image is correctly set automatically
          image: quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT
          imagePullPolicy: Always
          name: movie-plays-producer-debezium
          ports:
            - containerPort: 8080
              name: http
              protocol: TCP
TIP: In this example, configuration parameters are hardcoded in the application.properties file, but you could override them with environment variables. To set an environment variable in the Kubernetes Deployment object, you could add the following line to override the Kafka broker location:
quarkus.kubernetes.env.vars.kafka-bootstrap-servers=my-new-cluster:9092
The resulting file will contain the env section with this new environment variable:
containers:
  - env:
      - name: KAFKA_BOOTSTRAP_SERVERS
        value: my-new-cluster:9092
    image: quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT
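The property key to environment variable name translation follows the MicroProfile Config convention: non-alphanumeric characters become underscores and the result is uppercased. A small sketch of that rule (the helper name is mine, not a Quarkus API):

```python
import re

def property_key_to_env_var(key: str) -> str:
    """Map a config key to its environment-variable form:
    non-alphanumeric characters -> '_', then uppercase."""
    return re.sub(r"[^A-Za-z0-9]", "_", key).upper()

# The key used in quarkus.kubernetes.env.vars.kafka-bootstrap-servers
print(property_key_to_env_var("kafka-bootstrap-servers"))  # KAFKA_BOOTSTRAP_SERVERS
```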
Putting The Pieces Together
We’ve already deployed a Kafka cluster using Strimzi into the Kubernetes cluster. Now apply the following file (mysql-deployment.yaml) to deploy a MySQL instance with the configuration parameters set in the application.properties file.
apiVersion: v1
kind: Service
metadata:
  name: mysql
  labels:
    app: mysql
spec:
  ports:
    - port: 3306
  selector:
    app: mysql
  clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
  labels:
    app: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
        - image: mysql:8.0.30
          name: mysql
          env:
            - name: MYSQL_ROOT_PASSWORD
              value: alex
            - name: MYSQL_DATABASE
              value: moviesdb
            - name: MYSQL_USER
              value: alex
            - name: MYSQL_PASSWORD
              value: alex
          ports:
            - containerPort: 3306
              name: mysql
Deploy the MySQL instance into the Kubernetes cluster:
kubectl apply -f mysql-deployment.yaml -n kafka
The last element to deploy is the application itself. You have two options: the first one is applying the resource directly:
kubectl apply -f target/kubernetes/kubernetes.yml -n kafka
The second one is packaging the application with the quarkus.kubernetes.deploy=true flag. When this flag is set to true, Maven will:
- Create the application JAR files.
- Build the container image.
- Push the container image to the registry.
- Apply the kubernetes.yml resource file automatically to the logged-in Kubernetes cluster.
./mvnw clean package -DskipTests -Dquarkus.kubernetes.deploy=true
…
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Deploying to kubernetes server: https://192.168.59.104:8443/ in namespace: kafka.
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Applied: Service movie-plays-producer-debezium-server.
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Applied: Deployment movie-plays-producer-debezium-server.
[INFO] [io.quarkus.deployment.QuarkusAugmentor] Quarkus augmentation completed in 9537ms
To test that everything works correctly together, let’s send a request to insert a new movie and validate the insertion of a new event in the Kafka topic.
In a terminal window, run the following commands to get the IP and port to access the service:
Get the IP to access the service:
minikube ip -p strimzi
192.168.59.104
Get the exposed port of the movie-plays-producer-debezium service, which is the second port shown in the following snippet (30306).
kubectl get services -n kafka
movie-plays-producer-debezium LoadBalancer 10.100.117.203 <pending> 80:30306/TCP 67m
Run the curl command to insert a new Movie.
curl -X 'POST' \
'http://192.168.59.104:30306/movie' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"name": "Minions: The Rise of Gru",
"director": "Kyle Balda",
"genre": "Animation"
}'
Inspect the Quarkus logs and see the SQL statements run against the database:
kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
movie-plays-producer-debezium-56f644cb87-5cchk 1/1 Running 0 6m5s
my-cluster-entity-operator-755596449b-cw82g 3/3 Running 0 35h
my-cluster-kafka-0 1/1 Running 0 35h
my-cluster-zookeeper-0 1/1 Running 0 35h
Print the logs of the movie-plays-producer-debezium pod:
kubectl logs movie-plays-producer-debezium-6b9b65bf4-9z524 -n kafka
2022-08-11 07:44:25,658 INFO [org.acm.MovieResource] (executor-thread-1) New Movie inserted Minions: The Rise of Gru
:)
Hibernate:
select
next_val as id_val
from
hibernate_sequence for update
Hibernate:
update
hibernate_sequence
set
next_val= ?
where
next_val=?
Hibernate:
insert
into
Movie
(director, genre, name, id)
values
(?, ?, ?, ?)
Hibernate:
insert
into
OutboxEvent
(aggregatetype, aggregateid, type, timestamp, payload, tracingspancontext, id)
values
(?, ?, ?, ?, ?, ?, ?)
# Debezium reacts to the change
2022-08-11 07:44:25,867 INFO [io.deb.con.com.BaseSourceTask] (executor-thread-0) 1 records sent during previous 00:20:44.297, last recorded offset: {transaction_id=null, ts_sec=1660203865, file=binlog.000002, pos=14795, row=1, server_id=1, event=4}
Movie Created and Reacting
You can also inspect the content of the Kafka topic by using the kafka-console-consumer.sh script present inside the Kafka container. Get into the container and run the following command:
kubectl exec -ti my-cluster-kafka-0 -n kafka /bin/bash
./bin/kafka-console-consumer.sh --topic movies --from-beginning --bootstrap-server localhost:9092
{"id":1,"name":"Minions: The Rise of Gru","director":"Kyle Balda","genre":"Animation"}
To get back to the local terminal window, stop the kafka-console-consumer process by pressing Ctrl+C, and then execute the exit command.
So far, so good; we’ve got the same application as in part 3 of this series, but now it’s running locally in a Kubernetes cluster.
So far, we have used Debezium in embedded mode, but Debezium can also be used in server mode.
Debezium Server
Debezium Server is a configurable, ready-to-use application that streams change events from a source database to messaging systems like Kafka. It’s registered as an Apache Kafka Connect component, integrating as a source connector.
You cannot use Debezium Server in every scenario, but in my opinion, there are two big advantages to this approach:
- You get all the advantages of a Kafka connector (fault tolerance, scalability, reusability, etc.).
- Since it is an external component, the application code requires no changes, and no Debezium Embedded code or dependencies are required. So any application can start using Debezium with no changes or redeployments.
So let’s see how to move from Debezium Embedded to Debezium Server.
Removing Debezium Embedded
The first thing to do is remove any dependency on Debezium Embedded artifacts. Open the pom.xml file and remove the following dependencies:
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-ddl-parser</artifactId>
</dependency>
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-embedded</artifactId>
</dependency>
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-connector-mysql</artifactId>
</dependency>
The next step is removing all the code related to the Debezium Embedded configuration and listeners. Delete the following class files: DebeziumConfiguration.java, DebeziumListener.java, and MySqlJdbcParser.java.
Since we don’t need any Kafka code (all the interaction with Kafka goes through the Kafka Connect component), the last step is to remove the following dependency from pom.xml:
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
And the following line from the application.properties file is not required anymore:
%prod.kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
No Kafka or Debezium Embedded dependencies remain in the project. Now create a new container image containing these latest changes.
Delete the previous deployment by running the following commands in a terminal window:
kubectl delete deployment movie-plays-producer-debezium -n kafka
kubectl delete service movie-plays-producer-debezium -n kafka
To keep the previous container image (the one with Debezium Embedded), change the artifactId to movie-plays-producer-debezium-server so the new image is pushed under a different name.
Then deploy the new version with no Debezium code to the Kubernetes cluster as shown in the following command:
./mvnw clean package -DskipTests -Dquarkus.kubernetes.deploy=true
Validate the deployment of the new service by running the following command:
kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
movie-plays-producer-debezium-server-59db564b74-vhdmf 1/1 Running 0 73m
Deploying Debezium Kafka Connect
First, deploy a Kafka Connect component with the required MySQL connector plug-in; you can think of it as the logic we implemented in the DebeziumListener class, but as a Kafka Connect element, reusable across any project. A container image for Kafka Connect with the plug-in has to be created, as Debezium doesn’t provide an “official” container image for each possible combination of database and Kafka versions. For this example, we created a container image using the MySQL connector with Kafka 3.2.0 (as it’s the one used in production).
The container image of the MySQL connector for this article is found at quay.io/lordofthejars/debezium-connector-mysql:1.9.4; if you are curious about how to build it, the Dockerfile is located at this GitHub repository.
To deploy the Debezium Kafka Connect component, we’ll use the KafkaConnect kind provided by Strimzi because it simplifies the whole process. In this Kubernetes resource file, we set the version of Kafka used, the location of the Kafka cluster (my-cluster-kafka-bootstrap:9092), the container image (quay.io/lordofthejars/debezium-connector-mysql:1.9.4), and some specific configuration parameters.
Create a file named debezium-kafka-connect.yaml with the following content:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.2.0
  image: quay.io/lordofthejars/debezium-connector-mysql:1.9.4
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    group.id: connect-cluster
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    offset.storage.topic: connect-offsets
    offset.storage.replication.factor: 1
    config.storage.topic: connect-configs
    config.storage.replication.factor: 1
    status.storage.topic: connect-status
    status.storage.replication.factor: 1
Then in the terminal window, apply this resource:
kubectl apply -f debezium-kafka-connect.yaml -n kafka
And validate that it’s correctly deployed by running the following command:
kubectl get pods -n kafka
debezium-connect-cluster-connect-546c8695c-lszn7 1/1 Running 0 91m
Keep in mind that this process might take minutes to get ready.
The Kafka Connect component is now connected to the Kafka cluster; the last step is configuring it to listen for changes in the MySQL instance.
To do so, we’ll use the KafkaConnector kind provided by Strimzi. It’s similar to what we did in the DebeziumConfiguration class, providing configuration parameters like database.hostname or table.include.list. Moreover, set the strimzi.io/cluster label value to the KafkaConnect name set in the previous YAML file (debezium-connect-cluster).
Create a new file named debezium-kafka-connector.yaml with the following content:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: root
    database.password: alex
    database.server.id: 184054
    database.server.name: mysql
    database.include.list: moviesdb
    database.allowPublicKeyRetrieval: true
    table.include.list: moviesdb.OutboxEvent
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.movies
Apply the resource to configure the Debezium Connector:
kubectl apply -f debezium-kafka-connector.yaml -n kafka
To verify that everything works well, add a new movie and validate that the insertion of the new movie into the database produces a new event in a Kafka topic.
Get the port of the new service (the IP is still the same):
kubectl get services -n kafka
movie-plays-producer-debezium-server LoadBalancer 10.100.117.203 <pending> 80:30307/TCP 67m
curl -X 'POST' \
'http://192.168.59.104:30307/movie' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"name": "Minions: The Rise of Gru",
"director": "Kyle Balda",
"genre": "Animation"
}'
And use the kafka-console-consumer.sh script to validate the insertion:
kubectl exec -ti my-cluster-kafka-0 -n kafka /bin/bash
And then, inside the container, run the script. Note that the Debezium connector sends the event to a Kafka topic named <database.server.name>.<database.include.list>.<table>, so in this case it is mysql.moviesdb.OutboxEvent.
./bin/kafka-console-consumer.sh --topic mysql.moviesdb.OutboxEvent --from-beginning --bootstrap-server localhost:9092
{"before":null,"after":{"id":"Yxk0o5WwTvi0+nwBr2Y36wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","aggregatetype":"Movie","aggregateid":"5","type":"MovieCreated","timestamp":1660253420864918,"payload":"{\"id\":5,\"name\":\"Minions: The Rise of Gru\",\"director\":\"Kyle Balda\",\"genre\":\"Animation\"}","tracingspancontext":null},"source":{"version":"1.9.4.Final","connector":"mysql","name":"mysql","ts_ms":1660253420000,"snapshot":"false","db":"moviesdb","sequence":null,"table":"OutboxEvent","server_id":1,"gtid":null,"file":"binlog.000002","pos":8788,"row":0,"thread":41,"query":null},"op":"c","ts_ms":1660253420878,"transaction":null}
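To make the event structure easier to digest, here is a minimal Python sketch that unwraps a change event shaped like the one above (using a trimmed-down inline event; the real one carries many more fields):

```python
import json

# A trimmed-down Debezium change event with the same shape as above.
raw_event = """
{
  "before": null,
  "after": {
    "aggregatetype": "Movie",
    "aggregateid": "5",
    "type": "MovieCreated",
    "payload": "{\\"id\\":5,\\"name\\":\\"Minions: The Rise of Gru\\"}"
  },
  "op": "c"
}
"""

event = json.loads(raw_event)

# op "c" means the row was created (inserted), so "before" is null.
assert event["op"] == "c" and event["before"] is None

# The outbox payload itself is a JSON string nested in the "after" field.
movie = json.loads(event["after"]["payload"])
print(movie["name"])  # Minions: The Rise of Gru
```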
The before field is empty as it’s an insert (there is no previous value), but the after field contains the movie content.
Conclusions
We’ve taken our application one step forward, moving from local development to a Kubernetes cluster.
Strimzi has proven to be a key element when deploying and managing an Apache Kafka cluster in Kubernetes. It lets you install and manage the cluster using Kubernetes resource files, enabling a GitOps methodology for Kafka.
Debezium Embedded works in scenarios where you need ad-hoc logic when a change is detected. Still, in others (especially brownfield projects, or projects requiring high scalability and fault tolerance), Debezium Server might be a better fit.
Moving from local development to a Kubernetes cluster should not be difficult, thanks to Strimzi and the Jib and Kubernetes Quarkus extensions.
The source code is available on GitHub.