BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Articles Reactive Cloud Actors: An Evolvable Web of Events

Reactive Cloud Actors: An Evolvable Web of Events

Leia em Português

Lire ce contenu en français

Carl Hewitt defined the Actor Model in 1973 as

a mathematical theory that treats “Actors” as the universal primitives of concurrent digital computation.

Actors have been the focus of recent trends in distributed computing. Lately, there has been a lot of buzz around actors and their place in the cloud-based concurrent processing and in the Big Data world. While actor model has been historically used mainly to model and build parallel computing inside a single software process running on a single machine, it is now a fitting model for heavily-parallel processing in a cloud environment.

In this article we will first define three types of actors and explore their roles and responsibilities. We then review a paradigm of breaking business processes to an "evolvable web of events" in which any "significant" business step generates a durable event that can be further processed independently by one or more actors. We also have a look at a few real world examples of Actor Model. In the end, we consider implications of adopting this pattern and review a sample implementation of this approach using BeeHive library.

Actor Model

Carl Hewitt, along with Peter Bishop and Richard Steiger, published an article back in 1973 that proposed a formalism that identified a single class of objects, i.e. Actors, as the building blocks of systems designed to implement Artificial Intelligence algorithms.

A similar pattern can be later seen in Neural Networks where nodes act as the building block of the neural network - where a node has a bias, one or multiple inputs (and their corresponding weights) and one or multiple output (with their corresponding weights).

The difference is that an actor, unlike a neural node, is not just a simple computational unit. Actors inherently are autonomous units within a larger network that are triggered by receiving a message.

Actor

According to Hewitt an actor, in response to a message, can:

  1. send a finite number of messages to other actors
  2. create a finite number of other actors
  3. decide on the behaviour to be used for the next message it receives

Any combination of these actions can occur concurrently and in response to messages arriving in any order - as such, there is no constraint with regard to ordering and an actor implementation must be able to handle messages arriving out of band.

Processor Actor

In a later description of the Actor Model, first constraint is re-defined as "send a finite number of messages to the address of other actors". Addressing is an integral part of the model that decouples actors and limits the knowledge of actors from each other to mere a token (i.e. address). Familiar implementation of addressing includes Web Services endpoints, Publish/Subscribe queue endpoints and email addresses. Actors that respond to a message by using the first constraint can be called Processor Actors.

Factory Actor

Second constraint makes actors capable of creating other actors that we conveniently call Factory Actors. Factory actors are important elements of a message-driven system where an actor is consuming from a message queue and create handlers based on the message type. Factory actors control the lifetime of the actors they create and have a deeper knowledge of the actors they create - compared to processor actors knowing mere an address. It is useful to separate factory actors from processing ones - in line with the single responsibility principle.

Stateful Actor

Third constraint is the Stateful Actor. Actors capable of the third constraint have a memory that allows them to react differently on subsequent messages. Such actors can be subject to a myriad of side-effects. Firstly, when we talk about "subsequent messages" we inherently assume an ordering while as we said, there is no constraint with regard to ordering: an out of band message arrival can lead to complications. Secondly, all aspects of CAP theorem applies to this memory making a consistent yet highly available and partition tolerant impossible to achieve. In short, it is best to avoid stateful actors.

As originally proposed, an actor can act based on any combination of the three constraints but we stress that building actors with a single responsibility leads to a better and more decoupled design. In this article, we will heavily rely on this separation and propose solutions that take advantage of its benefits - and we will show an example of such implementation.

Actor Dependency

Actor Model, by constraining interactions of an actor, reduces knowledge of an actor from its surroundings. A Processor Actor only has to know the address of actors next up in the chain. We will even remove this knowledge by getting actors to generate an event which describes (event name) and documents (event state) what has happened and the network of actors can pick up the event based on its type.

A Factory Actor only has the knowledge of creating actors to pass the message it has received. It controls the lifetime of the actors it creates but essentially is oblivious to the processing details of the processor actor.

By reducing dependency, coupling of the elements of the system diminishes allowing for evolvability of the system. Behaviour of each actor can be independently modified and in fact, each actor can be deployed independently of others. Reduced dependency is one of the most important virtues of an actor-based system. It leads to the simplification of design, reducing dependency and decoupling elements comprising an application.

All actors, in addition to what mentioned above, depend on a read-only set of configuration parameters that provide the knowledge of the milieu they operate in. These configuration parameters could include various aspects of environments (such as endpoints addresses) or behaviour defaults (such as retry counts, buffer sizes or tracing verbosity).

Reactive Actors

Actors can be implemented in an imperative or reactive fashion. In case of Imperative actors, the message an actor sends is an RPC message destined for an actor type whose interface, type and endpoint are known to the initiating actor. The message is typically delivered through durable message buses. This is probably very similar to an enterprise service bus implementation where the deployable artefacts are reduced to the simplest possible form.

(Click on the image to enlarge it)

In case of a Reactive actors, the sender simply publishes an event signifying the business process accomplished (OrderAccepted, FraudDetected, ItemOutOfStock, ItemBackInStock, etc) and other actors choose to subscribe to such events and perform their actions. In this case, actors can evolve independently and business processes modified with only change to a single or a handful of actors. This results in higher level of decoupling and is a good fit for developing both analytics and transactional systems on the horizontal scale provided by the cloud.

Implementations of Reactive Actors already exist in the industry. Fred George's body of work on Reactive MicroServices is a prime example. Amazon Kinesis can be viewed as a coarsely granular Reactive Actor framework.

Modelling a system based on actors

Actor Model systems have been in use since the 1970s. These systems have leveraged actors to implement heavily-parallel systems. Erlang was one of the language that included Actor Model support in the language itself. Erlang applications have been successfully used in Telecomms industry for years.

Nowadays, with a relatively cheap and highly available commoditised compute and storage, we can deploy a system to hundreds or thousands of nodes with just a push of a button. The challenge is to design systems that can take advantage of all this power. A common mistake with the adoption of the cloud has been lift-and-shift approach which is basically deploying the same on-premise tiered architecture with a traditional RDBMS backend database onto the cloud - which in most cases just misses the point. Building systems that can scale horizontally and linearly requires new paradigms in modelling data structures, compute and dependencies.

Here we are aiming for a model that is horizontally and linearly scalable, is robust and highly-available yet is easy to evolve. Let's look at various aspects of such a system.

Messaging

Message based systems have long existed in the industry and were a mainstay of SOA adoption using Enterprise Service Bus. On the other hand, HTTP defines the communication in terms of a Responsemessage in response to a Request. A SOAP Service responds to SOAP request by sending back a SOAP message. Even at a lower level, in the RPC paradigm a method call can be modelled as a request message and a response message. So what can be different here with regard to messaging?

When it comes to Processor Actors, they only have to know the name of the topic from which they read messages and the name of the topic to which they send their message; details such as addressing, delivery, fail-over, etc can be left to the underlying message broker.

In order to build a robust actor-based system, one should use a topic-based, durable, highly-available, fault-tolerant and horizontally scalable messaging system. Example of this on the cloud includes Apache Kafka and Windows Azure Service Bus. EventStore or RabbitMQ is an alternative on-premise (or in the cloud) option.

Message, Event and Fact

A message is a piece of information that is destined for an actor. Traditionally messages have had a header and a body. This same definition suits an actor message. On the other hand, an event is normally defined as "something that has happened in the past".

In many ways an event and a message are similar. In fact Enterprise Integration Patterns defines Event as a subset of a message, i.e. Event Message. However, the book later notes that the difference between a Document Message (which is typically what is meant by the term Message) and an Event Message is timing and content. In other words, it believes Guaranteed Delivery is not important for an event and events can mainly be used for Business Intelligence (i.e. measuring business metrics) where there is tolerance for non-exact accuracy.

There is another end of spectrum that use events as the sole source of truth in a system - Event-Sourced-based systems are the main proponents of such approach. All other data is generated off these events and either stored in durable read models or in volatile memory-based soft stores.

