Skip to content

Instantly share code, notes, and snippets.

@praateekmahajan
Last active December 13, 2024 07:30
Show Gist options
  • Save praateekmahajan/f6c41fec9b9bba0b81fa2596541ec74e to your computer and use it in GitHub Desktop.
Save praateekmahajan/f6c41fec9b9bba0b81fa2596541ec74e to your computer and use it in GitHub Desktop.
Dask DataFrame read behavior when schema / metadata (column order) across files doesn't match (for pandas / cudf)
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "86ced08c-6f70-4638-a1d9-444df3578034",
"metadata": {},
"outputs": [],
"source": [
"import dask.dataframe as dd\n",
"import dask\n",
"import tempfile\n",
"import pandas as pd\n",
"import os\n",
"import traceback\n",
"import cudf\n",
"import dask_cudf"
]
},
{
"cell_type": "markdown",
"id": "3d033a22-757c-4424-b7e5-97ae57762deb",
"metadata": {},
"source": [
"## Create Data\n",
"\n",
"The test cases below combine the following two properties (each case writes two files, one record per file):\n",
"\n",
"1. **Inconsistent Schema**: one file has the columns `{\"id\", \"text\"}` while the other has `{\"id\", \"text\", \"meta1\"}`.\n",
"2. **Inconsistent Metadata** (column order): one file stores its columns in the order `[\"id\", \"text\"]` while the other stores them as `[\"text\", \"id\"]`.\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "957dad78-6c0e-47dc-a314-ab2ee869ad0a",
"metadata": {},
"outputs": [],
"source": [
"# Three single-row records; each one is later written to its own file.\n",
"#   records[0]: columns (id, text)\n",
"#   records[1]: columns (text, meta1, id) -- an extra column and a different order\n",
"#   records[2]: columns (text, id)        -- same columns as records[0], reordered\n",
"records = [\n",
"    {\"id\": 123, \"text\": \"foo\"},\n",
"    {\"text\": \"bar\", \"meta1\": [{\"field1\": \"cat\"}], \"id\": 456},\n",
"    {\"text\": \"foo\", \"id\": 456},\n",
"]\n",
"# NOTE(review): `columns` is not referenced by any later cell in this notebook.\n",
"columns = [\"text\", \"id\"]\n",
"\n",
"# Pair the records into two-file cases, one per schema/metadata combination.\n",
"(\n",
"    inconsistent_schema_inconsistent_metadata,  # (id, text) vs (text, meta1, id)\n",
"    inconsistent_schema_consistent_metadata,  # (text, meta1, id) vs (text, id)\n",
"    consistent_schema_inconsistent_metadata,  # (id, text) vs (text, id)\n",
"    consistent_schema_consistent_metadata,  # (id, text) vs (id, text)\n",
") = (\n",
"    [records[0], records[1]],\n",
"    [records[1], records[2]],\n",
"    [records[0], records[2]],\n",
"    [records[0], records[0]],\n",
")"
]
},
{
"cell_type": "markdown",
"id": "54a2420a-a309-4496-9fb4-b7d2305559e8",
"metadata": {},
"source": [
"## Read using JSON"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "d79cd005-fcb0-4b77-af06-f43e3c816636",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"schema ❌ metadata ❌\n",
"\tcudf ❌ due to e=ValueError(\"The columns in the computed data do not match the columns in the provided metadata.\\nOrder of columns does not match.\\nActual: ['text', 'id']\\nExpected: ['id', 'text']\")\n",
"\tpandas ❌ due to e=ValueError(\"Metadata mismatch found in `from_delayed`.\\n\\nPartition type: `pandas.core.frame.DataFrame`\\n+---------+--------+----------+\\n| Column | Found | Expected |\\n+---------+--------+----------+\\n| 'meta1' | object | - |\\n+---------+--------+----------+\")\n",
"\n",
"\n",
"schema ❌ metadata ✅\n",
"\tcudf ❌ due to e=ValueError(\"The columns in the computed data do not match the columns in the provided metadata.\\nOrder of columns does not match.\\nActual: ['text', 'id']\\nExpected: ['id', 'text']\")\n",
"\tpandas ❌ due to e=ValueError(\"Metadata mismatch found in `from_delayed`.\\n\\nPartition type: `pandas.core.frame.DataFrame`\\n+---------+-------+----------+\\n| Column | Found | Expected |\\n+---------+-------+----------+\\n| 'meta1' | - | object |\\n+---------+-------+----------+\")\n",
"\n",
"\n",
"schema ✅ metadata ❌\n",
"\tcudf ❌ due to e=ValueError(\"The columns in the computed data do not match the columns in the provided metadata.\\nOrder of columns does not match.\\nActual: ['text', 'id']\\nExpected: ['id', 'text']\")\n",
"\tpandas ❌ due to e=ValueError(\"The columns in the computed data do not match the columns in the provided metadata.\\nOrder of columns does not match.\\nActual: ['text', 'id']\\nExpected: ['id', 'text']\")\n",
"\n",
"\n",
"schema ✅ metadata ✅\n",
"\tcudf ✅\n",
"\tpandas ✅\n",
"\n",
"\n"
]
}
],
"source": [
"# For each case: write one single-row JSON Lines file per record, read the files\n",
"# back with each Dask backend, and round-trip the result to Parquet.\n",
"for bug_name, bug_list in [\n",
"    (\"schema ❌ metadata ❌\", inconsistent_schema_inconsistent_metadata),\n",
"    (\"schema ❌ metadata ✅\", inconsistent_schema_consistent_metadata),\n",
"    (\"schema ✅ metadata ❌\", consistent_schema_inconsistent_metadata),\n",
"    (\"schema ✅ metadata ✅\", consistent_schema_consistent_metadata),\n",
"]:\n",
"    with tempfile.TemporaryDirectory() as tmpdir:\n",
"        # Dump one file per record (part.0.jsonl, part.1.jsonl, ...) and keep their\n",
"        # paths, so the reader sees exactly the files that were written.\n",
"        paths = []\n",
"        for i, record in enumerate(bug_list):\n",
"            path = os.path.join(tmpdir, f\"part.{i}.jsonl\")\n",
"            pd.DataFrame([record]).to_json(path, orient=\"records\", lines=True)\n",
"            paths.append(path)\n",
"\n",
"        print(bug_name)\n",
"        for backend in [\"cudf\", \"pandas\"]:\n",
"            read_kwargs = {\"dtype\": {\"id\": \"str\", \"text\": \"str\"}}\n",
"            if backend == \"cudf\":\n",
"                # NOTE: this meta lists the columns as (id, text). In the\n",
"                # \"schema ❌ metadata ✅\" case both files store (text, id), so the\n",
"                # cudf failure recorded for that case (\"Expected: ['id', 'text']\")\n",
"                # is presumably a mismatch against this meta, not between the files.\n",
"                read_kwargs[\"meta\"] = cudf.from_pandas(\n",
"                    dask.dataframe.utils.make_meta({\"id\": \"str\", \"text\": \"str\"})\n",
"                )\n",
"                # Presumably drops file columns that are absent from `meta` (e.g. meta1).\n",
"                read_kwargs[\"prune_columns\"] = True\n",
"\n",
"            backend_str = f\"{backend} ❌\"\n",
"            try:\n",
"                with dask.config.set({\"dataframe.backend\": backend}):\n",
"                    df = dd.read_json(paths, lines=True, **read_kwargs)\n",
"                    # Writing forces the lazy frame to be computed. The output dir gets\n",
"                    # its own name so the input directory's `tmpdir` is not shadowed.\n",
"                    with tempfile.TemporaryDirectory() as out_dir:\n",
"                        df.to_parquet(out_dir)\n",
"                backend_str = f\"{backend} ✅\"\n",
"            except Exception as e:\n",
"                # The exception is the finding being demonstrated, so record it.\n",
"                backend_str += f\" due to {e=}\"\n",
"            finally:\n",
"                print(f\"\\t{backend_str}\")\n",
"        print(\"\\n\")"
]
},
{
"cell_type": "markdown",
"id": "4836f137-0f71-4e4e-a008-2306430a904f",
"metadata": {},
"source": [
"## Read using Parquet"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "17e67a70-9d13-48d2-b7d6-49462981a98d",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"schema ❌ metadata ❌\n",
"\tcudf ❌ due to e=KeyError('Column meta1 does not exist in schema')\n",
"\tpandas ✅\n",
"\n",
"\n",
"schema ❌ metadata ✅\n",
"\tcudf ✅\n",
"\tpandas ❌ due to e=KeyError(\"['meta1'] not in index\")\n",
"\n",
"\n",
"schema ✅ metadata ❌\n",
"\tcudf ✅\n",
"\tpandas ✅\n",
"\n",
"\n",
"schema ✅ metadata ✅\n",
"\tcudf ✅\n",
"\tpandas ✅\n",
"\n",
"\n"
]
}
],
"source": [
"# For each of the four schema/metadata cases: write one single-row Parquet file\n",
"# per record, read the files back with each Dask backend, and round-trip the\n",
"# result to Parquet.\n",
"for bug_name, bug_list in [\n",
"    (\"schema ❌ metadata ❌\", inconsistent_schema_inconsistent_metadata),\n",
"    (\"schema ❌ metadata ✅\", inconsistent_schema_consistent_metadata),\n",
"    (\"schema ✅ metadata ❌\", consistent_schema_inconsistent_metadata),\n",
"    (\"schema ✅ metadata ✅\", consistent_schema_consistent_metadata),\n",
"]:\n",
"    with tempfile.TemporaryDirectory() as tmpdir:\n",
"        # Dump one file per record (part.0.parquet, part.1.parquet, ...) and keep\n",
"        # their paths, so the reader sees exactly the files that were written.\n",
"        paths = []\n",
"        for i, record in enumerate(bug_list):\n",
"            path = os.path.join(tmpdir, f\"part.{i}.parquet\")\n",
"            pd.DataFrame([record]).to_parquet(path)\n",
"            paths.append(path)\n",
"\n",
"        print(bug_name)\n",
"        for backend in [\"cudf\", \"pandas\"]:\n",
"            read_kwargs = {}\n",
"            if backend == \"cudf\":\n",
"                # Presumably lets cudf combine files whose Parquet schemas differ;\n",
"                # confirm against the cudf read_parquet documentation.\n",
"                read_kwargs[\"allow_mismatched_pq_schemas\"] = True\n",
"\n",
"            backend_str = f\"{backend} ❌\"\n",
"            try:\n",
"                with dask.config.set({\"dataframe.backend\": backend}):\n",
"                    df = dd.read_parquet(paths, **read_kwargs)\n",
"                    # Writing forces the lazy frame to be computed. The output dir gets\n",
"                    # its own name so the input directory's `tmpdir` is not shadowed.\n",
"                    with tempfile.TemporaryDirectory() as out_dir:\n",
"                        df.to_parquet(out_dir)\n",
"                backend_str = f\"{backend} ✅\"\n",
"            except Exception as e:\n",
"                # The exception is the finding being demonstrated, so record it.\n",
"                backend_str += f\" due to {e=}\"\n",
"            finally:\n",
"                print(f\"\\t{backend_str}\")\n",
"        print(\"\\n\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.16"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment