@sundy-li
Created November 30, 2022 15:42
arrow-arrow2-duckdb
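Two standalone Rust examples that read the same set of parquet files (128 copies of input.parquet under /tmp, see the setup comments below): the first uses arrow2's parquet reader, the second the arrow-rs parquet crate. Both project only the url column, decode in 2048-row batches, and spread the files over a 16-worker thread pool (both also depend on the threadpool crate). The timings recorded in the comments are arrow2 ~836 ms and arrow-rs ~499 ms, compared with DuckDB running max(url) over the same files in ~0.43 s real time.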
// ===== example 1: arrow2 =====
use std::fs;
use std::fs::File;
use std::sync::mpsc::channel;
use std::time::SystemTime;

use arrow2::error::Error;
use arrow2::io::parquet::read::{self, FileReader};
use threadpool::ThreadPool;

// Setup:
// wget https://repo.databend.rs/alapha/input.parquet -O /tmp/input.parquet
// for i in `seq 1 128`; do
//     cp -rf /tmp/input.parquet /tmp/input_$i.parquet
// done
//
// cargo run --example arrow2_read --release
// cost -> 836 ms
//
// Compare with duckdb:
// select max(url) from read_parquet('/tmp/*.parquet', binary_as_string=True);
// ┌────────────────────────┐
// │        max(url)        │
// │        varchar         │
// ├────────────────────────┤
// │ http://zarplatia-nogin │
// └────────────────────────┘
// Run Time (s): real 0.430 user 4.347991 sys 4.099678

fn main() -> Result<(), Error> {
    let files: Vec<String> = fs::read_dir("/tmp/")
        .unwrap()
        .map(|f| f.unwrap().path().display().to_string())
        .filter(|f| f.ends_with(".parquet"))
        .collect();

    let start = SystemTime::now();

    // duckdb uses 16 threads
    let n_workers = 16;
    let n_jobs = files.len();
    let pool = ThreadPool::new(n_workers);

    let (tx, rx) = channel();
    for file in files {
        let tx = tx.clone();
        pool.execute(move || {
            let chunk = builder_chunk(&file);
            read_chunk(chunk);
            tx.send(1).unwrap();
        });
    }
    assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), n_jobs);

    println!("cost -> {:?} ms", start.elapsed().unwrap().as_millis());
    Ok(())
}

fn builder_chunk(f: &String) -> FileReader<File> {
    let mut reader = File::open(f).unwrap();
    let metadata = read::read_metadata(&mut reader).unwrap();
    let schema = read::infer_schema(&metadata).unwrap();
    // project only the `url` column
    let schema = schema.filter(|_index, field| field.name == "url");
    let row_groups = metadata.row_groups;

    // duckdb uses a 2048 batch size
    let chunk_size = Some(2048);

    // we can then read the row groups into chunks
    read::FileReader::new(reader, row_groups, schema, chunk_size, None, None)
}

fn read_chunk(chunk: FileReader<File>) {
    let mut rows = 0;
    for maybe_chunk in chunk {
        let chunk = maybe_chunk.unwrap();
        rows += chunk.len();
    }
    assert_eq!(rows, 106742);
}
// ===== example 2: arrow-rs (parquet crate) =====
use std::fs;
use std::fs::File;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::time::SystemTime;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;
use threadpool::ThreadPool;

// cargo run --example arrow_read --release
// arrow-rs cost -> 499 ms

fn main() {
    let files: Vec<String> = fs::read_dir("/tmp/")
        .unwrap()
        .map(|f| f.unwrap().path().display().to_string())
        .filter(|f| f.ends_with(".parquet"))
        .collect();

    let start = SystemTime::now();

    let n_workers = 16;
    let n_jobs = files.len();
    let pool = ThreadPool::new(n_workers);

    let (tx, rx) = channel();
    for file in files {
        let tx = tx.clone();
        pool.execute(move || {
            read_buffer(&file);
            tx.send(1).expect("channel will be there waiting for the pool");
        });
    }
    assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), n_jobs);

    println!("arrow-rs cost -> {:?} ms", start.elapsed().unwrap().as_millis());
}

fn read_buffer(path: &String) {
    let file = File::open(path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let original_schema = Arc::clone(builder.schema());
    let column = original_schema.index_of("url").unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![column]);

    let parquet_reader = builder
        .with_batch_size(2048)
        .with_projection(mask)
        .build()
        .unwrap();

    let mut rows = 0;
    for batch in parquet_reader {
        rows += batch.unwrap().num_rows();
    }
    assert_eq!(rows, 106742);
}
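
Note (not part of the original gist): both Rust examples only decode the url column and count rows, while the DuckDB query also computes max(url). Below is a minimal sketch of folding that aggregate into the arrow-rs read loop. It assumes the arrow crate is available as a direct dependency, that url decodes as a BinaryArray (DuckDB needed binary_as_string=True, which suggests a plain BYTE_ARRAY column), and the max_url helper name is invented for illustration.

use std::fs::File;
use std::sync::Arc;

use arrow::array::{Array, BinaryArray};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;

// Sketch only: read `url` from one file and return its max, mirroring the
// reader setup used in example 2 above.
fn max_url(path: &str) -> Option<String> {
    let file = File::open(path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let schema = Arc::clone(builder.schema());
    let column = schema.index_of("url").unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![column]);

    let reader = builder
        .with_batch_size(2048)
        .with_projection(mask)
        .build()
        .unwrap();

    let mut best: Option<Vec<u8>> = None;
    for batch in reader {
        let batch = batch.unwrap();
        // Only one column is projected; the downcast is an assumption about
        // the physical type of `url`.
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .expect("url did not decode as BinaryArray");
        // Fold a byte-wise max over the non-null values of this batch.
        for value in col.iter().flatten() {
            if best.as_deref().map_or(true, |b| value > b) {
                best = Some(value.to_vec());
            }
        }
    }
    best.map(|b| String::from_utf8_lossy(&b).into_owned())
}

If the column is instead annotated as UTF-8 and decodes as a StringArray, the same fold works after downcasting to StringArray and comparing &str values.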