Transcript
Gallego: We're going to talk about co-designing Raft on a thread-per-core model for the Kafka API. All of the code will be available on GitHub, https://github.com/vectorizedio/redpanda. My name is Alex. I'm a developer. I originally wrote the storage engine, and for all of the techniques that we're going to cover I was heavily involved, either as the developer who wrote the code or as a reviewer of the code. We've been hacking on Redpanda, which is the project that we're going to talk about. It's a streaming platform for mission critical systems. Prior to this, I was an engineer at Akamai, through an acquisition of concord.io. You can think of Concord like Spark Streaming, but we wrote it in C++, on top of Mesos.
Outline
The agenda will cover two observations, a new way of building software using a thread-per-core model, and the practical implementation. Specifically, we'll cover eight techniques for building low-latency software. The first observation is that hardware is fundamentally different than it was a decade ago. The second observation is that, given those fundamental shifts in hardware, the CPU becomes the new bottleneck for a storage system like ours; embracing hardware where everything is asynchronous is really the second observation. With these two premises, we can reason about how to build new software, and what the new techniques are. In particular, we will walk through the eight techniques that were most impactful in letting Redpanda achieve 10x to 100x better tail latencies.
Reinventing the Wheel, when the Road Changes
One of our advisors really loves this saying, "Sometimes you get to reinvent the wheel when the road changes." The road is the hardware in this analogy. The idea here is that hardware is fundamentally different from what it was a decade ago, in particular for a storage engine. The things that are materially different: one, disks are 1000 times faster, and at the same time they are 100x cheaper. The second material improvement in hardware over the last decade is that computers are 20 times "taller": you can go on GCP and rent a 225-vCPU VM, so we're at massively different scales. For a storage engine, the contention moves out of the spinning-disk subsystem of older storage systems and into CPU coordination. The last one that is worth noting, and is an important distinction, is that on the networking side, on Amazon, you can go and rent 100-gigabit-per-second NICs and actually get that traffic sustained to all of the VMs. The idea is that software doesn't run on category theory; it runs on superscalar CPUs with massive amounts of memory, multi-socket motherboards, and disks that are three orders of magnitude faster, and so the bottleneck moves to a different subsystem.
Observation 1: Evolution of Streaming
This is the evolution of the system that I'll talk about: in particular, the mental modeling techniques and how they evolved through history, superimposed with the evolution of hardware. RabbitMQ came about and replaced a bunch of older enterprise, old-school message queuing like TIBCO and Solace. Around that time, SSDs were about $2,500 per terabyte. In 2010, 2011, Kafka came out, bringing a lot of the MapReduce-style thinking to streaming. The idea was, can you do streaming on cheap, spinning-disk computers with 4 vCPUs and perhaps a gigabit of network? The point I want to highlight here is that the threading model, and the mental model for building software a decade ago, was for a totally different bottleneck in computing. Namely, writing a page to a spinning disk took on the order of milliseconds rather than microseconds. Pulsar came about as the next evolution of streaming, and added the disaggregation of compute and storage, largely in the context of HDFS. In the public clouds, the S3 API became the true disaggregation of compute and storage. As of this year, Redpanda is now stable and ready for production use. What I wanted to highlight here is that it is software designed for the modern platform. We asked ourselves, what could we do differently if we were to start from zero? We knew where we wanted to be, which was to provide, effectively, a Raft implementation with a Kafka API, given the modern platform and modern hardware.
Observation 2: Everything is Async; CPU the New Bottleneck
The second observation is that everything in hardware is actually asynchronous. The most important point is that even on an NVMe SSD device you can have sustained write throughput of 1.2 gigabytes per second; the one on the right is a Western Digital NVMe SSD device. However, if you use the classical, blocking model of IO, where you do a file write, giving it the file handle and some bytes, and then a file close, basically synchronous and sequential in your program, then to write a single page to disk you're roughly wasting 60 million to 420 million clock cycles. The observation is: could the CPU be doing something else instead of waiting for the hardware to complete the operation? Obviously, the answer is yes. From a software construction perspective, if we embrace the platform, if we embrace that hardware is always asynchronous, and we design our software from the ground up to be asynchronous first, then we get to take advantage of those 420 million clock cycles that would otherwise be wasted. We can do other interesting work, like compression, verifying checksums and the integrity of your data, or pulling more data out of the network, while your NVMe SSD device is busy writing to the actual sectors through the NVMe controller.
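To make that concrete, here is a minimal sketch of async-first IO, assuming Seastar; `append_page` and `do_useful_cpu_work` are illustrative names, not Redpanda's actual code.

```cpp
// Minimal sketch of async-first IO, assuming Seastar. `do_useful_cpu_work` is
// a hypothetical stand-in for checksumming, compression, or parsing the next
// network request.
#include <seastar/core/file.hh>
#include <seastar/core/future.hh>
#include <seastar/core/temporary_buffer.hh>
#include <cstdint>
#include <utility>

static void do_useful_cpu_work() {
    // e.g. verify checksums, compress the next batch, read more from the socket
}

seastar::future<> append_page(seastar::file f, uint64_t pos,
                              seastar::temporary_buffer<char> page) {
    // dma_write only dispatches the IO and returns a future immediately;
    // the core keeps doing useful work while the NVMe device completes it.
    auto written = f.dma_write(pos, page.get(), page.size());
    do_useful_cpu_work();
    // Keep the buffer alive until the device has consumed it.
    return written.then([page = std::move(page)](size_t) {
        return seastar::make_ready_future<>();
    });
}
```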
Thread per Core Architecture
The architecture that we chose to build Redpanda on is a thread-per-core architecture. For a storage engine, we've measured that the new bottleneck is the CPU. So how do we start to build a new storage engine, or new software in general, using a thread-per-core architecture? The distinction here is that asynchronicity is a first-class citizen, and it becomes explicit in how you reason about building your software. A word of warning, though: there's no real free lunch here. When you move to a thread-per-core architecture, you will end up rewriting large parts of your software that come with built-in threading models. A classical example is that often you add a logging library to your program, and it has its own thread-pooling implementation. That doesn't really work well with a thread-per-core architecture where every logical core presented to the program is an actual pthread. Inside that particular pthread, there is a cooperative scheduling implementation. It doesn't necessarily have to be the way that I'm going to explain; this is rather an example of the architectural thinking that you can leverage.
New Way to Build Software: Async-only Cooperative Scheduling Framework
The idea that we're going to cover is in C++, because we wrote this storage engine in C++, in large part because we're trying to extract every ounce of performance out of the hardware. It was important to us to be able to access all of the lowest-level primitives. This is a cooperative scheduling framework with C++ futures; we use a library called Seastar that gives us this thread-per-core architecture and a cooperative scheduling mechanism. Go has a similar thing, where every time you return from a function call, the cooperative scheduler gets to choose what is the next goroutine that is going to run. Similarly, for Seastar, every time you return a future, the future is either executing or suspended. There is no other state for a future. The upshot is that you start to worry about the concurrent structure of your program rather than the parallelism. The parallelism is a free variable that is decided at runtime. You can take a program whose concurrent structure was built on your 4-core laptop, run it on 225 virtual cores presented to your program on GCP, and know that because you've programmed the correct concurrent structure, parallelism is a variable that you can tune at runtime. The idea is that these futures are viral primitives, and, just like actors in frameworks like Orleans, or Akka, or the Pony language, they're really composable. You can mix them, map-reduce them for an aggregation, filter them, chain them, complete a particular future, or build a generator. Sleep, for example, is a really powerful mechanism, because sleeping on a future is actually just a suspension of that particular frame.
Seastar, the framework that we're using, gives you no locks. Once you enter the thread-local domain, you don't have to worry about other parallel access to the same variable, because there's exactly one thread doing everything. It is limiting in that there's exactly one way to write code. It is freeing in that, as long as your domain fits into this thread-per-core model, and we made the Kafka API fit in this model so we think it's fairly flexible, you no longer have to worry about parallel access to a particular variable. A lot of problems with parallel programming just go away, because you are explicitly opting into this from the get-go.
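As a rough sketch of what that composability looks like, assuming a recent Seastar; `fetch_shard_stats` is a hypothetical placeholder for real per-shard work:

```cpp
// Sketch of future composition, assuming a recent Seastar.
#include <seastar/core/future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/when_all.hh>
#include <chrono>

seastar::future<int> fetch_shard_stats(unsigned shard) {
    // Stand-in for real asynchronous work on a shard.
    return seastar::make_ready_future<int>(int(shard) * 10);
}

seastar::future<int> aggregate_two_shards() {
    // Sleeping on a future is just a suspension of this frame; the reactor
    // keeps running other tasks on this core in the meantime.
    return seastar::sleep(std::chrono::milliseconds(1)).then([] {
        // Compose two futures and reduce them into a single result.
        return seastar::when_all_succeed(fetch_shard_stats(0), fetch_shard_stats(1))
            .then_unpack([](int a, int b) { return a + b; });
    });
}
```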
Technique 1: No Virtual Memory
The first technique that we use to achieve 10x to 100x lower tail latency in Redpanda, while exposing the Kafka API, is to use no virtual memory. This is a rather contentious subject. Remember that we're using a thread-per-core architecture, so when the program starts up, we allocate the entire machine's memory, with some reservation for the operating system and daemons. The point is that the program allocates 100% of the memory presented to it. Then, you split the memory evenly across the number of cores. Let's say that you have 10 cores and 20 gigabytes of RAM; then every core is going to have 2 gigabytes of memory, and that is 100% of its budget. If your core allocates more than 2 gigabytes of memory, you will OOM and crash your program. What this gives you is low-latency, thread-local memory allocation. At the lowest level, we use a buddy allocator, which divides memory in half every time you need a new region. On top of that are allocation pools: every time you say new on an integer, it has to come from somewhere. We swap the allocator library so that all allocations below 64 kilobytes come from pools of recyclable allocations, and everything above 64 kilobytes is served from the large memory pool.
It is difficult to use this technique in practice, because for every data structure you have to think a priori: how much memory am I going to dedicate to my RPC subsystem, versus my compaction and my background threading model, versus any other subsystem, or my read-ahead for the disk, my write-behind, my training for dictionary compression? As a programmer, just as you have explicitly opted into a concurrent and parallel execution model, you're also opting into deciding explicitly where and how your memory is allocated. This is perhaps one of the more difficult mental-model changes we had to retrain ourselves on so that we don't OOM, and it gets harder as memory fragmentation increases over time.
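To illustrate the kind of a priori budgeting this forces on you, here is a small sketch; the subsystems and percentages are purely illustrative, not Redpanda's actual numbers.

```cpp
// Sketch: explicit, up-front per-core memory budgeting (numbers are illustrative).
#include <cstddef>

struct core_memory_budget {
    std::size_t total;        // e.g. 20 GiB machine / 10 cores = 2 GiB per core
    std::size_t rpc;          // bytes reserved for the RPC subsystem
    std::size_t read_ahead;   // read-ahead buffers
    std::size_t write_behind; // write-behind buffers
    std::size_t compaction;   // background compaction
};

constexpr core_memory_budget make_budget(std::size_t machine_mem, unsigned cores) {
    const std::size_t per_core = machine_mem / cores;
    return core_memory_budget{
        per_core,
        per_core * 20 / 100,   // 20% for RPC
        per_core * 10 / 100,   // 10% read-ahead
        per_core * 10 / 100,   // 10% write-behind
        per_core * 10 / 100,   // 10% compaction; the rest stays in the general pool
    };
}

// Example: 20 GiB of RAM presented to the program on a 10-core box.
constexpr auto budget = make_budget(std::size_t(20) << 30, 10);
static_assert(budget.total == (std::size_t(2) << 30), "2 GiB per core");
```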
Technique 2: iobuf - TPC Buffer Management
The second technique is iobuf, a thread-per-core buffer management system. With a thread-per-core architecture, once you land on a particular core (let's reduce the mental model to two cores), your memory fragments as your program runs over time; it starts to look like grated cheese, with holes all over it. What the iobuf optimization does for us is avoid forcing a reallocation and compaction of memory. Compaction is similar to disk defragmentation in the Windows spinning-disk days, where you would click defragment, a bunch of fragments would be put together, and all of a sudden your program would get faster. Similar things happen to memory: as a function of time, your program is going to allocate memory all over the region allocated to it, and it's never going to be pretty. It's never the case that the program always allocates from the bottom part of memory. The allocator just keeps a pointer to the next free slot, and then it moves around and scans until it finds the next one. It's really low latency in that it doesn't do any compaction at runtime.
This is fundamentally different from how some virtual memory subsystems work. Fundamentally, we never allocate a large linearized chunk of memory, because that puts a lot of pressure on our memory allocator, the buddy allocator. If not done correctly, really large allocations can actually end up OOMing your system; we usually only have around 2 gigabytes per core, and we don't use virtual memory. What this says is: let's embrace that you are going to be in a mostly fragmented memory world, and let's use memory as efficiently as possible without the latency spike of a compaction, where you take all of your memory, recompact it into the bottom part of the region, and leave the upper regions free so that you could do linearized memory allocation. Instead, we build a structure that allows us to iterate over data structures as though they were contiguous memory in code, but that in reality is backed by a bunch of fragmented buffers. We weren't the first ones to invent this. The Linux kernel has a very similar implementation in sk_buff, and the FreeBSD kernel has another implementation called mbuf, which is exactly the same idea. These techniques have been around for a long time. What we've added is that we've tied it to our thread-per-core architecture. Instead of having a global deallocation pool, when you send memory to a remote core, rather than calling free on that remote core (because that particular core doesn't actually own the data), it sends a message back to the originating core, and the originating core deletes the memory. This is all handled for you as a buffer abstraction. As long as you build everything on top of this buffer abstraction, your code largely ends up looking roughly the same.
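Here is a simplified sketch of the iobuf idea, a logically contiguous byte sequence backed by fragments; it leaves out the cross-core deallocation messaging and is not Redpanda's actual implementation.

```cpp
// Sketch of the iobuf idea: a logically contiguous byte sequence backed by
// fragments, iterated without ever linearizing into one big allocation.
#include <cstddef>
#include <cstring>
#include <memory>
#include <vector>

class fragmented_buffer {
    struct fragment {
        std::unique_ptr<char[]> data;
        std::size_t size;
    };
    std::vector<fragment> _frags;
    std::size_t _size = 0;

public:
    // Append copies bytes into a new fragment; no existing fragment is resized,
    // so the allocator never has to find one large contiguous region.
    void append(const char* src, std::size_t n) {
        fragment f{std::make_unique<char[]>(n), n};
        std::memcpy(f.data.get(), src, n);
        _size += n;
        _frags.push_back(std::move(f));
    }

    std::size_t size_bytes() const { return _size; }

    // Visit the logical byte stream fragment by fragment, e.g. to checksum it
    // or hand each piece to a scatter-gather write, as if it were contiguous.
    template <typename Func> // Func: void(const char*, std::size_t)
    void for_each_fragment(Func&& fn) const {
        for (const auto& f : _frags) {
            fn(f.data.get(), f.size);
        }
    }
};
```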
Technique 3: Out of Order DMA Writes
The next technique is out-of-order writes. This is really important for trying to maximize the throughput of the device. Your physical drive has a particular degree of parallelism, and also a concurrent structure. To use this technique, we ask the file system: give me your alignment for writing a page to, effectively, the NVMe SSD controller. Let's say 4096 bytes is the alignment. When you allocate something, you say, give me a hardware-aligned 4096-byte buffer, and then I'm going to write the page. In this example, the total size is 128 kilobytes. Instead of dispatching one page, waiting for acknowledgment, dispatching the next one, waiting for acknowledgment, and so on, we dispatch them all and take a scatter-gather approach to writing that set of pages. We're trying to saturate the hardware while minimizing latency, and you get both things here. The latency for writing an individual block is higher, but the latency for writing all of them is much lower, probably 1.5x the cost of writing one, and your throughput could potentially quadruple. This is a really powerful technique when you're trying to saturate the NVMe device.
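Here is a sketch of what dispatching the pages out of order might look like, assuming Seastar's DMA file API; alignment handling and error paths are simplified.

```cpp
// Sketch of out-of-order page writes, assuming Seastar's DMA file API.
#include <seastar/core/do_with.hh>
#include <seastar/core/file.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/temporary_buffer.hh>
#include <cstdint>
#include <vector>

struct page {
    uint64_t offset;                       // aligned file offset (e.g. 4096)
    seastar::temporary_buffer<char> data;  // aligned buffer of one page
};

seastar::future<> write_pages(seastar::file f, std::vector<page> pages) {
    // parallel_for_each dispatches every dma_write before waiting on any of
    // them; the device is free to complete them out of order. One flush at the
    // end gathers the whole batch.
    return seastar::do_with(std::move(pages), [f](std::vector<page>& ps) mutable {
        return seastar::parallel_for_each(ps, [f](page& p) mutable {
            return f.dma_write(p.offset, p.data.get(), p.data.size())
                .then([](size_t) {});
        });
    }).then([f]() mutable { return f.flush(); });
}
```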
Technique 4: No Page Cache - Embed Domain Knowledge
The next technique, building on top of this thread-per-core architecture, is no page cache. This is the second most controversial thing we do, for the particular application that we're building, which is Redpanda, a drop-in Kafka replacement. We give the user a Kafka API. You can take your 100,000 lines of Kafka API-compatible code and point it at Redpanda, and it should just work; there should be no code changes. That's the API we give the user. Kafka as an API is a very specific problem, not a generic problem. To us, the Linux kernel page cache actually introduces non-determinism in the IO path, and at worst it introduces correctness bugs.
Let's walk through it. Every time you instantiate a file handle, the kernel instantiates a page cache object with global locking semantics for that particular object. If you have two writers, or a writer and a reader, especially tailing iterators, they're all competing and contending on that lock to get exclusive access to that particular memory region. That's the first issue. We've architected our code as thread-per-core, which means we have a single thing accessing a file at any point in time. The way I like to frame it, as a general rule of thumb, is that the generic page cache is never a bad choice; it's always a really good middle ground. It's just never the best choice if you have an application-specific problem that you're trying to solve, like offering our users, effectively, a log interface with Kafka API access patterns. Writing your code to do direct IO with DMA is really tricky and hard to get right. It took us a couple of years to make sure that it's solid, that it crashes correctly, and so on.
The last one is that it introduces hard-to-track bugs. I linked a Postgres bug in the presentation. The bug was there for 20 years, and Postgres has some of the better engineers out in the world. You can start to understand why a generic scheduling algorithm is not really that useful when you have specific, embedded domain knowledge about the problem you're trying to solve. What eliminating the page cache gives us for performance, because the goal is to get 10x to 100x better tail latencies, is a thread-local, really low-latency cache whose format is ready to be shipped straight onto the wire. There's very little translation. We move away from caching pages to caching objects. The reason that works for us is that the Kafka model is just a header with a bunch of payload data. We pay the cost of materializing the header a priori, so that we can take that entire byte array and ship it onto the wire when the next request comes in. There's really no translation.
Perhaps more fundamental is that we can learn statistics from the file system's latency backpressure, and actually fail the next request. One example: assume that you're trying to heartbeat a particular endpoint, and you know that in order to do the next heartbeat you have to write some data to the file system. If we know the current latency is twice the heartbeat interval, then we can simply fail it without adding wasted work, throughput, or latency on the disk. That transparency buys us a lot of performance when you get to saturation, at the tail-latency scale, above the 99.9th percentile and every percentile between that and the max latency.
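A minimal sketch of that fail-fast idea; the moving-average tracker and the admission check are illustrative, not Redpanda's actual logic.

```cpp
// Sketch: because the storage layer is ours (no page cache), it can expose its
// observed flush latency, and the caller can fail a request up front instead
// of queuing doomed work.
#include <chrono>
#include <optional>

using namespace std::chrono;

class write_latency_tracker {
    milliseconds _ewma{0};
public:
    void record(milliseconds sample) {
        // Simple exponentially weighted moving average of recent write latency.
        _ewma = (_ewma * 7 + sample) / 8;
    }
    milliseconds estimate() const { return _ewma; }
};

// Returns nullopt if the write should be rejected immediately: the disk is
// currently slower than the deadline the caller (e.g. a heartbeat) has.
std::optional<milliseconds> admit_write(const write_latency_tracker& t,
                                        milliseconds deadline) {
    if (t.estimate() > deadline) {
        return std::nullopt;   // fail fast; don't add wasted work to the device
    }
    return t.estimate();
}
```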
Technique 5: Adaptive fallocation
Another big optimization is adaptive fallocation. This is a trivial thing that I think a lot of applications could really benefit from; you don't even need a thread-per-core architecture here. The idea is that every time you update global metadata, like your file size, it incurs all sorts of additional costs in the Linux kernel. It might have to update other inodes or other metadata, whether it's folder metadata or file system metadata. That operation is global, and it requires a lock in the kernel. What we do here is use the fallocate syscall to effectively extend the file with zeros, 32 megabytes at a time. Instead of doing fsync, we do fdatasync, because we already know the size. Every time we're about to go over that 32 megabytes, we pre-allocate another 32 megabytes. In real-life use cases, doing this ahead of time gives anywhere from a 15% to 20% latency improvement. Effectively, you're amortizing the cost of updating metadata in the kernel.
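Here is a sketch of the idea with plain POSIX calls on Linux; the 32-megabyte step matches the example above, and the rest is illustrative.

```cpp
// Sketch of adaptive fallocation with plain POSIX calls on Linux. Names and
// error handling are illustrative, not Redpanda's actual code.
#include <fcntl.h>
#include <unistd.h>
#include <cstddef>
#include <cstdint>

constexpr off_t kFallocStep = off_t(32) << 20;  // pre-allocate 32 MiB at a time

class preallocating_appender {
    int _fd;
    off_t _committed = 0;   // bytes actually written so far
    off_t _allocated = 0;   // bytes reserved with fallocate so far
public:
    explicit preallocating_appender(int fd) : _fd(fd) {}

    bool append(const void* buf, size_t len) {
        // Amortize the file-size/metadata update: extend by whole steps, ahead
        // of the write head, instead of growing the file on every append.
        while (_committed + off_t(len) > _allocated) {
            if (::fallocate(_fd, 0, _allocated, kFallocStep) != 0) return false;
            _allocated += kFallocStep;
        }
        if (::pwrite(_fd, buf, len, _committed) != ssize_t(len)) return false;
        _committed += off_t(len);
        // The size is already reflected in the inode, so fdatasync is enough.
        return ::fdatasync(_fd) == 0;
    }
};
```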
Technique 6: Raft Read-Ahead Op Dispatching
How does all of this relate to Raft? By bypassing the page cache and virtual memory and using all of these techniques, we get to embed hardware-level knowledge into Raft. One really interesting case we found while trying to reduce flushes, which are a global file system operation, is this: every time you write in Raft, you have to flush to guarantee durability of the writes on disk. What we did is learn from the people building hardware, where they get to scan the next series of operations and reorder them as long as the resulting set doesn't change the semantics. We can't change the write order, but what we can do is drop flushes. If you have a write, flush, write, flush, write, flush pattern, we say: for this batch of operations, we're going to write three times, drop two flushes, and then flush at the end of the call. Thus we get a huge throughput improvement at the cost of artificially debouncing, increasing the latency of an individual operation just a little bit, which reduces the overall system latency and increases the throughput. This is really useful for us. We can do this because we learn transparently what latencies are being propagated from the disk all the way into the higher application-level protocols. If you really think about it, what we're doing here is just better buffer packing at the lowest-level bits. Instead of writing half a page, flushing, then writing the next half page, we're writing bigger buffers, flushing bigger buffers, and reducing the number of flush operations.
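Here is the flush-coalescing idea sketched in isolation; the op types are illustrative and this is not Redpanda's Raft code.

```cpp
// Sketch of flush coalescing: within one dispatch window, keep every write in
// order but drop the intermediate flushes and issue a single flush at the end.
#include <cstddef>
#include <vector>

enum class op_type { write, flush };

struct log_op {
    op_type type;
    std::vector<char> payload;   // empty for a flush
};

// Rewrites [w f w f w f] into [w w w f]: the same durability point at the end
// of the batch, with far fewer global file system flush operations.
std::vector<log_op> coalesce_flushes(std::vector<log_op> batch) {
    std::vector<log_op> out;
    bool saw_flush = false;
    for (auto& op : batch) {
        if (op.type == op_type::flush) {
            saw_flush = true;              // remember it, but don't emit yet
        } else {
            out.push_back(std::move(op));  // writes keep their relative order
        }
    }
    if (saw_flush) {
        out.push_back(log_op{op_type::flush, {}});
    }
    return out;
}
```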
Technique 7: Request Pipelining Per Partition
The next one that I think is really important when you build on the thread-per-core model is an execution model for pipelining. Pipelining is critical in order to get low latency. Why? Without pipelining, you can't do the debouncing trick that I mentioned. If you dispatch one request, wait for it to finish, then do the next one, that sequential ordering is really slow. Once a TCP connection is accepted, say on core-0, and in this example the writes go to both core-0 and core-1, we figure out the assignment: which cores are actually going to handle this particular data? Then, for those cores, we simply pipeline the writes. If they're on two different cores, you're actually going to improve parallelism. If they're on the same core, then you're getting pipelining, and you can take advantage of a lot of other optimizations, like the debouncing that lowers the number of flush operations.
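Here is a sketch of that dispatch, assuming Seastar's cross-core submit; the partition-to-core mapping and the helpers are hypothetical.

```cpp
// Sketch of per-partition pipelining, assuming Seastar's cross-core submit.
#include <seastar/core/future.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/when_all.hh>
#include <vector>

struct produce_request { unsigned partition; /* payload omitted */ };

unsigned owner_core(unsigned partition) {
    return partition % seastar::smp::count;   // placeholder partition->core map
}

seastar::future<> do_local_append(produce_request) {
    // Stand-in for the real append path on the owning core.
    return seastar::make_ready_future<>();
}

seastar::future<> handle_batch(std::vector<produce_request> reqs) {
    std::vector<seastar::future<>> inflight;
    inflight.reserve(reqs.size());
    for (auto& r : reqs) {
        // Dispatch immediately: requests for the same core pipeline behind each
        // other, requests for different cores run in parallel. Nothing waits
        // for the previous request to finish before dispatching the next one.
        inflight.push_back(seastar::smp::submit_to(
            owner_core(r.partition), [r] { return do_local_append(r); }));
    }
    return seastar::when_all_succeed(inflight.begin(), inflight.end());
}
```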
Technique 8: Core-local Metadata Piggybacking
The last one, and perhaps the most important when you adopt a thread-per-core execution model, is that metadata becomes really difficult. You no longer have global metadata, and if you don't have global metadata, you are always going to be operating on stale metadata. You have to embrace that idea; you will never have a global view of the data. It is very expensive to do synchronization events across 225 cores. That means basically pushing a barrier to all of the cores, serializing the metadata, and then making sure that every core has a copy of it. Instead, every time we send data to a remote core, we piggyback information on the way back that is useful to the protocol dispatcher. For the Kafka model, we say: go write this partition on a remote core, and on your way back, tell me the latency statistics for that particular core, and how much data there is on that core. If this TCP connection is a consumer, let me know the gap between my head and the tail of the log, so that I can start doing execution planning for the next request that comes in. This particular metadata piggybacking has had some of the most profound effects in simplifying the architecture.
We could do this many different ways. You could add an exception to the rule and simply say, for metadata, we're going to have a global lock. What we found is that piggybacking information is, one, very cheap computationally speaking; you're copying integers around. Two, it gives the TCP connection the view that it needs to do its job. It doesn't have to be correct at all times; it can have a stale, partial view of the data, which is fine, because not every core needs to understand what every other core is doing. It just needs to know the information it needs, which is a much smaller problem. Piggybacking information on the way back from a remote core to the originating core was one of the most profound architectural differences.
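Here is a sketch of the piggybacking idea, assuming Seastar's cross-core submit; the reply fields and helpers are illustrative.

```cpp
// Sketch of metadata piggybacking: the remote core's reply carries cheap
// statistics (integers) back to the originating core, which keeps a possibly
// stale, partial view without any global synchronization.
#include <seastar/core/future.hh>
#include <seastar/core/smp.hh>
#include <cstdint>

struct append_reply {
    uint64_t base_offset;        // where the batch landed
    uint64_t partition_bytes;    // how much data that core now holds
    uint64_t append_latency_us;  // that core's recently observed append latency
};

append_reply local_append(uint64_t partition) {
    // Stand-in for the real per-core append; it would fill in real statistics.
    return append_reply{partition * 100, 0, 0};
}

// The originating core's possibly stale view of a remotely owned partition.
struct core_local_view {
    uint64_t last_seen_bytes = 0;
    uint64_t last_seen_latency_us = 0;
};

seastar::future<uint64_t> append_and_learn(unsigned owner, uint64_t partition,
                                           core_local_view& view) {
    // `view` lives on the calling core and must outlive the returned future.
    return seastar::smp::submit_to(owner, [partition] {
        return local_append(partition);
    }).then([&view](append_reply r) {
        // Piggybacked stats: update this core's view without any global lock.
        view.last_seen_bytes = r.partition_bytes;
        view.last_seen_latency_us = r.append_latency_us;
        return r.base_offset;
    });
}
```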
Wrap-up
The bottom two graphs show the difference in impact in terms of performance. What I'll highlight is that a lot of these techniques are really aimed at anything above the 90th percentile. I'm comparing Kafka in black with Redpanda in red, once the hardware reaches saturation. If your hardware isn't being saturated, there's really very little that we can do differently: there's lots of room for computation and growth, and there's really no backpressure. All of these techniques matter when you start to care about tail latency, when you start to care about basically the entire distribution of your users, when you want to give predictable performance to your application.
Questions and Answers
Schuster: Do you still need an operating system, or has the operating system become your enemy?
Gallego: I've thought a lot about this. We don't need it as much as other programs do; we really want to own most of it for predictable latencies. The next step for us would be to write our own file system, and we're actually not very far from it. If you look at the Kafka API, we keep track of timestamps, of offsets, of metadata; we have multiple indexes. I've thought about actually writing our own file system, so that you open up a block device instead of a file handle, and do a little bit of recovery. Any storage system has to do a lot of these techniques already: if the system crashes, you have to look up some metadata, recover, and re-checksum to verify the data is what the program said it stored. If we have recovery, timestamps, offsets, sizes, and metadata compression, then what is the file system really doing for us? What it's really doing is being compatible with other programs. The reason we haven't gone into a full-on IncludeOS-style approach is that there are other programs that people often need to run on the same machine, whether it's for authorization or authentication. In our cloud, there's an SSH program that verifies that only the SRE is allowed to SSH into that machine. It becomes an operational hassle to create your own OS image. Right now, you're right, I think that's a really fantastic observation. We really try to just own the whole thing.
Schuster: There's a trend of exokernel-type development, where you basically link your program into the kernel; as long as you use a safe language, that's fine. Have you considered that as an option? Or rump kernels, I think they're also called.
Gallego: All we really need is the initialization sequence: bootstrap and start this process. In terms of owning it, in terms of managing memory, CPU, and cross-core work, one of the hardest things, I think, was to do proper CPU accounting. It's not just crossing the memory boundary, it's crossing the memory boundary with accounting. You ask, what is my budget to leave my core? Otherwise, I need to do local operations before I spend all my budget, all my latency, going to remote cores. Once we got that worked out in Redpanda, I really started thinking about that. There are so many cool problems to solve; it's hard to work on that one.
Schuster: Is the rise of ARM on the server interesting for you?
Gallego: It's hugely interesting. We're releasing ARMv8 support. What it does is make your price per byte on the storage subsystem, even for your active workload, hugely cheaper: it's 20% to 30% cheaper on price per byte, or dollars per cycle spent. I don't think it's anything deeper; it just has to do with the cost at which Amazon is renting those computers to you. It's just cheaper.
Someone asked, is this designed for super tall machines?
The answer is actually the opposite. The idea is that when you design your system concurrently, parallelism becomes a free variable. We have an embedded firewall company that is using us on a single core in a distributed router. Instead of using the file system, they use the Kafka API, because they get ACLs, TLS, encryption, and a bunch of security and user management. I think that is a really interesting way to think about parallelism and concurrency: parallelism is just the number of cores, a free runtime variable. You can run from 1 core all the way up to 225 cores.