Skip to content

Instantly share code, notes, and snippets.

@rjzamora
Created May 28, 2024 19:49
Show Gist options
  • Save rjzamora/7d35b525e22d2cbfcfc9cb8cced37325 to your computer and use it in GitHub Desktop.
Save rjzamora/7d35b525e22d2cbfcfc9cb8cced37325 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "9b31963c-b7dc-4330-96c4-3d70fc0ad893",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/datasets/rzamora/miniconda3/envs/cudf_dev_2406/lib/python3.11/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n",
" from .autonotebook import tqdm as notebook_tqdm\n",
"2024-05-28 12:43:48,677\tINFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.\n",
"2024-05-28 12:43:49,394\tINFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.\n"
]
}
],
"source": [
"import fsspec\n",
"import daft\n",
"import cudf\n",
"import dask.dataframe as dd\n",
"\n",
"# Use fsspec to get directory listing\n",
"path = \"s3://curator-data/1_tb_dataset\"\n",
"fs = fsspec.core.get_fs_token_paths(path)[0]\n",
"all_paths = fs.ls(path)\n",
"\n",
"def read_partition_cudf(path_list):\n",
"    # Baseline reader: hand cudf the s3 URLs directly and let\n",
"    # it fetch each file itself. The transfers are typically\n",
"    # serialized, so this is expected to be the slow path.\n",
"    urls = [f\"s3://{p}\" for p in path_list]\n",
"    return cudf.read_json(urls, lines=True)\n",
"\n",
"def read_partition_fsspec(path_list, blocksize=4_000_000):\n",
"    # Use fs.cat_ranges to transfer byte ranges for multiple\n",
"    # files at once, then parse the in-memory bytes with cudf.\n",
"    # This should be the \"fastest\" way to use fsspec.\n",
"    #\n",
"    # blocksize: max bytes per range request; a falsy value\n",
"    # means \"one range per whole file\".\n",
"    paths, starts, ends = [], [], []\n",
"    for path, size in zip(path_list, fs.sizes(path_list)):\n",
"        # BUGFIX: read from this partition's own files. The old\n",
"        # code appended all_paths[i], so every partition re-read\n",
"        # the first len(path_list) files of the global listing.\n",
"        # Also compute the step locally instead of clobbering the\n",
"        # blocksize parameter (which made later files inherit the\n",
"        # first file's size when blocksize was falsy); the\n",
"        # trailing `or 1` keeps range() valid for 0-byte files.\n",
"        step = blocksize or size or 1\n",
"        for offset in range(0, size, step):\n",
"            paths.append(path)\n",
"            starts.append(offset)\n",
"            ends.append(min(offset + step, size))\n",
"    return cudf.read_json(\n",
"        b\"\".join(fs.cat_ranges(paths, starts, ends)),\n",
"        lines=True,\n",
"    )\n",
"\n",
"def read_partition_daft(path_list):\n",
"    # Let daft handle the s3 reads (it is known to interact\n",
"    # with s3 efficiently), then round-trip through arrow to\n",
"    # pandas. Probably not how you get good perf out of daft.\n",
"    urls = [f\"s3://{p}\" for p in path_list]\n",
"    table = daft.read_json(urls).to_arrow()\n",
"    return table.to_pandas()"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "0a30eab9-4067-45f6-9872-2d2744812c7e",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 216 ms, sys: 320 ms, total: 536 ms\n",
"Wall time: 3 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"# Chunk the directory listing into fixed-size groups of files\n",
"# (one group per dask partition), and read the first file once\n",
"# to capture the schema for dask's `meta`.\n",
"files_per_partition = 32\n",
"inputs = [\n",
"    all_paths[i:i + files_per_partition]\n",
"    for i in range(0, len(all_paths), files_per_partition)\n",
"]\n",
"meta = cudf.read_json(f\"s3://{all_paths[0]}\", lines=True)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "52be16c0-785d-4a59-9fba-8c2e3949fed3",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 3.89 s, sys: 1.93 s, total: 5.82 s\n",
"Wall time: 1min\n"
]
},
{
"data": {
"text/plain": [
"188819"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"%%time\n",
"\n",
"# Benchmark the default-cudf reader on the first partition only.\n",
"df1 = dd.from_map(read_partition_cudf, inputs, meta=meta, enforce_metadata=False)\n",
"part0 = df1.partitions[0].compute()\n",
"len(part0)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "9a467ef9-a0b3-4f98-b169-3f196bcbe7c4",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 3.57 s, sys: 1.89 s, total: 5.46 s\n",
"Wall time: 12.9 s\n"
]
},
{
"data": {
"text/plain": [
"188819"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"\n",
"# Benchmark the fsspec cat_ranges reader on the first partition only.\n",
"df2 = dd.from_map(read_partition_fsspec, inputs, meta=meta, enforce_metadata=False)\n",
"part0 = df2.partitions[0].compute()\n",
"len(part0)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "fe161973-651b-44fb-9190-5367982e3c81",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 9.56 s, sys: 6.04 s, total: 15.6 s\n",
"Wall time: 25.4 s\n"
]
},
{
"data": {
"text/plain": [
"188819"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"\n",
"# Benchmark the daft-based reader on the first partition only.\n",
"df3 = dd.from_map(read_partition_daft, inputs, meta=meta, enforce_metadata=False)\n",
"part0 = df3.partitions[0].compute()\n",
"len(part0)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dab4ce7b-87e1-4c05-a9df-b0cf48d732f0",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment