BT

Diffuser les Connaissances et l'Innovation dans le Développement Logiciel d'Entreprise

Contribuez

Sujets

Sélectionner votre région

Accueil InfoQ Actualités MongoDB : la magie des Reactive-Streams avec les Capped Collections

MongoDB : la magie des Reactive-Streams avec les Capped Collections

MongoDB vient de publier son driver Java pour les traitements réactifs et non-bloquants. Bien que ce driver repose entièrement sur son homologue MongoDB Async Java Driver, il met pleinement en œuvre l'API reactive-streams pour fournir une interopérabilité transparente avec d'autres flux réactifs dans l'écosystème JVM.

En guise de rappel, MongoDB offre une fonctionnalité intéressante connexe : les capped collections sous forme d’un anneau circulaire de taille fixe. Le présent article essaie d’exploiter les capped collections d’une manière réactive.

Avant les reactive-streams

Tugdual Grall a publié un article sur l’utilisation des capped collections avec les tailables cursors. Les 'tailables cursors' représentent un moyen permettant la consommation des données générées à partir des capped collections au fil de l’eau. Tug a fait le tour des différentes implémentations en Node.js, Java et Scala.

Reprenons sa modélisation Java :

 

       MongoClient mongoClient = new MongoClient();
       DBCollection coll = mongoClient.getDB("chat").getCollection("messages");
       DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural"1).get())
               .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
       System.out.println("== open cursor ==");
       Runnable task = () -> {
           System.out.println("\tWaiting for events");
           while (cur.hasNext()) {
               DBObject obj = cur.next();
               System.out.println( obj );
           }
       };
       new Thread(task).start();

 

Bien que l’implémentation est asynchrone, il est remarquable que l'opération cur.hasNext() est malheureusement bloquante (initialisée au moment de la création du curseur).

Les reactive-streams

Avant de débarquer avec les capped collections, commençons par définir notre Subscriber.

 

    class SubscriberImpl implements Subscriber<Document> {
        @Override
        public void onSubscribe(Subscription s) {
             s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Document document) {
            System.out.println(document.toJson());
        }

        @Override
        public void onError(Throwable thrwbl) {
        }

        @Override
        public void onComplete() {
        }
    }

 

Au moment de la souscription, cette classe va prendre le relais conformément au protocole suivant :

onSubscribe onNext* (onError | onComplete)?

En résumé, cela signifie que la méthode onSubscribe est toujours invoquée, suivie d'un nombre éventuellement illimité de signaux onNext (on affiche les documents comme l’exemple précédent), suivi par un signal onError s'il y a un échec, ou un signal onComplete lorsque plus aucun élément n’est disponible.

Avec les reactive-streams, pas de majeurs changements par rapport au driver standard (sauf au niveau des imports) :

 

    import com.mongodb.reactivestreams.client.*;

        MongoClient mongoClient = MongoClients.create();
        MongoDatabase database = mongoClient.getDatabase("chat");
        MongoCollection<Document> messages = database.getCollection("messages");

 

Les reactive-streams entrent en jeu avec :

 

FindPublisher<Document> findPublisher = messages.find().cursorType(CursorType.Tailable);

 

L’invocation de MongoCollection.find retourne un Publisher au sens reactive-streams au lieu d'un curseur. findPublisher va servir à streamer les documents de notre capped collection messages pour tout subscriber ceux voulant être notifiés.

C’est le moment de brancher notre SubscriberImpl :

 

findPublisher.subscribe(new SubscriberImpl());

 

Désormais, toutes les insertions dans la collection messages seront automatiquement reflétées sur la console et c’est toute la magie des reactive-streams.

db.messages.insert({ "name" : "MongoDB", "type" : "database" })

Conclusion

Ce qui est intéressant avec le driver reactive-streams, c'est qu'il n'y a plus besoin de faire des itérations bloquantes. Le streaming des flux de données est traité d’une manière déclarative, asynchrone et non bloquante.

Références

Contenu Éducatif

BT