
The Journey to a Million Ops / Sec / Node in Venice


Summary

Alex Dubrouski and Gaojie Liu discuss some of the tricks used in their pursuit to lower read latency and to reach 1M operations per second per node.

Bio

Alex Dubrouski is Technical Lead of Server Performance Team @LinkedIn. Gaojie Liu is Senior Staff Software Engineer @LinkedIn, Open Source Contributor @Venice.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

Liu: I'm Gaojie Liu. I'm a software engineer at LinkedIn. Alex and I are going to describe the journey of how we achieved 1 million operations per second per node in Venice. Today's talk is going to focus on the high-level optimization strategy; we're not going to show any code in the presentation. This picture, which we borrowed from Wikipedia, shows a naval battle in the Mediterranean that happened a long time ago. Contrary to popular belief, Henry Ford did not invent mass production. The city state of Venice was doing that centuries earlier. By continuously improving its battleship manufacturing process, the city of Venice was able to fight large-scale naval battles, which made it a superpower in the Mediterranean for many centuries. Fast forward to today, my team and I developed a database called Venice. We are constantly improving Venice's performance to serve large-scale workloads. I will introduce Venice briefly, and then talk about the Venice architecture and its evolution over the past several years. Then I will hand it over to Alex to talk about the lower-level optimizations in Venice, and the conclusion.

The Venice project is named after the city of Venice in Italy. Venice is built on top of 180 islands, which are connected by canals. In the Venice project, we also built Venice-style canals, which we call Venice data streams, and the data items carried by those data streams are persisted in Venice islands, which we call Venice stores. Here is a brief history of Venice's development. In 2014, to fulfill the ever-increasing requirement from AI customers to keep data fresher, the Venice project got started. At the end of 2016, we [inaudible 00:02:20] the production use case. In 2018, to fulfill the GDPR requirement, we migrated over 500 stores from Voldemort to Venice. Voldemort was another open source key-value store by LinkedIn. In 2020, we open sourced Venice on GitHub. Today, the Venice setup at LinkedIn is serving over 2000 stores in production.

As we mentioned, Venice is open source. Venice supports very high throughput: the Venice setup at LinkedIn is serving over 100 million key lookups per second, and in the meantime it is ingesting over 1.2 petabytes of data daily. Venice offers multiple low-latency clients. Venice is highly available. By default, it uses a replication factor of 3, which can tolerate random hardware failures, and in some high read throughput clusters it uses an even higher replication factor, such as 6, 8, or 12, to scale the read capacity. Venice is an eventually consistent system. That means it does not offer the traditional read-modify-write semantics; it offers partial updates as an alternative. Venice supports large datasets, and it uses sharding to distribute the load across a group of nodes. As a key-value storage system, besides the single get and batch get APIs, Venice supports some more advanced APIs, which we will describe in a later slide.
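To make the shape of those read APIs concrete, here is a rough, hypothetical sketch of what an asynchronous key-value read client with single get and batch get could look like. The interface and names below are illustrative only, not the actual Venice client API.

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;

    // Illustrative only: a hypothetical async key-value read client shape,
    // not the real Venice client API.
    public interface KeyValueReadClient<K, V> {
        // Single key lookup.
        CompletableFuture<V> get(K key);

        // Batch lookup: returns a map of key to value for the keys that exist.
        CompletableFuture<Map<K, V>> batchGet(Set<K> keys);
    }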

From our view, there are two kinds of data: primary data and derived data. Primary data is usually the outcome of users' activity. The LinkedIn profile is a good example: a LinkedIn user can edit their profile directly. That primary data is mainly served out of a SQL database or a document store. Derived data is computed out of primary data. People You May Know is a notable example: a LinkedIn user can use People You May Know to grow their social graph, but they cannot edit those recommendations directly. Venice, as a derived data platform, is mainly used to store machine learning training output. As I mentioned, PYMK is heavily using Venice, and LinkedIn Feed and LinkedIn Ads are also using Venice to store training output, such as embeddings.

Venice Architecture

Now I'd like to describe the high-level architecture. As you can see, this is a pretty complex diagram, so we are going to break it down into five sections to describe each component in more detail. First, cluster management. Behind the scenes, Venice uses Apache Helix for cluster management. Apache Helix is an open source cluster management framework which automates partition assignment for partitioned, replicated, and distributed resources across a group of nodes. It also automates partition reassignment in the presence of node failure, recovery, cluster expansion, and reconfiguration. Reconfiguration typically refers to, let's say, changing the replication factor. In the Helix framework, there are three major roles: the controller manages the lifecycle of resources; the participant handles the partition assignment requests; the spectator actively monitors the partition assignment mapping and keeps it in memory for routing purposes. In the Venice project, there is a corresponding component for each role. Next, the Venice ingestion architecture. As we mentioned, Venice guarantees eventual consistency, so all the writes are produced to a pub-sub messaging system; inside LinkedIn we are using Kafka. The available writers are Grid and Samza. Grid is used to bulk load a new dataset, and Samza is used to write a real-time job to produce the latest updates. The Venice server constantly polls the Kafka topic and processes the latest messages locally. The Venice server uses RocksDB, which is one of the most popular embedded key-value stores. Venice offers multiple read clients, which are suitable for different use cases and offer different latency guarantees. We will dig deeper in a later section.
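To make the three Helix roles above concrete, here is a minimal sketch of how processes can join a Helix-managed cluster in each role. The cluster name, instance names, and ZooKeeper address are placeholders, and this is a generic Helix example rather than Venice's actual bootstrap code.

    import org.apache.helix.HelixManager;
    import org.apache.helix.HelixManagerFactory;
    import org.apache.helix.InstanceType;

    // Minimal sketch: joining a Helix-managed cluster in different roles.
    // Cluster name, instance names, and ZooKeeper address are placeholders.
    public class HelixRolesSketch {
        public static void main(String[] args) throws Exception {
            String cluster = "venice-cluster";
            String zk = "zk-host:2181";

            // Participant: handles partition assignment (state transition) requests.
            // A real participant also registers a state model factory before connecting.
            HelixManager participant = HelixManagerFactory.getZKHelixManager(
                cluster, "server-host_1690", InstanceType.PARTICIPANT, zk);
            participant.connect();

            // Spectator: watches the partition assignment mapping for routing purposes.
            HelixManager spectator = HelixManagerFactory.getZKHelixManager(
                cluster, "router-host_4690", InstanceType.SPECTATOR, zk);
            spectator.connect();

            // The controller role (managing the lifecycle of resources) typically runs
            // as its own process, connected with InstanceType.CONTROLLER.
        }
    }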

For admin operations, Venice uses a two-layer architecture. Admin operations such as store creation, deletion, or schema evolution are forwarded to the parent controller first, and the parent controller asynchronously replicates that metadata to the child controller in each region. With this two-layer architecture, we can achieve eventual consistency, and it also offers better resilience, because it can tolerate a short period of region failure, and the failed region can catch up on its own automatically. Lastly, Venice supports multiple regions. As you can see in the diagram, the Venice server in each region talks to both the local Kafka cluster and the remote Kafka cluster. It uses active-active replication to achieve eventual consistency. Even though Venice servers in different regions could consume those Kafka topics at different speeds, they are still able to achieve data convergence, since the Venice server uses a timestamp-based conflict resolution strategy.
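As an illustration of the general idea (not Venice's actual implementation), a timestamp-based, last-write-wins conflict resolver can be sketched like this; the value layout and the writer-id tie-breaker are assumptions made for the example.

    // Illustrative sketch of timestamp-based (last-write-wins) conflict resolution,
    // the general idea behind converging replicas that consume updates at different speeds.
    public final class LwwResolver {
        public record VersionedValue(byte[] value, long timestampMs, int writerId) {}

        // Pick the winner between the locally stored value and an incoming replicated write.
        public static VersionedValue resolve(VersionedValue current, VersionedValue incoming) {
            if (current == null) {
                return incoming;
            }
            if (incoming.timestampMs() != current.timestampMs()) {
                return incoming.timestampMs() > current.timestampMs() ? incoming : current;
            }
            // Tie-break deterministically (for example by writer/region id) so every
            // region converges to the same value regardless of arrival order.
            return incoming.writerId() >= current.writerId() ? incoming : current;
        }
    }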

Venice Evolution

Now it's time to go through the architecture evolution in the Venice write path. First, we want to share the Venice data model. Venice supports a lambda architecture, which means it supports both batch processing from Grid and real-time processing from Samza, the stream processing framework we use inside LinkedIn. Typically, there are two ways to work with the two different datasets: merge in the read path, or merge in the write path. Venice chose to merge the data in the write path for the following reasons. Write-path merge offers better read latency, since with read-path merge the application needs to talk to two different systems, and the latency will be bounded by the slowest path. Write-path merge also offers better read resilience, since the application only needs to talk to one single system. The application logic also gets greatly simplified, because the merge is handled in the platform.
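A minimal sketch of the write-path merge idea follows, assuming a simple record-as-map representation; field names and the store structure are placeholders, not Venice's actual data model.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch of write-path merge: real-time partial updates are merged
    // into the batch-loaded record at ingestion time, so the read path only ever
    // looks at one store. Field names and types are placeholders.
    public class WritePathMergeSketch {
        private final Map<String, Map<String, Object>> store = new HashMap<>();

        // Bulk load replaces the whole record for a key (batch processing path).
        public void applyBatchLoad(String key, Map<String, Object> fullRecord) {
            store.put(key, new HashMap<>(fullRecord));
        }

        // Real-time partial update only touches the fields it carries (stream processing path).
        public void applyPartialUpdate(String key, Map<String, Object> changedFields) {
            store.computeIfAbsent(key, k -> new HashMap<>()).putAll(changedFields);
        }

        // The read path sees the already-merged record, no cross-system merge needed.
        public Map<String, Object> read(String key) {
            return store.get(key);
        }
    }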

Let's do a quick recap. As we mentioned, all the data updates are put into Kafka first, and the Venice server consumes the Kafka topics and processes the messages into a local replicated database. In the initial release of Venice, each Venice server would allocate a dedicated pipeline for each hosted store. A dedicated pipeline means the Venice server allocates a dedicated consumer, which constantly polls the corresponding Kafka topic, and in the same pipeline it constantly processes those consumed messages locally. This simple and straightforward strategy allowed faster development, which helped us roll out Venice to production quickly. Obviously, there is an inefficiency here: imagine a Venice server that only hosts a small number of stores; the allocated resources will not be fully utilized. To overcome this inefficiency, we later introduced the Venice drainer pool strategy. Each drainer maintains an in-memory queue. The ingestion path constantly pushes the consumed messages to the drainer queues, and each drainer thread constantly pulls messages from its queue and processes them into the local RocksDB database. With this strategy, even with a small number of stores, the allocated resources can be fully utilized. It also offers a way to control the total amount of resources we allocate for data processing.
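Here is a minimal sketch of a drainer pool of the kind described above: a fixed set of drainer threads, each draining its own bounded in-memory queue into local storage. The storage write is a placeholder, and this is an illustration of the pattern rather than the Venice code.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Illustrative sketch of a drainer pool: consumers hand messages off to a fixed
    // set of drainer threads, each draining its own in-memory queue into local storage.
    public class DrainerPoolSketch {
        record Message(int partition, byte[] key, byte[] value) {}

        private final List<BlockingQueue<Message>> drainerQueues = new ArrayList<>();

        public DrainerPoolSketch(int drainerCount, int queueCapacity) {
            for (int i = 0; i < drainerCount; i++) {
                BlockingQueue<Message> queue = new ArrayBlockingQueue<>(queueCapacity);
                drainerQueues.add(queue);
                Thread drainer = new Thread(() -> {
                    try {
                        while (true) {
                            Message m = queue.take();   // block until a message arrives
                            writeToLocalStorage(m);     // e.g., a RocksDB put (placeholder)
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                drainer.setDaemon(true);
                drainer.start();
            }
        }

        // Ingestion path: hash the partition so the same partition always lands on the
        // same drainer, keeping every drainer busy while preserving per-partition order.
        public void submit(Message m) throws InterruptedException {
            drainerQueues.get(Math.floorMod(m.partition(), drainerQueues.size())).put(m);
        }

        private void writeToLocalStorage(Message m) { /* placeholder */ }
    }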

The Venice server uses a hashing-based partition assignment to guarantee an even distribution of load, to make sure each drainer thread is processing enough messages all the time. We then observed that, for each hosted store, the Venice server still allocates a dedicated consumer. Each consumer carries a fixed amount of overhead, and this overhead gets amplified if the total number of stores hosted by each node increases significantly. To address this problem, we introduced the shared consumer service. With the shared consumer service, the total number of consumers is fixed, and each ingestion task is assigned to a shared consumer. With this strategy, we were able to achieve better GC behavior, and in return it also offers better ingestion throughput. Recently, for some high write throughput use cases, we observed that the Kafka consumer was the actual bottleneck, because one ingestion task could only leverage one consumer. To improve that situation, we introduced the partition-wise shared consumer service. With that strategy, one ingestion task can leverage multiple consumers, and we observed a huge ingestion throughput increase for those high write throughput use cases. For the shared consumer service, the Venice server uses a least loaded strategy to achieve an even distribution across all the shared consumers. Right now, we are running this mode in production.
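A minimal sketch of a "least loaded" assignment policy follows: each topic-partition is handed to whichever shared consumer currently owns the fewest partitions. The bookkeeping structures are assumptions for illustration, not the Venice implementation.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch of least-loaded assignment: each topic-partition of an
    // ingestion task goes to the shared consumer that currently owns the fewest
    // partitions, so one hot store can fan out across several consumers.
    public class LeastLoadedAssignerSketch {
        record TopicPartition(String topic, int partition) {}

        private final List<String> sharedConsumers;                       // consumer ids
        private final Map<String, List<TopicPartition>> assignment = new HashMap<>();

        public LeastLoadedAssignerSketch(List<String> sharedConsumers) {
            this.sharedConsumers = sharedConsumers;
            sharedConsumers.forEach(c -> assignment.put(c, new ArrayList<>()));
        }

        public String assign(TopicPartition tp) {
            String leastLoaded = sharedConsumers.stream()
                .min(Comparator.comparingInt(c -> assignment.get(c).size()))
                .orElseThrow();
            assignment.get(leastLoaded).add(tp);
            return leastLoaded;
        }
    }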

Now it's time to talk about the Venice read path. Let's do a quick recap: in the following several slides, we are going to focus on the components in the gray rectangle. You can see we have three read client offerings, Thin Client, Fast Client, and Da Vinci, and they are suitable for different use cases. The first offering is the Venice Thin Client. It offers single-digit milliseconds for a single key lookup at p99, and it uses a three-layer architecture. The transmission protocol is HTTP, and the wire protocol is Avro binary. The Thin Client is in charge of serialization/deserialization: it serializes the incoming request from the application and forwards the binary request to the router, and it also deserializes the binary response into Java objects for the application to consume. The Venice router actively monitors the partition assignment from ZooKeeper and caches it locally for routing purposes. The Venice router uses a least loaded routing strategy to cool down unhealthy or busy replicas, and it also uses a long tail retry strategy to achieve good long-tail latency in the presence of a bad or busy node. The Venice server is quite straightforward: it parses the incoming request, does the local RocksDB key lookup, and returns the binary response. With this three-layer architecture, the complex components, the Venice server and the Venice router, are still under the control of the Venice platform, which makes it easy to roll out any routing optimization.
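The long tail retry idea can be sketched like this: if the original request has not completed within a long-tail latency threshold, fire a speculative retry against another replica and return whichever response arrives first. This is a generic CompletableFuture sketch under those assumptions, not the Venice router code.

    import java.time.Duration;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;

    // Illustrative sketch of long-tail retry: if the original request to one replica
    // has not completed within a long-tail latency threshold, fire a retry against
    // another replica and return whichever response arrives first.
    public class LongTailRetrySketch {
        public static <T> CompletableFuture<T> withLongTailRetry(
                Supplier<CompletableFuture<T>> originalRequest,
                Supplier<CompletableFuture<T>> retryRequest,
                Duration longTailThreshold) {

            CompletableFuture<T> original = originalRequest.get();
            CompletableFuture<T> retry = new CompletableFuture<>();

            // Delay the retry; only send it if the original is still outstanding.
            CompletableFuture.delayedExecutor(longTailThreshold.toMillis(), TimeUnit.MILLISECONDS)
                .execute(() -> {
                    if (!original.isDone()) {
                        retryRequest.get().whenComplete((value, error) -> {
                            if (error == null) {
                                retry.complete(value);
                            } else {
                                retry.completeExceptionally(error);
                            }
                        });
                    }
                });

            // Whichever replica answers first wins.
            return original.applyToEither(retry, v -> v);
        }
    }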

In 2020, to match the performance of Couchbase, Venice introduced the Venice Fast Client. It uses a two-layer architecture, and it can achieve below 2 milliseconds for a single key lookup at p99. Essentially, the Fast Client talks to a metadata endpoint exposed by the Venice server to equip the client with the routing information, and it leverages a similar routing strategy and retry strategy as the Venice router to deliver similar resilience guarantees. By removing the Venice router from the hot path, we achieve much better latency, and it also saves a lot of hardware. Lastly, there is another offering called Da Vinci. It is suitable for small use cases whose dataset can be fully accommodated in a single node. It offers two modes. If your whole dataset can be kept in memory, Da Vinci typically uses the memory mode, which is super-fast: it can deliver below 10 microseconds at p99 for a single key lookup. Even with the memory mode, there is still an on-disk snapshot for faster bootstrap purposes. If your dataset cannot be fully accommodated in memory, you can choose to use the disk mode, and there will be an LRU cache in front of the on-disk data, used to cache the index and the frequent entries. Da Vinci is not a read-through cache, it is an eager cache: it constantly polls the Kafka topic and applies the latest updates locally.

Besides the architecture evolution, we also made some innovations in the API layer. More AI customers would like to score and rank a large number of candidates to improve the relevance of their recommendations, and with the naïve batch get, Venice was not able to meet their latency requirements. With Venice read compute, we were able to push some of the compute operations to the Venice server layer. One compute request is executed in parallel because of scatter-gather, and the read response size is also reduced significantly, because the Venice server only needs to return the compute results. Currently, Venice read compute uses a declarative DSL. It supports projection, and it supports several vector arithmetic operations, such as dot product, cosine similarity, and Hadamard product.
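As a concrete illustration of the vector operations named above (not the Venice DSL itself), here is what dot product, cosine similarity, and Hadamard product look like over float[] embeddings; the arrays are assumed to have equal length.

    // Illustrative implementations of the vector operations mentioned above
    // (dot product, cosine similarity, Hadamard product) over float[] embeddings.
    public final class VectorOps {
        public static float dotProduct(float[] a, float[] b) {
            float sum = 0f;
            for (int i = 0; i < a.length; i++) {
                sum += a[i] * b[i];
            }
            return sum;
        }

        public static float cosineSimilarity(float[] a, float[] b) {
            float dot = 0f, normA = 0f, normB = 0f;
            for (int i = 0; i < a.length; i++) {
                dot += a[i] * b[i];
                normA += a[i] * a[i];
                normB += b[i] * b[i];
            }
            return dot / (float) (Math.sqrt(normA) * Math.sqrt(normB));
        }

        public static float[] hadamardProduct(float[] a, float[] b) {
            float[] result = new float[a.length];
            for (int i = 0; i < a.length; i++) {
                result[i] = a[i] * b[i];
            }
            return result;
        }
    }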

While improving the performance of these high-read AI use cases, we hit a scalability limit with the previous partition assignment strategy. In the past, when we doubled the size of the cluster to scale out and fulfill more read QPS, we observed that the read capacity stayed the same, even with double the hardware. We dug deeper and realized that the root cause was that the fan-out size for each request increases proportionally to the cluster size, which is really bad behavior. To mitigate that problem, we introduced a logical-group-based partition assignment strategy. We divide the whole cluster into several equal-sized logical groups, and each logical group keeps a full replica of the data, which means each logical group can serve any request. When we try to scale out the cluster, it becomes much simpler: we just add more logical groups, but keep the fan-out size the same as before. We have been using this strategy in production in Venice for several years, and it has proven to be a horizontally scalable solution for the large AI use cases.
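A minimal sketch of the routing idea follows: nodes are split into equal-sized groups, each holding a full replica, and a request is routed within one group so the per-request fan-out stays constant as groups are added. Group selection and data structures are simplified assumptions, not the Venice router logic.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    // Illustrative sketch of logical-group routing: the cluster is split into equal-sized
    // groups, each holding a full replica of every partition. A request is routed within
    // one group, so the fan-out per request stays constant as more groups are added.
    public class LogicalGroupRouterSketch {
        private final List<List<String>> logicalGroups;   // each inner list = nodes of one group

        public LogicalGroupRouterSketch(List<String> allNodes, int groupCount) {
            int groupSize = allNodes.size() / groupCount;
            this.logicalGroups = new ArrayList<>();
            for (int g = 0; g < groupCount; g++) {
                logicalGroups.add(new ArrayList<>(allNodes.subList(g * groupSize, (g + 1) * groupSize)));
            }
        }

        // Pick one group per request; partitions are then looked up only among that
        // group's nodes, so adding groups adds read capacity without widening fan-out.
        public List<String> pickGroupForRequest() {
            return logicalGroups.get(ThreadLocalRandom.current().nextInt(logicalGroups.size()));
        }
    }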

Lastly, Venice also adopts several mature transport layer optimizations. First, we switched from the JDK SSL implementation to OpenSSL. Because OpenSSL is a native implementation, the GC overhead got reduced quite a bit, and because of the more efficient implementation in OpenSSL, we also observed a good end-to-end latency reduction. Venice supports streaming, and with the streaming API the application is able to process the partial responses as early as possible; we were able to reduce the end-to-end latency by 15%. LinkedIn constantly does failover testing, and in the past we always observed a connection storm issue at the start of the load test, so we adopted HTTP/2 to address that problem. With all the optimizations we presented in this talk, we achieved 1 million operations per second per node in Venice, with an acceptable latency. These 1 million operations are made up of 620K key lookups and 680K message writes. We generated this workload from the Venice read compute traffic, which is a high-throughput workload we have running in production. The benchmark was executed on a node with 32 CPU cores and 256 gigabytes of RAM.

Common Optimizations and How They Affect Performance

Dubrouski: My name is Alex. I'm one of the technical leads of the Server Performance Group at LinkedIn. We did a lot of different optimizations to the code and to the configuration of Venice, but I would like to share the set which might be applicable to pretty much any application you have. I would like to start with misconceptions. You probably all know the famous statement that premature optimization is the root of all evil. Who actually knows the authorship of this sentence? It was published in Donald Knuth's book, but when he was asked about this specific quote, he said he probably heard it from Sir Tony Hoare. Sir Tony was not able to recall that; he said he probably heard it from Edsger Dijkstra. Initially, this quote was about counting CPU cycles in your assembly application before even finishing the algorithm. Now, it's quite often used to push back on any possible performance optimization until it's too late. I was lucky to be tasked with helping the Venice team literally in my first week after joining LinkedIn, and since then we've been working together for many years. We utilize an approach which I call continuous optimization. We continuously monitor our applications, and we have automated alerts. We continuously profile the applications, and we do regular assessments of the profiling data. If we find any bottlenecks, we document them and suggest solutions. Usually, we benchmark those solutions using synthetic benchmarks, mostly in the form of JMH. We inspect the results down to the assembly code. If we see and can explain the improvement, we test it in staging environments. Then we roll it out to production and start over from step one.
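For the "benchmark the candidate fix" step, a minimal JMH skeleton of the kind described above could look like this. The two benchmarked methods are placeholders standing in for the current code and the proposed change; they are not real Venice methods.

    import java.util.concurrent.TimeUnit;
    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.BenchmarkMode;
    import org.openjdk.jmh.annotations.Mode;
    import org.openjdk.jmh.annotations.OutputTimeUnit;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.Setup;
    import org.openjdk.jmh.annotations.State;

    // Minimal JMH skeleton: compare the current implementation against a candidate fix.
    @State(Scope.Thread)
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.NANOSECONDS)
    public class CandidateFixBenchmark {
        private byte[] payload;

        @Setup
        public void setup() {
            payload = new byte[4096];   // representative input shaped like production data
        }

        @Benchmark
        public int baseline() {
            return currentImplementation(payload);
        }

        @Benchmark
        public int candidate() {
            return proposedImplementation(payload);
        }

        private int currentImplementation(byte[] input) { return input.length; }   // placeholder
        private int proposedImplementation(byte[] input) { return input.length; }  // placeholder
    }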

To continue the topic of misconceptions: quite often, JDK version upgrades are considered a liability, a burden. Quite often, large organizations just keep postponing JDK version upgrades until it's way too late. At LinkedIn we realized that a JDK upgrade is one of the cheapest ways to improve application performance. Venice was always on the leading edge of JDK version upgrades. It was one of the first applications migrated to Java 11, and it's pretty much the first application fully migrated to Java 17. With the Java 11 upgrade, the latency improvement was in the single-digit percentage points, and the stop-the-world time improvement was in the double-digit percentage points. With the Java 17 upgrade, we experienced pretty much the same improvement. I think this misconception comes from the fact that regular Java developers see and are aware of the changes in the JDK API, the new features coming out in each new version of the JDK, but they are less frequently aware of the changes to the JVM itself: the just-in-time compiler, the garbage collectors, the logging, and so on. To give you a few examples: thread-local handshakes, which were introduced in Java 9, or concurrent stack walking, which was implemented by Erik Österlund in Java 16 and is now used by ZGC to significantly improve the pauses.

The reality is that JDK version migrations can be easy. If you keep all of your open source libraries upgraded, especially if you have any automation around it, the migration can come down to just changing a configuration file and swapping the runtime, and it just works. Another misconception: quite often, engineers think that libraries or applications implemented in statically compiled languages are not tunable. Yes, of course, you cannot dynamically change the code, and you don't have a just-in-time compiler. Still, the default is not always the best. In one of the regular rounds of performance profiling assessment, we found that seek and get operations in our JNI storage library, RocksDB, actually used up to 30% of active CPU cycles. Further research showed that we were using the default configuration of the storage engine, which is the block-based format with a block cache. Those seek operations, which try to find the block where the data is and then find the data in that block, are just plain overhead. Switching to the plain table format allowed us to reduce the CPU usage and reduce the latency by 25%. This is server-side compute latency. The interesting part is that a RocksDB developer actually called plain table "not so plain," because there are a lot of interesting optimizations behind the scenes in how it works. The only caveat is that you need to make sure the whole dataset fits into memory. In that case, you can get quite a lot of performance improvement.
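A minimal sketch of that configuration change via RocksJava follows. Real deployments need more tuning (prefix extraction, memory sizing, and so on); this only shows the shape of switching the table format and is not the Venice server's actual storage configuration.

    import org.rocksdb.Options;
    import org.rocksdb.PlainTableConfig;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    // Minimal sketch of switching a RocksDB instance from the default block-based
    // table format to the plain table format via RocksJava.
    public class PlainTableSketch {
        public static RocksDB open(String path) throws RocksDBException {
            RocksDB.loadLibrary();
            Options options = new Options()
                .setCreateIfMissing(true)
                .setAllowMmapReads(true)                       // plain table relies on mmap reads
                .setTableFormatConfig(new PlainTableConfig()); // avoids block lookup/seek overhead
            return RocksDB.open(options, path);
        }
    }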

This optimization actually started as a suspected native memory leak. Quite often, developers think about the JVM like this: we know that the JVM allocates the heap during bootstrap; some portions of the JVM, let's say direct byte buffers, which use native memory, are actually memory mapped; Java can increase and sometimes decrease the size of the heap; but there are no obvious sources of memory fragmentation, right? No. I spent quite a lot of time trying to filter out possible root causes of a native memory leak, and the only possible suspect was the RocksDB library. Our SREs spent weeks building very interesting scripts, and they actually published an amazing blog post about this: how to parse the pmap output and attribute the chunks of memory in the pmap output to specific business code in your application. At the end of this research, we found that we don't have any native memory leaks; we just have terrible memory fragmentation, probably mostly related to RocksDB. Switching from the default Linux implementation of malloc, which is glibc, to jemalloc, which originated in BSD, helped us significantly reduce the resident set size of the JVM process, by up to 30%.

On top of that, Venice is one of the very few applications at LinkedIn which uses explicit huge pages. The TLB is the Translation Lookaside Buffer, a special part of the CPU which stores the results of translating virtual memory addresses into physical ones, and it has limited capacity. If you experience a TLB cache miss, it significantly increases the cost of accessing memory. Huge pages significantly improve the TLB cache hit rate, allowing you to access memory faster, which is a very important point in high throughput applications; they also increase the addressable space cached in the TLB. What's interesting is that after migrating to Java 11, we started seeing a lot of different applications having memory fragmentation issues. I discussed this with the JVM developers, with Erik Österlund in particular, and he pointed out quite an interesting source of it. He said that one of the major sources of memory fragmentation in Java 11 applications is actually the JIT compiler thread pool. It used to be static in Java 8, but in recent versions of the JVM it's dynamic, and the threads do get recycled. If you have a lot of JIT compilation happening in your application, the thread stacks for the JIT compiler threads are allocated in native memory using malloc, and this causes quite a lot of memory fragmentation. Switching from glibc to jemalloc might help quite a lot.

In terms of code optimizations, I think the biggest part is fastavro. Venice by default uses the Avro serialization format. It's a pretty popular format; Kafka uses Avro by default. Especially in the early versions of Avro, serialization and deserialization were not very optimized, because the code is generic and has to work in all possible cases. As you might expect, generic solutions are not always the best. Originally, the fastavro library was developed by a company called RTBHouse. They open sourced it, we forked it, and at some point we completely took over the development of this library. This library allows us to generate, at runtime, very custom, schema-specific serializers and deserializers. For example, deserialization speed can improve by 90%; yes, it can be 10 times faster compared to regular Avro. One of the very interesting optimizations started with a complaint from one of the client teams. They came to us saying that they had quite a significant regression in latency accessing Venice. When we checked the continuous profiling data, we found this. This is a portion of a flame graph. A flame graph is a way to visualize performance profiling data; if you want to know a little bit more, you can visit Brendan Gregg's website, he's the author. The highlighted element is an i2c/c2i adapter. This is a very interesting thing: the JIT compiler inserts those adapters when it needs to bridge interpreted and compiled code. What it actually means is that on a very hot path, we had code executing in interpreted mode. That was quite a surprise.

When we started investigating: the JIT compilation unit is the method, and the JIT compiler has a hard threshold on the size of the method's bytecode, which is 8000 bytes. If the bytecode is more than 8000 bytes, even if it's on a very hot path, the JIT compiler will never compile it; it will continue running in interpreted mode. We had to optimize the code generators to make sure that the generated methods never go above this threshold. Of course, you might say that this could introduce quite a lot of regression. No, we did very thorough benchmarking in production to make sure this change doesn't actually affect performance, and we were not able to detect any regressions. There is way more: primitive collections. This is one of those topics where premature optimization could actually be the root of all evil, and there is no unison among performance experts on this topic. There are two points. First, you need to prove that object collections are actually a performance bottleneck: that your application spends a lot of CPU cycles there and that the allocation of the object collections is a memory allocation hotspot. Second, if you're trying to swap object collections for primitive collections, you need to make sure that you do it across the entire business logic, so that you don't have those transformations, object to primitive, primitive to object, and so on. We did extensive benchmarking, and we found that, yes indeed, in our case it is a bottleneck. In most of the cases, we use the [inaudible 00:31:16] framework. For some of the APIs, like read compute, we had to develop very custom, specific primitive collections, which allow us to do partial deserialization.
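To illustrate the primitive-collection idea (this is not the actual Venice collection, just the concept), a reusable float list avoids boxing every element into a Float and lets the backing array be reused across requests:

    import java.util.Arrays;

    // Illustrative sketch of a reusable primitive float list: no boxing to Float,
    // and the backing array can be reused across requests to cut allocation rate.
    public final class ReusableFloatList {
        private float[] elements;
        private int size;

        public ReusableFloatList(int initialCapacity) {
            this.elements = new float[initialCapacity];
        }

        public void add(float value) {
            if (size == elements.length) {
                elements = Arrays.copyOf(elements, elements.length * 2);
            }
            elements[size++] = value;
        }

        public float get(int index) {
            return elements[index];
        }

        public int size() {
            return size;
        }

        // Reset for reuse without releasing the backing array.
        public void clear() {
            size = 0;
        }
    }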

In most cases, those read compute APIs are designed for AI workloads. In AI workloads, the data we store is basically some metadata plus the AI vectors. We might not need to deserialize the additional fields; we might just need the AI vector, so partial deserialization can improve the deserialization speed even more. On top of that, read compute might use a result multiple times, and caching allows us to avoid additional deserializations. We can also reuse those collections to reduce the overall memory allocation rate and CPU overhead. Last but not least, recently, during one of the rounds of benchmarking, we found that in the low-level transformation from the serialized format, which is a byte array, into the real format, which is a float array for the AI vectors, we spend a lot of active CPU cycles and a lot of wall-clock time. The optimization was to swap out the initial implementation, which used byte buffers to do that low-level conversion, and replace it with VarHandles, which is much more efficient and much more JIT friendly. To show you the actual result: this is the reduction in memory allocation. This is a real production graph from a real production system, no benchmarking, no custom cases, just plain production. There is more than a 50% reduction in memory allocation rate, and more than a 30% reduction in latency. It's the same JVM, just two different modes: the old library, which is strictly based on byte buffers, versus the new library, which allows us to use VarHandles. We had to implement a multi-release JAR because we still have to support Java 8 runtimes.
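A sketch of that kind of low-level change follows: decoding a serialized float vector from a byte array with a byte-array VarHandle view instead of a ByteBuffer. The little-endian byte order is an assumption made for the example; the real wire format may differ, and the VarHandle path requires Java 9+ (hence the multi-release JAR mentioned above).

    import java.lang.invoke.MethodHandles;
    import java.lang.invoke.VarHandle;
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    // Sketch: decoding a serialized float vector from a byte array, ByteBuffer vs. VarHandle.
    public final class FloatVectorDecoder {
        private static final VarHandle FLOAT_VIEW =
            MethodHandles.byteArrayViewVarHandle(float[].class, ByteOrder.LITTLE_ENDIAN);

        // ByteBuffer-based decoding (the older approach).
        public static float[] decodeWithByteBuffer(byte[] bytes, int offset, int count) {
            ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
            float[] result = new float[count];
            for (int i = 0; i < count; i++) {
                result[i] = buffer.getFloat(offset + i * Float.BYTES);
            }
            return result;
        }

        // VarHandle-based decoding: no ByteBuffer wrapper allocation, more JIT friendly.
        public static float[] decodeWithVarHandle(byte[] bytes, int offset, int count) {
            float[] result = new float[count];
            for (int i = 0; i < count; i++) {
                result[i] = (float) FLOAT_VIEW.get(bytes, offset + i * Float.BYTES);
            }
            return result;
        }
    }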

Observability

In the end, I would like to talk about observability. Quite often, especially developers working on large enterprise applications take observability for granted. Yes, we have to log a lot of different lines. Yes, we have to collect those thousands of metrics. It could be fine. But as soon as you try to develop very high throughput and very low latency applications, you might easily realize that the observability pillars become the bottlenecks of your application. During a regular review, we found that simply starting and stopping a stopwatch takes 5% of the active CPU cycles. Yes, simply getting the current time takes 5% of the active CPU cycles. There is more to that; this is just the tip of the iceberg. You need to store the data somewhere. You collect those thousands of metrics, and you need to store them. Yes, you can try to reduce the memory overhead by using primitive types, but if you have high throughput, you have multiple threads, and now you need to guard the changes to those metrics. Now you have synchronization, and synchronization becomes lock contention, quite a lot of it. To reduce the lock contention, for example, we swapped the object monitor synchronization with ReentrantLock. I also benchmarked StampedLock; in our case, for our workloads, ReentrantLock works much better. It allowed us to reduce the lock contention by 30%.
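As an illustration of that swap (not the actual metrics library code), here is a histogram-style metric guarded by a ReentrantLock instead of synchronized blocks; the metric fields are placeholders.

    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative sketch of a metric guarded by a ReentrantLock instead of synchronized.
    public final class LatencyMetric {
        private final ReentrantLock lock = new ReentrantLock();
        private long count;
        private long totalNanos;
        private long maxNanos;

        public void record(long durationNanos) {
            lock.lock();
            try {
                count++;
                totalNanos += durationNanos;
                if (durationNanos > maxNanos) {
                    maxNanos = durationNanos;
                }
            } finally {
                lock.unlock();
            }
        }

        public double averageNanos() {
            lock.lock();
            try {
                return count == 0 ? 0.0 : (double) totalNanos / count;
            } finally {
                lock.unlock();
            }
        }
    }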

We can reduce the lock contention even further, but in that case we would have to rely on objects like LongAdder. This is part of the java.util.concurrent.atomic package. The problem with it: yes, it allows you to significantly reduce the lock contention, but it's heavily padded to avoid false sharing. False sharing is a very interesting phenomenon. Nowadays, CPUs load data in small chunks called cache lines. Imagine that inside the same chunk, inside the same cache line, you have two different variables. Two different threads, running on two different CPU cores, load the same cache line. One of the threads changes one value, and at that point the cache coherence protocol, MESI, has to invalidate the copy of the same cache line on the other core, even though the other thread didn't need that change and was working with a completely different field. In this case, you have this invalidation of the cache, and you have to go back to memory and read it again. To avoid this, LongAdder is heavily padded. It uses striping, and between the elements of the stripe it puts at least a cache line of padding to make sure it avoids false sharing. This creates a gigantic memory overhead. You might ask, ok, what is the solution in this case? The solution is to treat logging and metrics as possible technical debt. If you suspect that a log line is not needed or not very relevant, or if you don't use a metric, just delete it. We started treating this as technical debt: all the metrics which are not in use are immediately deleted, and all the unused log lines are immediately deleted. This is the only way to reduce the overhead of the observability pillars in high throughput, low latency applications.
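The LongAdder trade-off described above can be shown in a few lines: LongAdder removes most contention on a hot counter, but it keeps a padded stripe of cells internally, so thousands of rarely-read metrics pay a real memory cost compared to a single AtomicLong. This is a generic illustration, not Venice's metrics code.

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.LongAdder;

    // Sketch of the trade-off: AtomicLong is compact but contended under heavy writes;
    // LongAdder scales writes via padded striped cells, at a memory cost per metric.
    public final class RequestCounters {
        private final AtomicLong compactButContended = new AtomicLong();   // one field, CAS contention
        private final LongAdder scalableButPadded = new LongAdder();       // striped + padded cells

        public void onRequest() {
            compactButContended.incrementAndGet();
            scalableButPadded.increment();
        }

        public long total() {
            // LongAdder sums its cells on read; the value may be slightly stale under writes.
            return scalableButPadded.sum();
        }
    }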

Conclusion

In conclusion, I would like to get back to the topic of misconceptions. Quite often, especially in the enterprise world, there is a bias that during the lifetime of an application, it becomes bigger: it gets new and more features, it requires more memory, it becomes slower, and so on. Utilizing this continuous optimization approach over the course of the last 4 years, we were able to improve the throughput 10 times; we went up from 100,000 operations per second to 1 million. The latency improved five to seven times, from low double-digit milliseconds to low single-digit milliseconds. By continuously improving performance, by continuously caring about performance, we are trying to negate this bias and this trend. Our application gets more features, but it gets better and faster.

Questions and Answers

Participant: You cited the overhead of serialization quite a bit. I'm wondering if you actually checked Apache Arrow as a mechanism by which you avoid serialization altogether, particularly with the integration [inaudible 00:38:07]?

Dubrouski: Quite often, we're locked into the LinkedIn infrastructure and the protocols we use. Especially because this system mostly works with Kafka, and Kafka by default uses Avro, we have to rely on that to avoid all the overhead of maintaining different formats and transformations and so on. I'm afraid we didn't check that.

Participant: How many engineers are on this project?

Liu: We have 18 engineers on the team.

Participant: Who does all this low-level JVM work and all the optimizations around that? How do you train those engineers?

Dubrouski: In terms of design, in terms of architecture, it's mostly the Venice team. We have a separate performance team at LinkedIn, and I'm one of the technical leads. Usually, we work in collaboration: they're responsible for the architecture, and I'm responsible for helping them with performance optimization suggestions, benchmarking, verifying the results, and things like this. We have distinct responsibilities.

Participant: How many customers are there of Venice?

Liu: Inside LinkedIn, we have over 2000 stores in production, and outside of LinkedIn we have one known customer, DataStax, who also contributed the Pulsar support. Besides Kafka, Venice also supports Pulsar.

Participant: What you saw was [inaudible 00:40:09]. You had the design awareness to treat the derived data, the People You May Know use case, as the product requirement you were trying to solve. That's why you could accept eventual consistency and all those things that are already performant by design. Then there's the dependency on the JVM, which is what Alex touched on, with all the JVM work and the upgrade path to Java 17.

Participant: Assuming that most of these applications deal with a lot of numbers, how do we deal with immutable primitives? Can we use immutable primitives? And message passing: how do we share this data without having to even worry about serialization?

Dubrouski: The data is technically pretty much immutable, because the clients only read it. Ingestion is pretty much offline or batch jobs, which work independently, and they ingest new versions of the data. As soon as a version is ingested, it's immutable until the next ingestion. As for serialization, you use it to get compacted data: it's easy to transfer over the wire, and it's easy to store because it takes less space. That's why the serialization.

Participant: You partition the data onto nodes. When you deserialize, do you create objects or do you create primitives? And when you create these objects or primitives, how do you make sure that when you're passing those numbers around within that same node, you're not recreating these objects [inaudible 00:42:06]?

Dubrouski: This is what my part was about. We're constantly profiling, and when we see those types of hotspots, let's say memory allocation hotspots, we try to reduce them. There is still some object creation happening; you cannot just deserialize into only a set of primitives. There is a schema, because fastavro does have schemas, and with schemas you will still have some kind of wrapper here and there, so you still have some overhead. We're trying to minimize this overhead as much as possible. One of the constant fights is to reduce the memory allocation rate. Just try to imagine: during the benchmark we showed, with 1 million QPS, at the network interface card level we had a transmission speed of 1.2 gigabytes per second, 10 gigabits of traffic to a single node. That is at the wire level. Now imagine what is happening in the JVM, the allocation rate. One of the ways to improve the throughput and reduce the latency is to fight as much as we can to reduce the allocation rate in the JVM: reuse the objects, cache the results, use primitives where possible, but sometimes it's just impossible.

Liu: For most Venice use cases, the payload is already compact; binary Avro is one of the most compact formats among all the formats. As for deserialization, most users only deserialize in the client, which means the Venice backend only returns the binary back. We don't deserialize in the backend, we just return the binary, and the application handles the deserialization using the Venice client.

Dubrouski: It's mostly about read compute. Because, to optimize the performance, we move the computation to the server side, and when it has to do computation, the server side has to do deserialization. Even there, we constantly fight to reduce memory allocation rates.

Participant: Did you say you use JMH, the Java Microbenchmark Harness, for synthetic benchmarking? You also do benchmarking in production when you're profiling. For the gains that you get from these incremental changes, is the ratio the same between the synthetic and the real benchmarks? How much value did you see in the JMH benchmarks?

Dubrouski: Funnily enough, at least for the VarHandles change, it was precise: 30% improvement. Thirty percent improvement in the benchmark, 30% improvement in production latency. It was really funny, but it worked. Yet again, why do we use JMH benchmarks? Because it's a mission critical system, we cannot just invest in some development because we want to. We use JMH benchmarks because you can quickly design a micro-benchmark to test the optimization: you copy a piece of code as-is, then you modify it and compare the results. Yet again, as I've said, for some of the JMH benchmarks I go down to the assembly code. With the VarHandle optimization, I was checking how it's actually folding, how it's basically inlining the code to achieve that better performance than the byte buffer.

Participant: [inaudible 00:45:37]

Dubrouski: Yet again, it's a very complicated topic. If you just come out of nowhere and try to develop a benchmark, you might have problems with it; it might not actually replicate reality. You need to hone this skill, you need to work on it again and again, and do a lot of verification. That's why I dive into the assembly code: I'm trying to verify that it actually executes what it's supposed to execute, to prove that literally at the assembly level. Then, when I see that, yes, it is happening, we can try it. Yet again, it doesn't always carry over; sometimes the benchmark shows an improvement that doesn't show up in production. That's why we have this intermediate step of building a synthetic benchmark before we invest in the actual solution.

Participant: I totally agree. Benchmarking is an iterative process, but it also matters what you are benchmarking, and that you measure that aspect of it as well. As Alex mentioned, when you have this understanding of what you want to see at the assembly level, what kind of [inaudible 00:46:50], then you can understand whether the impact is actually happening, or if you had a lot of issues with that. It makes sense.

Participant: I was curious about the constraint you put on the method size. It was like 8000 bytes.

Dubrouski: This is JVM's constraint. This is a JIT compiler threshold. It's not our constraint.

Participant: I was wondering whether it's related to the type of compiler in the JVM itself, because introspection, inlining, and so forth are dependent on the type of JVM you're using.

Dubrouski: We're relying on OpenJDK.

Participant: OpenJDK.

Dubrouski: We're relying on Microsoft's build of OpenJDK, so you can assume the defaults from that. The threshold is there. Technically, it can be disabled, but we knew that it might not be the best idea to disable it, because at some point the generator could generate something gigantic, and the compiled version would not work properly. We try to adhere to the rules of the JVM, and, yet again, benchmark to make sure that it doesn't cause any regressions.

Participant: All those things are the defaults. Using the Microsoft build of OpenJDK also means adhering to the defaults of OpenJDK: what you see in the OpenJDK source is what you'll probably find in most of these builds. Some JDK distributions out there do change the defaults, and that's usually based on observed behavior and feedback they receive. For example, we have added some intrinsics to our build, because it helps with the Azure stack. People do modify things and change the defaults, maybe because they have observed something in production, or maybe because their customers are asking for it.

 


 

Recorded at:

Mar 16, 2024
