Skip to content

Instantly share code, notes, and snippets.

@ian-r-rose
Last active April 8, 2022 22:23
Show Gist options
  • Save ian-r-rose/35d51426e5f6ee5519c9f86e360bcd76 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "6664ef8e-7fa4-4c30-bea6-60a92100a019",
"metadata": {},
"outputs": [],
"source": [
"import contextlib\n",
"import datetime\n",
"import os\n",
"import time\n",
"\n",
"import dask\n",
"import dask.dataframe as dd\n",
"import distributed\n",
"import coiled\n",
"import pandas\n",
"import s3fs"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "d4f18719-b156-47ef-94e3-ccb87433664c",
"metadata": {},
"outputs": [],
"source": [
"from distributed.diagnostics import SchedulerPlugin\n",
"from distributed.utils import key_split, key_split_group\n",
"\n",
"# Scheduler plugin that accumulates per-task-group statistics (duration,\n",
"# start/stop wall times, total bytes) and serves them to clients through a\n",
"# custom \"get_task_groups\" scheduler handler.\n",
"class TaskGroupStatistics(SchedulerPlugin):\n",
" def __init__(self):\n",
" \"\"\"Initialize the plugin\"\"\"\n",
" # group name -> dict of statistics; populated in transition()\n",
" self.groups = {}\n",
" # set in start(); stays None until the plugin is registered\n",
" self.scheduler = None\n",
"\n",
" def start(self, scheduler):\n",
" \"\"\"Called on scheduler start as well as on registration time\"\"\"\n",
" self.scheduler = scheduler\n",
" # register an RPC handler so clients can fetch the collected stats\n",
" scheduler.handlers[\"get_task_groups\"] = self.get_task_groups\n",
"\n",
" def transition(self, key, start, finish, *args, **kwargs):\n",
" \"\"\"On key transition to memory, update the task group data\"\"\"\n",
" if self.scheduler is None:\n",
" # Should not get here if initialization has happened correctly\n",
" return\n",
"\n",
" # Only record when a task actually completes (processing -> memory)\n",
" if start == \"processing\" and finish == \"memory\":\n",
" prefix_name = key_split(key)\n",
" group_name = key_split_group(key)\n",
"\n",
" if group_name not in self.groups:\n",
" self.groups[group_name] = {}\n",
"\n",
" # Snapshot the scheduler's aggregate stats for this task group.\n",
" # NOTE(review): fromtimestamp() renders in the scheduler's local\n",
" # timezone — presumably fine for display; confirm if comparing runs.\n",
" group = self.scheduler.task_groups[group_name]\n",
" self.groups[group_name][\"prefix\"] = prefix_name\n",
" self.groups[group_name][\"duration\"] = group.duration\n",
" self.groups[group_name][\"start\"] = str(\n",
" datetime.datetime.fromtimestamp(group.start)\n",
" )\n",
" self.groups[group_name][\"stop\"] = str(\n",
" datetime.datetime.fromtimestamp(group.stop)\n",
" )\n",
" self.groups[group_name][\"nbytes\"] = group.nbytes_total\n",
"\n",
" async def get_task_groups(self, comm):\n",
" # RPC handler: return the accumulated per-group statistics\n",
" return self.groups\n",
"\n",
" def restart(self, scheduler):\n",
" # Drop accumulated stats when the cluster is restarted\n",
" self.groups = {}"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "c47a34a1-bb5a-4810-8f8f-118c60e697b9",
"metadata": {},
"outputs": [],
"source": [
"@contextlib.contextmanager\n",
"def timer(label=\"Block\"):\n",
"    \"\"\"Context manager that prints the wall-clock time of its block.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    label : str\n",
"        Prefix for the printed message.\n",
"\n",
"    The duration is printed even when the timed block raises: the\n",
"    exception is thrown in at the ``yield``, so without try/finally the\n",
"    print after it would be skipped (as in the original version).\n",
"    \"\"\"\n",
"    start = time.time()\n",
"    try:\n",
"        yield\n",
"    finally:\n",
"        end = time.time()\n",
"        print(f\"{label} took {end-start} seconds\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "40fec172-8a43-42a1-bc12-20fd1eac4116",
"metadata": {},
"outputs": [],
"source": [
"# Spin up a Coiled cluster, attach a distributed client, install the\n",
"# statistics plugin on the scheduler, and restart workers for a clean slate.\n",
"cluster = coiled.Cluster(\n",
" name=\"network\",\n",
" n_workers=12,\n",
" scheduler_vm_types=[\"t3.medium\"],\n",
" worker_vm_types=[\"t3.xlarge\"],\n",
" backend_options={\"region\": \"us-east-2\"},\n",
")\n",
"client = distributed.Client(cluster)\n",
"client.register_scheduler_plugin(TaskGroupStatistics())\n",
"client.restart();"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "551e6d8e-b7cf-4bdb-99e5-0e7cd099e0b9",
"metadata": {},
"outputs": [],
"source": [
"# Anonymous S3 access to the public NYC taxi dataset; collect 2009 files.\n",
"fs = s3fs.S3FileSystem(anon=True)\n",
"paths = fs.glob(\"s3://ursa-labs-taxi-data/2009/**.parquet\")\n",
"\n",
"def download(path):\n",
" # local import so the function is self-contained when shipped to workers\n",
" import os\n",
" out = \".\".join(path.split(\"/\")[-2:])\n",
" fs.download(\"s3://\" + path, out)\n",
" # benchmark measures transfer only; discard the file afterwards\n",
" os.remove(out)\n",
" \n",
"def pandas_read(path):\n",
" import pandas\n",
" # read and discard: we only care about the timing, not the data\n",
" df = pandas.read_parquet(\"s3://\" + path)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "987f3bbb-5f9c-48db-bcc4-852812a041fa",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"s3fs download took 5.758858680725098 seconds\n"
]
}
],
"source": [
"# Benchmark: raw S3 download throughput via s3fs, fanned out over workers\n",
"with timer(\"s3fs download\"):\n",
" futs = client.map(download, paths)\n",
" distributed.wait(futs)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "4f1896cc-2125-4f11-8ec7-de8f1f50dfc1",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"pandas read took 12.032827138900757 seconds\n"
]
}
],
"source": [
"# Benchmark: download + parquet parse using plain pandas on each worker\n",
"with timer(\"pandas read\"):\n",
" futs = client.map(pandas_read, paths)\n",
" distributed.wait(futs)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "eb72fdf5-26c6-4684-9a63-cb56f34131d6",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Dask took 106.65854954719543 seconds\n"
]
}
],
"source": [
"# Benchmark: the same dataset through Dask's read_parquet + persist\n",
"ddf = dd.read_parquet(\"s3://ursa-labs-taxi-data/2009/**.parquet\", engine=\"pyarrow\")\n",
"\n",
"with timer(\"Dask\"):\n",
" ddf = ddf.persist()\n",
" # block until every partition is materialized in cluster memory\n",
" distributed.wait(ddf)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "950236a0-3227-40c8-b7df-7c78ef6243f4",
"metadata": {},
"outputs": [],
"source": [
"# Fetch the plugin's per-group statistics through the custom scheduler\n",
"# handler and build a DataFrame indexed by task-group name.\n",
"tg_data = client.sync(client.scheduler.get_task_groups)\n",
"df = pandas.DataFrame.from_dict(tg_data, orient=\"index\")\n",
"df.index.name = \"group\""
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "27c9733b-8b55-49f3-9e85-770dc30812b6",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>prefix</th>\n",
" <th>duration</th>\n",
" <th>start</th>\n",
" <th>stop</th>\n",
" <th>nbytes</th>\n",
" <th>wall_clock</th>\n",
" </tr>\n",
" <tr>\n",
" <th>group</th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>download</th>\n",
" <td>download</td>\n",
" <td>61.891020</td>\n",
" <td>2022-04-08 22:15:35.387949</td>\n",
" <td>2022-04-08 22:15:40.674404</td>\n",
" <td>192</td>\n",
" <td>5.286455</td>\n",
" </tr>\n",
" <tr>\n",
" <th>pandas_read</th>\n",
" <td>pandas_read</td>\n",
" <td>129.928092</td>\n",
" <td>2022-04-08 22:15:40.779161</td>\n",
" <td>2022-04-08 22:15:52.711967</td>\n",
" <td>192</td>\n",
" <td>11.932806</td>\n",
" </tr>\n",
" <tr>\n",
" <th>read-parquet-acc3f354554a33ef3f042bd5b66275c2</th>\n",
" <td>read-parquet</td>\n",
" <td>1152.861340</td>\n",
" <td>2022-04-08 22:15:56.220673</td>\n",
" <td>2022-04-08 22:17:42.778900</td>\n",
" <td>41798474191</td>\n",
" <td>106.558227</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" prefix duration \\\n",
"group \n",
"download download 61.891020 \n",
"pandas_read pandas_read 129.928092 \n",
"read-parquet-acc3f354554a33ef3f042bd5b66275c2 read-parquet 1152.861340 \n",
"\n",
" start \\\n",
"group \n",
"download 2022-04-08 22:15:35.387949 \n",
"pandas_read 2022-04-08 22:15:40.779161 \n",
"read-parquet-acc3f354554a33ef3f042bd5b66275c2 2022-04-08 22:15:56.220673 \n",
"\n",
" stop \\\n",
"group \n",
"download 2022-04-08 22:15:40.674404 \n",
"pandas_read 2022-04-08 22:15:52.711967 \n",
"read-parquet-acc3f354554a33ef3f042bd5b66275c2 2022-04-08 22:17:42.778900 \n",
"\n",
" nbytes wall_clock \n",
"group \n",
"download 192 5.286455 \n",
"pandas_read 192 11.932806 \n",
"read-parquet-acc3f354554a33ef3f042bd5b66275c2 41798474191 106.558227 "
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Derive wall-clock seconds (stop - start) per task group for comparison\n",
"df.assign(wall_clock=(pandas.to_datetime(df.stop) - pandas.to_datetime(df.start)).dt.total_seconds())"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.8"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment