A data system is designed to store data. It is also designed to derive information from stored data. The desired information may be the data itself, or it may be computed from the stored data. Data systems are very important and often outlive the applications that are built around them. The same Oracle or SQL Server data store that powered rich client applications in the ’90s stores data for today’s state-of-the-art single-page applications. Data systems do not change as often as the surrounding application stack. It is therefore important to design data systems for the long term.
Recently, there has been a virtual explosion in the amount of data being produced, stored, and analyzed. As a consequence, data systems are subject to much more stress and are becoming more complex.
The Lambda Architecture proposes a simple, elegant paradigm designed to tame this complexity while storing and effectively processing large amounts of data. It was originally presented by Nathan Marz, who is well known in the big data community for his work on the Storm project.
In this article, we will present the motivation behind the Lambda Architecture, review its structure, and end with a working sample. For further details on the Lambda Architecture, readers are advised to refer to Nathan Marz’s upcoming book Big Data.
What ails current data systems?
For decades, data stores have been more or less synonymous with relational database systems (RDBMS). In recent years, there has been an increased adoption of NoSQL databases, primarily because of their advantage in scaling.
Relational and NoSQL databases have their advantages and indeed power most business applications today. That said, we contend that there are certain fundamental flaws with the current implementation of data systems. Some of these flaws include the following:
Current systems are not resilient: Current database systems (relational and NoSQL) are not designed to be resilient. Most current data systems support create, read, update, and delete (CRUD) operations. Of these, update and delete have immense potential to cause data corruption. With current data systems, it is too easy to delete an entire table of data when we intended to simply delete a single row. It is also easy for a software bug or hardware failure to corrupt data.
Conflation of queries and data: With current database systems, query processing is closely tied to data storage. While data storage best practices call for normalization of data, querying often benefits from de-normalization. This leads to an uneasy trade-off that must be made in every system—should one optimize for query performance or data storage?
Consequences of scaling—the CAP theorem: The CAP theorem states that it is impossible for a distributed computer system to simultaneously provide all three of the following guarantees:
- Consistency: all nodes see the same data at the same time.
- Availability: a guarantee that every request receives a response about whether it was successful or not.
- Partition tolerance: the system continues to operate despite arbitrary message loss or partial system failure.
Another way of looking at the CAP theorem is to assume that partition tolerance is non-negotiable for most modern data systems, given the need to store large amounts of data. Once we fix partition tolerance, we can have only one of the other two guarantees: availability or consistency.
Relational databases favor consistency over availability, while NoSQL databases often favor availability over consistency, with some offering tunable consistency that can be adjusted to application needs. This is a difficult trade-off, and one that poses practical difficulties affecting the entire system. If we favor availability, we risk having different nodes with different data. If we favor consistency, there could be serious performance penalties, especially in cases where nodes are connected over high-latency connections.
Lambda Architecture
Lambda Architecture is designed to perform better in all of the problem areas that we have outlined. The Lambda Architecture specifies a data store that is immutable. An immutable data store essentially eliminates the update and delete aspects of CRUD, allowing only the creation and reading of data records.
At first glance, this seems like a major hurdle. How can we have a functional data store without the ability to update and delete data? On deeper analysis, it becomes clear that these restrictions are not as problematic as they seem. An immutable model simply tracks and records each fact in sequence. This allows the same information to be gathered from an immutable data store as from a mutable data store, simply by aggregating the facts stored over a period of time.
Consider the following example, implemented with a mutable data store and an immutable data store. Both are shown without normalization for easier review.
Mutable data store—initial scenario
Customer ID | Customer Name | Preferred Shipper ID | Shipper Name |
1 | Alfred | 1001 | UPS |
2 | Annie | 1008 | Federal Express |
Mutable data store—after update
Customer 2 now prefers DHL.
Customer ID | Customer Name | Preferred Shipper ID | Shipper Name |
1 | Alfred | 1001 | UPS |
2 | Annie | 1005 | DHL |
Immutable data store—initial scenario
Notice that each fact is recorded with a time stamp. Each fact as recorded remains true for all time.
Customer ID | Customer Name | Preferred Shipper ID | Shipper Name | Time Stamp |
1 | Alfred | 1001 | UPS | 1391123230 |
2 | Annie | 1008 | Federal Express | 1391423650 |
Immutable data store—after update
Customer 2 now prefers DHL. An additional record is appended to reflect this new fact. The new record contains an updated time stamp.
Customer ID | Customer Name | Preferred Shipper ID | Shipper Name | Time Stamp |
1 | Alfred | 1001 | UPS | 1391123230 |
2 | Annie | 1008 | Federal Express | 1391423650 |
2 | Annie | 1005 | DHL | 1391423769 |
Notice that existing data has not changed in any manner. Now consider the following query:
Which shipper should we use for an order placed by Customer ID 2?
With a mutable data store, we can directly look up this information and provide a response.
The immutable data store cannot directly look up this information, since there are multiple facts related to this request. The system does, however, have all the facts required to answer the query. At a very basic level, all it needs to do is examine the records stored for this Customer ID and return the shipper from the record with the most recent time stamp, as sketched below.
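The following is a minimal sketch, in Java, of how such a query could be answered over immutable facts. The ShipperFact class and its fields are hypothetical stand-ins for the rows of the immutable table above; this is not part of any library or of the sample application described later.

Resolving the latest fact from immutable records

import java.util.Arrays;
import java.util.List;

public class LatestFactQuery {

    // Hypothetical record type mirroring one row of the immutable table above.
    static class ShipperFact {
        final int customerId;
        final String shipperName;
        final long timeStamp;

        ShipperFact(int customerId, String shipperName, long timeStamp) {
            this.customerId = customerId;
            this.shipperName = shipperName;
            this.timeStamp = timeStamp;
        }
    }

    // Scan all facts for the customer and keep the one with the latest time stamp.
    static String preferredShipper(List<ShipperFact> allFacts, int customerId) {
        ShipperFact latest = null;
        for (ShipperFact fact : allFacts) {
            if (fact.customerId == customerId
                    && (latest == null || fact.timeStamp > latest.timeStamp)) {
                latest = fact;
            }
        }
        return latest == null ? null : latest.shipperName;
    }

    public static void main(String[] args) {
        List<ShipperFact> facts = Arrays.asList(
                new ShipperFact(1, "UPS", 1391123230L),
                new ShipperFact(2, "Federal Express", 1391423650L),
                new ShipperFact(2, "DHL", 1391423769L));
        // Prints "DHL": the most recent fact recorded for Customer ID 2 wins.
        System.out.println(preferredShipper(facts, 2));
    }
}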
Queries as a function of all data
An immutable data store that records facts as shown in the example can provide the same query responses as a mutable data store. It is just the processing details that differ. In general, a query result can be seen as some function of all data stored in the system.
Query results => function (all data stored)
Immutable data store—implementation
In practice, such a storage and processing system maps quite well to the Hadoop Distributed File System (HDFS) for storage and to MapReduce for processing. With HDFS, it is possible to store an arbitrary amount of data in a scalable fashion. With MapReduce, we can process this data: MapReduce can implement any arbitrary function that takes the stored data as input and operates on it in a scalable manner.
Need for speed—pre-computed views
The downside of an immutable data store is that it must be processed in batches, and batch processing is not real-time. While we expect improvements in the speed of batch processing systems, it is also true that data volumes will continue to skyrocket. It is safe to assume that batch processes are not real-time, at least for now. Applications need to be able to access data quickly. They cannot wait for a batch process to complete. Consequently, we need a layer that contains pre-computed views produced by the batch process.
Pre-computed batch layer views—implementation
The data store serving pre-computed views needs to be easily writable from a batch process. It does not, however, need to support random writes. It just needs to support random reads. This makes such a data store dramatically easier to implement than a full-fledged relational data store. ElephantDB is an example.
What about the time between batches?
With pre-computed batch views, we should be able to service most application needs. However, the batch process producing these views does take some time to run. During this period of time, additional data may be coming in. This data is not included in the process we have described so far. To account for this data, we need a parallel layer that can process additional data as it comes in. The Lambda Architecture provides for this aspect, and terms it the “real-time” layer.
Real-time layer
The real-time layer is designed to calculate query results on top of an incoming stream of data. Results, once computed, should be stored in such a manner that they can be queried by applications. Just as with the batch layer, the real-time layer stores its results in a view as they are computed.
The real-time layer itself can be implemented according to the specific needs of a project. The Storm project, originally created by Nathan Marz, is also an excellent solution for this need.
Pre-computed real-time views
The data store that implements the real-time view needs to support random writes. Consequently, it needs to be significantly more complicated than the view store used by the batch layer. NoSQL data stores such as Apache Cassandra and Redis are well suited to serving in this capacity.
Final results
The application will query both the real-time and batch views, and aggregate results.
Figure 1 below shows the Lambda architecture model.
The overall structure of the Lambda Architecture
Evaluation of the Lambda Architecture
Let us now evaluate how the Lambda Architecture performs in the three main problem areas we discussed earlier.
Resilience
The batch layer fares well in this aspect. If implemented correctly, it is difficult for human errors or hardware faults to corrupt data stored in the system, since the system does not allow update or delete operations on existing data.
The real-time layer, though, is susceptible to errors. There is potential for data corruption and loss in this area since the data stores being used are mutable.
The benefit is that even if the real-time layer fails, no data will be lost. As long as incoming writes are being propagated to the batch storage layer, the results will eventually catch up when the next batch job runs. So, while results may be out of date if the real-time layer fails, the data of record in the batch layer will not be corrupted. Results will eventually get back in sync when the real-time layer is back online.
Conflation of queries and data
Under the Lambda Architecture, we can store data in a completely normalized manner in the batch data store. We can also de-normalize as needed for the real-time and batch views. Data that services application queries is not related to the long-term storage structure in the batch layer. This allows us to fine-tune each layer as needed.
Consequences of scaling—the CAP theorem
Under the Lambda Architecture, results from the batch layer are always eventually consistent. As soon as a fresh batch update is completed, results from the batch layer are consistent. We must choose whether the real-time layer will be consistent or available. As stated previously, this is where most of the system complexity is isolated, and it is best to choose a system that is well suited for your specific needs.
Other aspects to consider
These topics are not related to the core architecture, but are important to consider.
Batch layer—well suited for analytics
It is worth noting that the immutable data store is very valuable for analytics. Such a data store can be seen as a complete record of everything that happens in a system. When data is stored in a mutable data store, potentially valuable information that can be used for analytics is lost each time a record is updated or deleted.
Complexity of algorithms
We should note that with some use cases, the complexity of algorithms that deal with the real-time layer is likely to be significantly higher than those in the batch layer. Readers can review web material on incremental and approximate algorithms for a better understanding of the issues involved.
Schemas
Schemas are valuable. Much like compile-time checking in statically typed languages, they are used to ensure that data being stored is valid for the context.
The implementation of schemas in real-world systems has left much to be desired. Schemas are often hard to change—think of the work involved in making schema changes to a relational model that has already been deployed. Schemas are often quite restrictive. For instance, there is no easy way to specify nested objects, nested collections, and the like, even though doing so would make our data model much simpler to understand and use. Schema systems also often involve substantial configuration.
These and other issues have caused many to start moving away from schemas to a schema-less core model. The Lambda Architecture strongly advocates against this. It recommends storing data in the batch layer using a schema that can adapt to changes over the life of the data system.
Schema libraries such as Apache Avro, Apache Thrift, or Google’s Protocol Buffers can be used for this purpose. These libraries offer a schema system that is simple to use and maintain, and they can be used from most commonly used programming languages, such as Java, C#, and Python.
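As an illustration, here is a minimal sketch of defining and using an Avro schema from Java. The Customer and Shipper record types below are hypothetical and serve only to show that nested structures are easy to express; they are not part of the sample application described later.

Defining and using a nested Avro schema

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class AvroSchemaExample {
    // A schema with a nested record, expressed in Avro's JSON schema language.
    private static final String SCHEMA_JSON =
          "{ \"type\": \"record\", \"name\": \"Customer\", \"namespace\": \"example\","
        + "  \"fields\": ["
        + "    { \"name\": \"customerId\", \"type\": \"int\" },"
        + "    { \"name\": \"customerName\", \"type\": \"string\" },"
        + "    { \"name\": \"preferredShipper\", \"type\": {"
        + "        \"type\": \"record\", \"name\": \"Shipper\","
        + "        \"fields\": ["
        + "          { \"name\": \"shipperId\", \"type\": \"int\" },"
        + "          { \"name\": \"shipperName\", \"type\": \"string\" } ] } } ] }";

    public static void main(String[] args) {
        Schema customerSchema = new Schema.Parser().parse(SCHEMA_JSON);
        Schema shipperSchema = customerSchema.getField("preferredShipper").schema();

        GenericRecord shipper = new GenericData.Record(shipperSchema);
        shipper.put("shipperId", 1005);
        shipper.put("shipperName", "DHL");

        GenericRecord customer = new GenericData.Record(customerSchema);
        customer.put("customerId", 2);
        customer.put("customerName", "Annie");
        customer.put("preferredShipper", shipper);

        // Prints the record as JSON; field names and structure are governed by the schema.
        System.out.println(customer);
    }
}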
Sample application
I have prepared a sample application that illustrates key aspects of the Lambda Architecture. The following sections explain the structure of the sample application:
Use case
- The application receives streaming data from a single pressure sensor named “SensorZ”.
- A set of values is already available in the batch layer (stored in a single file).
- The client application has to display the top ten pressure values recorded across the dataset (the batch layer and the real-time layer) at any time.
The sample is implemented in Java with the exception of a Pig Latin script used for running MapReduce on the batch layer.
System requirements
The sample application has the following dependencies:
- Oracle JDK 1.7 (64-bit version)
- NetBeans 7.4
- Apache Maven—the version included with NetBeans 7.4
- Storm 0.9
- Apache Zookeeper (required by Storm)
- Apache Pig 0.12
- Redis 2.8.4
- Apache Avro
Figure 2 below illustrates the sample application data flow diagram.
Sample application data flow
Sample source code
The complete sample code is available here.
Application description and walk-through
There are four applications in the code sample:
- Data generator—located in the generatedata folder.
- MapReduce script and batch layer file—the batch folder.
- Real-time processing system implemented using Storm—the realtime folder.
- Client application—the query folder.
Batch layer data generator
The application named generatedata generates random pressure readings and stores them in a regular file. This data forms the batch layer in the application architecture. Data is serialized using the Apache Avro schema system. A pre-generated file named pressure-data.avro is stored under the batch folder.
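The generator’s actual source is in the sample download; the following standalone program is a simplified sketch of what writing such a file with Avro can look like, not the sample’s exact code. It appends random PressureReading records using the same schema that appears in the Pig script in the next section.

Writing Avro-serialized pressure readings (sketch)

import java.io.File;
import java.io.IOException;
import java.util.Random;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class PressureDataGeneratorSketch {
    // The PressureReading schema, identical to the one used by the Pig script.
    private static final String SCHEMA_JSON =
          "{ \"namespace\": \"com.syncfusion.stats\", \"type\": \"record\","
        + "  \"name\": \"PressureReading\", \"fields\": ["
        + "    { \"name\": \"name\", \"type\": \"string\" },"
        + "    { \"name\": \"value\", \"type\": \"double\" },"
        + "    { \"name\": \"date\", \"type\": \"long\" } ] }";

    public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        DataFileWriter<GenericRecord> writer =
                new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
        writer.create(schema, new File("pressure-data.avro"));

        Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            GenericRecord reading = new GenericData.Record(schema);
            reading.put("name", "SensorZ");
            reading.put("value", random.nextDouble() * 100);  // random pressure value
            reading.put("date", System.currentTimeMillis());
            writer.append(reading);
        }
        writer.close();
    }
}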
Calculation of top ten values from batch data
We take the Avro-formatted data that is stored in the file created by the data generator and compute the top ten pressure values using a Pig Latin script (calculate-max.pig). This script can be run in local mode or on a Hadoop cluster. The script will write the top ten values to a plain CSV file. In our sample, this CSV file acts as the precomputed batch view.
Process batch layer using MapReduce
values = LOAD 'pressure-data.avro' USING AvroStorage('{
    "namespace": "com.syncfusion.stats",
    "type": "record",
    "name": "PressureReading",
    "fields": [
        {"name": "name",  "type": "string"},
        {"name": "value", "type": "double"},
        {"name": "date",  "type": "long"}
    ]
}');
sortedValues = ORDER values BY value DESC;
top10Values = LIMIT sortedValues 10;
STORE top10Values INTO 'output' USING PigStorage(',');
Real-time layer
The sample includes code for deploying a Storm cluster that generates and processes pressure readings in real time. The Storm cluster monitors the readings, maintains the top ten values for the real-time data, and periodically stores these values in Redis, which acts as the storage system for the real-time view.
The Storm code has three essential components:
The spout component is responsible for sourcing a stream of data and providing a stream of tuples for further processing. In real-world scenarios, a spout may read data from a queue or a similar system. In our application, we simply simulate random data, as shown in the following sample code:
Spout producing random pressure readings
public void nextTuple() {
    Utils.sleep(100);
    _collector.emit(new Values(Configuration.SENSOR_NAME,
            Common.getrandomDouble(), new Date().getTime()));
}
In a Storm system, tuples produced by spouts can be processed downstream by bolts. Bolts can then choose to emit further streams of tuples which can then be processed, and so on.
In our case, we have a single bolt downstream (MaxTrackerBolt). For each incoming tuple, the execute method is called on the bolt that is configured to receive the tuple. This bolt stores incoming data into a sorted set. The sorted set is implemented using a queue that stores ten values at a time and removes the lowest value with each add operation.
Process data and maintain top ten values
public void execute(Tuple tuple) {
    …
    String sensor = tuple.getString(0);
    double value = tuple.getDouble(1);
    long time = tuple.getLong(2);

    PressureReading pressureReading = new PressureReading(sensor, value, time);
    this.addValue(pressureReading);
}
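The addValue method belongs to the sample’s MaxTrackerBolt. A minimal sketch of how such a bounded top-ten structure might be maintained is shown below, assuming PressureReading implements Comparable by pressure value; the sample’s actual implementation may differ in detail.

Maintaining a bounded top-ten queue (sketch)

import java.util.PriorityQueue;

// Inside MaxTrackerBolt: a min-heap ordered by pressure value.
private final PriorityQueue<PressureReading> queue = new PriorityQueue<PressureReading>();

private void addValue(PressureReading reading) {
    queue.add(reading);
    // Once more than ten readings are held, discard the smallest so that
    // only the ten largest values seen so far remain in the queue.
    if (queue.size() > 10) {
        queue.poll();
    }
}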
Please note that we use a Storm grouping known as “fields grouping,” which ensures that all values with the same sensor name are routed to the same bolt instance. In our sample data, we produce only one sensor name, but this system should be quite simple to extend to multiple named sensors. Also note that, to keep things simple, we do not use multiple layers of bolts. To scale well, it is important to consider using an intermediate layer that aggregates tuples in parallel, with a final consolidation layer aggregating all data for a sensor.
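For reference, wiring such a topology with a fields grouping might look roughly like the following. The component names, the PressureSpout class, and the “sensor” field name are assumptions for illustration; the sample’s actual topology class may differ.

Wiring the spout and bolt with a fields grouping (sketch)

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class PressureTopologySketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("pressure-spout", new PressureSpout(), 1);
        // Fields grouping on the sensor name field: all tuples for a given
        // sensor are routed to the same MaxTrackerBolt task.
        builder.setBolt("max-tracker", new MaxTrackerBolt(), 2)
                .fieldsGrouping("pressure-spout", new Fields("sensor"));

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("pressure-topology", new Config(), builder.createTopology());
    }
}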
Storm supports the notion of a “tick” tuple. The system can be configured to send a tick tuple every x seconds. When a tick tuple is received, we react to it in the bolt’s execute code by storing the sorted values gathered so far in Redis.
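A common way to request and detect tick tuples in a Storm bolt is sketched below; the ten-second interval is illustrative, and the sample may configure this differently.

Requesting and detecting tick tuples (sketch)

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;

// Inside the bolt: ask Storm to deliver a tick tuple to this component every 10 seconds.
@Override
public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
    return conf;
}

// Tick tuples arrive from Storm's system component on the system tick stream.
private static boolean isTickTuple(Tuple tuple) {
    return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
            && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}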
When we move data to Redis, we use the lpush method and store values under a predefined key. We then remove values left over from the previous push using the ltrim method, which retains data in the specified index range and removes everything else. At the end of this operation, only the top ten values last written by the real-time layer remain in Redis.
Update real-time view
public void execute(Tuple tuple) {
    if (isTickTuple(tuple)) {
        // write to redis when we receive a tick
        PressureReading[] pressureReadingMaxValues = new PressureReading[queue.size()];
        queue.toArray(pressureReadingMaxValues);

        Jedis jedis = new Jedis(Configuration.REDIS_SERVER);
        if (pressureReadingMaxValues != null && pressureReadingMaxValues.length > 0) {
            Arrays.sort(pressureReadingMaxValues);
            for (PressureReading v : pressureReadingMaxValues) {
                try {
                    jedis.lpush(Configuration.REDIS_CACHE_KEY,
                            PressureReadingHelpers.serializeToString(v));
                } catch (IOException ex) {
                    Logger.getLogger(MaxTrackerBolt.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
            jedis.ltrim(Configuration.REDIS_CACHE_KEY, 0, pressureReadingMaxValues.length - 1);
        }
    }
}
Query
The query application reads data from the text file where the batch layer stored its results, and also reads data from Redis. It combines the two sets of values, sorts them, and outputs the top ten values from the combined result.
Gather results from the real-time view stored in Redis
// get near real-time data from redis
// the running storm topology updates these results in redis
Jedis jedis = new Jedis(Configuration.REDIS_SERVER);
List<String> stored = jedis.lrange(Configuration.REDIS_CACHE_KEY, 0, Configuration.MAXVALUES - 1);
for (String s : stored) {
    PressureReading pressureReading = PressureReadingHelpers.deserializeFromString(s);
    values.add(pressureReading);
}
Gather results from the batch layer view stored in a text file
// get serialized data output from the batch process
List<String> lines = Files.readAllLines(Paths.get(BATCH_RESULTS_FILENAME), Charset.defaultCharset());
for (String line : lines) {
    PressureReading pressureReading = PressureReadingHelpers.fromCSV(line);
    values.add(pressureReading);
}
Sort combined results
private static void sortResults(ArrayList<PressureReading> values) {
    Collections.sort(values, new Comparator<PressureReading>() {
        @Override
        public int compare(PressureReading reading1, PressureReading reading2) {
            return reading2.getValue().compareTo(reading1.getValue());
        }
    });
}
Display top ten results
// subList's upper bound is exclusive, so pass MAXVALUES (not MAXVALUES - 1) to keep the top ten values;
// Math.min guards against having fewer values than MAXVALUES
List<PressureReading> finalValues = values.subList(0, Math.min(values.size(), Configuration.MAXVALUES));
SimpleDateFormat ft = new SimpleDateFormat("E yyyy.MM.dd 'at' hh:mm:ss a zzz");
for (PressureReading reading : finalValues) {
    String text = String.format("Sensor - %s, Pressure - %f%n, Date - %s",
            reading.getName(), reading.getValue(), ft.format(reading.getDate()));
    System.out.println(text);
}
Summary
We hope that you now have a good understanding of the Lambda Architecture. As with any system, it is important to understand the model and then tweak it for your specific needs. We strongly believe that data systems structured in accordance with the Lambda Architecture will better stand the test of time. We hope that your data systems benefit from the consideration of such an architecture.
About the author
Daniel Jebaraj is the vice president of development at Syncfusion, Inc. Syncfusion is a leading provider of enterprise software frameworks and solutions. Syncfusion’s big data solutions team helps customers deliver end-to-end, cost effective big data solutions that are designed and built for the long term. For more details, visit this site.