MongoDB vient de publier son driver Java pour les traitements réactifs et non-bloquants. Bien que ce driver repose entièrement sur son homologue MongoDB Async Java Driver, il met pleinement en œuvre l'API reactive-streams pour fournir une interopérabilité transparente avec d'autres flux réactifs dans l'écosystème JVM.
En guise de rappel, MongoDB offre une fonctionnalité intéressante connexe : les capped collections sous forme d’un anneau circulaire de taille fixe. Le présent article essaie d’exploiter les capped collections d’une manière réactive.
Avant les reactive-streams
Tugdual Grall a publié un article sur l’utilisation des capped collections avec les tailables cursors. Les 'tailables cursors' représentent un moyen permettant la consommation des données générées à partir des capped collections au fil de l’eau. Tug a fait le tour des différentes implémentations en Node.js, Java et Scala.
Reprenons sa modélisation Java :
MongoClient mongoClient = new MongoClient(); DBCollection coll = mongoClient.getDB("chat").getCollection("messages"); DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get()) .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA); System.out.println("== open cursor =="); Runnable task = () -> { System.out.println("\tWaiting for events"); while (cur.hasNext()) { DBObject obj = cur.next(); System.out.println( obj ); } }; new Thread(task).start();
Bien que l’implémentation est asynchrone, il est remarquable que l'opération cur.hasNext() est malheureusement bloquante (initialisée au moment de la création du curseur).
Les reactive-streams
Avant de débarquer avec les capped collections, commençons par définir notre Subscriber.
class SubscriberImpl implements Subscriber<Document> { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Document document) { System.out.println(document.toJson()); } @Override public void onError(Throwable thrwbl) { } @Override public void onComplete() { } }
Au moment de la souscription, cette classe va prendre le relais conformément au protocole suivant :
onSubscribe onNext* (onError | onComplete)?
En résumé, cela signifie que la méthode onSubscribe est toujours invoquée, suivie d'un nombre éventuellement illimité de signaux onNext (on affiche les documents comme l’exemple précédent), suivi par un signal onError s'il y a un échec, ou un signal onComplete lorsque plus aucun élément n’est disponible.
Avec les reactive-streams, pas de majeurs changements par rapport au driver standard (sauf au niveau des imports) :
import com.mongodb.reactivestreams.client.*; MongoClient mongoClient = MongoClients.create(); MongoDatabase database = mongoClient.getDatabase("chat"); MongoCollection<Document> messages = database.getCollection("messages");
Les reactive-streams entrent en jeu avec :
FindPublisher<Document> findPublisher = messages.find().cursorType(CursorType.Tailable);
L’invocation de MongoCollection.find retourne un Publisher au sens reactive-streams au lieu d'un curseur. findPublisher va servir à streamer les documents de notre capped collection messages pour tout subscriber ceux voulant être notifiés.
C’est le moment de brancher notre SubscriberImpl :
findPublisher.subscribe(new SubscriberImpl());
Désormais, toutes les insertions dans la collection messages seront automatiquement reflétées sur la console et c’est toute la magie des reactive-streams.
db.messages.insert({ "name" : "MongoDB", "type" : "database" })
Conclusion
Ce qui est intéressant avec le driver reactive-streams, c'est qu'il n'y a plus besoin de faire des itérations bloquantes. Le streaming des flux de données est traité d’une manière déclarative, asynchrone et non bloquante.