Apache Arrow and Java: Lightning Speed Big Data Transfer

Key Takeaways

  • Arrow features zero-copy data transfers for analytics applications
  • Arrow enables in-memory, columnar data processing
  • Arrow provides cross-platform, cross-language interoperable data exchange
  • Arrow is a backbone for Big Data systems

By its very nature, Big Data is too big to fit on a single machine. Datasets need to be partitioned across multiple machines. Each partition is assigned to one primary machine, with optional backup assignments. Hence, every machine holds multiple partitions. Most big data frameworks use a random strategy for assigning partitions to machines. If each computation job uses one partition, this strategy spreads the computational load well across a cluster. However, if a job needs multiple partitions, there is a big chance that it needs to fetch partitions from other machines. Transferring data always incurs a performance penalty.

Apache Arrow puts forward a cross-language, cross-platform, columnar in-memory data format. It eliminates the need for serialization, as data is represented by the same bytes on each platform and in each programming language. This common format enables zero-copy data transfer in big data systems, minimizing the performance hit of transferring data.

The goal of this article is to introduce Apache Arrow and get you acquainted with the basic concepts of the Apache Arrow Java library. The source code accompanying this article can be found here.

Typically, a data transfer consists of:

  • serializing data in a format
  • sending the serialized data over a network connection
  • deserializing the data on the receiving side

Think for example about the communication between frontend and backend in a web application. Commonly, the JavaScript Object Notation (JSON) format is used to serialize data. For small amounts of data, this is perfectly fine. The overhead of serializing and deserializing is negligible, and JSON is human-readable which simplifies debugging. However, when data volumes increase, the serialization cost can become the predominant performance factor. Without proper care, systems can end up spending most of their time serializing data. Clearly, there are more useful things to do with our CPU cycles.

In this process, there is one factor we control in software: (de)serialization. Needless to say, there is a plethora of serialization frameworks out there. Think of ProtoBuf, Thrift, MessagePack, and many others. Many of them have minimizing serialization costs as a primary goal.

Despite their efforts to minimize serialization, there is inevitably still a (de)serialization step. The objects your code acts on are not the bytes that are sent over the network. The bytes that are received over the wire are not the objects the code on the other side crunches. In the end, the fastest serialization is no serialization.
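To make the three steps concrete, here is a minimal sketch of a hand-rolled round trip using only the JDK. The Reading class and its binary layout are hypothetical and exist only for this illustration; real frameworks such as ProtoBuf generate this kind of code for you. Note that the receiving side ends up with freshly built objects, not the bytes that travelled over the wire.

```java
import java.nio.ByteBuffer;

final class RoundTripSketch {
    // Hypothetical record type, used only for this illustration.
    static final class Reading {
        final int sensorId;
        final double value;

        Reading(int sensorId, double value) {
            this.sensorId = sensorId;
            this.value = value;
        }
    }

    // Step 1: serialize objects into bytes (a count, followed by one id/value pair per object).
    static byte[] serialize(Reading[] readings) {
        ByteBuffer buffer = ByteBuffer.allocate(
                Integer.BYTES + readings.length * (Integer.BYTES + Double.BYTES));
        buffer.putInt(readings.length);
        for (Reading reading : readings) {
            buffer.putInt(reading.sensorId);
            buffer.putDouble(reading.value);
        }
        return buffer.array();
    }

    // Step 3: deserialize the received bytes into new objects.
    static Reading[] deserialize(byte[] wire) {
        ByteBuffer buffer = ByteBuffer.wrap(wire);
        Reading[] readings = new Reading[buffer.getInt()];
        for (int i = 0; i < readings.length; i++) {
            readings[i] = new Reading(buffer.getInt(), buffer.getDouble());
        }
        return readings;
    }

    public static void main(String[] args) {
        Reading[] original = {new Reading(1, 20.5), new Reading(2, 21.0)};
        byte[] wire = serialize(original); // step 2 would send these bytes over the network
        Reading[] copy = deserialize(wire);
        System.out.println(wire.length + " bytes on the wire, " + copy.length + " objects rebuilt");
    }
}
```

Both serialize and deserialize touch every value once, so their cost grows with the size of the dataset.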

Is Apache Arrow for me?

Conceptually, Apache Arrow is designed as a backbone for Big Data systems, for example, Ballista or Dremio, or for Big Data system integrations. If your use cases are not in the area of Big Data systems, then the overhead of adopting Apache Arrow is probably not worth the trouble. You’re likely better off with a serialization framework that has broad industry adoption, such as ProtoBuf, FlatBuffers, Thrift, MessagePack, or others.

Coding with Apache Arrow is very different from coding with plain old Java objects, in the sense that there are no Java objects. Code operates on buffers all the way down. Existing utility libraries, e.g., Apache Commons, Guava, etc., are no longer usable. You might have to re-implement some algorithms to work with byte buffers. And last but not least, you always have to think in terms of columns instead of objects. 

Building a system on top of Apache Arrow requires you to read, write, breathe, and sweat Arrow buffers. If you are building a system that works on collections of data objects (i.e., some kind of database), want to compute things that are columnar-friendly, and are planning to run this in a cluster, then Arrow is definitely worth the investment. 

The integration with Parquet (discussed later) makes persistence relatively easy. The cross-platform, cross-language aspect supports polyglot microservice architectures and allows for easy integration with the existing Big Data landscape. The built-in RPC framework called Arrow Flight makes it easy to share/serve datasets in a standardized, efficient way. 

Zero-copy data transfer

Why do we need serialization in the first place? In a Java application, you typically work with objects and primitive values. Those objects are mapped somehow to bytes in the memory of your computer. The JVM understands how objects are mapped to bytes on your computer. But this mapping might be different on another machine. Think for example about the byte order (a.k.a. endianness). Moreover, not all programming languages have the same set of primitive types or even store similar types in the same way.
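The byte order issue is easy to see with a plain java.nio.ByteBuffer. The following sketch (the class and method names are made up for this illustration) encodes the same integer in both byte orders, and shows what happens when bytes are read back with the wrong one.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

final class ByteOrderSketch {
    // Encodes an int into 4 bytes using the requested byte order.
    static byte[] encode(int value, ByteOrder order) {
        return ByteBuffer.allocate(Integer.BYTES).order(order).putInt(value).array();
    }

    // Decodes 4 bytes into an int using the requested byte order.
    static int decode(byte[] bytes, ByteOrder order) {
        return ByteBuffer.wrap(bytes).order(order).getInt();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(encode(1, ByteOrder.BIG_ENDIAN)));    // [0, 0, 0, 1]
        System.out.println(Arrays.toString(encode(1, ByteOrder.LITTLE_ENDIAN))); // [1, 0, 0, 0]

        // Little-endian bytes interpreted as big-endian yield a different number.
        byte[] littleEndian = encode(1, ByteOrder.LITTLE_ENDIAN);
        System.out.println(decode(littleEndian, ByteOrder.BIG_ENDIAN)); // 16777216
    }
}
```

A shared format has to pin down details like this one, so that both sides of a connection agree on what the bytes mean.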

Serialization converts the memory used by objects into a common format. The format has a specification, and for each programming language and platform, a library is provided that converts objects to serialized form and back. In other words, serialization is all about sharing data, without disrupting the idiosyncratic ways of each programming language and platform. Serialization smooths out all the differences in platform and programming language, allowing every programmer to work the way they like, much like translators smooth out language barriers between people speaking different languages.

Serialization is a very useful thing in most circumstances. However, when we are transferring lots of data, it can become a big bottleneck. Hence, can we eliminate the serialization process in those cases? This is actually the goal of zero-copy serialization frameworks, such as Apache Arrow and FlatBuffers. You could think of it as working on the serialized data itself instead of working on objects, in order to avoid the serialization step. Zero-copy refers here to the fact that the bytes your application works on can be transferred over the wire without any modification. Likewise, on the receiving end, the application can start working on the bytes as is, without a deserialization step.

The big advantage here is that data can be transferred as-is from one environment to another environment without any translation because the data is understood as-is on both sides of the connection. 

The major disadvantage is the loss of idiosyncrasies in programming. All operations are carried out on byte buffers. There is no integer, there is a sequence of bytes. There is no array, there is a sequence of bytes. There is no object, there is a collection of sequences of bytes. Naturally, you can still convert the data in the common format to integers, arrays, and objects. But, then you would be doing deserialization, and that would be defeating the purpose of zero-copy. Once transferred to Java objects, it is again only Java that can work with the data. 
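As a rough sketch of what “working on the bytes as is” means, the class below reads 32-bit integers straight out of a received buffer, without building any intermediate objects. IntColumnView and its layout (little-endian integers stored back to back) are assumptions made for this illustration; this is not the Arrow or FlatBuffers API.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class IntColumnView {
    // Shares memory with the received bytes; nothing is copied or converted up front.
    private final ByteBuffer bytes;

    IntColumnView(ByteBuffer received) {
        // duplicate() creates a second view on the same memory; the byte order is set explicitly.
        this.bytes = received.duplicate().order(ByteOrder.LITTLE_ENDIAN);
    }

    int count() {
        return bytes.limit() / Integer.BYTES;
    }

    // Reads the value at the given index directly from the underlying bytes.
    int get(int index) {
        return bytes.getInt(index * Integer.BYTES);
    }

    long sum() {
        long total = 0;
        for (int i = 0; i < count(); i++) {
            total += get(i);
        }
        return total;
    }

    public static void main(String[] args) {
        // Pretend these 12 bytes just arrived over the network: the values 1, 2, and 3.
        byte[] wire = {1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0};
        IntColumnView view = new IntColumnView(ByteBuffer.wrap(wire));
        System.out.println(view.sum()); // 6
    }
}
```

The view never materializes an int[] or a list of Integer objects. The price is the one described above: the code deals in offsets and byte orders instead of familiar Java types.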

How does this work in practice? Let’s have a quick look at two zero-copy serialization frameworks: Apache Arrow and FlatBuffers from Google. Although both are zero-copy frameworks, they are different flavors serving different use cases. 

FlatBuffers was initially developed to support mobile games. The focus is on the fast transmission of data from server to client, with minimal overhead. You can send a single object or a collection of objects. The data is stored in (on heap) ByteBuffers, formatted in the FlatBuffers common data layout. The FlatBuffers compiler will generate code, based on the data specification, that simplifies your interaction with the ByteBuffers. You can work with the data as if it is an array, object, or primitive. Behind the scenes, each accessor method fetches the corresponding bytes and translates the bytes into understandable constructs for the JVM and your code. If you need, for whatever reason, access to the bytes, you still can. 

Arrow differs from FlatBuffers in the way it lays out lists/arrays/tables in memory. Whereas FlatBuffers uses a row-oriented format for its tables, Arrow uses a columnar format for storing tabular data. And that makes all the difference for analytical (OLAP) queries on big data sets.

Arrow is aimed at big data systems in which you typically don’t transfer single objects, but rather big collections of objects. FlatBuffers, on the other hand, is marketed (and used) as a serialization framework. In other words, your application code works on Java objects and primitives and only transforms data into the FlatBuffers’ memory layout when sending data. If the receiving side is read-only, it doesn’t have to deserialize data into Java objects; the data can be read directly from the FlatBuffers’ ByteBuffers.

In a big dataset, the number of rows typically ranges from thousands to trillions. Such a dataset may have from a couple to thousands of columns.

A typical analytics query on such a dataset references only a handful of columns. Imagine for example a dataset of e-commerce transactions. You can imagine that a sales manager wants an overview of sales, of a specific region, grouped by item category. They don’t want to see each individual sale. The average sale price is sufficient. Such a query can be answered in three steps:

  • traversing all values in the region column, keeping track of all the row/object ids of sales in the requested region
  • grouping the filtered ids based on the corresponding values in the item category column
  • computing aggregations for each group

Essentially, a query processor only needs to have one column in memory at any given time. By storing a collection in a columnar format, we can access all values of a single field/column separately. In well-designed formats this is done in such a way that the layout is optimized for the SIMD instructions of CPUs. For such analytics workloads, the Apache Arrow columnar layout is better suited than the FlatBuffers row-oriented layout.
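The three steps above can be sketched with plain Java arrays standing in for columns. The column names and the sample data are invented for this illustration, and a real query processor would work on Arrow vectors rather than on String[] and double[]. Each step reads exactly one column.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ColumnarQuerySketch {
    static Map<String, Double> averagePriceByCategory(
            String[] region, String[] category, double[] price, String wantedRegion) {
        // Step 1: scan only the region column and keep the matching row ids.
        List<Integer> selectedRows = new ArrayList<>();
        for (int row = 0; row < region.length; row++) {
            if (region[row].equals(wantedRegion)) {
                selectedRows.add(row);
            }
        }

        // Step 2: group the selected row ids using only the category column.
        Map<String, List<Integer>> rowsByCategory = new TreeMap<>();
        for (int row : selectedRows) {
            rowsByCategory.computeIfAbsent(category[row], key -> new ArrayList<>()).add(row);
        }

        // Step 3: aggregate per group using only the price column.
        Map<String, Double> averages = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : rowsByCategory.entrySet()) {
            double sum = 0;
            for (int row : group.getValue()) {
                sum += price[row];
            }
            averages.put(group.getKey(), sum / group.getValue().size());
        }
        return averages;
    }

    public static void main(String[] args) {
        String[] region = {"EU", "US", "EU", "EU"};
        String[] category = {"books", "books", "toys", "books"};
        double[] price = {10.0, 99.0, 30.0, 20.0};
        System.out.println(averagePriceByCategory(region, category, price, "EU"));
        // {books=15.0, toys=30.0}
    }
}
```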

Apache Arrow

The core of Apache Arrow is the in-memory data layout format. On top of the format, Apache Arrow offers a set of libraries (including C, C++, C#, Go, Java, JavaScript, MATLAB, Python, R, Ruby, and Rust), to work with data in the Apache Arrow format. The remainder of this article is about getting comfortable with the basic concepts of Arrow, and how to write a Java application using Apache Arrow.

Basic Concepts

Vector Schema Root

Let’s imagine we’re modeling the sales record of a chain of stores. Typically you encounter an object to represent a sale. Such an object will have various properties, such as

  • an id 
  • information about the store in which the sale was made, like region, city, and perhaps the type of store
  • some customer information
  • an id of the sold good
  • a category (and possibly subcategory) of the sold good
  • how many goods were sold
  • etc…

In Java, a sale is modeled by a Sale class. The class contains all the information of a single sale. All the sales are represented (in-memory) by a collection of Sale objects. From a database perspective, a collection of Sale objects is equivalent to a row-oriented relational database. Indeed, typically in such an application, the collection of objects is mapped to a relational table in a database for persistence. 

In a column-oriented database, the collection of objects is decomposed into a collection of columns. All the ids are stored in a single column. In memory, all the ids are stored sequentially. Similarly, there is a column for storing all the store cities for each sale. Conceptually this columnar format can be thought of as decomposing a collection of objects into a set of equal-length arrays, one array per field in an object.

To reconstruct a specific object, the decomposing arrays are combined by picking the values of each column/array at a given index. For example, the 10th sale is recomposed by taking the 10th value of the id array, the 10th value of the store city array, etc. 
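A minimal sketch of this decomposition, with a deliberately small, hypothetical Sale class of three fields: the constructor splits the objects into one array per field, and get recomposes a single sale by reading every array at the same index.

```java
final class SaleColumns {
    // Hypothetical, trimmed-down Sale with three fields, for illustration only.
    static final class Sale {
        final long id;
        final String city;
        final int quantity;

        Sale(long id, String city, int quantity) {
            this.id = id;
            this.city = city;
            this.quantity = quantity;
        }
    }

    // One equal-length array per field.
    final long[] ids;
    final String[] cities;
    final int[] quantities;

    // Decompose: the row-oriented collection becomes a set of columns.
    SaleColumns(Sale[] sales) {
        ids = new long[sales.length];
        cities = new String[sales.length];
        quantities = new int[sales.length];
        for (int i = 0; i < sales.length; i++) {
            ids[i] = sales[i].id;
            cities[i] = sales[i].city;
            quantities[i] = sales[i].quantity;
        }
    }

    // Recompose: pick the value at the same index from every column.
    Sale get(int index) {
        return new Sale(ids[index], cities[index], quantities[index]);
    }

    public static void main(String[] args) {
        Sale[] sales = {new Sale(1L, "Ghent", 2), new Sale(2L, "Antwerp", 5)};
        SaleColumns columns = new SaleColumns(sales);
        System.out.println(columns.get(1).city); // Antwerp
    }
}
```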

Apache Arrow works like a column-oriented relational database. A collection of Java objects is decomposed into a collection of columns, which are called vectors in Arrow. A vector is the basic unit in the Arrow columnar format. 

The mother of all vectors is the FieldVector. There are vector types for primitive types, such as IntVector and Float8Vector. There is a vector type for Strings: the VarCharVector. There is a vector type for arbitrary binary data: VarBinaryVector. Several types of vectors exist to model time, such as TimeStampVector, TimeStampSecVector, TimeStampTZVector, and TimeMicroVector.

More complex structures can be composed. A StructVector is used to group a set of vectors into one field. Think for example about the store information in the sales example above. All store information (region, city, and type) can be grouped in one StructVector. A ListVector allows for storing a variable-length list of elements in one field. A MapVector stores a key-value mapping in one vector. 
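Variable-length values such as strings raise the question of how they fit in a flat buffer. In the Arrow columnar format, a variable-size column uses an offsets buffer next to a data buffer that holds all bytes back to back. The sketch below mimics that idea with plain arrays. The class name is hypothetical, and the validity bitmap that Arrow uses to mark null values is left out for brevity.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class VarLengthColumnSketch {
    // n + 1 offsets for n values: value i occupies data[offsets[i] .. offsets[i + 1]).
    final int[] offsets;
    // All values, UTF-8 encoded, stored back to back.
    final byte[] data;

    VarLengthColumnSketch(String[] values) {
        offsets = new int[values.length + 1];
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        for (int i = 0; i < values.length; i++) {
            byte[] utf8 = values[i].getBytes(StandardCharsets.UTF_8);
            bytes.write(utf8, 0, utf8.length);
            offsets[i + 1] = offsets[i] + utf8.length;
        }
        data = bytes.toByteArray();
    }

    // Decodes only the requested value; the other values are never touched.
    String get(int index) {
        int start = offsets[index];
        int length = offsets[index + 1] - start;
        return new String(data, start, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        VarLengthColumnSketch cities = new VarLengthColumnSketch(new String[] {"Ghent", "", "Antwerp"});
        System.out.println(Arrays.toString(cities.offsets)); // [0, 5, 5, 12]
        System.out.println(cities.get(2));                   // Antwerp
    }
}
```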

Continuing on the database analogy, a collection of objects is represented by a table. To identify values in a table, a table has a schema: a name to type mapping. In a row-oriented database, each row maps a name to a value of the predefined type. In Java, a schema corresponds to the set of member variables of a class definition. A column-oriented database equally has a schema. In a table, each name in the schema maps to a column of the predefined type.

In Apache Arrow terminology, a collection of vectors is represented by a VectorSchemaRoot. A VectorSchemaRoot also contains a Schema, mapping names (a.k.a. Fields) to columns (a.k.a. Vectors).

Buffer Allocator

Where are the values stored that we add to a vector? An Arrow vector is backed by a buffer. Typically this is a java.nio.ByteBuffer. Buffers are pooled in a buffer allocator. You can ask a buffer allocator to create a buffer of a certain size, or you can let the buffer allocator take care of the creation and automatic expansion of buffers to store new values. The buffer allocator keeps track of all the allocated buffers. 

A vector is managed by one allocator. We say that the allocator owns the buffer backing the vector. Vector ownership can be transferred from one allocator to another. 

For example, you’re implementing a data flow. The flow consists of a sequence of processing stages. Each stage does some operations on the data, before passing on the data to the next stage. Each stage would have its own buffer allocator, managing the buffers that are currently being processed. Once processing is completed, data is handed to the next stage. 

In other words, the ownership of the buffers backing the vectors is transferred to the buffer allocator of the next stage. Now, that buffer allocator is responsible for managing the memory and freeing it up when it is no longer needed.

The buffers created by an allocator are DirectByteBuffers, hence they are stored off-heap. This implies that when you’re done using the data, the memory must be freed. This feels strange at first for a Java programmer. But it is an essential part of working with Apache Arrow. Vectors implement the AutoCloseable interface, hence, it is recommended to wrap vector creation in a try-with-resources block which will automatically close the vector, i.e., free the memory. 
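To illustrate the bookkeeping, here is a toy allocator written with only the JDK. It is not Arrow’s allocator: TrackingAllocator and its Buffer class are hypothetical, and closing a buffer here only updates the accounting (the JVM reclaims a direct ByteBuffer on its own schedule). What it does show is the pattern: every buffer is registered with its allocator, try-with-resources releases it, and closing the allocator while buffers are still outstanding is reported as a leak.

```java
import java.nio.ByteBuffer;

final class TrackingAllocator implements AutoCloseable {
    private long outstandingBytes;

    // A buffer that reports back to its allocator when it is closed.
    final class Buffer implements AutoCloseable {
        final ByteBuffer memory;
        private boolean released;

        Buffer(int size) {
            memory = ByteBuffer.allocateDirect(size); // off-heap
            outstandingBytes += size;
        }

        @Override
        public void close() {
            if (!released) {
                released = true;
                outstandingBytes -= memory.capacity();
            }
        }
    }

    Buffer allocate(int size) {
        return new Buffer(size);
    }

    long outstandingBytes() {
        return outstandingBytes;
    }

    // Closing the allocator with unreleased buffers signals a leak.
    @Override
    public void close() {
        if (outstandingBytes != 0) {
            throw new IllegalStateException("Memory leak: " + outstandingBytes + " bytes not released");
        }
    }

    public static void main(String[] args) {
        // Resources are closed in reverse order: first the buffer, then the allocator.
        try (TrackingAllocator allocator = new TrackingAllocator();
             TrackingAllocator.Buffer buffer = allocator.allocate(1024)) {
            buffer.memory.putInt(0, 42);
            System.out.println(allocator.outstandingBytes()); // 1024
        }
    }
}
```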

Example: writing, reading, and processing

To conclude this introduction, we’ll walk through an example application using Apache Arrow. The idea is to read a “database” of people from a file on disk, filter and aggregate the data, and print out the results.

Do note that Apache Arrow is an in-memory format. In a real application, you’re better off with other (columnar) formats that are optimized for persisted storage, for example, Parquet. Parquet adds compression and intermediate summaries to the data written to disk. As a result, reading and writing Parquet files from disk should be faster than reading and writing Apache Arrow files. Arrow is used in this example purely for educational purposes.

Let’s imagine we have a class Person and a class Address (only showing relevant parts):

public Person(String firstName, String lastName, int age, Address address) {
    this.firstName = firstName;
    this.lastName = lastName;
    this.age = age;

    this.address = address;
}

public Address(String street, int streetNumber, String city, int postalCode) {
    this.street = street;
    this.streetNumber = streetNumber;
    this.city = city;
    this.postalCode = postalCode;
}

We’re going to write two applications. The first application will generate a collection of randomly generated people and write them, in Arrow format, to disk. Next, we’ll write an application that reads the “people database” in Arrow format from disk into memory and selects all people who

  • have a last name starting with “P”
  • are aged between 18 and 35
  • live in a street ending with “way”

For the selected people, we compute the average age, grouped per city. This example should give you some perspective on how to use Apache Arrow to implement in-memory data analytics.

The code for this example can be found in this Git repository.

Writing data

Before we start writing out data, do note that the Arrow format is aimed at in-memory data. It is not optimized for disk storage of data. In a real application, you should look into formats such as Parquet, which support compression and other tricks to speed up on-disk storage of columnar data, to persist your data. Here we will write out data in Arrow format to keep the discussion focused and short.

Given an array of Person objects, let’s start writing out data to a file called people.arrow. The first step is to convert the array of Person objects to an Arrow VectorSchemaRoot. If you really want to get the most out of Arrow, you would write your whole application to use Arrow vectors. But for educational purposes it is useful to do the conversion here.

private void vectorizePerson(Person person, int index, VectorSchemaRoot schemaRoot) {
    // Using setSafe: it increases the buffer capacity if needed.
    // Strings are encoded explicitly as UTF-8 (java.nio.charset.StandardCharsets), the encoding of Arrow's VarChar type.
    ((VarCharVector) schemaRoot.getVector("firstName")).setSafe(index, person.getFirstName().getBytes(StandardCharsets.UTF_8));
    ((VarCharVector) schemaRoot.getVector("lastName")).setSafe(index, person.getLastName().getBytes(StandardCharsets.UTF_8));
    ((UInt4Vector) schemaRoot.getVector("age")).setSafe(index, person.getAge());

    // Child vectors of the address struct, accessed by position:
    // 0 = street, 1 = streetNumber, 2 = city, 3 = postalCode
    List<FieldVector> childrenFromFields = schemaRoot.getVector("address").getChildrenFromFields();

    Address address = person.getAddress();
    ((VarCharVector) childrenFromFields.get(0)).setSafe(index, address.getStreet().getBytes(StandardCharsets.UTF_8));
    ((UInt4Vector) childrenFromFields.get(1)).setSafe(index, address.getStreetNumber());
    ((VarCharVector) childrenFromFields.get(2)).setSafe(index, address.getCity().getBytes(StandardCharsets.UTF_8));
    ((UInt4Vector) childrenFromFields.get(3)).setSafe(index, address.getPostalCode());
}

In vectorizePerson, a Person object is mapped to the vectors in the schemaRoot with the person schema. The setSafe method ensures that the backing buffer is big enough to hold the next value. If the backing buffer is not big enough, the buffer will be extended.
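The difference between a checked and an unchecked write can be sketched with a toy column backed by an int[]. This is not how Arrow implements setSafe; GrowableIntColumn and its doubling policy are assumptions made for this illustration. The point is only that the safe variant pays for a capacity check on every call, and for a copy whenever the column has to grow.

```java
import java.util.Arrays;

final class GrowableIntColumn {
    private int[] values;

    GrowableIntColumn(int initialCapacity) {
        values = new int[initialCapacity];
    }

    int capacity() {
        return values.length;
    }

    // Unchecked write: the caller must have reserved enough capacity up front.
    void set(int index, int value) {
        values[index] = value;
    }

    // Checked write: grows the backing array (by doubling) before writing, if needed.
    void setSafe(int index, int value) {
        if (index >= values.length) {
            int newCapacity = Math.max(1, values.length);
            while (newCapacity <= index) {
                newCapacity *= 2;
            }
            values = Arrays.copyOf(values, newCapacity);
        }
        values[index] = value;
    }

    int get(int index) {
        return values[index];
    }

    public static void main(String[] args) {
        GrowableIntColumn ages = new GrowableIntColumn(2);
        ages.setSafe(5, 33);                 // grows from 2 to 8 slots
        System.out.println(ages.capacity()); // 8
        System.out.println(ages.get(5));     // 33
    }
}
```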

A VectorSchemaRoot is a container for a schema and a collection of vectors. As such, the class VectorSchemaRoot can be thought of as a schemaless database: the schema is only known when it is passed in at object instantiation. Therefore all methods, e.g., getVector, have very generic return types, FieldVector in this case. As a result, a lot of casting, based on the schema or knowledge of the dataset, is required.

In this example, we could have opted to pre-allocate the UInt4Vectors (as we know how many people there are in a batch in advance). Then we could have used the set method to avoid buffer size checks and re-allocations to expand the buffer.

The vectorizePerson function can be passed to a ChunkedWriter, an abstraction that handles the chunking and writing to an Arrow-formatted binary file.

void writeToArrowFile(Person[] people) throws IOException {
   new ChunkedWriter<>(CHUNK_SIZE, this::vectorizePerson).write(new File("people.arrow"), people);
}

The ChunkedWriter has a write method that looks like this:

public void write(File file, Person[] values) throws IOException {
   DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider();

   // try-with-resources closes the writer, the stream, the schema root, and the allocator on exit
   try (RootAllocator allocator = new RootAllocator();
        VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(personSchema(), allocator);
        FileOutputStream fd = new FileOutputStream(file);
        ArrowFileWriter fileWriter = new ArrowFileWriter(schemaRoot, dictProvider, fd.getChannel())) {
       fileWriter.start();

       int index = 0;
       while (index < values.length) {
           schemaRoot.allocateNew();
           // Fill the vectors with at most chunkSize values
           int chunkIndex = 0;
           while (chunkIndex < chunkSize && index + chunkIndex < values.length) {
               vectorizer.vectorize(values[index + chunkIndex], chunkIndex, schemaRoot);
               chunkIndex++;
           }
           // Without the row count, the batch would appear empty
           schemaRoot.setRowCount(chunkIndex);
           fileWriter.writeBatch();

           index += chunkIndex;
           // Clear the vectors of this batch before filling the next one
           schemaRoot.clear();
       }
       fileWriter.end();
   }
}

Let’s break this down. First, we create (i) an allocator, (ii) a schemaRoot, and (iii) a dictProvider. We need those to (i) allocate memory buffers, (ii) act as a container for vectors (backed by buffers), and (iii) facilitate dictionary compression (you can ignore this for now).

Next, an ArrowFileWriter is created. It handles the writing to disk, based on a VectorSchemaRoot. Writing out a dataset in batches is very easy in this way. Last but not least, do not forget to start the writer.

The rest of the method is about vectorizing the Person array, in chunks, into the vector schema root, and writing it out batch by batch.

What is the benefit of writing in batches? At some point, the data is read from disk. If the data is written in one batch, we have to read all of the data at once and store it in the main memory. By writing batches, we allow the reader to process the data in smaller chunks, thereby limiting the memory footprint.
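The chunking arithmetic itself is small. The following sketch (the class and method names are made up for this illustration) computes how many rows end up in each batch for a given chunk size. A reader that processes one batch at a time never needs to hold more than chunkSize rows in memory.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchingSketch {
    // Returns the number of rows in each batch when rowCount rows are written in chunks of chunkSize.
    static List<Integer> batchSizes(int rowCount, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<Integer> sizes = new ArrayList<>();
        int index = 0;
        while (index < rowCount) {
            // The last batch may be smaller than chunkSize.
            int rowsInBatch = Math.min(chunkSize, rowCount - index);
            sizes.add(rowsInBatch);
            index += rowsInBatch;
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(10, 4)); // [4, 4, 2]
    }
}
```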

Never forget to set the value count of a vector or the row count of a vector schema root (which indirectly sets the value counts of all contained vectors). Without setting the count, a vector will appear empty, even after storing values in the vector.

Finally, when all data is stored in vectors, fileWriter.writeBatch() commits them to disk.

A note on memory management

Do note the schemaRoot.clear() call at the end of each batch and the closing of the allocator, which happens implicitly at the end of the try-with-resources block. The former clears all data in all the vectors contained in the VectorSchemaRoot and resets the row and value counts to zero. The latter closes the allocator. If you have forgotten to free up any allocated buffers, this call will inform you that there is a memory leak.

In this setting, the closing is a bit superfluous, as the program exits shortly after the closing of the allocator. However, in a real-world, long-running application, memory management is critical.

Memory management concerns will feel foreign for Java programmers. But in this case, it is the price to pay for performance. Be very conscious about allocated buffers and freeing them up at the end of their lifetime.

Reading Data

Reading data from an Arrow formatted file is similar to writing. You set up an allocator, a vector schema root (without schema, it is part of the file), open up a file, and let ArrowFileReader take care of the rest. Don’t forget to initialize, as this will read in the Schema from the file.

To read a batch, make a call to fileReader.loadNextBatch(). The next batch, if one is still available, is read from disk and the buffers of the vectors in schemaRoot are filled with data, ready to be processed.

The following code snippet briefly describes how to read an Arrow file. For every execution of the while loop, a batch will be loaded into the VectorSchemaRoot. The content of the batch is described by the VectorSchemaRoot: (i) the schema of the VectorSchemaRoot, and (ii) the value count, which equals the number of entries.

try (RootAllocator allocator = new RootAllocator();
    FileInputStream fd = new FileInputStream("people.arrow");
    ArrowFileReader fileReader = new ArrowFileReader(new SeekableReadChannel(fd.getChannel()), allocator)) {
   // Setup file reader: this reads the schema from the file
   fileReader.initialize();
   VectorSchemaRoot schemaRoot = fileReader.getVectorSchemaRoot();

   // Each iteration loads the next batch into schemaRoot, until no batches are left
   while (fileReader.loadNextBatch()) {
      // Processing … 
   }
}

Processing Data

Last but not least, the filtering, grouping, and aggregating steps should give you a taste of how to work with Arrow vectors in data analytics software. I definitely don’t want to pretend that this is the only way of working with Arrow vectors, but it should give a solid starting ground for exploring Apache Arrow. Have a look at the source code of the Gandiva processing engine for real-world Arrow code. Data processing with Apache Arrow is a big topic. You could literally write a book about it.

Note that the example code is very specific to the Person use case. When building, for example, a query processor with Arrow vectors, the vector names and types are not known in advance, leading to more generic, and harder to understand, code.

Because Arrow is a columnar format, we can apply the filtering steps independently, using just one column.

private IntArrayList filterOnAge(VectorSchemaRoot schemaRoot) {
    UInt4Vector age = (UInt4Vector) schemaRoot.getVector("age");
    IntArrayList ageSelectedIndexes = new IntArrayList();
    for (int i = 0; i < schemaRoot.getRowCount(); i++) {
        int currentAge = age.get(i);
        if (18 <= currentAge && currentAge <= 35) {
            ageSelectedIndexes.add(i);
        }
    }
    ageSelectedIndexes.trim();
    return ageSelectedIndexes;
}

This method collects all indexes in the loaded chunk of the age vector for which the value is between 18 and 35.

Each filter produces a sorted list of such indexes. In the next step, we intersect/merge these lists into a single list of selected indexes. This list contains all indexes for rows meeting all criteria.
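Because every filter yields its indexes in ascending order, two lists can be intersected in a single pass with two cursors. The sketch below uses plain int[] arrays instead of the IntArrayList from the example, and the class name is made up for this illustration. It may well differ from how the accompanying source code does it.

```java
import java.util.Arrays;

final class IndexIntersection {
    // Intersects two ascending index arrays in one pass.
    static int[] intersect(int[] left, int[] right) {
        int[] result = new int[Math.min(left.length, right.length)];
        int i = 0;
        int j = 0;
        int count = 0;
        while (i < left.length && j < right.length) {
            if (left[i] < right[j]) {
                i++; // left[i] cannot occur in the rest of right
            } else if (left[i] > right[j]) {
                j++; // right[j] cannot occur in the rest of left
            } else {
                result[count++] = left[i]; // index selected by both filters
                i++;
                j++;
            }
        }
        return Arrays.copyOf(result, count);
    }

    public static void main(String[] args) {
        int[] selectedOnAge = {1, 3, 5, 8};
        int[] selectedOnLastName = {3, 4, 5, 9};
        System.out.println(Arrays.toString(intersect(selectedOnAge, selectedOnLastName))); // [3, 5]
    }
}
```

With three filters, the result of the first intersection is simply intersected with the third list.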

The next code snippet shows how we can easily fill the aggregation data structures (mapping city to a count and a sum), from the vectors and the collection of selected ids. 

VarCharVector cityVector = (VarCharVector) ((StructVector) schemaRoot.getVector("address")).getChild("city");
UInt4Vector ageDataVector = (UInt4Vector) schemaRoot.getVector("age");

for (int selectedIndex : selectedIndexes) {
   // Decode the city bytes as UTF-8 (java.nio.charset.StandardCharsets) instead of the platform default charset
   String city = new String(cityVector.get(selectedIndex), StandardCharsets.UTF_8);
   perCityCount.put(city, perCityCount.getOrDefault(city, 0L) + 1);
   perCitySum.put(city, perCitySum.getOrDefault(city, 0L) + ageDataVector.get(selectedIndex));
}

After the aggregation data structures have been filled, printing out the average age per city is very easy:

for (String city : perCityCount.keySet()) {
    double average = (double) perCitySum.get(city) / perCityCount.get(city);
    LOGGER.info("City = {}; Average = {}", city, average);
}

Conclusion

This article introduced Apache Arrow, a columnar, in-memory, cross-language data layout format. It is a building block for big data systems, focusing on efficient data transfers between machines in a cluster and between different big data systems. To get started with developing Java applications using Apache Arrow, we looked at two example applications that write and read data in the Arrow format. We also got a first taste of processing data with the Apache Arrow Java library.

Apache Arrow is a columnar format. A column-oriented layout is usually a better fit for analytics workloads than row-oriented layouts. However, there are always tradeoffs. For your specific workload, a row-oriented format might give better results.

The VectorSchemaRoots, buffers, and memory management will not look like your idiomatic Java code. If you can get all the performance you need from a different framework, e.g., FlatBuffers, that less idiomatic way of working might play a role in your decision on whether to adopt Apache Arrow in your application.

About the Author

Joris Gillis is a research developer at TrendMiner. TrendMiner creates self-service analytics software for IIoT time series data. As a research developer, he works on scalable analysis algorithms, time-series databases, and connectivity to external time series data sources. 
