Modern enterprise apps are about everything: complex backends, rich frontends, mobile clients, traditional and NoSQL databases, Big Data, streaming and so on. Throw all of that into the cloud, and voila, you've got yourself a project worth years of development.
In this article, I will discuss a bioinformatics software-as-a-service (SaaS) product called Chorus, which was built as a public data warehousing and analytical platform for mass spectrometry data. Other features of the product include real-time visualization of raw mass-spec data.
Let’s start the discussion with requirements.
Data sets: This includes thousands of data files, each containing several two-dimensional int/float data arrays. The size may vary from file to file, but typically it's about 2-4 GB. All data files are to be stored in cloud storage.
Expected results: A set of X/Y points suitable for rendering high-definition images. Processing a data file is essentially summing values and finding maxima inside the data arrays within given boundaries. This operation consumes significant CPU time. The resulting data size depends on the request from end-users; it generally doesn't go below 50 KB, and the upper limit is about 20 MB. To produce a result set like this, the algorithm looks through source data that is about 30-200 times greater in size. So for a 100 KB result, we need to read and process about 3 to 20 MB of raw data.
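To make the workload concrete, here is a minimal sketch (hypothetical names, not the actual Chorus code) of the kind of bounded scan described above: summing values and tracking the maximum inside a 2D array within given row/column boundaries.

```java
/**
 * Minimal illustration of the per-file processing described above:
 * scan a 2D intensity array within user-supplied bounds, accumulating
 * the sum and tracking the maximum value. Names are hypothetical.
 */
public final class BoundedScan {

    public static double[] sumAndMax(float[][] data,
                                     int rowFrom, int rowTo,
                                     int colFrom, int colTo) {
        double sum = 0.0;
        double max = Double.NEGATIVE_INFINITY;
        for (int r = rowFrom; r < rowTo; r++) {
            for (int c = colFrom; c < colTo; c++) {
                float v = data[r][c];
                sum += v;
                if (v > max) {
                    max = v;
                }
            }
        }
        return new double[] { sum, max };
    }
}
```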
High-level Requirements:
- End-users should be able to view the results using either a browser-based web application or a desktop application.
- Similar data browsing performance for geographically distributed end-users.
- Simultaneous access for 100+ end-users with an ability to scale the services horizontally to handle spikes in traffic.
- Image rendering speed must provide a good user experience (UX), i.e. be under 500 ms per image.
What we had already:
The product owner had chosen Amazon Web Services (AWS) as the cloud solution provider. We had been using Amazon S3 as "unlimited" data storage and Amazon EC2 for hosting the servers.
A frontend application, hosted on Amazon EC2, serves the images to end-users and provides the endpoint for the browser and desktop apps.
Backend services, consisting of the business logic, rules, data access and security layers, are also hosted on Amazon EC2.
Both apps are built using the JEE stack.
Metadata for the mass spec files (storage location, ownership, properties) is stored in a MySQL database instance running on top of Amazon RDS.
How to start?
Try building a straightforward solution! Sounds easy: a single app on one huge instance, consuming the incoming requests, reading the data files from Amazon S3 and processing them in parallel. However, there are some obvious issues:
- Tests have shown the mean response size is around 4 MB. For 100 nearly simultaneous requests, the total response size reaches 4 MB * 100 = 400 MB. The source data to process is at least 30 times bigger, so we need to read about 30 * 400 MB ~ 12 GB from Amazon S3. At peak usage (200 times bigger, which we know can happen) it would be 200 * 400 MB ~ 80 GB of raw data.
- Handling 100 concurrent requests, each being CPU intensive, is only possible if you provide a comparable number of CPU threads.
- The theoretical network bandwidth from S3 to an EC2 instance is about 1 Gbit/s, which is 125 MB/s. So even under ideal circumstances, reading 12 GB of source files takes 12 * 1024 / 125 ~ 98 seconds.
- A single instance cannot serve geographically distributed locations equally well.
- Scaling "horizontally" would mean launching several such applications, each of which needs its own very large instance.
Amazon EC2 provides a wide choice of instances for these requirements. However, even the biggest of those, d2.8xlarge (36 vCPUs on Intel Xeon E5-2676 v3, 244 GB RAM), only really solves issues 1 and 2. The time to process (issue 3) remains unacceptable, since 98 seconds is far beyond the accepted response time of 500 ms. And issues 4 and 5 obviously cannot be solved with a single-application approach at all.
Elastic MapReduce
For this kind of task, AWS provides the cloud-based Elastic MapReduce, or EMR (Apache Hadoop modified and pre-installed to work on the AWS infrastructure). With its load-balancing capabilities, this solution addresses most of our issues. However, there are some limitations as well:
- The time taken to start a Hadoop job is several seconds, which, again, is much slower than required.
- The EMR cluster requires some warm-up time.
- Source data has to be pre-loaded from S3 into HDFS storage.
- The final result is typically written to S3 or HDFS, forcing us to build additional infrastructure to deliver it from that storage to the end-user.
In general, Elastic MapReduce is perfect for delayed (batch) data processing, but it just doesn't fit our near real-time visualization routines.
Apache Storm
Another framework to consider for distributed computations is Apache Storm. It keeps the benefits of the MapReduce approach, but adds near real-time processing capabilities. It was originally developed by Nathan Marz at BackType, which was later acquired by Twitter. Used to handle events for Twitter analytics, this solution is well suited for processing large data streams on the fly.
Installing Storm in the AWS cloud is relatively easy: there is a set of ready-to-go scripts for bringing up all the worker nodes plus a ZooKeeper instance to keep the topology healthy. We decided to do a proof of concept with Storm and built a prototype. However, we ran into some challenges:
- Re-configuration of the Storm cluster on the fly was not as transparent for the engineers as we had hoped. In particular, we found that adding new nodes and updating the logic of already-deployed code take full effect only after a whole-cluster restart.
- Storm provides an RPC endpoint, which seemed ideal for us. However, this concept implies at least three stages of data processing: splitting the work into parts, processing the parts, and merging the parts into a single result. Each of the three stages is executed on a separate node, which leads to an extra binary data serialization/deserialization step.
- Integration testing was not easy. Instantiating a test cluster with target topology requires additional resources and extra time.
- The API is a bit restrictive (though this point depends entirely on what you are used to).
The proof-of-concept application built with Storm satisfied all the high-level requirements, including satisfactory data processing speed. However, due to the maintenance issues (see #1 and #3) and the extra time spent on serialization for each request (see #2), we decided not to proceed further with Storm.
Elastic Beanstalk
One more option was to build a custom processing application and deploy it to the cloud using Amazon's Elastic Beanstalk. It looked like our final choice: a set of EC2 instances to balance CPU and network load, automatic scaling, out-of-the-box metrics and health checks. But after some investigation we started to have doubts:
- Vendor lock-in. Discussing the potential of such an approach with the product owner, we found that ideally the whole app should be repackageable into a boxed solution that can be deployed on intranet infrastructure. You can easily find intranet-oriented alternatives to Amazon EC2 and Amazon S3 (e.g. the Pivotal product range), but we could find no such alternative for Beanstalk.
- Scaling configuration is not flexible enough. Our stats on user activity clearly showed spikes at the beginning of each day (9 AM), but with Beanstalk we were unable to pre-scale the cluster on a schedule. [Amazon has since released this feature.]
- Beanstalk comes with Amazon SQS as its message delivery service. In our past experience, the SQS SDK had some issues. In particular, at that time we encountered random failures of CreateQueue and SendMessage requests, so we had to introduce retry policies. We also had to use custom serializers and deserializers to ensure a serialized message did not exceed the SQS message size limit. [Again, AWS has since announced an increased payload size limit of 2 GB.]
- Complex deployment procedure: Elastic Beanstalk requires specific bundles to be created for Tomcat-based and Java SE-based components. Each bundle in turn should have an environment configuration for each AWS service used. Given the number of AWS services used across dev, staging and production deployments, this turned into configuration hell, which is very easy to break with a random mistake.
Issue #1 alone made this option a poor fit for our requirements. Taking issues #2, #3 and #4 into account as well, we finally rejected Beanstalk. However, as Amazon continues its development and improves stability, we will definitely consider it next time.
Custom solution
Having looked through the available third-party solutions, we made it to the end of the list. It was finally clear that whatever we built, we would have to build from scratch. On the upside, all the experience earned while investigating and building prototypes gave us a good understanding of the problem and the confidence to develop a custom solution.
Proposed architecture
Figure 1 below shows the architectural concept of our current solution. It is split across the main AWS services we have built the application upon: EC2, RDS and S3.
Figure 1. Current architecture
User Interface
Requests from end-users are sent to the frontend application. It is built with Java 8 on top of the Spring MVC framework on the server side, with HTML, CSS and AngularJS served to interact with the user in the browser. It is currently deployed to Tomcat.
Incoming requests are validated for conformance with the request format and passed to the backend services. Each request eventually comes back to the frontend as an image, or as a set of X/Y points when the client has to render the image itself.
In the future it makes sense to add LRU caching to store results for repeated calls (which happen when the end-user zooms in and out). In that case the maximum lifetime of each cached result needs to be set to the duration of the user session. A minimal sketch of such a cache is shown below.
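As an illustration of what such a cache could look like, here is a minimal size-bounded LRU sketch built on java.util.LinkedHashMap. The class name and key format are hypothetical, and a session-scoped expiry would still need to be layered on top.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Simple size-bounded LRU cache for rendered results, keyed by the
 *  serialized request parameters. A per-entry expiry tied to the user
 *  session would be layered on top of this in a real implementation. */
public final class ResultCache {

    private static final int MAX_ENTRIES = 1_000;

    private final Map<String, byte[]> cache = Collections.synchronizedMap(
        new LinkedHashMap<String, byte[]>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, byte[]> eldest) {
                // Evict the least recently accessed entry once the cache is full.
                return size() > MAX_ENTRIES;
            }
        });

    public byte[] get(String requestKey) {
        return cache.get(requestKey);
    }

    public void put(String requestKey, byte[] renderedResult) {
        cache.put(requestKey, renderedResult);
    }
}
```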
Business Services
The backend app holds and executes all the business-related checks and flows. It is built using Java 8, Spring libraries (MVC, WS, Security, AMQP) and deployed on a Tomcat instance.
As with the user interface application, we try to keep the business services app as simple as possible to maintain high testability and avoid memory issues. As soon as a part of the application starts to look like a separate service or needs to be scaled out, it gets split out into separate worker instances similar to those used to process the image rendering requests (see more on the worker nodes below).
For each request passed to it, the backend app performs the following actions (a sketch of this flow is shown after the list):
- validates the incoming request and checks it against the security settings and data access policy,
- determines which raw data files the request targets and creates a general task for the request (the processing result for this task will later be returned to the frontend app),
- splits the task into sub-tasks, taking into account the bounds set by the user and the data layout on S3 (it is extremely important to do the split well to avoid repeated I/O calls),
- puts the sub-tasks into a messaging queue powered by RabbitMQ,
- waits for all the sub-tasks to come back as partial results,
- joins the partial results together,
- renders the image if the client request says to do so,
- returns the result to the frontend app.
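The following sketch illustrates this flow under a few assumptions: subtasks and partial results are plain byte arrays, the exchange and routing key names are placeholders, and Spring AMQP's RabbitTemplate is used for the RPC-style messaging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

/** Hypothetical sketch of the backend flow described above: send each
 *  subtask as an RPC-style message to RabbitMQ, wait for the partial
 *  results and join them into a single payload. */
public class RenderTaskService {

    private final RabbitTemplate rabbitTemplate;  // Spring AMQP template
    private final ExecutorService subTaskPool;    // application-scoped pool

    public RenderTaskService(RabbitTemplate rabbitTemplate, ExecutorService subTaskPool) {
        this.rabbitTemplate = rabbitTemplate;
        this.subTaskPool = subTaskPool;
    }

    public byte[] handle(List<byte[]> subTasks) throws Exception {
        // One pool thread blocks per in-flight subtask while a worker replies.
        List<Future<byte[]>> partials = new ArrayList<>();
        for (byte[] subTask : subTasks) {
            partials.add(subTaskPool.submit(() ->
                    (byte[]) rabbitTemplate.convertSendAndReceive(
                            "processing.exchange", "subtasks", subTask)));
        }

        // Wait for all partial results.
        List<byte[]> parts = new ArrayList<>();
        int total = 0;
        for (Future<byte[]> partial : partials) {
            byte[] part = partial.get();
            parts.add(part);
            total += part.length;
        }

        // Join the partial results into one response payload.
        byte[] joined = new byte[total];
        int offset = 0;
        for (byte[] part : parts) {
            System.arraycopy(part, 0, joined, offset, part.length);
            offset += part.length;
        }
        return joined;
    }
}
```

Each call to convertSendAndReceive blocks its pool thread until a worker replies, which is what lets the pool size act as a throttle on the number of concurrent subtasks.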
Sub-tasks are processed by sending them as messages into the queue in RPC style. For that we create an application-scoped thread pool with a controllable size, in which each thread is responsible for a single subtask. This way, it's still possible to keep track of the general task context (outside of the thread pool) and at the same time process the subtasks in parallel.
Moreover, since the pool size can be tweaked on the fly, we can maintain our own metrics, i.e. the number of concurrent sub-tasks and the messaging rates. We also know the approximate size of the expected result per subtask and per general task. This gives us everything we need to scale properly.
Figure 2 below displays the typical flow of the single rendering request sent from the user interface layer.
Figure 2. Typical request flow
To make this subsystem work reliably, we need to keep track of:
- “Time to handle a single subtask” divided by the “total number of already enqueued subtask messages”.
- Prioritization of messages in the queue, so that the subtasks of a given task are handled before the subtasks of the next task.
- The total number of general tasks handled at any moment, which must not grow too large; otherwise the JVM heap overflows with the partial results kept in memory.
Items #2 and #3 are addressed by manipulating the thread pool size and changing the priority of the messages sent to the RabbitMQ broker (see the sketch below). When the mean time to process a subtask (item #1) changes, we need to act on the other side of the queue: if it goes up, it's time to increase the throughput of the queue by adding more consumers; if it goes down, we can stop some consumers to avoid extra hosting expenses.
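The two knobs mentioned above can be sketched roughly as follows (names are illustrative). Note that per-message priorities only take effect if the target queue is declared with the x-max-priority argument.

```java
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/** Illustrative tuning knobs: per-message priority so subtasks of the
 *  current task are consumed first, and a resizable pool that bounds
 *  how many general tasks are in flight at once. */
public class QueueTuning {

    public void sendWithPriority(RabbitTemplate template, byte[] subTask, int priority) {
        MessagePostProcessor setPriority = message -> {
            // Higher values are consumed earlier by a priority-enabled queue.
            message.getMessageProperties().setPriority(priority);
            return message;
        };
        template.convertAndSend("processing.exchange", "subtasks", subTask, setPriority);
    }

    public void resizePool(ThreadPoolExecutor subTaskPool, int newSize) {
        // Grow the maximum first so the core size never exceeds the maximum.
        subTaskPool.setMaximumPoolSize(Math.max(newSize, subTaskPool.getMaximumPoolSize()));
        // Shrinking the pool throttles how many subtasks (and hence how many
        // partial results held in heap) can be outstanding at the same time.
        subTaskPool.setCorePoolSize(newSize);
    }
}
```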
Worker nodes
The consumers of the RabbitMQ queue are standalone Java applications called workers. Each worker is located on a dedicated EC2 instance and configured to use the RAM, network and CPU resources most effectively.
Subtasks posted by the backend are consumed by a random worker. Each subtask is self-contained, meaning that no extra business context is required to handle it. That’s why each worker runs independently from the others.
An important Amazon S3 feature used on the workers is random access to any part of a stored object. Instead of downloading a whole file of, say, 500 MB, we can read exactly the part needed for the current processing. End-users typically set bounds in their requests; the task is then split into pieces, and each subtask ends up covering only a small part of the data file. The savings are huge: without this feature our I/O load would be hundreds of times heavier.
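A ranged read like this might look as follows with the AWS SDK for Java v1; the bucket and key names are placeholders, and the byte range comes from the subtask.

```java
import java.io.IOException;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.util.IOUtils;

/** Sketch of the range read a worker performs: only the byte range
 *  covered by the subtask is fetched, not the whole multi-gigabyte file. */
public class RangedS3Reader {

    private final AmazonS3 s3 = AmazonS3ClientBuilder.standard().build();

    public byte[] readRange(String bucket, String key, long from, long toInclusive)
            throws IOException {
        GetObjectRequest request = new GetObjectRequest(bucket, key)
                .withRange(from, toInclusive);  // HTTP Range request under the hood
        try (S3Object object = s3.getObject(request)) {
            return IOUtils.toByteArray(object.getObjectContent());
        }
    }
}
```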
In case of a runtime error on a worker (e.g. out of memory), the subtask is automatically returned to the queue and picked up by another worker. To make the worker app even more stable, it is restarted by a scheduled cron job, which lets us avoid memory leak issues and JVM heap overflow.
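For illustration, a bare-bones consumer built on the plain RabbitMQ Java client could handle this as shown below: the message is acknowledged only after successful processing and nack'ed with requeue=true on a runtime error, so another worker picks it up (the RPC reply handling and queue/connection configuration are omitted).

```java
import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/** Illustrative worker consumer: ack on success, nack with requeue on error. */
public class WorkerConsumer {

    public static void main(String[] args) throws Exception {
        Connection connection = new ConnectionFactory().newConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);  // one unacknowledged subtask per worker at a time

        channel.basicConsume("subtasks", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                try {
                    process(body);  // CPU-heavy processing of the subtask
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (RuntimeException e) {
                    // Return the subtask to the queue for another worker.
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                }
            }
        });
    }

    private static void process(byte[] subTask) {
        // placeholder for the actual data processing
    }
}
```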
Scaling
There are several reasons to change the number of application worker nodes:
- The mean time to process a subtask goes up, leading to slow responses for end-users and failure to meet the 500 ms per response requirement.
- Worker nodes are under-utilized.
- CPU or RAM is overloaded on the backend.
We have been using the EC2 API to address issues #1 and #2. In particular, we created a separate scaling module to operate worker instances. Each new worker node is created from a pre-configured OS image (an Amazon Machine Image, AMI). We use spot instance requests to launch it, which gives us roughly a five-times lower hosting price.
The downside of this approach is node startup time: it takes an instance about 4-5 minutes to become operational from the time of the spot request submission. By that time the load spike may already have passed, so there may no longer be a need for more nodes.
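A spot request from the scaling module might look roughly like this with the AWS SDK for Java v1; the AMI id, instance type and bid price are placeholders.

```java
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2ClientBuilder;
import com.amazonaws.services.ec2.model.LaunchSpecification;
import com.amazonaws.services.ec2.model.RequestSpotInstancesRequest;

/** Sketch of asking for extra worker nodes via spot requests. The new
 *  instances boot from the pre-baked worker AMI and start consuming the
 *  RabbitMQ queue once their boot script has fetched the artifacts. */
public class WorkerScaler {

    private final AmazonEC2 ec2 = AmazonEC2ClientBuilder.standard().build();

    public void addWorkers(int count) {
        LaunchSpecification spec = new LaunchSpecification()
                .withImageId("ami-0123456789abcdef0")  // pre-configured worker AMI
                .withInstanceType("c4.xlarge");

        RequestSpotInstancesRequest request = new RequestSpotInstancesRequest()
                .withSpotPrice("0.10")                  // max bid, USD per hour
                .withInstanceCount(count)
                .withLaunchSpecification(spec);

        ec2.requestSpotInstances(request);
    }
}
```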
We use the load statistics we have been collecting to improve EC2 node usage. In particular, we pre-instantiate extra workers in time for 9 AM across the US (especially the US West Coast). The statistics are not perfect, though, so sometimes we don't pick the right number of instances. When we notice over- or under-provisioning, it is corrected within the following 4-5 minutes.
Issue #3 is not solved yet and is definitely the most problematic area for us at the moment. The tightly coupled access control (a), business rules on data layout and task assembly (b), and post-processing of the subtasks (c) should be split into several microservices.
However, in practice the current post-processing of subtasks (see "c" above) is basically just System.arraycopy(...) calls, and the total size of in-memory data (requests plus intermediate subtask results) on a single backend instance has never exceeded 1 GB, which is more than acceptable in terms of JVM heap size. So issue #3 has not hit us that hard yet.
Deployment
Any changes to the application are verified through several steps of the testing pipeline:
- Unit testing: Each commit to the code repository triggers a TeamCity build. We run all of our unit tests during each build.
- Integration testing: Once a day (sometimes less often), TeamCity runs several builds checking cross-module integration. We use prepared data files along with expected reference processing results. As the feature set grows, we add more data files and more use cases to these tests.
- Regression testing: For UI changes, we use Selenium WebDriver to automate regression testing. New features are tested manually by the QA team.
As for the real-time visualization module, changes are mostly related to new data type support and performance updates. Therefore, a successful pass of the unit and integration tests is enough in 99% of cases.
After each successful build from the “production” branch, TeamCity publishes the artifacts ready for deployment. They are basically JARs, WARs and configuration scripts (to control the app launch parameters). Each EC2 instance executes the boot script built into the AMI and grabs the latest released production artifacts it needs.
So all we need to do to deploy a new version is ensure the production build is successful and hit the "magic" button to reboot the instances.
In addition, we are able to perform A/B testing by executing builds from different branches and splitting the user requests between different sets of worker instances. This is easily achieved by modifying the startup scripts and creating a separate RabbitMQ queue set.
In particular, when we introduced the "averaged data" charts feature, it was first released from a separate feature branch. The feature gives end-users the ability to see an averaged chart based on the groups the original files were divided into. As it requires a bit more computing power to form an answer, we had to understand how processing time would increase in a real-world use case. To do that, we set up a separate RabbitMQ queue just to serve requests from the beta tester user group. The workers deployed from the branch were subscribed to this queue only. Thus, we were able to measure response times and see whether performance met the original requirements.
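Routing-wise, such a setup can be as simple as choosing a different routing key for the beta group, as in this hypothetical sketch (exchange and routing key names are placeholders).

```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/** Illustrative routing for the A/B setup described above: subtasks from
 *  the beta tester group go to a dedicated queue that only the
 *  feature-branch workers consume. */
public class SubTaskRouter {

    private final RabbitTemplate rabbitTemplate;

    public SubTaskRouter(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public Object dispatch(byte[] subTask, boolean betaTester) {
        // The beta routing key is bound to a separate queue consumed only
        // by workers built from the feature branch.
        String routingKey = betaTester ? "subtasks.beta" : "subtasks";
        return rabbitTemplate.convertSendAndReceive("processing.exchange", routingKey, subTask);
    }
}
```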
Lessons Learned
- Study your data and learn its structure. Provide random access to any part of the data with minimal latency.
- Use EC2 spot requests to cut hosting expenses.
- Build prototype apps before deciding which technologies or frameworks to use in your applications.
Conclusion
In this article, I described our approach to a typical Big Data use case: processing large data sets in a reasonable time and displaying the resulting images to end-users. The application continues to grow, becoming more functional each day.
About the Author
Oleksii Tymchenko is a technical lead and development manager at TeamDev Ltd. Since 2012 he has also been a lead engineer for InfoClinika, a company providing SaaS platform solutions in the bioscience domain. Having been involved in the design and development of enterprise systems for more than 10 years, he now finds his passion in building high-performance cloud and big data solutions.