L'histoire commence avec une idée simple: créer un outil de développement sympathique, un framework de communication inter-thread simple et léger sans verrouillages, synchroniseurs, sémaphores, temps d'attentes, notifications; pas de files d'attente, de messages, d’événements ou tout autres termes ou outils spécifiques et concurrentiels. Seulement obtenir des communications POJO à travers de bonnes vieilles interfaces Java.
Cela pourrait être quelque chose de similaire au modèle d'acteurs Akka, mais c'est peut être exagéré car le nouveau framework devra être ultra-léger, et optimisé pour une communication inter-thread sur un seul ordinateur multi-core.
Le framework Akka est idéal pour des communications entre process lorsque les acteurs se croisent au travers de différentes instances de JVM sur la machine ou sur des machines distribuées sur un réseau.
Néanmoins, il est peut-être excessif d'utiliser les acteurs typés Akka pour les petits projets, quand on souhaite seulement une communication inter-thread mais toujours en se basant sur le modèle d'acteurs.
J'ai développé une solution en quelques jours en utilisant des proxys dynamiques, des files d'attente de verrouillage et un pool de threads en cache.
La figure 1 présente l'architecture haut niveau du framework:
Figure 1 : Architecture haut niveau du framework
La file d'attente SPSC est une file Single Producer/Single Consumer. La file d'attente MPSC est un Multi Producer/Single Consumer.
Le Dispatcher Thread reçoit des messages des Actor Threads et les envoie dans la file d'attente SPSC appropriée.
Les Actor Threads, en utilisant les données d'un message reçu, invoquent une méthode correspondante aux instances de l'acteur. En utilisant des proxys d'autres acteurs, les instances d'acteur envoient des messages à la file MPSC et les messages passent à l'Acteur Thread ciblé.
Comme simple test, j'ai créé un exemple de ping-pong :
public interface PlayerA (
void pong(long ball); //send and forget method call
}
public interface PlayerB {
void ping(PlayerA playerA, long ball); //send and forget method call
}
public class PlayerAImpl implements PlayerA {
@Override
@ublic void pong(long ball) {
}
}
public class PlayerBImpl implements PlayerB {
@Override
public void ping(PlayerA playerA, long ball) {
playerA.pong(ball);
}
}
public class PingPongExample {
public void testPingPong() {
// this manager hides the complexity of inter-thread communications
// and it takes control over actor proxies, actor implementations and threads
ActorManager manager = new ActorManager();
// registers actor implementations inside the manager
manager.registerImpl(PlayerAImpl.class);
manager.registerImpl(PlayerBImpl.class);
//Create actor proxies. Proxies convert method calls into internal messages
//which would be sent between threads to a specific actor instance.
PlayerA playerA = manager.createActor(PlayerA.class);
PlayerB playerB = manager.createActor(PlayerB.class);
for(int i = 0; i < 1000000; i++) {
playerB.ping(playerA, i);
}
}
La vitesse des échanges était d'environ 500 000 Ping/Pongs par seconde; jusqu'ici tout allait bien. Cependant, comparé à la vitesse d'exécution quand on utilise un seul thread, cela ne semble tout à coup pas très bon. Le code s'exécutant dans un seul thread peut effectuer plus de 2 milliards (2681850373) d'opérations par seconde !
La différence est plus de 5000 fois supérieure. Cela m'a déçu. Cela démontre qu'un code mono-thread est plus efficace qu'un code multi-thread dans de nombreux cas.
J'ai commencé à rechercher les raisons de la lenteur de mes joueurs de ping-pong. Après quelques recherches et tests, j'ai découvert que les files d'attente de verrouillage que j'ai utilisé pour transmettre des messages entre les acteurs ont une incidence sur les performances.
Figure 2 : Une file d'attente SPSC avec un unique Producer et un unique Consumer
J'ai donc commencé la quête de l'implémentation de la file d'attente la plus rapide en Java. J'ai trouvé le super blog de Nitsan Wakart. Il a fait plusieurs posts qui décrivent l'implémentation de file Single Producer/Single Consumer (SPSC) sans verrouillage. Les posts étaient basés sur la présentation de Martin Thompson Lock-Free Algorithms for Ultimate Performance.
Les files sans verrouillage offrent de meilleures performances en comparaison aux files verrouillées. Dans le cas des files basées sur le verrouillage quand un thread est verrouillé, les autres échanges seront bloqués tant que le verrou n'aura pas été retiré. Dans le cas d'algorithmes sans verrou, un thread producteur peut produire des messages sans bloquer les autres threads producteurs et les receveurs ne sont pas bloqués par d'autres receveurs quand ils déroulent la file d'attente.
Les résultats de performance des files SPSC décrits dans la présentation de Martin Thompson et dans le blog de Nitsan étaient incroyables - plus de 100M op/s. C'est 10 fois plus rapide que les implémentations de pile concurrente du JDK (dont la performance sur un Intel Core i7 avec 4 noyaux est autour de 8M op/s).
Avec beaucoup d'excitation, j'ai remplacé tous les liens bloquant connectés aux acteurs par des implémentations de files SPSC sans verrou. Malheureusement, les tests de performance n'ont pas indiqué d'amélioration significative du débit. Il n'a pas fallu longtemps pour comprendre que la partie handicapante n'était pas la file SPSC mais la file Multi Producer/Single Consumer (MPSC).
Utiliser des files SPSC comme des MPSC n'est pas une tâche aisée; plusieurs producteurs peuvent se remplacer les uns les autres par le biais d'opérations de transfert.
Les files SPSC n'ont tout simplement pas le code permettant les opérations de transfert par plusieurs producteurs. Par conséquent, même les files SPSC les plus rapides ne sauraient résoudre ce problème.
Pour le MPSC, j'ai décidé de tirer partie de LMAX Disruptor - une librairie de messagerie inter-thread à haute performance basée sur un tampon circulaire.
Figure 3 : LMAX Disruptor avec un unique producteur et un unique receveur
En utilisant Disruptor, il est facile d'obtenir une très faible latence, et une communication inter-thread à très haut débit. Il fournit également des cas d'utilisations pour différentes combinaisons de producteurs et de receveurs. Plusieurs threads peuvent lire à partir du buffer circulaire sans bloquer les autres:
Figure 4 : LMAX Disruptor avec un unique producteur et deux receveurs
Un cas où plusieurs producteurs écrivent dans le buffer circulaire avec de multiples receveurs recevant leurs messages.
Figure 5 : LMAX Disruptor avec deux producteurs et deux receveurs
Après une recherche rapide sur des tests de performance, j'ai trouvé un test de débit pour trois producteurs et un receveur. C'était juste ce qui était prescrit et les résultats produits sont les suivants:
LinkedBlockingQueue | Disruptor | |
---|---|---|
Run 0 | 4,550,625 ops/sec | 11,487,650 ops/sec |
Run 1 | 4,651,162 ops/sec | 11,049,723 ops/sec |
Run 2 | 4,404,316 ops/sec | 11,142,061 ops/sec |
Le Disruptor était plus de deux fois plus rapide que le LinkedBlockingQueue pour le cas 3 producteurs / 1 receveur. Néanmoins, ce fut encore un long chemin par rapport à mes attentes pour produire des résultats 10 fois plus performants. J'étais frustré par cet état de faits et mon esprit était focalisé sur la recherche d'une solution. Comme si c'était le destin, j'avais récemment modifié mon trajet en utilisant une station de métro à la place du covoiturage. Soudainement, une illumination est arrivée et mon esprit a modélisé les stations avec les producteurs et les receveurs. Pour une station, on a deux producteurs (sous la forme d'un wagon avec des personnes sortant de celui-ci) et les receveurs (le même wagon avec des gens qui y entrent.) J'ai créé une classe Railway et utilisé AtomicLong pour suivre le "train" quand il passe de station en station. Comme scénario simple, j'ai commencé avec un chemin de fer contenant un train unique.
public class RailWay {
private final Train train = new Train();
// the stationNo tracks the train and defines which station has the received train
private final AtomicInteger stationIndex = new AtomicInteger();
// Multiple threads access this method and wait for the train on the specific station.
public Train waitTrainOnStation(final int stationNo) {
while (stationIndex.get() % stationCount != stationNo) {
Thread.yield(); // this is necessary to keep a high throughput of message passing.
//But it eats CPU cycles while waiting for a train
}
// the busy loop returns only when the station number will match
//stationIndex.get() % stationCount condition
return train;
}
// this method moves this train to the next station by incrementing the train station index…
public void sendTrain() {
stationIndex.getAndIncrement();
}
}
A des fins de test, j'ai utilisé les mêmes conditions que celles utilisées dans les tests de performance pour Disruptor et pour les files d'attente SPSC - des tests de transfert entre les threads pour les valeurs longues. J'ai créé la classe Train suivante, qui contient une liste de long :
public class Train {
//
public static int CAPACITY = 2*1024;
private final long[] goodsArray; // array to transfer freight goods
private int index;
public Train() {
goodsArray = new long[CAPACITY];
}
public int goodsCount() { // returns the count of goods
return index;
}
public void addGoods(long i) { // adds item to the train
goodsArray[index++] = i;
}
public long getGoods(int i) { //removes the item from the train
index--;
return goodsArray[i];
}
}
Puis j'ai écrit un test simple : deux threads de long qui transfèrent de l'un à l'autre pour un train.
Figure 6 : Railway avec un producteur simple et un receveur simple utilisant un seul train
public void testRailWay() {
final Railway railway = new Railway();
final long n = 20000000000l;
//starting a consumer thread
new Thread() {
long lastValue = 0;
@Override
public void run() {
while (lastValue < n) {
Train train = railway.waitTrainOnStation(1); //waits for the train at the station #1
int count = train.goodsCount();
for (int i = 0; i < count; i++) {
lastValue = train.getGoods(i); // unload goods
}
railway.sendTrain(); //sends the current train to the first station.
}
}
}.start();
final long start = System.nanoTime();
long i = 0;
while (i < n) {
Train train = railway.waitTrainOnStation(0); // waits for the train on the station #0
int capacity = train.getCapacity();
for (int j = 0; j < capacity; j++) {
train.addGoods((int)i++); // adds goods to the train
}
railway.sendTrain();
if (i % 100000000 == 0) { //measures the performance per each 100M items
final long duration = System.nanoTime() - start;|
final long ops = (i * 1000L * 1000L * 1000L) / duration;
System.out.format("ops/sec = %,d\n", ops);
System.out.format("trains/sec = %,d\n", ops / Train.CAPACITY);
System.out.format("latency nanos = %.3f%n\n",
duration / (float)(i) * (float) Train.CAPACITY);
}
}
}
En exécutant le test avec différentes capacités de train, les résultats m'ont surpris :
Capacity | Throughput: ops/sec | Latency: ns |
---|---|---|
1 | 5,190,883 | 192.6 |
2 | 10,282,820 | 194.5 |
32 | 104,878,614 | 305.1 |
256 | 344,614,640 | 742. 9 |
2048 | 608,112,493 | 3,367.8 |
32768 | 767,028,751 | 42,720.7 |
Le taux de transfert entre deux threads atteint 767,028,751 ops/sec avec une capacité de train d'une longueur de 32,768. C'est beaucoup plus rapide que les files SPSC du blog de Nitsan.
Poursuivant sur ma vision du train sur le chemin de fer, je me demandais ce qu'il se passerait si on avait deux trains. J'ai senti que cela devrait améliorer le débit et réduire dans le même temps la latence. Chaque station devait avoir son propre train. Pendant qu'un train chargera les marchandises à la première station, le second train se chargera de le décharger à la station suivante, et ainsi de suite.
Figure 7 : Railway utilisant deux trains avec un seul producteur et un seul receveur
Voici les résultats concernant le débit :
Capacity | Throughput: ops/sec | Latency: ns |
---|---|---|
1 | 7,492,684 | 133.5 |
2 | 14,754,786 | 135.5 |
32 | 174,227,656 | 183.7 |
256 | 613,555,475 | 417.2 |
2048 | 940,144,900 | 2,178.4 |
32768 | 797,806,764 | 41,072.6 |
Les résultats ont été étonnants; ce test s'est avéré 1,4 fois plus rapide que celui pour un seul train. Pour la capacité d'un train, la latence s'est réduite, passant de 192,6 à 133,5 nanosecondes; un signe clairement prometteur.
Mes expériences ne s'arrêtaient donc pas là. Le temps de latence de transfert des messages entre les threads pour la capacité d'un train de 2048 était - 2,178,4 nanosecondes, ce qui était beaucoup trop. Je me suis donc demandé comment le réduire et j'ai crée un cas avec plusieurs trains :
Figure 8 : Railway utilisant plusieurs trains avec un seul producteur et un seul receveur
J'ai également réduit la capacité des trains pour une valeur longue et commencé à jouer sur le nombre de trains. Voici les résultats des tests :
Train Count | Throughput: ops/sec | Latency: ns |
---|---|---|
2 | 10,917,951 | 91.6 |
32 | 31,233,310 | 32.0 |
256 | 42,791,962 | 23.4 |
1024 | 53,220,057 | 18.8 |
32768 | 71,812,166 | 13.9 |
Avec 32768 trains, la latence pour envoyer une valeur long entre les threads a été réduite de 13,9 nanosecondes. En jouant sur le nombre de trains et sur la capacité des trains, on peut atteindre un équilibre optimal entre la latence et le débit, à condition que la latence ne soit pas trop élevée et le débit pas trop bas.
Ces chiffres sont parfaits pour un seul producteur et un unique receveur (CDPS), mais comment peut-on atteindre les mêmes résultats pour plusieurs producteurs et receveurs ? La réponse était simple : ajouter plus de stations !
Figure 9 : Railway avec un seul producteur et deux receveurs
Chacun des threads attend le prochain train, puis charge/décharge les articles, et envoie le train à la station suivante. Le thread producteur charge les articles de la gare dans le train tandis que les receveurs les reçoivent. Les trains se déplacent constamment à travers le cercle, d'une station à l'autre.
Afin de tester le Single Producer/Multiple Consumer (SPMC), j'ai créé le test Railway avec 8 stations. Une station est dédiée à un seul producteur tandis que les 7 autres stations sont dédiées aux receveurs. Les résultats sont les suivants :
Pour un nombre de trains = 256 et une capacité du train = 32 :
ops/sec = **116,604,397**
latency nanos = 274.4
Pour un nombre de trains = 32 et une capacité du train = 256 :
ops/sec = **432,055,469**
latency nanos = 592.5
Comme on peut l'observer, même avec huit threads actifs, le test montre des résultats assez bons - 432,055,469 ops/sec avec 32 trains et une capacité de 256. Pendant le test, les coeurs du CPU étaient chargés à 100%.
Figure 10 : Utilisation du CPU durant le test Railway avec 8 stations
En jouant avec l'algorithme Railway, j'en ai presque oublié mon objectif; améliorer la performance du cas Multiple Producers/Single Consumer (MPSC).
Figure 11 : Railway avec trois producteurs et un receveur
J'ai créé un nouveau test avec 3 producteurs et 1 receveur. Chaque train retrace le cercle de station en station pendant que les producteurs ne chargent seulement qu'un tiers de la capacité de chaque train. Le receveur reçoit les trois chargements des producteurs par le biais des trains. Le test de performance nous donne les résultats suivants :
ops/sec = 162,597,109
trains/sec = 54,199,036
latency ns = 18.5
C'est assez bon. Les producteurs et le receveur tournent à une vitesse de 160M ops/sec.
Pour analyser les différences, le test suivant montre les résultats obtenus avec Disruptor pour le même cas - 3 producteurs et 1 receveur:
Run 0, Disruptor=11,467,889 ops/sec
Run 1, Disruptor=11,280,315 ops/sec
Run 2, Disruptor=11,286,681 ops/sec
Run 3, Disruptor=11,254,924 ops/sec
Ci-dessous, les résultats d'un autre test de Disruptor dans le cas 3P:1C avec des messages batch (10 messages pour un batch) :
Run 0, Disruptor=116,009,280 ops/sec
Run 1, Disruptor=128,205,128 ops/sec
Run 2, Disruptor=101,317,122 ops/sec
Run 3, Disruptor=98,716,683 ops/sec;
Et enfin, voici les résultats des tests pour Disruptor mais l'implémentation LinkedBlockingQueue du scénario 3P:1C :
Run 0, BlockingQueue=4,546,281 ops/sec
Run 1, BlockingQueue=4,508,769 ops/sec
Run 2, BlockingQueue=4,101,386 ops/sec
Run 3, BlockingQueue=4,124,561 ops/sec
Comme on peut le voir, l'approche Railway fournit un débit moyen de 162,597,109 ops/sec, alors que le meilleur résultat de Disruptor ne fournit que 128,205,128 ops/sec. Dans le cas de LinkedBlockingQueue, le meilleur résultat est de seulement 4,546,281 ops/sec.
L'algorithme Railway présente un moyen simple d'augmenter considérablement le débit pour les événements liés aux batchs.
De plus, le Railway pourrait être utilisé pour des cas vraiment complexes en mélangeant les producteurs et receveurs lorsque le même thread pourrait être utilisé pour transférer les messages, les traiter et restituer les résultats au cercle:
Figure 12 : Railway avec un mélange de producteurs et de receveurs
Et pour finir, je vais présenter le test d'optimisation du Single Producer/Single Consumer pour un ultra haut débit :
Figure 13 : Railway avec un producteur et un receveur
Il retourne les résultats moyens suivants : un débit de plus d'un milliard et demi (1,569,884,271) d'opérations par seconde et une latence égale à 1,3 microsecondes. Comme on peut le voir, les résultats du test sont du même ordre de grandeur que ceux du test avec un unique thread décrit au début de l'article qui étaient de 2,681,850,373 opérations par secondes.
A ce point, je vous laisse en tirer vos propres conclusions.
Dans un prochain article, j'espère démontrer comment sauvegarder l'algorithme Railway avec des files d'attente et des files de verrouillage pour différentes combinaisons de producteurs et de receveurs. Restez à l'écoute.
A propos de l'auteur
Aliaksei Papou est Ingénieur logiciel en chef et Architecte chez Specific Group, une société de développement logiciel située à Vienna, en Australie. Aliaksei a plus de 10 ans d'expérience dans le développement d'applications d'entreprise à une petite et grande échelle. Il a une forte conviction : l'écriture de code suivant la programmation concurrente ne doit pas être si difficile.