Created
May 17, 2022 15:05
-
-
Save ian-r-rose/ab533b819db61f9220cc15f19f09de07 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"id": "5ca5061d-4d62-4212-8d33-efd64655c3a0", | |
"metadata": { | |
"tags": [] | |
}, | |
"source": [ | |
"# Parquet Performance Comparisons\n", | |
"\n", | |
"In March 2022 we kicked off an effort to improve the parquet user experience in Dask.\n", | |
"This notebook is intended to measure how we did, both in terms of improvements to default parameters and in performance." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"id": "2af778b2-2a1a-4d8f-baae-56d2d54640d2", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"'2022.05.0'" | |
] | |
}, | |
"execution_count": 1, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"import contextlib\n", | |
"import time\n", | |
"\n", | |
"import coiled\n", | |
"import dask\n", | |
"import dask.dataframe as dd\n", | |
"import distributed\n", | |
"import pandas\n", | |
"import s3fs\n", | |
"from dask.datasets import timeseries\n", | |
"\n", | |
"dask.__version__" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "b474bd73-f8e9-4d16-a99f-010894a2e630", | |
"metadata": { | |
"tags": [] | |
}, | |
"source": [ | |
"## Utilities and setup\n", | |
"\n", | |
"Here we create two utilities: one for getting timing information of task groups from the scheduler (roughly speaking, thread-seconds), and one for measuring the wall clock time from the client." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"id": "b89843d6-fb86-4e09-b2fa-71b407b0dbd0", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from distributed.diagnostics import SchedulerPlugin\n", | |
"from distributed.utils import key_split, key_split_group\n", | |
"\n", | |
"class TaskGroupStatistics(SchedulerPlugin):\n", | |
" \"\"\"\n", | |
" A plugin for collecting task group timing information\n", | |
" from the scheduler.\n", | |
" \"\"\"\n", | |
" def __init__(self):\n", | |
" \"\"\"Initialize the plugin\"\"\"\n", | |
" self.groups = {}\n", | |
" self.scheduler = None\n", | |
"\n", | |
" def start(self, scheduler):\n", | |
" \"\"\"Called on scheduler start as well as on registration time\"\"\"\n", | |
" self.scheduler = scheduler\n", | |
" scheduler.handlers[\"get_task_groups\"] = self.get_task_groups\n", | |
"\n", | |
" def transition(self, key, start, finish, *args, **kwargs):\n", | |
" \"\"\"On key transition to memory, update the task group data\"\"\"\n", | |
" if self.scheduler is None:\n", | |
" # Should not get here if initialization has happened correctly\n", | |
" return\n", | |
"\n", | |
" if start == \"processing\" and finish == \"memory\":\n", | |
" prefix_name = key_split(key)\n", | |
" group_name = key_split_group(key)\n", | |
"\n", | |
" if group_name not in self.groups:\n", | |
" self.groups[group_name] = {}\n", | |
"\n", | |
" group = self.scheduler.task_groups[group_name]\n", | |
" self.groups[group_name][\"prefix\"] = prefix_name\n", | |
" self.groups[group_name][\"duration\"] = group.duration\n", | |
" self.groups[group_name][\"start\"] = str(\n", | |
" datetime.datetime.fromtimestamp(group.start)\n", | |
" )\n", | |
" self.groups[group_name][\"stop\"] = str(\n", | |
" datetime.datetime.fromtimestamp(group.stop)\n", | |
" )\n", | |
" self.groups[group_name][\"nbytes\"] = group.nbytes_total\n", | |
"\n", | |
" async def get_task_groups(self, comm):\n", | |
" return self.groups\n", | |
"\n", | |
" def restart(self, scheduler):\n", | |
" self.groups = {}\n", | |
"\n", | |
"def get_tg_data(client):\n", | |
" tg_data = client.sync(client.scheduler.get_task_groups)\n", | |
"\n", | |
" df = pandas.DataFrame.from_dict(tg_data, orient=\"index\")\n", | |
" df.index.name = \"group\"\n", | |
" return df" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"id": "780290e6-2626-47b5-aed3-7aca17581fe6", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from IPython.display import Markdown\n", | |
"\n", | |
"@contextlib.contextmanager\n", | |
"def timer(label=\"Block\"):\n", | |
" \"\"\"\n", | |
" Time a block of code and print out the result when done.\n", | |
" \"\"\"\n", | |
" start = time.time()\n", | |
" yield\n", | |
" end = time.time()\n", | |
" display(Markdown(f\"**{label}** took **{end-start:.2f}** seconds\"))" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "82c7e406-dd6a-4142-a2fa-ba843b3799a5", | |
"metadata": {}, | |
"source": [ | |
"## Software environments\n", | |
"\n", | |
"Create two software environments, one from February, and one from May." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"id": "f5075232-55cb-44a9-a74f-bff37eaff538", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Found existing software environment build, returning\n", | |
"Found existing software environment build, returning\n" | |
] | |
} | |
], | |
"source": [ | |
"old=\"2022.2.1\"\n", | |
"new=\"2022.5.0\"\n", | |
"\n", | |
"coiled.create_software_environment(\n", | |
" f\"parquet-{old.replace('.', '-')}\",\n", | |
" conda={\n", | |
" \"channels\": [\"conda-forge\"],\n", | |
" \"dependencies\": [\n", | |
" \"python=3.9\",\n", | |
" f\"dask={old}\",\n", | |
" f\"distributed={old}\",\n", | |
" \"s3fs\",\n", | |
" \"pyarrow=7\",\n", | |
" \"fastparquet=0.8.0\",\n", | |
" ],\n", | |
" }\n", | |
")\n", | |
"\n", | |
"coiled.create_software_environment(\n", | |
" f\"parquet-{new.replace('.', '-')}\",\n", | |
" conda={\n", | |
" \"channels\": [\"conda-forge\"],\n", | |
" \"dependencies\": [\n", | |
" \"python=3.9\",\n", | |
" f\"dask=={new}\",\n", | |
" f\"distributed=={new}\",\n", | |
" \"s3fs\",\n", | |
" \"pyarrow=7\",\n", | |
" \"fastparquet=0.8.1\",\n", | |
" ],\n", | |
" },\n", | |
")\n", | |
"kind = new" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "0be911c1-4ad3-4ecd-920f-d83eb3715d84", | |
"metadata": { | |
"tags": [] | |
}, | |
"source": [ | |
"## Test #1: naive data read\n", | |
"\n", | |
"Let's read a single year of NYC taxi parquet data from Ursa labs with no changes to default parameters (except to specify that the engine is `pyarrow` so we can make apples-to-apples comparisons)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"id": "a07a30f6-0c8a-4e96-9f49-125f31bd8423", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"application/vnd.jupyter.widget-view+json": { | |
"model_id": "00708a6d970c430ca12e33a40d24cd3c", | |
"version_major": 2, | |
"version_minor": 0 | |
}, | |
"text/plain": [ | |
"Output()" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
}, | |
{ | |
"data": { | |
"text/html": [ | |
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\"></pre>\n" | |
], | |
"text/plain": [] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
}, | |
{ | |
"data": { | |
"text/markdown": [ | |
"**Naive read** took **25.21** seconds" | |
], | |
"text/plain": [ | |
"<IPython.core.display.Markdown object>" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
}, | |
{ | |
"data": { | |
"text/markdown": [ | |
"### Task Group timing" | |
], | |
"text/plain": [ | |
"<IPython.core.display.Markdown object>" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
}, | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>prefix</th>\n", | |
" <th>duration</th>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>group</th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>read-parquet-9e269c22827660b0d75e012b32ce933f</th>\n", | |
" <td>read-parquet</td>\n", | |
" <td>210.45336</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" prefix duration\n", | |
"group \n", | |
"read-parquet-9e269c22827660b0d75e012b32ce933f read-parquet 210.45336" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
} | |
], | |
"source": [ | |
"# Create a separate cluster for ursa data in a the same region\n", | |
"cluster = coiled.Cluster(\n", | |
" name=f\"parquet-{kind.replace('.', '-')}\",\n", | |
" software=f\"parquet-{kind.replace('.', '-')}\",\n", | |
" n_workers=12,\n", | |
" worker_vm_types=[\"t3.xlarge\"],\n", | |
" scheduler_vm_types=[\"t3.large\"],\n", | |
" backend_options={\"region\": \"us-east-2\"},\n", | |
")\n", | |
" \n", | |
"client = distributed.Client(cluster)\n", | |
"client.register_scheduler_plugin(TaskGroupStatistics())\n", | |
"\n", | |
"# Note: reads from `s3://ursa-labs-taxi-data` seem to be particularly slow!\n", | |
"# Other buckets don't seem to have that feature (including ones with what are\n", | |
"# notionally the same data). I don't understand why right now.\n", | |
"with timer(\"Naive read\"):\n", | |
" ddf = dd.read_parquet(\"s3://ursa-labs-taxi-data/2012/**.parquet\", engine=\"pyarrow\")\n", | |
"\n", | |
" ddf = ddf.persist()\n", | |
" distributed.wait(ddf)\n", | |
"\n", | |
"display(Markdown(\"### Task Group timing\"))\n", | |
"display(get_tg_data(client))\n", | |
"\n", | |
"client.close()\n", | |
"cluster.close()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "951d679f-0b7c-411a-ba3e-34159cff11ec", | |
"metadata": { | |
"tags": [] | |
}, | |
"source": [ | |
"## Create a new shared cluster\n", | |
"\n", | |
"We create it in `us-east-1` to have fast communication with our test s3 bucket `s3://dask-io`." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"id": "c1519363-da48-48ba-aba6-197a8ec0b00b", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/markdown": [ | |
"Running with **Dask==2022.5.0**" | |
], | |
"text/plain": [ | |
"<IPython.core.display.Markdown object>" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
}, | |
{ | |
"data": { | |
"application/vnd.jupyter.widget-view+json": { | |
"model_id": "f7ad6bdc55094feebf8002ca2b724ab3", | |
"version_major": 2, | |
"version_minor": 0 | |
}, | |
"text/plain": [ | |
"Output()" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
}, | |
{ | |
"data": { | |
"text/html": [ | |
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\"></pre>\n" | |
], | |
"text/plain": [] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
} | |
], | |
"source": [ | |
"display(Markdown(f\"Running with **Dask=={kind}**\"))\n", | |
"\n", | |
"cluster = coiled.Cluster(\n", | |
" name=f\"parquet-{kind.replace('.', '-')}\",\n", | |
" software=f\"parquet-{kind.replace('.', '-')}\",\n", | |
" n_workers=25,\n", | |
" worker_vm_types=[\"m5.2xlarge\"],\n", | |
" scheduler_vm_types=[\"t3.large\"],\n", | |
" backend_options={\"region\": \"us-east-1\"},\n", | |
")\n", | |
" \n", | |
"client = distributed.Client(cluster)\n", | |
"client.register_scheduler_plugin(TaskGroupStatistics())" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "560617ac-ed8d-4fa6-88d4-7ff8b213d4b9", | |
"metadata": {}, | |
"source": [ | |
"## Test #2: naive data write\n", | |
"\n", | |
"Let's write a ~200 GB, 700 partition dataset with 100 columns. In older Dask versions the metadata writing will knock down workers, resulting in a frustrating failure at the end of the computation." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"id": "c825ae34-7a90-473f-b3f6-81defaf2192c", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/markdown": [ | |
"**Naive write 200 GB** took **191.12** seconds" | |
], | |
"text/plain": [ | |
"<IPython.core.display.Markdown object>" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
} | |
], | |
"source": [ | |
"with timer(\"Naive write 200 GB\"):\n", | |
" ddf = timeseries(\n", | |
" dtypes={\n", | |
" **{f\"name-{i}\": str for i in range(25)},\n", | |
" **{f\"price-{i}\": float for i in range(25)},\n", | |
" **{f\"id-{i}\": int for i in range(25)},\n", | |
" **{f\"cat-{i}\": \"category\" for i in range(25)},\n", | |
" },\n", | |
" start=\"2021-01-01\",\n", | |
" end=\"2021-02-01\",\n", | |
" freq=\"10ms\",\n", | |
" partition_freq=\"1H\",\n", | |
" )\n", | |
" ddf.to_parquet(\"s3://dask-io/parquet-performance-200GB/\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"id": "441d824c-399e-4ac0-85de-9cd98ee643d6", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/markdown": [ | |
"### Task Group timing" | |
], | |
"text/plain": [ | |
"<IPython.core.display.Markdown object>" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
}, | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>prefix</th>\n", | |
" <th>duration</th>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>group</th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>to-parquet-f87fb14ac1735d2fb4686061862f4475</th>\n", | |
" <td>to-parquet</td>\n", | |
" <td>34057.172713</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>store-to-parquet-f87fb14ac1735d2fb4686061862f4475</th>\n", | |
" <td>store-to-parquet</td>\n", | |
" <td>1.342747</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" prefix \\\n", | |
"group \n", | |
"to-parquet-f87fb14ac1735d2fb4686061862f4475 to-parquet \n", | |
"store-to-parquet-f87fb14ac1735d2fb4686061862f4475 store-to-parquet \n", | |
"\n", | |
" duration \n", | |
"group \n", | |
"to-parquet-f87fb14ac1735d2fb4686061862f4475 34057.172713 \n", | |
"store-to-parquet-f87fb14ac1735d2fb4686061862f4475 1.342747 " | |
] | |
}, | |
"execution_count": 8, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# Cleanup\n", | |
"try:\n", | |
" fs = s3fs.S3FileSystem()\n", | |
" fs.rm(\"s3://dask-io/parquet-performance-200GB/\", recursive=True)\n", | |
"except:\n", | |
" pass\n", | |
"\n", | |
"display(Markdown(\"### Task Group timing\"))\n", | |
"get_tg_data(client)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "7af0dc12-c01d-4b71-a8a4-052d18cb3fe4", | |
"metadata": {}, | |
"source": [ | |
"## Test #3: writing terabyte scale data\n", | |
"\n", | |
"Let's go bigger, writing ~2TB of data with ~8,760 partitions (one year of hourly partitions), this time specifying `write_metadata_file=False`, which is the default in newer versions." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"id": "d36a1ca5-c1b8-4e9a-87ea-f382b78ee7a5", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/markdown": [ | |
"**Writing 2TB of data** took **939.72** seconds" | |
], | |
"text/plain": [ | |
"<IPython.core.display.Markdown object>" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
} | |
], | |
"source": [ | |
"client.restart()\n", | |
"\n", | |
"ddf = timeseries(\n", | |
" dtypes={\n", | |
" **{f\"name-{i}\": str for i in range(25)},\n", | |
" **{f\"price-{i}\": float for i in range(25)},\n", | |
" **{f\"id-{i}\": int for i in range(25)},\n", | |
" **{f\"cat-{i}\": \"category\" for i in range(25)},\n", | |
" },\n", | |
" start=\"2021-01-01\",\n", | |
" end=\"2022-01-01\",\n", | |
" freq=\"10ms\",\n", | |
" partition_freq=\"1H\",\n", | |
")\n", | |
"\n", | |
"with timer(\"Writing 2TB of data\"):\n", | |
" s = ddf.to_parquet(\n", | |
" \"s3://dask-io/parquet-performance-2TB/\",\n", | |
" engine=\"pyarrow\",\n", | |
" write_metadata_file=False,\n", | |
" )" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"id": "7febef9b-e228-41aa-a034-8f065af32942", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/markdown": [ | |
"### Task Group timing" | |
], | |
"text/plain": [ | |
"<IPython.core.display.Markdown object>" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
}, | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>prefix</th>\n", | |
" <th>duration</th>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>group</th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>to-parquet-1f1a94537674901f856fda0c3878fb65</th>\n", | |
" <td>to-parquet</td>\n", | |
" <td>182482.985309</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>store-to-parquet-1f1a94537674901f856fda0c3878fb65</th>\n", | |
" <td>store-to-parquet</td>\n", | |
" <td>7.688994</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" prefix \\\n", | |
"group \n", | |
"to-parquet-1f1a94537674901f856fda0c3878fb65 to-parquet \n", | |
"store-to-parquet-1f1a94537674901f856fda0c3878fb65 store-to-parquet \n", | |
"\n", | |
" duration \n", | |
"group \n", | |
"to-parquet-1f1a94537674901f856fda0c3878fb65 182482.985309 \n", | |
"store-to-parquet-1f1a94537674901f856fda0c3878fb65 7.688994 " | |
] | |
}, | |
"execution_count": 10, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"display(Markdown(\"### Task Group timing\"))\n", | |
"get_tg_data(client)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "9148e115-0b4b-4553-ae2c-e0e7cc5f7d5d", | |
"metadata": {}, | |
"source": [ | |
"## Test #4: an ETL workflow on our previous data" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 11, | |
"id": "088541d7-7837-41f9-ab16-29d0d7f95320", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"def transform(df):\n", | |
" \"\"\"\n", | |
" T the data!\n", | |
" \"\"\"\n", | |
" transform = {}\n", | |
" for c in df.columns:\n", | |
" dtype = str(df[c].dtype)\n", | |
" if dtype == \"object\":\n", | |
" transform[c] = df[c].str.upper()\n", | |
" elif dtype == \"int64\":\n", | |
" transform[c] = df[c] * 2\n", | |
" elif dtype == \"float64\":\n", | |
" transform[c] = df[c] / 2\n", | |
" return df.assign(**transform)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 12, | |
"id": "235263ec-d809-4154-ac7f-be8fbbf4255a", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/markdown": [ | |
"**ETL 2TB** took **812.56** seconds" | |
], | |
"text/plain": [ | |
"<IPython.core.display.Markdown object>" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
} | |
], | |
"source": [ | |
"client.restart()\n", | |
"\n", | |
"with timer(\"ETL 2TB\"):\n", | |
" ddf2 = dd.read_parquet(\"s3://dask-io/parquet-performance-2TB/\", engine=\"pyarrow\")\n", | |
" # This transform does pretty badly! Probably GIL related, and may be fixed\n", | |
" # by pyarrow strings.\n", | |
" #ddf3 = ddf2.map_partitions(transform, meta=ddf2._meta)\n", | |
" ddf3 = ddf2.assign(date=pandas.Timestamp.now())\n", | |
" ddf3.to_parquet(\n", | |
" \"s3://dask-io/parquet-performance-2TB-transform/\",\n", | |
" engine=\"pyarrow\",\n", | |
" write_metadata_file=False\n", | |
" )" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 13, | |
"id": "3c841703-f101-4cbf-b694-2c259aded2bf", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/markdown": [ | |
"### Task Group timing" | |
], | |
"text/plain": [ | |
"<IPython.core.display.Markdown object>" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
}, | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>prefix</th>\n", | |
" <th>duration</th>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>group</th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>to-parquet-a6475820fa72d24303cd6b6ed83ff9d7</th>\n", | |
" <td>to-parquet</td>\n", | |
" <td>155695.348040</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>store-to-parquet-a6475820fa72d24303cd6b6ed83ff9d7</th>\n", | |
" <td>store-to-parquet</td>\n", | |
" <td>3.830255</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" prefix \\\n", | |
"group \n", | |
"to-parquet-a6475820fa72d24303cd6b6ed83ff9d7 to-parquet \n", | |
"store-to-parquet-a6475820fa72d24303cd6b6ed83ff9d7 store-to-parquet \n", | |
"\n", | |
" duration \n", | |
"group \n", | |
"to-parquet-a6475820fa72d24303cd6b6ed83ff9d7 155695.348040 \n", | |
"store-to-parquet-a6475820fa72d24303cd6b6ed83ff9d7 3.830255 " | |
] | |
}, | |
"execution_count": 13, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"display(Markdown(\"### Task Group timing\"))\n", | |
"get_tg_data(client)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 14, | |
"id": "011c2e61-d6b2-48f9-9559-075fcdfde653", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Cleanup\n", | |
"try:\n", | |
" fs = s3fs.S3FileSystem()\n", | |
" fs.rm(\"s3://dask-io/parquet-performance-2TB/\", recursive=True)\n", | |
" fs.rm(\"s3://dask-io/parquet-performance-2TB-transform/\", recursive=True)\n", | |
"except:\n", | |
" pass\n", | |
"\n", | |
"client.close()\n", | |
"cluster.close()" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3 (ipykernel)", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.9.12" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 5 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment