{
"cells": [
{
"cell_type": "markdown",
"id": "ac7b089d-b128-403e-ae72-6aec681aaef7",
"metadata": {},
"source": [
"## Setup\n",
"\n",
"This is run on an m6i.xlarge (4 core generic Intel machine) on AWS in the same region as the data"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "e840f019-140f-4807-8fe3-a0411beb29a9",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1\n"
]
}
],
"source": [
"!echo $OMP_NUM_THREADS"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "9c8f0c06-3580-4dad-9a6d-b49fefbd4d74",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"os.environ[\"OMP_NUM_THREADS\"] = \"8\""
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "19a60ae0-2763-437b-8548-2c7ae4ae7682",
"metadata": {},
"outputs": [],
"source": [
"import contextlib\n",
"import pyarrow.parquet as pq\n",
"from dask.utils import format_bytes\n",
"import time"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "04731fb7-d84d-4342-afb1-8a1c3060ae4b",
"metadata": {},
"outputs": [],
"source": [
"import s3fs\n",
"s3 = s3fs.S3FileSystem()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "3ba34ae5-d3c3-4f87-a77a-c1f317cec64d",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['coiled-runtime-ci/tpc-h/scale-10/lineitem/lineitem_0da92240-ed31-4eb5-a93b-81d2ee4d5e14.parquet',\n",
" 'coiled-runtime-ci/tpc-h/scale-10/lineitem/lineitem_13ab3fb8-8e60-4bc5-a8a2-52da5bd6ee62.parquet',\n",
" 'coiled-runtime-ci/tpc-h/scale-10/lineitem/lineitem_2ad37431-801f-4b5e-b469-4b6748f76e2d.parquet',\n",
" 'coiled-runtime-ci/tpc-h/scale-10/lineitem/lineitem_2cc68d48-eec1-47e1-8d15-f4465bb028bf.parquet',\n",
" 'coiled-runtime-ci/tpc-h/scale-10/lineitem/lineitem_331c7400-9f58-465d-83da-5e4d8418c373.parquet']"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"filenames = s3.glob(\"coiled-runtime-ci/tpc-h/scale-10/lineitem/*.parquet\")\n",
"filenames[:5]"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "8a806cf3-1f32-437e-b123-7f6e870ec7fe",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'2.87 GiB'"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"nbytes = s3.du(\"coiled-runtime-ci/tpc-h/scale-10/lineitem/\")\n",
"format_bytes(nbytes )"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "89bed71c-ae93-437a-8359-1404ff6fa20b",
"metadata": {},
"outputs": [],
"source": [
"@contextlib.contextmanager\n",
"def bandwidth():\n",
" start = time.time()\n",
" yield\n",
" stop = time.time()\n",
" print(int(nbytes / (stop - start) // 2**20), \"MiB/s\")"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "e44d379e-fbe6-4665-b0c7-db43d6c61dcd",
"metadata": {},
"outputs": [],
"source": [
"from concurrent.futures import ThreadPoolExecutor"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "5fd114db-e62a-4938-91a6-21906ec17af3",
"metadata": {},
"outputs": [],
"source": [
"import multiprocessing\n",
"ncores = multiprocessing.cpu_count()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "1ed941b4-cbcd-41a4-b835-4a66908fdb35",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ncores"
]
},
{
"cell_type": "markdown",
"id": "d0437b23-220b-408e-a891-b4ac3d0c981e",
"metadata": {},
"source": [
"### Roughly how much do files expand?"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "eef72144-f793-42b3-a592-c7f1dd242882",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'126.08 MiB'"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"format_bytes(s3.du(filenames[0]))"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "bfec59d1-4a0e-4d9f-b3ee-6de61ec60ba8",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'339.21 MiB'"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"t = pq.ParquetFile(\n",
" \"s3://\" + filenames[0], pre_buffer=True\n",
").read(use_threads=False, use_pandas_metadata=True)\n",
"\n",
"format_bytes(t.nbytes)"
]
},
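{
"cell_type": "markdown",
"id": "2f6b7c1a-5d3e-4b9a-8c2d-1e9f0a7b6c5d",
"metadata": {},
"source": [
"So the in-memory Arrow table is about 2.7x (339.21 / 126.08) the size of the compressed file on S3."
]
},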
{
"cell_type": "markdown",
"id": "cfccf3f9-d01d-4f41-89c5-a04b55cbbadf",
"metadata": {},
"source": [
"## Test general bandwidth onto these machines"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "8947d8df-523c-47c2-af9c-fbbe356c6be3",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"42 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(1) as e:\n",
" def load(fn):\n",
" s3.open(fn).read()\n",
" list(e.map(load, filenames))"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "aee4b03f-d9d4-43f5-a6f0-ce3bf9e97f2f",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"205 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores) as e:\n",
" def load(fn):\n",
" s3.open(fn).read()\n",
" list(e.map(load, filenames))"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "bdfe6184-b332-4253-a34e-cedc3d6d92c7",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"378 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(int(ncores * 1.5)) as e:\n",
" def load(fn):\n",
" s3.open(fn).read()\n",
" list(e.map(load, filenames))"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "5f309e61-83e9-4a54-af90-7d820bd4d96a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"427 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores * 2) as e:\n",
" def load(fn):\n",
" s3.open(fn).read()\n",
" list(e.map(load, filenames))"
]
},
{
"cell_type": "markdown",
"id": "b198d9b5-f2b1-400f-9c4e-b69032d27699",
"metadata": {},
"source": [
"## ParquetFile\n",
"\n",
"We have three knobs to play with:\n",
"\n",
"- `pre_buffer`: this is always going to be a good choice\n",
"- `use_threads`: it seems that we want this to be off\n",
"- `filesystem`: s3fs seems to work best with external threads\n",
"\n",
"*maybe these change with large `OMP_NUM_THREADS` but I haven't been able to make this happen."
]
},
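{
"cell_type": "markdown",
"id": "7a3d9e10-2b4c-4f6e-8a1b-3c5d7e9f0a2b",
"metadata": {},
"source": [
"A minimal annotated sketch (not executed as part of the benchmark) of where each knob lives in a single read; the particular values here are placeholders for what the cells below actually sweep."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9c0e1f2a-3b4d-4e5f-8a6b-7c8d9e0f1a2b",
"metadata": {},
"outputs": [],
"source": [
"# Annotated sketch only -- not part of the timed runs below\n",
"fn = filenames[0]\n",
"\n",
"# Knob 3 (`filesystem`): open the bytes with s3fs and hand pyarrow a file handle;\n",
"# the \"Arrow FS\" section below passes an \"s3://\" URL instead, so pyarrow's own S3FileSystem is used\n",
"with s3.open(fn, mode=\"rb\") as f:\n",
"    t = pq.ParquetFile(\n",
"        f,\n",
"        pre_buffer=True,  # Knob 1 (`pre_buffer`): coalesce and prefetch column-chunk byte ranges\n",
"    ).read(\n",
"        use_threads=False,  # Knob 2 (`use_threads`): pyarrow's internal CPU thread pool on/off\n",
"        use_pandas_metadata=True,\n",
"    )"
]
},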
{
"cell_type": "markdown",
"id": "516ec3e2-f7fe-4210-9146-3bd34b5f19d6",
"metadata": {},
"source": [
"### S3FS"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "734daba8-7160-4b13-b38b-f7c56c36bc5d",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"168 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores) as e:\n",
" def load(fn):\n",
" with s3.open(fn, mode=\"rb\") as f:\n",
" pq.ParquetFile(f, pre_buffer=True).read(\n",
" use_threads=False,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, filenames))\n"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "75bada5a-81fd-41ce-9f8e-b6ede8fda9f4",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"172 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores) as e:\n",
" def load(fn):\n",
" with s3.open(fn, mode=\"rb\") as f:\n",
" pq.ParquetFile(f, pre_buffer=True).read(\n",
" use_threads=True,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, filenames))\n"
]
},
{
"cell_type": "code",
"execution_count": 20,
"id": "20b8f1e1-1b4a-4dec-a7f2-8a1b9b52ed30",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"211 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores * 2) as e:\n",
" def load(fn):\n",
" with s3.open(fn, mode=\"rb\") as f:\n",
" pq.ParquetFile(f, pre_buffer=True).read(\n",
" use_threads=False,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, filenames))\n"
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "8661490c-1708-4745-8a8c-614da973e87d",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"215 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores * 2) as e:\n",
" def load(fn):\n",
" with s3.open(fn, mode=\"rb\") as f:\n",
" pq.ParquetFile(f, pre_buffer=True).read(\n",
" use_threads=True,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, filenames))\n"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "1dcb397e-b347-4b89-9754-14d807c9f45a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"32 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores * 2) as e:\n",
" def load(fn):\n",
" with s3.open(fn, mode=\"rb\") as f:\n",
" pq.ParquetFile(f, pre_buffer=False).read(\n",
" use_threads=False,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, filenames))\n"
]
},
{
"cell_type": "markdown",
"id": "f6acf24f-3db9-4490-be21-e3811914b7ab",
"metadata": {},
"source": [
"### Arrow FS"
]
},
{
"cell_type": "code",
"execution_count": 23,
"id": "e8969a7f-3c2a-4af3-af6c-a5edfadc53e6",
"metadata": {},
"outputs": [],
"source": [
"from pyarrow.fs import S3FileSystem\n",
"import boto3\n",
"session = boto3.session.Session()\n",
"credentials = session.get_credentials()\n",
"\n",
"fs = S3FileSystem(\n",
" secret_key=credentials.secret_key,\n",
" access_key=credentials.access_key,\n",
" region=\"us-east-2\",\n",
" session_token=credentials.token,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 24,
"id": "b4036f7b-4532-4404-9d88-a2845185763a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"233 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores) as e:\n",
" def load(fn):\n",
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n",
" use_threads=False,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, filenames))\n"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "2e99023f-f3c5-4eab-b09c-392221921b0e",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"229 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores) as e:\n",
" def load(fn):\n",
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n",
" use_threads=True,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, filenames))\n"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "fc1bd5b5-a64c-4d2c-a833-855239b1235a",
"metadata": {},
"outputs": [],
"source": [
"import pyarrow as pa\n",
"pa.set_io_thread_count(ncores * 4)"
]
},
{
"cell_type": "code",
"execution_count": 27,
"id": "05f385a6-da20-4b8b-9a8a-603c92457fae",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"237 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores) as e:\n",
" def load(fn):\n",
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n",
" use_threads=False,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, filenames))\n"
]
},
{
"cell_type": "code",
"execution_count": 28,
"id": "95e146ae-51d8-40ea-86e1-cd3ed81dacd6",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"241 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores) as e:\n",
" def load(fn):\n",
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n",
" use_threads=True,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, filenames))\n"
]
},
{
"cell_type": "code",
"execution_count": 29,
"id": "d39f908d-e1bc-46a7-8d54-1dce7a98f0c6",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"252 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores * 2) as e:\n",
" def load(fn):\n",
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n",
" use_threads=True,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, filenames))\n"
]
},
{
"cell_type": "code",
"execution_count": 30,
"id": "b21f96f3-f3f9-4db2-a887-552fdf182f16",
"metadata": {},
"outputs": [],
"source": [
"import pyarrow as pa\n",
"pa.set_io_thread_count(ncores)"
]
},
{
"cell_type": "code",
"execution_count": 31,
"id": "a4a328c9-a3d4-4bab-aee0-7e2659f630d4",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"227 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores) as e:\n",
" def load(fn):\n",
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n",
" use_threads=True,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, filenames))\n"
]
},
{
"cell_type": "code",
"execution_count": 32,
"id": "2645c274-fbda-4f4a-967c-74c2b6f00af0",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"238 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores * 2) as e:\n",
" def load(fn):\n",
" pq.ParquetFile(\"s3://\" + fn, pre_buffer=True).read(\n",
" use_threads=False,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, filenames))\n"
]
},
{
"cell_type": "markdown",
"id": "04a43d98-c921-4e88-868b-69bd0c62307e",
"metadata": {},
"source": [
"1. pre_buffer is critical\n",
"2. We shouldn't use the dedicated Arrow thread pool (or maybe this is OMP_NUM_THREADS specific)\n",
"3. We should oversaturate cores"
]
},
{
"cell_type": "markdown",
"id": "63ece358-23f4-4c16-bd69-07b553a70c4d",
"metadata": {},
"source": [
"I also tried using the parquet method here, but honestly I couldn't figure out how to invoke it. Emperically I've found that it gives the best results so far."
]
},
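{
"cell_type": "markdown",
"id": "4e5f6a7b-8c9d-4a1b-9c2d-3e4f5a6b7c8d",
"metadata": {},
"source": [
"For reference, a hedged sketch of how that method might be invoked directly: `fsspec.parquet.open_parquet_file` appears to be the helper behind the `{\"method\": \"parquet\"}` precache option used with Dask below. It isn't benchmarked here, so treat it as a guess rather than a result."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5a6b7c8d-9e0f-4a1b-8c2d-3e4f5a6b7c8e",
"metadata": {},
"outputs": [],
"source": [
"# Hedged sketch (not run in this notebook): fsspec's parquet pre-caching helper.\n",
"# It pre-fetches the byte ranges needed for the requested columns/row groups\n",
"# and then hands a cached file-like object to pyarrow.\n",
"import fsspec.parquet\n",
"\n",
"with fsspec.parquet.open_parquet_file(filenames[0], fs=s3, engine=\"pyarrow\") as f:\n",
"    t = pq.ParquetFile(f).read(use_threads=False, use_pandas_metadata=True)"
]
},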
{
"cell_type": "markdown",
"id": "344cde23-f7f4-41fc-913e-972cb161c9f4",
"metadata": {},
"source": [
"## Dask"
]
},
{
"cell_type": "code",
"execution_count": 33,
"id": "d3d65ec7-ef73-4a8e-9bae-1b9e0be48ea6",
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"import dask.dataframe as dd"
]
},
{
"cell_type": "code",
"execution_count": 34,
"id": "77b0d19b-e0ca-4209-9fad-ccafe7e086bd",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"160 MiB/s\n"
]
}
],
"source": [
"df = dd.read_parquet(\"s3://coiled-runtime-ci/tpc-h/scale-10/lineitem\")\n",
"\n",
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores) as e:\n",
" with dask.config.set(pool=e):\n",
" df.map_partitions(lambda df: df.head(1)).persist(scheduler=\"threads\")"
]
},
{
"cell_type": "code",
"execution_count": 35,
"id": "18028478-ae9f-4029-9764-b61cdca8d67f",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"109 MiB/s\n"
]
}
],
"source": [
"df = dd.read_parquet(\n",
" \"s3://coiled-runtime-ci/tpc-h/scale-10/lineitem\",\n",
" open_file_options={\"precache_options\": {\"method\": \"parquet\"}},\n",
")\n",
"\n",
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores) as e:\n",
" with dask.config.set(pool=e):\n",
" df.map_partitions(lambda df: df.head(1)).persist(scheduler=\"threads\")"
]
},
{
"cell_type": "code",
"execution_count": 36,
"id": "13916f66-c83f-44b8-b811-ac49b5190a62",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"204 MiB/s\n"
]
}
],
"source": [
"df = dd.read_parquet(\n",
" \"s3://coiled-runtime-ci/tpc-h/scale-10/lineitem\",\n",
" filesystem=\"arrow\",\n",
")\n",
"\n",
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores) as e:\n",
" with dask.config.set(pool=e):\n",
" df.map_partitions(lambda df: df.head(1)).persist(scheduler=\"threads\")"
]
},
{
"cell_type": "markdown",
"id": "b114f560-f389-48f1-a946-aeaff2321969",
"metadata": {},
"source": [
"### 8 threads"
]
},
{
"cell_type": "code",
"execution_count": 37,
"id": "9ce04d23-b181-451a-8be7-a531b40d9372",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"134 MiB/s\n"
]
}
],
"source": [
"df = dd.read_parquet(\"s3://coiled-runtime-ci/tpc-h/scale-10/lineitem\")\n",
"\n",
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores * 2) as e:\n",
" with dask.config.set(pool=e):\n",
" df.map_partitions(lambda df: df.head(1)).persist(scheduler=\"threads\")"
]
},
{
"cell_type": "code",
"execution_count": 38,
"id": "34848586-f3b1-4707-9e77-f1cdf283e0cb",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"131 MiB/s\n"
]
}
],
"source": [
"df = dd.read_parquet(\n",
" \"s3://coiled-runtime-ci/tpc-h/scale-10/lineitem\",\n",
" open_file_options={\"precache_options\": {\"method\": \"parquet\"}},\n",
")\n",
"\n",
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores * 2) as e:\n",
" with dask.config.set(pool=e):\n",
" df.map_partitions(lambda df: df.head(1)).persist(scheduler=\"threads\")"
]
},
{
"cell_type": "code",
"execution_count": 39,
"id": "62994e45-4c73-44da-be77-1cd423fe0d85",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"219 MiB/s\n"
]
}
],
"source": [
"df = dd.read_parquet(\n",
" \"s3://coiled-runtime-ci/tpc-h/scale-10/lineitem\",\n",
" filesystem=\"arrow\",\n",
")\n",
"\n",
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores * 2) as e:\n",
" with dask.config.set(pool=e):\n",
" df.map_partitions(lambda df: df.head(1)).persist(scheduler=\"threads\")"
]
},
{
"cell_type": "markdown",
"id": "03df3577-5303-4ed9-97df-7f9195587fab",
"metadata": {},
"source": [
"## Run locally"
]
},
{
"cell_type": "code",
"execution_count": 40,
"id": "5f1fac5d-fb18-4e91-8b32-f8a5164db539",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"45 MiB/s\n"
]
}
],
"source": [
"\n",
"with bandwidth():\n",
" import os\n",
" os.mkdir(\"data\")\n",
" for fn in filenames:\n",
" s3.get(fn, \"data/\")"
]
},
{
"cell_type": "code",
"execution_count": 41,
"id": "26f2dc98-fd0c-4774-8d51-9548f2171478",
"metadata": {},
"outputs": [],
"source": [
"import glob\n",
"local_filenames = glob.glob(\"data/*\")"
]
},
{
"cell_type": "code",
"execution_count": 42,
"id": "de17c5b7-b877-497a-aa63-e623c9785803",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"265 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(1) as e:\n",
" def load(fn):\n",
" pq.ParquetFile(fn, pre_buffer=True).read(\n",
" use_threads=True,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, local_filenames))\n"
]
},
{
"cell_type": "code",
"execution_count": 43,
"id": "2c21e60d-43c9-4a74-a4d8-57bb06dd8b37",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"344 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores) as e:\n",
" def load(fn):\n",
" pq.ParquetFile(fn, pre_buffer=True).read(\n",
" use_threads=True,\n",
" use_pandas_metadata=True,\n",
" )\n",
" list(e.map(load, local_filenames))\n"
]
},
{
"cell_type": "code",
"execution_count": 45,
"id": "48ecfdd4-909c-4099-b23c-035523e848fa",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2218 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(1) as e:\n",
" def load(fn):\n",
" open(fn, mode=\"rb\").read()\n",
" list(e.map(load, local_filenames))\n"
]
},
{
"cell_type": "code",
"execution_count": 46,
"id": "e9ef6f90-dda5-4b02-84d8-72648f0c3881",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"5567 MiB/s\n"
]
}
],
"source": [
"with bandwidth():\n",
" with ThreadPoolExecutor(ncores) as e:\n",
" def load(fn):\n",
" open(fn, mode=\"rb\").read()\n",
" list(e.map(load, local_filenames))\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}