Last active
December 13, 2024 07:30
-
-
Save praateekmahajan/f6c41fec9b9bba0b81fa2596541ec74e to your computer and use it in GitHub Desktop.
Dask DataFrame read behavior when schema / metadata (column order) across files doesn't match (for pandas / cudf)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"id": "86ced08c-6f70-4638-a1d9-444df3578034", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import dask.dataframe as dd\n", | |
"import dask\n", | |
"import tempfile\n", | |
"import pandas as pd\n", | |
"import os\n", | |
"import traceback\n", | |
"import cudf\n", | |
"import dask_cudf" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "3d033a22-757c-4424-b7e5-97ae57762deb", | |
"metadata": {}, | |
"source": [ | |
"## Create Data \n", | |
"\n", | |
"Each test case below is a combination of the following two conditions:\n", | |
"\n", | |
"1. **Inconsistent Schema**: one file has `{\"id\", \"text\"}` columns while the other has `{\"id\", \"text\", \"meta1\"}`\n", | |
"2. **Inconsistent Metadata** (column order): one file stores its columns in the order `{\"id\", \"text\"}` while the other stores them as `{\"text\", \"id\"}`\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"id": "957dad78-6c0e-47dc-a314-ab2ee869ad0a", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Three sample rows: they differ in key order and in whether \"meta1\" is present.\n", | |
"records = [\n", | |
"    {\"id\": 123, \"text\": \"foo\"},\n", | |
"    {\n", | |
"        \"text\": \"bar\",\n", | |
"        \"meta1\": [{\"field1\": \"cat\"}],\n", | |
"        \"id\": 456,\n", | |
"    },\n", | |
"    {\"text\": \"foo\", \"id\": 456},\n", | |
"]\n", | |
"columns = [\"text\", \"id\"]\n", | |
"\n", | |
"# Each scenario is a pair of rows; the read cells write every row to its own file.\n", | |
"inconsistent_schema_inconsistent_metadata = [records[0], records[1]]\n", | |
"inconsistent_schema_consistent_metadata = [records[1], records[2]]\n", | |
"consistent_schema_inconsistent_metadata = [records[0], records[2]]\n", | |
"consistent_schema_consistent_metadata = [records[0], records[0]]" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "54a2420a-a309-4496-9fb4-b7d2305559e8", | |
"metadata": {}, | |
"source": [ | |
"## Read using JSON" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"id": "d79cd005-fcb0-4b77-af06-f43e3c816636", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"schema ❌ metadata ❌\n", | |
"\tcudf ❌ due to e=ValueError(\"The columns in the computed data do not match the columns in the provided metadata.\\nOrder of columns does not match.\\nActual: ['text', 'id']\\nExpected: ['id', 'text']\")\n", | |
"\tpandas ❌ due to e=ValueError(\"Metadata mismatch found in `from_delayed`.\\n\\nPartition type: `pandas.core.frame.DataFrame`\\n+---------+--------+----------+\\n| Column | Found | Expected |\\n+---------+--------+----------+\\n| 'meta1' | object | - |\\n+---------+--------+----------+\")\n", | |
"\n", | |
"\n", | |
"schema ❌ metadata ✅\n", | |
"\tcudf ❌ due to e=ValueError(\"The columns in the computed data do not match the columns in the provided metadata.\\nOrder of columns does not match.\\nActual: ['text', 'id']\\nExpected: ['id', 'text']\")\n", | |
"\tpandas ❌ due to e=ValueError(\"Metadata mismatch found in `from_delayed`.\\n\\nPartition type: `pandas.core.frame.DataFrame`\\n+---------+-------+----------+\\n| Column | Found | Expected |\\n+---------+-------+----------+\\n| 'meta1' | - | object |\\n+---------+-------+----------+\")\n", | |
"\n", | |
"\n", | |
"schema ✅ metadata ❌\n", | |
"\tcudf ❌ due to e=ValueError(\"The columns in the computed data do not match the columns in the provided metadata.\\nOrder of columns does not match.\\nActual: ['text', 'id']\\nExpected: ['id', 'text']\")\n", | |
"\tpandas ❌ due to e=ValueError(\"The columns in the computed data do not match the columns in the provided metadata.\\nOrder of columns does not match.\\nActual: ['text', 'id']\\nExpected: ['id', 'text']\")\n", | |
"\n", | |
"\n", | |
"schema ✅ metadata ✅\n", | |
"\tcudf ✅\n", | |
"\tpandas ✅\n", | |
"\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"# Exercise every schema / column-order combination with both backends, reading JSON Lines input.\n", | |
"for bug_name, bug_list in [\n", | |
"    (\"schema ❌ metadata ❌\", inconsistent_schema_inconsistent_metadata),\n", | |
"    (\"schema ❌ metadata ✅\", inconsistent_schema_consistent_metadata),\n", | |
"    (\"schema ✅ metadata ❌\", consistent_schema_inconsistent_metadata),\n", | |
"    (\"schema ✅ metadata ✅\", consistent_schema_consistent_metadata),\n", | |
"]:\n", | |
"    with tempfile.TemporaryDirectory() as tmpdir:\n", | |
"        # Write one single-record JSON Lines file per record and remember the\n", | |
"        # paths, so the files that are read back are exactly the ones written.\n", | |
"        input_paths = []\n", | |
"        for i, record in enumerate(bug_list):\n", | |
"            path = os.path.join(tmpdir, f\"part.{i}.jsonl\")\n", | |
"            pd.DataFrame([record]).to_json(path, orient=\"records\", lines=True)\n", | |
"            input_paths.append(path)\n", | |
"\n", | |
"        print(bug_name)\n", | |
"        for backend in [\"cudf\", \"pandas\"]:\n", | |
"            read_kwargs = {\"dtype\": {\"id\": \"str\", \"text\": \"str\"}}\n", | |
"            if backend == \"cudf\":\n", | |
"                # Only the cudf read is given an explicit (id, text) meta and prune_columns.\n", | |
"                read_kwargs[\"meta\"] = cudf.from_pandas(\n", | |
"                    dask.dataframe.utils.make_meta({\"id\": \"str\", \"text\": \"str\"})\n", | |
"                )\n", | |
"                read_kwargs[\"prune_columns\"] = True\n", | |
"\n", | |
"            # Assume failure until the read and the parquet write both complete.\n", | |
"            backend_str = f\"{backend} ❌\"\n", | |
"            try:\n", | |
"                with dask.config.set({\"dataframe.backend\": backend}):\n", | |
"                    df = dd.read_json(input_paths, lines=True, **read_kwargs)\n", | |
"                    # Use a separate name for the output directory so the input\n", | |
"                    # directory variable `tmpdir` is not shadowed.\n", | |
"                    with tempfile.TemporaryDirectory() as out_dir:\n", | |
"                        df.to_parquet(out_dir)\n", | |
"                backend_str = f\"{backend} ✅\"\n", | |
"            except Exception as e:\n", | |
"                # The failure itself is the result being demonstrated; report it and move on.\n", | |
"                backend_str += f\" due to {e=}\"\n", | |
"            finally:\n", | |
"                print(f\"\\t{backend_str}\")\n", | |
"        print(\"\\n\")" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "4836f137-0f71-4e4e-a008-2306430a904f", | |
"metadata": {}, | |
"source": [ | |
"## Read using Parquet" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"id": "17e67a70-9d13-48d2-b7d6-49462981a98d", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"schema ❌ metadata ❌\n", | |
"\tcudf ❌ due to e=KeyError('Column meta1 does not exist in schema')\n", | |
"\tpandas ✅\n", | |
"\n", | |
"\n", | |
"schema ❌ metadata ✅\n", | |
"\tcudf ✅\n", | |
"\tpandas ❌ due to e=KeyError(\"['meta1'] not in index\")\n", | |
"\n", | |
"\n", | |
"schema ✅ metadata ❌\n", | |
"\tcudf ✅\n", | |
"\tpandas ✅\n", | |
"\n", | |
"\n", | |
"schema ✅ metadata ✅\n", | |
"\tcudf ✅\n", | |
"\tpandas ✅\n", | |
"\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"# Exercise every schema / column-order combination with both backends, reading Parquet input.\n", | |
"for bug_name, bug_list in [\n", | |
"    (\"schema ❌ metadata ❌\", inconsistent_schema_inconsistent_metadata),\n", | |
"    (\"schema ❌ metadata ✅\", inconsistent_schema_consistent_metadata),\n", | |
"    (\"schema ✅ metadata ❌\", consistent_schema_inconsistent_metadata),\n", | |
"    (\"schema ✅ metadata ✅\", consistent_schema_consistent_metadata),\n", | |
"]:\n", | |
"    with tempfile.TemporaryDirectory() as tmpdir:\n", | |
"        # Write one single-record Parquet file per record and remember the\n", | |
"        # paths, so the files that are read back are exactly the ones written.\n", | |
"        input_paths = []\n", | |
"        for i, record in enumerate(bug_list):\n", | |
"            path = os.path.join(tmpdir, f\"part.{i}.parquet\")\n", | |
"            pd.DataFrame([record]).to_parquet(path)\n", | |
"            input_paths.append(path)\n", | |
"\n", | |
"        print(bug_name)\n", | |
"        for backend in [\"cudf\", \"pandas\"]:\n", | |
"            read_kwargs = {}\n", | |
"            if backend == \"cudf\":\n", | |
"                # Passed only on the cudf read; the pandas read gets no extra kwargs.\n", | |
"                read_kwargs[\"allow_mismatched_pq_schemas\"] = True\n", | |
"\n", | |
"            # Assume failure until the read and the parquet write both complete.\n", | |
"            backend_str = f\"{backend} ❌\"\n", | |
"            try:\n", | |
"                with dask.config.set({\"dataframe.backend\": backend}):\n", | |
"                    df = dd.read_parquet(input_paths, **read_kwargs)\n", | |
"                    # Use a separate name for the output directory so the input\n", | |
"                    # directory variable `tmpdir` is not shadowed.\n", | |
"                    with tempfile.TemporaryDirectory() as out_dir:\n", | |
"                        df.to_parquet(out_dir)\n", | |
"                backend_str = f\"{backend} ✅\"\n", | |
"            except Exception as e:\n", | |
"                # The failure itself is the result being demonstrated; report it and move on.\n", | |
"                backend_str += f\" due to {e=}\"\n", | |
"            finally:\n", | |
"                print(f\"\\t{backend_str}\")\n", | |
"        print(\"\\n\")" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3 (ipykernel)", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.10.16" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 5 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment