Key Takeaways
- Choose the appropriate persistence store for your microservices
- By providing polyglot persistence as a service, developers can focus on building great applications and not worry about tuning, tweaking, and capacity of various back ends
- Operating various persistence stores at scale involves unique challenges, but common components can simplify the process
- Netflix’s common platform drives operational excellence in managing, maintaining, and scaling persistence infrastructures (including building reliable systems on unreliable infrastructure)
Adapted from a presentation at QCon San Francisco 2017, by Roopa Tangirala, engineering manager at Netflix.
We have all worked in companies that started small, and have a monolithic app built as a single unit. That app generates a lot of data for which we pick a data store. Very quickly, the database becomes the lifeline of the company.
Since we are doing such an amazing job, growth picks up and we need to scale the monolithic app. It starts to fail under high load and runs into scaling issues. Now, we must do the right thing. We break our monolithic app into multiple microservices that have better fallback and can scale well horizontally. But we don't worry about the back-end data store; we continue to fit the microservices to the originally chosen back end.
Soon, things become complicated at our back-end tier. Our data team feels overwhelmed because they're the ones who have to manage the uptime of our data store. They are trying to support all kinds of antipatterns that the database was never designed to handle.
Imagine that instead of trying to make all of our microservices fit one persistence store, we leverage the strengths and features of our back-end data tier to fit our application needs. No longer do we worry about fitting our graph usage into RDBMS or trying to fit ad hoc search queries into Cassandra. Our data team can work peacefully, in a state of Zen.
Polyglot persistence powering microservices
I manage the cloud database engineering team at Netflix. I have been with Netflix for almost a decade and I have seen the company transition from being monolithic in the data center to microservices and polyglot persistence in the cloud. Netflix has embraced polyglot persistence. I will cover five use cases for it, and discuss the reasons for choosing different back-end data stores.
Being a central platform team, my team faces many challenges in providing different flavors of database as a service across all of Netflix’s microservice platforms.
About Netflix
Netflix has been leading the way for digital content since 1997. We have over 109 million subscribers in 190 countries and we are a global leader in streaming. Netflix delivers an amazing viewing experience across a wide variety of devices, and brings you great original content in the form of Stranger Things, Narcos, and many more titles.
All your interactions as a Netflix customer with the Netflix UI, all your data such as membership information or viewing history, all of the metadata that a title needs to move from script to screen, and so much more are stored in some form in one of the data stores we manage.
The Cloud Database Engineering (CDE) team at Netflix runs on the Amazon cloud, and we support a wide variety of polyglot persistence. We have Cassandra, Dynomite, EVCache, Elasticsearch, Titan, ZooKeeper, MySQL, Amazon S3 for some datasets, and RDS.
Elasticsearch provides great search, analysis, and visualization of any dataset in any format in near real time. EVCache is a distributed in-memory caching solution based on Memcached that was open-sourced by Netflix in 2011. Cassandra is a distributed NoSQL data store that can handle large datasets and provides high availability, multi-region replication, and high scalability. Dynomite is a distributed Dynamo layer, again open-sourced by Netflix, that provides support for different storage engines; currently, it supports Redis, Memcached, and RocksDB. Inspired by Cassandra, it adds sharding and replication to non-distributed datasets. Lastly, Titan is a scalable graph database that's optimized for storing and querying graph datasets.
Let's look at the architecture, the cloud deployment, and how the datasets are persisted in Amazon Web Services (AWS). We are running in three AWS regions, which take all of the traffic. User traffic is routed to the closest region: primarily, US West 2, US East 1, and EU West 1. If there's a problem with one region, our traffic team can shift the traffic in less than seven minutes to the other two regions with minimal or no downtime. So all of our data stores need to be distributed and highly scalable.
Use case 1: CDN URL
If, like me, you’re a fan of Netflix (and love to binge-watch Stranger Things and other titles), you know you have to click the play button. From the moment you click to the time you see video on the screen, many things happen in the background. Netflix has to check user authorization and licensing for the content. Netflix has a network of Open Connect Appliances (OCAs) spread all over the world. These OCAs are where Netflix stores the video bits, and their sole purpose is to deliver those bits as quickly and efficiently as possible to your devices, while the control plane running in AWS handles the microservices and data-persistence stores. The CDN URL service is responsible for generating the URL that points your device at the right OCA, and from there we stream the movie to you.
The very first requirement for this service is to be highly available. We don't want the user experience to be compromised when you are trying to watch a movie, so high availability was priority number one. Next, we want tiny read and write latencies, less than one millisecond, because this service lies in the streaming path and we want the movie to play the moment you click play.
We also want high throughput per node. Although the files are pre-positioned on all of these caches, they can change based on cache health or when Netflix introduces new titles; there are multiple dimensions along which these movie files can change. So this service receives high read as well as write throughput, and we wanted a data store whose per-node throughput is high enough to let us optimize the footprint.
For this particular service, we used EVCache. It is a distributed caching solution that provides low latency because everything is in memory. The data model for this use case was a simple key-value pair, and you can easily get that data from the cache. EVCache is distributed, and we keep multiple copies in different AWS Availability Zones, so we get better fault tolerance as well.
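EVCache is built on Memcached, so the access pattern is essentially get and set by key. Here is a minimal sketch of that pattern, using a generic Memcached client rather than the EVCache client itself, with a hypothetical host, key layout, and TTL:

```java
import java.net.InetSocketAddress;
import net.spy.memcached.MemcachedClient;

public class CdnUrlCacheSketch {
    public static void main(String[] args) throws Exception {
        // EVCache is based on Memcached, so a plain Memcached client shows the
        // same key-value pattern. Host, port, key layout, and TTL are hypothetical.
        MemcachedClient cache = new MemcachedClient(new InetSocketAddress("localhost", 11211));

        // Write path: store the OCA streaming URL for a (customer, title) pair.
        String key = "cdnurl:customer123:title456";
        cache.set(key, 3600, "https://oca.example.com/streams/title456").get();

        // Read path: an in-memory lookup that sits in the streaming path,
        // which is why sub-millisecond latency matters here.
        String url = (String) cache.get(key);
        System.out.println("Stream from: " + url);

        cache.shutdown();
    }
}
```

Because the value is a precomputed URL rather than something that needs a query, a cache hit is a single in-memory lookup, which is what keeps reads under a millisecond.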
Use case 2: Playback error
Imagine that you click play to watch a movie but get a playback error. The playback error happens every time you click the title; it's just not playable.
A title has multiple characteristics and metadata: ratings, genre, and description; the audio and subtitle languages it supports; and the Netflix Open Connect CDN URL, discussed in the first use case, which is the location from which the movie streams to you. We call all of this metadata the “playback manifest”, and we need it to play the title for you.
There are hundreds of dimensions that can lead to a playback metadata error, and hundreds of dimensions that can alter the user's playback experience. For example, some content is licensed only in specific countries, and we cannot play it for you if you cross a border. Maybe a user wants to watch Narcos in Spanish. We might have to change the bit rate at which we stream depending on whether you are on Wi-Fi or a fixed network. Some devices do not support 4K or HD, and we have to change the stream based on the device. Beyond these few examples, there are hundreds of dimensions on which your playback experience depends.
For this service, we wanted the ability to quickly resolve incidents. We want someplace where we can quickly look for the cause of an issue: which dimension is out of sync and causing your playback error. If we have rolled out a push, we want to see whether we need to roll back or roll forward, based on the scope of the error: is the error happening in all three regions, in only specific regions, or on only a particular device? There are multiple dimensions across which we need to slice the dataset.
Another requirement was interactive dashboards. We wanted the ability to slice and dice the dataset to see the root cause of that error. Near-real-time search is important because we want to figure out whether or not a recent push has caused the problem at hand. We need ad hoc queries because there are so many dimensions; we don't know our query patterns. There may be multiple ways for us to query the dataset to arrive at what is causing the error.
We used Elasticsearch for this service. It provides great search and analysis for data in any form, and it has interactive dashboards through Kibana. We use Elasticsearch a lot at Netflix, especially for debugging and logging use cases.
Kibana provides a great UI for interactive exploration that allows us to examine the dataset to find the error. We can determine that the error is in a specific region across multiple devices, in a specific device, or confined to a particular title. Elasticsearch also supports queries such as "What are the top 10 devices across Netflix?"
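To make the slicing and dicing concrete, here is a sketch of the kind of Elasticsearch query that answers a question like "what are the top 10 devices reporting this playback error in the last 15 minutes?" The index name, field names, and error code are hypothetical; the point is the shape of the query, a filtered terms aggregation.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PlaybackErrorQuerySketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical index and field names; the query shape is what matters:
        // filter to recent playback errors, then aggregate counts by device type.
        String query = """
            {
              "size": 0,
              "query": {
                "bool": {
                  "filter": [
                    { "term":  { "error_type": "playback_manifest_mismatch" } },
                    { "range": { "@timestamp": { "gte": "now-15m" } } }
                  ]
                }
              },
              "aggs": {
                "top_devices": { "terms": { "field": "device_type", "size": 10 } }
              }
            }
            """;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/playback-errors/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(query))
                .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```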
Before Elasticsearch, the incident-to-resolution time was more than two hours. The process involved looking at the logs, grepping the logs, and looking at the cause of error and where there's a mismatch between the manifest and what is being streamed to you. With Elasticsearch, the resolution time decreased to under 10 minutes. That has been a great thing.
Use case 3: Viewing history
As you watch Netflix, you build what we call a “viewing history”, which is basically the titles you have been watching over the past few days. It keeps a bookmark of where you were, and you can click to resume from where you stopped. If you look at your account activity, you can see the date that you watched a particular title and you can report if there's a problem viewing a title.
For viewing history, we needed a data store that could store a time-series dataset. We needed to support a high number of writes. A lot of people are watching Netflix, which is great, so the viewing-history service receives a lot of writes. Because we are deployed in three regions, we wanted cross-region replication so that if there's a problem in one region, we can shift the traffic and have the user's viewing history available in the other regions as well. Support for large datasets was important, since viewing history has been growing exponentially.
We used Cassandra for this. Cassandra is a great NoSQL distributed data store that offers multi-data-center, multi-directional replication. This works out great because Cassandra does the replication for us. It is highly available and highly scalable. It has great fault detection and multiple replicas, so a node going down doesn't cause website downtime. We can define different consistency levels so that we never experience downtime, even though nodes will always be going down somewhere in our regions.
Data model
The data model for viewing history started simple. We have a row key, which is the customer or user ID, and each title a user watches is a column in that column family. When you watch, you write to your viewing history, and we write only a tiny payload: the latest title you watched. Viewing history grows over time, and Cassandra capably handles wide rows, so there is no problem. When you read your whole viewing history, you paginate through that wide row.
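In CQL terms, that original model might look roughly like the sketch below (assuming the DataStax Java driver 3.x; the contact point and column names are illustrative, not our actual schema). The customer ID is the partition key and the title ID is a clustering column, so each customer's history is one wide row.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class ViewingHistorySchemaSketch {
    public static void main(String[] args) {
        // Hypothetical contact point, keyspace, and column names.
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect()) {

            session.execute("CREATE KEYSPACE IF NOT EXISTS sketch WITH replication = "
                    + "{'class': 'SimpleStrategy', 'replication_factor': 1}");

            // One wide row per customer: partition key = customer_id,
            // clustering column = title_id, so each watched title is one column.
            session.execute("CREATE TABLE IF NOT EXISTS sketch.viewing_history ("
                    + "customer_id text, title_id text, bookmark_seconds int, watched_at timestamp, "
                    + "PRIMARY KEY (customer_id, title_id))");

            // Write path: a tiny payload per title watched.
            session.execute("INSERT INTO sketch.viewing_history "
                    + "(customer_id, title_id, bookmark_seconds, watched_at) VALUES (?, ?, ?, ?)",
                    "customer123", "title456", 1325, new java.util.Date());

            // Read path: paginate through the customer's wide row.
            for (Row row : session.execute(
                    "SELECT title_id, bookmark_seconds FROM sketch.viewing_history WHERE customer_id = ?",
                    "customer123")) {
                System.out.println(row.getString("title_id") + " @ " + row.getInt("bookmark_seconds") + "s");
            }
        }
    }
}
```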
We quickly ran into issues with this model. The viewing history is quite popular, so the dataset is growing rapidly. A few customers have a huge viewing history, so the row becomes very wide. Even though Cassandra is great for wide rows, trying to read all of that data in memory causes heap pressures and compromises the 99th-percentile latencies.
New data model
So we have a new model, split into two column families. One is the live viewing history, with a similar pattern of each column being a title, so we can continue to write small payloads. The other is a roll-up column family, in which all of the historical data is combined and compressed. This means we have to do two reads, one from the compressed column family and one from the live column family, but it definitely helps with size: we drastically reduced the size of the dataset because half of the data was compressed.
The roll-up happens in the read path. When the user reads their viewing history, the service knows how many columns they have read, and if that number exceeds a threshold, we compress the historical data and move it to the roll-up column family. This happens continuously, driven by reads, which works out very nicely.
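Here is an in-memory sketch of that roll-up-on-read idea; the threshold, the compression scheme, and the data layout are all assumptions, and the real implementation works against the two Cassandra column families described above.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPOutputStream;

public class ViewingHistoryRollupSketch {
    // Hypothetical threshold for how many "live" columns a row may keep.
    static final int LIVE_COLUMN_THRESHOLD = 5;

    // Stand-ins for the two column families: live titles and compressed history.
    static final Map<String, List<String>> live = new HashMap<>();
    static final Map<String, List<byte[]>> rolledUp = new HashMap<>();

    // The roll-up happens in the read path: if the live row has grown too wide,
    // compress the older titles and move them to the rolled-up store.
    static List<String> read(String customerId) throws Exception {
        List<String> titles = live.getOrDefault(customerId, new ArrayList<>());
        if (titles.size() > LIVE_COLUMN_THRESHOLD) {
            List<String> older = titles.subList(0, titles.size() - LIVE_COLUMN_THRESHOLD);
            rolledUp.computeIfAbsent(customerId, k -> new ArrayList<>())
                    .add(gzip(String.join(",", older)));
            titles = new ArrayList<>(titles.subList(titles.size() - LIVE_COLUMN_THRESHOLD, titles.size()));
            live.put(customerId, titles);
        }
        // A full read would also decompress rolledUp.get(customerId); omitted here.
        return titles;
    }

    static byte[] gzip(String s) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(s.getBytes());
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        for (int i = 1; i <= 8; i++) {
            live.computeIfAbsent("customer123", k -> new ArrayList<>()).add("title" + i);
        }
        System.out.println(read("customer123")); // only the most recent titles stay live
    }
}
```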
Use case 4: Digital-asset management
Our content platform engineering team at Netflix deals with tons of digital assets and needed a tool to store the assets as well as the connections and relationships among them.
For example, we have lots of artwork, which is what you see on the website. The art can come in different formats, including JPEG and PNG. We also have various categories of artwork: a movie can have art, a character can have art, a person can have art, and so on.
Each title is a combination of different things in a package. The package can include video elements, such as trailers and montages, and combinations of video, audio, and subtitles; for example, a French-language video with subtitles in French and Spanish. And then there are relationships, such as a montage being a type of video.
We wanted a data store where we could store all of these entities as well as the relationships.
Our requirements for the digital-asset-management service were a single back end to store the asset metadata, the relationships, and the connected datasets, plus the ability to search it all quickly. We used Titan, which is a distributed graph database. It's great for storing graph datasets, and it supports various storage back ends. Since we already support Cassandra and Elasticsearch, it was easy to integrate into our service.
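As a sketch of how the asset graph fits a graph database (assuming Titan 1.0 with its in-memory backend, and hypothetical labels and properties), entities such as titles and artwork become vertices, relationships become edges, and a Gremlin traversal answers questions about the connections:

```java
import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;

public class AssetGraphSketch {
    public static void main(String[] args) throws Exception {
        // In production, Titan would be backed by Cassandra for storage and
        // Elasticsearch for indexing; the in-memory backend keeps this sketch self-contained.
        TitanGraph graph = TitanFactory.open("inmemory");

        // Entities become vertices; labels and properties here are hypothetical.
        Vertex movie = graph.addVertex(T.label, "movie", "title", "Narcos");
        Vertex boxArt = graph.addVertex(T.label, "artwork", "format", "JPEG");
        Vertex trailer = graph.addVertex(T.label, "video", "kind", "trailer");

        // Relationships become edges.
        movie.addEdge("hasArtwork", boxArt);
        movie.addEdge("hasVideo", trailer);
        graph.tx().commit();

        // A Gremlin traversal answers "which artwork formats does this title have?"
        graph.traversal().V().has("title", "Narcos")
                .out("hasArtwork").values("format")
                .forEachRemaining(System.out::println);

        graph.close();
    }
}
```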
Use case 5: Distributed delayed queues
The Netflix content platform engineering team runs a number of business processes. Rolling out a new movie, content ingestion and encoding, or uploading to the CDN are all business processes that require asynchronous orchestration between multiple microservices. Delayed queues form an integral part of this orchestration.
We wanted delayed queues that are distributed and highly concurrent, because multiple microservices access them. We wanted at-least-once delivery semantics and the ability to delay messages, because these microservices depend on one another and we don't know when a message will be consumed. A critical requirement was priorities within a shard, so that we can pick up the highest-priority message first.
For this particular service, we used Dynomite. Netflix open-sourced Dynomite some time ago. It is a pluggable data store that works with Redis, Memcached, and RocksDB. It works for this use case because Redis has data structures that support queues very well. Early on, we tried to make queues work with Cassandra and failed miserably, running into all kinds of edge cases. Dynomite worked superbly for us here, and it provides multi-data-center replication and sharding, so we, as application owners, need not worry about data being replicated across regions or data centers.
Netflix maintains three Redis structures for each queue. The first is a sorted set that contains the queue elements ordered by score. The second is a hash set that contains the payload, keyed by message ID. The third is a sorted set that contains messages a client has consumed but not yet acknowledged: the unacknowledged set.
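A minimal sketch of those three structures against a plain Redis node, using the Jedis client; the key names, score scheme, and payload format are assumptions, and in production this sits behind Dynomite rather than a single Redis.

```java
import java.util.UUID;
import redis.clients.jedis.Jedis;

public class DelayedQueueSketch {
    public static void main(String[] args) {
        // A single Redis node stands in for Dynomite here; key names and the
        // score scheme (priority / ready time) are hypothetical.
        try (Jedis redis = new Jedis("localhost", 6379)) {
            String queue = "queue:encode";             // sorted set: message IDs ordered by score
            String payloads = "queue:encode:payloads"; // hash: message ID -> payload
            String unacked = "queue:encode:unacked";   // sorted set: consumed but not yet acked

            // Produce: lower score = higher priority / earlier ready time.
            String messageId = UUID.randomUUID().toString();
            double score = System.currentTimeMillis(); // a delayed message would use now + delayMillis
            redis.hset(payloads, messageId, "{\"title\":\"title456\",\"task\":\"encode\"}");
            redis.zadd(queue, score, messageId);

            // Consume: take whatever is ready (score <= now), move it to the
            // unacknowledged set, and ack (remove) once processing succeeds.
            for (String id : redis.zrangeByScore(queue, 0, System.currentTimeMillis())) {
                redis.zadd(unacked, System.currentTimeMillis(), id);
                redis.zrem(queue, id);
                String payload = redis.hget(payloads, id);
                System.out.println("processing " + payload);
                // On success: acknowledge by removing from the unacked set and the hash.
                redis.zrem(unacked, id);
                redis.hdel(payloads, id);
            }
        }
    }
}
```

If a consumer dies before acknowledging, the message stays in the unacknowledged set and can be re-queued, which is what gives the at-least-once semantics described above.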
Identifying the challenges
I love this quote, but I don't think my on-call team feels like this: "I expected times like this — but I never felt that they'd be so bad, so long, and so frequent."
The first challenge my team faces is the wide variety and the scale. We have so many different flavors of data store, and we have to manage and monitor all of these different technologies while making sure the team has the skills to cater to each of them. Handling that variety with a small team is a challenge.
The next challenge is predicting the future. Across all of these technologies, we have thousands of clusters, tens of thousands of nodes, and petabytes of data. We need to predict when a cluster risks running out of capacity. My central platform team should know each cluster's headroom, so that if an application team says they are increasing capacity or throughput, or adding a new feature that will increase back-end IOPS, we can tell them whether their cluster has enough capacity or needs to scale up.
For maintenance and upgrades across all clusters, software or hardware, we need to know whether we can perform maintenance without impacting production services. Can we build our own solution or should we buy something that’s out there?
Another challenge is monitoring. We have tens of thousands of instances, and all of them are sending metrics. When there's a problem, we should know which metrics make the most sense and which we should be looking at. We must maintain a high signal-to-noise ratio.
Overcoming challenges
The very first step in meeting these challenges is to have experts. We have two or three core people for Cassandra on our cloud database engineering team whom we call subject-matter experts. These people provide best practices and work closely with the microservice teams to understand their requirements and suggest a back-end data store. They are the ones who drive the features and best practices, as well as the product future and vision.
Everybody on the team goes on call for all of these technologies, so it's useful to have a core set of people who understand what's happening and how to really fix the back end. Instead of building automation that patches over what is broken, we can contribute a fix or a feature upstream to the open-source back-end data tier.
Next, we build intelligent systems to work for us. These systems take on all automation and remediation. They accept the alerts, look at the config, and use the latency thresholds we have for each application to make decisions, saving people from getting paged for each and every alert.
CDE Service
CDE Service helps the CDE team provide data stores as a service. Its first component captures thresholds and SLAs. We have thousands of microservices; how do we know which service requires what 99th-percentile latency? We need a way to look at the clusters and see both the requirements and what we have promised, so that we can tell whether a cluster is sized effectively or needs to scale up.
Cluster metadata helps provide a global view of all the clusters: the software and kernel version each runs, its size, and the cost of managing it. The metadata helps the application team understand the cost associated with a particular back end and the data they are trying to store, and whether or not their approach makes sense.
The self-service capability of CDE Service allows application users to create clusters on their own, without the CDE team getting in the way. The users don't need to understand all the nitty-gritty details of the back-end YAML; they only need to provide minimal information. We create the cluster and make sure that it is using the right settings, it has the right version, and it has the best practices built in.
Before CDE Service, contact information sat outside the system. For each application, we'd need to know whom to contact and which team to page. That becomes tricky when you're managing so many clusters, so having a central place to capture this metadata is crucial.
Lastly, we track maintenance windows. Some clusters can have maintenance windows at night, while others receive high traffic at the same time. We decide on an appropriate maintenance window for a cluster’s use case and traffic pattern.
Architecture
Figure 1 shows the architecture, with the data store in the center. For the scheduler on the left, we use Jenkins, which is based on cron and allows us to click a button to do upgrades or node replacements. Under that is CDE Service, which captures the cluster metadata and is the source of information such as SLAs, PagerDuty details, and much more. On top is the monitoring system. At Netflix, we use Atlas, an open-source telemetry system, to capture all of the metrics; whenever there's a problem and we cannot meet the 99th-percentile latency, an alert goes off. On the right is the remediation system, an execution framework that runs on containers and can execute automation.
Figure 1: CDE architecture
Anytime an alert fires, the monitoring system will send the alert to the remediation system. That system will perform automated remediation on the data store and won't even let the alert go to the CDE team. Only in situations for which we have not yet built automation will alerts come directly to us. It is in our team's best interest to build as much automation as possible, to limit the number of on-call pages we need to respond to.
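The dispatch logic boils down to this: if automation exists for an alert type, run it; otherwise page the on-call. A toy sketch of that routing follows, where the alert types and remediations are hypothetical.

```java
import java.util.Map;
import java.util.function.Consumer;

public class AlertDispatchSketch {
    // Hypothetical alert types mapped to automated remediations.
    static final Map<String, Consumer<String>> REMEDIATIONS = Map.of(
            "DISK_USAGE_HIGH", cluster -> System.out.println("running cleanup on " + cluster),
            "NODE_DOWN",       cluster -> System.out.println("replacing failed node in " + cluster)
    );

    static void onAlert(String alertType, String cluster) {
        Consumer<String> remediation = REMEDIATIONS.get(alertType);
        if (remediation != null) {
            // Automation exists: remediate without paging the CDE team.
            remediation.accept(cluster);
        } else {
            // No automation yet: this is the only case that pages a human.
            System.out.println("PAGE on-call: " + alertType + " on " + cluster);
        }
    }

    public static void main(String[] args) {
        onAlert("NODE_DOWN", "cass_viewing_history");    // auto-remediated
        onAlert("SCHEMA_DISAGREEMENT", "cass_payments"); // pages the team
    }
}
```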
SLA
Figure 2 shows the cluster view, where I can look at all of my clusters. I can see what version they are running, which environment and region they are in, and how many nodes they have. This view also shows the customer email, the Cassandra version, the software version, the hardware version, the average node count, and various costs. I can also look at the oldest node, so if a cluster has a very old node that needs to be replaced, we will just run remediations; there's a job that scans for old nodes and runs terminations. In the interest of space, I have not shown many columns, but you can pick what information you want to see.
Figure 2: CDE Self Service UI
We have another UI for creating new clusters, specific to each data store. An application user needs to provide only a cluster name, email address, the amount of data they are planning to store, and the regions in which to create the cluster — then the automation kicks off the cluster creation in the background. This process makes it easy for a user to create clusters whenever they want, and since we own the infrastructure, we make sure that the cluster creation is using the right version of the data store with all of the best practices built in.
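As a sketch of that minimal input (the field names and the sizing rule are assumptions), the request might carry only a handful of fields, with the platform deriving everything else from its defaults and best practices:

```java
import java.util.List;

public class ClusterRequestSketch {
    // The handful of fields an application team supplies; everything else
    // (version, settings, instance type) comes from the platform's defaults.
    record ClusterRequest(String clusterName, String ownerEmail, int dataSizeGb, List<String> regions) {}

    // A toy sizing rule: cap per-node data so the node keeps headroom
    // for backups and compactions.
    static int nodesPerRegion(int dataSizeGb) {
        return Math.max(3, (int) Math.ceil(dataSizeGb / 300.0));
    }

    public static void main(String[] args) {
        ClusterRequest req = new ClusterRequest(
                "viewing_history_test", "team@example.com", 1200,
                List.of("us-east-1", "us-west-2", "eu-west-1"));
        System.out.println(req.clusterName() + " -> " + nodesPerRegion(req.dataSizeGb())
                + " nodes in each of " + req.regions());
    }
}
```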
When an upgrade is running, it can be tricky to figure out what percentage of the test clusters and prod clusters have been upgraded across a fleet that numbers in the thousands. We have a self-service UI to which application teams can log in to see how far along we are in the upgrade process.
Machine learning
Earlier, I mentioned having to predict the future. Our telemetry system stores two weeks of metrics, and previous historical data is pushed to S3. We analyze this data using Kibana dashboards to predict when the cluster will run out of capacity.
We have a system called predictive analysis, which runs models to predict when a cluster will run out of capacity. It runs in the background and pages us, or notifies us on a Slack channel, when it expects a cluster to exceed capacity within 90 days. With Cassandra, we only want to use a third of the storage allocation for the dataset, a third for backups, and the last third for compactions. It is important to have monitoring in place and a system that warns us beforehand, not at the cusp of the problem, because that leads to all kinds of issues.
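A simplified version of that forecast: project the recent growth in disk usage forward and warn if the dataset will cross its one-third budget within 90 days. The numbers and the linear model below are illustrative assumptions, not our production model.

```java
public class CapacityForecastSketch {
    public static void main(String[] args) {
        // Hypothetical cluster numbers.
        double diskPerNodeGb = 1000.0;
        double usableForDataGb = diskPerNodeGb / 3.0; // thirds rule: data, backups, compactions
        double currentDataGb = 250.0;
        double dailyGrowthGb = 1.2;                   // e.g. a slope fitted from recent telemetry

        double headroomGb = usableForDataGb - currentDataGb;
        double daysUntilFull = headroomGb / dailyGrowthGb;

        if (daysUntilFull < 90) {
            // In the real system this would page or post to a Slack channel.
            System.out.printf("WARN: cluster projected to exceed its data budget in %.0f days%n", daysUntilFull);
        } else {
            System.out.printf("OK: about %.0f days of headroom%n", daysUntilFull);
        }
    }
}
```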
Since we are dealing with stateful persistence stores, it is not easy to scale up. It's easier with stateless services: you can do red/black deployments or grow the clusters with auto-scaling groups. But it's tricky for persistence stores, because the data lives on the nodes and has to be streamed to new nodes. That's why we use predictive analysis.
Proactive maintenance
Things go down in the cloud, and hardware is bound to fail. We registered to receive Amazon's notifications, and we terminate the nodes in advance instead of waiting for Amazon to terminate them for us. Because we are proactive, we can do hardware replacements, terminations, or whatever maintenance we need in the window we like.
For example, we don't rely on Cassandra's bootstrap ability to bring up nodes because that takes a lot of time. It takes hours and sometimes even days for clusters, like some of ours, with more than one terabyte of data per node. In those cases, we have built a process that copies the data from the node, puts it into a new node, then terminates the first node.
Upgrades
Software and hardware upgrades across all of these different persistence stores are a significant effort, because any change to the back end can have a big impact; a problem like a buggy version can compromise all of your uptime. We have built a lot of confidence into our upgrades with Netflix Data Benchmark (NDBench), an open-source benchmarking tool. It is extensible, so we can use it for Cassandra, Elasticsearch, or any store we want. In the NDBench client, we specify the number of operations we want to throw at the cluster, the payload, and the data model. This also allows application teams to test their own applications using NDBench.
When we upgrade, we look at four or five popular use cases. For example, we may try to capture 80 percent reads and 20 percent writes or 50 percent reads and 50 percent writes. We are trying, with only a few use cases, to capture the more common payloads people are using in the clusters. We run the benchmark before the upgrade, capturing the 99th-percentile and average latencies. We perform the upgrade and run the benchmark again. We compare the before and after benchmarks to see if the upgrade has introduced any regression or has caused problems that increased the latencies. This helps debug a lot of issues before they happen in production. We never upgrade when this particular comparison reveals a problem. That's the reason we are able to roll out all these upgrades behind the scenes without our application teams even realizing that we are upgrading their cluster.
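The gating logic itself is simple: compare the before and after latencies from the benchmark runs and stop the rollout if they regress beyond a tolerance. A sketch of that comparison, where the numbers and the 10 percent tolerance are assumptions:

```java
public class UpgradeGateSketch {
    // Allow at most a 10% regression in latency before blocking the rollout (assumed tolerance).
    static final double MAX_REGRESSION = 1.10;

    static boolean safeToRollOut(double p99Before, double p99After, double avgBefore, double avgAfter) {
        return p99After <= p99Before * MAX_REGRESSION
                && avgAfter <= avgBefore * MAX_REGRESSION;
    }

    public static void main(String[] args) {
        // Hypothetical benchmark results in milliseconds for an 80/20 read/write workload.
        double p99Before = 4.8, p99After = 5.1;
        double avgBefore = 1.1, avgAfter = 1.2;

        if (safeToRollOut(p99Before, p99After, avgBefore, avgAfter)) {
            System.out.println("No regression detected: continue rolling the upgrade out.");
        } else {
            System.out.println("Latency regression: stop and investigate before upgrading further.");
        }
    }
}
```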
Real-time health checks
We also handle health checks at the node level and the cluster level. At the node level, we check whether the data store is running and whether there are any hardware failures. At the cluster level, we check what each node thinks about the other nodes in the cluster.
The common approach is to use cron to poll all the nodes, then use that input to figure out whether or not the cluster is healthy. This is noisy, and will produce false positives if there are network problems from the cron system to the node or if the cron system goes down.
We moved from that poll-based system to continual, streaming health checks. We have a continual stream of fine-grained snapshots being pushed from all the instances to a central service we call Mantis, which aggregates all the data and creates a health score. If the score exceeds a certain threshold, the cluster is determined to be not healthy.
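A toy version of that aggregation: each node streams a small snapshot, the service scores each snapshot, and the cluster is flagged unhealthy when the aggregate score crosses a threshold. The snapshot fields, weights, and threshold here are all assumptions.

```java
import java.util.List;

public class HealthScoreSketch {
    // A tiny snapshot each node streams continuously (fields are hypothetical).
    record NodeSnapshot(String node, boolean processUp, double diskUsedPct, double p99LatencyMs) {}

    // Score one snapshot: 0 is healthy, higher is worse (weights are arbitrary).
    static double score(NodeSnapshot s) {
        double score = 0;
        if (!s.processUp()) score += 100;
        if (s.diskUsedPct() > 80) score += 20;
        if (s.p99LatencyMs() > 10) score += 10;
        return score;
    }

    public static void main(String[] args) {
        List<NodeSnapshot> snapshots = List.of(
                new NodeSnapshot("node-1", true, 55, 3.2),
                new NodeSnapshot("node-2", true, 85, 4.0),
                new NodeSnapshot("node-3", false, 60, 0.0));

        double clusterScore = snapshots.stream().mapToDouble(HealthScoreSketch::score).sum();
        double threshold = 50; // assumed threshold
        System.out.println("cluster score = " + clusterScore
                + (clusterScore > threshold ? " -> UNHEALTHY" : " -> healthy"));
    }
}
```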
We have a few dashboards where we can see real-time health. The macro view shows the relative sizes of the clusters, with color coding to indicate whether a cluster is healthy. Clicking on an unhealthy node shows a detailed view of the cluster and that node, and clicking on the bad instance shows details about what is causing trouble, which helps us easily debug and troubleshoot the problem.
Takeaway
The takeaway from all of this is that balance is the key to life. You cannot have all of your microservices using one persistence store, but at the same time you don't want each and every microservice to use a distinct persistence store. There's always a balance, and I hope that with what I've covered, you will find your own balance and build your own data store as a service.
About the Authors
Roopa Tangirala leads the Cloud Database Engineering team at Netflix, responsible for the cloud persistent runtime stores for Netflix, ensuring data availability, durability, and scalability to meet growing business needs. The team specializes in providing polyglot persistence as a service, which includes Cassandra, Elasticsearch, Dynomite, MySQL, etc.
Thomas Betts is a Principal Software Engineer at IHS Markit, with two decades of professional software development experience. His focus has always been on providing software solutions that delight his customers. He has worked in a variety of industries, including retail, finance, health care, defense and travel. Thomas lives in Denver with his wife and son, and they love hiking and otherwise exploring beautiful Colorado.