Created August 6, 2025 18:49
PondPilot #206
Cargo.toml:

[package]
name = "generate-datafusion-parquet"
version = "0.1.0"
edition = "2021"

[dependencies]
datafusion = "49"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
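Assuming the standard Cargo layout (the manifest above at the crate root, the program below at src/main.rs), cargo run --release builds the generator and writes the five test Parquet files into the current working directory.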
src/main.rs:

use datafusion::arrow::array::{
    Array, ArrayRef, StructArray, UInt64Array, StringArray, RecordBatch,
    BinaryArray, Float64Array, BooleanArray,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::*;
use datafusion::datasource::MemTable;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("🔨 Generating Parquet files with Snappy compression using DataFusion\n");
    println!("This reproduces the issue reported where files work with DuckDB CLI");
    println!("but fail in DuckDB WASM with 'Snappy decompression failure'\n");

    // Basic test - matches user's description exactly
    create_basic_nested().await?;

    // Stress tests to try to trigger the issue
    create_large_dataset().await?;
    create_multiple_batches().await?;
    create_binary_nested().await?;
    create_deep_nesting().await?;

    println!("\n✅ All test files created!");
    println!("\nTo test:");
    println!("1. Copy *.parquet files to your project's sample-data folder");
    println!("2. Try loading them in PondPilot (or any DuckDB WASM app)");
    println!("3. Check if any trigger 'Snappy decompression failure'");

    Ok(())
}
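// Note: every generator below follows the same pattern -- build a RecordBatch in
// memory, register it through a MemTable, and write it out with
// execution.parquet.compression set to "snappy". The individual tests only vary
// data_pagesize_limit and max_row_group_size to change how many Snappy-compressed
// pages and row groups end up in the file.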
// Basic test matching user's exact description
async fn create_basic_nested() -> Result<(), Box<dyn std::error::Error>> {
    println!("📄 Creating basic_nested.parquet (matches user description)...");

    let nested_struct = Field::new(
        "nested_data",
        DataType::Struct(vec![
            Field::new("id", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("active", DataType::Boolean, false),
        ].into()),
        false,
    );

    let schema = Schema::new(vec![
        Field::new("user_id", DataType::UInt64, false),
        Field::new("is_active", DataType::Boolean, false),
        Field::new("description", DataType::Utf8, false),
        nested_struct,
        Field::new("count", DataType::UInt64, true),
        Field::new("status", DataType::Utf8, true),
    ]);

    let nested_id = UInt64Array::from(vec![1001, 1002, 1003]);
    let nested_name = StringArray::from(vec![Some("Alpha"), Some("Beta"), Some("Gamma")]);
    let nested_active = BooleanArray::from(vec![true, false, true]);

    let nested_struct_array = StructArray::from(vec![
        (
            Arc::new(Field::new("id", DataType::UInt64, false)),
            Arc::new(nested_id) as ArrayRef,
        ),
        (
            Arc::new(Field::new("name", DataType::Utf8, true)),
            Arc::new(nested_name) as ArrayRef,
        ),
        (
            Arc::new(Field::new("active", DataType::Boolean, false)),
            Arc::new(nested_active) as ArrayRef,
        ),
    ]);

    let user_ids = UInt64Array::from(vec![1, 2, 3]);
    let is_active = BooleanArray::from(vec![true, false, true]);
    let descriptions = StringArray::from(vec!["First", "Second", "Third"]);
    let counts = UInt64Array::from(vec![Some(100), None, Some(300)]);
    let statuses = StringArray::from(vec![Some("OK"), Some("PENDING"), None]);

    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(user_ids),
            Arc::new(is_active),
            Arc::new(descriptions),
            Arc::new(nested_struct_array),
            Arc::new(counts),
            Arc::new(statuses),
        ],
    )?;

    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.compression = Some("snappy".to_string());
    let ctx = SessionContext::new_with_config(config);

    let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?;
    ctx.register_table("test", Arc::new(provider))?;

    let df = ctx.table("test").await?;
    df.write_parquet(
        "basic_nested.parquet",
        datafusion::dataframe::DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    ).await?;

    println!(" ✓ Basic file with nested structs, u64/bool/utf8 fields");
    Ok(())
}
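// To confirm the codec that was actually written, the DuckDB CLI can inspect the
// file's metadata, e.g.:
//   SELECT path_in_schema, compression FROM parquet_metadata('basic_nested.parquet');
// Every column chunk should report SNAPPY.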
// Test 1: Large dataset (100K rows) - might trigger WASM memory issues
async fn create_large_dataset() -> Result<(), Box<dyn std::error::Error>> {
    println!("📄 Creating large_100k.parquet (100,000 rows)...");

    let nested_struct = Field::new(
        "nested",
        DataType::Struct(vec![
            Field::new("id", DataType::UInt64, false),
            Field::new("data", DataType::Utf8, false),
        ].into()),
        false,
    );

    let schema = Schema::new(vec![
        Field::new("row_id", DataType::UInt64, false),
        Field::new("text", DataType::Utf8, false),
        nested_struct,
    ]);

    const ROWS: usize = 100_000;
    let mut row_ids = Vec::with_capacity(ROWS);
    let mut texts = Vec::with_capacity(ROWS);
    let mut nested_ids = Vec::with_capacity(ROWS);
    let mut nested_data = Vec::with_capacity(ROWS);

    let base_text = "Lorem ipsum dolor sit amet ".repeat(20);
    for i in 0..ROWS {
        row_ids.push(i as u64);
        texts.push(format!("{} {}", base_text, i));
        nested_ids.push((i * 1000) as u64);
        nested_data.push(format!("Nested_{}", i));
    }

    let nested_array = StructArray::from(vec![
        (
            Arc::new(Field::new("id", DataType::UInt64, false)),
            Arc::new(UInt64Array::from(nested_ids)) as ArrayRef,
        ),
        (
            Arc::new(Field::new("data", DataType::Utf8, false)),
            Arc::new(StringArray::from(nested_data)) as ArrayRef,
        ),
    ]);

    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(UInt64Array::from(row_ids)),
            Arc::new(StringArray::from(texts)),
            Arc::new(nested_array),
        ],
    )?;

    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.compression = Some("snappy".to_string());
    config.options_mut().execution.parquet.data_pagesize_limit = 1024;
    config.options_mut().execution.parquet.max_row_group_size = 10000;
    let ctx = SessionContext::new_with_config(config);

    let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?;
    ctx.register_table("test", Arc::new(provider))?;

    let df = ctx.table("test").await?;
    df.write_parquet(
        "large_100k.parquet",
        datafusion::dataframe::DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    ).await?;

    println!(" ✓ 100K rows with small pages/row groups");
    Ok(())
}
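// Note on the settings above: a 1 KiB data_pagesize_limit forces the writer to cut
// many small Snappy-compressed data pages per column chunk, and a max_row_group_size
// of 10,000 splits the 100K rows into 10 row groups, so the reader has to run the
// Snappy decoder many times over small frames.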
// Test 2: Multiple batches with varying compression patterns
async fn create_multiple_batches() -> Result<(), Box<dyn std::error::Error>> {
    println!("📄 Creating multiple_batches.parquet...");

    let schema = Schema::new(vec![
        Field::new("batch_id", DataType::UInt64, false),
        Field::new("data", DataType::Utf8, false),
        Field::new("value", DataType::Float64, true),
    ]);

    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.compression = Some("snappy".to_string());
    config.options_mut().execution.parquet.max_row_group_size = 100;
    let ctx = SessionContext::new_with_config(config);

    let mut all_batches = Vec::new();
    for batch_num in 0..10 {
        let mut ids = Vec::new();
        let mut data = Vec::new();
        let mut values = Vec::new();

        for i in 0..100 {
            ids.push((batch_num * 100 + i) as u64);
            match batch_num {
                0..=2 => {
                    data.push("AAAAAAAAAA".repeat(10));
                    values.push(Some(1.0));
                },
                3..=5 => {
                    data.push(format!("{:x}{:x}{:x}", i, batch_num, i * batch_num));
                    values.push(None);
                },
                6..=7 => {
                    data.push(String::new());
                    values.push(Some(i as f64));
                },
                _ => {
                    data.push(format!("Batch_{}_Row_{}", batch_num, i));
                    values.push(if i % 2 == 0 { Some(i as f64) } else { None });
                },
            }
        }

        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(UInt64Array::from(ids)),
                Arc::new(StringArray::from(data)),
                Arc::new(Float64Array::from(values)),
            ],
        )?;
        all_batches.push(batch);
    }

    let provider = MemTable::try_new(Arc::new(schema), vec![all_batches])?;
    ctx.register_table("test", Arc::new(provider))?;

    let df = ctx.table("test").await?;
    df.write_parquet(
        "multiple_batches.parquet",
        datafusion::dataframe::DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    ).await?;

    println!(" ✓ 10 batches with varying compression patterns");
    Ok(())
}
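// The data patterns above are chosen for their very different compressibility: long
// runs of 'A' compress extremely well, the short hex strings hardly compress, and the
// empty strings yield near-empty pages. With max_row_group_size = 100 each batch
// becomes its own row group, so consecutive row groups have very different Snappy
// characteristics.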
// Test 3: Binary/BLOB data with nested structs
async fn create_binary_nested() -> Result<(), Box<dyn std::error::Error>> {
    println!("📄 Creating binary_nested.parquet...");

    let nested_struct = Field::new(
        "nested",
        DataType::Struct(vec![
            Field::new("id", DataType::UInt64, false),
            Field::new("binary", DataType::Binary, false),
        ].into()),
        false,
    );

    let schema = Schema::new(vec![
        Field::new("id", DataType::UInt64, false),
        Field::new("large_binary", DataType::Binary, false),
        nested_struct,
    ]);

    let rows = 1000;
    let mut ids = Vec::new();
    let mut large_binaries: Vec<Vec<u8>> = Vec::new();
    let mut nested_ids = Vec::new();
    let mut nested_binaries: Vec<Vec<u8>> = Vec::new();

    for i in 0..rows {
        ids.push(i as u64);

        let mut large = vec![0u8; 10000];
        for j in 0..large.len() {
            large[j] = ((i * j) % 256) as u8;
        }
        large_binaries.push(large);

        nested_ids.push((i * 100) as u64);

        let mut nested = vec![0u8; 500];
        for j in 0..nested.len() {
            nested[j] = ((i + j) % 256) as u8;
        }
        nested_binaries.push(nested);
    }

    let large_binary_refs: Vec<&[u8]> = large_binaries.iter().map(|v| v.as_slice()).collect();
    let nested_binary_refs: Vec<&[u8]> = nested_binaries.iter().map(|v| v.as_slice()).collect();

    let nested_array = StructArray::from(vec![
        (
            Arc::new(Field::new("id", DataType::UInt64, false)),
            Arc::new(UInt64Array::from(nested_ids)) as ArrayRef,
        ),
        (
            Arc::new(Field::new("binary", DataType::Binary, false)),
            Arc::new(BinaryArray::from(nested_binary_refs)) as ArrayRef,
        ),
    ]);

    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(UInt64Array::from(ids)),
            Arc::new(BinaryArray::from(large_binary_refs)),
            Arc::new(nested_array),
        ],
    )?;

    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.compression = Some("snappy".to_string());
    config.options_mut().execution.parquet.data_pagesize_limit = 4096;
    let ctx = SessionContext::new_with_config(config);

    let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?;
    ctx.register_table("test", Arc::new(provider))?;

    let df = ctx.table("test").await?;
    df.write_parquet(
        "binary_nested.parquet",
        datafusion::dataframe::DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    ).await?;

    println!(" ✓ Binary/BLOB data with nested structs");
    Ok(())
}
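// This variant feeds Snappy raw BYTE_ARRAY pages instead of UTF-8 text; with 10 KB
// and 500 B values against a 4 KiB data_pagesize_limit, most pages end up holding
// only one or a few values each.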
// Test 4: Very deep nesting (4 levels)
async fn create_deep_nesting() -> Result<(), Box<dyn std::error::Error>> {
    println!("📄 Creating deep_nested.parquet (4 levels deep)...");

    let level4 = DataType::Struct(vec![
        Field::new("l4_id", DataType::UInt64, false),
        Field::new("l4_data", DataType::Utf8, true),
    ].into());

    let level3 = DataType::Struct(vec![
        Field::new("l3_id", DataType::UInt64, false),
        Field::new("l3_nested", level4, false),
    ].into());

    let level2 = DataType::Struct(vec![
        Field::new("l2_name", DataType::Utf8, false),
        Field::new("l2_nested", level3, false),
    ].into());

    let level1 = Field::new(
        "deep_nested",
        DataType::Struct(vec![
            Field::new("l1_id", DataType::UInt64, false),
            Field::new("l1_nested", level2, false),
        ].into()),
        false,
    );

    let schema = Schema::new(vec![
        Field::new("id", DataType::UInt64, false),
        level1,
    ]);

    let rows = 100;
    let mut ids = Vec::new();
    let mut l1_ids = Vec::new();
    let mut l2_names = Vec::new();
    let mut l3_ids = Vec::new();
    let mut l4_ids = Vec::new();
    let mut l4_data = Vec::new();

    for i in 0..rows {
        ids.push(i as u64);
        l1_ids.push((i * 1000) as u64);
        l2_names.push(format!("L2_{}", i));
        l3_ids.push((i * 100) as u64);
        l4_ids.push((i * 10) as u64);
        l4_data.push(if i % 3 == 0 { None } else { Some(format!("L4_{}", i)) });
    }

    let level4_array = StructArray::from(vec![
        (
            Arc::new(Field::new("l4_id", DataType::UInt64, false)),
            Arc::new(UInt64Array::from(l4_ids)) as ArrayRef,
        ),
        (
            Arc::new(Field::new("l4_data", DataType::Utf8, true)),
            Arc::new(StringArray::from(l4_data)) as ArrayRef,
        ),
    ]);

    let level3_array = StructArray::from(vec![
        (
            Arc::new(Field::new("l3_id", DataType::UInt64, false)),
            Arc::new(UInt64Array::from(l3_ids)) as ArrayRef,
        ),
        (
            Arc::new(Field::new("l3_nested", level4_array.data_type().clone(), false)),
            Arc::new(level4_array) as ArrayRef,
        ),
    ]);

    let level2_array = StructArray::from(vec![
        (
            Arc::new(Field::new("l2_name", DataType::Utf8, false)),
            Arc::new(StringArray::from(l2_names)) as ArrayRef,
        ),
        (
            Arc::new(Field::new("l2_nested", level3_array.data_type().clone(), false)),
            Arc::new(level3_array) as ArrayRef,
        ),
    ]);

    let level1_array = StructArray::from(vec![
        (
            Arc::new(Field::new("l1_id", DataType::UInt64, false)),
            Arc::new(UInt64Array::from(l1_ids)) as ArrayRef,
        ),
        (
            Arc::new(Field::new("l1_nested", level2_array.data_type().clone(), false)),
            Arc::new(level2_array) as ArrayRef,
        ),
    ]);

    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(UInt64Array::from(ids)),
            Arc::new(level1_array),
        ],
    )?;

    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.compression = Some("snappy".to_string());
    config.options_mut().execution.parquet.data_pagesize_limit = 2048;
    let ctx = SessionContext::new_with_config(config);

    let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?;
    ctx.register_table("test", Arc::new(provider))?;

    let df = ctx.table("test").await?;
    df.write_parquet(
        "deep_nested.parquet",
        datafusion::dataframe::DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    ).await?;

    println!(" ✓ 4-level deep nested structures");
    Ok(())
}
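As a quick sanity check on the generator's output (not part of the original gist; the file name basic_nested.parquet and its user_id column are taken from the code above), a minimal companion binary can read one of the files back through DataFusion's own Parquet reader. Per the issue, the same files are expected to open cleanly in the DuckDB CLI but fail with 'Snappy decompression failure' in DuckDB WASM / PondPilot.

// Sketch of a separate sanity-check binary: read one generated file back with
// DataFusion and print a few aggregates. If this succeeds natively while the same
// file fails in DuckDB WASM, the problem is on the WASM reader side.
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Register the file written by create_basic_nested() above.
    ctx.register_parquet("t", "basic_nested.parquet", ParquetReadOptions::default()).await?;
    ctx.sql("SELECT count(*) AS n, min(user_id), max(user_id) FROM t")
        .await?
        .show()
        .await?;
    Ok(())
}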