Experimenting with simpler ``blocksize`` logic for ``read_parquet``
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "e25aa48f-d4c4-498d-b30a-0ff0704a7fce",
   "metadata": {},
   "outputs": [],
   "source": [
    "import math\n",
    "import dask_expr as dx\n",
    "import pyarrow.dataset as ds\n",
    "from fsspec.core import get_fs_token_paths\n",
    "from dask.utils import parse_bytes\n",
    "import pandas as pd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "53fe730c-b21e-4c43-a792-73f7576c2b9b",
   "metadata": {},
   "outputs": [],
   "source": [
    "###\n",
    "### Helper functions\n",
    "###\n",
    "\n",
    "# Main function to construct a single DataFrame partition\n",
    "def read_partition(\n",
    "    partition_info,\n",
    "    columns=None,\n",
    "    filters=None,\n",
    "    filesystem=None,\n",
    "    dataset_kwargs=None,\n",
    "    convert_kwargs=None,\n",
    "):\n",
" # MAIN IDEA:\n", | |
" # We shouldn't need to parse parquet metadata ahead of time to\n", | |
" # aggregate files or split files when defining the mapping\n", | |
" # between files and output partitions. To aggregate files,\n", | |
" # we can simply assing a list of paths to a given partitions.\n", | |
" # To split files, we can the same path to multiple partitions.\n", | |
" # This is fine, as long as `read_partition` knows how many\n", | |
" # other partitions are being generated from the same path\n", | |
" # (`num_ranks`), and the index of THIS partition (`rank`).\n", | |
" # This way, the necessary metadata can be parsed at IO time.\n", | |
" # (where the metadata is already parsed anyway)\n", | |
" paths, rank, num_ranks = partition_info\n", | |
"\n", | |
" if num_ranks > 1:\n", | |
" # Probably no good reason to support this.\n", | |
" # We should either split a file or map multiple files to a partition\n", | |
" assert len(paths) == 1\n", | |
"\n", | |
" dataset = ds.dataset(paths, filesystem=filesystem, **(dataset_kwargs or {}))\n", | |
" if num_ranks == 1:\n", | |
" # Simple case: This rank is reading all of dataset\n", | |
" table = dataset.to_table(\n", | |
" columns=columns,\n", | |
" filter=filters,\n", | |
" )\n", | |
" else:\n", | |
" # Other case: This rank is responsible for a subset of dataset\n", | |
" table = _get_partial_table(\n", | |
" dataset,\n", | |
" rank,\n", | |
" num_ranks,\n", | |
" columns,\n", | |
" filters,\n", | |
" )\n", | |
"\n", | |
" # Convert to pandas\n", | |
" return table.to_pandas(**(convert_kwargs or {}))\n", | |
"\n", | |
"\n", | |
"def _get_partial_table(\n", | |
" dataset,\n", | |
" rank,\n", | |
" num_ranks,\n", | |
" columns,\n", | |
" filters,\n", | |
"):\n", | |
" # This is used by `read_partition` (above)\n", | |
" # NOTE: dataset must be a single file (for now)\n", | |
" file_fragment = next(dataset.get_fragments())\n", | |
" num_row_groups = file_fragment.num_row_groups\n", | |
" rg_stride = math.ceil(num_row_groups / num_ranks)\n", | |
" rg_start = rg_stride * rank\n", | |
" rg_end = min(rg_start + rg_stride, num_row_groups)\n", | |
" if rg_end > rg_start:\n", | |
" fragment = file_fragment.format.make_fragment(\n", | |
" # TODO: Do these args cover everything?\n", | |
" file_fragment.path,\n", | |
" file_fragment.filesystem,\n", | |
" file_fragment.partition_expression,\n", | |
" row_groups=range(rg_start, rg_end),\n", | |
" )\n", | |
" table = fragment.to_table(columns=columns, filter=filters)\n", | |
" else:\n", | |
" # Nothing to read for this task?\n", | |
" table = dataset.schema.empty_table()\n", | |
" return table\n", | |
"\n", | |
"\n", | |
"# Aggregate small files\n", | |
"def aggregate_small_partitions(partitions, blocksize, columns, column_factors, file_sizes):\n", | |
" if blocksize is None:\n", | |
" return partitions\n", | |
"\n", | |
" size_factor = column_factors[columns].sum() if columns else column_factors.sum()\n", | |
" aggregated_partitions = []\n", | |
" blocksize = parse_bytes(blocksize)\n", | |
" group = None\n", | |
" group_size = 0\n", | |
" for part in partitions:\n", | |
" paths = part[0]\n", | |
" assert len(paths) > 0\n", | |
" size = 0\n", | |
" for path in paths:\n", | |
" size += size_factor * file_sizes.get(path, blocksize)\n", | |
" if group is None:\n", | |
" group = paths.copy()\n", | |
" group_size = size\n", | |
" elif group_size + size > blocksize:\n", | |
" aggregated_partitions.append((group, 0, 1))\n", | |
" group = paths.copy()\n", | |
" group_size = size\n", | |
" else:\n", | |
" # TODO: May need to enforce other \"aggregation rules\" here\n", | |
" group += paths.copy()\n", | |
" group_size += size\n", | |
" aggregated_partitions.append((group, 0, 1))\n", | |
" return aggregated_partitions\n", | |
"\n", | |
"\n", | |
"# Split large files\n", | |
"def split_large_partitions(partitions, blocksize, columns, column_factors, file_sizes):\n", | |
" if blocksize is None:\n", | |
" return partitions\n", | |
" size_factor = column_factors[columns].sum() if columns else column_factors.sum()\n", | |
" split_partitions = []\n", | |
" blocksize = parse_bytes(blocksize)\n", | |
" for part in partitions:\n", | |
" paths = part[0]\n", | |
" if len(paths) > 1:\n", | |
" split_partitions.append(part)\n", | |
" continue\n", | |
" assert len(paths) == 1\n", | |
" size = size_factor * file_sizes.get(paths[0], blocksize)\n", | |
" if size > blocksize:\n", | |
" # This is a large file. Split it into multiple partitions\n", | |
" num_ranks = round(size / blocksize)\n", | |
" for rank in range(num_ranks):\n", | |
" split_partitions.append((paths, rank, num_ranks))\n", | |
" else:\n", | |
" split_partitions.append(part)\n", | |
" return split_partitions" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"id": "3beb6900-fa8f-4619-afaa-f987f3142458", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Define rough `read_parquet` API\n", | |
"def read_parquet(\n", | |
" urlpath,\n", | |
" blocksize=\"128 MiB\",\n", | |
" columns=None,\n", | |
" storage_options=None,\n", | |
" dataset_kwargs=None,\n", | |
"):\n", | |
" ###\n", | |
" ### STEP 1: Extract filesystem and PyArrow Dataset\n", | |
" ###\n", | |
" storage_options = storage_options or {}\n", | |
" # TODO: Handle PyArrow filesystem\n", | |
" fs, _, paths = get_fs_token_paths(\n", | |
" urlpath, mode=\"rb\", storage_options=storage_options\n", | |
" )\n", | |
" # TODO: Make sure we correctly handle dir/path/list/etc.\n", | |
" dataset = ds.dataset(paths[0], filesystem=fs, **(dataset_kwargs or {}))\n", | |
" meta = dataset.schema.empty_table().to_pandas()\n", | |
" \n", | |
" ###\n", | |
" ### STEP 2: Perform default partitioning\n", | |
" ###\n", | |
" # NOTE: When this is written as a dedicated Expr,\n", | |
" # this step won't be necessary until graph construction\n", | |
" partitions = [([path], 0, 1) for path in dataset.files]\n", | |
" \n", | |
" ###~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n", | |
" ### Handle blocksize optimizations\n", | |
" ###~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n", | |
" if blocksize is not None:\n", | |
" # NOTE: When this is written as a dedicated Expr,\n", | |
" # this step won't be necessary until graph construction\n", | |
"\n", | |
" # Get file sizes from the filesystem\n", | |
" # (this is MUCH faster than parsing parquet metadata)\n", | |
" file_sizes = {path: fs.info(path)[\"size\"] for path in dataset.files}\n", | |
" fn, du = next(iter(file_sizes.items()))\n", | |
"\n", | |
" # Use a single file to calucluate a rough relationship\n", | |
" # between columns and uncompressed storage size\n", | |
" for file_frag in dataset.get_fragments():\n", | |
" total_uncompressed_file_size = 0\n", | |
" column_factors = {name: 0 for name in dataset.schema.names}\n", | |
" for row_group in file_frag.row_groups:\n", | |
" for cc in range(row_group.metadata.num_columns):\n", | |
" col_chunk = row_group.metadata.column(cc)\n", | |
" name = col_chunk.path_in_schema\n", | |
" col_chunk_size = col_chunk.total_uncompressed_size\n", | |
" column_factors[name] += col_chunk_size\n", | |
" total_uncompressed_file_size += col_chunk_size\n", | |
" break # Only want to sample the first file\n", | |
" expansion_factor = total_uncompressed_file_size / du\n", | |
" for column_name in list(column_factors.keys()):\n", | |
" column_factors[column_name] /= total_uncompressed_file_size\n", | |
" column_factors[column_name] *= expansion_factor\n", | |
" # `column_factors` now provides a reasonable estimate for how\n", | |
" # much each file will expand in memory from each column.\n", | |
" # For example, if we want to know how much a 1MiB file is\n", | |
" # likely to expand in memory, we can do the following:\n", | |
" # `int(parse_bytes(\"1MiB\") * column_factors[columns].sum())`\n", | |
" column_factors = pd.Series(column_factors)\n", | |
"\n", | |
" # Apply optimizations to modify the default `partitions`\n", | |
" partitions = aggregate_small_partitions(partitions, blocksize, columns, column_factors, file_sizes)\n", | |
" partitions = split_large_partitions(partitions, blocksize, columns, column_factors, file_sizes)\n", | |
"\n", | |
" # Using `from_map` for now, but this would need to be a new `Expr`\n", | |
" # to delay the `blocksize` optimizations above from running until\n", | |
" # after column projection\n", | |
" return dx.from_map(read_partition, partitions, columns=columns, filesystem=fs, meta=meta, enforce_metadata=False)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"id": "9d0f0784-363b-4588-af51-4c1d218d45c2", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"if True:\n", | |
" # Remote data\n", | |
" path = \"s3://coiled-data/tpch/scale-1000/customer\"\n", | |
" storage_options = {\"anon\": True}\n", | |
" columns = [\"c_custkey\", \"c_address\"]\n", | |
" blocksize = \"32MiB\"\n", | |
"else:\n", | |
" # Local data\n", | |
" path = \"/datasets/rzamora/crit_pq_int\"\n", | |
" storage_options = None\n", | |
" columns = [\"C1\", \"C2\"]\n", | |
" blocksize = \"128MiB\"" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"id": "88533ddf-c002-4ef9-965d-65532f740d9b", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"partition size: 150000\n", | |
"CPU times: user 1.28 s, sys: 351 ms, total: 1.63 s\n", | |
"Wall time: 4.49 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"\n", | |
"df = read_parquet(path, blocksize=blocksize, storage_options=storage_options)\n", | |
"print(f\"partition size: {len(df.partitions[0].compute())}\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"id": "f1133022-7eb3-4311-90c8-c5f427565ed7", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"partition size: 900000\n", | |
"CPU times: user 931 ms, sys: 222 ms, total: 1.15 s\n", | |
"Wall time: 3.17 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"\n", | |
"# When columns are projected, we can increase the partiton size\n", | |
"df2 = read_parquet(path, columns=columns, blocksize=blocksize, storage_options=storage_options)\n", | |
"print(f\"partition size: {len(df2.partitions[0].compute())}\")" | |
] | |
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6c6e961c-89a7-43a3-b016-28c2ceaf51d7",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
} |