Building Reactive Applications with Akka Actors and Java 8

Key Takeaways

  • The actor model provides a higher level of abstraction for writing concurrent and distributed systems, which shields the developer from explicit locking and thread management.
  • The actor model provides the core functionality of reactive systems, defined in the Reactive Manifesto as responsive, resilient, elastic, and message driven.
  • Akka is an actor-based framework that is easy to implement with the available Java 8 Lambda support.
  • Actors enable developers to design and implement systems in ways that let us focus more on the core functionality and less on the plumbing.
  • Actor-based systems are the perfect foundation for quickly evolving microservices architectures.

While the term “reactive” has been around for a long time, only recently has the industry recognized it as the de facto way forward in system design and adopted it in the mainstream. In 2014 Gartner wrote that the three-tier architecture that used to be so popular was beginning to show its age. This has become even clearer with the ongoing modernisation efforts driven by enterprises, which have had to start rethinking the way they learned to build applications for more than a decade.

Microservices are taking the software industry by storm and the shock waves are shaking traditional development workflows to the core. We’ve seen software design paradigms change and project management methodologies evolve. But the resulting shift towards new application design and implementation has been forging its way through IT systems with unprecedented momentum. And even if the term microservices isn’t completely new, our industry is realizing that it’s not only about coupling RESTful endpoints and slicing monoliths; the real value lies in better resource consumption and extreme scalability for unpredictable workloads. The traits of the Reactive Manifesto are quickly becoming the new bible for microservice-based architectures, as these are essentially distributed reactive applications.

The Actor Model in Today’s Applications

Applications must be highly responsive to retain user interest, and must evolve quickly to remain relevant to meet the ever-changing needs and expectations of the audience. The technologies available for building applications continue to evolve at a rapid pace; science has evolved, and the emerging requirements cannot rely on yesterday’s tools and methodologies. One concept that has emerged as an effective tool for building systems that take advantage of the processing power harnessed by multicore, in-memory, clustered environments is the actor model.

The actor model provides a relatively simple but powerful model for designing and implementing applications that can distribute and share work across all system resources—from threads and cores to clusters of servers and data centers. It provides an effective framework for building applications with high levels of concurrency and for increasing levels of resource efficiency. Importantly, the actor model also has well-defined ways for handling errors and failures gracefully, ensuring a level of resilience that isolates issues and prevents cascading failures and massive downtime.

In the past, building highly concurrent systems typically involved a great deal of low-level wiring and very technical programming techniques that were difficult to master. These technical challenges competed for attention with the core business functionality of the system, because much of the effort had to be focused on the functional details, requiring a considerable amount of time and effort for plumbing and wiring. When building systems with actors, things are done at a higher level of abstraction because the plumbing and wiring are already built into the actor model. Not only does this liberate us from the gory details of traditional system implementation, it also allows for more focus on core system functionality and innovation.

Actors with Java 8 and Akka

Akka is a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM. Akka “actors” are one of the tools in the Akka toolkit that allow you to write concurrent code without having to think about low-level threads and locks. Other tools include Akka Streams and Akka HTTP. Although Akka is written in Scala, there is a Java API, too. The following examples work with Akka 2.4.9 and above, but the Java-with-lambda-support part of Akka has been marked as “experimental” since its introduction in Akka 2.3.0, so expect some further changes until it is officially released and no longer experimental.

The actor is the basic unit of work in Akka. An actor is a container for state and behavior, and can create and supervise child actors. Actors interact with each other through asynchronous messages, which are processed synchronously, one message at a time. This model protects an actor’s internal state, makes it thread safe, and implements event-driven behavior that won’t block other actors. To get started, you don’t need much more than the Akka Maven dependency.

<dependency>
   <groupId>com.typesafe.akka</groupId>
   <artifactId>akka-actor_2.11</artifactId>
   <version>2.4.9</version>
</dependency>

Changing Actor State

Just like we exchange text messages on mobile devices, we use messages to invoke an actor. And also like text messages, the messages between actors must be immutable. The most important part of working with an actor is to define the messages it can receive. (The set of messages is also commonly referred to as the protocol because it defines the point of interaction between actors.) Actors receive messages and react to them in various ways. They can send other messages, change their state or behavior, and create other actors.
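To illustrate what “immutable” means for a message with a payload, here is a minimal plain-Java sketch. The class SensorReadings and its fields are hypothetical names invented for this illustration; they are not part of the article’s examples or of Akka.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical message type, for illustration only.
final class SensorReadings {
    private final String sensorId;
    private final List<Integer> values;

    SensorReadings(String sensorId, List<Integer> values) {
        this.sensorId = sensorId;
        // Defensive copy wrapped read-only: later changes to the caller's list
        // cannot leak into a message that is already in flight.
        this.values = Collections.unmodifiableList(new ArrayList<>(values));
    }

    String sensorId() { return sensorId; }

    List<Integer> values() { return values; }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        source.add(1);
        source.add(2);
        SensorReadings message = new SensorReadings("door", source);
        source.add(3); // mutating the original list after sending
        System.out.println(message.values().size()); // prints 2
    }
}
```

All fields are final, there are no setters, and mutable inputs are copied on the way in, so sender and receiver can never observe each other’s later changes.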

An actor’s initial behavior is defined by calling the receive() method with the help of a ReceiveBuilder while the actor is constructed (here, in an instance initializer block). receive() matches incoming messages and executes the related behavior. The behavior for each message is defined using a lambda expression. In the following example the ReceiveBuilder uses a method reference to onMessage. The onMessage method increases a counter (internal state) and logs an info message via the AbstractLoggingActor.log method.

static class Counter extends AbstractLoggingActor {
    static class Message { }

    private int counter = 0;

    {
      receive(ReceiveBuilder
        .match(Message.class, this::onMessage)
        .build()
      );
    }

    private void onMessage(Message message) {
      counter++;
      log().info("Increased counter " + counter);
    }
}

With the actor in place, it needs to be started. This is done via the ActorSystem, which controls the lifecycle of the actors in it. But first we need to supply some additional information about how to start the actor. The akka.actor.Props is a configuration object that is used to expose pieces of context-scoped configuration to all pieces of the framework. It is used in creating our actors; it is immutable, so it is thread-safe and fully shareable.

return Props.create(Counter.class);

The Props object describes the constructor arguments of the actor. It is a good practice to encapsulate it into a factory function that is closer to the actor’s constructor. The ActorSystem itself is created in the main method.

public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("sample1");
    ActorRef counter = system.actorOf(Counter.props(), "counter");

}

Both ActorSystem (“sample1”) and the contained actor (“counter”) are given names for navigating actor hierarchies; more on these later. Now the ActorRef can send a message to the actor, for example:

counter.tell(new Counter.Message(), ActorRef.noSender());

Here, the two parameters define the Message to be sent, and the sender of the message. (As the name implies, noSender indicates the sender isn’t used in this case.) If you run the above example, you get the expected output:

[01/10/2017 10:15:15.400] [sample1-akka.actor.default-dispatcher-4] [akka://sample1/user/counter] Increased counter 1

This is a very simple example, yet it supplies all of our desired thread safety. Messages that are sent to actors from different threads are shielded from concurrency problems, since the actor framework serializes the message processing. You can find the complete example online.
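To see why serialized message processing makes an unsynchronized counter safe, consider the following plain-Java sketch. It is not Akka code and not how Akka is implemented; it assumes a single-threaded executor as a stand-in for an actor’s mailbox, and the class MailboxCounter and its methods are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an actor: only the mailbox idea, NOT Akka.
final class MailboxCounter {
    // A single-threaded executor plays the mailbox: tasks are queued
    // and run one at a time, in submission order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // Plain int, no locks or atomics: only the mailbox thread touches it.
    private int counter = 0;

    // Fire-and-forget, similar in spirit to tell().
    void tell() {
        mailbox.execute(() -> counter++);
    }

    // The read goes through the mailbox too, so it is queued behind
    // every message that was sent before it.
    int askCount() {
        try {
            return mailbox.submit(() -> counter).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void stop() {
        mailbox.shutdown();
    }

    // Sends perSender messages from each of `senders` threads, then asks for the count.
    static int run(int senders, int perSender) {
        MailboxCounter actor = new MailboxCounter();
        Thread[] threads = new Thread[senders];
        for (int i = 0; i < senders; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perSender; j++) {
                    actor.tell();
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
            return actor.askCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            actor.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // prints 4000
    }
}
```

Four threads send concurrently, yet no update is lost, because every increment runs on the one mailbox thread. A real actor runtime adds dispatchers, supervision, and location transparency on top of this basic idea.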

Changing Actor Behaviour

You will notice that our simple example modified the actor state, but it never changed its behaviour and it never sent messages to other actors. Consider the case of a burglar alarm system; it can be enabled and disabled with a password, and its sensor detects activity. If someone tries to disable the alarm without the correct password, it will sound. The actor can react to three messages: disable and enable, each with a password (supplied as a payload), and the burglary activity. All of them go into the contract:

static class Alarm extends AbstractLoggingActor {
    // contract
    static class Activity {}
    static class Disable {
      private final String password;
      public Disable(String password) {
        this.password = password;
      }
    }
    static class Enable {
      private final String password;
      public Enable(String password) {
        this.password = password;
      }
    }

    // ...
}

The actor gets a preset attribute for the password, which is also passed into the constructor.

private final String password;
public Alarm(String password) {
      this.password = password;
     // ...
}

The aforementioned akka.actor.Props configuration object also needs to know about the password attribute, in order to pass it to the actual constructor when the actor system is started.

public static Props props(String password) {
      return Props.create(Alarm.class, password);
}

The Alarm actor also needs a behaviour for each possible message. These behaviours are implementations of the receive method of AbstractActor. The receive method should define a series of match statements (each of type PartialFunction<Object, BoxedUnit>) defining which messages the actor can handle, along with the implementation of how the messages should be processed.

private final PartialFunction<Object, BoxedUnit> enabled;
private final PartialFunction<Object, BoxedUnit> disabled;

If this signature seems frightening, your code can effectively sweep it under the covers by using a ReceiveBuilder that can be used as shown earlier.

public Alarm(String password) {
    this.password = password;

    enabled = ReceiveBuilder
        .match(Activity.class, this::onActivity)
        .match(Disable.class, this::onDisable)
        .build();

    disabled = ReceiveBuilder
        .match(Enable.class, this::onEnable)
        .build();

    // Start in the "disabled" behaviour
    receive(disabled);
}

Note that the call to receive at the end sets the default behaviour to “disabled”. The two behaviours are built from three handler methods (onActivity, onDisable, onEnable). The easiest of these methods is onActivity. If there is activity, the alarm logs a string to the console. Note that there is no message payload required for activity, so we just name the parameter ignored.

 private void onActivity(Activity ignored) {
    log().warning("oeoeoeoeoe, alarm alarm!!!");
 }

If the actor receives an enable message, the new state is logged to the console and the behaviour is switched to enabled. If the password doesn’t match, a short message is logged instead. The message payload now contains the password, and we can access it to validate the password.

private void onEnable(Enable enable) {
   if (password.equals(enable.password)) {
     log().info("Alarm enable");
     getContext().become(enabled);
   } else {
     log().info("Someone failed to enable the alarm");
   }
}

In case a disable message is received, the actor needs to check the password, log a short message about the changed state and actually change the state to disabled or log a warning message that the password was wrong.

private void onDisable(Disable disable) {
  if (password.equals(disable.password)) {
    log().info("Alarm disabled");
    getContext().become(disabled);
  } else {
    log().warning("Someone who didn't know the password tried to disable it");
  }
}

That completes the actor logic and we are ready to start the actor system and send it a couple of messages. Note that our secret password “cat” is passed to the actor through its Props.

ActorSystem system = ActorSystem.create();
final ActorRef alarm = system.actorOf(Alarm.props("cat"), "alarm");

The messages:

    alarm.tell(new Alarm.Activity(), ActorRef.noSender());
    alarm.tell(new Alarm.Enable("dogs"), ActorRef.noSender());
    alarm.tell(new Alarm.Enable("cat"), ActorRef.noSender());
    alarm.tell(new Alarm.Activity(), ActorRef.noSender());
    alarm.tell(new Alarm.Disable("dogs"), ActorRef.noSender());
    alarm.tell(new Alarm.Disable("cat"), ActorRef.noSender());
    alarm.tell(new Alarm.Activity(), ActorRef.noSender());

produce the output as follows:

[01/10/2017 10:15:15.400] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] Someone failed to enable the alarm
[01/10/2017 10:15:15.401] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] Alarm enable
[WARN] [01/10/2017 10:15:15.403] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] oeoeoeoeoe, alarm alarm!!!
[WARN] [01/10/2017 10:15:15.404] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] Someone who didn't know the password tried to disable it
[01/10/2017 10:15:15.404] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] Alarm disabled
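The core idea of this example, swapping the active message handler, can be reduced to a few lines of plain Java. The following is a single-threaded sketch without Akka; AlarmSketch, its Behaviour interface, and the string-based message types are hypothetical simplifications chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded sketch of behaviour switching; NOT Akka code.
final class AlarmSketch {
    // A behaviour handles one message: a type plus an optional payload.
    interface Behaviour {
        void handle(String type, String payload);
    }

    private final String password;
    private final List<String> log = new ArrayList<>();
    private final Behaviour enabled = this::whenEnabled;
    private final Behaviour disabled = this::whenDisabled;

    // Plays the role of getContext().become(...): the handler in use right now.
    private Behaviour current = disabled;

    AlarmSketch(String password) {
        this.password = password;
    }

    void tell(String type, String payload) {
        current.handle(type, payload);
    }

    private void whenDisabled(String type, String payload) {
        if (type.equals("enable")) {
            if (password.equals(payload)) {
                log.add("enabled");
                current = enabled; // switch behaviour
            } else {
                log.add("enable failed");
            }
        }
        // Every other message is ignored while disabled.
    }

    private void whenEnabled(String type, String payload) {
        if (type.equals("activity")) {
            log.add("alarm");
        } else if (type.equals("disable")) {
            if (password.equals(payload)) {
                log.add("disabled");
                current = disabled; // switch back
            } else {
                log.add("disable failed");
            }
        }
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        AlarmSketch alarm = new AlarmSketch("cat");
        alarm.tell("activity", null);   // ignored: alarm is disabled
        alarm.tell("enable", "cat");
        alarm.tell("activity", null);
        System.out.println(alarm.log()); // prints [enabled, alarm]
    }
}
```

Replaying the seven messages from above against this sketch yields five log entries in the same order as the five log lines, because the first and last Activity messages arrive while the alarm is disabled and are not handled.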

You can find the complete working example online. Until now we only used individual actors to process messages. But just like in a business organization, actors form natural hierarchies.

Actor Hierarchies

Actors may create other actors. When one actor creates another actor, the creator is known as the supervisor and the created actor is known as the worker. Worker actors may be created for many reasons, the most common reason being to delegate work. The supervisor creates one or more worker actors and delegates work to them.

The supervisor also becomes a caretaker of the workers. Just as with a parent watching over the well-being of their children, the supervisor tends to the well-being of its workers. If a worker runs into a problem, it suspends itself (which means that it will not process normal messages until resumed) and notifies its supervisor of the failure.

So far, we have created actors and assigned each a name. Actor names are used to identify actors in the hierarchy. The actor that is generally most interacted with is the parent of all user-created actors, the guardian with the path "/user". Actors created using the primordial system.actorOf() are direct children of this guardian, and when it terminates, all normal actors in the system will be shut down as well. In the above alarm example, a user actor with the path /user/alarm was created. Since actors are created in a strictly hierarchical fashion, there exists a unique sequence of actor names given by recursively following the supervision links between child and parent down towards the root of the actor system. This sequence can be seen as enclosing folders in a file system, hence the adopted name “path” to refer to it, although the actor hierarchy has some fundamental differences from a file system hierarchy.

Inside an actor you can call getContext().actorOf(props, "alarm-child") to create a new actor named “alarm-child” as a child of the alarm actor. The child lifecycle is bound to the supervising actor, which means that if you stop the “alarm” actor, it will also stop the child.
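The relationship between names, paths, and lifecycle can be modelled with a small tree. The sketch below is plain Java, not Akka’s implementation; PathNode and its methods are hypothetical names, with actorOf borrowed only to mirror the call described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical tree node standing in for an actor; NOT Akka's implementation.
final class PathNode {
    private final String name;
    private final PathNode parent; // null for the root
    private final List<PathNode> children = new ArrayList<>();
    private boolean stopped = false;

    private PathNode(String name, PathNode parent) {
        this.name = name;
        this.parent = parent;
    }

    static PathNode root() {
        return new PathNode("", null);
    }

    // The new node becomes a child of this node.
    PathNode actorOf(String childName) {
        PathNode child = new PathNode(childName, this);
        children.add(child);
        return child;
    }

    // Follow the parent links up to the root to build the path.
    String path() {
        return parent == null ? "" : parent.path() + "/" + name;
    }

    // Stopping a node stops its whole subtree as well.
    void stop() {
        for (PathNode child : children) {
            child.stop();
        }
        stopped = true;
    }

    boolean isStopped() {
        return stopped;
    }

    public static void main(String[] args) {
        PathNode user = PathNode.root().actorOf("user");
        PathNode alarm = user.actorOf("alarm");
        PathNode child = alarm.actorOf("alarm-child");
        System.out.println(child.path());      // prints /user/alarm/alarm-child
        alarm.stop();
        System.out.println(child.isStopped()); // prints true
    }
}
```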

This hierarchy also has a significant influence on how failures are handled in actor based systems. The quintessential feature of actor systems is that tasks are split up and delegated until they become small enough to be handled in one piece. In doing so, not only is the task itself clearly structured, but the resulting actors can be reasoned about in terms of:

  • which messages they should process
  • how they should react normally
  • and how failures should be handled.

If one actor does not have the means for dealing with a certain situation, it sends a corresponding failure message to its supervisor, asking for help. The supervisor has four different options for reacting to a failure:

  • Resume the child, keeping its accumulated internal state but ignoring the message that led to the failure.
  • Restart the child, clearing out its accumulated internal state by starting a new instance.
  • Stop the child permanently and send all future messages for the child to the Dead-Letter Office.
  • Escalate the failure, thereby failing the supervisor itself.
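Conceptually, a supervisor’s decision is just a function from the failure to one of these four options. The following plain-Java sketch shows that shape; it does not use Akka’s classes, and the mapping from exception types to directives is an arbitrary assumption made for illustration.

```java
import java.util.function.Function;

// Hypothetical sketch of a "decider"; NOT Akka's SupervisorStrategy API.
final class DirectiveSketch {
    enum Directive { RESUME, RESTART, STOP, ESCALATE }

    // Arbitrary example mapping: pick the directive from the exception type.
    static final Function<Throwable, Directive> DECIDER = failure -> {
        if (failure instanceof ArithmeticException) {
            return Directive.RESUME;   // keep state, skip the failing message
        }
        if (failure instanceof IllegalArgumentException) {
            return Directive.RESTART;  // fresh instance, state cleared
        }
        if (failure instanceof IllegalStateException) {
            return Directive.STOP;     // stop the child permanently
        }
        return Directive.ESCALATE;     // unknown failure: let the parent decide
    };

    public static void main(String[] args) {
        System.out.println(DECIDER.apply(new ArithmeticException("/ by zero"))); // prints RESUME
        System.out.println(DECIDER.apply(new Exception("unexpected")));          // prints ESCALATE
    }
}
```

Falling through to ESCALATE for anything unrecognized keeps the decision with the actor that has more context, which is the usual reason to escalate.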

Let’s concretize all of this with an example: A NonTrustWorthyChild receives Command messages and increases an internal counter with each message. If the message count is divisible by 4 it throws a RuntimeException, which will be escalated to the Supervisor. Nothing really new here, as the command message has no payload.

public class NonTrustWorthyChild extends AbstractLoggingActor {

  public static class Command {}
  private long messages = 0L;

  {
    receive(ReceiveBuilder
      .match(Command.class, this::onCommand)
      .build()
    );
  }

  private void onCommand(Command c) {
    messages++;
    if (messages % 4 == 0) {
      throw new RuntimeException("Oh no, I got four commands, I can't handle any more");
    } else {
      log().info("Got a command " + messages);
    }
  }

  public static Props props() {
    return Props.create(NonTrustWorthyChild.class);
  }
}

The Supervisor starts the NonTrustWorthyChild during its own construction and forwards all command messages that it receives directly to the child.

public class Supervisor extends AbstractLoggingActor {

  {
    final ActorRef child = getContext().actorOf(NonTrustWorthyChild.props(), "child");

    receive(ReceiveBuilder
      .matchAny(command -> child.forward(command, getContext()))
      .build()
    );
  }
  //…
}

When the Supervisor actor is actually started, the resulting hierarchy will be “/user/supervisor/child”. Before this can be done, the so-called supervision strategy needs to be defined. Akka provides two classes of supervision strategies: OneForOneStrategy and AllForOneStrategy. The difference between them is that the former applies the obtained directive only to the failed child, whereas the latter applies it to all siblings as well. Normally, you should use the OneForOneStrategy, which is the default if none is explicitly specified. It is defined by overriding the supervisorStrategy() method.

@Override
public SupervisorStrategy supervisorStrategy() {
   return new OneForOneStrategy(
      10,
      Duration.create(10, TimeUnit.SECONDS),
      DeciderBuilder
          .match(RuntimeException.class, ex -> stop())
          .build()
   );
}

The first parameter defines the maxNrOfRetries, which is the number of times a child actor is allowed to be restarted before it is stopped. (A negative value indicates no limit.) The withinTimeRange parameter defines the duration of the time window for maxNrOfRetries. As defined above, the strategy allows up to 10 restarts within 10 seconds. The DeciderBuilder works just like the ReceiveBuilder: it matches on the exceptions that occur and defines how to react to them. In this case the decider returns stop() for any RuntimeException, so the Supervisor stops the NonTrustWorthyChild on its first failure and all remaining messages are sent to the dead letter box; the retry limit only comes into play when the directive is restart().
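To make the interplay of a retry limit and a time window more tangible, here is a simplified plain-Java sketch of a restart budget. It uses a sliding window, which is an assumption for illustration; Akka’s own bookkeeping may differ in detail. RestartBudget and tryRestart are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sliding-window restart budget; Akka's bookkeeping may differ.
final class RestartBudget {
    private final int maxRetries;
    private final long windowMillis;
    // Timestamps of the restarts that are still inside the window.
    private final Deque<Long> restarts = new ArrayDeque<>();

    RestartBudget(int maxRetries, long windowMillis) {
        this.maxRetries = maxRetries;
        this.windowMillis = windowMillis;
    }

    // Called on each failure. Returns true for "restart the child",
    // false for "budget exhausted, stop the child".
    boolean tryRestart(long nowMillis) {
        // Forget restarts that have fallen out of the window.
        while (!restarts.isEmpty() && nowMillis - restarts.peekFirst() >= windowMillis) {
            restarts.pollFirst();
        }
        if (restarts.size() >= maxRetries) {
            return false;
        }
        restarts.addLast(nowMillis);
        return true;
    }

    public static void main(String[] args) {
        RestartBudget budget = new RestartBudget(2, 1000);
        System.out.println(budget.tryRestart(0));    // prints true
        System.out.println(budget.tryRestart(100));  // prints true
        System.out.println(budget.tryRestart(200));  // prints false: 2 restarts within 1 s
        System.out.println(budget.tryRestart(1000)); // prints true: the restart at 0 has expired
    }
}
```

Passing the timestamp in as a parameter keeps the sketch deterministic; a real implementation would read a clock.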

The actor system is started with the Supervisor actor.

ActorSystem system = ActorSystem.create();
final ActorRef supervisor = system.actorOf(Supervisor.props(), "supervisor");

When the system is up, we send 10 command messages to the Supervisor. Note that the message “Command” was defined in the NonTrustWorthyChild.

for (int i = 0; i < 10; i++) {
  supervisor.tell(new NonTrustWorthyChild.Command(), ActorRef.noSender());
}

The output shows that after the fourth message the exception is reported to the Supervisor, the child is stopped, and the remaining messages are sent to the deadLetters box. If the SupervisorStrategy had been defined to restart() instead of stop(), a new instance of the NonTrustWorthyChild actor would have been started.

[01/10/2017 12:33:47.540] [default-akka.actor.default-dispatcher-3] [akka://default/user/supervisor/child] Got a command 1
[01/10/2017 12:33:47.540] [default-akka.actor.default-dispatcher-3] [akka://default/user/supervisor/child] Got a command 2
[01/10/2017 12:33:47.540] [default-akka.actor.default-dispatcher-3] [akka://default/user/supervisor/child] Got a command 3
[01/10/2017 12:33:47.548] [default-akka.actor.default-dispatcher-4] [akka://default/user/supervisor] Oh no, I got four commands, I can't handle any more
java.lang.RuntimeException: Oh no, I got four commands, I can't handle any more
	...
[01/10/2017 12:33:47.556] [default-akka.actor.default-dispatcher-3] [akka://default/user/supervisor/child] Message [com.lightbend.akkasample.sample3.NonTrustWorthyChild$Command] from Actor[akka://default/deadLetters] to Actor[akka://default/user/supervisor/child#-1445437649] was not delivered. [1] dead letters encountered. 

This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
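The difference between stop() and restart() for this example can be replayed without Akka. The following plain-Java sketch is a synchronous simplification (real actors process messages asynchronously, so exact dead-letter counts in a live system can differ); SupervisionSketch and its run method are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical, synchronous replay of the child's logic; NOT Akka code.
final class SupervisionSketch {
    enum Directive { RESTART, STOP }

    // Sends `total` commands and returns {processed, failures, deadLetters}.
    static int[] run(int total, Directive onFailure) {
        long messages = 0;       // the child's internal state
        boolean stopped = false;
        int processed = 0;
        int failures = 0;
        int deadLetters = 0;
        for (int i = 0; i < total; i++) {
            if (stopped) {
                deadLetters++;   // nobody is left to receive the command
                continue;
            }
            messages++;
            if (messages % 4 == 0) {
                failures++;      // the child throws on every fourth command
                if (onFailure == Directive.STOP) {
                    stopped = true;
                } else {
                    messages = 0; // restart: new instance, state cleared
                }
            } else {
                processed++;
            }
        }
        return new int[] {processed, failures, deadLetters};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(10, Directive.STOP)));    // prints [3, 1, 6]
        System.out.println(Arrays.toString(run(10, Directive.RESTART))); // prints [8, 2, 0]
    }
}
```

With stop(), three commands are processed, one fails, and the remaining six have no recipient. With restart(), the counter starts over after each failure, so eight of the ten commands are processed and none is lost to dead letters.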

You can follow up with the complete example online and play around with the SupervisorStrategy.

Summary

With Akka and Java 8, it is now possible to create distributed, microservices-based systems that just a few years ago were the stuff of dreams. Enterprises across all industries now desire the ability to create systems that can evolve at the speed of the business and cater to the whims of users. We can now elastically scale systems that support massive numbers of users and process huge volumes of data. It is now possible to harden systems with a level of resilience that enables them to measure downtime not in hours but seconds. Actor based systems enable us to create quickly evolving microservice architectures that can scale and run without stopping.

It’s the actor model that provides the core functionality of reactive systems, defined in the Reactive Manifesto as responsive, resilient, elastic, and message driven.

The fact that actor systems can scale horizontally, from a single node to clusters with many nodes, provides us with the flexibility to scale our systems for massive load. In addition, it is also possible to implement systems with the capability to scale elastically, that is, scale the capacity of systems, either manually or automatically, to adequately support the peaks and valleys of system activity.

With actors and actor systems, failure detection and recovery is an architectural feature, not something that can be patched on later. Out of the box you get actor supervision strategies for handling problems with subordinate worker actors, up to the actor system level, with clusters of nodes that actively monitor the state of the cluster, where dealing with failures is baked into the DNA of actors and actor systems. This starts at the most basic level with the asynchronous exchange of messages between actors: if you send me a message, you have to consider the possible outcomes; what do you do when you get the reply you expect… or not!? This goes all the way towards implementing strategies for handling nodes that leave and join a cluster.

Thinking in terms of actors is in many ways much more intuitive when designing systems. The way actors interact is more natural to us since it has, on a simplistic level, more in common with how we as humans interact. This enables us to design and implement systems in ways that allow us to focus more on the core functionality of the systems and less on the plumbing.

About the Author

Markus Eisele is a Java Champion, former Java EE Expert Group member, founder of JavaLand, reputed speaker at Java conferences around the world, and a very well known figure in the Enterprise Java world. He works as a developer advocate at Lightbend. Find him on Twitter @myfear.
