Reactive Real-Time Notifications with SSE, Spring Boot, and Redis Pub/Sub

Key Takeaways

  • The reactive approach for a real-time notification system efficiently handles a high volume of simultaneous requests, enabling optimal application scalability.
  • Reactive architectures leverage non-blocking operations to maximize system resource utilization, reducing system load and improving efficiency.
  • Spring Boot Reactive and Spring WebFlux frameworks enable reactive programming with asynchronous data flows, which is essential for implementing real-time notification management.
  • Redis Pub/Sub is a message broker that enables clients to subscribe to specific events and receive immediate notifications when those events occur.
  • The SSE protocol enables servers to send real-time notifications to clients asynchronously over a persistent connection, eliminating the need for continuous client requests.

Server-Sent Events

Server-Sent Events (SSE), standardized via the EventSource API, is a web technology that allows the server to asynchronously send data to clients over a persistent HTTP connection without them actively requesting it. This is particularly useful for cases where the server needs to inform the client about events or updates without the client having to make repeated polling calls.

Unlike traditional HTTP requests, SSE connections remain open after the first data packet is sent, using the same connection to send subsequent updates, reducing latency and the overhead of multiple connections.

SSE uses the GET method to establish a one-way connection from the server to the client. Once the connection is established, the server can send a series of events to the client through the same channel, keeping it open until either the server or the client explicitly closes it.

This mechanism is based on the keep-alive concept, which allows the TCP connection between client and server to remain open for long periods.

Data sent over SSE is plain text, structured as one field per line. A blank line terminates each event, and an event may contain the fields 'id', 'event', 'data', and 'retry'. The 'event' field represents the type of event, while the 'data' field contains the actual event data. The 'id' field is a unique identifier for the message, used for recovery in case of disconnection, whereas the 'retry' field indicates how long the client should wait (in milliseconds) before attempting to reestablish the connection after a disconnection.

id:514cf5d7-7ea6-4c57-8dd3-a7768f9a220d
event:GOAL
data:{"id":"1","score":"1-1","homeTeam":"Roma","awayTeam":"Lazio","scorer":"Lorenzo Pellegrini"}
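To make the wire format concrete, here is a minimal sketch in plain Java (JDK only, no Spring) that serializes an event into the text that travels over the connection. The SseFrame class and its format method are hypothetical names introduced for illustration; in the rest of the article, Spring's ServerSentEvent does this work for us.

```java
class SseFrame {

    /**
     * Serializes one event in the text/event-stream format.
     * Hypothetical helper: id, event and retryMillis are optional (null = omitted)
     * and are assumed not to contain line breaks.
     */
    static String format(String id, String event, String data, Long retryMillis) {
        StringBuilder frame = new StringBuilder();
        if (id != null) {
            frame.append("id:").append(id).append('\n');
        }
        if (event != null) {
            frame.append("event:").append(event).append('\n');
        }
        if (retryMillis != null) {
            frame.append("retry:").append(retryMillis).append('\n');
        }
        // A payload spanning several lines is sent as one data field per line
        for (String line : data.split("\n", -1)) {
            frame.append("data:").append(line).append('\n');
        }
        // The blank line tells the client that the event is complete
        return frame.append('\n').toString();
    }
}
```

Calling SseFrame.format with an id, the GOAL event type, and a JSON payload produces a frame shaped like the example above, followed by the terminating blank line.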

SSEs are suitable for all scenarios where it is necessary to dynamically update the contents of a web page without having to reload the entire page. However, because they are unidirectional communications, there is no support for sending data from the client to the server over the same connection; to achieve a bidirectional connection, the most suitable choice is to use WebSockets.

Additional Features of SSE

Some additional features of SSEs improve reliability, robustness, and error handling in the connection between client and server.

Event IDs: servers can assign IDs to events using the 'id' field, and clients use these IDs to keep track of the last event received. If a connection is lost and later reestablished, the client sends the ID of the last event received in the Last-Event-ID request header, allowing the server to resume transmission from where it was interrupted.
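Resuming from the last received ID only works if the server keeps recent events somewhere. The sketch below shows one possible approach in plain Java, under two assumptions that are mine and not part of the SSE standard: event IDs are monotonically increasing numbers, and only the most recent events are retained in memory. The ReplayBuffer class is a hypothetical helper. Redis Pub/Sub itself does not retain messages, so a buffer like this (or another store) would have to sit next to it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ReplayBuffer {

    /** An event together with the numeric ID sent in its 'id' field. */
    record StoredEvent(long id, String data) {}

    private final Deque<StoredEvent> events = new ArrayDeque<>();
    private final int capacity;
    private long nextId = 1;

    ReplayBuffer(int capacity) {
        this.capacity = capacity;
    }

    /** Stores a new event and evicts the oldest one when the buffer is full. */
    synchronized StoredEvent append(String data) {
        StoredEvent event = new StoredEvent(nextId++, data);
        events.addLast(event);
        if (events.size() > capacity) {
            events.removeFirst();
        }
        return event;
    }

    /** Returns the retained events newer than the ID the client reports after reconnecting. */
    synchronized List<StoredEvent> eventsAfter(String lastEventIdHeader) {
        long lastSeen = parseOrZero(lastEventIdHeader);
        List<StoredEvent> missed = new ArrayList<>();
        for (StoredEvent event : events) {
            if (event.id() > lastSeen) {
                missed.add(event);
            }
        }
        return missed;
    }

    // A missing or malformed header is treated as "nothing received yet"
    private static long parseOrZero(String header) {
        if (header == null || header.isBlank()) {
            return 0;
        }
        try {
            return Long.parseLong(header.trim());
        } catch (NumberFormatException e) {
            return 0;
        }
    }
}
```

On reconnection, the server would first emit the result of eventsAfter with the header value and then continue with the live stream.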

Automatic Reconnection and Retry: SSE clients attempt to reconnect automatically if the connection is lost. Servers can specify a custom interval for the retry of the connection using the 'retry' field in the event stream.

From the server-side perspective, I can specify a retry interval of 5 seconds:

public Flux<ServerSentEvent<String>> streamEventsWithRetry() {
   return Flux.interval(Duration.ofSeconds(1))
           .map(sequence -> ServerSentEvent.<String>builder()
                   .event("retry-event")
                   .data("Message " + sequence)
                   .retry(Duration.ofSeconds(5))
                   .build());
}

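On the client side, the browser applies this value automatically: if the connection drops, EventSource waits 5 seconds before attempting to reconnect. Because the server names the event "retry-event", the client must register a listener for that event type; the onmessage handler only fires for events sent without an 'event' field.

const eventSource = new EventSource("/events");

// Named events are delivered to addEventListener, not to onmessage
eventSource.addEventListener("retry-event", function(event) {
   console.log("Events received with retry: ", event.data);
});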

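Error handling: clients can listen for errors using the 'onerror' event handler, which fires when the connection fails or is closed by the server. On the server side, a failure in the stream can be converted into a final event with onErrorResume, so that the client receives an explanatory message before the stream completes: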

public Flux<ServerSentEvent<String>> streamEventsWithErrorHandling() {
   return Flux.interval(Duration.ofSeconds(1))
           .map(sequence -> {
               return ServerSentEvent.<String>builder()
                       .event("error-event")
                       .data("Error Event " + sequence)
                       .build();
           })
           .onErrorResume(e -> Flux.just(ServerSentEvent.builder("Error: " + e.getMessage()).build()));
}

On the client side, I can handle the error like this:

const eventSource = new EventSource("/events");

eventSource.onerror = function(event) {
   console.error("SSE Error:", event);
};

ReadyState: the EventSource object used by the client has a property named "readyState", which lets the client check the current state of the connection: connecting (0), open (1), or closed (2).

const eventSource = new EventSource("/events");

eventSource.onopen = function(event) {
   console.log("Connection open, readyState: ", eventSource.readyState);
};

eventSource.onmessage = function(event) {
   console.log("Event received: ", event.data);
};

eventSource.onerror = function(event) {
   console.error("SSE error, readyState: ", eventSource.readyState);
};

Spring Boot Reactive & WebFlux

Spring Boot Reactive is an extension of the Spring Boot framework that supports reactive programming, a paradigm for writing applications that efficiently handle a large number of concurrent requests while making optimal use of available resources. These characteristics make this technology ideal in highly concurrent scenarios, such as web applications that manage real-time notifications from asynchronous data streams.

Reactive programming is based on four cornerstone principles, as set out in the Reactive Manifesto:

  1. Responsive: a reactive application guarantees fast response times, providing an enjoyable user experience with low latency
  2. Resilient: a reactive application stays responsive in the face of failures through replication mechanisms, resource isolation, and delegation of responsibilities
  3. Elastic: a reactive application scales efficiently under peak loads, adapting to variations in demand
  4. Message-Driven: a reactive application relies on asynchronous communication via messages to manage the flow of data, propagating events without blocking the thread

The application of these principles leads to advantages in the end experience:

  • Scalability: in use cases with many I/O operations that would otherwise block (such as database access or calls to external services), the asynchronous, non-blocking model allows more concurrent requests to be handled using fewer resources
  • UX & UI: the ability to deliver content in a reactive mode allows the construction of more fluid and responsive interfaces and user experiences, which compose and change as updates in information content occur
  • Error management: errors can be handled as events that, instead of interrupting the flow, travel along with the flow

Spring WebFlux is the reactive programming module within the Spring framework. It is an alternative to the Spring MVC model, which is based on a synchronous programming model.

Image source: the Spring.io website

Spring WebFlux follows the Reactive Streams specification: its asynchronous, non-blocking nature allows concurrent requests to be handled efficiently without dedicating a separate thread to each request. Spring WebFlux natively integrates Reactor (built on top of the Reactive Streams specification), which offers a set of reactive tools to handle asynchronous operations and transformations on data streams.
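The Reactive Streams contract (publisher, subscriber, subscription, and demand signalled by the subscriber) is also mirrored in the JDK by the java.util.concurrent.Flow interfaces. The following sketch uses only the JDK, not Reactor, to show that contract in isolation; the FlowSketch class and its method names are hypothetical and are not part of the project discussed in this article.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowSketch {

    /** Subscriber that asks for one item at a time, i.e. explicit backpressure. */
    static final class OneByOneSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item only when ready
        }

        @Override
        public void onError(Throwable error) {
            done.countDown(); // the error arrives as a signal, like any other event
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the items asynchronously and returns what the subscriber received. */
    static List<String> publishAndCollect(List<String> items) {
        OneByOneSubscriber subscriber = new OneByOneSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // closing the publisher sends onComplete once pending items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for completion", e);
        }
        return List.copyOf(subscriber.received);
    }
}
```

Reactor's Flux and Mono implement the same contract, adding a rich set of operators on top of it.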

In Spring WebFlux, you can work with two main types of reactive data:

  • Flux: Represents a data stream that can contain zero or more elements and is used to handle incoming data sequences, such as the results of a query to a database or the events of a real-time feed.

For example, the following code creates a Flux of strings:

Flux<String> items = Flux.just("Item 1", "Item 2", "Item 3", "Item 4");
items.subscribe(System.out::print);
// Output: Item 1Item 2Item 3Item 4

  • Mono: Represents a data stream that can contain at most one element and is used to represent single results or optional values.

For instance, the following code creates a Mono containing a single string:

Mono<String> item = Mono.just("Item 1");
item.subscribe(System.out::println);
// Output: Item 1

Pub/Sub with Redis

Redis Pub/Sub is an asynchronous messaging mechanism provided by Redis that allows different parts of an application to communicate with each other in real-time via the publish/subscribe paradigm.

In this model, the components that publish messages (publishers) place the messages on specific channels. The components interested in receiving these messages (subscribers) subscribe to one or more channels from which they receive the messages of interest.

Redis Pub/Sub exploits the asynchronous nature of messaging: when an event occurs, the publisher sends the event to the corresponding channel. All subscribers interested in that event automatically receive the message.

Because Redis supports multiple channels, it is possible to create several dedicated communication channels and partition the flow of data, organizing it, for example, by individual event types.

This is the case with real-time notifications, where the affected components must receive real-time updates on certain event types and react to them without interrupting their execution flow.
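The publish/subscribe model described above can be illustrated with a small in-memory stand-in written in plain Java. This is not a Redis client, and the InMemoryPubSub class is a hypothetical name; it only mimics the semantics we rely on: messages are fanned out to every subscriber of a channel, channels are isolated from each other, and a message published to a channel nobody listens to is simply dropped.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class InMemoryPubSub {

    // channel name -> listeners currently subscribed to that channel
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    /** Registers a listener on a channel, e.g. "scores:1". */
    void subscribe(String channel, Consumer<String> listener) {
        subscribers.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(listener);
    }

    /**
     * Delivers the message to every listener on the channel and returns how many
     * received it, similar to the count returned by the Redis PUBLISH command.
     */
    int publish(String channel, String message) {
        List<Consumer<String>> listeners = subscribers.getOrDefault(channel, List.of());
        listeners.forEach(listener -> listener.accept(message));
        return listeners.size();
    }
}
```

With one channel per match, subscribers to "scores:1" never see messages published to "scores:2", which is the partitioning by channel used later in this article.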

A Reactive Mix

Now that we've presented all the ingredients, let's mix them to build our fully reactive real-time notification system.

This will be the reference architecture we will use to show how the real-time notification system works.

Let's explore the code implementation of a component that provides an API for handling Server-Sent Events (SSE) connections. This component establishes a communication channel between a client (user's browser) and the server, enabling real-time updates.

We're using Spring Boot 3.3.0 and Java 21. The following dependencies are crucial for our implementation:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

  • spring-boot-starter-webflux makes our component reactive, allowing non-blocking APIs based on Netty.
  • spring-boot-starter-data-redis-reactive enables reactive interaction with Redis, which is crucial for reacting to events published on channels the user subscribes to.

We'll work with a simple event model to represent match updates:

public record NotifyEvent(String id, String score, String homeTeam, String awayTeam, String scorer) {
}

The core of our implementation is the ScoreController, which exposes the API for subscribing to match events:

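import java.time.Duration;
import java.util.UUID;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/api/v1/scores")
public class ScoreController {

   @Autowired
   private ReactiveRedisOperations<String, NotifyEvent> eventRedisOperations;
   @Autowired
   private ObjectMapper objectMapper;

   private static final String SCORE_EVENT_TYPE = "GOAL";
   private static final String SCORE_CHANNEL_PREFIX = "scores:";

   @GetMapping(path = "/events/{idMatch}")
   Flux<ServerSentEvent<NotifyEvent>> getScoreEvents(@PathVariable String idMatch) {
       // 1. Acknowledge the connection with an SSE comment
       return Mono.just(ServerSentEvent.<NotifyEvent>builder().comment("connected").build())
               // 2. Send a keepalive comment every 30 seconds
               .mergeWith(Flux.interval(Duration.ofSeconds(30L))
                       .map(i -> ServerSentEvent.<NotifyEvent>builder().comment("keepalive").build()))
               // 3. Forward every message published on the match channel
               .mergeWith(this.eventRedisOperations
                       .listenToChannel(
                               composeChannelName(idMatch))
                       .map(ReactiveSubscription.Message::getMessage)
                       .map(event -> ServerSentEvent.builder(event)
                               .id(UUID.randomUUID().toString())
                               .event(SCORE_EVENT_TYPE)
                               .data(objectMapper.convertValue(event,NotifyEvent.class))
                               .build()));
   }

   // Each match has its own channel, e.g. "scores:1"
   private String composeChannelName(String idMatch){
       return SCORE_CHANNEL_PREFIX + idMatch;
   }

}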

Let's break down what this code does:

  1. Endpoint:
    1. The @GetMapping annotation exposes the API at /api/v1/scores/events/{idMatch}, where idMatch is the match identifier.
    2. The return type Flux<ServerSentEvent<NotifyEvent>> indicates a stream of SSE events modeled after the NotifyEvent class.
  2. Connection Handling:
    1. Initially, a "connected" comment is sent to acknowledge the connection.
    2. A keepalive signal ("keepalive" comment) is sent every 30 seconds to maintain the connection.
  3. Redis Subscription:
    1. eventRedisOperations.listenToChannel(...) subscribes to a Redis channel based on the idMatch.
    2. Incoming events are mapped to ServerSentEvent objects with appropriate IDs, event types, and data.

To subscribe to Redis Pub/Sub channels, I need the following configuration class, RedisConfiguration, which sets up the ReactiveRedisTemplate used to interact with Redis:

@Configuration(proxyBeanMethods = false)
public class RedisConfiguration {

   @Bean
   ReactiveRedisOperations<String, NotifyEvent> eventRedisOperations(
           ReactiveRedisConnectionFactory redisConnectionFactory, ObjectMapper objectMapper) {
       Jackson2JsonRedisSerializer<NotifyEvent> jsonRedisSerializer = new Jackson2JsonRedisSerializer<>(
               NotifyEvent.class);
       return new ReactiveRedisTemplate<>(redisConnectionFactory,
               RedisSerializationContext.<String, NotifyEvent>newSerializationContext()
                       .key(RedisSerializer.string())
                       .value(jsonRedisSerializer)
                       .hashKey(RedisSerializer.string())
                       .hashValue(jsonRedisSerializer)
                       .build());
   }
}

This configuration ensures proper serialization and deserialization of NotifyEvent objects when working with Redis.

Now, let's assume we have another component that's responsible for randomly publishing events to our Redis instance using the Pub/Sub mechanism.

With everything in place, if we access http://localhost:8080/api/v1/scores/events/1 in the browser, we'll start receiving real-time updates for the match with idMatch=1.
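The stream can also be consumed outside the browser. The following sketch uses the JDK's java.net.http.HttpClient to read the endpoint line by line and group the lines into events. It is a simplified parser written for illustration, not a full implementation of the EventSource processing model: there is no automatic reconnection and no handling of the 'retry' field. The SseClientSketch class and its method names are hypothetical, and the URL assumes the application is running locally on port 8080 as above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

class SseClientSketch {

    /** One parsed event; id is the last ID seen on the stream (null if none). */
    record SseEvent(String id, String event, String data) {}

    /** Groups "field:value" lines into events; a blank line completes an event. */
    static void consume(Stream<String> lines, Consumer<SseEvent> handler) {
        String lastId = null;
        String type = null;
        List<String> data = new ArrayList<>();
        Iterator<String> iterator = lines.iterator();
        while (iterator.hasNext()) {
            String line = iterator.next();
            if (line.isEmpty()) {
                // Events without data (e.g. only comments) are not dispatched
                if (!data.isEmpty()) {
                    String name = type == null ? "message" : type;
                    handler.accept(new SseEvent(lastId, name, String.join("\n", data)));
                }
                type = null;
                data.clear();
                continue;
            }
            if (line.startsWith(":")) {
                continue; // comment line such as ":connected" or ":keepalive"
            }
            int colon = line.indexOf(':');
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // one optional space after the colon
            }
            switch (field) {
                case "id" -> lastId = value;
                case "event" -> type = value;
                case "data" -> data.add(value);
                default -> { } // unknown fields are ignored
            }
        }
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = HttpRequest
                .newBuilder(URI.create("http://localhost:8080/api/v1/scores/events/1"))
                .header("Accept", "text/event-stream")
                .build();
        // ofLines() exposes the response body as a lazily read stream of lines
        HttpResponse<Stream<String>> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofLines());
        consume(response.body(), e -> System.out.println(e.event() + " -> " + e.data()));
    }
}
```

The "connected" and "keepalive" comments sent by the controller are skipped by the parser, so only the GOAL events reach the handler.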

Security in Action

It is important to secure every segment of the system so that data is transmitted safely and the infrastructure is not exposed to attacks or malfunctions. This applies both to the client-server communication over the SSE channel and to the transmission of information through Redis.

Communication between the client and server over the SSE channel must be authenticated and authorized, using access tokens such as JWTs together with authorization policies that verify that only eligible users can receive their data streams. All communications must be encrypted via HTTPS to protect the data in transit. In addition, protections against cross-site scripting (XSS) and cross-site request forgery (CSRF) must be applied, so that requests come from authorized domains during legitimate sessions and the messages sent do not carry malicious code.
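To give an idea of what verifying an access token involves, the sketch below signs and checks an HS256 (HMAC-SHA256) token using only the JDK. It is a simplified illustration under explicit assumptions: the Hs256TokenSketch class is hypothetical, the secret is shared between issuer and verifier, and only the signature is checked. A real application should also validate the header algorithm and claims such as expiration, and would normally rely on Spring Security or a vetted JWT library rather than hand-written code.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

class Hs256TokenSketch {

    private static final Base64.Encoder URL_ENCODER = Base64.getUrlEncoder().withoutPadding();

    /** Builds header.payload.signature, each part Base64URL-encoded. */
    static String sign(String payloadJson, byte[] secret) {
        String header = encode("{\"alg\":\"HS256\",\"typ\":\"JWT\"}");
        String signingInput = header + "." + encode(payloadJson);
        return signingInput + "." + URL_ENCODER.encodeToString(hmac(signingInput, secret));
    }

    /** Checks the signature only; claims such as expiration are NOT validated here. */
    static boolean hasValidSignature(String token, byte[] secret) {
        String[] parts = token.split("\\.");
        if (parts.length != 3) {
            return false;
        }
        byte[] actual;
        try {
            actual = Base64.getUrlDecoder().decode(parts[2]);
        } catch (IllegalArgumentException e) {
            return false; // signature part is not valid Base64URL
        }
        byte[] expected = hmac(parts[0] + "." + parts[1], secret);
        // Constant-time comparison to avoid leaking information through timing
        return MessageDigest.isEqual(expected, actual);
    }

    private static String encode(String text) {
        return URL_ENCODER.encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    private static byte[] hmac(String input, byte[] secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret, "HmacSHA256"));
            return mac.doFinal(input.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 is not available", e);
        }
    }
}
```

In the SSE endpoint, a check of this kind would run before the stream is opened, so that an unauthenticated client never subscribes to a match channel.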

Similar considerations apply to the protection of the Redis Pub/Sub channel: it is necessary to implement authentication and authorization policies and to configure Redis ACLs to define specific permissions for different channels or different Redis commands. In addition, to protect data in transit, it is possible to enable SSL/TLS, which Redis has supported as an optional feature since version 6.

Use Cases in the Real World

Server-Sent Events fit any scenario in which the client needs continuous, one-way updates from the server in real time. Here are some real-world use cases for this model:

  • Real-time notifications: sending updates, alerts, or informational messages to users
  • Dashboards and monitoring: sending updates on metrics, charts and information without the need to reload the page
  • News feeds: updating content and news
  • Finance and trading applications: real-time updating of stocks, indexes and currencies
  • Tracking applications: sending real-time information on vehicles and people's movements
  • Collaboration applications: sending information about updates to objects such as documents, whiteboards, dashboards, tasks
  • Multiplayer games: sending updates on in-game events, such as actions of other players or changes to leaderboards

Conclusion

In this article, we have seen how to design and implement a real-time notification system using Server-Sent Events (SSE), Spring Boot Reactive with WebFlux, and Redis Pub/Sub, and how these technologies make it possible to build a scalable, responsive, and performant architecture.

As the notification protocol, SSE establishes a persistent, unidirectional HTTP connection with information flowing from the server to the client. Whenever the data flow is one-way from the server to the client, this model allows us to eliminate polling and work in an event-driven push mode.

Spring Boot Reactive, through the WebFlux module, provides non-blocking request handling that can reactively process data streams from asynchronous sources. Everything is held together by Redis Pub/Sub, which acts as a low-latency, horizontally scalable message distribution backbone.

The complete source code for this project can be found on GitHub.

That's it for this article! I hope you found it informative and enjoyed learning how to implement Server-Sent Events with Spring Boot and Redis. Until next time!
