BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Presentations Incremental Data Processing with Apache Hudi

Incremental Data Processing with Apache Hudi

Bookmarks
41:53

Summary

The presenters discuss an introduction to incremental data processing, contrasting it with the two prevalent processing models of today - batch and stream data processing.

Bio

Saketh Chintapalli is Software Engineer @Uber, Bringing Incremental Data Processing to Data Warehouse Models. Bhavani Sudha Saktheeswaran is Distributed Systems Engineer @Onehouse, Apache Hudi PMC, Ex-Moveworks, Ex-Uber, Ex-Linkedin.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

Saktheeswaran: We are going to introduce incremental data processing with Apache Hudi. We are going to contrast it with batch and stream processing, and see how Hudi bridges the gap between both these models. My name is Bhavani Sudha. I'm a PMC member of the Apache Hudi project. I'm a distributed systems engineer at Onehouse. I was part of different data teams across Moveworks, Uber, and LinkedIn.

Chintapalli: My name is Saketh. I'm a software engineer on the data platform at Uber.

Outline

Saktheeswaran: We'll take a short look at what Apache Hudi is at a high level. Then I'll pass it over to Saketh for explaining what is the need for incremental processing. He's going to use some use cases at Uber to go over this. After that, we have a deep dive on how Hudi empowers an incremental processing framework. We will go over certain components and go over the design choices that Hudi takes. Saketh is going to tie it back to the use cases at Uber. We're going to see some metrics. We will finally wrap up with roadmap and community updates.

Introduction to Apache Hudi (Data Lakes, Data Lakehouses, Table Formats)

To recap, let's go over the evolution of the modern data architecture. In 2000s, we had the popularity in the on-prem data warehousing where it solved for use cases like BI and reporting. There was also parallelly a growth in applications of search and social that needed large volumes of data to be scanned. That's how the Hadoop based data lakes became popular. Both of these architectures made steady progress, with the warehouse shift in BigQuery that introduced a serverless part and then Redshift took it to the cloud.

In 2014, Snowflake was introduced which came up with decoupled storage and compute-based architecture. Similarly, if you see the data lake side, that also saw a steady and solid progress, where we had Apache Spark introduced in 2014, that was predominantly used for data processing. Then, in 2016, Apache Hudi introduced transactions into the data lake, followed by Delta in 2017. Both of these architectures were converging into this decoupled storage and compute base type. Then you can ask, what's the difference between warehouse and lakehouse? Warehouses are closed in nature, lakehouse is open. Warehouses solve the BI and reporting applications, whereas lakehouses, more for ML, DS, and AI based applications.

Warehouses are fully managed and lakehouses are DIY, and rewarding if you're building it at scale, and can be cheaper. Warehouses are expensive. What's the difference between lakehouse and data lakes? What's the real difference? If you see the diagrams here, they look pretty similar, except that the lakehouses bring certain components like table format, services, table services, transaction manager, metadata manager. What these really give you is the power to introduce ACID guarantees into the data lake. Read-write isolation between queries, and concurrent writing between different writing processes.

Let's see how Hudi fits into this architecture. Here is a high-level overview of what Hudi is and how it fits into the lakehouse based architecture. Hudi integrates with different upstream sources. It has these components. It has these transaction managers and all the components that it needs to provide the read isolation. Basically, the transaction database layer here talks about the components that gives you these capabilities. It has a concurrency control, table services for keeping your table in an optimized state, metadata, and caching servers, and things like that. Above that is a programming API that gives all the read and write APIs that the query engines above can use to interact with the Hudi Tables.

This is high level Hudi Table under the hood. The writers produce data which are grouped, we call this file groups. Also, in parallel there is a concept called timeline which logs the events to the table, basically, the metadata. The data and the metadata are colocated and the query engines can leverage this to understand what version the table is right now at. That basically gives the ability to isolate the read and write. Hudi is used by a lot of companies: ByteDance, Uber, Walmart, GE Aviation. Across this board, what you would find is, most of these companies are able to use Hudi at a high volume, but not only that, they're able to bring down their analytics latency from daily to minute level current.