We define Event as a time-stamped, immutable, unique and eternally-true piece of information. This definition is similar to the definition of Fact described by Nathan Marz. We realise that not all events have the same importance for a business, hence their storage and availability requirements are different. For example, in a retail e-Commerce scenario (amazon.com example), an OrderSubmitted event containing order details (product variants and their quantities) are of utmost importance to the business while the transitional stages of the order (products added then removed, quantities changed, etc) while useful for analytics, does not have the same business criticality. Also not all data is worth capturing: storing exact position of user's mouse at every millisecond when visiting the site is not helpful. So a business needs to make a judgement on what data to capture. On the other hand, storing Highly-Available data is expensive and we need to differentiate between SLA requirements of different types of data. As such we identify two types of events: business events and log events.

Business events are the building blocks of the business workflow and require transactional level of consistency and durability while log events are mainly used for measuring business metrics and analytics and don not require the same level of availability. Business events are fundamental to the core business (e.g. online selling for a retail e-Commerce) while log events are the data supporting the business in decision-making, handling customer care or technical troubleshooting. Losing any business events will be considered a serious incident while losing log events, while important for the business, will not have a catastrophic impact on the business function.

Events as a means of Inversion of Control

Dependency Injection (DI) is a mainstay of conventional software development and is a pattern of achieving the Inversion of Control principle. Events in a loosely-coupled architecture achieve the same goal that DI achieves in a piece of software. By removing the knowledge of the consumers from the producer of an event, we achieve the same degree of decoupling.

This is in contrast to a Command that is destined for a single handler.

This is basically the same Hollywood Principle "Don't call us, we'll call you" that is used in dependency injection. Using events lead to reactive programming that has been gaining popularity over the last few years. Using Reactive Programming (such as Reactive Extensions - Rx) simplifies dependency networks in a complex software, while using Reactive and event-driven architecture (as described in the Reactive Manifesto) can lead to a highly decoupled and evolvable architecture.

Replacing commands with events is the most fundamental break from the traditional ESB-based SOA approaches.

Modelling an event

As we described earlier, An event is a time-stamped, immutable, unique and eternally-true piece of information. Business examples of an event include: CustomerCreated, OrderDispatched, BlogTagAdded, etc.

It is convenient to ascribe types to events. Type of an event describes the kind of information it contains. An event type is normally unique within the system.

An event usually refers to one or more aggregates (see below) within the system. For example, CustomerCreated naturally contains the CustomerId while the BlogTagAdded contains PostId and the tag (if the tag is modelled as a Value Object) or TagId (if it is an entity).

Every event - regardless of its content - requires to have an identity of its own. It is possible for two events to contain the same information, for example StaffTakenOffsick. It is useful to use a GUID as the identity of event. In some cases it is possible to use the type plus the Id of the identities (for example there could be only a single CustomerCreated event for a CustomerId), however, identity of an event should always be defined independently of its content..

Event needs to economically capture all the information pertaining to the event. When modelling for Event Sourcing, the event should capture all the information related to the event in a way that all the different statuses an entity goes through can be reproduced from the stream of events for that entity. Event sourcing is specifically important for Big Data Analytics. This is where we are interested in all the information. ItemAddedToBasket is an important event for analytics while for transactional systems it has no bearance until it gets submitted as part of an order. On the other hand, a CustomerCreated event with all the information about the customer is mandatory for Big Data systems where data needs to be stored independently of transactional systems but not for a Credit Check service that has direct access to the data and is interested only in current status of the customer.

Basically for a transactional system, we favour publishing smaller events and relying for the Basic Data Structures for storing the rest of the state. In this case, events are used as the indicators of transitions of the state through the system and the state itself is stored in data structure stores we will shortly visit. This approach relies that all of the system has access to the data structure stores - an assumption that is conveniently achievable in a cloud scenario.

