streaming parquet read from s3
[package]
name = "stream-pq"
version = "0.1.0"
edition = "2021"

[dependencies]
deltalake = { version = "*", features = ["s3"] }
tokio = { version = "1", features = ["full"] }
quick-xml = { version = "0.23.0-alpha3" }
aws-config = { version = "*", features = ["rustls"] }
aws-credential-types = "*"
aws-types = "*"
object_store = "*"
arrow = "*"
parquet = { version = "*", features = ["json"] }
aws-sdk-s3 = "*"
bytes = "*"
flate2 = "*"
# needed by the parallel variant below (futures::future::join_all, indicatif progress bar)
futures = "*"
indicatif = "*"
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::Client as S3Client;
use parquet::file::reader::{ChunkReader, FileReader, Length, SerializedFileReader};
use std::io::{Cursor, Read, Seek, SeekFrom};
use tokio::io::AsyncReadExt;
use parquet::errors::ParquetError;
use bytes::Bytes;
use std::time::Instant;

/// An S3 object buffered fully in memory, wrapped so the parquet crate can
/// treat it like a local file.
struct InMemoryFile {
    cursor: Cursor<Vec<u8>>,
}

impl InMemoryFile {
    fn new(data: Vec<u8>) -> Self {
        InMemoryFile { cursor: Cursor::new(data) }
    }
}

impl Read for InMemoryFile {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        std::io::Read::read(&mut self.cursor, buf)
    }
}

impl Seek for InMemoryFile {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.cursor.seek(pos)
    }
}

impl Length for InMemoryFile {
    fn len(&self) -> u64 {
        self.cursor.get_ref().len() as u64
    }
}

impl ChunkReader for InMemoryFile {
    type T = Cursor<Vec<u8>>;

    fn get_read(&self, start: u64) -> parquet::errors::Result<Self::T> {
        // Clone the cursor so each reader gets an independent position.
        let mut cursor_clone = self.cursor.clone();
        cursor_clone.set_position(start);
        Ok(cursor_clone)
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes, ParquetError> {
        let end = start as usize + length;
        let data_len = self.cursor.get_ref().len();
        if start as usize > data_len || end > data_len {
            return Err(ParquetError::General(format!(
                "Range out of bounds: {} to {}, data length {}",
                start, end, data_len
            )));
        }
        let data_slice = &self.cursor.get_ref()[start as usize..end];
        Ok(Bytes::copy_from_slice(data_slice))
    }
}

#[tokio::main]
async fn main() -> Result<(), aws_sdk_s3::Error> {
    let region_provider = RegionProviderChain::default_provider().or_else("us-west-2");
    let config = aws_config::from_env().region(region_provider).load().await;
    let s3 = S3Client::new(&config);
    let bucket = "example";
    let prefix = "example";

    let now = Instant::now();
    let resp = s3.list_objects_v2().bucket(bucket).prefix(prefix).send().await.unwrap();
    let elapsed = now.elapsed();
    println!("Listed files took: {:.2?}s", elapsed.as_secs());
    // dbg!(&resp);

    let mut total_row_count = 0;
    for object in resp.contents() {
        if let Some(key) = object.key() {
            println!("Reading object: {}", key);
            if key.ends_with("parquet") {
                // Time to first byte of the GET response.
                let now = Instant::now();
                let object_resp = s3.get_object().bucket(bucket).key(key).send().await.unwrap();
                let elapsed = now.elapsed();
                println!("Initial object_resp took: {:.2?}s", elapsed.as_secs());

                // Buffer the entire object body before handing it to the parquet reader.
                let now = Instant::now();
                let mut data = Vec::new();
                object_resp.body.into_async_read().read_to_end(&mut data).await.unwrap();
                let elapsed = now.elapsed();
                println!("Read (of data.len={}) took: {:.2?}s", data.len(), elapsed.as_secs());

                let in_memory_file = InMemoryFile::new(data);
                let reader = SerializedFileReader::new(in_memory_file).unwrap();
                let mut iter = reader.get_row_iter(None).unwrap();

                // Decode row by row and pull the "uri" column out of each record.
                let now = Instant::now();
                let mut row_count = 0;
                while let Some(record) = iter.next() {
                    total_row_count += 1;
                    row_count += 1;
                    let row = record.unwrap();
                    let json = row.to_json_value();
                    let uri = json["uri"].as_str().unwrap_or("no-uri-found");
                    println!("(parsed {}) uri={}, row_count: {}", key, uri, total_row_count);
                    // println!("{:?}", json);
                }
                let elapsed = now.elapsed();
                println!("Read (of row_count={}) took: {:.2?}s", row_count, elapsed.as_secs());
            } else {
                println!("skipped non-parquet");
            }
        }
    }
    Ok(())
}
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::Client as S3Client;
use parquet::file::reader::{ChunkReader, FileReader, Length, SerializedFileReader};
use std::io::{Cursor, Read, Seek, SeekFrom};
use tokio::io::AsyncReadExt;
use parquet::errors::ParquetError;
use bytes::Bytes;
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use indicatif::{ProgressBar, ProgressStyle};

/// An S3 object buffered fully in memory, wrapped so the parquet crate can
/// treat it like a local file.
struct InMemoryFile {
    cursor: Cursor<Vec<u8>>,
}

impl InMemoryFile {
    fn new(data: Vec<u8>) -> Self {
        InMemoryFile { cursor: Cursor::new(data) }
    }
}

impl Read for InMemoryFile {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        std::io::Read::read(&mut self.cursor, buf)
    }
}

impl Seek for InMemoryFile {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.cursor.seek(pos)
    }
}

impl Length for InMemoryFile {
    fn len(&self) -> u64 {
        self.cursor.get_ref().len() as u64
    }
}

impl ChunkReader for InMemoryFile {
    type T = Cursor<Vec<u8>>;

    fn get_read(&self, start: u64) -> parquet::errors::Result<Self::T> {
        // Clone the cursor so each reader gets an independent position.
        let mut cursor_clone = self.cursor.clone();
        cursor_clone.set_position(start);
        Ok(cursor_clone)
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes, ParquetError> {
        let end = start as usize + length;
        let data_len = self.cursor.get_ref().len();
        if start as usize > data_len || end > data_len {
            return Err(ParquetError::General(format!(
                "Range out of bounds: {} to {}, data length {}",
                start, end, data_len
            )));
        }
        let data_slice = &self.cursor.get_ref()[start as usize..end];
        Ok(Bytes::copy_from_slice(data_slice))
    }
}

#[tokio::main]
async fn main() -> Result<(), aws_sdk_s3::Error> {
    // hacky rate-limiting: cap how many downloads are in flight at once
    let max_s3_read_concurrency = 32;
    let semaphore = Arc::new(Semaphore::new(max_s3_read_concurrency));

    let region_provider = RegionProviderChain::default_provider().or_else("us-west-2");
    let config = aws_config::from_env().region(region_provider).load().await;
    let s3 = S3Client::new(&config);
    let bucket = "example";
    let prefix = "example";
    let resp = s3.list_objects_v2().bucket(bucket).prefix(prefix).send().await?;

    let progress_bar = ProgressBar::new_spinner();
    progress_bar.set_style(ProgressStyle::default_spinner().template("{spinner} Elapsed: {elapsed}").unwrap());
    progress_bar.enable_steady_tick(Duration::from_millis(100));

    let total_row_count = Arc::new(AtomicUsize::new(0));
    let mut tasks = vec![];

    for object in resp.contents() {
        if let Some(key) = object.key() {
            if key.ends_with("parquet") {
                // we were going too hard on s3
                let permit = semaphore.clone().acquire_owned().await.unwrap();
                let s3_client = s3.clone();
                let bucket = bucket.to_string();
                let key = key.to_string();
                // fixme: not working
                //let pb = progress_bar.clone();
                let trc = total_row_count.clone();

                // One task per parquet object: download, then decode row by row.
                let task = tokio::spawn(async move {
                    let now = Instant::now();
                    let object_resp = s3_client.get_object().bucket(&bucket).key(&key).send().await.unwrap();
                    let elapsed = now.elapsed();
                    println!("Initial object_resp took: {:.2?}s", elapsed.as_secs());

                    let now = Instant::now();
                    let mut data = Vec::new();
                    object_resp.body.into_async_read().read_to_end(&mut data).await.unwrap();
                    drop(permit); // ok to start downloading some other file
                    let elapsed = now.elapsed();
                    println!("Read (of data.len={}) took: {:.2?}s", data.len(), elapsed.as_secs());

                    let in_memory_file = InMemoryFile::new(data);
                    let reader = SerializedFileReader::new(in_memory_file).unwrap();
                    let mut iter = reader.get_row_iter(None).unwrap();

                    let now = Instant::now();
                    let mut row_count = 0;
                    while let Some(record) = iter.next() {
                        trc.fetch_add(1, Ordering::Relaxed);
                        row_count += 1;
                        let row = record.unwrap();
                        // just want to simulate some work here
                        let json = row.to_json_value();
                        let uri = json["uri"].as_str().unwrap_or("no-uri-found");
                        // println!("(parsed {}) uri={}, row_count: {}", key, uri, row_count);
                        // pb.set_message(format!("Processed {} items (eg. {})", row_count, uri));
                    }
                    let elapsed = now.elapsed();
                    println!("Read (of task row_count={}, total_row_count={}) took: {:.2?}s", row_count, trc.load(Ordering::Relaxed), elapsed.as_secs());
                    //pb.set_message(format!("Processed total {} items so far", trc.load(Ordering::Relaxed)));
                    row_count
                });
                tasks.push(task);
            }
        }
    }

    // Wait for every per-file task and report its row count.
    let results = futures::future::join_all(tasks).await;
    for result in results {
        match result {
            Ok(row_count) => println!("Processed {} rows", row_count),
            Err(e) => eprintln!("Error processing file: {:?}", e),
        }
    }
    progress_bar.finish_with_message("Processing complete");
    Ok(())
}
Serial version:
Read (of row_count=44143) took: 46s

Parallel version:
Read (of task row_count=43018, total_row_count=498734) took: 151s
⠙ Elapsed: 3m Initial object_resp took: 152s
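One guess at why the parallel run regressed (151s per task, and a plain GET showing up as 152s above): the row-by-row decode is synchronous CPU work, and it runs directly on tokio's async worker threads, so a task grinding through get_row_iter can stall other tasks' network I/O. A minimal sketch of moving that decode onto the blocking pool with tokio::task::spawn_blocking; count_rows_off_runtime is a hypothetical helper, not part of the gist, and it reuses the InMemoryFile wrapper defined above:

use parquet::file::reader::{FileReader, SerializedFileReader};

// Hypothetical helper: run the parquet decode on tokio's blocking pool instead
// of an async worker thread. Assumes the InMemoryFile wrapper defined above.
async fn count_rows_off_runtime(data: Vec<u8>) -> usize {
    tokio::task::spawn_blocking(move || {
        let reader = SerializedFileReader::new(InMemoryFile::new(data)).unwrap();
        let mut rows = 0usize;
        for record in reader.get_row_iter(None).unwrap() {
            // same simulated per-row work as the gist: convert the row to JSON
            let _json = record.unwrap().to_json_value();
            rows += 1;
        }
        rows
    })
    .await
    .unwrap()
}

The download side of the spawned task (the GET, read_to_end, and the semaphore permit) would stay as written above; only the decode loop moves off the runtime.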
For my data this was pretty slow at less than 1000 rows per second.
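If the row/JSON API stays the bottleneck, a different shape worth trying is the parquet crate's async Arrow reader over object_store: it issues ranged GETs on demand instead of buffering whole objects, and yields columnar RecordBatches instead of per-row JSON. The sketch below is hedged: the "example" bucket and key are placeholders, it assumes object_store is built with its "aws" feature and parquet with its "async" and "object_store" features plus the futures crate (none of which the Cargo.toml above enables), and the ParquetObjectReader::new(store, meta) signature matches parquet versions from around when this gist was written.

use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{aws::AmazonS3Builder, path::Path, ObjectStore};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder bucket; credentials and region come from the environment.
    let store: Arc<dyn ObjectStore> =
        Arc::new(AmazonS3Builder::from_env().with_bucket_name("example").build()?);

    // Placeholder key; in practice you would list the prefix first, as the gist does.
    let path = Path::from("example/part-00000.parquet");
    let meta = store.head(&path).await?;

    // The reader fetches byte ranges on demand (footer, then the column chunks it
    // needs), so the whole object never has to sit in memory at once.
    let reader = ParquetObjectReader::new(store, meta);
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(8192)
        .build()?;

    let mut rows = 0;
    while let Some(batch) = stream.try_next().await? {
        // Columnar batches: no per-row JSON conversion.
        rows += batch.num_rows();
    }
    println!("read {} rows", rows);
    Ok(())
}

Listing a prefix and spawning one such stream per key would slot into the same semaphore pattern as the parallel version above.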