Need for Incremental Processing (Combine the Best of Batch and Streaming)

Chintapalli: Before we talk about the need for incremental processing, I wanted to touch up on the difference between how a typical streaming versus a batch processing pipeline is run. If you look at a streaming pipeline, let's assume that we have a lot of transactions in a database like Postgres, we would typically extract change logs into Debezium, get that data into Kafka, and we would write Spark or Flink programs to process those data streams and serve that data on Elasticsearch or something similar. Architecturally, what you can see is stream processors consume only new input, and they do some state lookup or merges with intermediate results in some state store, and they write only change output or new output to their sync data streams.

Conversely, in the batch world, you would dump that data into some storage like Amazon S3, and you would schedule Spark jobs to run periodically using some workflow orchestration tool, for example, Apache Airflow. Then you would serve that data in your warehouse. Looking at the architectural pattern for the batch stack, batch processors consume all input or large amounts of input from their upstream data sources.

They are overwriting large output or large amounts of output in their downstream. What's the difference? Stream processors are intelligent because they're purely incremental. They're fast and they're efficient. However, they are row oriented and data files are not scan optimized. Conversely, for the flip side of this, in the batch stack, you have columnar format and scan optimized data files. There's very scalable compute, however, you're doing a lot of inefficient recomputation. There's no way to do targeted updates or deletes to your affected records in your downstream datasets. To sum it all up, batch processes are slow and inefficient.

Batch ingestion is really slow. Prior to using Hudi at Uber, what we were doing is we would typically run our batch ETL pipelines every 6 to 10 hours, where we'd be rewriting entire tables with overlaps. At Uber, all the row datasets are date partitioned due to the volume of data. To populate our derived datasets, what we would do is we would do an n day lookback in our ETL logic, understand what had changed, recompute all of the data, and then process it and repopulate all of that data downstream. That's not a smart way to recompute all of this data from upstream, and late arriving data is also a nightmare.

To exemplify that, let's go into our first use case, which is related to driver and courier earnings. Let's say I'm an Uber driver, and there is a dataset that relates to the amount of earnings that a driver has on a given day. That is defined as how much did that driver earn as a result of doing activities on that specific day. Let's say I finish a trip, and I earned $10 on Monday. On Tuesday, I finish another trip that earned me $8, but in addition to that, I got a $1 tip from the trip that I took on Monday. That's just one example of a late arriving data point after the fact.

Let's look at how these late arriving data points can actually be scattered in our downstream datasets. Let's say on May 10th, these are all of the date partitions that were actually touched by earnings events that were published on that day. As you can see, in this red box over here, there's earnings events that were published as late arriving facts almost as far back as December. In our batch ETLs, effectively if we're not looking back as far as December in our ETL logic, we would essentially be missing those updates downstream. Our downstream datasets would remain inconsistent with our upstream data sources. Let's look at the ETL load strategies.

For streaming, as we saw earlier, it's incremental, so you're only consuming updates from upstream, and you're only writing updates to your downstream table. How the batch ETL processes worked for our derived datasets prior to using Hudi, what we were doing is an n day lookback. In our case, we were looking back 90 days. Over here you can look at the different data points. Blue is related to an actual trip and yellow is related to an incentive, which could be a tip that the driver has gotten or any other incentive that's possible. If you look at data points beyond that 90-day lookback period, if there's any late arriving data that's arriving beyond that 90 days, we are essentially missing those updates to capture those downstream.

In addition to that, what we're doing is we have no idea which of the partitions in this 90-day window have actually changed. We're essentially reprocessing all of that 90 days' worth of data, and then writing that entirely into our downstream datasets. The idea for the incremental processing model was born from this. We needed a way to bring the stream processing model to our batch data stack. By leveraging Hudi, what we were able to do is process only new input, and only updates and all updates from upstream, while also retaining our scan optimized storage and our columnar format. Going back to the same example, incremental ETL using Hudi. With using Hudi, what we were able to do is only read updates from our upstream table. We did not have to process the entire window of 90 days, like I explained in our batch example. We were able to update only those targeted records in our downstream table using Hudi writer.

What are the pros and cons of this? Full load, it's very easy to implement, for example, if you needed to join with other data sources. However, it's very expensive and slow. We're reprocessing 90 days' worth of partitions on ETL runs, and overwriting those entire partitions in our downstream tables. Updates to very old data are essentially lost. Same example with that 90-day use case. Incremental ETL with Hudi. It's still easy to implement.

It's very efficient due to the fact that we're processing data only incrementally. It's not real time, but it's definitely close enough. Then, finally, we have our streaming use case with Flink. If you want to enrich your data, you might have to call various services or join data streams. The benefit to this is you'll get your data in real time. This is a high level of Uber's transactional data lake. We have around 8000-plus tables, to which we're writing around 500 billion-plus records per day. This amasses to about 250-plus petabytes of data. By using Hudi, we're able to bring our analytics latency down from daily to a matter of minutes.

The need for incremental ETL for our derived datasets. While we were discussing the actual necessity of incremental ETL, these were the three main categories that came up during those discussions. First is how could we reduce the latency/freshness of our derived datasets. Our batch pipelines were running every 6 to 10 hours, they'd be processing large amounts of data. Because of the fact that they're very time taking and resource extensive, our freshness could go upwards of 31 hours for our derived datasets. We required these datasets to be as fresh as possible for various use cases like regulatory reporting, analytics. Second thing that we thought about was how could we cascade all of the updates from our upstream sources to our downstream datasets?

Going back to the driver earnings example, if there was a late arriving data beyond that 90-day lookback, we were missing it in the batch world of things. We essentially could not afford to do that. We needed a way to minimize the data inconsistency between our model tables with respect to our row datasets. Third and final thing is how could we minimize the recomputation while creating our derived datasets. What we were doing in the batch world of things is looking back, specific number of days of partitions, reprocessing all that data because we had no idea what had changed. We were writing all of that back into our downstream datasets, very time extensive, very resource extensive, and we needed a way to fix that problem.

Let's go into one more use case. This is related to menu updates for Uber Eats merchants. There are several layers of datasets that we have that contain different levels of granularity related to merchant menus on Uber Eats. There is a level for the whole menu itself, for sections within a menu, items within a section, modifier groups and options within an item, and a flattened version of it all. Prior to using Hudi, what we were doing is sequential ingestion. We would look up the data from our upstream sources, our row datasets, and each ingestion pipeline would be dependent on its respective upstream granular dataset. We would be reprocessing and essentially recreating these entire datasets on every single ETL run.

Merchants like to update their menu very frequently. There are also other sources from which menus are actually updated. For various analytics use cases, we needed the latest menu information in our downstream datasets within 10 hours. What percentage of menu updates are actually occurring compared to all the menus that exist on Uber Eats? As you can see from this graph, there's around 11 billion records related to menus, and only around 500 million records are getting updates every single day, which is around 5% of the data. Because of the lack of incremental primitives, we were actually processing 100%, basically all 11 billion of these records every single ETL run, which was a nightmare.

How Hudi Unlocks Incremental Processing (Purpose-Built for Fast-Changing Data)

Saktheeswaran: In this section, we're going to see how Hudi is designed for incremental processing capabilities. Today, Hudi holds a lot of these features. We have incremental reads, field level upserts. We make sure that we have a lot of runtime components that maintain that low level latency. Our multi-modal indexing system is pretty comprehensive. It hosts a whole bunch of indexes that ensures good upsert performance for different types of workloads.

Again, with the clustering, Hudi is able to offer standard clustering techniques like Hilbert curves, Z-Ordering. They do really make sure that your Hudi Tables are able to offer columnar-grade performance, while at the same time the table is going through storage optimizations. For concurrency control, think of it like regular database systems, like where there are daemons that are still operating on the table while the clients are writing to the database. Similarly, Hudi also has these necessities and is able to offer different levels of concurrency control mechanisms, right from lock-free execution to non-blocking execution. Hudi is designed to be scalable for all the metadata. I have a bitly link here, https://bit.ly/hudi-feature-comparison, that compares Hudi feature by feature with other open source projects in the space.

A quick tour at the Hudi Table types. There are two types of Hudi Tables. One is the copy on write table. This is the simplest form. Basically, any change to the table will cause a rewrite of the data files causing a new version. What that means is that the table is going to be really performant for read queries. Whereas there is a lot of write amplification because of the need to overwrite the data. There is also some cost during the writes.

When there are workloads that are rather update heavy, we recommend users to try the merge on read table in production. What the merge on read table gives you is flexibility. Basically here, the changes are not creating a new version of the file, but rather they are queued up as row level updates colocated with the base files. A background process like compaction can periodically stitch these deltas to the base file and create a version file, and that would improve the read queries at that point in time. This gives the flexibility in terms of picking your write and read cost. You can achieve that by tuning the compaction frequency. Here is the difference between COW with MOR table, when we would use what.

Let's take a look at the query types in Hudi. There are three different query types. The snapshot query, read optimized query, and incremental query. Snapshot query like the name suggests gives the latest state of the table as of the last commit. Incremental queries basically give all the data that changed within a specific interval. Read optimized query is a practical application.

Basically, it gives you the data as of the last compaction time. It is a tradeoff between how you're ensuring your queries are performant versus whether you're willing to tradeoff a little bit of staleness in the data. This here is the kernel of the problem for incremental processing. How do we optimize for large scale updates? In this pipeline, there are different things that are going on, which is, how do you bring in change streams into Hudi Table? How do you ingest into Hudi? How do you take changes out of the Hudi Table? This is much harder than format designing, basically, because as there is more data mutations coming in, you also need to update the metadata. You need to ensure that the data and indexes are consistent.

It brings a whole bunch of problems in the database book. There are different steps that are going on into this pipeline. This is all packaged into what we're talking about. How do we tie up all this into incremental processing? You have to deal with deduping. How do you index the data? How do you lay out the data? Whether you're using tables services, how do they interact with each other? What sort of concurrency control mechanisms they need to talk with each other. What we're going to do in this section is pick and choose a few components in this whole pipeline, and talk about how Hudi is designed, and what design choices that it'd make to empower the incremental processing framework.

Let's talk about the first one, merging. What merging means is basically you have multiple records for the same key. How do you handle those duplicates? These duplicates can happen in different places. One is in the incoming stream itself. The other is, you have updates for an existing record. This is crucial for incremental processing, because you need to be able to do this on the fly and really fast. Hudi has this Hudi record merger interface with a lot of implementations out of the box available.

One is the partial updates. Many databases do not give you a full row change when there are updates, they hand you only partial columns, because supplemental logging is really expensive. Hudi is able to really handle these kinds of situations where it can just take a partial update and merge with the existing record. There are implementations in Hudi that does this. Similarly, it can also handle situations like database like running in active-active deployments where you need conflict resolution techniques. The record merger interface has an extendable framework where you can plug in different implementations to handle these sorts of things. This also goes beyond being able to read CDC style format.

Now we've talked about merging. I know how to handle the records when there are duplicates, but how do I really locate the data? That also needs to be faster when you're talking about incremental processing. That's where the concept of indexing and metadata comes into picture. Indexing is a pretty standard technique in database systems, which is used to locate data. It's much similar to how we would look for a page in a book referring to the index. Indexing has been used like so long in different systems for locating information quickly, reducing I/O cost, and improving the query performance.

Most of these systems offer different index implementations, like bitmap index, primary index, secondary index, and so on. Hudi also has a bunch of index implementations out of box. To name a few, we have bloom filter based index, simple index, HBase, bucket index, and so on. This link here, https://hudi.apache.org/blog/2020/11/11/hudi-indexing-mechanisms/, gives you a high-level picture on which index implementation to pick for which workloads. Basically, indexing helps give an efficient upsert performance for different types of workloads.

The most recent one that got added to the index is the record level index implementation, which is coming up in 0.14.0, which is in progress. We did not stop with multiple indexes, we went to generalize this into a framework, what we call as multi-modal indexing subsystem. The idea here is we wanted to use the indexing framework in Hudi to be able to use both in the read side and the write side to be able to scale for big volumes of data. We should also be able to add or snap in any type of index going forward in the table, do all of this asynchronously without blocking the ingestion writer. That's the whole idea behind the multi-modal index.

