Key Takeaways
- Kafka Streams lets you process, transform, join, and manipulate Kafka events in real time.
- Quarkus integrates with Kafka Streams so the only thing you need to do is define the topology.
- Quarkus Dev mode helps with testing Kafka Streams code by providing a Kafka cluster.
- Kafka Streams Interactive Queries let you query the results of real-time processing synchronously.
In part 1 of this series, we learned about the integration between Apache Kafka and Quarkus where we developed a simple application producing and consuming events from two Kafka topics.
In that example, we simulated a movie-streaming company. We stored movies in one Kafka topic and, in another Kafka topic, we stored an event each time a user stopped watching a movie, capturing the amount of time it had been played.
The following figure shows the architecture of the application:
We saw that consuming messages is simple: you get them as they are produced, but nothing more. If you need real-time processing of the data (for example, filtering or manipulating events) or you need to correlate events, using the Kafka consumer API alone might not be the best approach, as the resulting code becomes complex.
Kafka Streams
The Kafka Streams project helps you consume real-time streams of events as they are produced, apply any transformation, join streams, etc., and optionally write new data representations back to a topic.
Kafka Streams is ideal for both stateless and stateful streaming applications, implements time-based operations (for example, grouping events around a given time period), and keeps the scalability, reliability, and maintainability always present in the Kafka ecosystem.
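For instance, a time-based operation might count how many events arrive per key within fixed windows. Here is a minimal sketch (the topic name and serdes are illustrative, not from this article's application, and a recent Kafka Streams version is assumed):
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowedCountExample {

    static void defineWindowedCount(StreamsBuilder builder) {
        // Count how many events each key receives within fixed 5-minute windows
        KTable<Windowed<String>, Long> counts = builder
                .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
                .count();

        counts.toStream().print(Printed.toSysOut()); // stream the rolling counts to the console
    }
}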
A Kafka Stream is composed of three elements: inputs (source processors), outputs (sink processors), and processors (stream processors).
- Source processors: A source processor represents a Kafka topic and sends its events to one or multiple stream processors.
- Stream processors: A stream processor applies transformations/logic (joining, grouping, counting, mapping, etc.) to an input stream. It can be connected to another stream processor and/or a sink processor.
- Sink processors: A sink processor represents the output data and is connected to a Kafka topic.
A topology is an acyclic graph (a graph with no cycles) composed of sources, processors, and sinks; it is passed to a Kafka Streams instance that begins executing it.
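As a hedged illustration (the topic names and the transformation are hypothetical), a minimal topology wiring one source, one stream processor, and one sink might look like this:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class MinimalTopology {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) // source processor
               .mapValues(value -> value.toUpperCase())                                // stream processor
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));   // sink processor

        return builder.build();
    }
}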
Kafka Streams and Quarkus
Quarkus integrates with Kafka Streams using the Quarkus KStreams extension.
Getting started with Quarkus
The quickest way to use Quarkus is to add the required dependencies through the start page. Each service may have different dependencies, and you can choose between Java 11 or Java 17. For the integration between Quarkus and Kafka Streams, you need to add at least the Kafka Streams extension.
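If you prefer adding the dependency by hand, the extension coordinates look like this in the pom.xml (the version is managed by the Quarkus BOM):
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kafka-streams</artifactId>
</dependency>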
The application under development
As mentioned at the beginning of this article, in part 1 of the series we developed an application for a movie-streaming company with two Kafka topics: one storing a list of movies, and another storing an event every time a user stops watching a movie. That event uses the user's region as key, and the id of the movie together with the amount of time it was watched as value.
All this logic was created in a producer service named Movie Plays Producer developed in Quarkus.
Moreover, we developed a Movie Plays Consumer service, also in Quarkus, which consumed events from both topics and showed them in the console (and as HTTP server-sent events).
But there was no processing of the data; it was just streamed as received. What happens if we want to join the movies and playtimemovies topics to get the duration of each movie, not just with the id but with all of the movie information?
Implementing this logic with plain Kafka messages could become a complex task, as you would need to store movie information in a Map and then, for every new played-movie event, do the matching yourself, as sketched below.
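For illustration only, a hand-rolled version might look like this sketch (it reuses the Movie and MoviePlayed classes shown later; the wiring to the actual Kafka consumers is omitted):
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HandRolledJoin {

    // Cache of every movie seen so far, keyed by movie id
    private final Map<Integer, Movie> moviesById = new ConcurrentHashMap<>();

    // Called for every event consumed from the movies topic
    void onMovie(Movie movie) {
        moviesById.put(movie.id, movie); // you must handle updates, restarts, and rebalances yourself
    }

    // Called for every event consumed from the playtimemovies topic
    void onMoviePlayed(MoviePlayed played) {
        Movie movie = moviesById.get(played.id); // may be null if the movie event hasn't arrived yet
        if (movie != null) {
            System.out.println(movie.name + " played for " + played.duration + " minutes");
        }
    }
}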
Movie Plays KStream
Instead of handcrafting code for each use case, let’s see how to use Kafka Streams and how it’s integrated with Quarkus to solve this problem.
Creating the Project
Navigate to the Quarkus start page and select the Apache Kafka Streams extension for integration with Kafka Streams. Then select the RESTEasy and RESTEasy Jackson extensions for marshaling/unmarshaling events between JSON, Java objects, and byte arrays. Also, uncheck the Starter Code generation option.
The following screenshot shows the selection:
You have the option to skip this manual step and navigate to the Kafka Stream Quarkus Generator link where all of the dependencies are selected. Then push the Generate your application button to download the scaffolded application’s zip file.
Unzip the file and open the project in your favorite IDE.
Development
The first thing to do when developing a Kafka Stream is to create a Topology instance, defining the sources, processors, and sinks. In Quarkus, you only need to create a CDI class with a method returning the Topology instance.
Create a new class named TopologyProducer that consumes events from the two topics and joins them. Finally, the result is sent to a sink processor that prints it to the console.
One element not yet explained, and really useful in these use cases, is Kafka Tables.
A topic can contain multiple events with the same key. For example, you can insert a movie with a key and then update the movie using the same key, creating a new event.
But if you want to join a movies topic with a playtimemovies topic, which event with key 1 should be used? The first or the second one? In this specific case, it would be the latest one as it’s the one that contains the most updated version of the movie. To get the latest version of each of the events, Kafka Streams has the concept of a table (KTable/GlobalKTable).
Kafka Streams will navigate through a given topic, get the latest version of each event, and put it in a table instance.
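To make this concrete, here is a hedged sketch using the plain Kafka producer API (the broker address and JSON payloads are illustrative): two records share key 1, and a table built over the topic exposes only the second value.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class MovieUpdateExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", IntegerSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
            // Two events with the same key (1): the second one "updates" the first
            producer.send(new ProducerRecord<>("movies", 1,
                    "{\"id\":1,\"name\":\"The Hobit\",\"director\":\"Peter Jackson\",\"genre\":\"Fantasy\"}"));
            producer.send(new ProducerRecord<>("movies", 1,
                    "{\"id\":1,\"name\":\"The Hobbit\",\"director\":\"Peter Jackson\",\"genre\":\"Fantasy\"}"));
            // A KTable/GlobalKTable built over "movies" keeps only the latest value for key 1
        }
    }
}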
The Kafka Streams extension doesn't automatically register SerDes classes, as happens with the Kafka Messaging integration, so we need to register them manually in the topology.
package org.acme;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;

import io.quarkus.kafka.client.serialization.ObjectMapperSerde;

@ApplicationScoped
public class TopologyProducer {

    private static final String MOVIES_TOPIC = "movies";
    private static final String PLAY_MOVIES_TOPIC = "playtimemovies";

    @Produces
    public Topology getTopCharts() {
        final StreamsBuilder builder = new StreamsBuilder();

        // SerDes for Movie and MoviePlayed
        final ObjectMapperSerde<Movie> movieSerder = new ObjectMapperSerde<>(Movie.class);
        final ObjectMapperSerde<MoviePlayed> moviePlayedSerder = new ObjectMapperSerde<>(MoviePlayed.class);

        // Creation of a GlobalKTable for the movies topic
        final GlobalKTable<Integer, Movie> moviesTable = builder.globalTable(
                MOVIES_TOPIC,
                Consumed.with(Serdes.Integer(), movieSerder));

        // Stream connected to the playtimemovies topic; every event produced there is consumed by this stream
        final KStream<String, MoviePlayed> playEvents = builder.stream(
                PLAY_MOVIES_TOPIC, Consumed.with(Serdes.String(), moviePlayedSerder));

        // MoviePlayed events have the region as key and the object as value. Remap them so the key is the
        // movie id (to do the join) and keep the object as value.
        // The join matches the keys of the movies table (the movie id) against the keys of the stream
        // (remapped to the movie id too in the map method). Finally, the result is streamed to the console.
        playEvents
            .map((key, value) -> KeyValue.pair(value.id, value)) // Now the key is the id field
            .join(moviesTable, (movieId, moviePlayed) -> movieId, (moviePlayed, movie) -> movie)
            .print(Printed.toSysOut());

        return builder.build();
    }
}
The Movie and MoviePlayed POJOs contain the attributes required for the logic.
The Movie object is:
package org.acme;

public class Movie {

    public int id;
    public String name;
    public String director;
    public String genre;

    public Movie(int id, String name, String director, String genre) {
        this.id = id;
        this.name = name;
        this.director = director;
        this.genre = genre;
    }
}
The MoviePlayed object is:
package org.acme;

public class MoviePlayed {

    public int id;
    public long duration;

    public MoviePlayed(int id, long duration) {
        this.id = id;
        this.duration = duration;
    }
}
The last step before running the Kafka Streams application is to configure some parameters, the most important of which is quarkus.kafka-streams.topics: the list of topics that must be present in the Kafka cluster before the topology starts processing data.
Open the src/main/resources/application.properties file and add the following lines:
kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=1000
kafka-streams.metadata.max.age.ms=500
kafka-streams.auto.offset.reset=earliest
kafka-streams.metrics.recording.level=DEBUG
quarkus.kafka-streams.topics=playtimemovies,movies
Now, it’s time to test the stream. Let’s boot up the producer that was developed in the previous article. The source code of the producer may be found here.
Quarkus KStreams integrates with Quarkus Dev Services. For this reason, we don't need to start a Kafka cluster nor configure its location, as Quarkus Dev mode takes care of everything. Just remember to have a working container runtime on your computer, such as Podman or any other OCI-compliant tool.
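Outside Dev mode you would point the application at a real cluster yourself; assuming the standard Quarkus Kafka Streams configuration keys, that might look like:
# Assumed production-style settings (not needed in Dev mode)
quarkus.kafka-streams.bootstrap-servers=kafka:9092
quarkus.kafka-streams.application-id=movie-plays-kstreams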
In one terminal window start the producer service:
cd movie-plays-producer
./mvnw compile quarkus:dev
2022-04-11 07:49:31,900 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 287 minutes
2022-04-11 07:49:31,941 INFO [io.quarkus] (Quarkus Main Thread) movie-plays-producer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 4.256s.
2022-04-11 07:49:31,942 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-04-11 07:49:31,943 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
2022-04-11 07:49:32,399 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 162 minutes
2022-04-11 07:49:32,899 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 255 minutes
2022-04-11 07:49:33,404 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Sing 2 played for 264 minutes
2022-04-11 07:49:33,902 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 28 minutes
2022-04-11 07:49:34,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 137 minutes
2022-04-11 07:49:34,903 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 277 minutes
2022-04-11 07:49:35,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 141 minutes
In another terminal window, start the Kafka Streams service we've just developed:
./mvnw compile quarkus:dev
2022-04-11 07:54:59,321 INFO [org.apa.kaf.str.pro.int.StreamTask] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] task [1_0] Restored and ready to run
2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] Restoration took 74 ms for all tasks [1_0]
2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2022-04-11 07:54:59,324 INFO [org.apa.kaf.str.KafkaStreams] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-client [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea] State transition from REBALANCING to RUNNING
[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
[KSTREAM-LEFTJOIN-0000000005]: 2, Movie [director=Jonathan Frakes, genre=Space, id=2, name=Star Trek: First Contact]
[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
[KSTREAM-LEFTJOIN-0000000005]: 1, Movie [director=Peter Jackson, genre=Fantasy, id=1, name=The Hobbit]
[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
[KSTREAM-LEFTJOIN-0000000005]: 3, Movie [director=Jared Bush, genre=Animation, id=3, name=Encanto]
[KSTREAM-LEFTJOIN-0000000005]: 5, Movie [director=Garth Jennings, genre=Jukebox Musical Comedy, id=5, name=Sing 2]
What’s printed in the output is an event (resulting from the join) where the key is the movieId and the value is the movie itself. Now, every time a movie is stopped, Kafka Streams processes the event and displays it with the full movie information.
So far, not so complicated, and you may be thinking that it isn’t worth using Kafka Streams for just this use case. But let’s start adding more requirements so you can see how powerful it is.
Instead of processing an event every time a user stops a movie, let’s process only the events for movies that the user watched for at least 10 minutes.
Use the filter method to filter movies by duration:
playEvents
    .filter((region, event) -> event.duration >= 10) // filters by duration
    .map((key, value) -> KeyValue.pair(value.id, value))
    .join(moviesTable, (movieId, moviePlayed) -> movieId, (moviePlayed, movie) -> movie)
    .print(Printed.toSysOut());
Restart the application, and you will notice that movies played for less than 10 minutes aren’t processed.
We’re starting to see how Kafka Streams helps keep the code clean, but let’s add the final requirement: we aren’t interested in how long each movie is played, but in the number of times each movie is played for more than 10 minutes.
So far, the processing of events has been stateless: each event was received, processed, and sent to a sink processor (either a topic or the console output). But to count the number of times a movie has been played, we need some memory that remembers the current count for each movie and increments it by one whenever any user watches that movie for more than 10 minutes. The processing of the events needs to be done in a stateful way.
The first thing we need to create is a Java class that stores the movie name and the number of times it has been watched.
public class MoviePlayCount {

    public String name;
    public int count;

    public MoviePlayCount aggregate(String name) {
        this.name = name;
        this.count++;
        return this;
    }

    @Override
    public String toString() {
        return "MoviePlayCount [count=" + count + ", name=" + name + "]";
    }
}
This is the counter class, but it still needs two things:
- A place to store the instances of this class so they aren’t reset every time an event is fired.
- Logic that calls the aggregate method every time an event arrives in the playtimemovies topic.
For the first problem, we use the KeyValueBytesStoreSupplier interface:
public static final String COUNT_MOVIE_STORE = "countMovieStore";
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(COUNT_MOVIE_STORE);
For the second problem, Kafka Streams provides the aggregate method to accumulate results; in our use case, the number of times a movie is played for more than 10 minutes.
// MoviePlayCount needs to be serialized/deserialized too
final ObjectMapperSerde<MoviePlayCount> moviePlayCountSerder = new ObjectMapperSerde<>(MoviePlayCount.class);

// This is the join call seen before, where the key is the movie id and the value is the movie
.join(moviesTable, (movieId, moviePlayed) -> movieId, (moviePlayed, movie) -> movie)
// Group events per key, in this case the movie id
.groupByKey(Grouped.with(Serdes.Integer(), movieSerder))
// The aggregate method gets the MoviePlayCount object if already created (or creates it otherwise)
// and calls its aggregate method to increment the viewer counter by one
.aggregate(MoviePlayCount::new,
    (movieId, movie, moviePlayCounter) -> moviePlayCounter.aggregate(movie.name),
    Materialized.<Integer, MoviePlayCount>as(storeSupplier)
        .withKeySerde(Serdes.Integer())
        .withValueSerde(moviePlayCountSerder)
)
Restart the application, and the number of times a movie is played is shown on the console.
TIP: To restart the application, press the letter “s” in the terminal and it restarts automatically.
After restarting the application, the terminal starts displaying the stats for each movie:
[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=13, name=Cruella]
[KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=11, name=Encanto]
[KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=14, name=Sing 2]
[KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=15, name=Star Trek: First Contact]
[KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=16, name=The Hobbit]
[KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=16, name=Star Trek: First Contact]
[KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=12, name=Encanto]
[KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=17, name=Star Trek: First Contact]
[KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=15, name=Sing 2]
[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=14, name=Cruella]
[KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=17, name=The Hobbit]
[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=15, name=Cruella]
[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=16, name=Cruella]
Interactive Queries
The aggregated results are streamed to the console because we set the sink processor to the System.out stream:
.toStream()
.print(Printed.toSysOut());
But you could also send the resulting stream to a Kafka topic:
.to("counter_movies", Produced.with(Serdes.Integer(), moviePlayCountSerder)
);
But what happens if you are not interested in reacting every time a new event is sent, but just query how many times a specific movie has been played at that time?
Kafka Streams interactive queries allow you to directly query the underlying state store for the value associated with a given key.
First of all, let’s create a class named MoviePlayCountData to store the result of the query. We do it this way to decouple the classes used in Kafka Streams from the classes used in the rest of the application.
public class MoviePlayCountData {

    private String name;
    private int count;

    public MoviePlayCountData(String name, int count) {
        this.name = name;
        this.count = count;
    }

    public int getCount() {
        return count;
    }

    public String getName() {
        return name;
    }
}
Now create a class named InteractiveQueries that accesses the state store (the KeyValueBytesStoreSupplier) and queries the number of times a movie has been played by its id.
import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;

import java.util.Optional;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

@ApplicationScoped
public class InteractiveQueries {

    @Inject
    KafkaStreams streams;

    public Optional<MoviePlayCountData> getMoviePlayCountData(int id) {
        // Gets the state store and looks up the movie count by movie id
        MoviePlayCount moviePlayCount = getMoviesPlayCount().get(id);

        // If there is a result, wrap it into MoviePlayCountData
        if (moviePlayCount != null) {
            return Optional.of(new MoviePlayCountData(moviePlayCount.name, moviePlayCount.count));
        } else {
            return Optional.empty();
        }
    }

    // Gets the state store
    private ReadOnlyKeyValueStore<Integer, MoviePlayCount> getMoviesPlayCount() {
        while (true) {
            try {
                return streams.store(fromNameAndType(TopologyProducer.COUNT_MOVIE_STORE, QueryableStoreTypes.keyValueStore()));
            } catch (InvalidStateStoreException e) {
                // Ignore: the store is not ready yet
            }
        }
    }
}
We can now add a simple REST endpoint to run this query:
import java.util.Optional;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

@Path("/movie")
public class MovieCountResource {

    // Injects the previous class to make queries
    @Inject
    InteractiveQueries interactiveQueries;

    @GET
    @Path("/data/{id}")
    public Response movieCountData(@PathParam("id") int id) {
        Optional<MoviePlayCountData> moviePlayCountData = interactiveQueries.getMoviePlayCountData(id);

        // Depending on the result, return the value or a 404
        if (moviePlayCountData.isPresent()) {
            return Response.ok(moviePlayCountData.get()).build();
        } else {
            return Response.status(Status.NOT_FOUND.getStatusCode(),
                    "No data found for movie " + id).build();
        }
    }
}
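With both services running, you can try the endpoint from a terminal (assuming the default Quarkus port 8080 and a movie with id 1):
curl localhost:8080/movie/data/1
The response is the serialized MoviePlayCountData, for example {"name":"The Hobbit","count":16}, or a 404 status if no play event has been processed for that id.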
The following figure shows the schema of the implemented Kafka Streams topology:
Scaling Out
Kafka Streams applications can be scaled out so the stream processing is distributed across multiple instances. In that case, each instance contains only a subset of the aggregation results, so to answer a query for a key held elsewhere, you need to fetch the data from the instance that owns it by redirecting the REST API request to that instance.
Kafka Streams provides an API to find out whether the requested data is in the local state store or on another host.
Although the process is not complicated, it is beyond the scope of this article.
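Still, for the curious, a minimal sketch of the host-lookup step might look like the following (the local HostInfo and any request forwarding are left as assumptions; Kafka Streams only tells you which host owns the key):
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.HostInfo;

public class HostLookup {

    // Returns true if the given movie id lives in this instance's local state store
    public static boolean isLocal(KafkaStreams streams, HostInfo thisHost, int movieId) {
        KeyQueryMetadata metadata = streams.queryMetadataForKey(
                "countMovieStore", movieId, Serdes.Integer().serializer());
        // If the active host differs, redirect the REST request to metadata.activeHost()
        return thisHost.equals(metadata.activeHost());
    }
}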
Conclusions
So far, you’ve seen that it is easy to connect a Quarkus application to Apache Kafka and start producing and consuming messages/events from a topic. You’ve also seen that Kafka Streams lets us not only consume Kafka messages but also process them in real time, applying transformations and filters, and even querying the resulting data synchronously. It’s a powerful technology that scales out easily to provide a real-time experience when the data to process changes constantly.
But we haven’t tackled the last problem of this architecture. Usually, data isn’t stored in a single place. The movie information might be stored in a relational database and the played-movie information in a Kafka topic. So how do you keep the information updated in both places so that Kafka Streams can join the data correctly?
The missing piece is a project called Debezium, which helps us with exactly this. We’ll dedicate an entire article to Debezium and Quarkus. Stay tuned.