Transcript
Podila: A slowdown in request processing can eventually make your service unavailable. Chances are, not all requests need to be processed right away. Some of them just need an acknowledgement of receipt. Have you asked yourself, would I benefit from asynchronous processing of requests? If so, how do I make such a change in a live, large scale mission critical system? My name is Sharma Podila.
Outline
I'm going to talk about how we migrated a user-facing system from a synchronous request-response system to an asynchronous one. I'm going to talk about what motivated us to embark on this journey, what system design changes we made, and, most interestingly, what the challenges were in this process, what design choices we had, and what tradeoffs we made. I'm going to touch upon the validation process we used and how we rolled it out.
Netflix Streaming For Over 200 Million Members
Netflix is available to over 200 million members worldwide. Members watch TV shows, documentaries, and movies on a variety of devices. When they come to Netflix, they are given a variety of choices through our personalized recommendations. You press play, sit back, and enjoy watching the movie. During playback, we collect a lot of data, for both operational and analytical use cases. Some of this data drives product features like viewing history and continue watching, which lets our members stop a movie partway through and come back later to continue watching from that point, on any other device. The data also feeds the personalization and recommendations engines, and core business analytics. I'm going to talk about our experience migrating one of the product features, viewing history, which lets members see their past viewing activity and optionally hide it. I'll be talking about making global materialized views available in a very low-latency, near-real-time fashion.
Existing Architecture
What motivated us to embark on this journey? Let's look at our existing system at the time. At a high level, we have the Netflix client on devices such as mobile phones, computers, laptops, and TVs, sending data during playback into the Netflix cloud. First it reaches the gateway service. From there it goes to the playback API. The playback API manages the lifecycle of playback sessions. In addition, it sends the playback data into the request processor layer. Within the request processor, among other things, it is storing both short-term and long-term viewing data into persistence, which is Cassandra for us, and also into a caching layer, which is EVCache, which lets us do really quick lookups. Most of the time, this system works absolutely fine. Once in a rare while, it is possible that an individual request being processed is slowed down because of a network blip, or maybe one of the Cassandra nodes slowed down for a brief time. When that happens, since it is synchronous processing, the request processing thread in the request processor layer has to wait. This in turn slows down the upstream playback API service, which in turn slows down the gateway service itself. Beyond a few retry strategies within the cloud, the slowdown can hit the Netflix client that's running on a member device. Sometimes this is referred to as backpressure. Backpressure can manifest itself as unavailability in your system, and can build up a queue of items that the client may have to retry. Some of this data is very critical to what we do, and we want to avoid any data loss if the client were to run out of queue space, for example.
Data Processing Stages
Before I go into what changes we made, you'll recognize that if we were to abstract things out, we are following a few steps that are part of a generic data processing pipeline: we ingest, enrich, process, store, and serve. There's business analytics and the data warehouse. There are also the personalization and recommendations engines into which we serve this data. The rest of the talk is going to focus on these layers, from ingest to store.
New Async Architecture
What changes did we make? Between the playback API service and the request processor, we introduced a durable queue. Now when a request comes in, it's put into the durable queue and immediately acknowledged. There is no need to wait for that request to be processed. It turns out Apache Kafka fits this use case pretty well. Kafka presents a log abstraction that producers like the playback API can append to, and multiple consumers can then read from the Kafka logs at their own pace using offsets. This sounds simple. We introduced Apache Kafka in between two of our processing layers, so can we call it done? Not quite. We were operating at a scale on the order of 1 million events per second. At that scale, you start to hit upon a few challenges in asynchronous processing.
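As a rough illustration of that log abstraction, here is a minimal producer and consumer sketch using the plain Kafka Java clients. The topic name, serializers, group id, and addresses are assumptions for illustration, not the actual Netflix setup.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class PlaybackEventsSketch {
    public static void main(String[] args) {
        // Producer side: the playback API appends an event to the log and can
        // acknowledge the client without waiting for downstream processing.
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>("playback-events", "{\"sessionId\":\"abc\",\"event\":\"start\"}"));
        }

        // Consumer side: the request processor reads at its own pace; the committed
        // offset tracks how far this consumer group has processed.
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "request-processor");
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(List.of("playback-events"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                process(record.value());   // persistence and caching would happen here
            }
            consumer.commitSync();         // advance the group's offset
        }
    }

    static void process(String event) { /* store into Cassandra / EVCache, etc. */ }
}
```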
Challenges in Async Processing - Data Loss
I'm going to touch upon data loss, processing latencies, out-of-order and duplicate records, consumer platform choice, and how we still have to manage intermittent processing failures. Then let's touch upon cross-region aspects as well. When we think about data loss, there are two aspects I want to talk about. One is that if the Kafka cluster itself were to be unavailable, of course, you might lose data. One simple way to address that would be to add an additional standby cluster. If the primary cluster were to be unavailable due to unforeseen reasons, then the publisher, the playback API here, could publish into this standby cluster instead. The consumer, the request processor in this case, can connect to both Kafka clusters and therefore not miss any data. Obviously, the tradeoff here is additional cost. For a certain kind of data, this makes sense. Does all data require this? Fortunately not. We categorize our playback data into two: the critical data gets this treatment, with the additional cost of a standby cluster, and the other, less critical data gets a normal single Kafka cluster. Kafka itself is highly available; it employs multiple strategies internally to improve availability. In general, it works fine.
Another aspect of data loss is what happens at publish time. The playback API gets a request and publishes a record into Kafka. There's a choice here. Kafka has multiple partitions to increase scalability. Each partition is served by an ensemble of servers called brokers, one of which is elected as the leader. When you are writing or publishing into a partition, you are dealing with the leader broker. There's a choice to make: do I wait just for the leader to acknowledge that the item has actually been persisted into durable storage, or do I also wait for the follower brokers to acknowledge that they have written it into persistence as well? If you're dealing with critical data, it would make sense that, yes, you do want to wait for acknowledgment from all three of them. It turns out that at a large scale, this has implications beyond just the cost of waiting for multiple writes.
If you were to lose the leader broker, how do you handle that? We're dealing with a cloud native, large-scale distributed system, and failures can and will happen. In fact, it did happen for us within a couple of months of deploying this. If the leader broker, or actually any broker, were to become unavailable, and you were waiting for acknowledgment from all of them, obviously your processing is going to slow down. That slowdown again causes backpressure and unavailability, which we're trying to avoid. If we were to get acknowledgment from just one, the leader broker, there's an interesting case. What if you were to then lose the leader broker later? Leader election will come up with a different leader. However, if the item that was acknowledged by the leader was not completely replicated to the other brokers, then such a leader election, sometimes referred to as an unclean leader election, could make you lose data, which is what we're trying to avoid.
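The tradeoff being described maps onto a handful of Kafka settings. This is a generic illustration of those knobs, not our production configuration; the address and values are placeholders.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;

public class AckModes {
    static KafkaProducer<String, String> durableProducer() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "kafka:9092"); // placeholder address
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Durable path: wait for the leader and all in-sync followers to acknowledge.
        p.put("acks", "all");
        // Faster path discussed above: acknowledge once the leader has the write,
        // accepting a small loss window if that leader is then lost.
        // p.put("acks", "1");
        return new KafkaProducer<>(p);
    }
    // Topic/broker-side settings that pair with acks=all (set via admin tooling, not here):
    //   min.insync.replicas=2                 -- replicas that must confirm each write
    //   unclean.leader.election.enable=false  -- never elect a replica that is missing data
}
```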
How did we handle this situation? Again, there's a tradeoff to make. We have a producer library that is a wrapper on top of the Kafka producer client. There are two optimizations that are relevant here. One is that because we use non-keyed partitions, the library is free to write to any partition. If a partition were to be unavailable because its broker is unavailable, then it automatically writes to a different partition, since ours is a non-keyed partitioning strategy. Also, if a partition is on an under-replicated set of brokers, that is, the leader broker has more items than the follower brokers and the replication has not caught up completely, then our library picks a different partition that is better replicated.
With these strategies, we eventually ended up choosing to write in asynchronous mode, where the publisher writes into an in-memory queue and asynchronously publishes into Kafka. This helps scale performance, but we were interested in making sure we have an upper bound on the worst-case data loss we would incur if multiple errors were all happening at the same time. We were happy with the upper bound we were able to configure based on the in-memory queue size and the strategy of avoiding under-replicated partitions. We monitor this data durability, and we consistently get four to five nines from it, which is acceptable for us. If your application must not lose any items of data, then you may want to require acknowledgment from all brokers before you call an item processed. That would suit you well.
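For a flavor of what "pick a better partition" can look like, here is a hypothetical custom partitioner in the spirit of the wrapper library described above. It is a sketch under the assumption of non-keyed data, not Netflix's actual producer library; the bounded in-memory queue mentioned above roughly corresponds to the Kafka producer's own send buffer (the buffer.memory setting).

```java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

/**
 * Illustrative partitioner: with non-keyed data we are free to pick any partition,
 * so prefer partitions whose leader is up and whose replicas are fully in sync.
 */
public class HealthyPartitionPicker implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> all = cluster.partitionsForTopic(topic);
        // Keep partitions with a live leader and no under-replication.
        List<PartitionInfo> healthy = all.stream()
                .filter(pi -> pi.leader() != null)
                .filter(pi -> pi.inSyncReplicas().length == pi.replicas().length)
                .collect(Collectors.toList());
        List<PartitionInfo> candidates = healthy.isEmpty() ? all : healthy;
        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size())).partition();
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```

A class like this would be registered on the producer via the partitioner.class setting.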
Processing Latencies
Processing latencies are interesting in the sense that if you have a good idea of the peak traffic you're going to get, chances are you can figure out the number of consumer processing nodes you need in your system. You configure it once, and since you can handle the peak, it's all good. It's simple. That's a good situation to be in. For us, the traffic changes across the day, and across the days of the week as well. We see a 5x change from peak to trough in our traffic. Because of such a big volume change, we wanted to be more efficient with our resources, and we chose to autoscale. Specifically, we add or remove a certain number of consumer processing nodes based on the traffic. There's a tradeoff to make there as well. Let's look at that.
Whenever you change the number of consumers, a rebalance happens within Kafka: all of the partitions are rebalanced across the new number of consumers. The tradeoff here is resource efficiency versus paying the price of a rebalance. A rebalance can affect you in different ways. If your processing is stateful, then you have to do something a little more complex: you get a signal for the rebalance, you pause processing, you take any in-memory state and checkpoint it along with the Kafka offset up to which you have processed. After the rebalance, you reload the checkpointed data and start processing from the checkpointed offset. If your processing is a little simpler, or if you are storing state externally, then it is possible for you to let the rebalance happen and just continue normally. What's going to happen here is that since you may have had items that were in process when the rebalance started and had not yet been acknowledged to Kafka, those items will show up on another processing node, because that node now gets that partition after the rebalance. In the worst case, you are going to reprocess some number of items. If your processing is idempotent, that's not a problem, and if you have other ways of dealing with duplicates, then this might actually turn out well for you.
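Here is a minimal sketch of the "checkpoint around the rebalance" option, using Kafka's ConsumerRebalanceListener. Persisting the in-memory state itself is left out, and the class and method names are illustrative.

```java
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/** Sketch of the "pause, checkpoint, resume" approach around a rebalance. */
public class CheckpointingRebalanceListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, Long> processedOffsets = new HashMap<>();

    CheckpointingRebalanceListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    /** Called by the processing loop after each record is fully handled. */
    void recordProgress(TopicPartition tp, long offset) {
        processedOffsets.put(tp, offset);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Before the partitions move: persist any in-memory state elsewhere (omitted)
        // and commit the offsets we have fully processed so far.
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (TopicPartition tp : partitions) {
            Long offset = processedOffsets.get(tp);
            if (offset != null) {
                toCommit.put(tp, new OffsetAndMetadata(offset + 1));
            }
        }
        consumer.commitSync(toCommit);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // After the rebalance: reload any checkpointed state for the newly assigned
        // partitions and continue from the committed offsets.
    }
}

// Usage: consumer.subscribe(topics, new CheckpointingRebalanceListener(consumer));
```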
The next question is, how do I know when and by how much to autoscale? We said we would need to increase the number of processing nodes, because otherwise the items would sit longer in Kafka; that's referred to as the lag. How much lag are we seeing before an item is processed from the queue? One would think lag is a good metric to trigger autoscaling. It makes sense: you could scale up based on that. The problem is that you cannot easily scale down with this metric. When the lag is zero, how do we know whether to scale down by one processing node, 10, or 50? You might flip-flop: you remove some nodes, then observe lag, you add nodes, see zero lag, remove nodes again. In practice, a lot of people use a proxy instead. CPU utilization is a good proxy. For us, records per second turns out to be a good trigger for autoscaling. In steady state, you're able to tell how many records are processed per second, and based on that we can add or remove nodes. We have an autoscaler, and here's the result over a 24-hour period that I'm showing here, where our workers, which are the consumers, are aggressively scaled up by the autoscaler. We want to avoid rebalances during scale-up, because we are already seeing a lot of data; we want to quickly scale up and then let it slowly scale down later. A few rebalances a day is okay, basically, as the cost of autoscaling.
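To make the records-per-second idea concrete, here is a toy calculation in the spirit of a target tracking policy: aggressive on the way up, one node at a time on the way down. The per-node target and step sizes are made-up numbers; in practice this is handled by an autoscaler such as EC2 target tracking or Mantis.

```java
/** Toy sketch of scaling consumer nodes on records per second (RPS). */
public class RpsAutoscalerSketch {
    private static final double TARGET_RPS_PER_NODE = 5_000.0; // assumed steady-state capacity per node

    static int desiredNodes(double observedRps, int currentNodes) {
        int target = (int) Math.ceil(observedRps / TARGET_RPS_PER_NODE);
        if (target > currentNodes) {
            return target;                        // scale up aggressively, straight to the target
        }
        return Math.max(target, currentNodes - 1); // scale down slowly, one node at a time
    }

    public static void main(String[] args) {
        System.out.println(desiredNodes(120_000, 10)); // spike: jump from 10 to 24 nodes
        System.out.println(desiredNodes(30_000, 24));  // trough: step down gradually to 23
    }
}
```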
Out Of Order and Duplicate Records
Out-of-order and duplicate records are going to happen in a distributed system. I talked about a few cases before of how they might happen. If you're familiar with stream processing, windowing is a technique a lot of people use: you collect events based on a time window, or you could do sessionization based on specifics of your application. For us this is a video playback session, which has a specific start and stop event, so we can collect all events for a session within those boundaries. It's possible we might miss a stop event. If a member were watching on a laptop, for example, and just closed the laptop, the Netflix client would never get a chance to transmit the stop event, in which case we time out. Either way, we have sessions, or multiple events of that session, and based on certain attributes within the data, we can order them and also deduplicate them. For example, you could have a monotonically increasing ID or a timestamp from the client within those events, and that can help you. For writes, we are able to deduplicate by using the server timestamp, that is, the time when the event reaches our server. Since events are transmitted in order by the client, we can use that as the modify time when writing, and therefore we do not see a problem with it.
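A small sketch of that ordering and deduplication step, assuming each event carries a client-side sequence number; the event fields here are assumptions for illustration, not the actual playback event schema.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch: order a session's events by a client-side sequence and drop duplicates. */
public class SessionDedupSketch {

    record PlaybackEvent(String sessionId, long clientSequence, String type) {}

    static List<PlaybackEvent> orderAndDedupe(List<PlaybackEvent> sessionEvents) {
        // Keep the first occurrence of each client sequence number, then sort by it.
        Map<Long, PlaybackEvent> bySequence = new LinkedHashMap<>();
        for (PlaybackEvent e : sessionEvents) {
            bySequence.putIfAbsent(e.clientSequence(), e);
        }
        return bySequence.values().stream()
                .sorted(Comparator.comparingLong(PlaybackEvent::clientSequence))
                .collect(Collectors.toList());
    }
}
```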
What's the Best Consumer Platform?
It turns out there are multiple platforms we could use for consuming and processing requests. That's a luxury. We have three. Mantis is a stream processing system that Netflix open sourced a few years ago. Apache Flink is a popular stream processing system. Then there's the classic microservice, which could use a Kafka consumer client and just process the data from Kafka. We started out with the question of which platform is the best one to use, and realized that's not the right question. I should be asking, which processing platforms benefit which use cases? It turns out that, based on the use case, you could use each of these three; they all have pros and cons. If you're doing complex stream processing, Mantis and Apache Flink are a very good fit. Apache Flink also has built-in support for stateful stream processing, where each node can store its local state, for example, for sessionization. Microservices are very interesting, at least for us, because Netflix engineers have excellent support for the microservices ecosystem, all the way from starting with a clean code base to CI/CD pipelines and monitoring. We use all three for different use cases.
Intermittent Processing Failures
Now that we've addressed those challenges, we still have to deal with intermittent processing failures. If we get an item and hit an intermittent failure, beyond maybe a simple retry, we wouldn't want to block the entire queue behind that one item. Sometimes people refer to that as head-of-line blocking. Instead, we put it aside, process the rest of the queue, and then come back to it later. A characteristic we would like in such a system is that a finite amount of time elapses before we try again; there is no need to retry immediately. That's what we mean by a delay queue in this picture. There are a variety of ways you can implement this. You could write the item into another Kafka topic and then build another processor that builds in a delay. It turns out that for us it's very easy to achieve this using Amazon SQS, since we already operate on EC2. We use the Simple Queue Service to put an item in, and the queue has a feature to optionally specify a future time at which it should be made visible. That makes it easy.
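Here is roughly what parking a failed item in SQS with a delay looks like with the AWS SDK for Java v2. The queue URL is a placeholder, and SQS caps the per-message delay at 15 minutes.

```java
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/** Sketch of setting a failed item aside in SQS so it becomes visible again later. */
public class RetryLaterSketch {
    private static final String QUEUE_URL =
            "https://sqs.us-east-1.amazonaws.com/123456789012/retry-queue"; // placeholder

    static void retryLater(SqsClient sqs, String eventJson, int delaySeconds) {
        sqs.sendMessage(SendMessageRequest.builder()
                .queueUrl(QUEUE_URL)
                .messageBody(eventJson)
                .delaySeconds(delaySeconds) // hidden from consumers until the delay elapses (max 900s)
                .build());
    }
}
```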
Cross Region Routing
Cross-region aspects are important in the sense that Netflix operates in multiple regions, and in a large distributed system it is possible a region may become unavailable once in a while. We routinely practice for this, and multiple times a year we take a region down just to make sure we exercise our muscle of cross-region traffic forwarding. At first, it would seem to make sense that an item meant for another region could just be remotely published into a Kafka topic, using a tunnel across the regions. That normally works, except when you encounter a real outage of that region: the remote publish is not going to work. A simple but subtle change we made is that we always publish locally. We publish to another Kafka topic and asynchronously have a region router send it to the other side. This way, all events of a single playback session can be processed together.
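A sketch of the "always publish locally" rule: if an event belongs to another region, it goes to a local router topic instead of being published remotely. The topic names and the region check are assumptions for illustration.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Illustrative local-first publisher: events destined for another region are written
 * to a local routing topic, and a separate region-router service forwards them later.
 */
public class LocalFirstPublisher {
    private final KafkaProducer<String, String> localProducer;
    private final String localRegion;

    LocalFirstPublisher(KafkaProducer<String, String> localProducer, String localRegion) {
        this.localProducer = localProducer;
        this.localRegion = localRegion;
    }

    void publish(String targetRegion, String eventJson) {
        // Never publish across the region boundary synchronously; if this event belongs
        // to another region, hand it to the local router topic instead.
        String topic = localRegion.equals(targetRegion)
                ? "playback-events"
                : "playback-events-region-router";
        localProducer.send(new ProducerRecord<>(topic, eventJson));
    }
}
```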
Testing, Validation, and Rollout
Now that we had the challenges figured out and the tradeoffs made, how did we test and roll it out? Shadow testing is popular; chances are you may already be using such strategies in your environment. For us, this consisted of having the playback API dual-write into the existing synchronous system and into Apache Kafka, from which the asynchronous request processor was consuming. Then we had a validator process that would validate that the in-flight request processing was identical. The next step was to make sure that it's not just the in-flight request processing that was identical, but also the stored artifacts, for which we created a shadow Cassandra cluster. Here, you're trading off cost for higher confidence. If you are in an environment where it is relatively easy to get additional resources for a short period of time, which is certainly possible in a cloud environment like ours, then it gives you the additional benefit of confidence before rolling out. We rolled out using userId to give us a consistent slice of the traffic to migrate into the new system, starting with 1%, then 5%, all the way to 100%. That gave us a really smooth migration without impact to upstream or downstream systems.
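A sketch of the userId-based slicing used for the incremental rollout; the hashing scheme here is an assumption, and the point is only that the same member consistently lands on the same side while the percentage ramps up.

```java
/** Illustrative consistent-slice routing: the same userId always gets the same decision. */
public class RolloutSlice {
    static boolean useAsyncPath(String userId, int percentEnabled) {
        int bucket = Math.floorMod(userId.hashCode(), 100); // stable bucket 0..99 per user
        return bucket < percentEnabled;                     // ramp: 1%, 5%, ... up to 100%
    }

    public static void main(String[] args) {
        System.out.println(useAsyncPath("member-42", 5)); // same answer every time for this user
    }
}
```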
Current Rollout and Next Steps
This is a quick look at where we are and where we're going. The items in blue here, the playback API, Kafka, of course, and the viewing history and bookmark processors, are out in production. We have the rest of the system, which deals with other attributes: there's an attributes processor, and session logs, which will be interesting because the size of the data is very large, way larger than what you would normally write into Kafka. We have some new challenges to solve there.
Conclusion
I shared with you how asynchronous processing improved availability and data quality for us. I showed you how we reasoned about the design choices and what tradeoffs made sense for our environment, and how shadow testing and incremental rollout gave us a confident and smooth rollout. With this, I invite you to think about how this applies to your environment, and what tradeoffs you might make for a similar journey.
Questions and Answers
Anand: There was a secondary cluster question. I think it was about the scale and availability and those sorts of things. Do you want to address those questions?
Podila: The motivation for having a standby cluster is to avoid data loss in case the Kafka cluster itself becomes unavailable. Kafka itself is highly available, in general; it employs several strategies within the system for high availability. Since we're dealing with critical playback session data, we wanted the ability to have that standby cluster available. Again, we make this available for certain critical data, but not necessarily all of the data. The cluster is always available as a warm standby. The publisher, upon detecting, or if we determine, that the primary cluster is going to be unavailable for some time, switches publishing to the alternate cluster. They are similarly sized, so there is no difference there. The consumers have clients attached to both of these clusters, and therefore can continue processing.
Anand: In practice, has your service done a failover, beyond just your testing it?
Podila: Yes. Early on, we had a case where there was a misconfiguration on the Kafka cluster. Human error always happens. This caused publishing to get stuck. For example, for a particular partition, there are three brokers in an ensemble to serve traffic, and the publish mode was set to ack all; in that case, if one of the brokers were unavailable, the publish would stall for a while. It caused an unexpected outage, where we were able to switch quickly and then later realized there was a misconfiguration in how the broker was set up.
Anand: Was it the publisher side or the broker?
Podila: One of the brokers which is a cloud instance, became unavailable.
Anand: You had ack all and maybe the in-sync replicas setting, ISR, was not equal to RF minus one; that's usually the problem. Is that the general problem? You're doing a rolling upgrade of your brokers, one went down, and you stopped being able to publish?
Podila: This was not during an upgrade. This was just an instance becoming unavailable, which happened in the cloud.
Anand: It wasn't the ISR equals RF minus one problem.
Podila: Correct.
Anand: In this time, how long does it take for your publisher to detect the error and fail over?
Podila: This is something that we tuned after we learned from it. In the beginning, it basically caused an even worse case of backpressure, because the publisher slowed down waiting for acknowledgments that it could not get. This touches upon another question of data loss. The way we have made it work right now is, we first switched to ack 1. Ack 1 basically lets us acknowledge, or respond back, as soon as we get an acknowledgment from the leader broker. This presents a problem: if the replication is not in sync, then you could potentially lose data if the leader broker were to go away. We avoid that in another way.
Then, we also moved to an asynchronous mode of publishing. This is interesting in the sense that for a case where you do not want to lose data, it is not recommended. The asynchronous mode lets us put the data into a memory buffer and have an asynchronous thread publish into Kafka. The main advantage is scalability and performance, with the tradeoff of a small amount of data loss. For certain applications, we can easily get an upper bound on this data loss, and therefore are able to make the tradeoff and be satisfied with it. For example, our data durability requirements are four to five nines, and we can easily get that by sizing the memory buffer and ensuring that a flush happens rather quickly.
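For illustration, these are the kinds of producer settings that bound the loss window in this mode; the values are examples, not our production numbers.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;

/** Illustrative settings for the bounded-buffer, ack-1 publish mode described above. */
public class AsyncPublishConfig {
    static KafkaProducer<String, String> producer() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "kafka:9092"); // placeholder
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("acks", "1");                 // respond once the leader has the write
        p.put("buffer.memory", 33554432L);  // 32 MB in-memory buffer: bounds the worst-case loss
        p.put("linger.ms", 50);             // flush quickly so records don't sit in the buffer long
        p.put("max.block.ms", 1000L);       // if the buffer fills, fail fast rather than backpressure
        return new KafkaProducer<>(p);
    }
}
```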
Anand: What was the overall improvement in availability for your system end-to-end? How did you measure it?
Podila: We are in the process of migrating multiple components. The true availability improvement will be visible when all components have migrated; there are some components that are still in synchronous mode. However, for the components that we did migrate, we can see that quite a bit less processing capacity is needed, in the sense that we can run fewer consumer nodes and absorb any intermittent spike in traffic with a small amount of lag, without incurring too much latency, by autoscaling these instances. We've seen improved utilization of resources and a reduced footprint in the cloud.
Anand: Why would that happen? Just because Kafka is faster, so the readers and writers don't need to be scaled up as much?
Podila: In a synchronous system, we need to be better prepared for spikes, because otherwise there will be backpressure going back all the way to the clients, and possibly data loss if the retries are exhausted. We need to run the system a little larger to be prepared for spikes. Whereas in the Kafka case, since Kafka is already sized for the amount of data we are going to have, the consumers may experience a little lag when there is a spike in traffic. As long as the lag is acceptable, and you put autoscaling on the consumers, you can run fewer consumer instances.
Anand: What's your autoscaling signal?
Podila: Originally, it would seem that a lag would be a good metric to scale on because that is what we're trying to avoid. The problem with that is you cannot scale down easily.
Anand: There's no signal for scaling down, or scaling in.
Podila: There are proxies for such a trigger that people end up using; CPU utilization is a good one. For us, it made sense to use RPS, records per second. In steady state, we are able to estimate the records per second each consumer instance can handle. Based on that, we put a target tracking autoscaling policy in place. This is a bit like a PID controller, where it tries to match the amount of lag in the system, or the amount of RPS that is happening, and then adds or removes instances. It's very aggressive in scale-up and then slowly scales down after.
Anand: Have you had to test this? How did you test its effectiveness? One issue with this is the proactive or predictive autoscaling case, where upstream there's a bottleneck, you don't get anything, so you scale in. You scale in very carefully and slowly, and then all of a sudden there's a burst you're not ready for.
Podila: This is something that we had tested, because we're also using an autoscaling strategy that's built into the Mantis stream processing system; there are similar strategies available in the EC2 cloud as well. What we were doing in the beginning, for example when we did a region outage or a Kafka cluster outage exercise, was to pre-scale the consumer cluster to prepare to read from the other cluster, which would get a big spike in traffic. What we found was that the autoscaler is able to aggressively scale up within an acceptable amount of lag. Otherwise, we would have to manually scale up. Predictive autoscaling does not seem to be required for that use case for us.
Anand: Do you change the number of partitions over time, automatically, or do you do it manually?
Podila: Right now for this application, we're using non-keyed partitions, which makes it simpler to add partitions. Since we started this project, we've not added partitions. I think we tried to estimate the overall size of the traffic and provisioned a large number of partitions, so it is less likely for us to need to add more. There are other applications at Netflix that have added partitions, and Kafka handles that pretty well.
Anand: There's no auto increase in partitions currently. You don't auto increase partitions.
Podila: Correct.
Anand: Typically, again, it's easy to auto increase, it's harder to determine when to decrease, because it can cause all sorts of bottlenecks.
When you scale consumers, this causes a rebalance within Kafka. Do you redistribute the traffic among the new number of consumers? This is not a trivial thing to do in Kafka. How does it work? How do you make this thing work?
Podila: This is something that we had to spend some time figuring out: what do we want to do? At one end of the spectrum, we have the ability to get a notification when a rebalance is going to happen. Then, in the consumer cluster, we do a checkpoint of the state and of the Kafka offsets, and let the rebalance happen. Upon restart, you reload the same checkpointed state and offsets and continue from there. This is coordination that's built into our stream processing system. That's one way to do it. There is another way to think about it. If your processing is less stateful, that is, you are able to handle duplicates either through a better strategy in how you process and save the data, or because your operations are idempotent, then you can just let the rebalance happen. In the worst case, you are going to process a few duplicates, and there is an upper bound to that: if you take the number of consumers and multiply it by the number of records each one grabs from Kafka at a time, in the worst case, that's the number of records you would process as duplicates. I think it depends on the application. For stateful processing, you should probably do checkpoint and restore. For less stateful or idempotent processing, you can just let the duplicates happen; there is the overhead of duplicate processing when you change the number of instances.
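For example, with made-up numbers, that bound works out like this; the consumer count is an assumption and the batch size is Kafka's default max.poll.records.

```java
/** Back-of-the-envelope version of the worst-case duplicate bound described above. */
public class DuplicateBound {
    public static void main(String[] args) {
        int consumers = 60;        // assumed number of consumer instances
        int maxPollRecords = 500;  // records fetched per poll (Kafka's max.poll.records default)
        System.out.println("Worst-case duplicates after a rebalance: " + consumers * maxPollRecords);
    }
}
```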
Anand: I have noticed something. Similarly, I do this autoscaling with Kafka. I noticed a funny thing that happens once in a while, where the nodes increase but the partitions don't rebalance. I've actually noticed that. Then I have to do some manual guesswork, and I find one Kubernetes pod still has most of the traffic, and it's a bottleneck. Then I have to kill that pod. Of course, this is, again, another thing someone should write: an auto-detect-and-kill to force the rebalance. Just because there are more consumer nodes available, Kafka doesn't always rebalance to them. The typical trigger for a rebalance in Kafka is a timeout. There's a certain max poll interval. If it doesn't hear back from you on a manual ack, by default it's 5 minutes or something, or maybe 3 minutes. It's some large number. I think it is 5 minutes by default, and it'll rebalance. If it's humming along and you add more consumer threads, it won't interrupt what it's doing to balance it out, even if the consumer CPU is really high. If the consumer CPU is so high that it pegs the CPU and causes the 5-minute timeout to happen, the rebalance will be triggered. Then, at that point, you've killed your SLA, if you're SLA sensitive, waiting for that 5-minute or 2-minute timeout. It doesn't happen often. Sometimes it doesn't preemptively auto-balance; sometimes it does. Ninety-nine percent of the time it does, 1% of the time it doesn't, and it's an operations all-hands-on-deck thing, where we have to identify the pod and kill it. Have you ever run into this?
Podila: I've not handled that case myself. There may be another aspect to what you're saying: there is a little bit of math in the sense that you divide the number of partitions across the number of consumer instances and brokers. Obviously, unless you have perfect multiples, you're not going to divide the partitions exactly evenly. Even if you add one more consumer instance, it might not get any traffic, because the partitions don't necessarily get divided equally or in a healthy way. I think there's that aspect of how quickly you can see the rebalance happening in a good way when you add more consumer instances. For that reason, it works a little better when the number of consumers is much smaller than the number of partitions; then you have a better ability to divide the traffic across them.
Anand: Have you ever noticed hotspots? The way I noticed this is it scaled out, but performance is not improved. Then I noticed it's because it didn't rebalance. Have you ever seen that where autoscaling didn't result in a drop in lag?
Podila: Hotspots happen especially when you're using keyed partitions. In the application I talked about, we are not using keyed partitions; we basically do a round robin, or spread across partitions. In fact, our publisher has a thin wrapper around the Kafka producer client. What it does is look at things like the load or replication status of each of the partitions and then prefer to spread across the healthier ones.