@melonamin
Created August 6, 2025 18:49
PondPilot #206
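A small Rust program that uses DataFusion to generate Snappy-compressed Parquet files with nested structs, a 100K-row dataset, multiple record batches, binary columns, and 4-level-deep nesting, as test inputs for reproducing PondPilot issue #206 ("Snappy decompression failure" in DuckDB WASM).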
Cargo.toml:

[package]
name = "generate-datafusion-parquet"
version = "0.1.0"
edition = "2021"

[dependencies]
datafusion = "49"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
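With the two files in place (the Rust source saved as src/main.rs, assuming the conventional binary-crate layout), cargo run --release builds the generator and writes the five .parquet files into the current working directory.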
main.rs:

use datafusion::arrow::array::{
    Array, ArrayRef, BinaryArray, BooleanArray, Float64Array, RecordBatch,
    StringArray, StructArray, UInt64Array,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("🔨 Generating Parquet files with Snappy compression using DataFusion\n");
    println!("This reproduces the reported issue where files open fine in the DuckDB CLI");
    println!("but fail in DuckDB WASM with 'Snappy decompression failure'\n");

    // Basic test - matches the user's description exactly
    create_basic_nested().await?;

    // Stress tests to try to trigger the issue
    create_large_dataset().await?;
    create_multiple_batches().await?;
    create_binary_nested().await?;
    create_deep_nesting().await?;

    println!("\n✅ All test files created!");
    println!("\nTo test:");
    println!("1. Copy *.parquet files to your project's sample-data folder");
    println!("2. Try loading them in PondPilot (or any DuckDB WASM app)");
    println!("3. Check if any trigger 'Snappy decompression failure'");
    Ok(())
}
// Basic test matching the user's exact description
async fn create_basic_nested() -> Result<(), Box<dyn std::error::Error>> {
    println!("📄 Creating basic_nested.parquet (matches user description)...");

    let nested_struct = Field::new(
        "nested_data",
        DataType::Struct(vec![
            Field::new("id", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("active", DataType::Boolean, false),
        ].into()),
        false,
    );

    let schema = Schema::new(vec![
        Field::new("user_id", DataType::UInt64, false),
        Field::new("is_active", DataType::Boolean, false),
        Field::new("description", DataType::Utf8, false),
        nested_struct,
        Field::new("count", DataType::UInt64, true),
        Field::new("status", DataType::Utf8, true),
    ]);

    let nested_id = UInt64Array::from(vec![1001, 1002, 1003]);
    let nested_name = StringArray::from(vec![Some("Alpha"), Some("Beta"), Some("Gamma")]);
    let nested_active = BooleanArray::from(vec![true, false, true]);

    let nested_struct_array = StructArray::from(vec![
        (
            Arc::new(Field::new("id", DataType::UInt64, false)),
            Arc::new(nested_id) as ArrayRef,
        ),
        (
            Arc::new(Field::new("name", DataType::Utf8, true)),
            Arc::new(nested_name) as ArrayRef,
        ),
        (
            Arc::new(Field::new("active", DataType::Boolean, false)),
            Arc::new(nested_active) as ArrayRef,
        ),
    ]);

    let user_ids = UInt64Array::from(vec![1, 2, 3]);
    let is_active = BooleanArray::from(vec![true, false, true]);
    let descriptions = StringArray::from(vec!["First", "Second", "Third"]);
    let counts = UInt64Array::from(vec![Some(100), None, Some(300)]);
    let statuses = StringArray::from(vec![Some("OK"), Some("PENDING"), None]);

    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(user_ids),
            Arc::new(is_active),
            Arc::new(descriptions),
            Arc::new(nested_struct_array),
            Arc::new(counts),
            Arc::new(statuses),
        ],
    )?;

    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.compression = Some("snappy".to_string());
    let ctx = SessionContext::new_with_config(config);

    let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?;
    ctx.register_table("test", Arc::new(provider))?;

    let df = ctx.table("test").await?;
    df.write_parquet(
        "basic_nested.parquet",
        datafusion::dataframe::DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    ).await?;

    println!(" ✓ Basic file with nested structs, u64/bool/utf8 fields");
    Ok(())
}
// Test 1: Large dataset (100K rows) - might trigger WASM memory issues
async fn create_large_dataset() -> Result<(), Box<dyn std::error::Error>> {
    println!("📄 Creating large_100k.parquet (100,000 rows)...");

    let nested_struct = Field::new(
        "nested",
        DataType::Struct(vec![
            Field::new("id", DataType::UInt64, false),
            Field::new("data", DataType::Utf8, false),
        ].into()),
        false,
    );

    let schema = Schema::new(vec![
        Field::new("row_id", DataType::UInt64, false),
        Field::new("text", DataType::Utf8, false),
        nested_struct,
    ]);

    const ROWS: usize = 100_000;
    let mut row_ids = Vec::with_capacity(ROWS);
    let mut texts = Vec::with_capacity(ROWS);
    let mut nested_ids = Vec::with_capacity(ROWS);
    let mut nested_data = Vec::with_capacity(ROWS);

    let base_text = "Lorem ipsum dolor sit amet ".repeat(20);
    for i in 0..ROWS {
        row_ids.push(i as u64);
        texts.push(format!("{} {}", base_text, i));
        nested_ids.push((i * 1000) as u64);
        nested_data.push(format!("Nested_{}", i));
    }

    let nested_array = StructArray::from(vec![
        (
            Arc::new(Field::new("id", DataType::UInt64, false)),
            Arc::new(UInt64Array::from(nested_ids)) as ArrayRef,
        ),
        (
            Arc::new(Field::new("data", DataType::Utf8, false)),
            Arc::new(StringArray::from(nested_data)) as ArrayRef,
        ),
    ]);

    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(UInt64Array::from(row_ids)),
            Arc::new(StringArray::from(texts)),
            Arc::new(nested_array),
        ],
    )?;

    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.compression = Some("snappy".to_string());
    // Small page and row-group limits force many separate Snappy-compressed pages.
    config.options_mut().execution.parquet.data_pagesize_limit = 1024;
    config.options_mut().execution.parquet.max_row_group_size = 10000;
    let ctx = SessionContext::new_with_config(config);

    let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?;
    ctx.register_table("test", Arc::new(provider))?;

    let df = ctx.table("test").await?;
    df.write_parquet(
        "large_100k.parquet",
        datafusion::dataframe::DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    ).await?;

    println!(" ✓ 100K rows with small pages/row groups");
    Ok(())
}
// Test 2: Multiple batches with varying compression patterns
async fn create_multiple_batches() -> Result<(), Box<dyn std::error::Error>> {
    println!("📄 Creating multiple_batches.parquet...");

    let schema = Schema::new(vec![
        Field::new("batch_id", DataType::UInt64, false),
        Field::new("data", DataType::Utf8, false),
        Field::new("value", DataType::Float64, true),
    ]);

    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.compression = Some("snappy".to_string());
    config.options_mut().execution.parquet.max_row_group_size = 100;
    let ctx = SessionContext::new_with_config(config);

    let mut all_batches = Vec::new();
    for batch_num in 0..10 {
        let mut ids = Vec::new();
        let mut data = Vec::new();
        let mut values = Vec::new();
        for i in 0..100 {
            ids.push((batch_num * 100 + i) as u64);
            match batch_num {
                // Highly repetitive strings: compresses extremely well
                0..=2 => {
                    data.push("AAAAAAAAAA".repeat(10));
                    values.push(Some(1.0));
                },
                // Short hex strings: poor compression ratio
                3..=5 => {
                    data.push(format!("{:x}{:x}{:x}", i, batch_num, i * batch_num));
                    values.push(None);
                },
                // Empty strings
                6..=7 => {
                    data.push(String::new());
                    values.push(Some(i as f64));
                },
                // Mixed content with alternating nulls
                _ => {
                    data.push(format!("Batch_{}_Row_{}", batch_num, i));
                    values.push(if i % 2 == 0 { Some(i as f64) } else { None });
                },
            }
        }

        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(UInt64Array::from(ids)),
                Arc::new(StringArray::from(data)),
                Arc::new(Float64Array::from(values)),
            ],
        )?;
        all_batches.push(batch);
    }

    let provider = MemTable::try_new(Arc::new(schema), vec![all_batches])?;
    ctx.register_table("test", Arc::new(provider))?;

    let df = ctx.table("test").await?;
    df.write_parquet(
        "multiple_batches.parquet",
        datafusion::dataframe::DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    ).await?;

    println!(" ✓ 10 batches with varying compression patterns");
    Ok(())
}
// Test 3: Binary/BLOB data with nested structs
async fn create_binary_nested() -> Result<(), Box<dyn std::error::Error>> {
    println!("📄 Creating binary_nested.parquet...");

    let nested_struct = Field::new(
        "nested",
        DataType::Struct(vec![
            Field::new("id", DataType::UInt64, false),
            Field::new("binary", DataType::Binary, false),
        ].into()),
        false,
    );

    let schema = Schema::new(vec![
        Field::new("id", DataType::UInt64, false),
        Field::new("large_binary", DataType::Binary, false),
        nested_struct,
    ]);

    let rows = 1000;
    let mut ids = Vec::new();
    let mut large_binaries: Vec<Vec<u8>> = Vec::new();
    let mut nested_ids = Vec::new();
    let mut nested_binaries: Vec<Vec<u8>> = Vec::new();

    for i in 0..rows {
        ids.push(i as u64);

        let mut large = vec![0u8; 10000];
        for j in 0..large.len() {
            large[j] = ((i * j) % 256) as u8;
        }
        large_binaries.push(large);

        nested_ids.push((i * 100) as u64);
        let mut nested = vec![0u8; 500];
        for j in 0..nested.len() {
            nested[j] = ((i + j) % 256) as u8;
        }
        nested_binaries.push(nested);
    }

    let large_binary_refs: Vec<&[u8]> = large_binaries.iter().map(|v| v.as_slice()).collect();
    let nested_binary_refs: Vec<&[u8]> = nested_binaries.iter().map(|v| v.as_slice()).collect();

    let nested_array = StructArray::from(vec![
        (
            Arc::new(Field::new("id", DataType::UInt64, false)),
            Arc::new(UInt64Array::from(nested_ids)) as ArrayRef,
        ),
        (
            Arc::new(Field::new("binary", DataType::Binary, false)),
            Arc::new(BinaryArray::from(nested_binary_refs)) as ArrayRef,
        ),
    ]);

    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(UInt64Array::from(ids)),
            Arc::new(BinaryArray::from(large_binary_refs)),
            Arc::new(nested_array),
        ],
    )?;

    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.compression = Some("snappy".to_string());
    config.options_mut().execution.parquet.data_pagesize_limit = 4096;
    let ctx = SessionContext::new_with_config(config);

    let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?;
    ctx.register_table("test", Arc::new(provider))?;

    let df = ctx.table("test").await?;
    df.write_parquet(
        "binary_nested.parquet",
        datafusion::dataframe::DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    ).await?;

    println!(" ✓ Binary/BLOB data with nested structs");
    Ok(())
}
// Test 4: Very deep nesting (4 levels)
async fn create_deep_nesting() -> Result<(), Box<dyn std::error::Error>> {
    println!("📄 Creating deep_nested.parquet (4 levels deep)...");

    let level4 = DataType::Struct(vec![
        Field::new("l4_id", DataType::UInt64, false),
        Field::new("l4_data", DataType::Utf8, true),
    ].into());
    let level3 = DataType::Struct(vec![
        Field::new("l3_id", DataType::UInt64, false),
        Field::new("l3_nested", level4, false),
    ].into());
    let level2 = DataType::Struct(vec![
        Field::new("l2_name", DataType::Utf8, false),
        Field::new("l2_nested", level3, false),
    ].into());
    let level1 = Field::new(
        "deep_nested",
        DataType::Struct(vec![
            Field::new("l1_id", DataType::UInt64, false),
            Field::new("l1_nested", level2, false),
        ].into()),
        false,
    );

    let schema = Schema::new(vec![
        Field::new("id", DataType::UInt64, false),
        level1,
    ]);

    let rows = 100;
    let mut ids = Vec::new();
    let mut l1_ids = Vec::new();
    let mut l2_names = Vec::new();
    let mut l3_ids = Vec::new();
    let mut l4_ids = Vec::new();
    let mut l4_data = Vec::new();

    for i in 0..rows {
        ids.push(i as u64);
        l1_ids.push((i * 1000) as u64);
        l2_names.push(format!("L2_{}", i));
        l3_ids.push((i * 100) as u64);
        l4_ids.push((i * 10) as u64);
        l4_data.push(if i % 3 == 0 { None } else { Some(format!("L4_{}", i)) });
    }

    let level4_array = StructArray::from(vec![
        (
            Arc::new(Field::new("l4_id", DataType::UInt64, false)),
            Arc::new(UInt64Array::from(l4_ids)) as ArrayRef,
        ),
        (
            Arc::new(Field::new("l4_data", DataType::Utf8, true)),
            Arc::new(StringArray::from(l4_data)) as ArrayRef,
        ),
    ]);
    let level3_array = StructArray::from(vec![
        (
            Arc::new(Field::new("l3_id", DataType::UInt64, false)),
            Arc::new(UInt64Array::from(l3_ids)) as ArrayRef,
        ),
        (
            Arc::new(Field::new("l3_nested", level4_array.data_type().clone(), false)),
            Arc::new(level4_array) as ArrayRef,
        ),
    ]);
    let level2_array = StructArray::from(vec![
        (
            Arc::new(Field::new("l2_name", DataType::Utf8, false)),
            Arc::new(StringArray::from(l2_names)) as ArrayRef,
        ),
        (
            Arc::new(Field::new("l2_nested", level3_array.data_type().clone(), false)),
            Arc::new(level3_array) as ArrayRef,
        ),
    ]);
    let level1_array = StructArray::from(vec![
        (
            Arc::new(Field::new("l1_id", DataType::UInt64, false)),
            Arc::new(UInt64Array::from(l1_ids)) as ArrayRef,
        ),
        (
            Arc::new(Field::new("l1_nested", level2_array.data_type().clone(), false)),
            Arc::new(level2_array) as ArrayRef,
        ),
    ]);

    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(UInt64Array::from(ids)),
            Arc::new(level1_array),
        ],
    )?;

    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.compression = Some("snappy".to_string());
    config.options_mut().execution.parquet.data_pagesize_limit = 2048;
    let ctx = SessionContext::new_with_config(config);

    let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?;
    ctx.register_table("test", Arc::new(provider))?;

    let df = ctx.table("test").await?;
    df.write_parquet(
        "deep_nested.parquet",
        datafusion::dataframe::DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    ).await?;

    println!(" ✓ 4-level deep nested structures");
    Ok(())
}
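As a sanity check before testing in the browser, the generated files can be read back with the same DataFusion version. The snippet below is a minimal sketch (not part of the original gist), assuming the .parquet files sit in the current working directory; note it exercises the Rust parquet crate's Snappy decoder rather than DuckDB's, so a clean read here does not rule out the WASM failure.

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();
    for file in [
        "basic_nested.parquet",
        "large_100k.parquet",
        "multiple_batches.parquet",
        "binary_nested.parquet",
        "deep_nested.parquet",
    ] {
        // Read each file back and count rows; a decode error would surface here.
        let df = ctx.read_parquet(file, ParquetReadOptions::default()).await?;
        let rows = df.count().await?;
        println!("{file}: {rows} rows");
    }
    Ok(())
}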