BT

Diffuser les Connaissances et l'Innovation dans le Développement Logiciel d'Entreprise

Contribuez

Sujets

Sélectionner votre région

Accueil InfoQ Articles Premiers Pas Avec Quarkus Reactive Messaging Et Apache Kafka

Premiers Pas Avec Quarkus Reactive Messaging Et Apache Kafka

Points Clés

  • Apache Kafka (ou Kafka) est un magasin d'événements distribué et une plate-forme de traitement de flux pour stocker, consommer et traiter des flux de données.
  • Quarkus s'intègre parfaitement aux applications réactives qui utilisent Apache Kafka pour le traitement des événements.
  • Quarkus Dev Services vous offre un moyen automatique de démarrer un cluster Kafka pour tester votre application.
  • Apache Kafka peut remplacer les systèmes de messagerie comme JMS, en particulier lorsque vous avez besoin d'un traitement de flux en temps réel.

Qu'est-ce qu'Apache Kafka et pourquoi ?

La façon dont les données sont traitées/consommées aujourd'hui est différente de la façon dont elles étaient pratiquées auparavant. Dans le passé, les données étaient stockées dans une base de données et traitées par lots pour obtenir des résultats.

Bien que cette approche soit correcte, des plates-formes plus modernes vous permettent de traiter les données en temps réel au fur et à mesure que les données arrivent dans le système.

Apache Kafka (ou Kafka) est un magasin d'événements distribué et une plate-forme de traitement de flux pour le stockage, la consommation et le traitement de flux de données.

Il y a cinq concepts essentiels dans Kafka pour comprendre son fonctionnement :

  • Event (ou message) : un événement est une paire clé-valeur horodatée représentant les données stockées dans le système à traiter. Du point de vue de Kafka, ce n'est qu'un morceau d'octets.
  • Partition : une partition est l'endroit où les événements sont produits et consommés. Dans une partition, l'ordre des événements est garanti.
  • Topic : un topic est composé d'une ou plusieurs partitions. Le topic est l'unité de travail avec laquelle les développeurs travaillent pour consommer ou produire des événements.
  • Consumer : un consommateur s'abonne à un topic et est averti chaque fois qu'un événement est publié dans un topic.
  • Producer : un producteur publie un événement dans un topic (en fait l'une des partitions appartenant au topic).

L'un des aspects clés d'Apache Kafka est qu'il a été créé dans un souci d'évolutivité et de tolérance aux pannes, ce qui le rend approprié pour les applications hautes performances. Kafka peut être considéré comme un remplaçant de certains systèmes de messagerie conventionnels tels que Java Message Service (JMS) et Advanced Message Queuing Protocol (AMQP).

Apache Kafka a des intégrations avec la plupart des langages utilisés de nos jours, mais dans cet article, nous aborderons son intégration avec Java, en particulier avec la stack Java Quarkus.

Qu'est-ce que Quarkus ?

Quarkus est un framework Java complet et Kubernetes-native conçu pour les machines virtuelles Java (JVM) et la compilation native, optimisant Java spécifiquement pour les conteneurs et lui permettant de devenir une plate-forme efficace pour les environnements serverless, cloud et Kubernetes.

Au lieu de réinventer la roue, Quarkus utilise des frameworks d'entreprise bien connus soutenus par des normes/spécifications et les rend compilables en binaire à l'aide de GraalVM.

Comment intégrer Kafka dans Quarkus

Quarkus utilise le projet SmallRye Reactive Messaging pour interagir avec Apache Kafka.

Premiers pas avec Quarkus

Le moyen le plus rapide de commencer à utiliser Quarkus consiste à ajouter les dépendances requises via la page de démarrage. Chaque service peut avoir des dépendances différentes, et vous pouvez choisir entre Java 11 ou Java 17. Pour l'intégration entre Quarkus et Kafka, vous devez ajouter l'extension SmallRye Reactive Messaging - Kafka Connector.

L'application en cours de développement

Supposons que nous soyons une société de streaming de films et qu'un cas d'utilisation consiste à stocker des films. Cela pourrait être réalisé en utilisant une base de données traditionnelle, c'est vrai, mais étant donné qu'une excellente expérience utilisateur nécessite une interaction en temps réel, nous avons décidé de les stocker dans Kafka.

Il existe donc deux services : un qui produit un événement chaque fois qu'un utilisateur arrête de lire un film et un autre service consomme ces événements et les affiche/diffuse en tant qu'événements côté serveur.

La figure suivante montre l'architecture :

Implémentons ces services dans Quarkus et expliquons un peu plus les détails internes.

Le producteur de films diffusés

Chaque fois qu'un utilisateur arrête un film, ce service envoie un événement au topic Kafka PlaytimeMovies. L'événement contient l'ID du film et la durée totale de visionnage. À des fins de démonstration, une minuterie déclenche automatiquement la logique simulant si un utilisateur a regardé un film.

Lorsque le service démarre, il produit des films sur le topic Kafka Movies.

La création du projet

Accédez à la page de démarrage de Quarkus et sélectionnez l'extension smallrye-reactive-messaging-kafka pour l'intégration à Kafka. Sélectionnez ensuite l'extension Jackson pour les événements de marshaling/unmarshaling depuis/vers JSON-tableau d'octets Java. Décochez également l'option de génération Starter Code.

Dans la capture d'écran suivante, vous pouvez le voir :

Vous avez la possibilité d'ignorer cette étape manuelle et d'accéder au lien Kafka Quarkus Generator où ils sont tous sélectionnés. Appuyez ensuite sur le bouton Generate your application pour télécharger le fichier zip généré de l'application.

Décompressez le fichier et ouvrez le projet dans votre IDE préféré.

Le développement

Créons deux POJO, l'un représentant un Movie et l'autre un PlayedMovie.

public class Movie {
  
   public int id;
   public String name;
   public String director;
   public String genre;
 
   public Movie(int id, String name, String director, String genre) {
       this.id = id;
       this.name = name;
       this.director = director;
       this.genre = genre;
   }
}

Un film (movie) contient un id, un nom (name), le réalisateur (director) et le genre du film.

public class MoviePlayed {
  
   public int id;
   public long duration;
 
   public MoviePlayed(int id, long duration) {
       this.id = id;
       this.duration = duration;
   }
}

Un film lu (MoviePlayed) contient un id faisant référence à l'identifiant du film lu et à la durée (duration) pendant laquelle l'utilisateur a regardé le film.

Nous aurons besoin d'une nouvelle classe nommée MovieKafkaGenerator qui a la responsabilité de stocker les films dans un topic Kafka et de simuler les films lus.

Deux classes sont initialement requises pour commencer à émettre des événements vers un topic ; l'une est l'annotation, @Outgoing, utilisée pour spécifier où envoyer les événements sous la forme d'un channel qui sera configuré pour pointer vers le topic Kafka Movies, et la classe Record, représentant un wrapper de l'événement où la clé et la valeur sont spécifiées.

Créons maintenant la classe MovieKafkaGenerator pour produire les films dans le topic Kafka Movies.

package org.acme.movieplays;
 
import java.time.Duration;
import java.util.List;
import java.util.Random;
 
import javax.enterprise.context.ApplicationScoped;
 
import org.eclipse.microprofile.reactive.messaging.Outgoing;
 
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.Record;
 
@ApplicationScoped
public class MovieKafkaGenerator {
 
   private List<Movie> movies = List.of(
       new Movie(1, "The Hobbit", "Peter Jackson", "Fantasy"),
       new Movie(2, "Star Trek: First Contact", "Jonathan Frakes", "Space"),
       new Movie(3, "Encanto", "Jared Bush", "Animation"),
       new Movie(4, "Cruella", "Craig Gillespie", "Crime Comedy"),
       new Movie(5, "Sing 2", "Garth Jennings", "Jukebox Musical Comedy")
   );
 
   // Populates movies into Kafka topic
   @Outgoing("movies")                                         
   public Multi<Record<Integer, Movie>> movies() {
       return Multi.createFrom().items(movies.stream()
               .map(m -> Record.of(m.id, m))
       );
   }
}

Il y a plusieurs choses importantes à remarquer dans cette classe :

  • La classe est une classe CDI de portée application (@ApplicationScoped)
  • Par souci de simplicité, les films sont définis dans une liste.
  • L'annotation @Outgoing est utilisée pour définir où envoyer les événements (channel movies). Les éléments de retour de la méthode movies() sont automatiquement envoyés au channel défini. Le type de retour peut être un type réactif/asynchrone (généralement io.smallrye.mutiny.Multi dans Quarkus) enveloppant le contenu de l'événement. Le channel est ensuite configuré pour pointer vers un topic.
  • Le Record (c'est l'événement/message) a l'ID du film comme clé et l'objet du film comme valeur.

La dernière étape consiste à configurer les paramètres Quarkus pour se connecter à l'instance Kafka. Les applications Quarkus sont configurées dans le fichier application.properties qui se trouve dans le répertoire src/main/resources/.

Vous pouvez facilement configurer la relation entre le channel et le topic via la propriété générique suivante :

mp.messaging.outgoing.<channel_name>.topic=<topic>

Dans notre exemple d'application, cela serait défini comme :

mp.messaging.outgoing.movies.topic=movies

Vous vous demandez peut-être où l'emplacement du broker Kafka est configuré ? À des fins de développement local, vous n'en avez pas besoin car Quarkus propose sa fonctionnalité Dev Services pour Kafka. Les Dev Services fournissent une instance d'une dépendance externe requise (c'est-à-dire une instance de base de données, un broker Kafka, un service Keycloak, etc.) dans un environnement d'exécution de conteneur fonctionnel, tel que Podman ou tout autre outil compatible OCI. Du point de vue d'un développeur, si vous incluez une extension et ne la configurez pas, Quarkus démarrera automatiquement le service et configurera l'application pour l'utiliser.

Pour cette raison, nous n'avons besoin d'aucun autre paramètre de configuration pendant le cycle de développement. Quarkus le fera pour nous.

IMPORTANT : vous devez disposer d'un hôte Docker en cours d'exécution sur votre machine locale pour exécuter l'exemple. Si vous n'en avez pas, mais que vous avez un broker Kafka déployé, nous verrons plus tard comment configurer cette instance "distante" dans une application Quarkus.

Avec l'hôte Docker opérationnel, démarrez l'application en mode dev de Quarkus à partir d'un terminal :

./mvnw compile quarkus:dev

Et la sortie du terminal devrait être quelque chose comme :

[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.acme:movie-plays-producer >--------------------
[INFO] Building movie-plays-producer 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
….
2022-03-21 11:37:24,474 INFO  [io.qua.sma.dep.processor] (build-8) Configuring the channel 'movies' to be managed by the connector 'smallrye-kafka'
2022-03-21 11:37:24,483 INFO  [io.qua.sma.rea.kaf.dep.SmallRyeReactiveMessagingKafkaProcessor] (build-30) Generating Jackson serializer for type org.acme.movieplays.Movie

--
--
Checking Docker Environment 2022-03-21 11:37:25,018 INFO  [org.tes.uti.ImageNameSubstitut

--
2022-03-21 11:37:28,500 INFO  [io.qua.kaf.cli.dep.DevServicesKafkaProcessor] (build-22) Dev Services for Kafka started. Other Quarkus applications in dev mode will find the broker automatically. For Quarkus applications in production mode, you can connect to this by starting your application with -Dkafka.bootstrap.servers=OUTSIDE://localhost:32769

2022-03-21 11:37:29,581 INFO  [io.quarkus] (Quarkus Main Thread) movie-plays-producer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 6.666s.
2022-03-21 11:37:29,582 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-03-21 11:37:29,582 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]

L'application est compilée, puis configure automatiquement le sérialiseur Jackson (rappelez-vous que nous avons ajouté l'extension au début) pour sérialiser l'objet Movie dans un tableau d'octets à stocker dans le topic Kafka. Ensuite, le broker Kafka est démarré automatiquement sur localhost:32769, et l'application est automatiquement configurée pour s'y connecter. Enfin, l'application est opérationnelle et tous les films sont insérés dans le topic Kafka Movies.

Nous pouvons utiliser l'outil kcat pour inspecter le contenu du topic. Exécutez la commande suivante dans une fenêtre de terminal en remplaçant l'adresse du broker Kafka par la vôtre :

kcat -b localhost:32769 -t movies -C -K:

:{"id":1,"name":"The Hobbit","director":"Peter Jackson","genre":"Fantasy"}
:{"id":2,"name":"Star Trek: First Contact","director":"Jonathan Frakes","genre":"Space"}
:{"id":3,"name":"Encanto","director":"Jared Bush","genre":"Animation"}
:{"id":4,"name":"Cruella","director":"Craig Gillespie","genre":"Crime Comedy"}
:{"id":5,"name":"Sing 2","director":"Garth Jennings","genre":"Jukebox Musical Comedy"}
% Reached end of topic movies [0] at offset 5

Arrêtez l'application et ajoutons la partie qui génère les films lus.

Ouvrez la classe MovieKafkaGenerator et ajoutez le code suivant :

private Random random = new Random();
 
@Inject
Logger logger;
 
@Outgoing("play-time-movies")
public Multi<Record<String, PlayedMovie>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofMillis(1000))   
            .onOverflow().drop()
            .map(tick -> {
                Movie movie = movies.get(random.nextInt(movies.size()));
                int time = random.nextInt(300);
                logger.info("movie {0} played for {1} minutes", movie.name, time);
                // Region as key
                return Record.of("eu", new PlayedMovie(movie.id, time));
    });
   }

Il y a plusieurs choses importantes à noter dans cette nouvelle méthode :

  • Les événements sont générés sur le channel play-time-movies.
  • Un nouvel événement est déclenché toutes les secondes.
  • Un film est sélectionné au hasard dans la map et la méthode lui attribue une durée de lecture aléatoire.
  • Un Record (événement/message) est créé, dans ce cas, la clé représente la région de l'utilisateur, et la valeur est l'objet PlayedMovie.

Enfin, ouvrez le fichier application.properties et configurez le nouveau channel :

mp.messaging.outgoing.play-time-movies.topic=playtimemovies

Redémarrez l'application et l'application générera un nouvel événement toutes les secondes.

./mvnw compile quarkus:dev

2022-03-21 12:36:01,297 INFO  [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18258: Kafka producer kafka-producer-play-time-movies, connected to Kafka brokers 'OUTSIDE://localhost:32771', is configured to write records to 'playtimemovies'

2022-03-21 12:36:01,835 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 148 minutes
2022-03-21 12:36:02,336 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 288 minutes
2022-03-21 12:36:02,836 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 176 minutes

La console affiche les lignes du journal chaque fois qu'un nouvel événement est publié. Utilisons l'outil kcat pour inspecter le contenu du topic.

kcat -b localhost:32773 -t playtimemovies -C -K:

eu:{"id":4,"duration":88}
eu:{"id":3,"duration":291}
eu:{"id":1,"duration":228}
eu:{"id":2,"duration":165}
eu:{"id":1,"duration":170}
eu:{"id":4,"duration":75}

Le consommateur de films diffusés

Ce service est chargé de consommer les événements d'un topic Kafka. L'événement consommé est transmis à l'appelant à l'aide d'événements côté serveur HTTP. L'événement correspond aux données du film lu contenant l'ID du film lu et la durée totale de visionnage.

La création du projet

Accédez à la page de démarrage de Quarkus et sélectionnez resteasy-reactive-jackson pour implémenter les endpoints réactifs JAX-RS avec la prise en charge de Jackson pour le marshaling/unmarshaling Java objets vers/depuis JSON, et l'extension smallrye-reactive-messaging-kafka pour l'intégration avec Kafka. Décochez également l'option de génération Starter Code.

Encore une fois, vous avez la possibilité d'ignorer cette étape manuelle et d'accéder à ce qui suit : Kafka Quarkus Generator où tous sont déjà sélectionnés. Appuyez ensuite sur le bouton Generate your application pour télécharger le fichier zip généré de l'application.

Décompressez le fichier et ouvrez le projet dans votre IDE préféré.

Le développement

Ce service traite les événements PlayedMovie, créons donc un POJO simple pour cet élément :

public class PlayedMovie {
  
   public int id;
   public long duration;
 
   public MoviePlayed(int id, long duration) {
       this.id = id;
       this.duration = duration;
   }
}

Créez ensuite une nouvelle classe nommée PlayedMovieResource et écrivez un endpoint réactif JAX-RS pour diffuser les événements consommés à partir du topic Kafka.

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
 
import org.eclipse.microprofile.reactive.messaging.Channel;
 
import io.smallrye.mutiny.Multi;
 
@Path("/movies")
public class PlayedMovieResource {
 
   @Channel("played-movies")
   Multi<MoviePlayed> playedMovies;
 
   @GET
   @Produces(MediaType.SERVER_SENT_EVENTS)
   public Multi<PlayedMovie> stream() {
       return playedMovives;
   }
}

C'est une petite classe qui fait beaucoup :

  • Expose un endpoint HTTP à un endpoint /movies à l'aide de l'annotation @Path.
  • Traite les événements sur un channel nommé played-movies. Chaque fois qu'un nouvel événement est envoyé au channel (c'est-à-dire qu'un événement est publié dans un topic Kafka), il est automatiquement publié dans l'instance Multi.
  • Lorsque vous appelez le endpoint /movies à l'aide d'une méthode HTTP GET, l'application commence à diffuser les événements reçus dans le channel.

Enfin, configurez le channel dans le fichier application.properties pour configurer le channel (le topic et la stratégie d'offset) et changez le port d'écoute en 9090 afin qu'il n'entre pas en collision avec le service producteur qui utilise le port 8080.

mp.messaging.incoming.movies-played.topic=playtimemovies
mp.messaging.incoming.movies-played.auto.offset.reset=earliest
 
%dev.quarkus.http.port=9090

Ayant le service movie-player-producer opérationnel dans un terminal dédié, démarrons le movie-player-consumer. Dans une nouvelle fenêtre de terminal, exécutez le service en mode dev.

./mvnw compile quarkus:dev

[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.acme:movie-plays-consumer >--------------------
[INFO] Building movie-plays-consumer 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
….
2022-03-21 17:59:08,079 INFO  [io.qua.sma.dep.processor] (build-13) Configuring the channel 'movies-played' to be managed by the connector 'smallrye-kafka'
2022-03-21 17:59:08,092 INFO  [io.qua.sma.rea.kaf.dep.SmallRyeReactiveMessagingKafkaProcessor] (build-33) Generating Jackson deserializer for type org.acme.movieplays.MoviePlayed
….
2022-03-21 17:59:10,617 INFO  [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18257: Kafka consumer kafka-consumer-movies, connected to Kafka brokers 'localhost:32771, belongs to the 'movie-plays-consumer'
….
2022-03-21 17:59:10,693 INFO  [io.quarkus] (Quarkus Main Thread) movie-plays-consumer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 4.114s. Listening on: http://localhost:9090
2022-03-21 17:59:10,693 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-03-21 17:59:10,694 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, resteasy-reactive, resteasy-reactive-jackson, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]

L'application est compilée, puis configure automatiquement le désérialiseur Jackson (rappelez-vous que nous avons ajouté l'extension au début) pour désérialiser les objets du tableau d'octets stocké dans un topic Kafka vers un objet Java. L'application en cours d'exécution détecte un cluster Kafka déjà démarré et s'y connecte automatiquement. Enfin, l'application est lancée sur le port 9090.

Dans une nouvelle fenêtre de terminal, exécutez la commande curl suivante pour obtenir les données de diffusion :

curl -N localhost:9090/movies
data:{"id":4,"duration":213}

data:{"id":4,"duration":3}

data:{"id":3,"duration":96}

data:{"id":5,"duration":200}

data:{"id":2,"duration":234}

data:{"id":1,"duration":36}

data:{"id":1,"duration":162}

data:{"id":3,"duration":88}

Vous pouvez observer comment les données sont diffusées automatiquement à partir du topic Kafka et envoyées sous forme de requêtes HTTP.

L'exemple précédent avait deux objectifs : injecter un channel en tant qu'instance Multi pour recevoir les événements et envoyer ces événements à une méthode annotée avec @Incoming.

Arrêtez le service client et ajoutez le morceau de code suivant à la classe PlayedMovieResource pour consommer les événements du topic Kafka Movie :

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;


@Inject
Logger logger;
 
@Incoming("movies")
public void newMovie(Movie movie) {
   logger.infov("New movie: {0}", movie);
}

Dans ce cas, à chaque fois qu'un nouveau film est envoyé au channel movies (topic Movies), la méthode newMovie() est appelée. Le paramètre de la méthode est la charge utile de l'événement à partir du topic Kafka.

Configurez le channel dans le fichier application.properties pour qu'il pointe vers la rubrique Movies.

mp.messaging.incoming.movies.topic=movies
mp.messaging.incoming.movies.auto.offset.reset=earliest

Redémarrez maintenant le service movie-plays-consumer et remarquez quelques lignes de journal affichant la liste des films :

./mvnw compile quarkus:dev

[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.acme:movie-plays-consumer >--------------------
[INFO] Building movie-plays-consumer 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
…
2022-03-21 17:59:12,146 INFO  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-13) SRMSG18256: Initialize record store for topic-partition 'movies-0' at position -1.
2022-03-21 17:59:12,160 INFO  [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Peter Jackson, genre=Fantasy, id=1, name=The Hobbit]
2022-03-21 17:59:12,164 INFO  [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Jonathan Frakes, genre=Space, id=2, name=Star Trek: First Contact]
2022-03-21 17:59:12,165 INFO  [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Jared Bush, genre=Animation, id=3, name=Encanto]
2022-03-21 17:59:12,166 INFO  [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
2022-03-21 17:59:12,167 INFO  [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Garth Jennings, genre=Jukebox Musical Comedy, id=5, name=Sing 2]

Un broker Kafka externe

Vous pouvez également utiliser votre propre broker Kafka, simplement en configurant la propriété kafka.bootstrap.servers dans le fichier application.properties.

kafka-bootstrap.servers=kafka:9092

Conclusion

Jusqu'à présent, vous avez vu qu'il est facile de connecter une application Quarkus à Apache Kafka et de commencer à produire et à consommer des messages/événements à partir d'un topic. Consommer des messages peut être simple ; vous les obtenez tant qu'ils sont produits, mais rien de plus. Que faire si vous avez besoin d'un traitement en temps réel des données (par exemple, filtrer des événements ou manipuler des événements) ? Que faire si vous avez besoin de faire des corrélations entre les événements (l'événement playedmovie a l'id du film, mais comment pourriez-vous joindre le topic Movie pour obtenir le nom du film) ?

Bien sûr, vous pouvez commencer à développer du code ad hoc pour manipuler toutes les données envoyées. Cependant, le projet Kafka Streams vous aide à consommer des flux d'événements en temps réel au fur et à mesure qu'ils sont produits, à appliquer n'importe quelle transformation, à joindre des flux, etc., et éventuellement à réécrire de nouvelles représentations de données dans un topic.

Kafka Streams est un sujet important et brille par sa polyvalence pour résoudre les problèmes de traitement en temps réel. Nous consacrerons un article entier à Kafka Streams et Quarkus. Restez à l'écoute.

 

Au sujet de l’Auteur

Contenu Éducatif

BT