
Getting Started to Quarkus Reactive Messaging with Apache Kafka


Key Takeaways

  • Apache Kafka (or Kafka) is a distributed event store and stream-processing platform for storing, consuming, and processing data streams.
  • Quarkus integrates perfectly with reactive applications that use Apache Kafka for event processing.
  • Quarkus Dev Services provides you with an automatic way to boot up a Kafka cluster to test your application.
  • Apache Kafka can replace messaging systems like JMS, especially when you need real-time stream processing.

What Is Apache Kafka, and Why Use It?

How data is processed and consumed today differs from how it was done in the past. Previously, data was stored in a database and batch processed to extract analytics.

Although this approach still works, more modern platforms let you process data in real time, as it arrives in the system.

Apache Kafka (or Kafka) is a distributed event store and stream-processing platform for storing, consuming, and processing data streams.

There are five essential concepts in Kafka to understand how it works:

  • Event (or message): An event is a timestamped key-value pair representing the data stored in the system to be processed. From the Kafka point of view, it is just a chunk of bytes.
  • Partition: A partition is the place where events are produced and consumed. In a partition, the order of the events is guaranteed.
  • Topic: A topic is composed of one or more partitions. The topic is the unit of work that developers work with either to consume or produce events.
  • Consumer: A consumer subscribes to a topic and is notified every time an event is published into a topic.
  • Producer: A producer publishes an event into a topic (actually one of the partitions belonging to the topic).

One of the key aspects of Apache Kafka is that it was created with scalability and fault tolerance in mind, making it appropriate for high-performance applications. Kafka can be considered a replacement for some conventional messaging systems such as Java Message Service (JMS) and Advanced Message Queuing Protocol (AMQP).
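To make these concepts concrete, here is a minimal sketch (not part of the article’s services) that publishes a single event with the plain Kafka client API; it assumes a broker reachable at localhost:9092 and an existing topic named movies.

import java.util.Properties;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
 
public class PlainProducerSketch {
 
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address plus serializers for the key (an id) and the value (the payload)
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
            // The event: a key-value pair appended to one of the topic's partitions
            producer.send(new ProducerRecord<>("movies", 1, "The Hobbit"));
        }
    }
}

A consumer would do the symmetrical work: subscribe to the topic and poll for new records as they are published.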

Apache Kafka has integrations with most of the languages used these days, but in this article, we’ll cover its integration with Java, specifically with the Quarkus Java stack.

What is Quarkus?

Quarkus is a full-stack, Kubernetes-native Java framework made for Java virtual machines (JVMs) and native compilation, optimizing Java specifically for containers and enabling it to become an effective platform for serverless, cloud, and Kubernetes environments.

Instead of reinventing the wheel, Quarkus uses well-known enterprise-grade frameworks backed by standards/specifications and makes them compilable to a binary using GraalVM.
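For instance, projects generated from the Quarkus start page include a native profile, so a native executable is typically produced with a command along these lines (a sketch; it assumes GraalVM or a container build is available locally):

./mvnw package -Dnative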

How to Integrate Kafka in Quarkus

Quarkus uses the SmallRye Reactive Messaging project to interact with Apache Kafka.

Getting Started with Quarkus

The quickest way to start using Quarkus is by adding the required dependencies through the start page. Each service may have different dependencies, and you can choose between Java 11 and Java 17. For the integration between Quarkus and Kafka, you need to add the SmallRye Reactive Messaging - Kafka Connector extension.
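If you prefer the command line, an equivalent project can be scaffolded with the Quarkus Maven plugin; a sketch, under the assumption that the plugin version matches your Quarkus release and that the short extension names resolve as they do on the start page:

mvn io.quarkus.platform:quarkus-maven-plugin:2.7.3.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=movie-plays-producer \
    -Dextensions="smallrye-reactive-messaging-kafka,jackson"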

The Application under Development

Suppose we are a movie streaming company, and one use case is to store movies. This could be achieved with a traditional database, that’s true, but given that a great user experience requires real-time interaction, we decided to store them in Kafka.

So there are two services: one that produces an event every time a user stops playing a movie, and another that consumes these events and streams them as server-sent events.

The following figure shows the architecture:

Let’s implement these services in Quarkus and explain a bit more of the internal details.

Movie Plays Producer

Every time a user stops a movie, this service sends an event to the Kafka PlaytimeMovies topic. The event contains the movie ID and the total amount of time the movie was watched. For demo purposes, a timer automatically triggers the logic, simulating a user having watched a movie.

When the service starts, it produces some movies on the Kafka Movies topic.

Creating the Project

Navigate to the Quarkus start page and select the smallrye-reactive-messaging-kafka extension for the integration with Kafka. Then select the Jackson extension for marshaling/unmarshaling events between Java objects, JSON, and byte arrays. Also, uncheck the Starter Code generation option.

The following screenshot shows the selected extensions:

You have the option to skip this manual step and navigate to the Kafka Quarkus Generator link, where all of the extensions are already selected. Then push the Generate your application button to download the scaffolded application’s zip file.

Unzip the file and open the project in your favorite IDE.

Development

Let’s create two POJOs, one representing a Movie and another one a MoviePlayed.

public class Movie {
  
   public int id;
   public String name;
   public String director;
   public String genre;
 
   public Movie(int id, String name, String director, String genre) {
       this.id = id;
       this.name = name;
       this.director = director;
       this.genre = genre;
   }
}

A movie contains an id, a name, the director, and the genre of the movie.

public class MoviePlayed {
  
   public int id;
   public long duration;
 
   public MoviePlayed(int id, long duration) {
       this.id = id;
       this.duration = duration;
   }
}

A played movie contains an id referring to the identifier of the movie that was played and the duration, i.e., how long the user watched the movie.

We will need a new class named MovieKafkaGenerator that has the responsibility of storing movies into a Kafka topic and simulating played movies.

Two elements are initially required to start emitting events to a topic: the @Outgoing annotation, used to specify where to send the events in the form of a channel (later configured to point to the Kafka Movies topic), and the Record class, a wrapper of the event where the key and value are specified.

Now let’s create the MovieKafkaGenerator class to produce the movies into the Kafka Movies topic.

package org.acme.movieplays;
 
import java.time.Duration;
import java.util.List;
import java.util.Random;
 
import javax.enterprise.context.ApplicationScoped;
 
import org.eclipse.microprofile.reactive.messaging.Outgoing;
 
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.Record;
 
@ApplicationScoped
public class MovieKafkaGenerator {
 
   private List<Movie> movies = List.of(
       new Movie(1, "The Hobbit", "Peter Jackson", "Fantasy"),
       new Movie(2, "Star Trek: First Contact", "Jonathan Frakes", "Space"),
       new Movie(3, "Encanto", "Jared Bush", "Animation"),
       new Movie(4, "Cruella", "Craig Gillespie", "Crime Comedy"),
       new Movie(5, "Sing 2", "Garth Jennings", "Jukebox Musical Comedy")
   );
 
   // Populates movies into Kafka topic
   @Outgoing("movies")                                         
   public Multi<Record<Integer, Movie>> movies() {
       return Multi.createFrom().items(movies.stream()
               .map(m -> Record.of(m.id, m))
       );
   }
}

There are several important things to notice in this class:

  • The class is a CDI bean (@ApplicationScoped).
  • For the sake of simplicity, the movies are defined in a list.
  • The @Outgoing annotation sets where to send the events (the movies channel). Elements returned from the movies() method are automatically sent to the defined channel. The return type may be a reactive/asynchronous type (usually io.smallrye.mutiny.Multi in Quarkus) wrapping the event content. The channel is later configured to point to a topic.
  • The Record (the event/message itself) has the movie ID as the key and the movie object as the value.

The final step is to configure the Quarkus parameters to connect to the Kafka instance. Quarkus applications are configured in the application.properties file found in the src/main/resources/ directory.

You can easily configure the relationship between the channel and the topic via the following generic property:

mp.messaging.outgoing.<channel_name>.topic=<topic>

In our example application, this would be defined as:

mp.messaging.outgoing.movies.topic=movies

So you might be wondering where the Kafka broker location is configured. For local development purposes, you don’t need it, as Quarkus offers its Dev Services feature for Kafka. Dev Services provisions an instance of a required external dependency (e.g., a database, a Kafka broker, a Keycloak service) in a working container runtime, such as Podman or any other OCI-compliant tool. From a developer’s perspective, if you include an extension and don’t configure it, Quarkus will automatically boot up the service and configure the application to use it.

For this reason, we don’t need any other configuration parameters during the development cycle. Quarkus will do it for us.
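As a reference, here is a sketch of the related properties (names taken from the Quarkus configuration reference; the broker address is an assumption): Dev Services only starts a broker when none is configured, and it can also be switched off explicitly.

# Opt out of Dev Services for Kafka; a broker must then be provided by other means
quarkus.kafka.devservices.enabled=false
# Pointing the application at an existing broker also prevents Dev Services from starting
kafka.bootstrap.servers=localhost:9092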

IMPORTANT: You need to have a Docker host running on your local machine to run the example. If you don’t have one, but you have a deployed Kafka broker, we’ll see later on how to configure this “remote” instance in a Quarkus application.

With the Docker host up and running, start the application in Quarkus dev mode from a terminal:

./mvnw compile quarkus:dev

And the terminal output should be something like:

[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.acme:movie-plays-producer >--------------------
[INFO] Building movie-plays-producer 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
….
2022-03-21 11:37:24,474 INFO  [io.qua.sma.dep.processor] (build-8) Configuring the channel 'movies' to be managed by the connector 'smallrye-kafka'
2022-03-21 11:37:24,483 INFO  [io.qua.sma.rea.kaf.dep.SmallRyeReactiveMessagingKafkaProcessor] (build-30) Generating Jackson serializer for type org.acme.movieplays.Movie

--
--
Checking Docker Environment 2022-03-21 11:37:25,018 INFO  [org.tes.uti.ImageNameSubstitut

--
2022-03-21 11:37:28,500 INFO  [io.qua.kaf.cli.dep.DevServicesKafkaProcessor] (build-22) Dev Services for Kafka started. Other Quarkus applications in dev mode will find the broker automatically. For Quarkus applications in production mode, you can connect to this by starting your application with -Dkafka.bootstrap.servers=OUTSIDE://localhost:32769

2022-03-21 11:37:29,581 INFO  [io.quarkus] (Quarkus Main Thread) movie-plays-producer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 6.666s.
2022-03-21 11:37:29,582 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-03-21 11:37:29,582 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]

The application is compiled, and Quarkus automatically configures a Jackson serializer (remember we added the extension at the beginning) to serialize the Movie object to a byte array to be stored in the Kafka topic. Then the Kafka broker is started automatically at localhost:32769, and the application is automatically configured to connect to it. Finally, the application is up and running, and all movies are inserted into the Kafka Movies topic.
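If you prefer not to rely on this auto-detection, the serializers can also be set explicitly per channel; a minimal sketch, assuming the ObjectMapperSerializer that ships with the Quarkus Kafka client for the value and the standard Kafka IntegerSerializer for the key:

mp.messaging.outgoing.movies.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.movies.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer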

We can use the kcat tool to inspect the topic content. Run the following command in a terminal window, substituting the Kafka broker address with yours:

kcat -b localhost:32769 -t movies -C -K:

:{"id":1,"name":"The Hobbit","director":"Peter Jackson","genre":"Fantasy"}
:{"id":2,"name":"Star Trek: First Contact","director":"Jonathan Frakes","genre":"Space"}
:{"id":3,"name":"Encanto","director":"Jared Bush","genre":"Animation"}
:{"id":4,"name":"Cruella","director":"Craig Gillespie","genre":"Crime Comedy"}
:{"id":5,"name":"Sing 2","director":"Garth Jennings","genre":"Jukebox Musical Comedy"}
% Reached end of topic movies [0] at offset 5

Stop the application, and let’s add the part that generates played movies.

Open MovieKafkaGenerator class and add the following code:

import javax.inject.Inject;
 
import org.jboss.logging.Logger;
 
private Random random = new Random();
 
@Inject
Logger logger;
 
@Outgoing("play-time-movies")
public Multi<Record<String, MoviePlayed>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
            .onOverflow().drop()
            .map(tick -> {
                Movie movie = movies.get(random.nextInt(movies.size()));
                int time = random.nextInt(300);
                logger.infov("movie {0} played for {1} minutes", movie.name, time);
                // Region as key
                return Record.of("eu", new MoviePlayed(movie.id, time));
            });
}

There are several important things to notice in this new method:

  • Events are sent to the play-time-movies channel.
  • A new event is triggered every second.
  • A movie is randomly selected in the map method and assigned a random played time.
  • A Record (event/message) is created; in this case, the key represents the user’s region, and the value is the MoviePlayed object.

Finally, open the application.properties file and configure the new channel:

mp.messaging.outgoing.play-time-movies.topic=playtimemovies

Start the application again, and the application will generate a new event every second.

./mvnw compile quarkus:dev

2022-03-21 12:36:01,297 INFO  [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18258: Kafka producer kafka-producer-play-time-movies, connected to Kafka brokers 'OUTSIDE://localhost:32771', is configured to write records to 'playtimemovies'

2022-03-21 12:36:01,835 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 148 minutes
2022-03-21 12:36:02,336 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 288 minutes
2022-03-21 12:36:02,836 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 176 minutes

The console shows a log line every time a new event is published. Let’s use the kcat tool to inspect the topic’s content.

kcat -b localhost:32773 -t playtimemovies -C -K:

eu:{"id":4,"duration":88}
eu:{"id":3,"duration":291}
eu:{"id":1,"duration":228}
eu:{"id":2,"duration":165}
eu:{"id":1,"duration":170}
eu:{"id":4,"duration":75}

Movie Plays Consumer

This service is responsible for consuming events from a Kafka topic. Each consumed event is streamed to the caller using HTTP server-sent events. The event is the played-movie data containing the movie ID and the total amount of time the movie was watched.

Creating the project

Navigate to the Quarkus start page and select the resteasy-reactive-jackson extension to implement JAX-RS reactive endpoints with Jackson support for marshaling/unmarshaling Java objects to/from JSON, and the smallrye-reactive-messaging-kafka extension for the integration with Kafka. Also, uncheck the Starter Code generation option.

Again, you have the option to skip this manual step and navigate to the following Kafka Quarkus Generator link, where all of the extensions are already selected. Then push the Generate your application button to download the scaffolded application’s zip file.

Unzip the file and open the project in your favorite IDE.

Development

This service deals with MoviePlayed events, so let’s create a simple POJO for this element:

public class MoviePlayed {
  
   public int id;
   public long duration;
 
   public MoviePlayed(int id, long duration) {
       this.id = id;
       this.duration = duration;
   }
}

Then create a new class named MoviePlayedResource and write a JAX-RS reactive endpoint to stream the consumed events from the Kafka topic.

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
 
import org.eclipse.microprofile.reactive.messaging.Channel;
 
import io.smallrye.mutiny.Multi;
 
@Path("/movies")
public class MoviePlayedResource {
 
   @Channel("movies-played")
   Multi<MoviePlayed> playedMovies;
 
   @GET
   @Produces(MediaType.SERVER_SENT_EVENTS)
   public Multi<MoviePlayed> stream() {
       return playedMovies;
   }
}

It’s a small class that does a lot:

  • Exposes an HTTP endpoint at /movies using the @Path annotation.
  • Listens for events on the channel named movies-played. Every time a new event is sent to the channel (i.e., an event is published to a Kafka topic), it’s automatically published into the Multi instance.
  • When the /movies endpoint is invoked using an HTTP GET method, the application starts streaming the events received on the channel.

Finally, configure the channel (the topic and the offset strategy) in the application.properties file, and change the listening port to 9090 so it doesn’t collide with the producer service running at port 8080.

mp.messaging.incoming.movies-played.topic=playtimemovies
mp.messaging.incoming.movies-played.auto.offset.reset=earliest
 
%dev.quarkus.http.port=9090

With the movie-plays-producer service up and running in one terminal, let’s start the movie-plays-consumer. In a new terminal window, run the service in dev mode.

./mvnw compile quarkus:dev

[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.acme:movie-plays-consumer >--------------------
[INFO] Building movie-plays-consumer 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
….
2022-03-21 17:59:08,079 INFO  [io.qua.sma.dep.processor] (build-13) Configuring the channel 'movies-played' to be managed by the connector 'smallrye-kafka'
2022-03-21 17:59:08,092 INFO  [io.qua.sma.rea.kaf.dep.SmallRyeReactiveMessagingKafkaProcessor] (build-33) Generating Jackson deserializer for type org.acme.movieplays.MoviePlayed
….
2022-03-21 17:59:10,617 INFO  [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18257: Kafka consumer kafka-consumer-movies, connected to Kafka brokers 'localhost:32771, belongs to the 'movie-plays-consumer'
….
2022-03-21 17:59:10,693 INFO  [io.quarkus] (Quarkus Main Thread) movie-plays-consumer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 4.114s. Listening on: http://localhost:9090
2022-03-21 17:59:10,693 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-03-21 17:59:10,694 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, resteasy-reactive, resteasy-reactive-jackson, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]

The application is compiled, and Quarkus automatically configures a Jackson deserializer (remember we added the extension at the beginning) to deserialize the byte arrays stored in the Kafka topic into Java objects. The running application detects the already-started Kafka cluster and automatically connects to it. Finally, the application is started at port 9090.
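What Quarkus generates for you is roughly equivalent to writing a deserializer by hand by extending ObjectMapperDeserializer and registering it for the channel; a sketch, assuming the hypothetical helper class MoviePlayedDeserializer:

package org.acme.movieplays;
 
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
 
// Jackson-based deserializer for MoviePlayed events read from the topic
public class MoviePlayedDeserializer extends ObjectMapperDeserializer<MoviePlayed> {
 
    public MoviePlayedDeserializer() {
        super(MoviePlayed.class);
    }
}

It would then be configured with mp.messaging.incoming.movies-played.value.deserializer=org.acme.movieplays.MoviePlayedDeserializer.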

In a new terminal window, run the following curl command to get the streaming data:

curl -N localhost:9090/movies
data:{"id":4,"duration":213}

data:{"id":4,"duration":3}

data:{"id":3,"duration":96}

data:{"id":5,"duration":200}

data:{"id":2,"duration":234}

data:{"id":1,"duration":36}

data:{"id":1,"duration":162}

data:{"id":3,"duration":88}

You can observe how the data is streamed automatically from the Kafka topic and sent to the client as server-sent events.

The previous example showed one way of consuming events: injecting a channel as a Multi instance that receives them. Another way is to have the events delivered to a method annotated with @Incoming.

Stop the consumer service and add the following piece of code to the MoviePlayedResource class to consume events from the Kafka Movies topic:

import javax.inject.Inject;
 
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;
 
 
@Inject
Logger logger;
 
@Incoming("movies")
public void newMovie(Movie movie) {
   logger.infov("New movie: {0}", movie);
}

In this case, every time a new movie is sent to the movies channel (the Movies topic), the newMovie() method is called. The method parameter is the payload of the event read from the Kafka topic.
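If you need access to the record’s metadata or want to acknowledge messages manually, the method can instead receive the Message wrapper rather than the bare payload; a minimal sketch of that variant:

import java.util.concurrent.CompletionStage;
 
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
 
@Incoming("movies")
public CompletionStage<Void> newMovie(Message<Movie> message) {
   logger.infov("New movie: {0}", message.getPayload());
   // Acknowledge the message once it has been processed
   return message.ack();
}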

Configure the channel in the application.properties file to point to the Movies topic.

mp.messaging.incoming.movies.topic=movies
mp.messaging.incoming.movies.auto.offset.reset=earliest

Now start the movie-plays-consumer service again, and notice some log lines printing the list of movies:

./mvnw compile quarkus:dev

[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.acme:movie-plays-consumer >--------------------
[INFO] Building movie-plays-consumer 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
…
2022-03-21 17:59:12,146 INFO  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-13) SRMSG18256: Initialize record store for topic-partition 'movies-0' at position -1.
2022-03-21 17:59:12,160 INFO  [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Peter Jackson, genre=Fantasy, id=1, name=The Hobbit]
2022-03-21 17:59:12,164 INFO  [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Jonathan Frakes, genre=Space, id=2, name=Star Trek: First Contact]
2022-03-21 17:59:12,165 INFO  [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Jared Bush, genre=Animation, id=3, name=Encanto]
2022-03-21 17:59:12,166 INFO  [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
2022-03-21 17:59:12,167 INFO  [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Garth Jennings, genre=Jukebox Musical Comedy, id=5, name=Sing 2]

External Kafka Broker

You can also use your own Kafka broker, just by configuring the kafka.bootstrap.servers property in the application.properties file.

kafka.bootstrap.servers=kafka:9092
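Since Dev Services already covers dev and test modes, it is common to scope this property to the production profile so local development keeps using the automatically provisioned broker; a sketch, assuming a production broker reachable at kafka:9092:

%prod.kafka.bootstrap.servers=kafka:9092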

Conclusions

So far, you’ve seen that it is easy to connect a Quarkus application to Apache Kafka and start producing and consuming messages/events from a topic. Consuming messages can be simple: you get them as they are produced, but nothing more. What if you need real-time processing of the data (for example, filtering or manipulating events)? What if you need to correlate events (the MoviePlayed event has the ID of the movie, but how could you join it with the Movies topic to get the movie name)?

Of course, you could develop ad-hoc code to manipulate all the sent data. Instead, the Kafka Streams project helps you consume real-time streams of events as they are produced, apply transformations, join streams, etc., and optionally write new data representations back to a topic.

Kafka Streams is a big topic and shines for its versatility in solving real-time processing problems. We’ll dedicate an entire post to Kafka Streams and Quarkus. Stay tuned.