The problem with the approach above is that you will be overwriting the state of the entities so your analytics will be losing some data that can be valuable - not transactionally but for the analytics purposes. Also your analytics system needs to rely on its own data store so will not be accessing the transactional data stores. There are three solutions available:

  • you have EventEnricher actors that subscribe to the events and enrich the event and publish events containing the extra state to be consumed by the analytics system. In this case, you are not eliminating the risk of missing some state transitions as it is likely that enrichment happens after two successive state changes. In most cases this is unlikely or not much of an issue as for the analytics.
  • your actors publish two events: one for the transactional consumption and the other for the use of analytics Big Data system.
  • you completely abandon the data stores and embrace Event Sourcing. For the increased complexity and the reasons explained above, we do not recommend this option for a system that its main purpose is transactional. Also, they are not fit for some type of data structures such as counters and result in very high contention; but full discussion of this topic deserves another article.

Queues

Queue features required by Reactive Cloud Actor are:

  • High Availability and Fault Tolerance
  • Support for topic-based subscription
  • Ability to lease a message and then abandon or commit it
  • Ability to implement a implement a delay queue
  • Support for long polling
  • Optional support for batching

There are already technologies such as Kafka, Azure Service Bus, RabbitMQ and ActionMQ that provide the functionalities listed above - some not all features.

Other Data Structures

Traditional software systems have usually modelled all their data as columnar rows in an RDBMS. In fact, it was not uncommon to see database tables to be used as queues - when all you have is a hammer, everything is a nail. For pure event-sourced systems, all data are events and stored in an event store - with optional snapshots. In a document database, data are stored as documents with optional indexes. Regardless of tools used, choosing the appropriate data structure for modelling the data is essential. Each data structure is optimised for a certain operation and shoehorning a piece of data into an unfitting model usually results in taking compromises.

We believe that BigTable can be used to express all seminal data structures (other than queues that is implemented by a service bus) to build a horizontally scalable - yet performant enough - transactional cloud system. It is notable that Redis implements all seminal data structures - it is unfortunate that it is not designed to provide the level consistency required for a transactional system.

All data stored in various stores need to have an identity. This is achieved by each entity to be associated by an Id. As such, every entity must implement interface below:

public interface IHaveIdentity
{
   string Id { get; }
}

In order to cater for concurrency, entities could be concurrency-aware by implementing the interface below:

public interface IConcurrencyAware
{
   string ETag { get; }
   DateTimeOffset? LastModified { get; }
}

Data Structure Stores will check whether an entity implements this interface and if so, they will check for concurrency conflict in case of updates and deletes.

We will now review these key structures below.

Key-Value

Key-value is the most basic structure where an arbitrary value (usually an uninterpreted byte array) is stored against a string key. Keys can be combined to create collections, alternatively we can prefix the key name. Normally keys cannot be iterated through which is a big drawback. Operations on Key-Value stores are atomic and can be (but often are not) concurrency-aware. However, they should provide locking with timeout and automatic cleanup.

Keyed-List

Keyed list is similar to the key-value list, however, we can store a list of key-values against a key. The list against the key can grow very large and can be iterated through or updated without having to load all the data.

Collection

Collections are key-values stored under a logical collection type. Difference with the simple key-value is that the collection can be iterated through - although this could take a long time. Keys can be used for range iteration hence we could iterate through a smaller set of data - using a range. Collection should offer concurrency-awareness (ETag and/or LastModified/Timestamp).

Counter

Counter, as the name implies, holds a number that can be atomically incremented or decremented. This is usually the missing data structure in many data stores - at the time of writing this, there is no atomic counter available on Windows Azure.

Modelling Actors

As we said above, we need to design Processor Actors and Factory Actors and forget about the Stateful Actors. So we will look a close look at each.

Processor Actor

Design

Processor Actor is where the business process gets done so its design is of utmost importance. It must be be designed to cater for all scenarios and yet inherently allow for the fault tolerance required by cloud environments where failure is relatively common.

