Last active
April 8, 2022 22:23
-
-
Save ian-r-rose/35d51426e5f6ee5519c9f86e360bcd76 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"id": "6664ef8e-7fa4-4c30-bea6-60a92100a019", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import contextlib\n", | |
"import datetime\n", | |
"import os\n", | |
"import time\n", | |
"\n", | |
"import dask\n", | |
"import dask.dataframe as dd\n", | |
"import distributed\n", | |
"import coiled\n", | |
"import pandas\n", | |
"import s3fs" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"id": "d4f18719-b156-47ef-94e3-ccb87433664c", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from distributed.diagnostics import SchedulerPlugin\n", | |
"from distributed.utils import key_split, key_split_group\n", | |
"\n", | |
"class TaskGroupStatistics(SchedulerPlugin):\n", | |
"    \"\"\"Scheduler plugin that records summary statistics per task group.\n", | |
"\n", | |
"    The collected data is served through a ``get_task_groups`` scheduler\n", | |
"    handler, which is installed in ``start``.\n", | |
"    \"\"\"\n", | |
"\n", | |
"    def __init__(self):\n", | |
"        \"\"\"Create the plugin with no recorded groups and no scheduler attached.\"\"\"\n", | |
"        self.groups = {}\n", | |
"        self.scheduler = None\n", | |
"\n", | |
"    def start(self, scheduler):\n", | |
"        \"\"\"Called on scheduler start as well as on registration time.\n", | |
"\n", | |
"        Keeps a reference to the scheduler and installs the\n", | |
"        ``get_task_groups`` handler on it.\n", | |
"        \"\"\"\n", | |
"        self.scheduler = scheduler\n", | |
"        scheduler.handlers[\"get_task_groups\"] = self.get_task_groups\n", | |
"\n", | |
"    def transition(self, key, start, finish, *args, **kwargs):\n", | |
"        \"\"\"Refresh a task group's statistics when one of its keys reaches memory.\n", | |
"\n", | |
"        Only the processing -> memory transition is recorded; every other\n", | |
"        transition is ignored.\n", | |
"        \"\"\"\n", | |
"        if self.scheduler is None:\n", | |
"            # Should not get here if initialization has happened correctly\n", | |
"            return\n", | |
"        if not (start == \"processing\" and finish == \"memory\"):\n", | |
"            return\n", | |
"\n", | |
"        prefix_name = key_split(key)\n", | |
"        group_name = key_split_group(key)\n", | |
"\n", | |
"        # The record is created before the scheduler lookup, exactly as before.\n", | |
"        stats = self.groups.setdefault(group_name, {})\n", | |
"        group = self.scheduler.task_groups[group_name]\n", | |
"\n", | |
"        stats[\"prefix\"] = prefix_name\n", | |
"        stats[\"duration\"] = group.duration\n", | |
"        # start/stop are stored as strings built from local-time datetimes.\n", | |
"        stats[\"start\"] = str(datetime.datetime.fromtimestamp(group.start))\n", | |
"        stats[\"stop\"] = str(datetime.datetime.fromtimestamp(group.stop))\n", | |
"        stats[\"nbytes\"] = group.nbytes_total\n", | |
"\n", | |
"    async def get_task_groups(self, comm):\n", | |
"        \"\"\"Scheduler handler: return the recorded statistics keyed by group name.\"\"\"\n", | |
"        return self.groups\n", | |
"\n", | |
"    def restart(self, scheduler):\n", | |
"        \"\"\"Discard every recorded statistic.\"\"\"\n", | |
"        self.groups = {}" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"id": "c47a34a1-bb5a-4810-8f8f-118c60e697b9", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"@contextlib.contextmanager\n", | |
"def timer(label=\"Block\"):\n", | |
"    \"\"\"Print how long the enclosed ``with`` block took to run.\n", | |
"\n", | |
"    Parameters\n", | |
"    ----------\n", | |
"    label : str\n", | |
"        Text printed in front of the measured duration.\n", | |
"\n", | |
"    The message is printed even when the block raises, so a failed run\n", | |
"    still reports its duration before the exception propagates.\n", | |
"    \"\"\"\n", | |
"    # perf_counter is monotonic, so the interval cannot be skewed by\n", | |
"    # system clock adjustments the way time.time() differences can.\n", | |
"    start = time.perf_counter()\n", | |
"    try:\n", | |
"        yield\n", | |
"    finally:\n", | |
"        end = time.perf_counter()\n", | |
"        print(f\"{label} took {end-start} seconds\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"id": "40fec172-8a43-42a1-bc12-20fd1eac4116", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Start (or attach to) the Coiled cluster named \"network\": 12 workers in us-east-2.\n", | |
"cluster = coiled.Cluster(\n", | |
" name=\"network\",\n", | |
" n_workers=12,\n", | |
" scheduler_vm_types=[\"t3.medium\"],\n", | |
" worker_vm_types=[\"t3.xlarge\"],\n", | |
" backend_options={\"region\": \"us-east-2\"},\n", | |
")\n", | |
"client = distributed.Client(cluster)\n", | |
"# Install the statistics plugin on the scheduler so task-group data is recorded.\n", | |
"client.register_scheduler_plugin(TaskGroupStatistics())\n", | |
"# NOTE(review): presumably restarts to begin from a clean cluster state - confirm.\n", | |
"# The trailing semicolon suppresses the cell's output.\n", | |
"client.restart();" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"id": "551e6d8e-b7cf-4bdb-99e5-0e7cd099e0b9", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Anonymous S3 access; `paths` holds every object matching the 2009 parquet glob.\n", | |
"# The helpers below prepend the s3:// scheme to each path themselves.\n", | |
"fs = s3fs.S3FileSystem(anon=True)\n", | |
"paths = fs.glob(\"s3://ursa-labs-taxi-data/2009/**.parquet\")\n", | |
"\n", | |
"def download(path):\n", | |
"    \"\"\"Copy one S3 object to a local file, then delete that file.\n", | |
"\n", | |
"    ``path`` is given without the ``s3://`` scheme; the local file name is\n", | |
"    its last two ``/``-separated components joined with a dot.\n", | |
"    \"\"\"\n", | |
"    import os\n", | |
"\n", | |
"    local_name = \".\".join(path.rsplit(\"/\", 2)[-2:])\n", | |
"    fs.download(f\"s3://{path}\", local_name)\n", | |
"    os.remove(local_name)\n", | |
" \n", | |
"def pandas_read(path):\n", | |
"    \"\"\"Read one parquet object from S3 with pandas and discard the result.\n", | |
"\n", | |
"    ``path`` is given without the ``s3://`` scheme. Nothing is returned.\n", | |
"    \"\"\"\n", | |
"    import pandas\n", | |
"\n", | |
"    url = \"s3://\" + path\n", | |
"    pandas.read_parquet(url)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"id": "987f3bbb-5f9c-48db-bcc4-852812a041fa", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"s3fs download took 5.758858680725098 seconds\n" | |
] | |
} | |
], | |
"source": [ | |
"# Time one `download` task per path, blocking until every future has finished.\n", | |
"with timer(\"s3fs download\"):\n", | |
" futs = client.map(download, paths)\n", | |
" distributed.wait(futs)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"id": "4f1896cc-2125-4f11-8ec7-de8f1f50dfc1", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"pandas read took 12.032827138900757 seconds\n" | |
] | |
} | |
], | |
"source": [ | |
"# Time one `pandas_read` task per path, blocking until every future has finished.\n", | |
"with timer(\"pandas read\"):\n", | |
" futs = client.map(pandas_read, paths)\n", | |
" distributed.wait(futs)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"id": "eb72fdf5-26c6-4684-9a63-cb56f34131d6", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Dask took 106.65854954719543 seconds\n" | |
] | |
} | |
], | |
"source": [ | |
"# read_parquet sits outside the timer, so only persist() + wait() are measured.\n", | |
"ddf = dd.read_parquet(\"s3://ursa-labs-taxi-data/2009/**.parquet\", engine=\"pyarrow\")\n", | |
"\n", | |
"with timer(\"Dask\"):\n", | |
" ddf = ddf.persist()\n", | |
" distributed.wait(ddf)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"id": "950236a0-3227-40c8-b7df-7c78ef6243f4", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Call the `get_task_groups` handler that TaskGroupStatistics.start installed on\n", | |
"# the scheduler, then build one row per task group.\n", | |
"tg_data = client.sync(client.scheduler.get_task_groups)\n", | |
"df = pandas.DataFrame.from_dict(tg_data, orient=\"index\")\n", | |
"df.index.name = \"group\"" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"id": "27c9733b-8b55-49f3-9e85-770dc30812b6", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>prefix</th>\n", | |
" <th>duration</th>\n", | |
" <th>start</th>\n", | |
" <th>stop</th>\n", | |
" <th>nbytes</th>\n", | |
" <th>wall_clock</th>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>group</th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>download</th>\n", | |
" <td>download</td>\n", | |
" <td>61.891020</td>\n", | |
" <td>2022-04-08 22:15:35.387949</td>\n", | |
" <td>2022-04-08 22:15:40.674404</td>\n", | |
" <td>192</td>\n", | |
" <td>5.286455</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>pandas_read</th>\n", | |
" <td>pandas_read</td>\n", | |
" <td>129.928092</td>\n", | |
" <td>2022-04-08 22:15:40.779161</td>\n", | |
" <td>2022-04-08 22:15:52.711967</td>\n", | |
" <td>192</td>\n", | |
" <td>11.932806</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>read-parquet-acc3f354554a33ef3f042bd5b66275c2</th>\n", | |
" <td>read-parquet</td>\n", | |
" <td>1152.861340</td>\n", | |
" <td>2022-04-08 22:15:56.220673</td>\n", | |
" <td>2022-04-08 22:17:42.778900</td>\n", | |
" <td>41798474191</td>\n", | |
" <td>106.558227</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" prefix duration \\\n", | |
"group \n", | |
"download download 61.891020 \n", | |
"pandas_read pandas_read 129.928092 \n", | |
"read-parquet-acc3f354554a33ef3f042bd5b66275c2 read-parquet 1152.861340 \n", | |
"\n", | |
" start \\\n", | |
"group \n", | |
"download 2022-04-08 22:15:35.387949 \n", | |
"pandas_read 2022-04-08 22:15:40.779161 \n", | |
"read-parquet-acc3f354554a33ef3f042bd5b66275c2 2022-04-08 22:15:56.220673 \n", | |
"\n", | |
" stop \\\n", | |
"group \n", | |
"download 2022-04-08 22:15:40.674404 \n", | |
"pandas_read 2022-04-08 22:15:52.711967 \n", | |
"read-parquet-acc3f354554a33ef3f042bd5b66275c2 2022-04-08 22:17:42.778900 \n", | |
"\n", | |
" nbytes wall_clock \n", | |
"group \n", | |
"download 192 5.286455 \n", | |
"pandas_read 192 11.932806 \n", | |
"read-parquet-acc3f354554a33ef3f042bd5b66275c2 41798474191 106.558227 " | |
] | |
}, | |
"execution_count": 10, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# start/stop were stored as strings by the plugin, so parse them back to\n", | |
"# datetimes and show their difference as wall-clock seconds per group.\n", | |
"df.assign(wall_clock=(pandas.to_datetime(df.stop) - pandas.to_datetime(df.start)).dt.total_seconds())" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3 (ipykernel)", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.8.8" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 5 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment