@mooreniemi
Last active May 6, 2024 04:46
streaming parquet read from s3

The gist contains a Cargo.toml and two variants of main.rs: a sequential reader, and a parallel reader that caps S3 download concurrency with a semaphore. Both buffer each object fully into memory before decoding it row by row.
Cargo.toml:

[package]
name = "stream-pq"
version = "0.1.0"
edition = "2021"
[dependencies]
deltalake = { version = "*", features = ["s3"] }
tokio = { version = "1", features = ["full"] }
quick-xml = { version = "0.23.0-alpha3" }
aws-config = { version = "*", features = ["rustls"] }
aws-credential-types = "*"
aws-types = "*"
object_store = "*"
arrow = "*"
parquet = { version = "*", features = ["json"] }
aws-sdk-s3 = "*"
bytes = "*"
flate2 = "*"
# used by the parallel version of main.rs below
futures = "*"
indicatif = "*"

Sequential version:

use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::Client as S3Client;
use bytes::Bytes;
use parquet::errors::ParquetError;
use parquet::file::reader::{ChunkReader, FileReader, Length, SerializedFileReader};
use std::io::{Cursor, Read, Seek, SeekFrom};
use std::time::Instant;
use tokio::io::AsyncReadExt;
// Wraps the fully-downloaded object in a Cursor and implements the traits
// SerializedFileReader needs to treat it like a file.
struct InMemoryFile {
    cursor: Cursor<Vec<u8>>,
}

impl InMemoryFile {
    fn new(data: Vec<u8>) -> Self {
        InMemoryFile { cursor: Cursor::new(data) }
    }
}

impl Read for InMemoryFile {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        std::io::Read::read(&mut self.cursor, buf)
    }
}

impl Seek for InMemoryFile {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.cursor.seek(pos)
    }
}

impl Length for InMemoryFile {
    fn len(&self) -> u64 {
        self.cursor.get_ref().len() as u64
    }
}

impl ChunkReader for InMemoryFile {
    type T = Cursor<Vec<u8>>;

    fn get_read(&self, start: u64) -> parquet::errors::Result<Self::T> {
        // Clone the buffer so each reader gets its own independent position.
        let mut cursor_clone = self.cursor.clone();
        cursor_clone.set_position(start);
        Ok(cursor_clone)
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes, ParquetError> {
        let end = start as usize + length;
        let data_len = self.cursor.get_ref().len();
        if start as usize > data_len || end > data_len {
            return Err(ParquetError::General(format!(
                "Range out of bounds: {} to {}, data length {}",
                start, end, data_len
            )));
        }
        let data_slice = &self.cursor.get_ref()[start as usize..end];
        Ok(Bytes::copy_from_slice(data_slice))
    }
}
#[tokio::main]
async fn main() -> Result<(), aws_sdk_s3::Error> {
    let region_provider = RegionProviderChain::default_provider().or_else("us-west-2");
    let config = aws_config::from_env().region(region_provider).load().await;
    let s3 = S3Client::new(&config);

    let bucket = "example";
    let prefix = "example";

    let now = Instant::now();
    let resp = s3.list_objects_v2().bucket(bucket).prefix(prefix).send().await.unwrap();
    let elapsed = now.elapsed();
    println!("Listed files took: {}s", elapsed.as_secs());
    // dbg!(&resp);

    let mut total_row_count = 0;
    for object in resp.contents() {
        if let Some(key) = object.key() {
            println!("Reading object: {}", key);
            if key.ends_with("parquet") {
                let now = Instant::now();
                let object_resp = s3.get_object().bucket(bucket).key(key).send().await.unwrap();
                let elapsed = now.elapsed();
                println!("Initial object_resp took: {}s", elapsed.as_secs());

                // Buffer the whole object into memory before decoding.
                let now = Instant::now();
                let mut data = Vec::new();
                object_resp.body.into_async_read().read_to_end(&mut data).await.unwrap();
                let elapsed = now.elapsed();
                println!("Read (of data.len={}) took: {}s", data.len(), elapsed.as_secs());

                let in_memory_file = InMemoryFile::new(data);
                let reader = SerializedFileReader::new(in_memory_file).unwrap();
                let iter = reader.get_row_iter(None).unwrap();

                let now = Instant::now();
                let mut row_count = 0;
                for record in iter {
                    total_row_count += 1;
                    row_count += 1;
                    let row = record.unwrap();
                    let json = row.to_json_value();
                    let uri = json["uri"].as_str().unwrap_or("no-uri-found");
                    println!("(parsed {}) uri={}, row_count: {}", key, uri, total_row_count);
                    // println!("{:?}", json);
                }
                let elapsed = now.elapsed();
                println!("Read (of row_count={}) took: {}s", row_count, elapsed.as_secs());
            } else {
                println!("skipped non-parquet");
            }
        }
    }
    Ok(())
}

Parallel version:

use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::Client as S3Client;
use bytes::Bytes;
use indicatif::{ProgressBar, ProgressStyle};
use parquet::errors::ParquetError;
use parquet::file::reader::{ChunkReader, FileReader, Length, SerializedFileReader};
use std::io::{Cursor, Read, Seek, SeekFrom};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::AsyncReadExt;
use tokio::sync::Semaphore;
struct InMemoryFile {
    cursor: Cursor<Vec<u8>>,
}

impl InMemoryFile {
    fn new(data: Vec<u8>) -> Self {
        InMemoryFile { cursor: Cursor::new(data) }
    }
}

impl Read for InMemoryFile {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        std::io::Read::read(&mut self.cursor, buf)
    }
}

impl Seek for InMemoryFile {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.cursor.seek(pos)
    }
}

impl Length for InMemoryFile {
    fn len(&self) -> u64 {
        self.cursor.get_ref().len() as u64
    }
}

impl ChunkReader for InMemoryFile {
    type T = Cursor<Vec<u8>>;

    fn get_read(&self, start: u64) -> parquet::errors::Result<Self::T> {
        let mut cursor_clone = self.cursor.clone();
        cursor_clone.set_position(start);
        Ok(cursor_clone)
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes, ParquetError> {
        let end = start as usize + length;
        let data_len = self.cursor.get_ref().len();
        if start as usize > data_len || end > data_len {
            return Err(ParquetError::General(format!(
                "Range out of bounds: {} to {}, data length {}",
                start, end, data_len
            )));
        }
        let data_slice = &self.cursor.get_ref()[start as usize..end];
        Ok(Bytes::copy_from_slice(data_slice))
    }
}
#[tokio::main]
async fn main() -> Result<(), aws_sdk_s3::Error> {
    // hacky rate-limiting: cap the number of in-flight S3 downloads
    let max_s3_read_concurrency = 32;
    let semaphore = Arc::new(Semaphore::new(max_s3_read_concurrency));

    let region_provider = RegionProviderChain::default_provider().or_else("us-west-2");
    let config = aws_config::from_env().region(region_provider).load().await;
    let s3 = S3Client::new(&config);

    let bucket = "example";
    let prefix = "example";
    let resp = s3.list_objects_v2().bucket(bucket).prefix(prefix).send().await?;

    let progress_bar = ProgressBar::new_spinner();
    progress_bar.set_style(ProgressStyle::default_spinner().template("{spinner} Elapsed: {elapsed}").unwrap());
    progress_bar.enable_steady_tick(Duration::from_millis(100));

    let total_row_count = Arc::new(AtomicUsize::new(0));
    let mut tasks = vec![];
    for object in resp.contents() {
        if let Some(key) = object.key() {
            if key.ends_with("parquet") {
                // we were going too hard on s3
                let permit = semaphore.clone().acquire_owned().await.unwrap();
                let s3_client = s3.clone();
                let bucket = bucket.to_string();
                let key = key.to_string();
                // fixme: not working
                //let pb = progress_bar.clone();
                let trc = total_row_count.clone();
                let task = tokio::spawn(async move {
                    let now = Instant::now();
                    let object_resp = s3_client.get_object().bucket(&bucket).key(&key).send().await.unwrap();
                    let elapsed = now.elapsed();
                    println!("Initial object_resp took: {}s", elapsed.as_secs());

                    let now = Instant::now();
                    let mut data = Vec::new();
                    object_resp.body.into_async_read().read_to_end(&mut data).await.unwrap();
                    drop(permit); // ok to start downloading some other file
                    let elapsed = now.elapsed();
                    println!("Read (of data.len={}) took: {}s", data.len(), elapsed.as_secs());

                    let in_memory_file = InMemoryFile::new(data);
                    let reader = SerializedFileReader::new(in_memory_file).unwrap();
                    let iter = reader.get_row_iter(None).unwrap();

                    let now = Instant::now();
                    let mut row_count = 0;
                    for record in iter {
                        trc.fetch_add(1, Ordering::Relaxed);
                        row_count += 1;
                        let row = record.unwrap();
                        // just want to simulate some work here
                        let json = row.to_json_value();
                        let uri = json["uri"].as_str().unwrap_or("no-uri-found");
                        // println!("(parsed {}) uri={}, row_count: {}", key, uri, row_count);
                        // pb.set_message(format!("Processed {} items (eg. {})", row_count, uri));
                    }
                    let elapsed = now.elapsed();
                    println!(
                        "Read (of task row_count={}, total_row_count={}) took: {}s",
                        row_count,
                        trc.load(Ordering::Relaxed),
                        elapsed.as_secs()
                    );
                    //pb.set_message(format!("Processed total {} items so far", trc.load(Ordering::Relaxed)));
                    row_count
                });
                tasks.push(task);
            }
        }
    }

    let results = futures::future::join_all(tasks).await;
    for result in results {
        match result {
            Ok(row_count) => println!("Processed {} rows", row_count),
            Err(e) => eprintln!("Error processing file: {:?}", e),
        }
    }
    progress_bar.finish_with_message("Processing complete");
    Ok(())
}
@mooreniemi
(Author)

For my data this was pretty slow at less than 1000 rows per second.
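
A good chunk of that is likely the row-oriented Record API (get_row_iter), which materializes every value individually. A possible batch-oriented alternative, sketched below, uses the Arrow reader that ships with the parquet crate (its arrow feature, on by default in recent releases); Bytes implements ChunkReader, so the downloaded buffer can be handed over without the InMemoryFile wrapper. The helper name and the 8192 batch size are illustrative, not from the gist.

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// Hypothetical replacement for the per-file decode step above: count rows by
// iterating Arrow RecordBatches instead of individual parquet Rows.
fn count_rows_columnar(data: Vec<u8>) -> Result<usize, Box<dyn std::error::Error>> {
    // Bytes implements ChunkReader, so no custom in-memory wrapper is needed.
    let bytes = bytes::Bytes::from(data);
    let reader = ParquetRecordBatchReaderBuilder::try_new(bytes)?
        .with_batch_size(8192) // illustrative batch size
        .build()?;
    let mut rows = 0;
    for batch in reader {
        // Each item is an Arrow RecordBatch covering many rows decoded per column.
        rows += batch?.num_rows();
    }
    Ok(rows)
}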

@mooreniemi
(Author)

Read (of row_count=44143) took: 46s
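
(That works out to roughly 44143 / 46 ≈ 960 rows per second, in line with the earlier estimate.)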

@mooreniemi
(Author)

Parallel version:

Read (of task row_count=43018, total_row_count=498734) took: 151s
⠙ Elapsed: 3m                                                                    Initial object_resp took: 152s
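
Two plausible reasons the per-file numbers get worse here: 32 downloads now share the same bandwidth, and the row decoding is CPU-bound work running directly on the tokio worker threads, so decode loops and downloads compete for the runtime. One possible mitigation, sketched under those assumptions, is to push the decode onto tokio's blocking pool (InMemoryFile is the wrapper from the code above; the helper name is made up):

use parquet::file::reader::{FileReader, SerializedFileReader};

// Hypothetical helper: decode on the blocking thread pool so the CPU-heavy
// row iteration does not stall the worker threads driving the concurrent
// S3 downloads.
async fn count_rows_off_runtime(data: Vec<u8>) -> usize {
    tokio::task::spawn_blocking(move || {
        let reader = SerializedFileReader::new(InMemoryFile::new(data)).unwrap();
        let mut rows = 0;
        for record in reader.get_row_iter(None).unwrap() {
            let _row = record.unwrap(); // materialize the row; per-row work would go here
            rows += 1;
        }
        rows
    })
    .await
    .expect("blocking decode task panicked")
}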
