Last active
October 25, 2023 17:26
-
-
Save mrocklin/c1fd89575b40c055a9be77b2a47894df to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"id": "ac7b089d-b128-403e-ae72-6aec681aaef7", | |
"metadata": {}, | |
"source": [ | |
"## Setup\n", | |
"\n", | |
"This was run on an m6i.xlarge (a generic 4-core Intel machine) on AWS, in the same region as the data." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"id": "e840f019-140f-4807-8fe3-a0411beb29a9", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"1\n" | |
] | |
} | |
], | |
"source": [ | |
"!echo $OMP_NUM_THREADS" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"id": "9c8f0c06-3580-4dad-9a6d-b49fefbd4d74", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import os\n", | |
"os.environ[\"OMP_NUM_THREADS\"] = \"8\"" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"id": "19a60ae0-2763-437b-8548-2c7ae4ae7682", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import contextlib\n", | |
"import pyarrow.parquet as pq\n", | |
"from dask.utils import format_bytes\n", | |
"import time" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"id": "04731fb7-d84d-4342-afb1-8a1c3060ae4b", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import s3fs\n", | |
"s3 = s3fs.S3FileSystem()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"id": "3ba34ae5-d3c3-4f87-a77a-c1f317cec64d", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"['coiled-runtime-ci/tpc-h/scale-10/lineitem/lineitem_0da92240-ed31-4eb5-a93b-81d2ee4d5e14.parquet',\n", | |
" 'coiled-runtime-ci/tpc-h/scale-10/lineitem/lineitem_13ab3fb8-8e60-4bc5-a8a2-52da5bd6ee62.parquet',\n", | |
" 'coiled-runtime-ci/tpc-h/scale-10/lineitem/lineitem_2ad37431-801f-4b5e-b469-4b6748f76e2d.parquet',\n", | |
" 'coiled-runtime-ci/tpc-h/scale-10/lineitem/lineitem_2cc68d48-eec1-47e1-8d15-f4465bb028bf.parquet',\n", | |
" 'coiled-runtime-ci/tpc-h/scale-10/lineitem/lineitem_331c7400-9f58-465d-83da-5e4d8418c373.parquet']" | |
] | |
}, | |
"execution_count": 5, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"filenames = s3.glob(\"coiled-runtime-ci/tpc-h/scale-10/lineitem/*.parquet\")\n", | |
"filenames[:5]" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"id": "8a806cf3-1f32-437e-b123-7f6e870ec7fe", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"'2.87 GiB'" | |
] | |
}, | |
"execution_count": 6, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"nbytes = s3.du(\"coiled-runtime-ci/tpc-h/scale-10/lineitem/\")\n", | |
"format_bytes(nbytes )" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"id": "89bed71c-ae93-437a-8359-1404ff6fa20b", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"@contextlib.contextmanager\n", | |
"def bandwidth():\n", | |
" start = time.time()\n", | |
" yield\n", | |
" stop = time.time()\n", | |
" print(int(nbytes / (stop - start) // 2**20), \"MiB/s\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"id": "e44d379e-fbe6-4665-b0c7-db43d6c61dcd", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from concurrent.futures import ThreadPoolExecutor" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"id": "5fd114db-e62a-4938-91a6-21906ec17af3", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import multiprocessing\n", | |
"ncores = multiprocessing.cpu_count()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"id": "1ed941b4-cbcd-41a4-b835-4a66908fdb35", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"4" | |
] | |
}, | |
"execution_count": 10, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"ncores" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "d0437b23-220b-408e-a891-b4ac3d0c981e", | |
"metadata": {}, | |
"source": [ | |
"### Roughly how much do files expand?" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 11, | |
"id": "eef72144-f793-42b3-a592-c7f1dd242882", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"'126.08 MiB'" | |
] | |
}, | |
"execution_count": 11, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"format_bytes(s3.du(filenames[0]))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 12, | |
"id": "bfec59d1-4a0e-4d9f-b3ee-6de61ec60ba8", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"'339.21 MiB'" | |
] | |
}, | |
"execution_count": 12, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"t = pq.ParquetFile(\n", | |
" \"s3://\" + filenames[0], pre_buffer=True\n", | |
").read(use_threads=False, use_pandas_metadata=True)\n", | |
"\n", | |
"format_bytes(t.nbytes)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "cfccf3f9-d01d-4f41-89c5-a04b55cbbadf", | |
"metadata": {}, | |
"source": [ | |
"## Test general bandwidth to these machines" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 14, | |
"id": "8947d8df-523c-47c2-af9c-fbbe356c6be3", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"42 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(1) as e:\n", | |
" def load(fn):\n", | |
" s3.open(fn).read()\n", | |
" list(e.map(load, filenames))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 15, | |
"id": "aee4b03f-d9d4-43f5-a6f0-ce3bf9e97f2f", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"205 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores) as e:\n", | |
" def load(fn):\n", | |
" s3.open(fn).read()\n", | |
" list(e.map(load, filenames))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 16, | |
"id": "bdfe6184-b332-4253-a34e-cedc3d6d92c7", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"378 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(int(ncores * 1.5)) as e:\n", | |
" def load(fn):\n", | |
" s3.open(fn).read()\n", | |
" list(e.map(load, filenames))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 17, | |
"id": "5f309e61-83e9-4a54-af90-7d820bd4d96a", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"427 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores * 2) as e:\n", | |
" def load(fn):\n", | |
" s3.open(fn).read()\n", | |
" list(e.map(load, filenames))" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "b198d9b5-f2b1-400f-9c4e-b69032d27699", | |
"metadata": {}, | |
"source": [ | |
"## ParquetFile\n", | |
"\n", | |
"We have three knobs to play with:\n", | |
"\n", | |
"- `pre_buffer`: this is always going to be a good choice\n", | |
"- `use_threads`: it seems that we want this to be off\n", | |
"- `filesystem`: s3fs seems to work best with external threads\n", | |
"\n", | |
"Note: these may change with a larger `OMP_NUM_THREADS`, but I haven't been able to make that happen." | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "516ec3e2-f7fe-4210-9146-3bd34b5f19d6", | |
"metadata": {}, | |
"source": [ | |
"### S3FS" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 18, | |
"id": "734daba8-7160-4b13-b38b-f7c56c36bc5d", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"168 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores) as e:\n", | |
" def load(fn):\n", | |
" with s3.open(fn, mode=\"rb\") as f:\n", | |
" pq.ParquetFile(f, pre_buffer=True).read(\n", | |
" use_threads=False,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 19, | |
"id": "75bada5a-81fd-41ce-9f8e-b6ede8fda9f4", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"172 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores) as e:\n", | |
" def load(fn):\n", | |
" with s3.open(fn, mode=\"rb\") as f:\n", | |
" pq.ParquetFile(f, pre_buffer=True).read(\n", | |
" use_threads=True,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 20, | |
"id": "20b8f1e1-1b4a-4dec-a7f2-8a1b9b52ed30", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"211 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores * 2) as e:\n", | |
" def load(fn):\n", | |
" with s3.open(fn, mode=\"rb\") as f:\n", | |
" pq.ParquetFile(f, pre_buffer=True).read(\n", | |
" use_threads=False,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 21, | |
"id": "8661490c-1708-4745-8a8c-614da973e87d", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"215 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores * 2) as e:\n", | |
" def load(fn):\n", | |
" with s3.open(fn, mode=\"rb\") as f:\n", | |
" pq.ParquetFile(f, pre_buffer=True).read(\n", | |
" use_threads=True,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 22, | |
"id": "1dcb397e-b347-4b89-9754-14d807c9f45a", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"32 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores * 2) as e:\n", | |
" def load(fn):\n", | |
" with s3.open(fn, mode=\"rb\") as f:\n", | |
" pq.ParquetFile(f, pre_buffer=False).read(\n", | |
" use_threads=False,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "f6acf24f-3db9-4490-be21-e3811914b7ab", | |
"metadata": {}, | |
"source": [ | |
"### Arrow FS" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 23, | |
"id": "e8969a7f-3c2a-4af3-af6c-a5edfadc53e6", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from pyarrow.fs import S3FileSystem\n", | |
"import boto3\n", | |
"session = boto3.session.Session()\n", | |
"credentials = session.get_credentials()\n", | |
"\n", | |
"fs = S3FileSystem(\n", | |
" secret_key=credentials.secret_key,\n", | |
" access_key=credentials.access_key,\n", | |
" region=\"us-east-2\",\n", | |
" session_token=credentials.token,\n", | |
")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 24, | |
"id": "b4036f7b-4532-4404-9d88-a2845185763a", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"233 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores) as e:\n", | |
" def load(fn):\n", | |
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n", | |
" use_threads=False,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 25, | |
"id": "2e99023f-f3c5-4eab-b09c-392221921b0e", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"229 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores) as e:\n", | |
" def load(fn):\n", | |
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n", | |
" use_threads=True,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 26, | |
"id": "fc1bd5b5-a64c-4d2c-a833-855239b1235a", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import pyarrow as pa\n", | |
"pa.set_io_thread_count(ncores * 4)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 27, | |
"id": "05f385a6-da20-4b8b-9a8a-603c92457fae", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"237 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores) as e:\n", | |
" def load(fn):\n", | |
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n", | |
" use_threads=False,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 28, | |
"id": "95e146ae-51d8-40ea-86e1-cd3ed81dacd6", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"241 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores) as e:\n", | |
" def load(fn):\n", | |
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n", | |
" use_threads=True,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 29, | |
"id": "d39f908d-e1bc-46a7-8d54-1dce7a98f0c6", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"252 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores * 2) as e:\n", | |
" def load(fn):\n", | |
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n", | |
" use_threads=True,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 30, | |
"id": "b21f96f3-f3f9-4db2-a887-552fdf182f16", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import pyarrow as pa\n", | |
"pa.set_io_thread_count(ncores)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 31, | |
"id": "a4a328c9-a3d4-4bab-aee0-7e2659f630d4", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"227 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores) as e:\n", | |
" def load(fn):\n", | |
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n", | |
" use_threads=True,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 32, | |
"id": "2645c274-fbda-4f4a-967c-74c2b6f00af0", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"238 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores * 2) as e:\n", | |
" def load(fn):\n", | |
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n", | |
" use_threads=False,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "04a43d98-c921-4e88-868b-69bd0c62307e", | |
"metadata": {}, | |
"source": [ | |
"1. `pre_buffer` is critical (32 MiB/s without it vs. 211 MiB/s with it, using s3fs and 8 threads)\n", | |
"2. We shouldn't use the dedicated Arrow thread pool (or maybe this is `OMP_NUM_THREADS`-specific)\n", | |
"3. We should oversaturate cores" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "63ece358-23f4-4c16-bd69-07b553a70c4d", | |
"metadata": {}, | |
"source": [ | |
"I also tried using the parquet method here, but honestly I couldn't figure out how to invoke it. Empirically, I've found that it gives the best results so far." | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "344cde23-f7f4-41fc-913e-972cb161c9f4", | |
"metadata": {}, | |
"source": [ | |
"## Dask" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 33, | |
"id": "d3d65ec7-ef73-4a8e-9bae-1b9e0be48ea6", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import dask\n", | |
"import dask.dataframe as dd" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 34, | |
"id": "77b0d19b-e0ca-4209-9fad-ccafe7e086bd", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"160 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"df = dd.read_parquet(\"s3://coiled-runtime-ci/tpc-h/scale-10/lineitem\")\n", | |
"\n", | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores) as e:\n", | |
" with dask.config.set(pool=e):\n", | |
" df.map_partitions(lambda df: df.head(1)).persist(scheduler=\"threads\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 35, | |
"id": "18028478-ae9f-4029-9764-b61cdca8d67f", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"109 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"df = dd.read_parquet(\n", | |
" \"s3://coiled-runtime-ci/tpc-h/scale-10/lineitem\",\n", | |
" open_file_options={\"precache_options\": {\"method\": \"parquet\"}},\n", | |
")\n", | |
"\n", | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores) as e:\n", | |
" with dask.config.set(pool=e):\n", | |
" df.map_partitions(lambda df: df.head(1)).persist(scheduler=\"threads\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 36, | |
"id": "13916f66-c83f-44b8-b811-ac49b5190a62", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"204 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"df = dd.read_parquet(\n", | |
" \"s3://coiled-runtime-ci/tpc-h/scale-10/lineitem\",\n", | |
" filesystem=\"arrow\",\n", | |
")\n", | |
"\n", | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores) as e:\n", | |
" with dask.config.set(pool=e):\n", | |
" df.map_partitions(lambda df: df.head(1)).persist(scheduler=\"threads\")" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "b114f560-f389-48f1-a946-aeaff2321969", | |
"metadata": {}, | |
"source": [ | |
"### 8 threads" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 37, | |
"id": "9ce04d23-b181-451a-8be7-a531b40d9372", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"134 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"df = dd.read_parquet(\"s3://coiled-runtime-ci/tpc-h/scale-10/lineitem\")\n", | |
"\n", | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores * 2) as e:\n", | |
" with dask.config.set(pool=e):\n", | |
" df.map_partitions(lambda df: df.head(1)).persist(scheduler=\"threads\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 38, | |
"id": "34848586-f3b1-4707-9e77-f1cdf283e0cb", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"131 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"df = dd.read_parquet(\n", | |
" \"s3://coiled-runtime-ci/tpc-h/scale-10/lineitem\",\n", | |
" open_file_options={\"precache_options\": {\"method\": \"parquet\"}},\n", | |
")\n", | |
"\n", | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores * 2) as e:\n", | |
" with dask.config.set(pool=e):\n", | |
" df.map_partitions(lambda df: df.head(1)).persist(scheduler=\"threads\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 39, | |
"id": "62994e45-4c73-44da-be77-1cd423fe0d85", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"219 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"df = dd.read_parquet(\n", | |
" \"s3://coiled-runtime-ci/tpc-h/scale-10/lineitem\",\n", | |
" filesystem=\"arrow\",\n", | |
")\n", | |
"\n", | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores * 2) as e:\n", | |
" with dask.config.set(pool=e):\n", | |
" df.map_partitions(lambda df: df.head(1)).persist(scheduler=\"threads\")" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "03df3577-5303-4ed9-97df-7f9195587fab", | |
"metadata": {}, | |
"source": [ | |
"## Run locally" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 40, | |
"id": "5f1fac5d-fb18-4e91-8b32-f8a5164db539", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"45 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"\n", | |
"with bandwidth():\n", | |
" import os\n", | |
" os.mkdir(\"data\")\n", | |
" for fn in filenames:\n", | |
" s3.get(fn, \"data/\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 41, | |
"id": "26f2dc98-fd0c-4774-8d51-9548f2171478", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import glob\n", | |
"local_filenames = glob.glob(\"data/*\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 42, | |
"id": "de17c5b7-b877-497a-aa63-e623c9785803", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"265 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(1) as e:\n", | |
" def load(fn):\n", | |
" pq.ParquetFile(fn, pre_buffer=True).read(\n", | |
" use_threads=True,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, local_filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 43, | |
"id": "2c21e60d-43c9-4a74-a4d8-57bb06dd8b37", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"344 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores) as e:\n", | |
" def load(fn):\n", | |
" pq.ParquetFile(fn, pre_buffer=True).read(\n", | |
" use_threads=True,\n", | |
" use_pandas_metadata=True,\n", | |
" )\n", | |
" list(e.map(load, local_filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 45, | |
"id": "48ecfdd4-909c-4099-b23c-035523e848fa", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"2218 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(1) as e:\n", | |
" def load(fn):\n", | |
" open(fn, mode=\"rb\").read()\n", | |
" list(e.map(load, local_filenames))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 46, | |
"id": "e9ef6f90-dda5-4b02-84d8-72648f0c3881", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"5567 MiB/s\n" | |
] | |
} | |
], | |
"source": [ | |
"with bandwidth():\n", | |
" with ThreadPoolExecutor(ncores) as e:\n", | |
" def load(fn):\n", | |
" open(fn, mode=\"rb\").read()\n", | |
" list(e.map(load, local_filenames))\n" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3 (ipykernel)", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.11.6" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 5 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment