Key Takeaways
- DuckDB is an open-source OLAP database designed for analytical data management. Similar to SQLite, it is an in-process database that can be embedded within your application.
- In an in-process database, the engine resides within the application, enabling data transfer within the same memory address space. This eliminates the need to copy large amounts of data over sockets, resulting in improved performance.
- DuckDB leverages vectorized query processing, which enables efficient operations within the CPU cache and minimizes function call overhead.
- DuckDB uses Morsel-Driven parallelism, which enables efficient parallelization across any number of cores while keeping operators aware of the multi-core execution.
Why did I embark on the journey of building a new database? It started with a statement by the well-known statistician and software developer Hadley Wickham:
If your data fits in memory there is no advantage to putting it in a database: it will only be slower and more frustrating.
This sentiment was a blow and a challenge to database researchers like myself. What are the aspects that make databases slow and frustrating? The first culprit is the client-server model.
When conducting data analysis and moving large volumes of data into a database from an application, or extracting it from a database into an analysis environment like R or Python, the process can be painfully slow.
I tried to understand the origins of the client-server architectural pattern, and I authored the paper, "Don’t Hold My Data Hostage – A Case For Client Protocol Redesign".
Comparing the database client protocols of various data management systems, I timed how long it took to transmit a fixed dataset between a client program and several database systems.
As a benchmark, I used the Netcat utility to send the same dataset over a network socket.
Figure 1: Comparing different clients; the dashed line is the wall clock time for netcat to transfer a CSV of the data
Compared to Netcat, transferring the same volume of data with MySQL took ten times longer, and with Hive and MongoDB, it took over an hour. The client-server model appears to be fraught with issues.
SQLite
My thoughts then turned to SQLite. With billions and billions of copies in the wild, SQLite is the most extensively used SQL system in the world. It's quite literally everywhere: you interact with dozens, if not hundreds, of instances every day without realizing it.
SQLite operates in-process, a different architectural approach integrating the database management system directly into a client application, avoiding the traditional client-server model. Data can be transferred within the same memory address space, eliminating the need to copy and serialize large amounts of data over sockets.
However, SQLite isn't designed for large-scale data analysis; its primary purpose is to handle transactional workloads.
DuckDB
Several years ago, Mark Raasveldt and I began working on a new database, DuckDB. Written entirely in C++, DuckDB is a database management system that employs a vectorized execution engine. It is an in-process database engine and we often refer to it as the 'SQLite for analytics'. Released under the highly permissive MIT license, the project operates under the stewardship of a foundation, rather than the typical venture capital model.
What does interacting with DuckDB look like?
import duckdb
duckdb.sql('LOAD httpfs')
duckdb.sql("SELECT * FROM 'https://github.com/duckdb/duckdb/blob/master/data/parquet-testing/userdata1.parquet'").df()
In these three lines, DuckDB is imported as a Python package, an extension is loaded to enable communication with HTTPS resources, and a Parquet file is read from a URL and converted to a Pandas DataFrame.
DuckDB, as demonstrated in this example, inherently supports Parquet files, which we consider the new CSV. The LOAD httpfs call illustrates how DuckDB can be extended with plugins.
There's a lot of intricate work hidden in the conversion to a DataFrame, as it involves transferring a result set of potentially millions of rows. But because we are operating in the same address space, we can bypass serialization and socket transfer, making the process incredibly fast.
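For illustration, here is a minimal sketch of that same in-process handoff through the Python API. The query is just a stand-in; .df() assumes Pandas is installed and .arrow() assumes PyArrow.

import duckdb

# A toy query standing in for a real result set of millions of rows.
rel = duckdb.sql("SELECT 42 AS answer, 'ASML' AS name")

df = rel.df()          # materialize as a Pandas DataFrame
tbl = rel.arrow()      # or as an Apache Arrow table
rows = rel.fetchall()  # or as plain Python tuples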
We've also developed a command-line client, complete with features like query autocompletion and SQL syntax highlighting. For example, I can start a DuckDB shell on my computer and read the same Parquet file.
If you consider the query:
SELECT * FROM userdata.parquet;
you realize that it would not typically work in a traditional SQL system, as userdata.parquet is not a table but a file. The table doesn't exist, but the Parquet file does. If a table with a given name is not found, we search for other entities with that name, such as a Parquet file, and directly execute the query on it.
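As a small illustration (the file names below are hypothetical placeholders), the same file-as-a-table resolution works from the Python API, and for CSV files too:

import duckdb

# Hypothetical local files; any Parquet or CSV file on disk works the same way.
duckdb.sql("SELECT count(*) FROM 'userdata1.parquet'").show()
duckdb.sql("SELECT * FROM 'sales.csv' LIMIT 5").show()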
In-Process Analytics
From an architectural standpoint, we have a new category of data management systems: in-process OLAP databases.
SQLite is an in-process system, but it is geared toward OLTP (Online Transaction Processing). In the traditional client-server OLTP world, PostgreSQL is the most common option.
Figure 2: OLTP versus OLAP
On the OLAP side, there have been several client-server systems, with ClickHouse being the most recognized open-source option. However, before the emergence of DuckDB, there was no in-process OLAP option.
Technical Perspective of DuckDB
Let's discuss the technical aspects of DuckDB, walking through the stages of processing the following query:
Figure 3: A simple select query on DuckDB
The example involves selecting a name and a sum from the join of two tables, customer and sale, which share a common column, cid. The goal is to compute the total revenue per customer, summing up all revenue, including tax, for each transaction.
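A plausible reconstruction of that query, written through the Python API used earlier. The exact column names and the first() aggregate are assumptions based on the description, and the customer and sale tables are presumed to exist:

import duckdb

duckdb.sql("""
    SELECT first(name), sum(revenue + tax)
    FROM customer
    JOIN sale USING (cid)
    GROUP BY cid
""").show()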
When we run this query, the system joins the two tables, matching customers and sales on the value in the cid column. Then, the system computes the revenue + tax projection, followed by a grouped aggregation by cid, where we compute the first name and the final sum.
DuckDB processes this query through the standard phases of query planning, query optimization, and physical planning, and the resulting physical plan is further divided into so-called pipelines. For example, this query has three pipelines, defined by their ability to be run in a streaming fashion. The streaming ends when we encounter a breaking operator, that is, an operator that needs to retrieve its entire input before proceeding.
Figure 4: First pipeline
The first pipeline scans the customer table and constructs a hash table. The hash join is split into two phases: building the hash table on one side of the join, and probing it on the other side. Building the hash table requires seeing all the data from the left-hand side of the join, meaning we must run through the entire customer table and feed all of it into the hash join build phase. Once this pipeline is completed, we move to the second pipeline.
Figure 5: Second pipeline
The second pipeline is larger and contains more streaming operators: it scans the sale table and looks into the hash table we built before to find join partners from the customer table. It then projects the revenue + tax column and runs the aggregate, a breaking operator. Finally, we run the GROUP BY build phase and complete the second pipeline.
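To make the two pipelines concrete, here is a deliberately simplified Python sketch with toy data; it mirrors the shape of the plan, not DuckDB's actual implementation.

# Toy stand-ins for the two tables.
customer = [(42, "ASML"), (43, "Acme")]                  # (cid, name)
sale = [(42, 1233, 422), (42, 100, 21), (43, 500, 105)]  # (cid, revenue, tax)

# Pipeline 1: scan the entire customer table and feed it into the HASH JOIN BUILD.
hash_table = {}
for cid, name in customer:
    hash_table[cid] = name

# Pipeline 2: stream the sale table through HASH JOIN PROBE, projection, and GROUP BY build.
groups = {}  # cid -> [first(name), running sum of revenue + tax]
for cid, revenue, tax in sale:
    name = hash_table[cid]       # probe the hash table built in pipeline 1
    total = revenue + tax        # projection
    if cid not in groups:
        groups[cid] = [name, 0]
    groups[cid][1] += total      # grouped aggregation: the breaking operator

# Pipeline 3 would simply read `groups` and emit the final result.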
Figure 6: Third pipeline
We can schedule the third and final pipeline that reads the results of the GROUP BY and outputs the result. This process is fairly standard and many database systems take a similar approach to query planning.
Row-at-a-time
To understand how DuckDB processes a query, let's consider first the traditional Volcano-style iterator model that operates through a sequence of iterators: every operator exposes an iterator and has a set of iterators as its input.
The execution begins by trying to read from the top operator, in this case, the GROUP BY BUILD phase. However, it can't read anything yet as no data has been ingested. This triggers a read request to its child operator, the projection, which reads from its child operator, the HASH JOIN PROBE. This cascades down until it finally reaches the sale table.
Figure 7: Volcano-style iterator model
The sale table generates a tuple, for example, 42, 1233, 422, representing the ID, revenue, and tax columns. This tuple then moves up to the HASH JOIN PROBE, which consults its built hash table. For instance, it knows that ID 42 corresponds to the company ASML, and it generates a new row as the join result, which is ASML, 1233, 422.

This new row is then processed by the next operator, the projection, which sums up the last two columns, resulting in a new row: ASML, 1655. This row finally enters the GROUP BY BUILD phase.
This tuple-at-a-time, row-at-a-time approach is common to many database systems such as PostgreSQL, MySQL, Oracle, SQL Server, and SQLite. It's particularly effective for transactional use cases, where single rows are the focus, but it has a major drawback in analytical processing: it generates significant overhead due to the constant switching between operators and iterators.
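To make that overhead tangible, here is a minimal sketch of the Volcano-style model, with each operator written as a Python generator that pulls one row at a time from its child (toy data, not any real engine's code):

def scan(rows):
    for row in rows:
        yield row                            # one tuple per pull

def hash_join_probe(child, hash_table):
    for cid, revenue, tax in child:
        yield hash_table[cid], revenue, tax  # look up the join partner for this single row

def project(child):
    for name, revenue, tax in child:
        yield name, revenue + tax            # compute revenue + tax for this single row

def group_by_build(child):
    groups = {}
    for name, total in child:                # every row pulled here cascades down the whole tree
        groups[name] = groups.get(name, 0) + total
    return groups

sale = [(42, 1233, 422), (42, 100, 21)]
print(group_by_build(project(hash_join_probe(scan(sale), {42: "ASML"}))))  # {'ASML': 1776}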
One possible improvement suggested by the literature is to just-in-time (JIT) compile the entire pipeline. This option, though viable, isn't the only one.
Vector-at-a-time
Let's consider the operation of a simple streaming operator like the projection.
Figure 8: Implementation of a projection
We have an incoming row and some pseudocode: input.readRow reads a row of input, the first value remains unchanged, the second entry in the output becomes the result of adding the second and third values of the input, and the output is then written. While this approach is easy to implement, it incurs a significant performance cost due to function calls for every value read.
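In Python-flavored pseudocode, that row-at-a-time projection looks roughly like this (readRow, hasNext, and writeRow are hypothetical helpers, not a real API):

def project_row_at_a_time(input, output):
    while input.hasNext():
        row = input.readRow()       # one function call per row read
        out = [None, None]
        out[0] = row[0]             # the first value passes through unchanged
        out[1] = row[1] + row[2]    # second output = second input + third input
        output.writeRow(out)        # one function call per row written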
An improvement over the row-at-a-time model is the vector-at-a-time model, first proposed in "MonetDB/X100: Hyper-Pipelining Query Execution" in 2005.
This model processes not just single values at a time, but short columns of values collectively referred to as vectors. Instead of examining a single value for each row, multiple values are examined for each column at once. This approach reduces the overhead as type switching is performed on a vector of values instead of a single row of values.
Figure 9: The vector-at-a-time model
The vector-at-a-time model strikes a balance between columnar and row-wise execution. While full columnar execution is more efficient, it can lead to memory issues; by limiting the size of the columns to something manageable, the vector-at-a-time model avoids those issues without resorting to JIT compilation. It also promotes cache locality, which is critical for efficiency.
The importance of cache locality is illustrated by the well-known Latency Numbers Everyone Should Know.
Figure 10: Latency Numbers Everyone Should Know
The graphic, provided by Google's Peter Norvig and Jeff Dean, highlights the disparity between an L1 cache reference (0.5 nanoseconds) and a main memory reference (100 nanoseconds), a factor of 200. Since 1990, L1 cache references have become about 200 times faster, while main memory references have become only about twice as fast, so there is a significant advantage in having operations fit within the CPU cache.
This is where the beauty of vectorized query processing lies.
Figure 11: Implementation of a projection with vectorized query processing
Let's consider the same revenue + tax projection example we discussed before. Instead of retrieving a single row, we get three vectors of values as input and output two vectors of values. We read a chunk (a collection of small vectors of columns) instead of a single row. As the first vector remains unchanged, it's reassigned to the output. A new result vector is created, and an addition operation is performed on every individual value in the range from 0 to 2048.
This approach allows the compiler to insert special instructions automatically and avoids function call overhead by interpreting and switching around data types and operators only at the vector level. This is the core of vectorized processing.
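A rough sketch of the same projection in the vector-at-a-time style, using NumPy arrays as stand-ins for DuckDB's vectors of up to 2048 values (illustrative only, not DuckDB's C++ code):

import numpy as np

def project_vector_at_a_time(chunk):
    # chunk holds one short vector per column instead of a single row.
    out_name = chunk["name"]                     # the unchanged vector is reassigned, not copied
    out_total = chunk["revenue"] + chunk["tax"]  # one tight loop over the whole vector
    return {"name": out_name, "total": out_total}

chunk = {
    "name": np.array(["ASML", "ASML"]),
    "revenue": np.array([1233, 100]),
    "tax": np.array([422, 21]),
}
print(project_vector_at_a_time(chunk)["total"])  # [1655  121]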
Exchange-Parallelism
It's not enough for vectorized processing to be efficient on a single CPU; it also needs to perform well on multiple CPUs. How can we support parallelism?
Goetz Graefe, principal scientist at Google, in his paper "Volcano - An Extensible and Parallel Query Evaluation System" described the concept of exchange operator parallelism.
Figure 12: Exchange operator parallelism
In this example, three partitions are read simultaneously. Filters are applied and values are pre-aggregated, then hashed. Based on the values of the hash, the data is split up, further aggregated, re-aggregated, and then the output is combined. By doing this, most parts of the query are effectively parallelized.
For instance, you can observe this approach in Spark's execution of a simple query. After scanning the files, a hash aggregate performs a partial_sum. Then, a separate operation partitions the data, followed by a re-aggregation that computes the total sum. However, this approach has proven problematic in many instances.
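A compact, purely illustrative sketch of the exchange idea: each worker pre-aggregates its partition, the partial results are re-partitioned by a hash of the key so equal keys meet in the same bucket, and each bucket is then re-aggregated and combined.

from collections import defaultdict

NUM_BUCKETS = 3

def pre_aggregate(rows):
    partial = defaultdict(int)
    for key, value in rows:
        partial[key] += value
    return partial

def exchange_and_reaggregate(partials):
    buckets = [defaultdict(int) for _ in range(NUM_BUCKETS)]
    for partial in partials:
        for key, value in partial.items():
            buckets[hash(key) % NUM_BUCKETS][key] += value  # split by hash, then aggregate again
    return buckets

partitions = [[("ASML", 100), ("Acme", 50)], [("ASML", 25)], [("Acme", 75)]]
partials = [pre_aggregate(p) for p in partitions]  # runs once per input partition
result = {k: v for b in exchange_and_reaggregate(partials) for k, v in b.items()}  # combine output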
Morsel-Driven Parallelism
A more modern model for achieving parallelism in SQL engines is Morsel-Driven parallelism. As in the approach above, the input level scans are divided, resulting in partial scans. In our second pipeline, we have two partial scans of the sale table, with the first one scanning the first half of the table and the second one scanning the latter half.
Figure 13: Morsel-Driven parallelism
The HASH JOIN PROBE remains the same: both partial scans probe the single hash table built in the first pipeline. The projection operation is independent, and all these results sink into the GROUP BY operator, which is our blocking operator. Notably, you don't see an exchange operator here.

Unlike the traditional exchange-operator-based model, the GROUP BY is aware of the parallelization taking place and is equipped to efficiently manage the contention arising from different threads working on groups that could potentially collide.
Figure 14: Partitioning hash tables for parallelized merging
In Morsel-Driven parallelism, the process begins (Phase 1) with each thread pre-aggregating its values: the separate subsets, or morsels, of input data are built into separate, thread-local hash tables.
The next phase (Phase 2) involves partition-wise aggregation: in the local hash tables, data is partitioned based on the radixes of the group keys, ensuring that each hash table cannot contain keys present in any other hash table. When all the data has been read and it's time to finalize our hash table and aggregate, we can select the same partition from each participating thread and schedule more threads to read them all.
Though this process is more complex than a standard aggregate hash table, it allows the Morsel-Driven model to achieve great parallelism. This model efficiently constructs an aggregation over multiple inputs, circumventing the issues associated with the exchange operator.
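Here is a minimal sketch of the two phases with toy data; DuckDB's real implementation partitions on radix bits of the group hash and runs the phases across threads, which is omitted here.

from collections import defaultdict

NUM_PARTITIONS = 4  # stand-in for radix partitioning of the group keys

def phase1_local_preaggregate(morsel):
    # Phase 1: each thread aggregates its morsel into a partitioned, thread-local hash table.
    local = [defaultdict(int) for _ in range(NUM_PARTITIONS)]
    for key, value in morsel:
        local[hash(key) % NUM_PARTITIONS][key] += value
    return local

def phase2_partition_wise_merge(locals_per_thread):
    # Phase 2: partition i of every thread is merged by one task, so tasks never share group keys.
    result = {}
    for i in range(NUM_PARTITIONS):
        merged = defaultdict(int)
        for local in locals_per_thread:
            for key, value in local[i].items():
                merged[key] += value
        result.update(merged)
    return result

morsels = [[("ASML", 1655), ("Acme", 121)], [("ASML", 500)]]  # one morsel per thread
print(phase2_partition_wise_merge([phase1_local_preaggregate(m) for m in morsels]))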
Simple Benchmark
I conducted a simple benchmark, using our example query with some minor added complexity in the form of an ORDER BY and a LIMIT clause. The query selects the name and the sum of revenue + tax from the customer and sale tables, which are joined and grouped by the customer ID.
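Spelled out (table and column names are again assumptions, extending the earlier reconstruction with the ORDER BY and LIMIT), the benchmark query might look like this:

import duckdb

duckdb.sql("""
    SELECT first(name), sum(revenue + tax) AS total
    FROM customer
    JOIN sale USING (cid)
    GROUP BY cid
    ORDER BY total DESC
    LIMIT 10
""").show()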
The experiment involved two tables: one with a million customers and another with a hundred million sales entries. This amounted to about 1.4 gigabytes of CSV data, which is not an unusually large dataset.
Figure 15: The simple benchmark
DuckDB completed the query on my laptop in just half a second. On the other hand, PostgreSQL, after I had optimized the configuration, took 11 seconds to finish the same task. With default settings, it took 21 seconds.
While DuckDB could process the query around 40 times faster than PostgreSQL, it's important to note that this comparison is not entirely fair, as PostgreSQL is primarily designed for OLTP workloads.
Conclusions
The goal of this article is to explain the design, functionality, and rationale behind DuckDB, a data engine encapsulated in a compact package. DuckDB functions as a library linked directly into the application process, boasting a small footprint and no dependencies, which allows developers to easily integrate a SQL engine for analytics.
I highlighted the power of in-process databases, which lies in their ability to efficiently transfer result sets to clients and write data to the database.
An essential component of DuckDB's design is vectorized query processing: this technique allows efficient in-cache operations and eliminates the burden of the function call overhead.
Lastly, I touched upon DuckDB's parallelism model: Morsel-Driven parallelism supports efficient parallelization across any number of cores while keeping every operator aware of the parallelism, contributing to DuckDB's overall performance and efficiency.