The story begins with a simple idea: create a developer-friendly, simple, and lightweight inter-thread communication framework without using any locks, synchronizers, semaphores, waits, or notifies; and with no queues, messages, events, or any other concurrency-specific words or tools.
Just get POJOs communicating behind plain old Java interfaces.
It could be something similar to Akka typed actors, but that might be overkill, as the new framework has to be ultra-lightweight and optimized for inter-thread communication on a single multi-core computer.
The Akka framework is great for inter-process communications when actors cross process boundaries between different JVM instances on the same machine or on machines distributed across a network.
However, it may be excessive to use Akka typed actors for smaller projects where you only need inter-thread communication but still want to stick with the typed actor approach.
I created one solution in a couple of days by using dynamic proxies, blocking queues and a cached thread pool.
Figure 1 shows the high-level architecture of the created framework:
Figure 1: High-Level Architecture of Framework
An SPSC queue is a Single Producer/Single Consumer queue; an MPSC queue is a Multi Producer/Single Consumer queue.
The Dispatcher Thread receives messages from Actor Threads and sends them into the appropriate SPSC queue.
Actor Threads, using data from a received message, invoke the corresponding method of their actor instances. Through the proxies of other actors, actor instances send messages to the MPSC queue, and from there the messages travel to the target Actor Thread.
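The framework's code itself is beyond the scope of this article, but the proxy idea is easy to sketch. In the following illustration (my own simplified code, with a hypothetical Message class and queue wiring, not the framework's actual implementation) a dynamic proxy turns every interface call into a message and puts it on the MPSC queue; a dispatcher thread would later take the message and invoke the method on the registered actor implementation:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;

// A minimal sketch of the proxy idea (not the framework's actual code): a dynamic
// proxy turns each interface call into a message and drops it on the MPSC queue.
// The Message class and the queue wiring here are hypothetical.
class Message {
    final Class<?> actorInterface;
    final Method method;
    final Object[] args;

    Message(Class<?> actorInterface, Method method, Object[] args) {
        this.actorInterface = actorInterface;
        this.method = method;
        this.args = args;
    }
}

class ActorProxyFactory {
    @SuppressWarnings("unchecked")
    static <T> T createActor(Class<T> actorInterface, BlockingQueue<Message> mpscQueue) {
        InvocationHandler handler = (proxy, method, args) -> {
            // "send and forget": enqueue the call as a message and return immediately
            mpscQueue.put(new Message(actorInterface, method, args));
            return null;
        };
        return (T) Proxy.newProxyInstance(
                actorInterface.getClassLoader(), new Class<?>[]{actorInterface}, handler);
    }
}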
For a simple test I created a ping-pong example:
public interface PlayerA {
    void pong(long ball); // send-and-forget method call
}

public interface PlayerB {
    void ping(PlayerA playerA, long ball); // send-and-forget method call
}

public class PlayerAImpl implements PlayerA {
    @Override
    public void pong(long ball) {
    }
}

public class PlayerBImpl implements PlayerB {
    @Override
    public void ping(PlayerA playerA, long ball) {
        playerA.pong(ball);
    }
}

public class PingPongExample {
    public void testPingPong() {
        // the manager hides the complexity of inter-thread communication
        // and takes control over actor proxies, actor implementations and threads
        ActorManager manager = new ActorManager();

        // register actor implementations inside the manager
        manager.registerImpl(PlayerAImpl.class);
        manager.registerImpl(PlayerBImpl.class);

        // create actor proxies. Proxies convert method calls into internal messages
        // which are sent between threads to a specific actor instance.
        PlayerA playerA = manager.createActor(PlayerA.class);
        PlayerB playerB = manager.createActor(PlayerB.class);

        for (int i = 0; i < 1000000; i++) {
            playerB.ping(playerA, i);
        }
    }
}
The players achieved around 500,000 pings/pongs per second; so far so good. However, compared with the execution speed on a single thread, it suddenly looks much less impressive. The same code running in a single thread can perform more than 2 billion (2,681,850,373) operations per second!
The difference is more than 5,000 times. That disappointed me: in many cases single-threaded code turns out to be more effective than multi-threaded code.
I started looking for reasons for the slowness of my ping-pong players. After some investigation and testing I found that the blocking queues that I used to pass messages between actors were affecting performance.
Figure 2: SPSC queue with single producer and single consumer
So I launched a quest for one of the fastest queue implementations in Java as a replacement. I found a great blog by Nitsan Wakart. He has several posts describing some implementations of Single Producer/Single Consumer (SPSC) Lock-Free Queues. The posts were inspired by Martin Thompson’s presentation of Lock-Free Algorithms for Ultimate Performance.
Lock-free queues provide better performance in comparison to queues based on lock primitives. With lock-based queues, when one thread acquires the lock, other threads are blocked until the lock is released. With lock-free algorithms, a producer thread can produce messages without being blocked by other producer threads, and consumers are not blocked by other consumers while reading from the queue.
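To illustrate the idea (this is a bare-bones sketch, not the tuned implementations from those posts, which also address false sharing and memory layout), a lock-free SPSC ring buffer in Java can be as simple as a single producer advancing a tail counter and a single consumer advancing a head counter:

import java.util.concurrent.atomic.AtomicLong;

// A bare-bones illustration of a lock-free SPSC ring buffer. One thread calls offer,
// one thread calls poll, and neither ever takes a lock.
public class SpscRingBuffer<E> {
    private final Object[] buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // next slot to read  (consumer only)
    private final AtomicLong tail = new AtomicLong(); // next slot to write (producer only)

    public SpscRingBuffer(int capacityPowerOfTwo) {
        buffer = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    // called only by the single producer thread
    public boolean offer(E e) {
        long t = tail.get();
        if (t - head.get() == buffer.length) {
            return false; // queue is full
        }
        buffer[(int) (t & mask)] = e;
        tail.lazySet(t + 1); // ordered write makes the element visible to the consumer
        return true;
    }

    // called only by the single consumer thread
    @SuppressWarnings("unchecked")
    public E poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null; // queue is empty
        }
        E e = (E) buffer[(int) (h & mask)];
        buffer[(int) (h & mask)] = null; // help GC
        head.lazySet(h + 1);
        return e;
    }
}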
The performance results of the SPSC queues described in Martin Thompson's presentation and in Nitsan's blog were incredible - more than 100M ops/sec. That's more than 10 times faster than the JDK's concurrent queue implementations (whose performance on an Intel Core i7 with 4 cores is around 8M ops/sec).
With great anticipation I replaced the linked blocking queues connected to each actor with lock-free SPSC queue implementations. Sadly, the performance tests didn't show a significant improvement in throughput. It did not take long to realize that the bottleneck was not an SPSC queue but the Multi Producer/Single Consumer (MPSC) one.
Using an SPSC queue in the role of an MPSC queue is not a straightforward task: several producers doing a put operation can overwrite each other's values, because SPSC queues simply have no code coordinating put operations from multiple producers. Therefore even the fastest SPSC queues would not fix my problem.
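To see why, consider the extra step a multi-producer offer needs: every producer must atomically claim a slot, for example with a compareAndSet on the tail counter, before writing into it. A rough sketch of that claiming step (my own illustration, mirroring the ring-buffer layout above) looks like this:

import java.util.concurrent.atomic.AtomicLong;

// Rough sketch of the extra step a multi-producer queue needs: producers race on a CAS
// to claim a slot before writing. Without it, two producers that read the same tail
// value would write into the same slot and overwrite each other.
public class MpscSlotClaim<E> {
    private final Object[] buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong();
    private final AtomicLong tail = new AtomicLong();

    public MpscSlotClaim(int capacityPowerOfTwo) {
        buffer = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    public boolean offer(E e) {
        long t;
        do {
            t = tail.get();
            if (t - head.get() == buffer.length) {
                return false; // queue is full
            }
            // retry until this thread wins the race for slot t
        } while (!tail.compareAndSet(t, t + 1));
        buffer[(int) (t & mask)] = e;
        // a real implementation must also publish the element so the single consumer
        // never observes slot t before the write above becomes visible
        return true;
    }
}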
For the Multiple Producers/Single Consumer case I decided to leverage the LMAX Disruptor - a high-performance inter-thread messaging library based on a ring buffer.
Figure 3: LMAX Disruptor with single producer and single consumer
By using the Disruptor it's easy to achieve very low-latency, high-throughput inter-thread message communication. It also covers different combinations of producers and consumers. Several threads can read from the ring buffer without blocking each other:
Figure 4: LMAX Disruptor with single producer and two consumers
There is also a scenario in which multiple producers write into the ring buffer while multiple consumers get messages from it:
Figure 5: LMAX Disruptor with two producers and two consumers
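For readers who have not used the library, a minimal Disruptor 3.x setup for a single publisher and a single handler looks roughly like the following sketch, modeled on the library's documented example; the LongEvent class here is just a placeholder:

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;

// A minimal, illustrative Disruptor 3.x setup: one publisher, one event handler.
public class DisruptorSketch {
    public static class LongEvent {
        private long value;
        public void set(long value) { this.value = value; }
        public long get() { return value; }
    }

    public static void main(String[] args) {
        Disruptor<LongEvent> disruptor = new Disruptor<>(
                LongEvent::new,                      // event factory pre-allocates ring entries
                1024,                                // ring buffer size (a power of two)
                Executors.defaultThreadFactory());

        // the handler runs on its own thread and consumes events from the ring buffer
        disruptor.handleEventsWith((event, sequence, endOfBatch) ->
                System.out.println("got " + event.get()));
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        for (long i = 0; i < 10; i++) {
            // claim the next slot, write the value into the pre-allocated event, publish it
            ringBuffer.publishEvent((event, sequence, v) -> event.set(v), i);
        }
        disruptor.shutdown(); // waits until all published events have been handled
    }
}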
After a quick search for performance tests I found a throughput test for three publishers and one consumer. That was just what the doctor ordered and it produced the following results:
|       | LinkedBlockingQueue | Disruptor          |
|-------|---------------------|--------------------|
| Run 0 | 4,550,625 ops/sec   | 11,487,650 ops/sec |
| Run 1 | 4,651,162 ops/sec   | 11,049,723 ops/sec |
| Run 2 | 4,404,316 ops/sec   | 11,142,061 ops/sec |
The Disruptor was more than twice as fast as the LinkedBlockingQueue for the 3 Producers/1 Consumer case. However, this was still a long way from my expectation of a 10-fold performance improvement.
I was frustrated by this state of affairs, and my mind kept searching for a solution. As fate had it, I had recently switched my commute to the subway instead of the old carpool. Suddenly a reverie came over me and my mind started mapping stations to producers and consumers. At one station we have both producers (in the form of a wagon with people exiting from it) and consumers (the same wagon with people entering it).
I created a Railway class and used an AtomicInteger to track the train as it passed from station to station. For a simple scenario I started with a single-train railway.
import java.util.concurrent.atomic.AtomicInteger;

public class Railway {
    private final Train train = new Train();
    // stationIndex tracks the train and defines which station has received the train
    private final AtomicInteger stationIndex = new AtomicInteger();
    // number of stations on the circle; two for the simple producer/consumer case
    private final int stationCount;

    public Railway() {
        this(2);
    }

    public Railway(int stationCount) {
        this.stationCount = stationCount;
    }

    // Multiple threads access this method and wait for the train at their specific station.
    public Train waitTrainOnStation(final int stationNo) {
        while (stationIndex.get() % stationCount != stationNo) {
            Thread.yield(); // this is necessary to keep a high throughput of message passing,
                            // but it eats CPU cycles while waiting for a train
        }
        // the busy loop returns only when the station number matches
        // the stationIndex.get() % stationCount condition
        return train;
    }

    // this method moves the train to the next station by incrementing the station index
    public void sendTrain() {
        stationIndex.getAndIncrement();
    }
}
For testing purposes I used the same conditions used in Disruptor performance tests and tests for SPSC queues - tests transfer long values between threads. I created the following Train class, which contains a long array:
public class Train {
    public static final int CAPACITY = 2 * 1024;
    private final long[] goodsArray; // array to transfer freight goods
    private int index;

    public Train() {
        goodsArray = new long[CAPACITY];
    }

    public int goodsCount() { // returns the count of goods
        return index;
    }

    public int getCapacity() { // returns how many goods the train can carry
        return goodsArray.length;
    }

    public void addGoods(long i) { // adds an item to the train
        goodsArray[index++] = i;
    }

    public long getGoods(int i) { // unloads an item from the train
        index--;
        return goodsArray[i];
    }
}
Then I wrote a simple test: two threads transfer longs between each other by a train.
Figure 6: Railway with single producer and single consumer uses single train
public void testRailWay() {
    final Railway railway = new Railway();
    final long n = 20000000000L;

    // start the consumer thread
    new Thread() {
        long lastValue = 0;

        @Override
        public void run() {
            while (lastValue < n) {
                Train train = railway.waitTrainOnStation(1); // waits for the train at station #1
                int count = train.goodsCount();
                for (int i = 0; i < count; i++) {
                    lastValue = train.getGoods(i); // unload goods
                }
                railway.sendTrain(); // sends the train back to the first station
            }
        }
    }.start();

    final long start = System.nanoTime();

    long i = 0;
    while (i < n) {
        Train train = railway.waitTrainOnStation(0); // waits for the train at station #0
        int capacity = train.getCapacity();
        for (int j = 0; j < capacity; j++) {
            train.addGoods(i++); // adds goods to the train
        }
        railway.sendTrain();

        if (i % 100000000 == 0) { // measures the performance every 100M items
            final long duration = System.nanoTime() - start;
            final long ops = (long) (i * 1e9 / duration);
            System.out.format("ops/sec = %,d\n", ops);
            System.out.format("trains/sec = %,d\n", ops / Train.CAPACITY);
            System.out.format("latency nanos = %.3f%n\n", duration / (float) i * (float) Train.CAPACITY);
        }
    }
}
Running the test with different train capacities produced surprising results:
| Capacity | Throughput: ops/sec | Latency: ns |
|----------|---------------------|-------------|
| 1        | 5,190,883           | 192.6       |
| 2        | 10,282,820          | 194.5       |
| 32       | 104,878,614         | 305.1       |
| 256      | 344,614,640         | 742.9       |
| 2048     | 608,112,493         | 3,367.8     |
| 32768    | 767,028,751         | 42,720.7    |
The throughput of transferring messages between two threads reached 767,028,751 ops/sec with a train capacity of 32,768 longs. That is several times faster than the SPSC queues from Nitsan's blog.
Continuing the railway train of thought, I considered what would happen if we had two trains. I felt it should improve throughput and reduce latency at the same time. Every station would have its own train: while one train loads goods at the first station, the second train unloads goods at the second station, and vice versa.
Figure 7: Railway with single producer and single consumer uses two trains
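A simplified sketch of the two-train variant (my own naming and wiring, not the exact test code) gives an idea of how it works: each train carries its own station counter, and every station thread keeps a local count of the trains it has handled so it knows which train to expect next:

import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of the two-train idea: each train has its own station counter,
// and each station thread counts the trains it has handled so it knows which train
// to wait for next.
public class TwoTrainRailway {
    private final Train[] trains = { new Train(), new Train() };
    // train 0 starts at station 0 (producer), train 1 starts at station 1 (consumer)
    private final AtomicInteger[] stationIndices = { new AtomicInteger(0), new AtomicInteger(1) };
    private final int stationCount = 2;

    // trainNo is the caller's local count of trains it has already handled
    public Train waitTrainOnStation(final int stationNo, final long trainNo) {
        final int train = (int) (trainNo % trains.length);
        while (stationIndices[train].get() % stationCount != stationNo) {
            Thread.yield(); // same busy-wait as in the single-train Railway
        }
        return trains[train];
    }

    // moves the given train to the next station
    public void sendTrain(final long trainNo) {
        stationIndices[(int) (trainNo % trains.length)].getAndIncrement();
    }
}

While the producer loads train 0 at station #0, the consumer can unload train 1 at station #1, and the two trains keep swapping places.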
Here are the results for the throughput:
| Capacity | Throughput: ops/sec | Latency: ns |
|----------|---------------------|-------------|
| 1        | 7,492,684           | 133.5       |
| 2        | 14,754,786          | 135.5       |
| 32       | 174,227,656         | 183.7       |
| 256      | 613,555,475         | 417.2       |
| 2048     | 940,144,900         | 2,178.4     |
| 32768    | 797,806,764         | 41,072.6    |
The results were amazing: throughput was more than 1.4 times higher than with a single train. For a train capacity of one, latency dropped from 192.6 nanoseconds to 133.5 nanoseconds; clearly a promising sign.
But my experiments were not over. The latency of transferring messages between threads for a train capacity of 2048 was 2,178.4 nanoseconds, which was too much. Looking for a way to reduce it, I created a case with many trains:
Figure 8: Railway with single producer and single consumer uses many trains
I also reduced the train capacity to one long value and started playing with the train count. Below are test results:
| Train Count | Throughput: ops/sec | Latency: ns |
|-------------|---------------------|-------------|
| 2           | 10,917,951          | 91.6        |
| 32          | 31,233,310          | 32.0        |
| 256         | 42,791,962          | 23.4        |
| 1024        | 53,220,057          | 18.8        |
| 32768       | 71,812,166          | 13.9        |
With 32,768 trains the latency of sending a long value between threads dropped to 13.9 nanoseconds. By playing with the train count and train capacity, throughput and latency can be tuned to an optimal balance where latency is not too high and throughput is not too low.
These numbers are great for a single producer and single consumer (SPSC), but how could we make this work for several producers and consumers? The answer was simple: add more stations!
Figure 9: Railway with single producer and two consumers
Every thread waits for the next train, then loads or unloads items and sends the train on to the next station. The producer thread puts items into the train while consumers take items from it. Trains constantly circle from one station to another.
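In code, every station thread runs the same loop; the following sketch (my own generalization, not the exact test code) abstracts the station's work behind a callback:

import java.util.function.Consumer;

// Sketch of the loop every station thread runs, with the station's work abstracted
// behind a callback: wait for the train, do this station's work on it (load or unload
// goods), and send the train on to the next station.
class StationWorker implements Runnable {
    private final Railway railway;
    private final int stationNo;
    private final Consumer<Train> work; // what this station does with an arriving train

    StationWorker(Railway railway, int stationNo, Consumer<Train> work) {
        this.railway = railway;
        this.stationNo = stationNo;
        this.work = work;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            Train train = railway.waitTrainOnStation(stationNo); // busy-wait for the train
            work.accept(train);                                  // load or unload goods
            railway.sendTrain();                                 // move the train to the next station
        }
    }
}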
In order to test the Single Producer/Multiple Consumer (SPMC) case I created the Railway test with 8 stations. One station belongs to a single producer while the other seven stations belong to consumers. The results are:
For the train count = 256 and train capacity = 32:
ops/sec = 116,604,397 latency nanos = 274.4
For the train count = 32 and train capacity = 256:
ops/sec = 432,055,469 latency nanos = 592.5
As you can see, even with eight working threads the test shows pretty good results: 432,055,469 ops/sec with 32 trains and a capacity of 256 longs. During the test all CPU cores were loaded to 100%.
Figure 10: CPU utilization during the Railway test with 8 stations
While playing with the Railway algorithm I almost forgot about my goal: to improve the performance of the Multiple Producers/Single Consumer case.
Figure 11: Railway with three producers and single consumer
I created a new test with 3 producers and 1 consumer. Each train circles from station to station while each producer loads only 1/3 of each train's capacity. With every train the consumer gets the items loaded by all three producers. The performance test shows the following average results:
ops/sec = 162,597,109 trains/sec = 54,199,036 latency ns = 18.5
That’s pretty good. Producers and the consumer work at a speed of more than 160M ops/sec.
For comparison, here are the results of the Disruptor test for the same case - 3 producers and 1 consumer:
Run 0, Disruptor=11,467,889 ops/sec
Run 1, Disruptor=11,280,315 ops/sec
Run 2, Disruptor=11,286,681 ops/sec
Run 3, Disruptor=11,254,924 ops/sec
Below are the results of running another Disruptor 3P:1C test with message batching (10 messages per batch):
Run 0, Disruptor=116,009,280 ops/sec
Run 1, Disruptor=128,205,128 ops/sec
Run 2, Disruptor=101,317,122 ops/sec
Run 3, Disruptor=98,716,683 ops/sec
And finally, here are the results from the Disruptor test suite but with the LinkedBlockingQueue implementation of the 3P:1C scenario:
Run 0, BlockingQueue=4,546,281 ops/sec
Run 1, BlockingQueue=4,508,769 ops/sec
Run 2, BlockingQueue=4,101,386 ops/sec
Run 3, BlockingQueue=4,124,561 ops/sec
As you can see, the Railway approach provides an average throughput of 162,597,109 ops/sec, whereas the best result using the Disruptor for the same case was only 128,205,128 ops/sec. With the LinkedBlockingQueue the best result was just 4,546,281 ops/sec.
The Railway algorithm introduces an easy way of event batching that increases throughput significantly. It can easily be configured to achieve the desired throughput/latency trade-off by playing with train capacity and train count.
The Railway can also be used for really complex cases that mix producers and consumers, where the same thread consumes messages, processes them, and returns the results back to the ring:
Figure 12: Railway with mix of producers and consumers
And finally, here is a Single Producer/Single Consumer test optimized for ultra-high throughput:
Figure 13: Railway with single producer and single consumer
It had the following average results: a throughput of more than one and a half billion (1,569,884,271) operations per second and a latency of 1.3 microseconds. As you can see, the results of this test are of the same order of magnitude as the single-threaded test described at the beginning of the article, which reached 2,681,850,373 operations per second.
Here I'll leave you to draw your own conclusions.
In a future article I hope to demonstrate how to back the Railway algorithm with Queue and BlockingQueue interfaces for different combinations of producers and consumers. Stay tuned.
About the Author
Aliaksei Papou is the Lead Software Engineer and Architect at Specific Group, a software development company located in Vienna, Austria. Aliaksei has more than 10 years of experience in the development of small and large scale enterprise applications. He has a strong belief: writing concurrent code shouldn't be so hard.