Key Takeaways
- Dual writes are something to avoid, as you may end up losing data because of concurrent writes
- Debezium reads database transaction logs and emits an event for each change
- You can either embed Debezium in your Java code or run it as an external service via Debezium Server
- kcat is a command-line tool for inspecting the content of Kafka topics
In part 2 of this series, we learned about the integration between Apache Kafka Streams and Quarkus, where we developed a simple application producing events to a Kafka topic and consuming and processing them in real-time with Kafka Streams.
In that example, we simulated a movie-streaming company. We stored movies in one Kafka topic, and in a second Kafka topic we recorded an event every time a user stopped watching a movie, capturing how long it had been played. We post-processed these events in real time to count the number of times each movie was played for more than 10 minutes.
The following figure shows the architecture of the application:
But all the information was stored in Kafka topics, and it's unlikely that this would happen in a real-world project. Movie information is probably stored in a traditional database, in a distributed cache to speed up queries, or indexed in a search engine. But let's keep things simple and assume that movie information is stored in a database.
This raises the question of how we can maintain the same data in two different systems, in the database as the primary place and in the Kafka movies topic to process data with Kafka Streams.
This article will teach you how to correctly maintain the same data across these different systems.
Dual Writes
The first thing that may come to mind to fix this problem is the dual writes approach. It's straightforward: your application code is responsible for maintaining the data in all the places. For example, inserting a new Movie should execute an insert into the database and fire an event to the Kafka topic.
In terms of code, this could be something like:
@Channel("movies")
Emitter<Record<Long, String>> movieEmitter;
private static ObjectMapper objectMapper = new ObjectMapper();
public Movie dualWriteInsert(Movie movie) throws JsonProcessingException {
// Inserts to DB
movie.persist();
// Send an event to movies topic
final String payloadJson = objectMapper.writeValueAsString(movie);
long id = movie.id;
movieEmitter.send(Record.of(id, payloadJson));
This looks correct and is easy to implement, and it works until you try it and start getting some weird issues. Let's explore some of them:
- What if data is persisted in the database, but sending it to the Kafka topic fails? You could wrap both operations in a transaction block; this fixes the failure case because an error triggers a rollback. But you pay a big price in performance: the bigger the transaction scope, the longer you block the database. And it doesn't fix concurrency.
- What happens if two concurrent users want to update the same Movie entry simultaneously? The first request might update the database and send the event to Kafka as a unit, and then the second request does the same; in that case, the database and the Kafka topic stay aligned. But suppose this interleaving instead: the first request only persists to the database; then the second request persists and sends its event to Kafka; finally, the first request sends its event to the Kafka topic. Now the Movie data in the database and in the Kafka topic have diverged, leading to inconsistencies. Of course, you could synchronize the whole method, but this would mean a huge performance loss.
This last problem occurs because of the nature of mixing different systems: a database transaction guarantees consistency within its own persistence layer, but not across systems.
2-Phase Commit
One possible solution to this problem is to use the 2-Phase Commit protocol. Although this could be a good solution, it has two problems:
- Not all systems support distributed transactions and the 2-Phase Commit protocol.
- The protocol has issues because of the communication it requires between all the parties for coordination purposes.
It is a possible solution, but it's not a generic one; in this specific case, Apache Kafka doesn't support distributed transactions, so let's explore another solution.
Change Data Capture
Change Data Capture (CDC) is a pattern used to track data that has changed (i.e., new entries added, records updated, etc.) and trigger an event so the application can react to the change.
There are several ways to implement CDC; for example, using timestamps, versions, or status indicators at the row level and periodically checking the elements from a specific point (i.e., SELECT all elements WHERE status=not_read). But this approach has drawbacks: you regularly query the database for no business purpose, and deletions of entries are hard to detect.
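To make the polling approach concrete, here is a minimal sketch; the movie table, its status column, and the MovieChangePoller class are hypothetical names used only for illustration:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

public class MovieChangePoller {

    private final DataSource dataSource;

    public MovieChangePoller(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    // Called periodically, e.g., by a scheduler
    public void poll() throws SQLException {
        try (Connection connection = dataSource.getConnection();
             PreparedStatement query = connection.prepareStatement(
                 "SELECT id, name FROM movie WHERE status = 'not_read'");
             ResultSet rs = query.executeQuery()) {
            while (rs.next()) {
                // React to the change, e.g., publish an event
                publishChange(rs.getLong("id"), rs.getString("name"));
                // Mark the row as processed so it isn't picked up again
                markAsRead(connection, rs.getLong("id"));
            }
        }
    }

    private void markAsRead(Connection connection, long id) throws SQLException {
        try (PreparedStatement update = connection.prepareStatement(
                "UPDATE movie SET status = 'read' WHERE id = ?")) {
            update.setLong(1, id);
            update.executeUpdate();
        }
    }

    private void publishChange(long id, String name) {
        // Send the change to the interested system
    }
}

Note how every poll queries the database even when nothing has changed, and a row deleted between two polls simply disappears without producing an event; these are exactly the drawbacks mentioned above.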
Another option is using database triggers: any change fires a trigger that stores an event in a dedicated event table. It works, and you can capture any change, but you are still periodically polling the database to read that event table.
Most databases have a transaction log that records all changes made to the database. Log scanners scan this log and capture any change in a non-intrusive way. The benefits of this approach are:
- Minimal impact on the database.
- Changes are transparent to the application; no special columns are needed.
- Transactional integrity.
- No changes to the database schema.
Log scanners are the best approach, and one of the most popular open-source projects is Debezium.
Debezium
Debezium is an open-source project for change data capture using the log scanner approach. Start the database, and configure Debezium to consume data from the transaction log of that database. At this point, for every insert, delete, or update committed to the database, Debezium will trigger an event so an application can register to it and react accordingly.
But why do Debezium, CDC, and Kafka help us fix the dual-writes problem? An Apache Kafka topic is composed of one or more partitions. Each partition stores events in arrival order (events are always appended to the end of the partition). So if we want to maintain the order of concurrent operations (to avoid misplaced data between systems), a Kafka topic resolves that part of the problem.
But of course, we still have the other part: reading from the database in the correct order in the case of concurrent operations. The CDC log scanner approach ensures the contents are captured in commit order and in a non-intrusive way. Debezium makes this possible.
You can operate Debezium in two different ways, and both are valid depending on the use case: Debezium Server or the Debezium Engine (embedded).
Debezium Server
Debezium Server runs Debezium as a Kafka Connect instance. Kafka Connect is a standalone process for streaming data between Apache Kafka and other systems. It defines connectors to different data systems and moves large data sets into and out of Kafka. Since connectors use the Kafka API, they are scalable, fault-tolerant, and have low latency.
Suppose the following example: you want to export content from one Kafka topic to an index engine like Elasticsearch. You have two options:
- Create an application (like the one we saw in part 1 of this series) using the Kafka API to read events from a Kafka topic, and then use the Elasticsearch client to populate data into the index.
- Use the Elasticsearch Kafka Connect connector, which already implements all this logic; you only need to configure and start it, as the sketch after this list shows.
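For illustration, a minimal configuration for the Confluent Elasticsearch sink connector might look like the following; the connector name, topic, and URL are placeholder values:

# Hypothetical Kafka Connect sink configuration (values are placeholders)
name=movies-elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=movies
connection.url=http://localhost:9200
key.ignore=true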
Debezium does the same, but reads the transaction log from a database and sends the content to a Kafka topic.
One of the great things about Debezium is that it can connect to several databases, such as MySQL, MongoDB, PostgreSQL, Oracle, SQL Server, Db2, Cassandra, and Vitess.
Debezium Engine
The usual way to run Debezium is through Debezium Server, as it's not intrusive to the application: it's a separate service that captures data changes and populates a Kafka topic.
But not all applications require the level of fault tolerance or scalability offered by Kafka Connect. Also, sometimes an application must capture the data change event but execute some custom logic, not send the change to a messaging system, or send it to a messaging system that isn't supported.
In these cases, the debezium-api module defines a small API to embed the Debezium Engine in the application.
So far, we've learned that dual writes are something to avoid. The solution is to use Change Data Capture to get data directly from the transaction log and push it to a Kafka topic, so any other system can consume it in a "transactional" way and in order.
Outbox Pattern
If you've arrived at this point, you might wonder: "OK, nice, I can use CDC to react to data changes, but I'm exposing the internal entity to external systems." While this is true, allow me to introduce the Outbox Pattern to avoid this problem.
The Outbox Pattern provides an outbox table where you record all the operations on your entities (possibly with denormalized data). The CDC system (Debezium in our case) then reacts to changes placed in the outbox table, not the entity table, keeping the internal data model isolated from other systems:
The important part you need to be aware of is that the entity modification and the outbox insert must happen within the same transaction.
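For reference, this is roughly the shape of the OutboxEvent table managed by the Debezium Quarkus Outbox extension used later in this article; the column list matches the insert statement shown in the Running section, while the exact column types here are illustrative:

-- Outbox table shape (column types are illustrative)
CREATE TABLE OutboxEvent (
  id CHAR(36) NOT NULL PRIMARY KEY,
  aggregatetype VARCHAR(255) NOT NULL,
  aggregateid VARCHAR(255) NOT NULL,
  type VARCHAR(255) NOT NULL,
  timestamp DATETIME(6) NOT NULL,
  payload VARCHAR(8000),
  tracingspancontext VARCHAR(256)
);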
Let's start putting all these pieces together in a Quarkus project and fix the problem we introduced at the beginning: how to insert a movie into the database and also populate it into an external system (a Kafka topic).
Movie Plays Debezium
Instead of handcrafting code for each use case, let’s see how to use Debezium Embedded and how it’s integrated with Quarkus to solve this problem.
Creating the Project
Navigate to the Quarkus start page and select the RESTEasy Reactive and RESTEasy Reactive Jackson extensions (to implement JAX-RS endpoints and marshal/unmarshal events between JSON, Java objects, and byte arrays), Panache and the MySQL driver (to insert movies into the MySQL database), and SmallRye Reactive Messaging (to interact with Kafka). Also, uncheck the Starter Code generation option.
You can see the selection in the following screenshot:
You can skip this manual step and navigate to the Quarkus Generator link, where all the dependencies are already selected. Then push the Generate your application button to download the scaffolded application's zip file.
Unzip the file and open the project in your favorite IDE.
Development
Before we start to code, we need to add two new dependencies: one for using the Debezium Engine and another for adding the Debezium Quarkus Outbox extension.
Debezium Engine
Open the pom.xml file and add the following dependencies.
In the dependencyManagement section:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-bom</artifactId>
<version>1.9.4.Final</version>
<type>pom</type>
<scope>import</scope>
</dependency>
In the dependencies section:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-ddl-parser</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
</dependency>
<!-- We connect to a MySQL database, so we need debezium MySQL connector -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
</dependency>
This is required to use the Debezium Engine embedded in the application. None of these dependencies would be needed if we used Debezium Server, since it's a standalone service.
Debezium Quarkus Outbox
Quarkus integrates with the Outbox Pattern through the Debezium Quarkus Outbox extension.
Open the pom.xml file and add the following dependencies.
In the dependencyManagement section:
<dependency>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-debezium-bom</artifactId>
<version>${quarkus.platform.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Note that the version of the BOM is aligned with the Quarkus version, 2.10.1.Final in this case.
In the dependencies section:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-quarkus-outbox</artifactId>
</dependency>
Implementation
You can choose not to use the Outbox Pattern, or to implement it yourself; in that case, none of these dependencies would be required. But we'll use the extension to simplify the development.
With all these dependencies in place, create the Movie entity, annotated with JPA annotations and extending the PanacheEntity class:
import javax.persistence.Entity;
import io.quarkus.hibernate.orm.panache.PanacheEntity;
@Entity
public class Movie extends PanacheEntity {
// No worries Quarkus will change them
// to private and auto-generate getters/setters at compilation time
public String name;
public String director;
public String genre;
}
The next step is to create an HTTP endpoint to insert the movie content into the database using JAX-RS annotations:
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import org.jboss.logging.Logger;
@Path("/movie")
public class MovieResource {
// Service to insert the movie data into Movie and Outbox tables
@Inject
MovieService movieService;
// Injects the logger
@Inject
Logger logger;
// Http Post method to insert a movie
@POST
public Movie insert(Movie movie) {
logger.info("New Movie inserted " + movie.name);
System.out.println(":)");
return movieService.insertMovie(movie);
}
}
Since we are using the Debezium Quarkus Outbox extension, we need to create an entity representing the content stored in the outbox table. The entity must implement the ExportedEvent interface and its required methods, which identify the kind of event put in the outbox table.
import java.time.Instant;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.debezium.outbox.quarkus.ExportedEvent;
public class MovieEvent implements ExportedEvent<String, JsonNode> {
private static ObjectMapper mapper = new ObjectMapper();
// Set the type enclosed inside the event
private static final String TYPE = "Movie";
// Set the event type
private static final String EVENT_TYPE = "MovieCreated";
private final long movieId;
private final JsonNode jsonNode;
private final Instant timestamp;
// Saves Movie info in the class
public MovieEvent(Movie movie) {
this.movieId = movie.id;
this.timestamp = Instant.now();
// Saves movie content in a string column in JSON format
this.jsonNode = convertToJson(movie);
}
@Override
public String getAggregateId() {
return String.valueOf(this.movieId);
}
@Override
public String getAggregateType() {
return TYPE;
}
@Override
public JsonNode getPayload() {
return jsonNode;
}
@Override
public Instant getTimestamp() {
return timestamp;
}
@Override
public String getType() {
return EVENT_TYPE;
}
private JsonNode convertToJson(Movie movie) {
ObjectNode asJson = mapper.createObjectNode()
.put("id", movie.id)
.put("name", movie.name)
.put("director", movie.director)
.put("genre", movie.genre);
return asJson;
}
}
The last step before adding the Debezium logic is to implement the MovieService class with the insert logic. This logic should persist the movie into the Movie table and the MovieEvent entity into the OutboxEvent table managed by the extension.
The extension provides a specific CDI event to persist any event that implements the ExportedEvent interface. The only thing to do is fire the event, and the data is automatically persisted.
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.transaction.Transactional;
import io.debezium.outbox.quarkus.ExportedEvent;
@ApplicationScoped
public class MovieService {
// CDI event interface triggering Outbox entities
@Inject
Event<ExportedEvent<?, ?>> event;
// Transactional method: the Movie insert and the outbox insert
// happen within the same transaction
@Transactional
public Movie insertMovie(Movie movie) {
// Persists data
movie.persist();
// Persists outbox content
event.fire(new MovieEvent(movie));
return movie;
}
}
The last step is to configure the Debezium Engine and start it embedded within the application.
To configure the engine, you need to set the database information (hostname, port, credentials), and the database and tables Debezium should monitor to trigger events.
import java.io.File;
import java.io.IOException;
import javax.enterprise.inject.Produces;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import io.debezium.config.Configuration;
public class DebeziumConfiguration {
// Debezium needs Database URL and credentials to login and
// monitor transaction logs
@ConfigProperty(name = "quarkus.datasource.jdbc.url")
String url;
@ConfigProperty(name = "quarkus.datasource.password")
String password;
@ConfigProperty(name = "quarkus.datasource.username")
String username;
@Produces
public Configuration configureDebezium() throws IOException {
// Custom helper class extracting the host, port, and database name
// from the JDBC URL (a sketch of it is shown after this listing)
MySqlJdbcParser jdbcParser = MySqlJdbcParser.parse(url);
File fileOffset = File.createTempFile("offset", ".dat");
File fileDbHistory = File.createTempFile("dbhistory", ".dat");
return io.debezium.config.Configuration.create()
.with("name", "movies-mysql-connector")
// configures MySQL connector
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", fileOffset.getAbsolutePath())
.with("offset.flush.interval.ms", "60000")
// Configures database location
.with("database.hostname", jdbcParser.getHost())
.with("database.port", jdbcParser.getPort())
.with("database.user", "root")
.with("database.allowPublicKeyRetrieval", "true")
.with("database.password", password)
.with("database.dbname", jdbcParser.getDatabase())
.with("database.include.list", jdbcParser.getDatabase())
// Debezium only sends events for modifications of the OutboxEvent table, not all tables
.with("table.include.list", jdbcParser.getDatabase() + ".OutboxEvent")
.with("include.schema.changes", "false")
.with("database.server.id", "10181")
.with("database.server.name", "movies-mysql-db-server")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", fileDbHistory.getAbsolutePath())
.build();
}
}
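The MySqlJdbcParser helper is not shown here (the complete implementation is in the source code on GitHub). A minimal sketch, assuming a JDBC URL of the form jdbc:mysql://host:port/database, could look like this:

import java.net.URI;

public class MySqlJdbcParser {

    private final String host;
    private final int port;
    private final String database;

    private MySqlJdbcParser(String host, int port, String database) {
        this.host = host;
        this.port = port;
        this.database = database;
    }

    public static MySqlJdbcParser parse(String jdbcUrl) {
        // Strip the "jdbc:" prefix so the rest parses as a standard URI
        URI uri = URI.create(jdbcUrl.substring("jdbc:".length()));
        return new MySqlJdbcParser(uri.getHost(), uri.getPort(), uri.getPath().substring(1));
    }

    public String getHost() { return host; }
    public String getPort() { return String.valueOf(port); }
    public String getDatabase() { return database; }
}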
The DebeziumListener CDI class starts Debezium when the application is up and running.
The Debezium Engine doesn't run in a separate thread by itself, so we need to provide a thread for it to run in parallel without blocking the application thread. Using the ManagedExecutor is the correct way to provide an executor thread within Quarkus to run Debezium.
Then we instantiate the Debezium Engine using the DebeziumEngine class, setting the configuration properties created in the previous step. One of the most important steps is registering a method that is triggered every time Debezium generates an event. The notifying method registers this custom method; in our example, we named it handleChangeEvent.
This method receives each event, and we can implement any logic we wish: sending it to a Kafka topic, manipulating it and sending it to another service, anything you can implement in Java.
import java.io.IOException;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.kafka.Record;
import static io.debezium.data.Envelope.FieldName.*;
import static io.debezium.data.Envelope.Operation;
@ApplicationScoped
public class DebeziumListener {
private static ObjectMapper objectMapper = new ObjectMapper();
// Start the Debezium engine in a different thread
ManagedExecutor executor;
// Debezium configuration object
Configuration configuration;
private DebeziumEngine<RecordChangeEvent<SourceRecord>> engine;
public DebeziumListener(ManagedExecutor executor, Configuration configuration) {
this.executor = executor;
this.configuration = configuration;
}
// Interface to send events to movies Kafka topic
@Channel("movies")
Emitter<Record<Long, JsonNode>> movieEmitter;
void onStart(@Observes StartupEvent event) {
// Configures Debezium engine
this.engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(this.configuration.asProperties())
// For each event triggered by Debezium, the handleChangeEvent method is called
.notifying(this::handleChangeEvent)
.build();
// Starts Debezium in different thread
this.executor.execute(this.engine);
}
void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
// For each triggered event, we get the information
SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
if (sourceRecordChangeValue != null) {
Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
// Only insert operations are processed
if (operation == Operation.CREATE) {
// Get the insertion info
Struct struct = (Struct) sourceRecordChangeValue.get(AFTER);
String type = struct.getString("type");
String payload = struct.getString("payload");
if ("GameCreated".equals(type)) {
try {
final JsonNode payloadJson = objectMapper.readValue(payload, JsonNode.class);
long id = payloadJson.get("id").asLong();
// Populate content to Kafka topic
movieEmitter.send(Record.of(id, payloadJson));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
}
}
}
}
void onStop(@Observes ShutdownEvent event) throws IOException {
if (this.engine != null) {
this.engine.close();
}
}
}
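To make the AFTER and OPERATION fields concrete: for every committed insert into the OutboxEvent table, Debezium delivers a change event whose value is an envelope similar to the following (the values shown are illustrative). The handleChangeEvent method reads the op code and the after state, which contains the columns of the outbox row:

{
  "before": null,
  "after": {
    "id": "...",
    "aggregatetype": "Movie",
    "aggregateid": "1",
    "type": "MovieCreated",
    "timestamp": "...",
    "payload": "{\"id\":1,\"name\":\"Minions: The Rise of Gru\",...}"
  },
  "source": { "...": "connector metadata such as the binlog position" },
  "op": "c",
  "ts_ms": 1657186603000
}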
Running
This example is self-contained, so you don't need to start anything manually; Quarkus does it for you.
Panache and the Kafka connector integrate with Quarkus Dev Services. For this reason, we don't need to start or configure a Kafka cluster or a MySQL database; Quarkus Dev mode takes care of everything. Just remember to have a working container runtime on your computer, such as Podman or any other OCI-compliant tool.
Before running the application, we'll add two configuration properties to make things more traceable; in the application.properties file, add the following lines:
quarkus.hibernate-orm.log.sql=true
quarkus.debezium-outbox.remove-after-insert=false
The first line logs the SQL statements executed against the database; this is useful to validate that both tables (Movie and OutboxEvent) receive inserts.
The second one keeps the outbox rows in the table instead of removing them right after they are written, so we can inspect them.
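Note that in Dev mode, Dev Services configures the datasource and the Kafka broker automatically. Outside Dev mode, you would supply this configuration yourself; a hypothetical example, where every value is a placeholder:

# Hypothetical production settings (Dev Services supplies these in dev mode)
kafka.bootstrap.servers=localhost:9092
quarkus.datasource.db-kind=mysql
quarkus.datasource.username=root
quarkus.datasource.password=root
quarkus.datasource.jdbc.url=jdbc:mysql://localhost:3306/moviedb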
In one terminal window, start the service:
./mvnw clean quarkus:dev
…
2022-07-07 11:36:22,942 INFO [io.deb.con.mys.MySqlStreamingChangeEventSource] (debezium-mysqlconnector-movies-mysql-db-server-change-event-source-coordinator) Waiting for keepalive thread to start
2022-07-07 11:36:22,948 INFO [io.deb.con.mys.MySqlStreamingChangeEventSource] (debezium-mysqlconnector-movies-mysql-db-server-change-event-source-coordinator) Keepalive thread is running
2022-07-07 11:37:43,889 INFO [org.acm.MovieResource] (executor-thread-1) New Movie inserted string
After a few seconds, a Kafka cluster, MySQL instance, and the application are up and running.
Inspect the running containers to validate instances:
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS
fa316bfae219 vectorized/redpanda:v21.11.3 "sh -c 'while [ ! -f…" 49 seconds ago Up 45 seconds 8081-8082/tcp, 9644/tcp, 0.0.0.0:55002->9092/tcp
4c220f7ee066 mysql:8.0 "docker-entrypoint.s…" 50 seconds ago Up 46 seconds 33060/tcp, 0.0.0.0:60652->3306/tcp
e41cae02ff02 testcontainers/ryuk:0.3.3 "/app" 53 seconds ago Up 50 seconds 0.0.0.0:60650->8080/tcp
The Kafka cluster runs at port 55002, and the MySQL instance (container id 4c220f7ee066) at port 60652.
NOTE: Ports and IDs might be different in each case.
In another terminal window, run the curl command to insert a new Movie:
curl -X 'POST' \
'http://localhost:8080/movie' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"name": "Minions: The Rise of Gru",
"director": "Kyle Balda",
"genre": "Animation"
}'
Inspect the Quarkus terminal window and see the SQL statements run against the database:
:)
Hibernate:
select
next_val as id_val
from
hibernate_sequence for update
Hibernate:
update
hibernate_sequence
set
next_val= ?
where
next_val=?
// Insert into Movie
Hibernate:
insert
into
Movie
(director, genre, name, id)
values
(?, ?, ?, ?)
// Automatically OutboxEvent table receives an insert
Hibernate:
insert
into
OutboxEvent
(aggregatetype, aggregateid, type, timestamp, payload, tracingspancontext, id)
values
(?, ?, ?, ?, ?, ?, ?)
To validate that Debezium detects the change and pushes it to the movies Kafka topic, run the kcat tool to read the topic, pointing it at the exposed port of the Kafka service (-b sets the broker, -C selects consumer mode, and -t the topic):
kcat -b localhost:55002 -C -t movies
{"id":1,"name":"Minions: The Rise of Gru","director":"Kyle Balda","genre":"Animation"}
% Reached end of topic movies [0] at offset 1
Conclusions
We've implemented a solution that fixes the dual-writes problem between a database and an external system by using Debezium to read the transaction log and trigger an event for every change.
In this example, we used Debezium Embedded, and we implemented the logic to execute when an event was fired.
The embedded approach might work in some scenarios, but in others (especially brownfield projects, or projects requiring high scalability and fault tolerance), Debezium Server might suit better. With Debezium Server (run as a Kafka Connect process), no change in your code is required (no embedded dependencies): Debezium is a standalone process that connects to the database transaction log, detects changes, and sends them to a Kafka topic. Since events are ordered, any system can consume these changes from the topic.
Although the Outbox Pattern is not mandatory when using Debezium (at the very end, Debezium can listen for changes in any table), it's a good practice to isolate your internal data model, and the Outbox Pattern helps you with this.
Integrating (micro)service architectures might seem easy initially, but when you start integrating data, things become more complex, and the Debezium project is here to help you with this task.
Source code is available on GitHub.