We propose the simple interface below for Processor Actors and we believe it will cover all scenarios (C# code):

public interface IProcessorActor : IDisposable

{
   Task<IEnumerable<Event>> ProcessAsync(Event message);
}

Method above accepts an event and returns a promise to a an enumerable list of 0-many events. An actor should typically return 0 or 1 event back. In some circumstances it might return more messages. Task is a promise in C# and is used for implementing asynchronous tasks - conveniently using async/await pattern. Cloud processes are typically are IO-bound (reading/writing to queues or stores) and importance of asynchronous implementation using IO Completion Ports cannot be overemphasised.

Event types and queues

Processor Actors subscribe to one or more event types. This is normally done by creating a dedicated subscription to a topic. Sometimes a simple queue is all needed but the nature of Event-driven Architecture is that events initially created for one purpose becomes useful for other purposes too so having a topic and a default subscription is usually a better option. Having said that, sometimes we need to be explicit about an event when it is used as a breakdown of a multi-step process.

An actor typically registers to a single event type. Having said that, if the step it is implementing involves a compensating action in case of failure in the downstream steps, it is the best place to implement the compensating action. In this case the actor registers to the event(s) which is associated with the compensating action.

Process

Actors must be designed to do a single piece of work - and do it well. In case of a crash, the message will return back to the queue and will be available to other actors after the timeout period. Retry process will govern the number of times a message can be tried and the possible delay after the failure.

A Scatter-Gather process can sometimes be broken and parallel process turned into a sequential one and define multiple event types and actors for each process. However, this is not always possible because of the delay introduced to the process. As such Scatter-Gather is best implemented by having a workflow state implemented as an entity (and storing in a Collection store) containing multiple flags. The Scatter actor returns multiple events (one for each step) upon finishing every scattered process, an event is raised and an actor sets the appropriate flag. When all flags are cleared, the item can move forward in the process. It is very important to implement this workflow state as concurrency-aware.

Breaking down a business process into single steps leads to:

  • Agility and flexibility of the process to change. Steps can be easily changed and parallel branches added or removed.
  • Isolated deployments. Each actor can be deployed independently and on its own.
  • Simpler programming model and testing of the Units of work by developers.
  • Simpler fault-tolerance model for actors.
  • Improved maintainability and traceability of the code.
  • Cross-cutting concerns such as logging is centralised surrounding a single method call.

All of above leads to reducing the overall cost of the project.

There are drawbacks:

  • Increased latency of the operations as it involves more hops to and from the queues. Reactive Cloud Actors target trade latency with simplicity and Eventual Consistency. In most settings latency will still be around a few seconds for a message to move through the entire process and is well within tolerance.
  • The code will be spread over multiple implementations making it more difficult to follow the process through in code, as well as by the business stakeholders. Up-to-date diagrams of the workflow are mandatory to maintain visibility of the process - a task that needs to be owned by the governance and architecture team.
  • There is a need for detailed logging and tools show status of items as it goes through the system. Building a cloud system, you would have to be doing this anyway but the cost of such an effort should not be underestimated.

Factory Actor

Factory actor is responsible for: Creating Processor Actors and managing their lifetime Setting up queue pollers to receive messages on subscriptions Pass the incoming messages to the actors for processing: Upon successful processing of the message publish the events returned back to the message queue and then flag the message received as successful In case of a failure, abandon the incoming message or reset its lease Retry can be handled by the factory actor or configured on the queue

Factory actor is normally part of the underlying Actor framework/library and does not have to be implemented . Factory Actor will typically uses a Dependency Injection framework to initialise the Processor Actor along with its dependencies.

BeeHive is a mini-framework which is our implementation of Reactive Actors for the cloud which we will review below.

BeeHive

BeeHive is an Open Source implementation of Reactive Cloud Actors in C#. The project focuses on defining interfaces and patterns to implement a cloud-based Reactive Actor solution. Azure implementation of the queue and basic data structures is currently available and implementing interfaces for other cloud platforms and technologies requires minimal effort.

Event

An event is comprised of a string body (BeeHive uses JSON serialisation and while this could be pluggable, currently no need is felt) with its metadata:

{
   Task<IEnumerable<Event>> ProcessAsync(Event message);
}

Next code

public sealed class Event : ICloneable
{
   public DateTimeOffset Timestamp { get; set; }
   public string Id { get; private set; }
   public string Url { get; set; }
   public string ContentType { get; set; }
   public string Body { get; set; }
   public string EventType { get; set; }
   public object UnderlyingMessage { get; set; }
   public string QueueName { get; set; }
   public T GetBody<T>()
   {
      ...
   }
   public object Clone()
   {
      ...
   }
}

Actor Implementation

In order to build a reactive actor, IProcessorActor interface needs to be implemented accepting an event and returing a prormise (Task) to IEnumerable<Event>. The actor does not have to handle unexpected errors as all unhandled exceptions will be handled by the Factory Actor responsible for feeding and managing the lifetime of the actor.

The IProcessorActor interface includes IDisposable interface which allows the actor an opportunity for clean up.

Since an actor can subscribe to different types of events, it can use EventType property of the event to discover the type of event.

Actor Configuration

Actor configuration tells Factory Actor how to create, feed and manage a Processor Actor. Since the configuration of actors can be a tedious and laborious task, BeeHive provides attribute-based configuration out of box:

[ActorDescription("OrderShipped-Email")]
public class OrderShippedEmailActor : IProcessorActor
{
   public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
   {
      ...
   }
}

The event name typically includes two strings separated by a hyphen where the first is the event type name (which is the same as the name of the queue) and second is the name of the subscription. For example if we deine "OrderAccepted-PaymentProcessing" for PaymentActor, this actor will be subscribing to the PaymentProcessing subscription of OrderAccepted topic. If queue is meant to be a simple queue (and not topic-based), only the name of the event type is used.

By implementing IActorConfiguration interface you could load your actor configuration from file or database:

public interface IActorConfiguration
{
   IEnumerable<ActorDescriptor> GetDescriptors();
}

If you are using attribute-based (default) configuration, you could create an instance of configuration as below:

var config = ActorDescriptors.FromAssemblyContaining<OrderAccepted>()
      ToConfiguration();

Prismo eCommerce sample

Prismo eCommerce is a non-trivial sample implementation of a business process in BeeHive. The business process implemented starts with an order accepted and follows it through all stages of the order until the order is either cancelled or shipped. This process involves payment authorisation, fraud check, inventory check, inventory replenishment if out of stock, etc.

(Click on the image to enlarge it)

Both an in-memory and Azure implementation is included in the BeeHive samples.

Conclusion

Reactive Cloud Actor is an event-driven paradigm for building cloud applications that are made of independent actors that function purely by consuming and publishing events. Unlike imperative actors that send RPC messages to known endpoints of other actors, a reactive actor does not know which actor/actors will be consuming events published by it. A reactive actor is interested in one or more event types and accomplishes a single atomic activity simplifying the fault tolerance and compensating actions. All events are stored in horizontally scalable message brokers - and they are typically durable.

This paradigms takes an opinionated view of Hewitt's actors and by separating responsibility of each actor, defining a triad: Processor Actor (responsible for processing events as explained above), Factory Actor (manages the lifetime of Processor Actors and feeds the actors and submits the returned events) and Stateful Actor. Processor Actors are application specific and accomplish business activities while Factory Actor is implemented and provided by the framework implementing the platform. Stateful Actors are avoided and all state are stored in Highly Available cloud-based data stores.

Reactive Cloud Actors identifies four different Basic Data Structure for storing the state: Key-Value, Collections, Counters and Keyed Lists. Key-Value can be implemented by a variety of technologies but the typical implementation is S3. Google's BigTable implementations (such as DynamoDB) are capable of implementing the other 3 data structures.

BeeHive is a C# implementation of Reactive Cloud Actor that comes with a non-trivial eCommerce sample.

This article is inspired by the works of Fred George on MicroServices and Reactive Event-Driven Architecture and is humbly dedicated to him.

About the Author

Ali Kheyrollahi is a Solutions Architect, author, blogger, open source author and contributor currently working for a large eCommerce in London. He is passionate about HTTP, Web APIs, REST, DDD and conceptual modelling while staying pragmatic in solving real business problems. He has +12 years experience in the industry working for a number of blue chip companies. He has a keen interest in computer vision and machine learning and has published a few papers in the field. In his previous life, he was a medical doctor and worked for 5 years as a GP. He blogs here and is an avid twitter using handle @aliostad.

Rate this Article

Adoption
Style

BT