Dataflow-Based Query Caching with Readyset

Summary

Alana Marzoev discusses the fundamentals of streaming dataflow and the architecture of ReadySet, a streaming dataflow system designed for operational workloads.

Bio

Alana Marzoev is one of the founders of ReadySet, the company commercializing Noria, a transparent SQL caching solution for relational databases. Before starting ReadySet, Alana was a PhD student at MIT CSAIL where she worked on databases and distributed systems. Prior to MIT, she did research in the Systems & Networking group at Microsoft Research and worked on the Ray project at UC Berkeley.

About the conference

InfoQ Dev Summit Boston software development conference focuses on the critical software challenges senior dev teams face today. Gain valuable real-world technical insights from 20+ senior software developers, connect with speakers and peers, and enjoy social events.

Transcript

Marzoev: I'm Alana. I'm one of the founders of Readyset. I'm going to be telling you about a computational model called partially-stateful streaming dataflow. Specifically, I'll share more about how the team at Readyset has been leveraging this model to build a high-performance database cache. Partially-stateful streaming dataflow is actually a relatively new concept. It originated in the computer systems research community a couple of years back, over at MIT. The main idea is that you can represent computations like SQL queries, as these long-running computational graphs, and essentially stream data through those graphs to incrementally update the results, and do so in a very memory efficient way.

The team at Readyset has been using this to build a database cache that has the same performance profile as a cache that you've spent a lot of time hand optimizing based on an in-memory key-value store, but that alleviates a lot of the pains that typically come with caching. Specifically, having to rewrite your business logic to now suddenly reference the cache, and then, more importantly, keep the cache state in sync with the state of your database as the data in your database changes.

Scope

I'm going to start by describing the problem domain in a little more depth. I'll give you a sense for the types of workloads and applications that this approach to caching is best for. Then I will do a live demo of Readyset, so you can get a sense for what dataflow-based caches, and Readyset in particular, look like in action. We'll cache a query and see some cool stuff there. Then I'm going to do a deep dive into dataflow-based caching. The way I'll approach this is essentially layer of abstraction by layer of abstraction, like peeling the layers of an onion. I'll start with the highest level. We'll talk through the interface. I'll make a lot of the design decisions that were implicit in the demo more explicit.

You'll get a sense for what it's like to actually program against and operate a dataflow-based cache. Then we'll discuss the system architecture, and I'll do this mostly through going through the life of a query. We'll both set up a cache for a query and then talk through what serving traffic from that cache actually looks like. Then, finally, I'll do a deep dive into partial materialization, which is the star of the show. Specifically, I'll share how you're able to use partial materialization to solve cache staleness issues in an automated way. Then, finally, I'll wrap up by comparing dataflow-based caching to other common alternatives for database scaling, so traditional caching based on in-memory key-value stores, query optimization, read replicas, all this.

Problem Domain

Problem domain. Caching is useful for a large number of types of applications, but in this talk, I'm focused on specifically caching queries from relational databases. I'm focused on database backed applications. I'm also going to be really narrowing in on web apps in particular, because web apps tend to have a set of shared characteristics that make them particularly amenable to caching. Concretely, they're highly latency sensitive. Not super surprising, there's people on the other end of the screen waiting for their pages to load. If that's slow, they're unhappy, they churn. Obviously, this problem's a lot harder nowadays, because most companies leverage ORMs, and ORMs generate both large numbers of redundant queries as well as particularly inefficient ones, unintentionally.

Even though ORMs are a boon for productivity, they make scaling a lot harder, because there tends to be a lot more contention at the data layer even when you don't have that many users. Web apps also have a lot of variance in terms of traffic volumes that they serve. I think the canonical example of this is Black Friday. Companies will spend the other 11 months of the year just capacity planning for Black Friday. Of course, you see this dynamic in smaller scales and other contexts as well. If you're building a B2B SaaS app, then most of your usage is going to be during business hours, not on evenings or weekends. If you're building an e-commerce site, then you might have a flash sale and see more users or more customers that day than many of the days prior.

Maybe there are some geographic patterns as well, where in certain regions, like on a booking site, you might get an uptick in hotel bookings or Airbnb bookings, because it's summer, it's like peak season for that location. Having your database in a state that it's able to deal with the traffic that you see at this high watermark, even if the high watermark is orders of magnitude higher than the low watermark, is incredibly important for user experience, because obviously these high watermarks correspond with really important business events. Websites are also really read-heavy, there are just more reads than writes. A 90 to 10 read-write ratio or an 80 to 20 read-write ratio would both count as read-heavy, in my opinion.

To give you a real-world example, if you've been on the website, Hacker News, the tech forum, most people on Hacker News, they're just looking at the front page, maybe clicking the top link. Most people aren't actually posting new things to Hacker News. Great example of, this is a very read-heavy workload. Finally, websites tend to have skewed data access distributions. When users request data, they're not just pulling rows from your table at random. There's really some sense of, some data is more popular. Some data is more recent. That's going to be what's shown to the users. In the wild, this is often governed by Zipf's Law, or the Zipfian distribution, which is this idea that the nth most common term is 1 over n times as popular as the most common term.

To bring this back to our Hacker News example, this is like, most people are seeing the front page, and then maybe a small fraction of them are making it to the second page, and then a small fraction of that group is making it to the third page. There's really a steep, almost exponential drop-off in terms of popularity of items. That makes this really great for caching. To summarize, applications that are highly cacheable, are latency sensitive, have high variance in load, are read-heavy, and have skewed data access distributions, or some combination of these factors.
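To put rough numbers on that skew, here is a minimal sketch that assumes request popularity follows the 1-over-n rule exactly; the function name and the item counts are illustrative, not from the talk.

```python
def zipf_top_k_share(num_items, k):
    """Fraction of all requests that hit the k most popular items, assuming
    the nth most popular item is requested 1/n as often as the most popular."""
    weights = [1.0 / rank for rank in range(1, num_items + 1)]
    return sum(weights[:k]) / sum(weights)


for k in (10, 100):
    share = zipf_top_k_share(10_000, k)
    print(f"top {k} of 10,000 items receive {share:.0%} of requests")
```

Under this idealized assumption, a cache holding only the 10 hottest of 10,000 items would already serve roughly 30% of requests, and the hottest 100 roughly half. Real workloads only approximate the distribution.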

Again, if that is your application, then caching is going to be a great bet. I think caching is actually a choose your own adventure type of solution. When I'm talking about caching, I'm going to specifically be referencing read-through caching. The idea is that you have an in-memory key-value store off to the side somewhere from your database. Whenever there's a database read, you're going to first check the cache to see if that item is in the cache. If it is, great, you'll just return it from the cache. That'll be a really fast read, it'll be a cache hit. If not, we're going to just trigger a request to the database to run that query and then store the results in the cache, and then return it. It's very simple. It's like read-through caching. Let's take a look at what this actually might look like in code.

We'll do a little before and after type of moment. I just found this code snippet on the internet, it was on some Python SQLAlchemy and Redis tutorial, but it illustrates my point. Here, there's this Python function, fetch_user_from_database, takes user ID as input. All it's doing is opening up a DB session using SQLAlchemy to effectively generate a SQL query that's pulling some user data and then returning it. It's super simple, simple as can be. When we start to modify our code to incorporate caching, read-through caching in particular, suddenly there's 2x the code. Now we have this helper function, get_user_from_cache, that's implementing this read-through logic that I mentioned previously.

It's checking if the user info is in Redis. If it is, great, returns that. If not, it's running that other function that I just showed you to fetch from the database. You're like, I have to write a helper function. Of course, this is just one place in your code where you might want to use caching. If you had dozens of things that you wanted to cache, it might be a little bit annoying to have to write all these helper functions.
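The snippet shown in the talk is not reproduced in the transcript. The following is a minimal sketch of the same before-and-after shape, keeping the two function names mentioned above. To stay runnable without external services it assumes an in-memory SQLite database in place of SQLAlchemy and a plain dict in place of Redis; the table and key format are made up for illustration.

```python
import json
import sqlite3

# Stand-ins (assumptions): SQLite instead of SQLAlchemy, a dict instead of Redis.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
db.execute("INSERT INTO users VALUES (1, 'ada')")
cache = {}


def fetch_user_from_database(user_id):
    """Before: every read goes straight to the database."""
    row = db.execute(
        "SELECT id, name FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    return {"id": row[0], "name": row[1]} if row else None


def get_user_from_cache(user_id):
    """After: read-through. Check the cache first, fall back to the database."""
    key = f"user:{user_id}"
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)  # cache hit
    user = fetch_user_from_database(user_id)  # cache miss
    if user is not None:
        cache[key] = json.dumps(user)  # store for the next reader
    return user


first = get_user_from_cache(1)   # miss: runs the query and fills the cache
second = get_user_from_cache(1)  # hit: served from the dict
```

Nothing in this helper ever removes or refreshes an entry, so a later `UPDATE` on the `users` table would leave the cached copy stale indefinitely.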

Is that it? Is your problem solved? Will your database stay up now? Can we just go home? The answer to that question is, unfortunately, no, because as annoying as having to mess up your beautiful, clean business logic with caching code is, the real hard part of caching is actually keeping the state in the cache in sync with your database in a way that essentially ensures that you're not bothering your users or causing a database outage. I'll elaborate on that. For every query that you want to cache, you're going to have to first sit down and really think through what the user is expecting from you around data freshness. Then, from there, you're going to have to design an invalidation policy that you believe will guarantee that for the user.

Then, finally, you're going to have to implement it without causing any distributed systems bugs. Harder than it sounds. Again, we'll elaborate. The way that I'm going to explain this in greater depth is by walking through two examples. On the left here we have Facebook, and specifically, the Facebook News Feed. On the right we have the checkout window of an e-commerce store. In Facebook, let's say that you have a friend who went out to brunch and they posted photos from their brunch on Facebook at time point t, and then you don't see the photos on your Facebook News Feed until time point t plus 10. Ten minutes have gone by.

In most cases, you'll be none the wiser. You have no idea that your friend posted photos from brunch. The news feed's not even chronological anymore. It hasn't been for a decade. It's like random algorithmic, whatever. In this case, it really doesn't matter if your caching logic is really simple and naive, if it's just a TTL. It's going to be ok. Contrast that to the case on the right, where you have an e-commerce app, and let's say your goal in particular is to just buy a t-shirt on the internet, and you already know which t-shirt you want. You put it in the shopping cart, and it takes 10 minutes for it to show up. If you're anything like me, you're not going to stick around for 10 minutes refreshing the page, hoping that the cache will be cleared, or whatever.

You're going to go on Amazon or buy it from some other competitor, because nobody has the time to sit around. That's an example of how user expectations around data freshness can really vary across apps and also between different functions within a given app. In Facebook, there's loads of situations, like the checkout example, that we could talk about as well, but I thought this illustrates the point better.

Let's say you've done this. You've sat down, thought about it. You're like, I have a good sense for what users need here. Now you have to actually come up with an invalidation policy. The tricky thing here is that you really are on your own. There's no automation around this. You could set a TTL, but TTLs tend to be overly simplistic, because if you frequently have writes, then the data will really quickly become stale. If you set the TTL to be too short, then you're continuously just running queries against your database. At that point, it's like, why even bother using a cache? You're just rerunning your queries against a database. Your cache hit rate will be really low. You could set a TTL, but if you want your cache to actually do something for you, you might have to do something a little bit more sophisticated.
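The TTL trade-off can be made concrete with a toy simulation. This is a sketch under made-up assumptions (one read per second, one write every 10 seconds, a single cached key), not a model of any real workload.

```python
def simulate(ttl, duration=60, write_every=10):
    """Return (hit_rate, stale_reads) for a single key cached with a TTL.

    One read happens per simulated second; the database value changes
    every `write_every` seconds. All numbers are illustrative.
    """
    db_value = 0
    cached = None  # (value, expires_at) or None
    hits = stale = 0
    for t in range(duration):
        if t % write_every == 0:
            db_value += 1  # a write lands in the database
        if cached is not None and t < cached[1]:
            hits += 1
            if cached[0] != db_value:
                stale += 1  # served an out-of-date value
        else:
            cached = (db_value, t + ttl)  # miss: rerun the query, refill
    return hits / duration, stale


short_ttl = simulate(ttl=1)
long_ttl = simulate(ttl=30)
```

With a 1-second TTL every read misses, so the cache does nothing. With a 30-second TTL almost every read hits, but most of those hits return a value the database has since overwritten.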

The more sophisticated you get, the more likely it is that you'll mess something up. Caching is really prone to certain types of distributed systems bugs. I think thundering herds are one great example of this that are really easy to introduce and are really catastrophic when you do. A thundering herd is when you essentially have a popular key in your cache, and there's a long queue of people waiting to read that key, and that key gets evicted. Then all of those requests get sent to the database simultaneously, and your database was not right-sized for this sudden traffic spike, and it just takes it down. Then you're in a situation where you invested all this time and effort into using caching, and you just shot yourself in the foot effectively. This dynamic is why we have this cheeky quote, that there are only two hard things in computer science, cache invalidation and naming things. It's because caching seems like a great idea in theory. Like, "It's so simple. We pre-compute some results. How hard could it be?" Then, in practice, it's actually quite challenging to get right, unfortunately.
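A thundering herd is easy to reproduce in miniature. In this sketch, which uses hypothetical names throughout, a barrier stands in for the time a slow query takes: every reader sees the missing key before any of them has refilled it, so each one goes to the database.

```python
import threading

NUM_READERS = 50
cache = {}  # stand-in for the key-value store; the hot key was just evicted
db_calls = 0
db_calls_lock = threading.Lock()
# Models the window in which the slow query is still running: all readers
# check the cache before any of them has had time to repopulate it.
all_checked = threading.Barrier(NUM_READERS)


def run_query_on_database(key):
    global db_calls
    with db_calls_lock:
        db_calls += 1
    return f"result-for-{key}"


def read_through(key):
    value = cache.get(key)
    all_checked.wait()
    if value is None:
        value = run_query_on_database(key)
        cache[key] = value
    return value


threads = [
    threading.Thread(target=read_through, args=("front_page",))
    for _ in range(NUM_READERS)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(f"{NUM_READERS} readers caused {db_calls} database queries")
```

One evicted key turns into one database query per waiting reader, which is the sudden spike described above.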

If we were to do a cost-benefit analysis here just for fun, then I think that the pros would be like, with caching, you get, essentially, best case tail latencies, because you have an O(1) lookup for a cache hit into an in-memory key-value store. It's going to be a lot faster than actually doing work at the time that you run the query. You're doing all of the work beforehand. That's nice. There's also a pretty straightforward path to scaling, at least if you contrast it to sharding your database. This is going to be a lot simpler.

On the other hand, there's lots of things you're going to have to watch out for. You're going to have to rewrite your application potentially in a pretty substantial way. Caching generally is error prone. I talked about all of the different levels at which you can get caching wrong. Then when you do, the bugs are fairly obvious to your end users. On top of that, caches conventionally are not isolated from the database; they're tightly intertwined. If something goes wrong with the cache and you introduce a distributed systems bug by accident, which, again, is really easy to do because the difference between code that triggers that bug and code that doesn't is really minor, then you could actually take down your database, which defeats the point of the whole endeavor.

Dataflow-Based Caching: Demo

I'm now going to transition into talking about dataflow-based caching. As I mentioned previously, I'll start off by just showing you a demo. Here I have an EC2 instance and two Docker containers running on the EC2 instance. One of them has just some Postgres database in it, and the other has Readyset, which is the dataflow-based cache that I'm going to be telling you about all throughout. Here I'm using psql like a Postgres client to connect to this connection string. This is the Readyset connection string. Readyset has a Postgres-esque connection string that I can use psql to connect with. Dataset wise, we're going to be working with the IMDb dataset. It's what it sounds like. It's like IMDb data about movie ratings, that sort of thing.

The query that we're going to be working with, the natural language interpretation of it is how many movies filmed after the year 2000 had over a 5-star rating on IMDb. This is how that query looks in actual SQL. You can see there's a count, there's a join, but in the grand scheme of things, it's fairly simple. I ran the query. I'm actually going to turn on timing so we can see how long various operations take. I ran the query with timing on, and we can see that it took 42 milliseconds to run. We're like, it's a little slow. Maybe we should cache it. The way that I'm going to do this is by prepending, CREATE CACHE FROM, and the query string. Remember, we're connected to Readyset. This is telling Readyset we're creating a cache from this query. The cache was created.

Now I'm going to run that same query a bunch of times to essentially warm up the cache and then see what the new latencies are. Ran it a few times. Cache was warm. Now it takes 1 millisecond. Before it took 42, now it took 1. The result is just living in memory. That's pretty neat. Let's now see what happens when the underlying data changes, because I think that's the interesting part. Let's say there is a movie that I really like but the rest of the world hates. Historical Revisionism, I'm going to say this movie deserves a 6.0 rating. Now I'm going to run the same query again, and it's still being cached.

Before, the answer was 2418. Because I thought this movie deserved a higher rating, I updated that row and then reran the same query, and you can see that we have the new value. It's the refreshed value that accounts for that change. We can also see that it's still cached. It took 0.89 milliseconds, so it's even faster now. I'm not actually making a point about it being faster, that's noise. It wasn't a cache miss. It's not like we reran this query from scratch to get the update. It was, in fact, incrementally updated. We took the old value, 2418, and we repaired it to reflect the new data. We added 1. That's why it's still in the cache, and it was fast.

Dataflow-Based Caching: Interface

Now I'm going to essentially spend the rest of this presentation telling you about how that worked, because it might be a little mysterious at the moment. I'll start by talking about the interface. The first thing to know is that Readyset is set up as a read replica. Concretely, there is a local copy of any base tables that are referenced by cached queries on Readyset. If you're caching just one query, and that query references tables a and b, then we need to have a local copy of a and b, specifically in RocksDB on the Readyset instance, for this whole thing to work. Notably, we don't need tables c through whatever that you might also have on your database. We only need the ones that are actually used for caching. We're going to snapshot those tables, and then we're going to open up a replication slot on your primary database to get updates to those tables.

As writes are happening, we're getting a sense for how the data is changing over time. We support Postgres and MySQL. If you're using Readyset as a Postgres cache, then this would be done via logical replication. If you're using it as a MySQL cache, row-based replication. Readyset is also wire compatible with Postgres and MySQL. That's how we were able to get away with using psql to connect to it. If you're already using some database driver, client, or ORM, then you should be able to use your Readyset connection string interchangeably with the connection string for the database itself, without having to change code in any way. Obviously, there's not one-to-one feature parity. It's not like we support every single Postgres feature or anything like that, but you can still connect to it, and you can cache SQL-92-esque operations using Readyset.

The next thing to note is that caches are specified explicitly, so you have to actually tell the cache that you want certain queries to be cached. It's not going to automatically do it for you. The way that you do this is via these DDL equivalents for cache management. We saw this in the demo. I was like, CREATE CACHE FROM query string, and that's how we did it. That's what I'm referring to here. Notably, when we're caching in Readyset we're caching prepared statements. If you have a bunch of queries, and the only difference between those queries is that certain literals are different, then those queries could be parameterized, and they will all be resolved by the same query cache in Readyset. We're not creating new caches for every possible instantiation of structurally similar queries. Rather, we're dealing with prepared statements.

Finally, any queries that you have not cached explicitly, as well as anything that can't be cached, like writes, for example, will just get proxied to your primary database. If you remember, in the demo, we had the Postgres instance and the Readyset instance, and Readyset was connected to Postgres. At first, we didn't cache anything, and so, effectively, we proxied the query that we were working with to Postgres, and it continued to be proxied until we specifically cached it. This is nice, because this gives you fine-grain control over what's cached or not while still using the same connection string. If you have already invested in the infrastructure required to deal with multiple read replicas and separate read-write connections, then you can totally just use your Readyset connection string as another read replica. If you haven't, and you don't want to do that, then you can get away with just using a single connection string, but still have an interface to your database. It's not like you have to cache everything or anything like that.
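The two interface rules above, caching at the prepared-statement level and proxying everything else, can be sketched as follows. This is a deliberately naive illustration: the regex-based literal stripping and the `Router` class are assumptions made for the example, whereas a real system would parse the SQL properly.

```python
import re

# Naive on purpose: treats quoted strings and bare numbers as literals.
LITERAL = re.compile(r"'(?:[^']|'')*'|\b\d+(?:\.\d+)?\b")


def parameterize(sql):
    """Split a query into a template with '?' placeholders and its literals."""
    literals = []

    def swap(match):
        literals.append(match.group(0))
        return "?"

    template = LITERAL.sub(swap, " ".join(sql.split()))
    return template, tuple(literals)


class Router:
    """Decides whether a statement is served by the cache or proxied upstream."""

    def __init__(self):
        self.cached_templates = set()

    def create_cache(self, sql):
        # Stands in for running CREATE CACHE FROM <query>.
        self.cached_templates.add(parameterize(sql)[0])

    def route(self, sql):
        template, _ = parameterize(sql)
        return "readyset" if template in self.cached_templates else "upstream"


router = Router()
router.create_cache("SELECT title FROM stories WHERE id = 2")

print(router.route("SELECT title FROM stories WHERE id = 17"))         # same shape
print(router.route("SELECT title FROM stories WHERE author = 'bob'"))  # never cached
print(router.route("INSERT INTO votes VALUES ('justin', 2)"))          # a write
```

Queries that differ only in their literals collapse to one template and share one cache, while uncached shapes and writes fall through to the primary database over the same connection.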

Life of a Query

Now I will tell you about the system architecture via the life of a query. The way I'll do this is I will first walk you through a bird's eye view of the architecture, and I will fairly quickly step through the various components and what they do, and how they relate to each other. Then, after that, I will slow down, and I will walk you through more slowly, the process for creating a cache and what's actually happening behind the scenes, as well as the process for actually serving traffic from your cache. Bird's eye view. Up top here, we have the application server, which has the business logic, which is presumably using the ORM. The ORM, in turn, is connecting to the database via some DB client, like a driver of sorts. Although the actual logic in your application is unchanged, you have changed the connection string in your application that previously pointed to your database, to now point to Readyset instead.

Specifically, you'll be connecting to the Readyset adapter component here in purple. The first thing that's going to happen once that query or whatever reaches the adapter, is that it will hit the SQL shim component that is responsible for decoding from the binary protocol of the underlying database that your application is assuming that you're talking to, and converting that query string into an internal representation of that query. Essentially, like a relational algebra type of representation that Readyset can work with internally. This is database agnostic. It's just like the raw SQL-esque type operations. This is like our intermediate representation. Then the SQL shim, after doing that, will pass it along to the Readyset client, and the Readyset client will be essentially responsible for deciding whether that query or whatever should be sent to the database or to Readyset.

From there, it will either use the database connector to proxy to the database and return the result back up the chain, or otherwise, it'll use the Readyset connector to send it to the Readyset server. The Readyset server has a specific component called the reader. The reader is responsible for actually storing all of the cached query results and returning them back to users who asked for them. Behind the scenes, the data in your database is likely constantly changing, and so those updates are being sent to Readyset, specifically to a component in the server called the replicator. The replicator is responsible for updating the local copy of these tables on disk, and then emitting those row updates or whatever, through the dataflow graph component, which is responsible for incrementally updating the old cached values to reflect this new change, and then storing that new value in the reader. It's doing this eagerly.

First, I'll talk about setting up a cache. The first thing that you're going to want to do is swap out the database URL in your application to point to Readyset instead of your database. Here I'm assuming that you're not going to use Readyset as a read replica. Mind you, if you wanted to do this in production, you might want to use Readyset as a read replica instead. If you're just playing around, or you don't have read replica things set up that way, you can just change your connections, point to Readyset. The specific way that you'll do this will, of course, vary depending on the programming language, the framework ORM that you're using. Generally speaking, we give you this wire compatible connection string that you can use as you wish. If Readyset is a cache for a Postgres instance, it'll be a Postgres connection. If it's a cache for MySQL, it'll be a MySQL connection.

From there, you have to decide which queries you want to cache. There are many ways you could do this. To name a few, you could check the database slow query logs. You could maybe check your APM, if you're using Datadog, or New Relic, or something like that, to see which queries could be good candidates. Readyset will show you a bunch of stats about any queries that you run through it. It will show you things like the number of times that query was run, the tail latencies that it saw, the total CPU times. You could also use Readyset's built-in monitoring interface to get some sense for what could be cacheable or a good candidate for caching. You don't have to use that. You can just do whatever you want.

You have to just, at some level, figure out what queries you want to cache. Once you've decided, you have to actually create the caches. You do that via running those CREATE CACHE FROM statements using the SQL extensions that I talked about. That's it. Once you run CREATE CACHE FROM, and then the query string, it's not going to be literally immediate. We do have to actually construct a dataflow graph. We have to sometimes index data in RocksDB. If you have really big tables, that could take a few minutes. Typically, it takes a couple of seconds.

Let's say you've done that, you've waited a couple of minutes, and your queries are ready, now I'm going to talk about how we actually serve traffic. User comes along, requests some data. You know that your app is connected to the Readyset adapter now, so that query is getting sent there. Those queries will be decoded from the binary protocol to this internal representation that I was telling you about. Then that representation will then be passed to the Readyset client. The Readyset client is essentially just pattern matching against the caches that it knows exist on the Readyset server, and it's seeing, is there a match? Remember, we're doing this at the prepared statement level. If the query matches a prepared statement cache, then that means it should indeed go to Readyset.

If not, we assume that it should be proxied, because maybe we've never seen it before. In the case where it's going to Readyset, it'll get sent to the reader node. The reader node will just do a key-value lookup. It'll return the results. If it gets sent to the database, the database will run the query, return the results. Get sent to the user. In the background, the cache is continuously being updated to reflect the latest state of the world, as indicated by your database. We have this replication slot open on your database, and we're receiving these data changes in the replicator component of the Readyset server. That component, again, is updating the local tables that we have in RocksDB, and then it's propagating those changes through the dataflow graph component. The dataflow graph component is responsible for doing this incremental update operation that I keep referencing. Then, once it has this new value, it'll replace the old value in the reader with that new value.

Partially-Stateful Streaming Dataflow

The question of the hour is, how does dataflow work? Because we have this fancy incremental update thing, what are we actually doing? That brings me to partially-stateful streaming dataflow. It's the thing that I was telling you about right at the beginning of my presentation. The way that I'll go about explaining this is first by explaining what streaming dataflow is, and then I will explain the partially-stateful component, just for the sake of completeness. Dataflow computing is actually a fairly overloaded term in computer science, but the version that I'm referring to, you can think of as being stream processing. Concretely, you are representing computations as a directed graph. The nodes of the graph are operators that perform the computation on incoming data. Data is flowing through the edges. In our use case of database caching, it's coming from the replication stream from your database. Those are the changes that are flowing through the graph.

To give you a toy example, to just illustrate the concept, let's forget about database caching for 30 seconds, and let's say that our goal is to compute this basic arithmetic operation. We want to multiply x times y and we want to add z to the results of that. This is what the dataflow graph would look like. We have two nodes of the graph, they're the gray ones. We have the multiplication node and we have the addition node. The multiplication node takes x and y as inputs, so this is the data flowing through the edges, and emits the result. In this case, x times y. It emits the result out of its outgoing edge, which we can see, goes into the addition node. The addition node will then add this result, x times y, with the z, and emit it. Then that's just the final result. This is what a dataflow graph would look like. This would be long running. It's not just like this ephemeral computation or whatever. This is like a system that's responsible for doing this.
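The toy graph can be written down in a few lines. This is a minimal sketch with a hypothetical `Node` class: each node remembers its latest inputs, and whenever one changes it recomputes and pushes the result along its outgoing edge.

```python
import operator


class Node:
    """A long-lived operator that re-emits its result when an input changes."""

    def __init__(self, op, num_inputs):
        self.op = op
        self.inputs = [None] * num_inputs
        self.downstream = []  # list of (node, input_slot) edges
        self.output = None

    def connect(self, node, slot):
        self.downstream.append((node, slot))

    def receive(self, slot, value):
        self.inputs[slot] = value
        if all(v is not None for v in self.inputs):
            self.output = self.op(*self.inputs)
            for node, target_slot in self.downstream:
                node.receive(target_slot, self.output)


multiply = Node(operator.mul, 2)  # computes x * y
add = Node(operator.add, 2)       # computes (x * y) + z
multiply.connect(add, 0)          # multiply's outgoing edge feeds add

multiply.receive(0, 3)  # x = 3
multiply.receive(1, 4)  # y = 4
add.receive(1, 5)       # z = 5
first_result = add.output

multiply.receive(0, 10)  # x changes; the update flows through the graph
second_result = add.output
```

The graph stays alive between updates: changing `x` alone is enough to produce a new final result, without resubmitting `y` or `z`.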

When I think about dataflow computing/stream processing, I like to explain it in contrast to batch processing. Batch processing, how I think of is like normal computing, in a way, where you're ingesting data for some time, and at some point, you're like, you have a dataset, you're going to run some compute over it. Of course, there will be a delay between maybe when some of these data entries were collected, and when you run that computation, which means that the computation, if you're running this periodically, will be stale by some amount of time. Contrast that to stream processing, where, essentially, you're ingesting data continuously, and you're running the computation continuously, like all the time, eagerly.

Every time you get a new data point, you're running some computation to output a new result, and so the response is immediate. Let's bring this back to our domain of database caching, and let's talk about what it would look like to represent queries as a dataflow graph. It looks similar. The only difference now is that, rather than having arithmetic operations, now we have relational algebra operations. We can do SELECTs, PROJECTs, JOINs, like any aggregate, whatever, any of the constituent operations of a SQL query. As input, we're taking data from your database, and data changes to those tables over time. As those writes occur, those row updates will be propagated through the dataflow graph and will be passed into these relational algebra operators.

I'll make this more concrete, and we're going to come back to Hacker News as an example. Imagine that we have two tables. There's the stories table and the votes table. The stories table just has the story ID, the author, title, URL. The votes table is just a mapping of the username to the ID of the story that they voted for. Let's say that we had this query here, which is essentially just returning all the story information and metadata, and joining it with the vote count for that story, and it's a prepared statement. Notice the question mark in the query. We're taking a story ID as input, and when we pass in that input, we'll return all of the story info, plus vote count for that story. This is how it might look in dataflow. The dataflow part is the purple part. Over here, we have a local copy of the votes table and the stories table, and that's on disk. In memory, we have this dataflow graph, and it has a count node and a join node. The count node is taking input from the votes base table, the join node is taking input from the stories base table, as well as the count node. We have this reader node, which, if you remember, from the architecture part of this, that's responsible for returning the cached query results to the user.
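
The slide with the query is not reproduced in the transcript, so the following is a plausible reconstruction rather than the exact statement from the talk. Column names (`id`, `story_id`, `vote_count`), the sample rows, and the use of an in-memory SQLite database are assumptions made so the example can run on its own.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE stories (id INTEGER PRIMARY KEY, author TEXT, title TEXT, url TEXT);
    CREATE TABLE votes (username TEXT, story_id INTEGER);
    """
)
conn.execute(
    "INSERT INTO stories VALUES (2, 'alice', 'Example story', 'https://example.com/2')"
)
conn.executemany(
    "INSERT INTO votes VALUES (?, ?)",
    [("bob", 2), ("carol", 2), ("justin", 2), ("dave", 7)],
)

# Story metadata joined with its vote count; the "?" is the prepared-statement parameter.
STORY_WITH_VOTE_COUNT = """
    SELECT s.id, s.author, s.title, s.url, v.vote_count
    FROM stories AS s
    JOIN (
        SELECT story_id, COUNT(*) AS vote_count
        FROM votes
        GROUP BY story_id
    ) AS v ON v.story_id = s.id
    WHERE s.id = ?
"""

row = conn.execute(STORY_WITH_VOTE_COUNT, (2,)).fetchone()
print(row)
```

The inner `COUNT(*) ... GROUP BY` corresponds to the count node, the `JOIN` to the join node, and the `WHERE s.id = ?` parameter to the key that the reader node is looked up by.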

To enliven this a bit. We'll look at how writes and reads work and how things flow through the system. This is the same graph as before, but now we actually have tables instead of this abstract blob representation. Let's say that Justin votes for story with ID 2, that's just an insert into the votes base table. That row insert is going to be propagated first to the count node, which will be responsible for updating the old vote count to reflect the fact that now Justin voted for story 2. Before, the vote count for story 2 was 580, now it's 581. The count node will propagate that new result to the join node. The join node will be like, I will update my old vote count to reflect this new vote count.

Then I will store that in the reader so that when users come around asking for the info for story 2, it will be there and it will be updated. This is happening eagerly in the background. Now let's talk about reads. Here's the query again. Now we are going to pass in a parameter, specifically parameter 2. We're going to compute the query result for story ID 2. All we have to do for that is go to this reader node and do a key-value lookup for ID 2 in the reader node. This is in memory, so it's really fast. We're just doing a little key-value lookup, like with normal caches.
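
The write path and read path just described can be sketched as follows. This is a simplified, fully materialized model, not Readyset's code: the class names, method names, and the sample story row are hypothetical, and only vote inserts are handled. The 580 and 581 figures are the ones used in the talk.

```python
from typing import Dict, Optional


class Reader:
    """Holds finished query results, keyed by the query parameter (story ID)."""

    def __init__(self) -> None:
        self.rows: Dict[int, tuple] = {}

    def upsert(self, story_id: int, row: tuple) -> None:
        self.rows[story_id] = row

    def lookup(self, story_id: int) -> Optional[tuple]:
        return self.rows.get(story_id)  # the read path: one in-memory key-value lookup


class JoinNode:
    """Combines story metadata with the latest vote count and forwards it to the reader."""

    def __init__(self, stories: Dict[int, tuple], reader: Reader) -> None:
        self.stories = stories
        self.reader = reader

    def on_count_change(self, story_id: int, vote_count: int) -> None:
        story = self.stories.get(story_id)
        if story is not None:
            self.reader.upsert(story_id, story + (vote_count,))


class CountNode:
    """Maintains a running vote count per story instead of recounting all votes."""

    def __init__(self, initial_counts: Dict[int, int], join: JoinNode) -> None:
        self.counts = dict(initial_counts)
        self.join = join

    def on_vote_insert(self, username: str, story_id: int) -> None:
        self.counts[story_id] = self.counts.get(story_id, 0) + 1
        self.join.on_count_change(story_id, self.counts[story_id])


stories = {2: ("alice", "Example story", "https://example.com/2")}
reader = Reader()
join = JoinNode(stories, reader)
count = CountNode({2: 580}, join)
join.on_count_change(2, 580)          # seed the reader with the existing result

before = reader.lookup(2)
count.on_vote_insert("justin", 2)     # Justin votes for story 2
after = reader.lookup(2)
```

The single inserted row touches only the entry for story 2 in each node, which is what keeps the update cheap compared with rerunning the whole query.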

Next, I'll talk about how efficient this is, as currently posed. Let's talk about cache hits first. In the case of a cache hit, both this approach and a "normal cache" are pretty much doing the same thing. They're doing a key-value lookup inside an in-memory data structure, so it's going to be pretty fast. That's good. No big change between the two. Let's think about cache updates. The data in your database has changed, and you want that to be reflected in the cache, so you have to update it in some way. If you're doing this traditional read-through caching approach that I talked about before, the way that this is probably going to work is you are going to evict the old value in the cache and then recompute it by rerunning that query against your database.

Presumably, if you thought to introduce caching, then either the query was slow, it was complicated, or you didn't want to upsize your database to deal with running that many queries. This could potentially be a slow operation, and it could potentially increase the load on your DB in a way that you don't want. Compare that to dataflow-based caching, where, essentially, we are incrementally updating the old cache result, which means we don't have to do an expensive recompute. We don't have to compute an aggregate starting from scratch. That's nice. It's less compute intensive. Importantly, with dataflow-based caching, we have our own query engine. It's this whole dataflow graph mechanism. We don't actually have to use your database's query engine. We can just figure out the result ourselves, even if it's not currently there. That's pretty cool. You don't run the risk of thundering herds. That's a nice plus. You're not going to accidentally take down your database using this approach.
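
To make the update-cost comparison concrete, here is a small sketch that counts how many times the database has to run the aggregate under each approach. Everything in it is hypothetical: `FakeDatabase`, `ReadThroughCache`, and `IncrementalCache` are stand-ins written for this example, and the incremental cache is assumed to see every write from the start, as it would if it were fed from a replication stream.

```python
from typing import Dict, List, Tuple


class FakeDatabase:
    """Stand-in for the primary database; counts how often the aggregate is run."""

    def __init__(self) -> None:
        self.votes: List[Tuple[str, int]] = []
        self.queries_run = 0

    def insert_vote(self, username: str, story_id: int) -> None:
        self.votes.append((username, story_id))

    def count_votes(self, story_id: int) -> int:
        self.queries_run += 1
        return sum(1 for _, sid in self.votes if sid == story_id)


class ReadThroughCache:
    """Evict on write, recompute against the database on the next read."""

    def __init__(self, db: FakeDatabase) -> None:
        self.db = db
        self.cache: Dict[int, int] = {}

    def get(self, story_id: int) -> int:
        if story_id not in self.cache:
            self.cache[story_id] = self.db.count_votes(story_id)
        return self.cache[story_id]

    def invalidate(self, story_id: int) -> None:
        self.cache.pop(story_id, None)


class IncrementalCache:
    """Applies each write as a delta to the cached result; never queries the database."""

    def __init__(self) -> None:
        self.cache: Dict[int, int] = {}

    def on_vote_insert(self, story_id: int) -> None:
        self.cache[story_id] = self.cache.get(story_id, 0) + 1

    def get(self, story_id: int) -> int:
        return self.cache.get(story_id, 0)


db = FakeDatabase()
read_through = ReadThroughCache(db)
incremental = IncrementalCache()

for i in range(100):                      # 100 rounds of "one write, then one read"
    db.insert_vote(f"user{i}", 2)
    read_through.invalidate(2)
    incremental.on_vote_insert(2)
    read_through.get(2)
    incremental.get(2)

database_queries_for_read_through = db.queries_run
```

In this interleaved pattern, every write forces the read-through cache to rerun the aggregate, while the incremental cache serves the same answers without touching the database at all.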

Now let's talk about memory efficiency. This one's the kicker. With traditional caches, it is common knowledge that you should allocate at least enough space for the working set of your application. If you don't, you're going to experience something called thrashing, where essentially you are continuously swapping in and out frequently accessed cache keys because you haven't allocated enough space. Your cache hit rate is going to be artificially low because of that, because there's something that should be cached, but it can't be because there's not enough space for it. With dataflow-based caches, not only do you have to account for that working set, but you also have a ton of intermediate state in your graph. If we look at this graph again, the reader node, that's the working set, essentially. If you look in the purple, all of that's in memory, and you're like, that's actually a lot to put in memory. I have this join and I have this count. I'm really just worried about the join in this case.
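
The thrashing effect can be demonstrated with a tiny LRU cache. The sketch below is deliberately adversarial: it cycles through a working set of four keys, which is the worst case for LRU when the cache is one slot too small. The `LRUCache` class and `hit_rate` helper are written for this example only.

```python
from collections import OrderedDict
from typing import Callable, List


class LRUCache:
    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.data: "OrderedDict[int, int]" = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get(self, key: int, compute: Callable[[int], int]) -> int:
        if key in self.data:
            self.data.move_to_end(key)      # mark as most recently used
            self.hits += 1
            return self.data[key]
        self.misses += 1
        value = compute(key)
        self.data[key] = value
        if len(self.data) > self.capacity:
            self.data.popitem(last=False)   # evict the least recently used key
        return value


def hit_rate(capacity: int, working_set: List[int], rounds: int) -> float:
    cache = LRUCache(capacity)
    for _ in range(rounds):
        for key in working_set:
            cache.get(key, lambda k: k * 2)
    return cache.hits / (cache.hits + cache.misses)


working_set = [1, 2, 3, 4]
fits = hit_rate(capacity=4, working_set=working_set, rounds=25)
too_small = hit_rate(capacity=3, working_set=working_set, rounds=25)
```

With room for all four keys, only the first pass misses. With room for three, each key is evicted just before it is requested again, so every access misses even though all four keys are "hot".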

Generally speaking, the intermediate state size will depend a lot, of course, on how big are the tables that you're dealing with to begin with. It will depend on query complexity, because if you have a 10 way join, then suddenly you're going to have 10 join nodes that all have a pretty heavy memory footprint. That wouldn't be good. It's also going to depend on the data distribution, because in some cases, you might be filtering 99% of your data out, right after, as you're pulling it into memory, and that would be ok. If you're not, then suddenly maybe you have an expensive 10 way join or something. Then this could be a pretty pathological memory blowup.

Really, if I come back to the motivation behind this talk, there's this idea of partial state or partial materialization that will specifically enable us to use streaming dataflow in this context for database caching. Without it, it's very easy to run out of memory and have just a ridiculous memory footprint for caching fairly simple queries. It's not worth it. I'm going to explain how this works. The main insight is that the full result set for the queries doesn't need to be cached, because data access, again, we're not picking rows at random from our table to return or users aren't doing that, but rather, there's really some notion of popularity and recency that's probably going to influence it. To bring this back to Hacker News, I don't need to preemptively compute and store the results for any given story ID. We know that there's going to be an exponential drop-off in what stories people are looking at. It would really only make sense to cache the most popularly requested items. That's what we're going to do, and we're going to do it lazily.

Essentially, the main idea of partial materialization is the cache will start off empty, and we're going to fill it on demand, lazily. We're going to do it via a mechanism called an upquery. An upquery, I will show you a pictorial diagram showing you what it looks like. The main idea is like, the cache is starting off empty, it's starting off cold, users will request specific results via specific input parameters, and then we'll lazily compute the missing results. Here's what it would look like. We're going to do the same read as before, so we're going to be getting the result for this query for story ID 2. Here's what the graph would look like at startup. You can see it's just empty, like those things in gray over there are on disk. The things in purple are in memory. It's empty. A user comes around and requests the story with ID 2, or the result for story ID 2, that's getting sent to the reader as it was before. The reader is going to be like, I don't have it. It's going to recursively traverse up the graph to find the closest parent that has the missing value.

Then, once it finds that parent, it will ask the parent to replay all of the missing data that it needs to be able to compute the result. In this case, the whole graph is empty, so it's going to ask the join node. The join node will be like, I don't know, let me ask my parents. Then, same with the count node. By the time you get to the base tables, which are stored locally on disk, the base tables will necessarily have the data that you're looking for, because they have all of the data. It's like a slightly out of date copy from your primary database that's being replicated continuously. It's like a read replica. The votes base table, in this case, would be like, I have the data for story 2. I'm going to replay all of the rows that had story ID 2 in them. Then the count node will recompute the vote count, given all of those inputs. It will simply count all of the votes, in this case. Then it will send the result to the join node. The join node would have, in parallel, sent off a request to the stories base table node to get the other story data. Then it will finally place that result in the reader and return it to the user that requested it. This is like a cache miss.
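
The upquery walk described above can be sketched roughly like this. It is a simplified model under several assumptions: the class and method names are hypothetical, the base tables are plain Python containers rather than an on-disk replica, the join asks its two parents one after the other rather than in parallel, and writes arriving after a key has been filled are not handled.

```python
from typing import Dict, List, Optional, Tuple


class CountNode:
    """Starts empty; fills the count for a story only when asked for it."""

    def __init__(self, votes_base_table: List[Tuple[str, int]]) -> None:
        self.votes_base_table = votes_base_table
        self.state: Dict[int, int] = {}

    def upquery(self, story_id: int) -> int:
        if story_id not in self.state:
            # Ask the base table to replay only the rows for this story, then count them.
            replayed = [row for row in self.votes_base_table if row[1] == story_id]
            self.state[story_id] = len(replayed)
        return self.state[story_id]


class JoinNode:
    def __init__(self, stories_base_table: Dict[int, tuple], count: CountNode) -> None:
        self.stories_base_table = stories_base_table
        self.count = count
        self.state: Dict[int, tuple] = {}

    def upquery(self, story_id: int) -> Optional[tuple]:
        if story_id not in self.state:
            story = self.stories_base_table.get(story_id)
            if story is None:
                return None
            self.state[story_id] = story + (self.count.upquery(story_id),)
        return self.state[story_id]


class Reader:
    def __init__(self, join: JoinNode) -> None:
        self.join = join
        self.state: Dict[int, tuple] = {}
        self.misses = 0

    def lookup(self, story_id: int) -> Optional[tuple]:
        if story_id not in self.state:
            self.misses += 1                       # cache miss: send an upquery
            row = self.join.upquery(story_id)
            if row is None:
                return None
            self.state[story_id] = row
        return self.state[story_id]


stories_table = {
    1: ("bob", "First story", "https://example.com/1"),
    2: ("alice", "Example story", "https://example.com/2"),
}
votes_table = [("carol", 1), ("justin", 2), ("dave", 2), ("erin", 2)]

reader = Reader(JoinNode(stories_table, CountNode(votes_table)))
first = reader.lookup(2)     # cold: walks up to the base tables
second = reader.lookup(2)    # warm: served from the reader
```

After these two reads, only story 2 is held in memory at any node; story 1 stays on disk until someone asks for it.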

Although this took a little bit more time because we had to do all of that work, we're saving so much on memory. Like before, we were storing the query results for any possible story ID. Now we're just storing it for this one. Coming back to Hacker News, maybe we only want to cache the first three pages, four pages, that will get us 98%, 99% of the way there. We're able to really not have as hefty of a memory footprint, but we're still able to get this nice mechanism where the cache is automatically being maintained for us. It's automatically syncing up with the database without having to do anything, and without having to evict and recompute and artificially lower cache hit rate, any of that.

What happens even with partial materialization and all the space savings we're getting from that? What happens when we start to deal with memory pressure? It will happen eventually, in most cases. Here we actually just handle things the normal way. We'll just start evicting. You can evict via LRU, LFU, and so forth. Notably, let's say, later on, a user requests a value that you evicted, we'll just use the same upquery mechanism to deal with it behind the scenes for you. It'll be a little bit slower that first time you ask for it again, but then it'll be stored in the cache, so the next 100,000 people that request that value, it'll be there for them.
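
A minimal sketch of eviction combined with refill on demand might look like the following. The `PartialReader` class is hypothetical, LRU is just one of the policies mentioned, and the `upquery` callable stands in for the whole walk up the dataflow graph.

```python
from collections import OrderedDict
from typing import Callable


class PartialReader:
    """Bounded reader: evicts the least recently used key, refills via an upquery."""

    def __init__(self, capacity: int, upquery: Callable[[int], int]) -> None:
        self.capacity = capacity
        self.upquery = upquery
        self.rows: "OrderedDict[int, int]" = OrderedDict()
        self.upqueries_sent = 0

    def lookup(self, key: int) -> int:
        if key in self.rows:
            self.rows.move_to_end(key)       # mark as most recently used
            return self.rows[key]
        self.upqueries_sent += 1             # missing or evicted: recompute lazily
        value = self.upquery(key)
        self.rows[key] = value
        if len(self.rows) > self.capacity:
            self.rows.popitem(last=False)    # memory pressure: evict the LRU key
        return value


vote_counts = {1: 10, 2: 581, 3: 7}          # stand-in for the rest of the graph
reader = PartialReader(capacity=2, upquery=vote_counts.__getitem__)

reader.lookup(2)
reader.lookup(1)
reader.lookup(3)                             # evicts story 2
refilled = reader.lookup(2)                  # slower: goes through the upquery again
upqueries_after_refill = reader.upqueries_sent
reader.lookup(2)                             # fast again: served from memory
upqueries_after_hit = reader.upqueries_sent
```

The evicted key costs one extra upquery the next time it is requested, and after that it is served from memory again until it is evicted.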

Comparison to Alternatives

That's dataflow and dataflow-based caching. Now I'll compare it to some alternatives. I'll compare it to traditional caching. I've been doing this implicitly all throughout. The way that I think about dataflow-based caching compared to traditional caching is that you get faster updates, but you get a heavier memory footprint. You have to be prepared to spend a little bit more on memory, but you're going to get a really high cache hit rate, and you're going to get fresh data, because we're able to do this incremental update rather than a full computation. At the same time, we're able to avoid a lot of the headaches that typically come with caching. We don't have to rewrite the application because it's wire compatible with the database. We can just swap the connection string. We don't have to worry about picking an invalidation policy, because the cache is eagerly kept up to date. The cache will never make things worse, because we have our own query engine, and it's isolated from the database.

Moreover, because of the fact that we're able to incrementally update state, we're able to tolerate comparatively write-heavy workloads. Let's say that you have a lot of writes, your data is constantly changing. You're going to have a really tough time caching that workload, because the data will really quickly go out of date, and the only way to refresh it is by running that same query over again against the database. At that point, you might as well not be using the cache. With this approach, you're able to, essentially, still cache without having to worry about lowering the cache hit rate, because it's always there in memory and it's just being updated behind the scenes. Every once in a while, we're just doing a pointer swap to reflect the latest changes. It also makes it easier to cache complex queries, because typically, the more complicated a query is, the more sophisticated your invalidation policy will end up being. There's a large set of queries that you might have felt a little bit nervous to try to cache before, that's now a little bit friendlier to do with dataflow-based caching.

As compared to query optimization, obviously, with caching, you don't have to know how to optimize queries, so the barrier of entry is a little bit lower. In some cases, query optimization will only get you so far, because the query is just doing a lot of work. Caching can be really helpful if you want to just click a button, and if you're willing to spend the memory on it, then you don't have to make sure your whole team knows how to optimize queries or anything like that. If you're on a platform team, and you have a lot of application teams that you support, then this could help you deal with maybe ORM generated queries and so forth, without having to spend a lot of human time on it, which is nice. As compared to read replicas, this has a similar interface.

In fact, you can set Readyset up as your read replica and interact with it in that way. Assuming that we're just dealing with cacheable queries and queries that we're caching, if we compare the two, then you will get better tail latencies, and therefore be able to support higher load if you're doing caching, because, again, you're just doing this O of 1 lookup each time you're doing a read. You will have to allocate more memory.

Conclusion

Dataflow-based caching has a very similar performance profile on cache hits to conventional caches. It's easier to use. It expands the set of use cases that you could imagine caching to incorporate ones that have comparatively more writes. It's source available on GitHub. If you want to take a look, you can Google it. As I mentioned, it's based on research, and so you can read the research paper. It's called Noria. You can Google that. There's an open-source repo that you can take a look at as well.

Questions and Answers

Participant 1: For your cache invalidation scheme, do you just use LRU, or did you find any value in caring about that? If you need to deploy multiple of your cache proxies, either for query volume or geo distribution reasons, do you have any model for coordination between those or do you just treat them all as entirely independent caches?

Marzoev: It's important to note that cache invalidation and cache eviction are actually two different things for us. Normally, they're conflated. For invalidation, we're just trying to keep the cache state fresh. With eviction, it's really going to depend, like we just use LRU most of the time, and that's fine. That's a parameter that you can toggle, but we haven't. There's no new insight that we give you there. It's just evict based on what is appropriate. We give you a nice mechanism for bringing things back into memory after you've evicted. That's our main contribution there.

Let's say you want to run multiple of these? We do support sharding, but not in the open source, but we haven't productionized it in our cloud product yet. It is theoretically possible. In fact, we've built it out. Indeed, we do think about, could you build a CDN for your database? It'd be cool to put this in different regions and so forth. It's not production ready yet, but you can do it.

Participant 2: It sounds like the partial materialization strategy is helpful for transactional queries where you're just really working on one row, but for analytics type queries that you might want to cache, is it still helpful, or is it going to still have a really high memory footprint?

Marzoev: No, I actually think it's pretty useful for analytics ones as well. It really just depends on what operations you're doing. Actually, counts and aggregates tend to have a lesser memory footprint. I think joins tend to be the really heavyweight one, but we've done a lot of work on optimizing that as well. Although this is typically used for transactional workloads, the more complicated the query is, and the more stuff that you're doing in it, the better comparative latency improvement you'll get from caching. There's no fundamental limitation on that. We see a lot of complex analytical queries as well.

 


 

Recorded at:

Dec 11, 2024
