arrow-arrow2-duckdb
Created November 30, 2022 15:42
Benchmark: read the url column from a directory of identical Parquet files with arrow2 and with arrow-rs, and compare against DuckDB running the same scan via read_parquet.

arrow2_read.rs:
use std::fs;
use std::fs::File;
use std::sync::mpsc::channel;
use std::time::SystemTime;

use arrow2::error::Error;
use arrow2::io::parquet::read::{self, FileReader};
use threadpool::ThreadPool;
// Prepare the input: download one sample file, then fan it out to 128 copies.
// wget https://repo.databend.rs/alapha/input.parquet -O /tmp/input.parquet
// for i in `seq 1 128`; do
//     cp -rf /tmp/input.parquet /tmp/input_$i.parquet
// done
//
// cargo run --example arrow2_read --release
// cost -> 836 ms
//
// Compare with duckdb:
// select max(url) from read_parquet('/tmp/*.parquet', binary_as_string=True);
// ┌────────────────────────┐
// │        max(url)        │
// │        varchar         │
// ├────────────────────────┤
// │ http://zarplatia-nogin │
// └────────────────────────┘
// Run Time (s): real 0.430 user 4.347991 sys 4.099678
fn main() -> Result<(), Error> {
    // Collect every parquet file under /tmp/.
    let files: Vec<String> = fs::read_dir("/tmp/")
        .unwrap()
        .map(|f| f.unwrap().path().display().to_string())
        .filter(|f| f.ends_with(".parquet"))
        .collect();

    let start = SystemTime::now();
    // duckdb uses 16 threads on this machine; match it.
    let n_workers = 16;
    let n_jobs = files.len();
    let pool = ThreadPool::new(n_workers);

    let (tx, rx) = channel();
    for file in files {
        let tx = tx.clone();
        pool.execute(move || {
            let chunks = builder_chunk(&file);
            read_chunk(chunks);
            tx.send(1).unwrap();
        });
    }

    // Drain the channel: blocks until all n_jobs tasks have reported in.
    assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), n_jobs);
    println!("cost -> {:?} ms", start.elapsed().unwrap().as_millis());
    Ok(())
}
fn builder_chunk(path: &str) -> FileReader<File> {
    let mut reader = File::open(path).unwrap();
    let metadata = read::read_metadata(&mut reader).unwrap();
    let schema = read::infer_schema(&metadata).unwrap();
    // Project down to the single `url` column.
    let schema = schema.filter(|_index, field| field.name == "url");
    let row_groups = metadata.row_groups;

    // duckdb uses a 2048 batch size
    let chunk_size = Some(2048);
    // we can then read the row groups into chunks
    read::FileReader::new(reader, row_groups, schema, chunk_size, None, None)
}
fn read_chunk(chunks: FileReader<File>) {
    // Each file holds 106742 rows; decode every chunk and count them.
    let mut rows = 0;
    for maybe_chunk in chunks {
        let chunk = maybe_chunk.unwrap();
        rows += chunk.len();
    }
    assert_eq!(rows, 106742);
}
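The functions above only count rows, while the DuckDB query computes max(url). For a like-for-like aggregate, the max can be folded over the same chunks. A minimal sketch, assuming the projected column decodes as a BinaryArray<i32> (the file stores url as binary, hence DuckDB's binary_as_string=True); the max_url helper is hypothetical, and per-file maxima would still need to be combined across the pool:

use arrow2::array::BinaryArray;

// Hypothetical helper: fold max(url) over all chunks of one file.
fn max_url(chunks: FileReader<File>) -> Option<Vec<u8>> {
    let mut max: Option<Vec<u8>> = None;
    for maybe_chunk in chunks {
        let chunk = maybe_chunk.unwrap();
        // The schema was filtered to `url`, so it is the only array in the chunk.
        let array = chunk.arrays()[0]
            .as_any()
            .downcast_ref::<BinaryArray<i32>>()
            .expect("url column assumed to be binary");
        for value in array.iter().flatten() {
            if max.as_deref().map_or(true, |m| value > m) {
                max = Some(value.to_vec());
            }
        }
    }
    max
}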
arrow_read.rs:
use std::fs;
use std::fs::File;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::time::SystemTime;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;
use threadpool::ThreadPool;

// cargo run --example arrow_read --release
// arrow-rs cost -> 499 ms
fn main() {
    let files: Vec<String> = fs::read_dir("/tmp/")
        .unwrap()
        .map(|f| f.unwrap().path().display().to_string())
        .filter(|f| f.ends_with(".parquet"))
        .collect();

    let start = SystemTime::now();
    let n_workers = 16;
    let n_jobs = files.len();
    let pool = ThreadPool::new(n_workers);

    let (tx, rx) = channel();
    for file in files {
        let tx = tx.clone();
        pool.execute(move || {
            read_buffer(&file);
            tx.send(1)
                .expect("channel will be there waiting for the pool");
        });
    }

    // Drain the channel: blocks until all n_jobs tasks have reported in.
    assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), n_jobs);
    println!(
        "arrow-rs cost -> {:?} ms",
        start.elapsed().unwrap().as_millis()
    );
}
fn read_buffer(path: &str) {
    let file = File::open(path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

    // Project down to the single `url` column.
    let original_schema = Arc::clone(builder.schema());
    let column = original_schema.index_of("url").unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![column]);

    // duckdb uses a 2048 batch size
    let parquet_reader = builder
        .with_batch_size(2048)
        .with_projection(mask)
        .build()
        .unwrap();

    let mut rows = 0;
    for batch in parquet_reader {
        rows += batch.unwrap().num_rows();
    }
    assert_eq!(rows, 106742);
}
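For completeness, the DuckDB side of the comparison can be driven from Rust as well. A minimal sketch, assuming the duckdb crate and its rusqlite-style API; the crate choice and call shapes are assumptions, not part of the original benchmark:

use duckdb::Connection;

fn main() -> duckdb::Result<()> {
    // In-memory database; read_parquet scans the files on disk directly.
    // (Assumes the duckdb crate's rusqlite-like query_row API.)
    let conn = Connection::open_in_memory()?;
    let max_url: String = conn.query_row(
        "select max(url) from read_parquet('/tmp/*.parquet', binary_as_string=True)",
        [],
        |row| row.get(0),
    )?;
    println!("max(url) = {}", max_url);
    Ok(())
}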