Skip to content

Instantly share code, notes, and snippets.

@dataders
Last active December 19, 2024 21:55
Show Gist options
  • Save dataders/ef982a2e6eae5d125705717fcce80111 to your computer and use it in GitHub Desktop.
Save dataders/ef982a2e6eae5d125705717fcce80111 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## DuckDB -> Substrait\n",
"\n",
"### Setup table in DuckDB"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"from pprint import pprint\n",
"import duckdb\n",
"import json\n",
"\n",
"con = duckdb.connect()\n",
"con.install_extension(\"substrait\")\n",
"con.load_extension(\"substrait\")"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>unit_price</th>\n",
" <th>quantity</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>10</td>\n",
" <td>1000</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>5</td>\n",
" <td>2000</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>3456</td>\n",
" <td>100</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" unit_price quantity\n",
"0 10 1000\n",
"1 5 2000\n",
"2 3456 100"
]
},
"execution_count": 26,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"con.execute(\"CREATE OR REPLACE TABLE payments (unit_price int, quantity int);\")\n",
"con.execute(\"INSERT INTO payments VALUES (10, 1000), (5,2000), (3456,100);\")\n",
"\n",
"con.execute(\"SELECT * FROM payments\").fetch_df()"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"b'\\n\\\\\\x08\\x01\\x12Xhttps://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml\\x12\\x18\\x1a\\x16\\x08\\x01\\x10\\x01\\x1a\\x10multiply:i32_i32\\x1a|\\x12z\\nl:j\\x12B\\n@\\x12&\\n\\nunit_price\\n\\x08quantity\\x12\\x0e\\n\\x04*\\x02\\x10\\x01\\n\\x04*\\x02\\x10\\x01\\x18\\x02\"\\n\\n\\x06\\n\\x00\\n\\x02\\x08\\x01\\x10\\x01:\\n\\n\\x08payments\\x1a$\\x1a\"\\x08\\x01\\x1a\\x04*\\x02\\x10\\x01\"\\n\\x1a\\x08\\x12\\x06\\n\\x02\\x12\\x00\"\\x00\"\\x0c\\x1a\\n\\x12\\x08\\n\\x04\\x12\\x02\\x08\\x01\"\\x00\\x12\\ntotal_cost2\\n\\x105*\\x06DuckDB'"
]
},
"execution_count": 27,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"query = \"select unit_price * quantity as total_cost from payments;\"\n",
"\n",
"plan = con.get_substrait(query).fetchone()[0]\n",
"plan"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'extensionUris': [{'extensionUriAnchor': 1,\n",
" 'uri': 'https://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml'}],\n",
" 'extensions': [{'extensionFunction': {'extensionUriReference': 1,\n",
" 'functionAnchor': 1,\n",
" 'name': 'multiply:i32_i32'}}],\n",
" 'relations': [{'root': {'input': {'project': {'expressions': [{'scalarFunction': {'arguments': [{'value': {'selection': {'directReference': {'structField': {}},\n",
" 'rootReference': {}}}},\n",
" {'value': {'selection': {'directReference': {'structField': {'field': 1}},\n",
" 'rootReference': {}}}}],\n",
" 'functionReference': 1,\n",
" 'outputType': {'i32': {'nullability': 'NULLABILITY_NULLABLE'}}}}],\n",
" 'input': {'read': {'baseSchema': {'names': ['unit_price',\n",
" 'quantity'],\n",
" 'struct': {'nullability': 'NULLABILITY_REQUIRED',\n",
" 'types': [{'i32': {'nullability': 'NULLABILITY_NULLABLE'}},\n",
" {'i32': {'nullability': 'NULLABILITY_NULLABLE'}}]}},\n",
" 'namedTable': {'names': ['payments']},\n",
" 'projection': {'maintainSingularStruct': True,\n",
" 'select': {'structItems': [{},\n",
" {'field': 1}]}}}}}},\n",
" 'names': ['total_cost']}}],\n",
" 'version': {'minorNumber': 53, 'producer': 'DuckDB'}}\n"
]
}
],
"source": [
"plan_json = con.get_substrait_json(query).fetchone()[0]\n",
"\n",
"pprint(json.loads(plan_json))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Substrait -> DataFusion\n",
"\n",
"### Setup"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [],
"source": [
"from datafusion import SessionContext\n",
"from datafusion import substrait as ss"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table border='1'>\n",
"<tr><th>count</td></tr>\n",
"<tr><td>3</td></tr>\n",
"</table>\n"
],
"text/plain": [
"DataFrame()\n",
"+-------+\n",
"| count |\n",
"+-------+\n",
"| 3 |\n",
"+-------+"
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"q = {\n",
" \"drop\": \"DROP TABLE IF EXISTS payments;\",\n",
" \"create\": \"CREATE OR REPLACE TABLE payments (unit_price int, quantity int);\",\n",
" \"insert\": \"INSERT INTO payments VALUES (10, 1000), (5,2000), (3456,100);\",\n",
" \"select\": \"select * from payments;\",\n",
" \"transform\": \"select unit_price * quantity as total_cost from payments;\",\n",
"}\n",
"\n",
"## create a DataFusion session context\n",
"ctx = SessionContext()\n",
"## create the same payments table that we already made in DuckDB\n",
"ctx.sql(q[\"drop\"])\n",
"ctx.sql(q[\"create\"])\n",
"ctx.sql(q[\"insert\"])"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>unit_price</th>\n",
" <th>quantity</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>10</td>\n",
" <td>1000</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>5</td>\n",
" <td>2000</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>3456</td>\n",
" <td>100</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>10</td>\n",
" <td>1000</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>5</td>\n",
" <td>2000</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>3456</td>\n",
" <td>100</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" unit_price quantity\n",
"0 10 1000\n",
"1 5 2000\n",
"2 3456 100\n",
"3 10 1000\n",
"4 5 2000\n",
"5 3456 100"
]
},
"execution_count": 31,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
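"# here's the table for reference\n",
"# NOTE: every row appears twice, i.e. the INSERT ran twice.\n",
"# Likely cause (TODO confirm): ctx.sql() returns a lazy DataFrame, and the previous cell ends with the bare\n",
"# INSERT expression, so Jupyter executed it once per rendering (text/plain and text/html).\n",
"# Calling .collect() on the INSERT and not leaving it as the cell's last expression should run it exactly once.\n",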
"ctx.sql(q[\"select\"]).to_pandas()"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<datafusion.substrait.Plan at 0x11d61fd50>"
]
},
"execution_count": 34,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"plan_deserialized = ss.Serde.deserialize_bytes(plan)\n",
"plan_deserialized"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<datafusion.substrait.Plan object at 0x11d61fd50>\n"
]
}
],
"source": [
"print(plan_deserialized.encode())"
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [
{
"ename": "Exception",
"evalue": "DataFusion error: Substrait(\"Named schema must contain names for all fields\")",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mException\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[35], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m df_logical_plan \u001b[38;5;241m=\u001b[39m \u001b[43mss\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mConsumer\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mfrom_substrait_plan\u001b[49m\u001b[43m(\u001b[49m\u001b[43mctx\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mplan_deserialized\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 2\u001b[0m df_logical_plan\n",
"File \u001b[0;32m~/Developer/query-plan-diff/.venv/lib/python3.11/site-packages/datafusion/substrait.py:187\u001b[0m, in \u001b[0;36mConsumer.from_substrait_plan\u001b[0;34m(ctx, plan)\u001b[0m\n\u001b[1;32m 175\u001b[0m \u001b[38;5;129m@staticmethod\u001b[39m\n\u001b[1;32m 176\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mfrom_substrait_plan\u001b[39m(ctx: SessionContext, plan: Plan) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m LogicalPlan:\n\u001b[1;32m 177\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Convert a Substrait plan to a DataFusion LogicalPlan.\u001b[39;00m\n\u001b[1;32m 178\u001b[0m \n\u001b[1;32m 179\u001b[0m \u001b[38;5;124;03m Args:\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 184\u001b[0m \u001b[38;5;124;03m LogicalPlan.\u001b[39;00m\n\u001b[1;32m 185\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[1;32m 186\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m LogicalPlan(\n\u001b[0;32m--> 187\u001b[0m \u001b[43msubstrait_internal\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mConsumer\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mfrom_substrait_plan\u001b[49m\u001b[43m(\u001b[49m\u001b[43mctx\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mctx\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mplan\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mplan_internal\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 188\u001b[0m )\n",
"\u001b[0;31mException\u001b[0m: DataFusion error: Substrait(\"Named schema must contain names for all fields\")"
]
}
],
"source": [
"df_logical_plan = ss.Consumer.from_substrait_plan(ctx, plan_deserialized)\n",
"df_logical_plan"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df = ctx.create_dataframe_from_logical_plan(df_logical_plan).to_pandas()\n",
"df"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.10"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
{
"extensionUris": [
{
"extensionUriAnchor": 1,
"uri": "https://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml"
}
],
"extensions": [
{
"extensionFunction": {
"extensionUriReference": 1,
"functionAnchor": 1,
"name": "multiply:i32_i32"
}
}
],
"relations": [
{
"root": {
"input": {
"project": {
"input": {
"read": {
"baseSchema": {
"names": [
"unit_price",
"quantity"
],
"struct": {
"types": [
{
"i32": {
"nullability": "NULLABILITY_NULLABLE"
}
},
{
"i32": {
"nullability": "NULLABILITY_NULLABLE"
}
}
],
"nullability": "NULLABILITY_REQUIRED"
}
},
"projection": {
"select": {
"structItems": [
{},
{
"field": 1
}
]
},
"maintainSingularStruct": true
},
"namedTable": {
"names": [
"payments"
]
}
}
},
"expressions": [
{
"scalarFunction": {
"functionReference": 1,
"outputType": {
"i32": {
"nullability": "NULLABILITY_NULLABLE"
}
},
"arguments": [
{
"value": {
"selection": {
"directReference": {
"structField": {}
},
"rootReference": {}
}
}
},
{
"value": {
"selection": {
"directReference": {
"structField": {
"field": 1
}
},
"rootReference": {}
}
}
}
]
}
}
]
}
},
"names": [
"total_cost"
]
}
}
],
"version": {
"minorNumber": 53,
"producer": "DuckDB"
}
}
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment