Building a High-Performance SQL Query Engine with Apache Arrow and DataFusion
Apache Arrow and Apache DataFusion form a powerful foundation for building a high-performance SQL query engine in Rust. Arrow provides a language-agnostic columnar in-memory format and efficient data interchange (with zero-copy sharing across systems), while DataFusion is an embedded query engine (written in Rust) that leverages Arrow for in-memory data. This guide covers Arrow's format and the arrow-rs crate, the architecture and use of DataFusion, integration points (Parquet, Arrow Flight, object stores), and practical tips for using these technologies in an application (as one might in a startup setting).
Apache Arrow defines a standardized columnar memory layout for tabular data. It is designed for zero-copy data interchange and efficient analytical processing across languages. Key aspects of the Arrow format include how data is laid out in memory (columnar layout), the representation of data types and schemas, memory buffers and validity bitmaps for null values (with alignment/padding for performance), the IPC formats for serialization, and the Arrow Flight protocol for high-speed data transport.
In Arrow’s columnar layout, each column of a table is stored in a contiguous memory region (an array). A table (or RecordBatch) is a collection of equal-length arrays, one per column. Storing data column-by-column (rather than row-by-row) improves cache locality and vectorization for analytical workloads. Each array’s memory is laid out sequentially for values of the same type, which means operations can be applied on entire chunks of values efficiently.
Crucially, Arrow arrays are immutable once created (for safe zero-copy sharing) and relocatable (they can be memcopied between processes without pointer fix-ups). The Arrow format is defined to be little-endian by default for cross-language consistency (Physical memory layout — Apache Arrow v0.12.1.dev425+g828b4377f.d20190316). This layout allows constant-time random access to any element and is designed such that even nested structures (like lists or structs) still enable efficient access by level of nesting.
Data Types and Schema Representation: Arrow has a rich type system (integers, floats, binary, UTF-8 strings, timestamps, nested types like structs, lists, unions, etc.). A Schema in Arrow is a collection of Fields, each field having a name, a data type, and nullability. For example, a field might be "age": Int32 (nullable). The schema, along with the data itself, is serialized as part of Arrow's metadata when data is sent over the wire or saved to disk. Each primitive data type corresponds to a fixed-size binary representation (e.g., Int32 is 4 bytes) or a variable-size one (like strings, which use offset indices). Complex types are built from a combination of primitive arrays (for instance, a list type has an offsets buffer pointing into a child array for values).
Columnar vs Row-based: In a row-based format, each row’s fields are stored together, which is efficient for transactional access patterns but not for analytics. Arrow’s columnar format means memory is organized by columns. This maximizes the efficiency of scans and vectorized operations (e.g., summing all values in a column can happen with sequential memory access). It also aligns with CPU vectorization (SIMD) and modern CPU cache behavior. By standardizing the format, Arrow enables different systems (in possibly different languages) to share data without conversion.
Each Arrow array consists of one or more buffers in memory. A buffer is simply a contiguous chunk of memory. For a given array, buffers store the raw data and associated information needed to interpret it. The simplest case is a fixed-size primitive array (e.g., an Int32 array): it has a values buffer of length = 4 bytes * number_of_elements storing the binary values. If the array is nullable, it also has a validity bitmap buffer (also called a null bitmap) which flags which entries are null. In the bitmap, each bit corresponds to an element (1 for non-null, 0 for null) (Arrow Columnar Format — Apache Arrow v19.0.1). Eight elements are packed per byte in the bitmap (LSB bit numbering) (Arrow Columnar Format — Apache Arrow v19.0.1). If an array has no nulls (null count = 0), the bitmap can be omitted to save space (Arrow Columnar Format — Apache Arrow v19.0.1).
For example, consider an Arrow array of 5 Int32 values: [1, null, 2, 4, 8]. This would be represented as a length (5) and null count (1) in metadata, a validity bitmap buffer (with bits 10111 representing null/non-null), and a 20-byte data buffer (5 values * 4 bytes each) storing the numbers 1, (undefined), 2, 4, 8. The bitmap in memory (in binary) might be 00011101 for those 5 values (packed in a byte with three unused bits) (Arrow Columnar Format — Apache Arrow v19.0.1). Figure 1 below illustrates this memory layout for an Arrow array with some nulls:
Figure 1: Arrow array memory layout for a nullable Int32 array with values [1, NA, 2, 4, 8] (Internal structure of Arrow objects • Arrow R Package). The array consists of a metadata header (gray) and two buffers: a validity bitmap (null=0 / valid=1) and a data buffer for values. Here NA represents a null. The bitmap 00011101 (binary) indicates which entries are valid (reading right-to-left, it corresponds to [1, null, 2, 4, 8]). The data buffer stores the actual values in 4-byte slots, with undefined contents for null positions. Both buffers are padded to 8-byte multiples for alignment (Arrow Columnar Format — Apache Arrow v19.0.1).
Variable-length types (like strings or binary) use offset buffers in addition to data and bitmap buffers. For instance, a StringArray in Arrow has an offsets buffer (Int32 or Int64 offsets) with length+1 entries, where each offset marks the start of a string's bytes in the data buffer (Arrow Columnar Format — Apache Arrow v19.0.1). The data buffer then contains all the string bytes back-to-back. There is still a validity bitmap for nulls (a short code sketch after the list below shows these buffers in practice). Other nested types are built similarly:
- A List has an offsets buffer (to delineate each sublist’s start index in the child values array) and a child array of type T for the values.
- A Struct is basically a group of child arrays (one per field) plus a validity bitmap for the struct as a whole.
- Union types have type buffers (and optionally offsets) to indicate the type of each element.
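To make the offsets concrete, here is a minimal arrow-rs sketch of a nullable StringArray and its offsets buffer; it assumes the value_offsets accessor available in recent arrow-array releases:
use arrow_array::StringArray;

// Three entries, one of them null
let arr = StringArray::from(vec![Some("hi"), None, Some("arrow")]);
// The offsets buffer has length + 1 entries; the null entry repeats its offset
assert_eq!(arr.value_offsets(), &[0, 2, 2, 7]);
// value(i) slices the shared data buffer ("hiarrow") using offsets[i]..offsets[i+1]
assert_eq!(arr.value(2), "arrow");
assert!(arr.is_null(1));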
Alignment and Padding: To maximize hardware efficiency, Arrow specifies that all buffers should be aligned to 8-byte boundaries and padded to multiples of 8 bytes (Physical memory layout — Apache Arrow v0.12.1.dev425+g828b4377f.d20190316). This means even if an array’s data buffer isn’t exactly a multiple of 8 bytes, it will be padded up to the next 8. Moreover, although 8-byte alignment is required, 64-byte alignment is recommended for some cases to better exploit CPU cache lines (Physical memory layout — Apache Arrow v0.12.1.dev425+g828b4377f.d20190316). Such alignment ensures that vectorized operations and DMA transfers can be done without misaligned accesses, improving performance. In practice, the Arrow libraries handle this padding transparently (e.g., when creating an array or serializing to IPC, padding bytes are added as needed). The padded bytes are simply unused memory. Arrow uses little-endian representation, as noted, which means cross-platform data can be shared on little-endian systems without conversion; an endianness field in the metadata can indicate if byte swapping is needed when reading on a big-endian system (Physical memory layout — Apache Arrow v0.12.1.dev425+g828b4377f.d20190316).
Arrow's DataType metadata describes the logical type of the array's values (e.g., Int32, Float64, Utf8, List<Int32>, etc.). Each field in a schema has a data type and a nullability. The schema is effectively a description of columns (fields) and their types. For nested types, the schema will have child fields. For example, a field locations: List<Struct<lat: Double, lon: Double>> would indicate a list of struct coordinates.
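As a hedged sketch of how that nested field could be declared in Rust (assuming a recent arrow-schema release, where DataType::Struct takes a Fields collection and DataType::List takes an Arc<Field> for the child item):
use std::sync::Arc;
use arrow_schema::{DataType, Field, Fields, Schema};

// A field `locations: List<Struct<lat: Float64, lon: Float64>>`
let coord = DataType::Struct(Fields::from(vec![
    Field::new("lat", DataType::Float64, false),
    Field::new("lon", DataType::Float64, false),
]));
let locations = Field::new(
    "locations",
    DataType::List(Arc::new(Field::new("item", coord, true))),
    true,
);
let schema = Schema::new(vec![locations]);
assert_eq!(schema.fields().len(), 1);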
Under the hood, Arrow’s format categorizes types into a few layouts:
- Fixed-size primitives: (int, float, bool) – single data buffer (and optional null bitmap). Booleans are bit-packed (8 booleans per byte) as a special case.
- Variable-size: (binary, string, list) – offset buffer + data buffer (+ null bitmap).
- Nested structs: – each child is an array on its own, plus a parent bitmap.
- Dictionary-encoded: – an indices array (integer values) with its own bitmap, plus a separate dictionary of values (which itself is an Arrow array). Arrow has a built-in dictionary encoding to efficiently represent categorical data.
- Union types: – have buffers for type tags (and possibly offsets) plus children for each type variant.
The Arrow IPC (Inter-Process Communication) metadata captures the schema of a RecordBatch (a batch of rows, analogous to a table chunk). The schema includes all field names, types, and any custom metadata. When writing Arrow data out (to file or stream), the schema is typically the first thing written so that a reader knows how to interpret subsequent buffers of data.
Arrow defines a language-agnostic IPC format for serializing data (for sharing via memory, files, or network). The fundamental unit is a RecordBatch message, which contains a length (number of rows) and a sequence of buffers for each array (per column). The Arrow IPC format uses FlatBuffers (a binary serialization library) to encode the metadata (like schema, record batch descriptors, etc.), followed by the raw binary buffers for the data.
There are two primary IPC formats:
- Streaming format: A stream of record batches (for example, sent over a socket or pipe). In the stream format, a sequence of encapsulated messages (Schema, RecordBatch, DictionaryBatch, etc.) is sent one after another (Arrow Columnar Format — Apache Arrow v19.0.1). Each message has a small header (including a sentinel 0xFFFFFFFF and metadata length) and is padded to 8-byte alignment (Arrow Columnar Format — Apache Arrow v19.0.1). The first message in a stream is typically the Schema message, describing the schema. Subsequent messages are record batches containing the data. A streaming IPC does not have a footer or global index; it's meant for one-pass consumption. Streams can be indefinite (e.g., a server continually sending record batches).
- File format: This is an encapsulated stream with extra markers to allow random access. An Arrow file starts with a magic header ARROW1 and ends with the same magic string (Arrow Columnar Format — Apache Arrow v19.0.1). After the header, the content is basically the same sequence of IPC messages (schema, record batches, etc.), and at the end a metadata footer contains an index of the record batches (their offsets in the file) for random access. The file format is ideal for storage because a reader can seek to a particular batch without reading the entire file. The footer lists the schema and all record batch metadata.
Both formats leverage the encapsulated message structure such that the data buffers are zero-copy ready. For instance, when an Arrow file is memory-mapped, the pointers in the Arrow library can directly point to the memory-mapped region for each buffer. The alignment and padding rules guarantee that even if you concatenate messages, the proper alignment is preserved for each message’s data.
IPC Example: If you were to write two record batches to a file, the file would contain:
- Magic header ARROW1
- Schema message (FlatBuffer) + padding
- RecordBatch 1 message (metadata + data buffers) + padding
- RecordBatch 2 message + padding
- Footer (FlatBuffer containing the schema, record count, and the file offset and length of each message)
- Magic footer ARROW1 (again)
The Arrow libraries (including Rust's arrow crate) provide readers and writers for these formats so that you typically don't manually construct these messages; you use, for example, arrow::ipc::writer::FileWriter or StreamWriter to write data, and the corresponding FileReader/StreamReader to read.
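For instance, a minimal, hedged round trip through the streaming format (assuming batch is a RecordBatch like the one built in the arrow-rs section below, and that StreamReader::try_new takes an optional column projection as in recent arrow-ipc releases):
use arrow_ipc::reader::StreamReader;
use arrow_ipc::writer::StreamWriter;

let mut buf: Vec<u8> = Vec::new();
{
    // Writes the schema message followed by one record batch
    let mut writer = StreamWriter::try_new(&mut buf, &batch.schema())?;
    writer.write(&batch)?;
    writer.finish()?;
}
// Read the stream back; the reader yields Result<RecordBatch> items
let reader = StreamReader::try_new(buf.as_slice(), None)?;
for maybe_batch in reader {
    assert_eq!(maybe_batch?.num_rows(), batch.num_rows());
}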
Apache Arrow Flight is a separate subproject/protocol built on top of Arrow’s IPC format to enable high-performance data transfer over networks. Flight is essentially an RPC framework using gRPC under the hood, specialized for Arrow data. The key idea is that instead of, say, REST/JSON or JDBC transferring rows and serializing/deserializing, Flight sends Arrow record batches (already a binary format) directly over the wire with minimal overhead. This avoids conversion at the sending and receiving end – the client and server can directly exchange Arrow memory format.
Flight defines a set of services (RPC methods) for data exchange. At its core, it’s about two things:
- Data streams: A client can request a dataset (identified by a FlightDescriptor) and the server will stream back Arrow record batches (this is called a DoGet operation). Conversely, a client can stream-upload Arrow batches to a server (DoPut).
- Metadata/Control: Flight also provides methods to list what datasets are available (ListFlights), get a schema for a dataset (GetSchema), do an initial handshake/auth, etc., as well as support application-defined RPCs if needed.
Flight is built on gRPC and Arrow IPC, meaning it uses gRPC’s bi-directional streaming capabilities to send the Arrow IPC messages. It ensures that the Arrow buffers remain contiguous through the transmission (often zero-copy from the network buffer to application). The protocol allows for parallel data transfers: a single logical dataset can be split into multiple streams from a cluster of servers to a client, maximizing throughput (Introducing Apache Arrow Flight: A Framework for Fast Data Transport | Apache Arrow). This is useful for scalability (e.g., a distributed query engine can return partitions of data in parallel).
In terms of architecture, an Arrow Flight server implements the Flight service interface. Clients connect using Flight clients available in many languages. For example, a Flight server might implement an endpoint where a specific FlightDescriptor (like a path or query) will trigger the server to run a query or look up a dataset and then pipe the results (as Arrow record batches) back to the client. Because both sides speak Arrow, no conversion is needed on either end: the client gets Arrow record batches it can directly use (or convert to Pandas, etc., if Python).
Arrow Flight also has an extension called Flight SQL, which standardizes how to execute SQL queries over Flight (like an Arrow-native alternative to ODBC/JDBC). With Flight SQL, a client can send a SQL query to a Flight SQL-enabled server; the server executes it (perhaps via DataFusion or another engine) and returns results as Arrow record batches over Flight. Flight SQL provides interoperability – for example, there’s a Flight SQL JDBC driver that allows BI tools to query a Flight SQL server as if it were a SQL database, getting Arrow data as results (Flight, DataFusion, Arrow, and Parquet: Using the FDAP Architecture to build InfluxDB 3.0 | InfluxData).
Performance: Arrow Flight is designed for speed: it minimizes copies (data is sent in contiguous chunks as it is in memory) and uses efficient protocol buffers only for the small message headers. It can also use zero-copy optimizations (like using memory references when within the same host). Benchmarking has shown it can significantly outperform traditional JDBC/ODBC for large data transfers by keeping everything in columnar form and utilizing modern RPC (gRPC with HTTP/2) features (like streaming and flow control).
In practice, to use Flight in Rust, you would use the arrow-flight crate, which provides a Rust implementation of the Flight protocol (built on tonic for gRPC). You can create a Flight server by implementing the Flight service trait (providing methods for DoGet, DoPut, etc.), or use Flight clients to fetch data from a Flight server. The arrow-flight crate documentation includes examples for building a Flight server (arrow-rs/arrow-flight/README.md at main · apache/arrow-rs · GitHub). Typically, Arrow Flight is more relevant when you have a distributed system or need to feed data to/from remote clients with low latency and high throughput.
The Rust implementation of Arrow is provided by the arrow-rs project (crates in the arrow ecosystem). It includes core crates like arrow-array, arrow-schema, arrow-buffer, arrow-ipc, etc., and it is published on crates.io (for example as arrow or feature-specific subcrates). The arrow-rs crates enable you to create and manipulate Arrow data structures in Rust, read/write Arrow IPC streams and files, and interoperate with formats like Parquet. Here we focus on common usage patterns and best practices:
- Array and ArrayRef: In Arrow Rust, each column of data is represented as an Array (actually a trait Array with concrete struct implementations for each type, like Int32Array, Float64Array, StringArray, etc.). Typically one works with ArrayRef, which is an Arc<dyn Array>: a reference-counted pointer to an immutable array. An Array in Rust Arrow has methods to get its length, null count, and to access or iterate over values (often by downcasting to the specific type).
- Buffer: A low-level type representing a contiguous memory region (often an ArrowBuffer internally). Users don't often manipulate Buffers directly except when building arrays from raw parts, but it's good to know that Arrow uses specialized memory allocation for Buffers (which are aligned and padded as per the Arrow spec).
- RecordBatch: This is a struct representing a collection of equal-length arrays, corresponding to a schema. A RecordBatch has a Schema (which holds the field definitions) and a vector of columns (arrays). It's analogous to a chunk of a table: all columns for a set of rows. In Rust, you create a RecordBatch by providing an Arc<Schema> and a list of ArrayRef for each column, or using helper constructors. For example, RecordBatch::try_new(schema, columns) will validate that column lengths match the schema and create the batch. There's also RecordBatch::try_from_iter, which builds a schema from an iterator of (name, array) pairs automatically (RecordBatch in arrow::record_batch - Rust).
- Schema and Field: The Schema object defines the names and data types of columns. It's essentially a list of Field objects. Each Field has a name, a DataType (which can be a complex nested type), and a boolean for nullability. You might construct a Schema in Rust with something like Schema::new(vec![Field::new("id", DataType::Int32, false), Field::new("value", DataType::Utf8, true)]). The Arrow Rust crate provides all the Arrow data types under the DataType enum, including nested types like lists and structs.
Using these structures, you can represent your data in memory in Arrow format. For example:
use arrow_array::{Int32Array, StringArray, RecordBatch};
use arrow_schema::{Schema, Field, DataType};
use std::sync::Arc;
// Define a schema: two fields (columns)
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, true),
]));
// Create some columns as Arrow arrays
let id_array = Int32Array::from(vec![1, 2, 3, 4]);
let name_array = StringArray::from_iter([Some("Alice"), Some("Bob"), None, Some("Eve")]);
// Create a RecordBatch from the arrays
let batch = RecordBatch::try_new(schema.clone(), vec![
Arc::new(id_array) as Arc<dyn arrow_array::Array>,
Arc::new(name_array) as Arc<dyn arrow_array::Array>,
])?;
// Access batch data
assert_eq!(batch.num_rows(), 4);
println!("First row id = {}, name = {:?}",
batch.column(0).as_any().downcast_ref::<Int32Array>().unwrap().value(0),
batch.column(1).as_any().downcast_ref::<StringArray>().unwrap().value(0));
// This should print: First row id = 1, name = "Alice"
Note how we downcast column(1) to StringArray to get to the .value() method that returns a Rust &str. The Arrow arrays provide efficient access without copying (the StringArray holds a buffer of all the bytes and an offset buffer; calling .value(i) computes the slice in that buffer corresponding to the i-th string).
Also, Arrow arrays support vectorized operations through the Arrow compute kernels (in arrow-arith and related modules). For example, there are functions to filter, take, sort, or perform arithmetic on arrays directly. These utilize SIMD where possible.
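A minimal sketch of two such kernels, filter and sum, as re-exported from the arrow facade crate's compute module (module paths have shifted slightly between arrow releases, so treat the imports as an assumption):
use arrow::array::{BooleanArray, Int32Array};
use arrow::compute::{filter, sum};

let values = Int32Array::from(vec![Some(1), None, Some(5), Some(7)]);
// Keep rows where the mask is true; the result is a new array [1, 5, 7]
let mask = BooleanArray::from(vec![true, false, true, true]);
let kept = filter(&values, &mask)?;
let kept = kept.as_any().downcast_ref::<Int32Array>().unwrap();
// Null-aware, vectorized aggregation over the whole array
assert_eq!(sum(kept), Some(13));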
To create Arrow arrays, you can either use convenience constructors like Int32Array::from(vec![...]) for simple cases (which will internally allocate and populate the buffers), or use Array Builder types for more complex or performance-sensitive construction. The Arrow crate provides builders for each type (e.g., Int32Builder, Float64Builder, StringBuilder, ListBuilder, etc.). Builders manage the memory allocation and null bitmap as you append values, and then you finish() to produce the immutable array.
Example of using builders:
use arrow_array::builder::{Int32Builder, StringBuilder};
let mut int_builder = Int32Builder::new();
int_builder.append_value(42);
int_builder.append_null();
int_builder.append_value(100);
let int_array = int_builder.finish();
// int_array is an Int32Array with values [42, null, 100]
let mut str_builder = StringBuilder::new();
str_builder.append_value("foo");
str_builder.append_value("bar");
str_builder.append_null();
let str_array = str_builder.finish();
// str_array is a StringArray with values ["foo", "bar", null]
This builder pattern is convenient when you have to construct arrays in a loop or dynamically, because it will handle resizing the buffers and setting null bits for you. Under the hood, builders often double the capacity when needed (amortized linear growth) and use SIMD instructions to set memory (for bitmaps etc.) efficiently. As a best practice, if you know the final size of your array, you can call Int32Builder::with_capacity(n) to pre-allocate exact capacity and avoid reallocations.
There are also nested builders like ListBuilder<PrimitiveBuilder> for building list arrays, or StructBuilder for struct arrays. These allow you to descend into child builders, append values to them, and then indicate when a top-level element is finished. For instance, a ListBuilder<Int32Builder> might be used like:
let mut list_builder = ListBuilder::new(Int32Builder::new());
list_builder.values().append_value(1);
list_builder.values().append_value(2);
list_builder.append(true); // one list entry [1,2]
list_builder.append(false); // one list entry null
list_builder.values().append_value(3);
list_builder.values().append_value(4);
list_builder.append(true); // one list entry [3,4]
let list_array = list_builder.finish();
This creates a ListArray of Int32 representing [[1,2], null, [3,4]]. The builder API ensures that offsets and validity bits are correctly set. (In this snippet, append(false) indicates a null list, whereas append(true) finalizes a list from the values that have been appended since the last finalize.)
Another handy pattern: Arrow builders implement Rust's Extend trait, so you can often do builder.extend(from_iterable) to append from an iterator of Option<T> values, which can simplify code and internally optimize appending in bulk (arrow_array::builder - Rust).
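A tiny sketch of that pattern, assuming the Extend<Option<i32>> implementation described in the arrow_array::builder docs:
use arrow_array::builder::Int32Builder;

let mut builder = Int32Builder::new();
// Extend from any iterator of Option<i32>; nulls are handled for you
builder.extend(vec![Some(1), None, Some(3)]);
let array = builder.finish();
assert_eq!(array.len(), 3);
assert!(array.is_null(1));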
The arrow-rs crates include support for reading and writing Arrow IPC (stream and file) as well as converting to/from Arrow's C data interface (useful for FFI with C/C++). For reading/writing, look at the arrow::ipc module (or the arrow_ipc crate). Commonly used components:
- FileWriter/FileReader for writing/reading Arrow files (the .arrow or .feather file format).
- StreamWriter/StreamReader for the streaming format (useful for sending data over a network or pipes).
- These operate on RecordBatch sequences. For example, to write two batches to a file:
use arrow_ipc::writer::FileWriter;
let mut writer = FileWriter::try_new(file, &schema)?;
writer.write(&batch1)?;
writer.write(&batch2)?;
writer.finish()?;
Correspondingly, use FileReader on the reading side, which gives an iterator over RecordBatches.
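A minimal reading sketch (hedged: FileReader::try_new takes an optional column projection in recent arrow-ipc releases, and the file name here is illustrative):
use arrow_ipc::reader::FileReader;
use std::fs::File;

let file = File::open("data.arrow")?;
// None = no column projection; read every column
let reader = FileReader::try_new(file, None)?;
for batch in reader {
    let batch = batch?;
    println!("read {} rows", batch.num_rows());
}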
The crate also supports Arrow Flight via arrow-flight (discussed earlier) if you need to send/receive over gRPC, though using Flight involves setting up servers and clients.
Streaming vs File in code: Typically, the stream writer is used when you don't need random access, e.g., sending a response to a client; the file writer is for when you want to save to disk. The main difference in usage is just whether you finalize with a footer or not, which the finish() call handles.
Working with Arrow in Rust can yield very high performance, but here are some best practices and things to be aware of:
- Avoid unnecessary conversions or allocations: Once data is in an Arrow array, try to use Arrow's kernel functions or iterate in a vectorized fashion rather than extracting element by element. For instance, if you need to compute col1 + col2 for two Int32Arrays, using Arrow's add kernel (from the arrow arithmetic compute module) will operate on whole chunks with SIMD, instead of writing a loop in Rust that checks nulls and adds one by one.
- Memory management: The Arrow Rust implementation uses a memory allocator that ensures alignment. If you are interfacing with foreign data (FFI or untrusted input), be careful to uphold the alignment and padding requirements. The crate provides safe interfaces for creating arrays, so normally you won't violate those invariants unless you use unsafe or manual buffer handling.
- Reusing allocations: If building many arrays of the same type, reuse builders when possible. Builders can be cleared/reset and re-used to reduce repeated allocations of buffers.
- Chunking large data: If you have a very large dataset, it might be beneficial to chunk it into multiple RecordBatches (for example, 64K rows each) rather than one giant batch of millions of rows. This can help with parallel processing and memory management. DataFusion and other engines often work on a stream of RecordBatches.
- Null handling: The Arrow compute kernels typically handle nulls efficiently by using the validity bitmap. If writing custom algorithms on Arrow arrays, make sure to check the bitmap to avoid treating uninitialized memory as valid data. You can use Array::is_null(i)/is_valid(i) or work with the bitmap buffer for bulk checks.
- Arrow vs Arrow2: There is a community-developed crate arrow2 (by Jorge Carleitao) which provides an alternate implementation with a focus on safety and certain performance aspects. It's not Apache Arrow official, but if you find a need (like more flexibility or different performance characteristics), you might explore it. The official arrow-rs crate, however, is quite performant and is what DataFusion uses internally.
- Multi-threading: The Arrow arrays themselves are thread-safe (they are basically immutable data plus atomic refcounts on buffers). You can freely share them across threads. If you need to parallelize an operation, you can partition your data and use threads without worrying about thread safety of Arrow structures (just avoid mutating builders from multiple threads simultaneously, as those are not Sync).
In summary, the arrow-rs crate lets you create Arrow arrays, either by converting from Rust vectors or by using builders, and then use those arrays for efficient computation, to feed into DataFusion, or to serialize for IPC. Always prefer batch operations over element-wise loops for speed, and leverage the fact that Arrow's design (like alignment and SIMD optimizations) will give you speed-ups out of the box.
Apache DataFusion is an extensible query engine written in Rust, which uses Apache Arrow as the in-memory data format for intermediate results and computation. DataFusion provides a SQL and DataFrame API to build queries, an optimizer to plan efficient execution, and a vectorized execution engine that can run across multiple threads. It is embeddable – meaning you can use it as a library in your Rust application to perform SQL queries on your data – and is designed to be customized (you can add your own functions, data sources, etc.).
At a high level, DataFusion takes in a SQL query (or DataFrame operations), produces a logical plan (an abstract representation of the query), applies optimization rules to produce an optimized logical plan, then converts that into a physical execution plan composed of execution operators (like scans, joins, aggregations). The physical plan is then executed by the DataFusion runtime, producing results as Arrow record batches (which you can collect in memory or stream).
Logical Plan: When you provide a SQL query to DataFusion, it first parses it using the SQL parser (sqlparser-rs under the hood). The parser output (an abstract syntax tree of the SQL) is then converted into DataFusion's LogicalPlan representation (using the SqlToRel planner). The logical plan is a representation of the relational algebra of the query, without considering execution details. It consists of operators like Projection, Filter, Scan, Join, Aggregate, etc., linked in a tree (or DAG). The logical plan is analogous to what other systems call the "query plan" before optimization. It's independent of physical concerns like memory or parallelism; it focuses on what operations to do, not how to execute them.
Optimizer: DataFusion includes a query optimizer with a series of rule-based optimizations (and some constant folding, etc.). It will do things like predicate pushdown (moving filters down to scans), projection pushdown (only read necessary columns), simplifying expressions (folding constants, simplifying boolean expressions), and so on (Insights from paper: Apache Arrow DataFusion: a Fast, Embeddable, Modular Analytic Query Engine | by Hemant Gupta | Medium). The optimizer takes the initial logical plan and produces a potentially more efficient logical plan. The optimization rules are extensible; you can add your own or disable certain ones if needed. It's not as exhaustive as, say, a cost-based optimizer in a full-fledged database, but it covers common transformations to improve performance. After optimization, the logical plan is ready to be converted to a physical plan.
Physical Plan: The physical plan is the actual execution plan that DataFusion will run. In DataFusion, physical plans consist of ExecutionPlan operators, which are implemented to produce RecordBatches (often as a Rust Stream of batches for each operator). DataFusion's physical plan nodes include things like DataSourceExec (for scanning files, etc.), FilterExec, ProjectionExec, HashJoinExec, AggregateExec, SortExec, etc., as well as some special ones like CoalesceBatchesExec (to ensure batches are of a certain size) and RepartitionExec (to shuffle partitions if needed for joins/aggregates). The physical planner will take into account the available physical resources: for example, it knows how to partition the plan for multi-core execution. DataFusion by default will try to execute partitions in parallel (using a thread pool via Tokio).
The physical plan is structured as a tree of operators where each operator can have partitions (the unit of parallelism). For example, reading a CSV might result in N partitions (if CSV is read in parallel in chunks), a join might require shuffling partitions, etc. DataFusion uses a pull-based model (each operator pulls from its children). Execution happens by spawning tasks for partitions.
Execution Engine: The DataFusion execution engine is built on asynchronous Rust (using the Tokio runtime). Each partition of the physical plan is executed, and operators implement the Stream trait to yield batches. The execution is vectorized, meaning each operator processes data in units of RecordBatch (a columnar batch of, say, 1024 or 8192 rows) rather than tuple-at-a-time. It's also multi-threaded: DataFusion will use a thread pool to schedule tasks, and different partitions can be executed concurrently on different threads. Most operations are implemented to be streaming; they don't block on the entire dataset (though sorts or global aggregates need to consume all input). This streaming design means DataFusion can start yielding results as soon as they are computed, and can handle pipelines without materializing all intermediate data on a single thread.
DataFusion’s execution is dataflow-oriented: you can think of data flowing up from table scans, through filters, through joins, etc., with backpressure managed by Rust futures. Because of the Arrow format, even the intermediate data (between operators) is in Arrow arrays, so every operator benefits from the same memory layout and can use SIMD where applicable. The engine is vectorized and uses Arrow’s kernel implementations (like for filtering or math) to operate on whole arrays at once.
One important point is that DataFusion’s physical plan is not tied to a single machine. It’s possible to take a DataFusion logical plan and distribute it (this is what the related project Ballista does for cluster execution), but “out of the box” DataFusion runs in a single process with multi-threading. The design, however, was kept similar to frameworks like Spark or Calcite in terms of separating logical and physical layers, to ease integration into distributed contexts.
To summarize architecture:
- SQL or DataFrame API -> parse/plan -> LogicalPlan (relational algebra) -> optimize -> LogicalPlan (optimized) -> create PhysicalPlan (with execution partitions, specific operators) -> execute on Rust async tasks -> yield Arrow RecordBatch results (which can then be converted to other formats or collected).
This architecture follows established patterns in database systems, but with Rust and Arrow ensuring efficiency. DataFusion's design goals explicitly include providing a high-performance engine with minimal setup and allowing customization at all stages (datafusion - Rust).
SQL Support: DataFusion supports a subset of SQL (it’s been growing with each release). It can handle:
- SELECT projections (including expressions, math, string ops, etc.)
- WHERE filters
- JOINs (inner, left, right, and full outer joins; equi-joins use a hash join implementation, with cross/nested-loop joins for non-equi conditions)
- GROUP BY aggregations (and HAVING)
- ORDER BY and LIMIT
- Subqueries (to some extent; simpler subqueries are handled, but coverage is not yet complete)
- Common functions: DataFusion comes with many built-in scalar functions (math functions, string functions, date functions, etc.), aggregate functions (SUM, AVG, MIN, MAX, COUNT, etc.), and window functions (rank, dense_rank, lag, etc.).
It also supports SQL extensions like EXPLAIN (to show the plan) and CREATE EXTERNAL TABLE (to register external data as tables, which you can also do via the Rust API).
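For instance, a quick sketch (assuming a table named mytable has already been registered with the context):
let df = ctx.sql("EXPLAIN SELECT a, MIN(b) FROM mytable GROUP BY a").await?;
df.show().await?; // prints the logical and physical plans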
One thing to note is that DataFusion uses the sqlparser crate, which means it expects ANSI SQL syntax. Some dialect-specific things might not parse unless added.
DataFrame API: In addition to SQL strings, DataFusion offers a DataFrame API (similar to Spark’s DataFrame or Pandas, but lazy). For example, you can do:
let df = ctx.read_csv("myfile.csv", CsvReadOptions::new()).await?;
let df2 = df.filter(col("x").gt_eq(lit(10)))?
.aggregate(vec![col("y")], vec![min(col("z"))])?
.limit(0, Some(100))?;
let results = df2.collect().await?;
This builds the same logical plan as the SQL SELECT y, MIN(z) FROM myfile WHERE x >= 10 GROUP BY y LIMIT 100. The DataFrame API is just another way to build queries; ultimately it produces a logical plan that goes through the same optimizer and execution path (datafusion - Rust). So whether you use SQL or the DataFrame builder, you get the same performance and capabilities.
Extension Points: DataFusion is designed to be extensible at almost every stage. Some extension points include:
- TableProvider / DataSource: You can register custom data sources (not just CSV/Parquet). By implementing the TableProvider trait (which specifies how to scan the data, the schema, etc.), you can make DataFusion query anything (for example, an in-memory array, a remote API, etc.). DataFusion comes with providers for Parquet, CSV, JSON, Avro, Arrow files, and more (Insights from paper: Apache Arrow DataFusion: a Fast, Embeddable, Modular Analytic Query Engine | by Hemant Gupta | Medium), but you can add your own; a minimal in-memory example using the built-in MemTable provider follows this list.
- User-Defined Functions (UDFs): You can define custom scalar functions and register them with the context, so they can be used in SQL. For example, if you have a Rust function that takes a value and returns something, you can wrap it in a DataFusion UDF (providing a return type and an Arrow kernel implementation) and use it in SELECT my_udf(column) FROM .... DataFusion provides a create_udf function to make this easy, and similarly you can create user-defined aggregate functions (UDAFs) and even user-defined window functions (datafusion - Rust). The UDF will appear in SQL as a recognized function. (In Python, DataFusion even allows you to define a Python function and use it via PyArrow integration, but for Rust usage you'd use native Rust.)
- Custom Expressions or Plan Nodes: In cases where you want to extend what the logical plan can do (e.g., a special type of join or a custom optimization), DataFusion allows custom logical plan nodes and corresponding physical execution plan nodes. This is an advanced extension: you add extension nodes to the logical plan and provide your own PhysicalPlanner to create the corresponding exec nodes. There are documented ways to do this (extending the planner) (datafusion - Rust).
- Optimizer rules: You can add your own optimizer rules (implementing the OptimizerRule trait) and plug them in. For example, if you know a specific pattern in your data that can be optimized in a custom way, you could write an optimizer pass.
- Scheduler / distributed execution: DataFusion's core is single-process, but it was built to be used in distributed scenarios via Ballista. If needed, one could integrate DataFusion with a cluster scheduler or even use the emerging Substrait standard (DataFusion has support for serializing plans to Substrait protobuf, which could be sent to another system).
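As referenced in the TableProvider bullet above, here is a minimal hedged sketch using the built-in MemTable provider; it reuses the schema (Arc<Schema>) and batch (RecordBatch) built in the arrow-rs section, and the table and column names are illustrative:
use std::sync::Arc;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;

// One partition containing one batch
let provider = MemTable::try_new(schema.clone(), vec![vec![batch]])?;
let ctx = SessionContext::new();
ctx.register_table("people", Arc::new(provider))?;
let df = ctx.sql("SELECT id, name FROM people WHERE id > 1").await?;
df.show().await?;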
Because of these extension points, DataFusion is suitable for embedding in custom data systems: you can use the parts you want and replace parts. For most users, the typical extensions are adding UDFs or custom data sources. The public API is relatively stable compared to internal code, so DataFusion maintainers encourage using these extension points rather than forking the code (Architecture — Apache DataFusion documentation) (Architecture — Apache DataFusion documentation).
Using DataFusion in Rust usually goes like this:
- Create a context: This is represented by SessionContext (previously ExecutionContext in older versions). The SessionContext holds configuration, registered tables, and the execution runtime. You create one with let ctx = SessionContext::new();.
- Register a data source (table): There are convenience methods to register CSV, Parquet, etc. For example, ctx.register_csv("mytable", "path/to/file.csv", CsvReadOptions::new()).await?; will register a table named "mytable" that you can query (Example Usage — Apache DataFusion documentation). Similarly, register_parquet, register_json, etc. You can also register an in-memory table by providing a Vec<RecordBatch> using MemTable as a TableProvider.
  - If you have custom data, you can use ctx.register_table("name", Arc::new(my_table_provider))?;.
  - DataFusion will infer the schema if you don't provide one for CSV/JSON (you can provide options for schema inference or specify the schema).
- Execute a query: There are two main ways:
  - SQL string: let dataframe = ctx.sql("SELECT * FROM mytable WHERE ...").await?;. This returns a DataFrame (DataFusion's lazy query representation). You then either do dataframe.collect().await? to execute and get results in memory (as Vec<RecordBatch>), or dataframe.show().await? to print results to stdout (mainly for quick REPL use), or dataframe.write_parquet(...).await? to write to Parquet, etc. The DataFrame API allows further chaining as well; e.g., you could do dataframe.filter(...)? on it as an alternative to writing the full SQL up front.
  - DataFrame API: As described, you can construct the query by starting with a ctx.read_X method or ctx.table("mytable").await? to get a DataFrame, then calling methods like .select(), .filter(), .join(), etc. Finally, call .collect().await? on the final DataFrame to get results.
For example (SQL way):
let df = ctx.sql("SELECT a, MIN(b) FROM mytable WHERE a <= b GROUP BY a LIMIT 100").await?;
let results = df.collect().await?;
This would plan and execute the query, returning a vector of RecordBatches (perhaps just one batch if small, or multiple if partitioned).
Example (DataFrame way, equivalent to the above):
use datafusion::prelude::*;
use datafusion::common::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("mytable", "path/to/example.csv", CsvReadOptions::new()).await?;
    let df = ctx.table("mytable").await?
        .filter(col("a").lt_eq(col("b")))?
        .aggregate(vec![col("a")], vec![min(col("b"))])?
        .limit(0, Some(100))?;
    let results = df.collect().await?;
    // Pretty-print the result batches
    datafusion::arrow::util::pretty::print_batches(&results)?;
    Ok(())
}
Both approaches use async/await because DataFusion's planning and execution are async (file I/O is async, etc.). Ensure you run on a Tokio runtime (as shown with #[tokio::main]).
- Inspecting results: The result of .collect() is Vec<RecordBatch>. You might then convert that to your desired format: for example, convert to a DataFrame in Polars or to JSON, or just iterate. Arrow provides pretty printers and JSON serialization if needed. If the result is large, you might prefer to stream results rather than collect all at once; DataFusion's DataFrame also has an .execute_stream() method which gives you a Stream of RecordBatches so you can consume batches one by one without storing everything in memory.
Also, DataFusion supports EXPLAIN in SQL to get the plan. For debugging performance, EXPLAIN ANALYZE can even run the query and return execution metrics.
Streaming vs Batch: DataFusion's internal execution is streaming, but the API by default collects to completion. If you need true streaming (e.g., piping results out continuously), you can use DataFrame::execute_stream() as mentioned, or even use the physical plan API directly to get a stream from an ExecutionPlan. However, for most uses, collecting into memory is fine for moderate result sizes.
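A hedged sketch of the streaming path (assuming the futures crate is available for StreamExt, and a registered table named mytable):
use futures::StreamExt;

let df = ctx.sql("SELECT * FROM mytable").await?;
// A SendableRecordBatchStream: an async stream of Result<RecordBatch>
let mut stream = df.execute_stream().await?;
while let Some(batch) = stream.next().await {
    let batch = batch?;
    println!("got {} rows", batch.num_rows());
}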
One of the strengths of DataFusion is allowing custom logic to be added:
- Scalar UDF (User-Defined Function): Suppose you have a Rust function that computes something not built-in, like a special string manipulation or a complex calculation. You can wrap it as a DataFusion UDF. The API (somewhat simplified) is let my_udf = create_udf(name, input_types, return_type, volatility, fun_scalar), where fun_scalar is a closure or function that takes a slice of Arrow ArrayRefs and returns an Arrow ArrayRef. You then do ctx.register_udf(my_udf). After that, in SQL you can do SELECT my_func_name(column) FROM table. DataFusion will call your function for each batch (vectorized, receiving an Arrow array and expected to return an Arrow array of equal length). You don't handle one value at a time; you handle whole arrays, which is what makes it efficient. Your function can internally downcast the ArrayRef to a concrete type, perform computation (ideally using Arrow kernels or SIMD as well), and produce a new ArrayRef.
Similarly, aggregate UDFs (UDAFs) can be created by specifying update/merge/finalize logic (this is a bit more involved, as you need to define a struct that implements the aggregator trait).
Example: If you wanted a UDF add_one(x) that adds 1 to an integer, you could implement it using Arrow compute:
use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::DataType;
use datafusion::logical_expr::{create_udf, Volatility};
use datafusion::physical_plan::functions::make_scalar_function;

let func = make_scalar_function(|args: &[ArrayRef]| {
    let input = args[0].as_any().downcast_ref::<Int32Array>().unwrap();
    // Create the output by adding 1 to each non-null value
    let result: Int32Array = input.iter().map(|opt_v| opt_v.map(|v| v + 1)).collect();
    Ok(Arc::new(result) as ArrayRef)
});
let udf = create_udf(
    "add_one",
    vec![DataType::Int32],     // input types
    Arc::new(DataType::Int32), // return type
    Volatility::Immutable,
    func,
);
ctx.register_udf(udf);
// Now "add_one" can be used in SQL or DataFrame expressions.
This example uses iter().map(), which is safe but will be a bit slower than using Arrow's own vectorized add kernel; a more efficient implementation might use the Arrow arithmetic kernels to add an array of 1s. But it demonstrates the idea.
- User-Defined Table Functions / Sources: If you want DataFusion to query a custom data source, implement the TableProvider trait. You need to provide the schema and a scan method which returns an ExecutionPlan (usually a custom ExecutionPlan that knows how to fetch the data). For instance, you could implement a TableProvider for an HTTP API that, when scanned, hits the API and yields results. Once you have it, you do ctx.register_table("myapi", Arc::new(MyApiTableProvider {})). Then you can SELECT * FROM myapi in queries.
DataFusion also has a concept of listing tables (for filesystem directories) and object store integration which we will touch on in the next section, but from the extension perspective, you can plug in anything as a table.
- Custom Plan Nodes: For specialized needs, one can implement new logical node types or physical exec nodes. DataFusion has recently formalized some of this via an Extension node. This could be used, for example, to add a new join algorithm or an optimized operator that the standard DataFusion doesn't have. You would then instruct the planner/optimizer to use it. This is an advanced use-case and requires digging into DataFusion's code, but it's possible and some forks have done that for specific use cases.
To ensure that custom functions or sources keep working across releases, DataFusion's maintainers prefer contributions via extension points. The project even has datafusion-contrib, where experimental features (like the S3 object store, or a connector to other systems) can live until stable (Introducing Apache Arrow DataFusion Contrib - Apache DataFusion Blog). The key takeaway: DataFusion is built to be a framework for building database systems, not just a black-box SQL engine.
A query engine doesn’t live in isolation; you often need to integrate with storage formats, other protocols, or distributed environments. DataFusion and Arrow provide several integration points:
Parquet is a popular columnar on-disk format, and it complements Arrow well (Parquet for storage, Arrow for in-memory). The Rust Arrow project includes a crate (often just called parquet on crates.io) which is the official native Rust implementation of Parquet. It provides the ability to read Parquet files into Arrow arrays and write Arrow arrays out to Parquet files. DataFusion uses this under the hood to implement ctx.read_parquet() and to allow querying Parquet files.
To read Parquet directly using the Arrow API, you can use ParquetRecordBatchReader. For example:
use parquet::arrow::ParquetRecordBatchReader;
use std::fs::File;
let file = File::open("data.parquet")?;
let reader = ParquetRecordBatchReader::try_new(file, 1024)?; // 1024 rows per batch
for batch in reader {
let batch = batch?;
println!("Read {} rows", batch.num_rows());
}
This will yield RecordBatches. Under the hood it decodes Parquet column chunks and assembles Arrow arrays.
Writing is also straightforward using ArrowWriter:
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use parquet::basic::Compression;
let file = File::create("out.parquet")?;
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
writer.write(&batch)?;
writer.close()?;
This snippet creates a Parquet file with Snappy compression, writing one RecordBatch (you could call write() multiple times for multiple batches) (parquet::arrow - Rust). You must call close() to finalize the file (write the footer).
DataFusion makes this easier by allowing a SQL statement CREATE EXTERNAL TABLE ... STORED AS PARQUET, or simply by using ctx.register_parquet("table", "path", ParquetReadOptions::default()). Once registered, you can query the Parquet data like a table. DataFusion will push down filters and column projections into the Parquet reader where possible (to take advantage of Parquet page stats, etc., although the pushdown in Rust might be limited to just not materializing unused columns).
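A short sketch of that flow (the table name, file path, and column are illustrative):
use datafusion::prelude::*;

let ctx = SessionContext::new();
ctx.register_parquet("events", "data/events.parquet", ParquetReadOptions::default()).await?;
// Only the referenced columns (and matching row groups) need to be decoded
let df = ctx.sql("SELECT user_id, COUNT(*) FROM events GROUP BY user_id").await?;
df.show().await?;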
Integration tip: If your data is stored in Parquet, using DataFusion to query it is very convenient because DataFusion will read only the necessary parts. It uses the Arrow Parquet integration, which is quite efficient (though the Rust Parquet implementation may not be as optimized as the C++ one, it's improving). DataFusion can also write results to Parquet (through the DataFrame write_parquet API).
If you want to query data that is spread across nodes or serve query results to clients, Arrow Flight can be a building block:
- DataFusion with Flight: You could imagine a setup where you have a Flight server that wraps DataFusion. A client could send a query (perhaps via Flight SQL) and the server runs DataFusion to execute it, then streams back Arrow record batches via Flight. This way, you leverage DataFusion for the heavy lifting and Flight for the transport. In fact, there is work in DataFusion to support Flight SQL; for example, the datafusion-cli can connect to a Flight SQL server.
- Ballista: Apache Ballista was an adaptation of DataFusion for distributed execution, using Arrow Flight for shuffling data between nodes. Ballista essentially would take a DataFusion physical plan, partition it for multiple executors, and use Flight to send executors their tasks and to shuffle intermediate results (each executor is a mini DataFusion engine processing a partition). As of late, Ballista is being merged more tightly with DataFusion (it might even just be DataFusion's distributed mode now, often referred to as DataFusion cluster mode). It uses gRPC and Flight under the hood to coordinate: the scheduler in Ballista plans the distributed query, and executors (which run DataFusion on subsets of data) send data between each other via Flight. If you are building a distributed system, you can use these components, but note that this is complex; a simpler approach, if you have a primarily single-node system that needs to feed data out, is to just run a Flight server on top of DataFusion.
- Client integration: On the client side, many languages have Flight clients (Python, Java, C++, etc.). So if your Rust server uses DataFusion to handle SQL and Arrow Flight to send results, a client in (say) Python could connect with PyArrow’s Flight client and get a Pandas DataFrame result with no serialization overhead.
In short, Arrow Flight is the mechanism for distributed Arrow data. Use it when you need to either (a) parallelize execution across nodes (shuffle data, etc.), or (b) serve results to remote clients efficiently. DataFusion doesn’t automatically distribute unless you set up something like Ballista, but you can manually orchestrate multiple processes and use Flight to exchange data among them.
Setting up an Arrow Flight server in Rust involves the arrow-flight crate (which uses tonic under the hood). You implement the FlightService trait methods. For example, for DoGet you would look at the FlightDescriptor and, based on that, perhaps run a DataFusion query or fetch a table, then return a Stream<RecordBatch> as a response. The arrow-flight crate provides utilities to easily send RecordBatches over the channel. The Flight protocol will handle chunking large batches and let the client stream.
If using Flight SQL, you could integrate with DataFusion by implementing the Flight SQL server (which handles actions like GetSqlInfo, Execute, etc.) and translating those into DataFusion calls. This is more involved, but the benefit is that you instantly become compatible with JDBC/ODBC via Flight connectors (Flight, DataFusion, Arrow, and Parquet: Using the FDAP Architecture to build InfluxDB 3.0 | InfluxData).
In modern data architectures, data might reside not just on local disk but in cloud object storage (S3, Azure Blob, GCS). Arrow and DataFusion have been evolving to support this scenario:
- There is a Rust crate object_store (from the Apache Arrow ecosystem, donated by InfluxData) which provides a unified abstraction over object storage (Rust Object Store Donation | InfluxData). It supports local filesystem, Amazon S3, Azure, Google Cloud Storage, and in-memory, through a common trait.
- DataFusion leverages this to allow, for example, reading a Parquet file from S3 by just specifying an S3 path.
DataFusion and S3:
A specific crate datafusion-objectstore-s3 exists which implements an S3-backed ObjectStore and a TableProvider that can list files from S3 (Introducing Apache Arrow DataFusion Contrib - Apache DataFusion Blog). With this, you can register an S3 bucket or prefix as a table in DataFusion. For instance:
use datafusion::datasource::listing::ListingOptions;
use datafusion_objectstore_s3::object_store::s3::S3FileSystem;
use aws_sdk_s3::Config;
let s3 = S3FileSystem::new_your_region(...); // create an S3 filesystem client
ctx.runtime_env().register_object_store("s3", s3);
ctx.register_listing_table("mydata", "s3://my-bucket/path/", ListingOptions::new(ParquetFormat::default())).await?;
This is conceptual (the actual API may differ slightly), but essentially you can prefix paths with s3:// and DataFusion, if configured with an S3 object store, will know how to read those. The DataFusion S3 integration aims to make querying data in S3 almost as easy as local files (Introducing Apache Arrow DataFusion Contrib - Apache DataFusion Blog). It handles the fact that you need asynchronous reads (using the AWS Rust SDK under the hood).
Listing and Partitioned Data: DataFusion's listing tables can treat a directory of files as a single table (similar to how Hive/Spark treat partitions). For instance, if you have a partitioned Parquet dataset on S3 (say partitioned by date), you can point DataFusion at the root and it will read all Parquet files under it. If your directory structure encodes partition columns, DataFusion can even automatically add those as columns (this is part of the ListingTable functionality).
Local filesystem and others: By default, DataFusion has a local filesystem object store registered for the file:// scheme and the empty scheme, so you typically can just use local paths. For other object stores, you register them. The object_store crate currently supports:
- Local (through standard file I/O),
- In-memory (useful for tests),
- AWS S3,
- Azure Blob,
- Google Cloud Storage.
It’s possible to extend to others by implementing the trait.
Using Object Store directly: If not using DataFusion, you could still use the object_store crate along with the Parquet/Arrow reader by providing an object store as the source of data (for example, reading a file from S3 via a streaming HTTP range request). But DataFusion abstracts that so you can simply register and query.
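A hedged sketch of the object_store crate used directly (it assumes the crate's "aws" feature is enabled, and the bucket and path names are illustrative):
use object_store::aws::AmazonS3Builder;
use object_store::{path::Path, ObjectStore};

// Region and credentials are read from the environment in this sketch
let store = AmazonS3Builder::from_env()
    .with_bucket_name("my-bucket")
    .build()?;
let location = Path::from("data/part-0.parquet");
let bytes = store.get(&location).await?.bytes().await?;
println!("downloaded {} bytes", bytes.len());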
Note on performance: Reading from object stores can be slower (because of network latency). DataFusion’s readers try to use large request sizes and parallel prefetch if possible. For example, Parquet is typically read in parallel by row groups or by ranges; the object_store abstraction supports range reads.
In summary, DataFusion in a cloud environment can query data directly in S3 by using the object store integration. This means you don’t have to download files manually; you just point DataFusion at the S3 path. Under the hood it might use AWS SDK with async GET requests. This is powerful for serverless or cloud analytics use cases.
- ODBC/JDBC via Flight SQL: As mentioned, if you implement Flight SQL on top of DataFusion, you essentially get ODBC/JDBC connectivity for free (because Arrow provides a Flight SQL ODBC driver). This could be a way to connect BI tools to your Rust-based query engine.
- Python Integration: There is also a datafusion-python project (PyO3 bindings) that lets you use DataFusion from Python as a library, with Arrow handling data interchange. That is more of an aside, but it shows the ecosystem is working to integrate DataFusion across languages.
- Substrait: DataFusion can export and import Substrait plans (Introducing Apache Arrow DataFusion Contrib - Apache DataFusion Blog), an emerging industry standard for representing query plans. This could allow, for example, another query planner to generate a plan and have DataFusion execute it (or vice versa). It is still maturing, but it is an integration point to watch, especially in multi-engine environments.
When embedding DataFusion and Arrow in a new application (say you’re building a SaaS or an internal tool that requires query engine capabilities), there are some best practices and considerations to ensure you get the most out of them:
- Leverage “out of the box” defaults: DataFusion is designed to provide a “world class” query engine with minimal configuration (datafusion - Rust). Start with the default SessionContext and default settings – they enable multi-threading and reasonable batch sizes. Only tweak via SessionConfig if you have specific needs, such as enabling/disabling particular optimizations or changing the target batch size.
- Data loading and caching: If your data lives on disk (or S3), be mindful of how often it is read. DataFusion does not (as of now) automatically cache data between queries (unless you keep the same context and use ctx.cache_table("t", ...)). For frequent queries over the same data, consider caching it yourself, e.g. by reading a large Parquet file into memory and registering it as an Arrow MemTable; MemTable lets you register an in-memory table from RecordBatches you supply, accelerating repeated queries at the cost of memory (see the sketch after this list).
- Concurrent queries: You can execute multiple DataFusion queries in parallel, either with separate contexts or a shared one (though sharing a context may serialize access when registering tables). The heavy lifting runs on the Tokio thread pool, which can handle many futures; ensure your process has enough threads (Tokio’s default runtime uses as many worker threads as there are CPUs). DataFusion’s operations are thread-safe for parallel query execution.
- Resource limits: Currently, DataFusion might not have built-in resource quota management (such as per-query memory limits). In an embedded setting, be cautious with very large queries, which can consume a lot of memory; you may need your own safeguards, e.g. limiting how much data can be scanned or adding a LIMIT to constrain results.
- Schema management: If you have evolving schemas or many tables, consider DataFusion’s catalog features. You can create multiple schemas (namespaces) and register many tables, and it can be useful to wrap DataFusion in an API that updates the catalog as the underlying data changes.
- Version compatibility: Arrow and DataFusion versions move fast. Keep an eye on upgrade guides (DataFusion release notes often highlight breaking changes). Pin the version in Cargo.toml, and test when upgrading.
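A minimal sketch of the manual caching approach mentioned above (the file path and table name are made up, and it assumes the Parquet file yields at least one batch):

```rust
use std::sync::Arc;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;

async fn cache_parquet_in_memory(ctx: &SessionContext) -> Result<()> {
    // Read the Parquet file once and materialize it as Arrow RecordBatches.
    let df = ctx
        .read_parquet("data/events.parquet", ParquetReadOptions::default())
        .await?;
    let batches = df.collect().await?;

    // Assumes at least one batch was produced; handle the empty case in real code.
    let schema = batches[0].schema();

    // Register the batches as an in-memory table; subsequent queries against
    // "events_cached" scan RAM instead of re-reading the file.
    let mem_table = MemTable::try_new(schema, vec![batches])?;
    ctx.register_table("events_cached", Arc::new(mem_table))?;
    Ok(())
}
```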
Using DataFusion (and Arrow) has certain cost/performance implications:
- Memory vs CPU: Arrow’s columnar format is memory-hungry (data sits uncompressed in memory, and is padded/aligned), a deliberate trade-off to gain CPU speed. If your data is very large (doesn’t fit in RAM), DataFusion can still query it (especially by streaming Parquet in chunks), but project only the columns you need and filter early to minimize the in-memory footprint. Parquet on disk is compressed; decoding it into Arrow will use noticeably more memory, so estimate memory usage or process in chunks where needed.
- Latency vs Throughput: DataFusion is optimized for throughput (scanning millions of rows per second). Latency for small queries may be a bit higher than a hand-coded solution because each query goes through parsing, planning, and optimization. For most use cases this overhead is small (milliseconds) and only matters if you issue many tiny queries per second. If you need sub-millisecond responses for single-row lookups, an Arrow-based column store is not the ideal tool (that is the realm of key-value stores), but for analytical queries DataFusion performs well.
- Parallelism: DataFusion will try to use all cores. This is great for large queries, but if you run many queries concurrently, they’ll compete for cores. You might need to size the thread pool or use tokio’s cooperative scheduling to ensure fairness. Currently, each DataFusion query will spawn tasks and the runtime will schedule them; you might observe high CPU if multiple big queries run together.
- Feature completeness vs customization: DataFusion might not (yet) implement every SQL feature (e.g., complex subqueries, CTEs, some window functions). If you encounter a limitation, you can often work around it (e.g., break a query into two, or implement a UDF for something). The trade-off here is development time vs waiting for upstream improvements. Since DataFusion is active, many SQL gaps are closing over time. The flipside is you can customize a lot – if a certain optimization doesn’t exist, you can implement a specific solution for your case due to the extensibility.
- Compared to other engines: If you are weighing alternatives such as DuckDB (another embeddable engine) or writing a custom engine: Arrow/DataFusion excels when you want a library approach (no separate database process) and interoperability (Arrow data can be handed to Python and other ecosystems easily). DuckDB may offer more advanced SQL features or different performance characteristics (it can be very fast single-threaded, for example). DataFusion’s advantages are Rust safety, multi-threaded scaling, and easier integration into (and extension from) a Rust codebase. For a startup, using DataFusion means you don’t have to implement a ton of query logic yourself, but budget some time to learn its API and possibly contribute if you need a missing feature.
When running a query engine in production, you need insight into query performance and behavior:
- Execution Metrics: DataFusion collects metrics for each operator (when enabled) – the number of rows produced, time taken, and so on. Running EXPLAIN ANALYZE on a query gives you a report of how long each part took, how many batches were produced, etc. Programmatically, the ExecutionPlan can be inspected for metrics after execution (each operator has an optional metrics registry); for example, after .collect() you could walk the plan and read each operator’s metrics() to gather stats (this may require internal structures). These metrics can be logged or displayed to users and help identify bottlenecks, e.g. whether a join dominated the runtime or most of the time went to reading files (see the sketch after this list).
- Logging: DataFusion uses the log crate for some debug info. Turning on debug logging lets you see, for example, the logical plan, the optimized plan, and the physical plan printed out – helpful during development for verifying that your filters are being pushed down.
- Tracing: You can integrate the tracing crate to instrument query execution, for instance by wrapping the .collect().await in a span. Individual operators do not natively emit tracing events, but you can measure around the entire query, or per step if you plan and execute manually.
- Error handling: DataFusion returns DataFusionError for issues like parsing or execution problems. Handle these gracefully, perhaps by reporting the SQL error message back to the user.
- Monitoring resource usage: Since DataFusion runs in-process, standard process metrics (CPU, memory) are your primary signals. You might add guards in your application to cancel a query that exceeds a threshold (DataFusion supports a form of cancellation by dropping the future, but more cooperative cancellation may require custom code).
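As a small example of the metrics point above, one low-effort option is to run a query through EXPLAIN ANALYZE and log the resulting report; this helper is our own sketch, not a DataFusion API:

```rust
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::error::Result;
use datafusion::prelude::*;

/// Execute `sql` under EXPLAIN ANALYZE and return the per-operator report
/// as a printable string suitable for logging.
async fn explain_analyze(ctx: &SessionContext, sql: &str) -> Result<String> {
    // EXPLAIN ANALYZE actually runs the query and annotates each operator
    // with metrics such as output rows and elapsed compute time.
    let report = ctx
        .sql(&format!("EXPLAIN ANALYZE {}", sql))
        .await?
        .collect()
        .await?;
    Ok(pretty_format_batches(&report)?.to_string())
}
```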
For a startup embedding DataFusion, it could be valuable to implement a small web endpoint or CLI command to show current queries, or perhaps to dump the plan of a running query. While DataFusion doesn’t have a built-in query manager UI, you can build something to track when queries start and finish in your app.
Debugging Tips: If you are wondering why a query is slow, start with EXPLAIN to get the plan and check that it is doing what you expect (e.g., using an index you implemented, or pruning partitions). If not, you may need to adjust the query, for example by adding a filter that helps the planner. If the plan looks fine, profile the Rust code to see where time is spent – perhaps Parquet decoding is the bottleneck (then enabling predicate pushdown or switching file formats could help). Because DataFusion is vectorized, bottlenecks are usually either I/O (reading data) or a single-threaded stage, such as an ORDER BY without a LIMIT (the final merge happens on one thread). For large sorts, consider adding a LIMIT or filtering more aggressively.
The Arrow project (and DataFusion) is under active development. New releases come with improvements (for instance, Arrow might add a new data type or Flight might add a feature; DataFusion might optimize a function or add SQL features). It’s good to periodically update to newer versions to get performance boosts and bug fixes, but always read the release notes. Minor version bumps can include breaking changes as the API evolves (especially in DataFusion’s extension traits). However, the core usage (register table, run SQL) remains fairly consistent.
Because you are building on these, it’s worth engaging with the community – Arrow dev mailing list or GitHub issues – if you encounter bugs or have feature requests. Since you are effectively building a query engine for your app, you have the source and power to debug issues deeply if needed (unlike a black box database).
Imagine you are building an analytics service where users upload data (CSV/Parquet) to an S3 bucket and then run SQL queries via a web UI:
- You can use DataFusion to query the data on S3 directly, using the S3 object store integration, avoiding an ETL step into a separate database.
- Use Arrow Flight to serve results to a frontend or to allow a Python client to connect for big result sets.
- Implement a few UDFs for domain-specific calculations that SQL doesn’t have.
- Use Parquet as the storage format for efficiency (perhaps convert incoming CSV files to Parquet on upload and register those – see the conversion sketch after this list).
- For multi-tenant handling, you might use a separate SessionContext per user or per query to isolate catalogs.
- Monitor query times and log them; consider running EXPLAIN ANALYZE on heavy queries to see where the time goes.
- Keep Arrow updated to benefit from speedups (a newer version might, say, make a filter kernel 2x faster thanks to SIMD optimizations).
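For the CSV-to-Parquet conversion step, here is a minimal sketch that reads the CSV through DataFusion and writes a local Parquet file with the re-exported parquet ArrowWriter (paths are illustrative; it assumes the CSV has a header row and yields at least one batch):

```rust
use std::fs::File;
use datafusion::error::Result;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::prelude::*;

/// Convert an uploaded CSV file into a local Parquet file so that subsequent
/// queries scan the columnar copy instead of re-parsing CSV.
async fn csv_to_parquet(ctx: &SessionContext, csv_path: &str, parquet_path: &str) -> Result<()> {
    // Infer the schema from the CSV header and read it into Arrow RecordBatches.
    let batches = ctx
        .read_csv(csv_path, CsvReadOptions::new())
        .await?
        .collect()
        .await?;

    // Write all batches with the default writer properties (compression, etc.).
    let file = File::create(parquet_path)?;
    let mut writer = ArrowWriter::try_new(file, batches[0].schema(), None)?;
    for batch in &batches {
        writer.write(batch)?;
    }
    writer.close()?; // flushes buffered row groups and writes the Parquet footer
    Ok(())
}
```

Afterwards you can register the new Parquet file (e.g. via register_parquet or a listing table) and drop the CSV copy.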
This way, you can deliver a lot of database-like functionality without writing a database from scratch – leveraging Arrow for memory format and DataFusion for SQL, and still be in control of the integration in your Rust application.
Apache Arrow and DataFusion together enable high-performance analytics in Rust. Arrow’s columnar format ensures that your in-memory data is optimized for analytics, and DataFusion builds on it to provide a full SQL execution engine. By understanding the memory format (Arrow) and the execution model (DataFusion’s query plans and engine), you can architect a system that efficiently handles large data with the convenience of SQL. Moreover, their integration with formats like Parquet, and protocols like Arrow Flight, means your engine can interoperate with the big data ecosystem (Python pandas, BI tools, cloud storage) with ease. As you deploy this in a startup environment, pay attention to performance tuning (filter pushdown, parallelism) and observability (logging plans, metrics), and you’ll have a powerful, modern analytics backend with relatively little effort compared to writing one from scratch. Happy querying!
Sources:
- Apache Arrow Columnar Format documentation – details on memory layout, buffers, and bitmaps (Arrow Columnar Format — Apache Arrow v19.0.1).
- Apache Arrow Format IPC specification – description of streaming vs file format and alignment/padding requirements (Arrow Columnar Format — Apache Arrow v19.0.1).
- Apache Arrow Flight RPC documentation – high-level overview of Flight and its RPC methods (Arrow Flight RPC — Apache Arrow v19.0.1).
- Apache Arrow Rust documentation (arrow-rs) – examples of array builders and usage in Rust (arrow_array::builder - Rust) (Internal structure of Arrow objects • Arrow R Package).
- Apache Arrow Rust Parquet documentation – reading/writing Arrow RecordBatches to Parquet in Rust (parquet::arrow - Rust).
- Apache DataFusion crate documentation – DataFusion features, SQL/DataFrame API, and architecture goals (datafusion - Rust).
- Apache DataFusion user guide and examples – registering CSV, running SQL queries, using DataFrame API (Example Usage — Apache DataFusion documentation) (datafusion - Rust).
- Apache DataFusion contributor docs – extension points (TableProvider, UDFs, optimizer rules) and use cases (datafusion - Rust) (Introducing Apache Arrow DataFusion Contrib - Apache DataFusion Blog).
- Apache DataFusion blog posts – DataFusion usage in Python, S3 integration, and Flight SQL in practice (Introducing Apache Arrow DataFusion Contrib - Apache DataFusion Blog) (Flight, DataFusion, Arrow, and Parquet: Using the FDAP Architecture to build InfluxDB 3.0 | InfluxData).
- “Rust Object Store Donation” by InfluxData – explanation of the unified object store abstraction for Arrow (S3, GCS, etc.) (Rust Object Store Donation | InfluxData).