@crypdick
Created August 8, 2025 20:04
Convert large Arrow shards into Parquet without loading the entire dataset into memory.
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "pyarrow",
# ]
# ///
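# The block above is PEP 723 inline script metadata; `uv run` reads it and
# installs pyarrow into an ephemeral environment before executing the script.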
"""Convert .arrow shards to Parquet without loading entire dataset into memory.
- Discovers all .arrow files under a given source directory
- Converts each file to Parquet
- Uses streaming in order to keep memory bounded and convert files larger than available RAM
- Handles both Arrow IPC File and Stream formats (tries file, falls back to stream)
Usage:
uv run convert_arrow_to_parquet_streaming.py --source-dir /path/to/arrow_files --output-dir /path/to/parquet_files
Notes:
- Use `--preserve-subdirs` to mirror the input directory tree under the output dir.
- Use `--overwrite` to re-create files; otherwise existing outputs are skipped.
"""
import os
import sys
import glob
from pathlib import Path
from typing import List

import pyarrow as pa
import pyarrow.ipc as ipc
import pyarrow.parquet as pq
def find_arrow_files(source_dir: str) -> List[str]:
    pattern = os.path.join(source_dir, "**", "*.arrow")
    return sorted(glob.glob(pattern, recursive=True))
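# Note: the "**" pattern with recursive=True matches .arrow files at any
# depth under source_dir, including files directly inside it.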
def convert_arrow_to_parquet(source_path: str, dest_path: str) -> None:
    """Convert a single .arrow file into a .parquet file using streaming batches.

    Tries the Arrow IPC File format first, then falls back to the Stream format.
    """
    Path(os.path.dirname(dest_path)).mkdir(parents=True, exist_ok=True)
    # Memory-map the source so record batches are read one at a time
    # instead of loading the whole file into RAM.
    with pa.memory_map(source_path, "r") as in_stream:
        writer = None
        try:
            reader = ipc.open_file(in_stream)
            schema = reader.schema
            writer = pq.ParquetWriter(dest_path, schema)
            for i in range(reader.num_record_batches):
                batch = reader.get_batch(i)
                writer.write_batch(batch)
        except pa.ArrowInvalid:
            # Not File-format IPC; rewind and re-open as a Stream.
            in_stream.seek(0)
            reader = ipc.open_stream(in_stream)
            schema = reader.schema
            writer = pq.ParquetWriter(dest_path, schema)
            for batch in reader:
                writer.write_batch(batch)
        finally:
            if writer is not None:
                writer.close()
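# To exercise both code paths, one can generate tiny fixtures in each IPC
# format (a hedged sketch; file names are placeholders, not part of the tool):
#
#     table = pa.table({"x": [1, 2, 3]})
#     with ipc.new_file(pa.OSFile("demo_file.arrow", "wb"), table.schema) as w:
#         w.write_table(table)  # File format: handled by the try branch
#     with ipc.new_stream(pa.OSFile("demo_stream.arrow", "wb"), table.schema) as w:
#         w.write_table(table)  # Stream format: handled by the fallback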
def main() -> int:
    import argparse

    parser = argparse.ArgumentParser(description="Convert Arrow shards to Parquet.")
    parser.add_argument(
        "--source-dir",
        required=True,
        help="Directory containing .arrow files",
    )
    parser.add_argument(
        "--output-dir",
        default="parq_convert",
        help="Directory to write .parquet files",
    )
    parser.add_argument(
        "--overwrite", action="store_true", help="Overwrite existing parquet files"
    )
    parser.add_argument(
        "--preserve-subdirs",
        action="store_true",
        help="Preserve input subdirectory structure inside output dir",
    )
    args = parser.parse_args()

    source_dir = os.path.abspath(args.source_dir)
    output_dir = os.path.abspath(args.output_dir)
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    arrow_files = find_arrow_files(source_dir)
    if not arrow_files:
        print(f"No .arrow files found under {source_dir}", file=sys.stderr)
        return 1
    print(f"Found {len(arrow_files)} .arrow files under {source_dir}")

    converted_count = 0
    for idx, arrow_path in enumerate(arrow_files, start=1):
        arrow_path = os.path.abspath(arrow_path)
        if args.preserve_subdirs:
            # Mirror the input tree: <source>/a/b.arrow -> <output>/a/b.parquet
            rel = os.path.relpath(arrow_path, source_dir)
            parquet_path = os.path.join(
                output_dir, os.path.splitext(rel)[0] + ".parquet"
            )
        else:
            # Flatten: every output lands directly in output_dir
            parquet_name = (
                os.path.splitext(os.path.basename(arrow_path))[0] + ".parquet"
            )
            parquet_path = os.path.join(output_dir, parquet_name)
        if os.path.exists(parquet_path) and not args.overwrite:
            print(f"[{idx}/{len(arrow_files)}] Skip (exists): {parquet_path}")
            continue
        try:
            convert_arrow_to_parquet(arrow_path, parquet_path)
            # Re-open the output to report row-group and row counts.
            pf = pq.ParquetFile(parquet_path)
            print(
                f"[{idx}/{len(arrow_files)}] Wrote {parquet_path} "
                f"(row_groups={pf.metadata.num_row_groups}, rows={pf.metadata.num_rows})"
            )
            converted_count += 1
        except Exception as exc:  # noqa: BLE001
            print(
                f"[{idx}/{len(arrow_files)}] ERROR converting {arrow_path}: {exc}",
                file=sys.stderr,
            )

    print(
        f"Done. Converted {converted_count}/{len(arrow_files)} files into {output_dir}"
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
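# Quick sanity check after a run (a hedged sketch; the path is a placeholder):
#
#     pf = pq.ParquetFile("parq_convert/example.parquet")
#     n = sum(b.num_rows for b in pf.iter_batches(batch_size=65_536))
#     assert n == pf.metadata.num_rows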