In keeping with IBM's commitment to their four-week release cycle of Open Liberty, version 19.0.0.9 was recently made available to the Java community with new features that include: support for reactive messaging; testing database connections with REST APIs; and support for OAuth 2.0 client secret and access token hashing.
First introduced in September 2017, Open Liberty is an open-source implementation of IBM's WebSphere Liberty application server to build microservices and cloud-native applications. Now compatible with Jakarta EE 8, Open Liberty's ongoing support for MicroProfile includes the new standalone reactive streams APIs: MicroProfile Reactive Streams Operators (version 19.0.0.4); MicroProfile Context Propagation (version 19.0.0.8); and MicroProfile Reactive Messaging (version 19.0.0.9).
Reactive Streams
The Reactive Streams initiative, based on the Reactive Manifesto, started in 2013 as a collaboration among Lightbend, Netflix and Pivotal. The initial specification, released in 2015, was ultimately adopted with the release of JDK 9 as the java.util.concurrent.Flow
API.
MicroProfile Reactive Messaging
Open Liberty 19.0.0.9 provides a full implementation of the MicroProfile Reactive Messaging 1.0 API. Released in July 2019, Reactive Messaging provides asynchronous messaging support to send, receive, and process messages in a microservices-based architecture. Utilizing the @Incoming
and @Outgoing
annotations on methods to consume and feed messages, respectively, Open Liberty processes the annotated methods connected by channels. Consider the following diagram:
The publisher method, send()
, sends the message "hello" to the processing method, process()
, where the messages "hello" and "world," are concatenated and sent to the subscribing method, receive()
. This can be coded as follows:
@Outgoing("hello")
public String send() {
return "hello";
}
@Incoming("hello")
@Outgoing("world")
public String process(String input) {
return input + " world";
}
@Incoming("bar")
public void receive(String input) {
System.out.println("Message received: " + input);
}
Reactive Messaging depends upon the MicroProfile APIs, Reactive Streams Operators and CDI (JSR 365).
MicroProfile Context Propagation
A full implementation of the MicroProfile Context Propagation 1.0 API was introduced in Open Liberty 19.0.0.8. Also released in July 2019, Context Propagation, formerly known as MicroProfile Concurrency, is built upon the Java SE CompleteableFuture
class that enables developers to create pipelines of dependant stages that run with predictable and reliable threads on the Open Liberty global thread pool.
Interfaces specified in this API include ManagedExecutor
and ThreadContext
. Instances of each are achieved through their respective builder patterns.
ThreadContext threadContext = ThreadContext.builder()
.propagated(ThreadContext.SECURITY)
.build();
ManagedExecutor executor = ManagedExecutor.builder()
.maxAsync(5)
.propagated(ThreadContext.CDI, ThreadContext.APPLICATION)
.build();
CompletableFuture<Long> stage = CompletableFuture.supplyAsync(supplier1)
.thenApplyAsync(function1)
.thenApply(threadContext.contextualFunction(function2));
A completion stage, not created by a managed executor, may still run with a predictable thread context if its action is pre-contextualized with the contextualFunction()
method defined in ThreadContext
. In the example above, supplier1
and function1
run with non-deterministic thread context. However, the pre-contextualized action, function2
, always runs with the context of the thread that invoked the contextualFunction()
method.
Managed executors in Context Propagation are fully compatible with the Java SE ManagedExecutorService
interface.
MicroProfile Reactive Streams Operators
A full implementation of the MicroProfile Reactive Streams Operators 1.0 API was Introduced in version 19.0.0.4. Released in January 2019, Reactive Streams Operators allows developers to create reactive streams, process the data transiting in those streams, and accumulate results.
Interfaces specified in this API include: PublisherBuilder
; SubscriberBuilder
; and ProcessorBuilder
. It's important to note that due to the 40-50 individual requirements and approximately 100 tests in the TCK, the Reactive Streams Operators interfaces were not intended to be directly implemented by developers. As stated on the website:
The semantics defined by Reactive Streams are very strict, and are non trivial, particularly in the areas of thread safety, to implement correctly. Typically, application developers are expected to use third party libraries that provide the tools necessary to manipulate and control streams.
The ReactiveStreams
class provides factory methods for publisher and processor builders. Consider the example marble diagram below where the of()
method defined in ReactiveStreams
creates an instance of PublisherBuilder
that emits a given set of elements:
This can be coded and manipulated as follows:
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
public class Main {
public static void main(String[] args) {
// create a stream of numbers
ReactiveStreams.of(3, 1, 5, 6)
.filter(s -> s > 4)
.forEach(number -> System.out.println(">> " + number))
.run();
}
}
With the applied filter, only the numbers, 5 and 6, are displayed.
Support for all three MicroProfile reactive streams APIs is now complete with this latest Open Liberty release.
Resources
- Reactive Programming in Microservices with MicroProfile on Open Liberty 19.0.0.4 by Laura Cowen (April 26, 2019)
- Enhanced concurrency capabilities with MicroProfile Context Propagation on Open Liberty 19.0.0.8 by Yasmin Aumeeruddy (August 16, 2019)
- Open Liberty is Jakarta EE 8 Compatible by Alasdair Nottingham (September 10, 2019)
- Sending and Receiving Messages Between Microservices with MicroProfile Reactive Messaging by Andrew Rouse and Gordon Hutchison (September 13, 2019)
- Reactive Messaging Between Microservices with MicroProfile on Open Liberty 19.0.0.9 by Tom Jennings (September 13, 2019)