Hello, I am Martin. I do a lot of work on large-scale data systems, especially stream processing. I used to work at LinkedIn on an open source project called Samza for large-scale stream processing. At the moment I am taking a sabbatical to write a book for O’Reilly about data-intensive applications.
Yes, exactly. Your typical log4j or syslog type logs are one particular kind of log, where it is just plain text for humans to read. The kind of logs we have been thinking about are a generalization of that: you have a totally ordered sequence of records, which just means it is some kind of data, and whenever you want to write to it, you simply append to the end. So it is like an append-only file, but you can use it not just for informational logging, but to store all of the data of your application. If you do that, some very interesting effects happen.
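(To make the idea concrete, here is a minimal sketch of an append-only log in Java; the file layout and record format are purely illustrative and not taken from any particular system.)

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;

// Minimal sketch of an append-only log: every write is appended to the end
// of a file, and readers replay the file from the start.
public class AppendOnlyLog {
    private final Path file;

    public AppendOnlyLog(Path file) { this.file = file; }

    // Append one record; earlier data is never overwritten.
    public void append(String record) throws IOException {
        Files.write(file, (record + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Replay all records in the order they were written.
    public void replay(java.util.function.Consumer<String> handler) throws IOException {
        if (!Files.exists(file)) return;
        try (var lines = Files.lines(file, StandardCharsets.UTF_8)) {
            lines.forEach(handler);
        }
    }
}
```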
3. If you just append – does it make I/O easier or does it make it faster?
That is part of it, yes. The I/O is very efficient if it is append-only, but that is the smallest part of the story. What I find more interesting is the idea that often you want data not just in a single database, but copied to several different places. For example, you write data to a database first, but then you also need to update a search index like Elasticsearch or Solr; maybe there are some caches which take the same data and render it into an HTML template, and you want to update those too whenever the data changes; or maybe you want to feed the data into a data warehouse through some kind of ETL process, or into an analytics system, and so on. So, often when some data gets written into a database, it also needs to go to various different places, and there are several different ways you could implement that.
One way people implement it is by simply dual-writing in the application code: making one request to the database, another request to the search index and another request to the cache. I think that is really problematic, because you get race conditions and partial failures, and it is very easy for the data to get out of sync that way. Where we see logs coming in is really that, rather than writing to all these different places, you append once to the log, and the log defines the order in which things have happened. Anyone who wants to build some kind of view on that data, like an index, just consumes the log, and if all of the consumers apply the writes in the same order as they appear in the log, they end up with the same output in the end. It also has nice properties: if a consumer dies for a while, it can come back up again and catch up on the writes it missed while it was down. So it is a really great way to build resilient, robust data systems that scale well and that maintain eventual consistency, that don’t go diverging into different states without you realizing they are diverging.
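(As an illustration of consumers applying the same ordered log to build different views, here is a hypothetical Java sketch; the Change type and the in-memory "index" and "cache" maps are stand-ins, not any real system.)

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: instead of dual-writing from the application, every
// consumer applies the same ordered change log to its own view. Because all
// consumers see the writes in the same order, their views converge.
public class LogConsumers {
    record Change(String key, String value) {}          // one entry in the log

    static final Map<String, String> searchIndex = new ConcurrentHashMap<>();
    static final Map<String, String> cache = new ConcurrentHashMap<>();

    static void applyToIndex(Change c) { searchIndex.put(c.key(), c.value()); }
    static void applyToCache(Change c) { cache.put(c.key(), c.value()); }

    public static void main(String[] args) {
        List<Change> log = List.of(                      // the append-only log
                new Change("user:1", "Alice"),
                new Change("user:1", "Alicia"));         // later write wins

        // Each consumer reads the log independently, in log order.
        log.forEach(LogConsumers::applyToIndex);
        log.forEach(LogConsumers::applyToCache);

        // Both views end up with the same final state: user:1 -> "Alicia".
        System.out.println(searchIndex + " " + cache);
    }
}
```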
4. Isn’t it basically how databases are built underneath, the storage engines of databases?
Yes, exactly. That is actually one of the points I was making in the talk: this idea of logs appears in all sorts of different places already, and we don’t normally think about it, but it is already there. Inside every database storage engine you have a write-ahead log, which is doing exactly the same thing: it writes the things it is about to change to an append-only file before it updates the B-tree, for example. In a log-structured storage engine, you write these segments of log and then merge them together. You get it in databases for replication as well: your leader (your primary, your master) needs to send data to the secondaries, the followers, the slaves, and that data goes from the leader to the follower through a replication log. Again, it is very much the same idea. But at the moment this is an implementation detail of databases that we are not supposed to think about, because it is abstracted away beneath the data model. What I am encouraging people to do is to think of this kind of replication log, this commit log, as a first-class concept that we actually work with at an application level, because it is so immensely powerful.
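(Here is a tiny sketch of the write-ahead log idea described above: durably append the change before applying it to the main data structure. The class and file layout are illustrative only, not the code of any real storage engine.)

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of a write-ahead log: the change is appended (and
// flushed) to a log file *before* the in-memory structure is updated, so the
// structure can be rebuilt from the log after a crash.
public class WalSketch {
    private final Path wal;
    private final Map<String, String> tree = new TreeMap<>(); // stand-in for a B-tree

    public WalSketch(Path wal) { this.wal = wal; }

    public void put(String key, String value) throws IOException {
        // 1. Durably record the intended change in the write-ahead log.
        Files.write(wal, (key + "=" + value + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND,
                StandardOpenOption.SYNC);
        // 2. Only then apply the change to the main data structure.
        tree.put(key, value);
    }
}
```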
Werner: Basically, to learn from our elders who came up with this system for databases.
Yes, absolutely. I have been going around recently just reading a lot of papers from the ‘70s and the ‘80s and it is amazing what they did back then. So much of the stuff has kind of been forgotten by a lot of the software engineers today but there is a lot we can learn there still.
Werner: We are like the people in the Middle Ages that looked at the arches the Romans built and said “They are nice, but we do not know how to build them again”
Maybe, yes. I don’t know. Maybe we have become better at other things. It is hard to tell.
Werner: I like to think of us in the Middle Ages.
Yes, sometimes it feels a bit like that.
Werner: Yes. Well, it might be the surroundings of this interview.
It could be. It is a very nice place.
Yes, that is sort of the direction that I see things going in the future actually. The stream processing framework that I have worked with – Samza – is very closely associated with Kafka, another open source project, which is basically a message queue, a message broker structured in the form of a log. It means you can use it as a message broker like you would use a JMS system or an AMQP system - quite similar to those. But it means you also get this ordering property of the log.
So, the messages you send to a particular partition are guaranteed to appear in a certain order and they always remain in that order and that is exactly what you want if you are using it for this kind of replication type purpose. So, one of the things I have been doing recently is, for example, just now I launched a new open source project called Bottled Water which connects to Postgres, just any standard Postgres database and it does two things: first of all it takes a consistent snapshot of the entire contents of the database and writes all of that out to Kafka. This already seems a little bizarre – why would you take a database and replicate it to a message queue? – but it actually makes sense if you stay with me. The second thing it does is that it follows the stream of writes that are being written to the database.
It does that actually by parsing the write-ahead log and Postgres provides an API for doing that. So, you get the inserts, the updates and the deletes that are happening to the database table, in the order in which they were committed to the database – again, it is an ordered log of changes in commit order – and all of those get sent to Kafka and it gets encoded in a nice Avro schema. So, what Kafka can now do is that it has this mode you can run it in called log compaction which is perfectly suited for exactly this kind of thing.
The way it works is: what does a message queue do when it runs out of space, when it runs out of memory? One option is to simply throw away events that are older than some threshold, and that is one mode in which you can run Kafka. The other mode is this log-compacted mode, where each message has a key, and whenever the message queue sees multiple messages with the same key, the contract is that it only has to keep the most recent one; any previous messages with the same key are allowed to be thrown away. This is very much like a key/value store overwriting one key with a later value for the same key, so they kind of get compacted down. But that way, if a key never gets overwritten and never gets deleted, it always stays there, and so Kafka actually becomes a durable store that acts kind of like a key/value store, except that you cannot look up things by key. It does not have that index, but it does store all of the data durably on disk and replicates it across multiple machines. You write to it by simply appending to the end of the log, and it will asynchronously go and compact it so that the disk space does not keep growing forever. So the disk space it takes is just the size of your database, the same as it would take to store it in a regular database.
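(For reference, this is roughly how a log-compacted topic can be created with Kafka's Java AdminClient; note that this client API was added to Kafka after this interview took place, and the broker address and topic name are placeholders.)

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

// Sketch: create a log-compacted Kafka topic, so that only the latest
// message per key is retained.
public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("db-changes", 1, (short) 1)
                    .configs(Map.of(
                            // keep only the most recent value for each key
                            TopicConfig.CLEANUP_POLICY_CONFIG,
                            TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```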
Now, the really cool thing is that you can just consume this log: you can start from time zero and go through the entire history of messages, and you are guaranteed that the latest value for every key is still there, and that the writes still appear in the order in which they were applied. So, once you reach the end of the log, once you have scanned through the entire thing, you have rebuilt the entire state of the database.
So, if you decide to index your database differently, or if you had a catastrophic failure and you really need to rebuild your state, or anything like that, you can do that by simply jumping back to the “beginning of the universe” in some sense, which includes a snapshot and all the subsequent changes, and just streaming through the whole thing. That is an empowering way of building systems, because it gives you so much more flexibility to try a new piece of infrastructure, try a new scheme for indexing things, try a new caching approach, build a new view onto your existing data. I really like that style of building systems.
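(A sketch of that replay using Kafka's Java consumer: assign the partition, seek to the beginning, and fold the messages into a key/value view. The topic name, partition and broker address are assumptions, and a real rebuild would also detect when it has caught up with the end of the log.)

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch: rebuild a key/value view by replaying a compacted topic from the
// beginning of the log.
public class RebuildView {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Map<String, String> view = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("db-changes", 0);
            consumer.assign(List.of(tp));
            consumer.seekToBeginning(List.of(tp));   // "beginning of the universe"

            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    if (record.value() == null) {
                        view.remove(record.key());              // tombstone: key was deleted
                    } else {
                        view.put(record.key(), record.value()); // latest value wins
                    }
                }
            }
        }
    }
}
```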
Werner: So Bottled Water is basically an adapter between those two worlds.
Exactly. It is a way where you can continue writing to a Postgres database and reading from it as you always have, but if you want to start experimenting with building some kind of downstream views onto the same data, this is a way of liberating your data from the confines of that one database: getting it out, and also getting the change stream – not just the snapshot, but also the stream of changes whenever it changes – which you can then feed into analytics, data warehouses, caching, indexing, whatever downstream systems you want. It is very similar to some existing systems: LinkedIn built a system called Databus for Oracle, and Facebook built something, I believe, called Wormhole for MySQL which does a similar thing, though that is not open source. So this is building on a long tradition of change capture tools, but I feel it is something we can build a bit more awareness of, because it is really such a great way of building systems.
Werner: It is an interesting way to get into the log paradigm, I guess.
Yes, totally, because you are not committing to changing your data model or rewriting your entire application or anything. It is just an additional consumer that you can tack on at the end, and then, independently, without affecting the rest of your application, you can start experimenting with new things. If it works, great – you can start moving more in that log-oriented direction. If it does not work, it doesn’t matter – you just remove that extra cluster again and you will not have lost anything. So it is a very low-risk way of trying a new approach.
6. How does this relate to systems like CQRS or Event Sourcing?
There are a lot of similar ideas there. What I love about Event Sourcing is this idea that data is immutable: you structure your data at an application level as these immutable events. But it is a fairly big idea to buy into – it is really saying that you are going to use immutable events as your application data model, which I think is great if you can do it. But I think not everybody is ready to make such a radical change right now.
I think what we are proposing here is a graceful path to migrating towards that style of system. We say it is OK to continue using a database as a mutable data structure, with updates and deletes, but every time you do make a change, the fact that the change occurred is in itself an immutable event. What a database does when it replicates from a leader to a follower, for example, is exactly that: it writes out that diff, that change, as an immutable event which gets appended to the end of a log. What we are doing is just extracting that and making the same thing available to applications.
So you kind of get a quasi Event Sourcing out of it as a side effect, though I think there is still a lot of benefit to doing Event Sourcing on top of it, being explicit in your application data model about structuring data this way, because it is strictly more meaningful to have these events captured: you can see the exact history of what changed and when. So I think this is kind of the next step in the evolution.
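(As an illustration, here is what such an immutable change event might look like as a value type in application code; the field names are hypothetical and are not Bottled Water's actual Avro schema.)

```java
import java.time.Instant;

// Hypothetical shape of a change event, as a database replication stream
// might expose it: the fact that a row changed, captured as an immutable
// value.
public record ChangeEvent(
        String table,        // which table was affected
        Operation op,        // INSERT, UPDATE or DELETE
        String key,          // primary key of the affected row
        String newValue,     // row contents after the change (null for DELETE)
        Instant committedAt  // commit time, giving the order of changes
) {
    public enum Operation { INSERT, UPDATE, DELETE }
}
```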
7. You mentioned Samza. How does Samza fit into everything we talked about here?
Samza is a stream processing framework. The idea is that once you have got your data into something like Kafka, you will want to consume it. You will want to do interesting things with it: you might want to summarize it for analytics, you might want to join several streams together, and Samza is a framework which lets you write code to do those sorts of things. It is a fairly low-level approach – a bit like the old Hadoop MapReduce API in a way, but optimized for continuous streams of low-latency events – and it simply gives you an API which processes one message at a time.
So, whenever a message comes in from a source system, it gives it to your code and you can decide what to do with it: perhaps update a database, perhaps emit it downstream to some other stream, perhaps buffer it for a while and see whether another matching event comes in within some time window. It allows you to write all of those things. What Samza really gives you is the distribution of that computation across a cluster of machines using YARN – so it uses the Hadoop cluster manager for that. It has some nice fault tolerance mechanisms and some state handling mechanisms built in, and it gives you a deployment mechanism – those sorts of things.
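(For flavour, a minimal task using Samza's low-level StreamTask interface as it looked around this time; the stream names and the counting logic are placeholders.)

```java
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

// Sketch of Samza's one-message-at-a-time API: process() is called for every
// incoming message, and the task decides what to do with it.
public class PageViewCounterTask implements StreamTask {
    private static final SystemStream OUTPUT = new SystemStream("kafka", "page-view-counts");
    private int count = 0;

    @Override
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        // Update some local state based on the incoming message...
        count++;
        // ...and emit a result downstream to another stream.
        collector.send(new OutgoingMessageEnvelope(OUTPUT, Integer.toString(count)));
    }
}
```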
Samza is the other major thing that I have been working on – it is a stream processing project. There are various other things I have contributed to, like Avro, for example, the serialization format, and I have made minor contributions to a whole bunch of other things, whatever happens to come my way.
Werner: But you have a big project underway right now which is the book.
Yes. It is called “Designing Data-Intensive Applications”. It is currently in early release with O’Reilly. We have so far released 7 chapters, which you can go and download now, and hopefully the final version should be done later this year. It is proving to be a huge amount of work, but it is very satisfying as well. It is basically my reason for going out and learning about lots of different types of data systems.
So, what I am trying to do in this book is really do a broad summary of all the algorithms, the architectures and the tradeoffs that you get in data systems, at different levels. So, it is not about any particular one tool, like many computing books are. Instead it groups things by topic areas: there is one chapter on data models, one on storage engines, one on replication, one on partitioning, one on transactions, one on data encoding and compatibility and the upcoming ones will be on consensus and batch processing and stream processing and all those different things. And then within each area, we look at the major different options for doing something.
Often, there are two or three or four main choices you can make within an area; some tools implement it this way, some implement it another way, and then you can see the tradeoffs: some ways of doing things are good for write-heavy workloads, some are good for read-heavy workloads; some are good if you need strong consistency and can tolerate lower availability, some optimize more for availability at the cost of consistency, and so on. So it is really trying to understand exactly what the pros and cons of these different choices are – not just praising the good sides, but also seeing exactly what you give up when you make certain decisions.
Werner: That sounds like a book I want.
Well, hopefully.
Werner: Get writing.
I am keeping at it.
With that one, I am still working out how to structure the chapter exactly. There are popular consensus algorithms like Paxos, ZAB in ZooKeeper, and Raft, for example, but a lot of it is actually just understanding what the problem of consensus is exactly, and a lot of what I have been doing has been going off and looking at old and new academic research and putting it in the context of systems that are used in practice. People quote the CAP theorem a lot, for example, which is probably one of the most overcited and most misunderstood things ever.
So, what I am trying to do with this is to really explain what is at the core of these ideas: what it means and what it does not mean, to stop over-interpreting all sorts of things into the theorem and understand exactly what it is, and then show how that affects the design decisions that the different systems make. So it is often a matter of going through that reasoning process of not just how something works, but why it works that way, what impact that has on applications, and what impact it has on which tools you choose to solve which kind of problem, for example.
Werner: Well, that is definitely interesting because everybody is talking about CAP. All the cool kids are talking about it. So, is CAP relevant or not? I guess it depends on what you do.
Yes, totally. I mean, it is a thing, but it is much more specific than people think. CAP describes a very particular kind of system: it describes a replicated register, which means there is exactly one key and value – one value, really – which is spread across multiple different nodes, and it has a very specific notion of what consistency means and a very specific idea of what availability means. You can have a system, for example, which is “highly available” in the way people mostly use the term, but is not actually available in the CAP sense. You can have a system which is consistent in some sense – well, the CAP meaning of consistency is linearizability, which is a very different meaning of consistency compared to ACID consistency, for example; that is a totally different meaning of the same word. So, disentangling those things and just adding a bit of clarity is important, because it is so often misunderstood. The CAP theorem is important and it is true, of course, but it actually only applies to a fairly narrow sliver of systems, and people have used it to justify all sorts of silly design decisions – or sometimes good design decisions, but ones which are simply irrelevant to the CAP theorem. I think it is one of the things that people need to stop talking about, basically, because more often than not, it is simply not relevant.
There are thousands of good papers, of course and that is hard to answer. One area that I find particularly intriguing which we have not discussed today in detail so far is called CRDTs. It stands for commutative and/or conflict-free replicated data types. What that is basically saying is that you can build data structures which you can modify and in fact, which several different people can modify at the same time without having to have any global locks or consensus algorithm. So, they can be independently modified and then when they come back together, they get merged and it defines clear semantics how that merge is going to work.
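(A minimal sketch of one of the simplest CRDTs, a grow-only counter: each replica increments only its own entry, and merging takes the per-replica maximum, so replicas modified independently converge to the same value no matter the order in which they are merged.)

```java
import java.util.HashMap;
import java.util.Map;

// Grow-only counter (G-Counter) CRDT sketch: local updates need no
// coordination, and the merge operation is commutative and idempotent.
public class GCounter {
    private final String replicaId;
    private final Map<String, Long> counts = new HashMap<>();

    public GCounter(String replicaId) { this.replicaId = replicaId; }

    public void increment() {                      // local update, no locks or consensus
        counts.merge(replicaId, 1L, Long::sum);
    }

    public long value() {                          // total across all replicas
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }

    public void merge(GCounter other) {            // take the per-replica maximum
        other.counts.forEach((id, n) -> counts.merge(id, n, Long::max));
    }
}
```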
There is a very nice survey paper from Marc Shapiro and others, which surveys a whole bunch of different CRDTs and how to implement them. It is very readable as well, as academic papers go. I think this is actually going to be tremendously important for the applications we build in the future, because you need applications to work offline, for example. You need applications that run on mobile devices with bad network connectivity. It is simply unrealistic to assume that everyone is going to be connected to the internet every single moment, but you do want people to be able to continue editing data at any time, whether they are online or offline, and you want people to be able to edit something concurrently – think of Google Docs, for example – and have that simply merged, with any changes resynchronized when people come back online.
At the moment, if you wanted to build Google Docs from scratch, it would be really hard, because the algorithm it is based on, operational transformation, is pretty hairy. It is possible for mortals to implement, but it is still very difficult. The promise I see with CRDTs is in providing a nice application framework where you can just work with data and not worry whether you are online or offline, have it transparently synchronized in the background, and have that be nice to work with from an application developer’s point of view. That actually plays a lot into logs and immutable events and Event Sourcing again, because you could see it as Event Sourcing taken to the extreme, where every single modification to a data structure, like the insertion of an item into a list, is represented as an immutable event in a CRDT, and you can append it to the end of a log. So it all beautifully links back together, and I could well see that in five years’ time we will all be building software on some kind of CRDT-based framework. I think it is going to be really important in the future.
Werner: You have brought us full circle. We’ll link to your website and your book. Thank you, Martin.