There is a link here, https://www.onehouse.ai/blog/introducing-multi-modal-index-for-the-lakehouse-in-apache-hudi, if you're interested in knowing what this whole framework does. Just to add on to that, the latest one, the record level index is part of the multi-modal index. It's right there in the metadata table inside Hudi, and takes the format of the merge on read table type to support faster upserts. The record level index has a one-on-one mapping of record key to find locations, and it can scale really well for large tables with updates. Based on our benchmarks we see up to 17x improvement in index lookup latency. There is also a link for the record level index RFC, if you're interested in details.

We saw merging, like how to handle duplicates. We saw how to locate the records quickly. All these are crucial for incremental processing. How do we even lay out the table? We have the data deduping, that's all fine. You also need to be able to think about optimizing your data storage. That's what we are going to talk about next. We're not going to be comprehensive, we're going to pick one or two table services and talk about them.

Compaction, we did talk about this briefly, but this gives the flexibility to balance the read and write cost. Think of it this way. For example, not all the time the update patterns are same as your analytic query patterns. For example, think of GDPR requests where you have to delete a bunch of users or profiles from much older partitions scattered across the table. Whereas your analytical queries are more happy with the recent partitions. In this case, Hudi's compaction can be leveraged in interesting ways. For example, the compaction can run on most recent partitions aggressively to make sure that you have fresher data and faster, whereas the compaction on older partitions can be run in an asynchronous way, or things like that.

Basically, Hudi gives the flexibility in deployment strategies for these table services. What this means for incremental query is that as data comes in, you can still keep writing without taking a hit in the write latency, and rely on the compaction based on your workloads to specifically compact certain partitions more frequently than others. That will be done in the background with the different deployment strategies.

The next one is clustering. Clustering is very similar to what other database systems do, basically storing and sorting the data in such a way that queries can be performant. In Hudi, clustering can be used for two purposes. One is for sorting the data and laying out in a way that queries can be performant. This is crucial for streaming ingestion where write latency is pretty sensitive. You just want to write the data as it's coming in, without having to do much of sorting and things like that at the time of write. You'll want to rely on something in the background to do this job for you. That's what clustering does. To put it in perspective, let's take Uber's trips data. Trips, they happen based on the event time.

Most likely, your data is partitioned based on the creation date of the trips. Your partitions are based on the creation date of the trips, whereas queries sometimes might be based on the city granularity. Like give me all trips that happened in this particular city or geolocation, and so on. Here, what would happen without any optimization of the data is that, as soon as your query hits, it's going to scan pretty much all files in all partitions, because there is no way to know which files have the query predicates and cities that we are interested in. Clustering comes to the rescue. What you can do is schedule clustering asynchronously to make sure that within every date partition, the files are being clustered based on city. Now, your queries can leverage file pruning to make sure they only access a few files that have the city that we desire, instead of scanning all of them.

Table services in Hudi are very carefully designed so they are aware of each other and they don't block the ingestion writer by any means. For example, cleaning is aware of archival, and compaction is aware of clustering. For example, if clustering is actively under progress, compaction wouldn't touch those files that are being clustered. This, again, needs an advanced concurrency control mechanism, so the table services don't accidentally step toes on each other, and they don't block the ingestion writer by any means. Hudi's design takes care of ensuring that these things don't happen. We're also looking into non-blocking concurrency control in the 1.0 release that's upcoming. What this gives us is the flexibility to do N-way stream joins with partial updates and lockless multi-writing. Again, there is an RFC here, if you're interested in the details.

We saw the merge, like how to handle duplicates. We saw how to locate the records quickly. We saw how to lay out the data in an optimized way. It's also important to see how to get the change stream out of Hudi. Here's an example. Let's take a Hudi Table here, which represents bank balances, and let's say there are four columns: UUID, name, timestamp is the last updated timestamp, and balance is the latest balance. Let's say UUID is a primary key. Hudi definitely requires a primary key. As soon as time, t2, there is an incoming data with more records, and Hudi is able to tag these records as inserts and updates and deletes based on the UUID.

Here we see one insert and one update. As soon as Hudi processes it, it tags along multiple meta fields at the record level, and then stores it in the storage. What this gives us is change level tracking capabilities, which we will see in a bit. Let's say we have another incoming data. Here, again, Hudi is able to tag the records, whether they're inserts, updates, or deletes based on the UUID. Let's say when we process this, there is one interesting thing to note here, is that the update for one is not processed, although Hudi tagged it as an update. The reason here is that the timestamp field, 2000, is belonging to an older time than what is already in storage. Basically, Hudi is able to tell apart from late arriving data, and so it wouldn't merge into the storage.

These sorts of event time semantics is crucial as we indulge in streaming and incremental data processing. This is already taken care of by Hudi with different merge implementations that we talked about earlier. One thing I mentioned is that we have a primary key that's required by Hudi. So far, we have been asking users to configure this as a mandatory field in 14.0, which is in progress or being announced this week. We have relaxed this restraint. If users are not able to configure the record key, Hudi is still able to automatically generate one for them. One classic example is log ingestion, where you don't have an inherent record key. These are how the changes happened in table based on the incoming batches in t1, t2, and t3. How do we get the CDC data out of this table?

With this config, you're able to tell Hudi that this table needs to be queried in a CDC format, so enable this logging for me. This has to happen in ingestion time. Once that has happened, then using the Spark query just like this, you say, give me an incremental query of CDC format between t1, and t3, give me all the changes. That gives a neat Debezium style output like this, which has the operation time, timestamp, before and after images. This is a quick peek of the Hudi, Flink CDC.

Let's go through some common use cases of incremental processing. Here's a streaming ingest example. We have a user that wants to ingest data from Kafka, go through some transformations, and load it into a Hudi Table. Hudi has a sophisticated tool called the HoodieStreamer tool. It does not need any coding from the users, but it would be as simple as a Spark submit job, which takes in some configs. It can connect to different sources. In this case, it would be a Kafka source. It takes Hudi related configs. For example, here, what is the table name, path, record key, ordering field? It also can take transformations. Here we have chained multiple transformations together, like flattening transformer, projections, and things like that.

We can also do SQL based transformation. Then, this tool can be run in two ways. One is a continuous way, where you would say, run this tool after every so period of time, denoted by the sync interval, or you can run it just once. The tool is very capable of tracking the checkpoint so it knows where to start from when it's running the next time. For example, if it's Kafka as a source, it can track the checkpoint as Kafka offsets. When it comes back again the next time, it is able to resume from those particular offsets where it left. It also goes above and beyond to give the capability to users to configure table services, apart from what is by default available. Users can even configure the clustering and other table services like this.

Let's take another example which is the CDC type example. Here we have Postgres CDC, Postgres SQL change log sent via the Debezium Kafka connector and ingested into Hudi. On the left is the Postgres-Debezium connector configs. On the right is the streamer tool again. Here, in this example, the source for the streamer tool would be the Postgres-Debezium source with CDC payload.

Here, we also are tuning the configs for small file handling. This is done by default. In case you want to tune it, you can specify the configs here. There are hives in configs. These are basically configs to tell where to sync the Hudi Table so it can be queried by other query engines like Spark, Presto, Trino. This is more an advanced example, like incremental ideal example, where we have data from Kafka streamed into a Hudi Table.

Let's say this is trips table, for example, road trips. You want to join that with additional information from different data sources to create a derived table. Here, again, we are going to use the HoodieStreamer tool. The source now would be like an upstream Hudi Table. That's what is specified with the Hudi incremental source. Based on Hudi's commit time, it can track until which commit it has processed and when to pick next. All the giant logic is inside the SQL transformer. There are some bunch of configs that are specific to the incremental source. So far, we saw some examples.

Incremental ETL at Uber Scale

Chintapalli: Previously, we talked about the batch processing architecture for our Uber Eats Merchant Menu updates use case. I'm going to go back to that same use case, and show you a little before and after of the data architecture and how the data flow looked like. As I mentioned, we had different tables related to different levels of granularity for our menus on Uber Eats. What we were doing prior to using Hudi was we were scanning our upstream partitions, and then doing a full scan and merge with our old version of our menu base table.

Rewriting that whole version as a new version of menu base, and promoting that to the production dataset. What we would do for the subsequent tables for finer granularities is, again, reprocess and recreate those tables by processing 100% of that data. As we saw, it was 11 billion records, compared to the 500 million that are only being updated every single day. This ETL strategy, again, is full load. The execution time end-to-end for this entire pipeline took greater than 12 hours. Because of the fact that it took so long, and it was processing so much data, we only ran this pipeline once a day, so every 24 hours. How did Hudi come to the picture?

By using HoodieStreamer tool, we were able to incrementally fetch data, only the updates, the 500 million, as I mentioned before, from our upstream, and directly upsert those 500 million records into our menu base dataset without needing to touch the other 10-and-a-half billion records that existed. By directly upserting that pipeline, the menu base took only 40 minutes. That 5% of data were subsequently upserted into the respective granular tables. Our end-to-end execution time was less than 4 hours. Because of the fact that it was processing a lot less data, and taking a lot less time to run, we were able to run this pipeline every 6 hours to basically have fresher data in our downstream datasets.

What were the wins? In terms of accuracy, we achieved 100% data accuracy with respect to upstream and our downstream datasets. As we talked about the driver earnings use case, there is no updates that are lost even for year-old trips. Coming to efficiency, because of the fact that we were using incremental primitives with the HoodieStreamer tool, we're processing a lot less data on each run. For weekly driver earnings aggregations with full load, it used to take around 4 to 5 hours per run, with incremental reads and writes, that was brought down to 45 minutes.

That could further be improved with various configuration tuning. Freshness, this was another big concern due to which we decided to adopt incremental ETL model. We were able to reduce our freshness SLAs for our derived datasets from 31 hours all the way down to 8 hours. That could also be further improved. This unlocks earnings features, of course, closer to real time.

All in all, because of the incremental primitives, in our benchmarking, we found that lakehouse based incremental ETLs were actually about 50% cheaper than our old school batch pipelines. Here are some performance improvements for a couple of tables for a dimensional driver table and a driver status fact table in the batch world of things compared to the incremental ETL world of things. There are just improvements across the board in terms of vCores, memories, the cost of the actual pipeline, and the time it takes to run. It's about 50% on average for any specific pipeline.

As we started scaling the number of data pipelines that we onboarded to this incremental ETL model, observability became a huge thing that we became concerned with. HoodieDeltaStreamer actually emits various metrics related to the commit and the actual data capture progress during the pipeline execution. By exposing those metrics and setting up alerts around them, we were actually able to reduce the time to detect various freshness failures or pipeline failures that could occur. In addition to that, we also had improved observability surrounding data loads and volumes with each run without having to go and manually query the datasets ourselves.

Here are some examples of some extraction and duration metrics that the streamer job emits. On the top left you can see the total streamer job runtime, the time it took for the actual commit to happen. On the bottom, you can see with each run how many commits are actually being processed from upstream, and how many commits are left to be processed from our downstream table. Then, finally, I want to take you guys through some commit status metrics, so you could see the total number of records that are actually written on a daily basis on every run, and then the number of records that were inserted versus updated as a part of that run.

Hudi Community

Saktheeswaran: We have some stats here from the community. We have Hudi pre-installed in 5-plus cloud providers, and has a diverse community with PMC members and committers all across the globe. It has a rich community of participants. Our Slack is super active. These are some metrics from our GitHub and Slack. These are companies that are using Hudi actively in their production pipelines and data lakes.

Here are some more resources to check out. There is a roadmap link here that talks about future things that are coming in Hudi, what are the next releases, and things like that? We have 0.14.0 being announced. Hudi 1.0 also talks about changes that are major that are coming up in Hudi. These are different resources on how you can connect to the community.

 

See more presentations with transcripts

 

Recorded at:

Aug 16, 2024

BT