Last active December 19, 2024 21:55
"## DuckDB -> Substrait\n",
"### Setup table in DuckDB"
"from pprint import pprint\n",
"import duckdb\n",
"import json\n",
"con = duckdb.connect()\n",
" unit_price quantity\n",
"0 10 1000\n",
"1 5 2000\n",
"2 3456 100"
"source": [
"con.execute(\"CREATE OR REPLACE TABLE payments (unit_price int, quantity int);\")\n",
"con.execute(\"INSERT INTO payments VALUES (10, 1000), (5,2000), (3456,100);\")\n",
"con.execute(\"SELECT * FROM payments\").fetch_df()"
"source": [
"query = \"select unit_price * quantity as total_cost from payments;\"\n",
"plan = con.get_substrait(query).fetchone()[0]\n",
"{'extensionUris': [{'extensionUriAnchor': 1,\n",
" 'uri': ''}],\n",
" 'extensions': [{'extensionFunction': {'extensionUriReference': 1,\n",
" 'functionAnchor': 1,\n",
" 'name': 'multiply:i32_i32'}}],\n",
" 'relations': [{'root': {'input': {'project': {'expressions': [{'scalarFunction': {'arguments': [{'value': {'selection': {'directReference': {'structField': {}},\n",
" 'rootReference': {}}}},\n",
" {'value': {'selection': {'directReference': {'structField': {'field': 1}},\n",
" 'rootReference': {}}}}],\n",
" 'functionReference': 1,\n",
" 'outputType': {'i32': {'nullability': 'NULLABILITY_NULLABLE'}}}}],\n",
" 'input': {'read': {'baseSchema': {'names': ['unit_price',\n",
" 'quantity'],\n",
" 'struct': {'nullability': 'NULLABILITY_REQUIRED',\n",
" 'types': [{'i32': {'nullability': 'NULLABILITY_NULLABLE'}},\n",
" {'i32': {'nullability': 'NULLABILITY_NULLABLE'}}]}},\n",
" 'namedTable': {'names': ['payments']},\n",
" 'projection': {'maintainSingularStruct': True,\n",
" 'select': {'structItems': [{},\n",
" {'field': 1}]}}}}}},\n",
" 'names': ['total_cost']}}],\n",
" 'version': {'minorNumber': 53, 'producer': 'DuckDB'}}\n"
"source": [
"plan_json = con.get_substrait_json(query).fetchone()[0]\n",
"cell_type": "markdown",
"metadata": {},
"source": [
"## Substrait -> Datafusion\n",
"### Setup"
"### Setup"
"from datafusion import SessionContext\n",
"from datafusion import substrait as ss"
"<table border='1'>\n",
"text/plain": [
"| count |\n",
"| 3 |\n",
"source": [
"q = {\n",
" \"drop\": \"DROP TABLE IF EXISTS payments;\",\n",
" \"create\": \"CREATE OR REPLACE TABLE payments (unit_price int, quantity int);\",\n",
" \"insert\": \"INSERT INTO payments VALUES (10, 1000), (5,2000), (3456,100);\",\n",
" \"select\": \"select * from payments;\",\n",
" \"transform\": \"select unit_price * quantity as total_cost from payments;\",\n",
"## start & connect to Datafusion\n",
"ctx = SessionContext()\n",
"## create the same payments table that we already made in DuckDB\n",
" unit_price quantity\n",
"0 10 1000\n",
"1 5 2000\n",
"2 3456 100\n",
"3 10 1000\n",
"4 5 2000\n",
"5 3456 100"
"source": [
"# here's the table for reference\n",
"# somehow the insert is happening twice -- :shrug: not important\n",
"source": [
"plan_deserialized = ss.Serde.deserialize_bytes(plan)\n",
"<datafusion.substrait.Plan object at 0x11d61fd50>\n"
"source": [
"\u001b[0;31mException\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[35], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m df_logical_plan \u001b[38;5;241m=\u001b[39m \u001b[43mss\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mConsumer\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mfrom_substrait_plan\u001b[49m\u001b[43m(\u001b[49m\u001b[43mctx\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mplan_deserialized\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 2\u001b[0m df_logical_plan\n",
"File \u001b[0;32m~/Developer/query-plan-diff/.venv/lib/python3.11/site-packages/datafusion/\u001b[0m, in \u001b[0;36mConsumer.from_substrait_plan\u001b[0;34m(ctx, plan)\u001b[0m\n\u001b[1;32m 175\u001b[0m \u001b[38;5;129m@staticmethod\u001b[39m\n\u001b[1;32m 176\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mfrom_substrait_plan\u001b[39m(ctx: SessionContext, plan: Plan) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m LogicalPlan:\n\u001b[1;32m 177\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Convert a Substrait plan to a DataFusion LogicalPlan.\u001b[39;00m\n\u001b[1;32m 178\u001b[0m \n\u001b[1;32m 179\u001b[0m \u001b[38;5;124;03m Args:\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 184\u001b[0m \u001b[38;5;124;03m LogicalPlan.\u001b[39;00m\n\u001b[1;32m 185\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[1;32m 186\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m LogicalPlan(\n\u001b[0;32m--> 187\u001b[0m \u001b[43msubstrait_internal\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mConsumer\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mfrom_substrait_plan\u001b[49m\u001b[43m(\u001b[49m\u001b[43mctx\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mctx\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mplan\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mplan_internal\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 188\u001b[0m )\n",
"\u001b[0;31mException\u001b[0m: DataFusion error: Substrait(\"Named schema must contain names for all fields\")"
"source": [
"df_logical_plan = ss.Consumer.from_substrait_plan(ctx, plan_deserialized)\n",
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df = ctx.create_dataframe_from_logical_plan(df_logical_plan).to_pandas()\n",
