Convert large Arrow shards into Parquet without loading the entire dataset into memory.
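With uv installed, the script declares its own dependencies via PEP 723 inline metadata and can be run directly (the paths below are placeholders, as in the docstring):

```
uv run convert_arrow_to_parquet_streaming.py --source-dir /path/to/arrow_files --output-dir /path/to/parquet_files
```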
```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "pyarrow",
# ]
# ///
| """Convert .arrow shards to Parquet without loading entire dataset into memory. | |
| - Discovers all .arrow files under a given source directory | |
| - Converts each file to Parquet | |
| - Uses streaming in order to keep memory bounded and convert files larger than available RAM | |
| - Handles both Arrow IPC File and Stream formats (tries file, falls back to stream) | |
| Usage: | |
| uv run convert_arrow_to_parquet_streaming.py --source-dir /path/to/arrow_files --output-dir /path/to/parquet_files | |
| Notes: | |
| - Use `--preserve-subdirs` to mirror the input directory tree under the output dir. | |
| - Use `--overwrite` to re-create files; otherwise existing outputs are skipped. | |
| """ | |
import argparse
import glob
import os
import sys
from pathlib import Path
from typing import List

import pyarrow as pa
import pyarrow.ipc as ipc
import pyarrow.parquet as pq

def find_arrow_files(source_dir: str) -> List[str]:
    """Recursively collect all .arrow files under ``source_dir``."""
    pattern = os.path.join(source_dir, "**", "*.arrow")
    return sorted(glob.glob(pattern, recursive=True))

def convert_arrow_to_parquet(source_path: str, dest_path: str) -> None:
    """Convert a single .arrow file into a .parquet file using streaming batches.

    Tries the Arrow IPC File format first, then falls back to the Stream format.
    """
    Path(dest_path).parent.mkdir(parents=True, exist_ok=True)
    with pa.memory_map(source_path, "r") as in_stream:
        writer = None
        try:
            # IPC File format: random-access reader with an explicit batch count.
            reader = ipc.open_file(in_stream)
            writer = pq.ParquetWriter(dest_path, reader.schema)
            for i in range(reader.num_record_batches):
                writer.write_batch(reader.get_batch(i))
        except pa.ArrowInvalid:
            # Not the File format; rewind and read it as an IPC stream instead.
            in_stream.seek(0)
            reader = ipc.open_stream(in_stream)
            writer = pq.ParquetWriter(dest_path, reader.schema)
            for batch in reader:
                writer.write_batch(batch)
        finally:
            if writer is not None:
                writer.close()

def main() -> int:
    parser = argparse.ArgumentParser(description="Convert Arrow shards to Parquet.")
    parser.add_argument(
        "--source-dir",
        required=True,
        help="Directory containing .arrow files",
    )
    parser.add_argument(
        "--output-dir",
        default="parq_convert",
        help="Directory to write .parquet files",
    )
    parser.add_argument(
        "--overwrite", action="store_true", help="Overwrite existing parquet files"
    )
    parser.add_argument(
        "--preserve-subdirs",
        action="store_true",
        help="Preserve input subdirectory structure inside output dir",
    )
    args = parser.parse_args()

    source_dir = os.path.abspath(args.source_dir)
    output_dir = os.path.abspath(args.output_dir)
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    arrow_files = find_arrow_files(source_dir)
    if not arrow_files:
        print(f"No .arrow files found under {source_dir}", file=sys.stderr)
        return 1
    print(f"Found {len(arrow_files)} .arrow files under {source_dir}")

    converted_count = 0
    for idx, arrow_path in enumerate(arrow_files, start=1):
        arrow_path = os.path.abspath(arrow_path)
        if args.preserve_subdirs:
            # Mirror the source tree under the output dir.
            rel = os.path.relpath(arrow_path, source_dir)
            parquet_path = os.path.join(
                output_dir, os.path.splitext(rel)[0] + ".parquet"
            )
        else:
            # Flatten: every output lands directly in the output dir.
            parquet_name = (
                os.path.splitext(os.path.basename(arrow_path))[0] + ".parquet"
            )
            parquet_path = os.path.join(output_dir, parquet_name)

        if os.path.exists(parquet_path) and not args.overwrite:
            print(f"[{idx}/{len(arrow_files)}] Skip (exists): {parquet_path}")
            continue

        try:
            convert_arrow_to_parquet(arrow_path, parquet_path)
            # Reopen the output as a light sanity check and report its metadata.
            pf = pq.ParquetFile(parquet_path)
            print(
                f"[{idx}/{len(arrow_files)}] Wrote {parquet_path} "
                f"(row_groups={pf.metadata.num_row_groups}, rows={pf.metadata.num_rows})"
            )
            converted_count += 1
        except Exception as exc:  # noqa: BLE001
            print(
                f"[{idx}/{len(arrow_files)}] ERROR converting {arrow_path}: {exc}",
                file=sys.stderr,
            )

    print(
        f"Done. Converted {converted_count}/{len(arrow_files)} files into {output_dir}"
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```
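To exercise both code paths end to end, you can generate small .arrow inputs in each IPC format. A minimal sketch (the file names here are made up):

```python
import pyarrow as pa
import pyarrow.ipc as ipc

table = pa.table({"id": list(range(10)), "val": [float(i) for i in range(10)]})

# IPC File format: what ipc.open_file() in the script reads first.
with ipc.new_file("demo_file.arrow", table.schema) as w:
    w.write_table(table)

# IPC Stream format: what the ipc.open_stream() fallback handles.
with ipc.new_stream("demo_stream.arrow", table.schema) as w:
    w.write_table(table)
```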
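To sanity-check a converted shard without loading it fully, the Parquet output can be read back in bounded batches as well. A sketch assuming the flattened output layout and the hypothetical file above:

```python
import pyarrow.parquet as pq

pf = pq.ParquetFile("parq_convert/demo_file.parquet")  # hypothetical output path
print(pf.schema_arrow)

total = 0
for batch in pf.iter_batches(batch_size=65_536):  # stream in bounded chunks
    total += batch.num_rows
print(f"rows counted while streaming: {total}")
```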