Last active
April 18, 2024 16:02
-
-
Save rjzamora/c94f389717160c7e858dfe17926c6db3 to your computer and use it in GitHub Desktop.
Simple shuffling example with `cudf` and `ray`
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"id": "2740b37e-7828-4e49-9f9b-d3f5329c0e69", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"2024-04-18 08:58:03,923\tINFO worker.py:1749 -- Started a local Ray instance.\n" | |
] | |
} | |
], | |
"source": [ | |
"import cudf\n", | |
"import ray\n", | |
"import rmm\n", | |
"\n", | |
"from dask.dataframe.shuffle import shuffle_group\n", | |
"\n", | |
"\n", | |
"@ray.remote(num_gpus=1)\n", | |
"class GPUWorker:\n", | |
"    \"\"\"Ray actor (requests one GPU) that does per-partition shuffle work.\n", | |
"\n", | |
"    DataFrames are exchanged through the Ray object store: methods that\n", | |
"    produce a DataFrame store it with ``ray.put`` and return the reference,\n", | |
"    and methods that consume one fetch it with ``ray.get``.\n", | |
"    \"\"\"\n", | |
"\n", | |
"    def __init__(self, pool=True):\n", | |
"        # Reinitialize RMM on this actor; ``pool`` selects the pool allocator.\n", | |
"        rmm.reinitialize(pool_allocator=pool)\n", | |
"\n", | |
"    def sample_df(self, partition_id, size=1_000):\n", | |
"        \"\"\"Build a ``size``-row frame: ``a`` = 0..size-1, ``b`` = ``partition_id``.\"\"\"\n", | |
"        df = cudf.DataFrame({\"a\": range(size), \"b\": [partition_id] * size})\n", | |
"        return ray.put(df)\n", | |
"\n", | |
"    def shuffle_group(self, df_ref, by, nshards):\n", | |
"        \"\"\"Split the partition behind ``df_ref`` into ``nshards`` groups on ``by``.\n", | |
"\n", | |
"        Returns a dict mapping output-partition id to an object reference for\n", | |
"        that group. Configured as a single-stage shuffle (stage 0 with\n", | |
"        k == npartitions == nfinal == nshards).\n", | |
"        \"\"\"\n", | |
"        stage = 0\n", | |
"        k = nshards\n", | |
"        npartitions = nshards\n", | |
"        ignore_index = False\n", | |
"        nfinal = nshards\n", | |
"        # Inside the method body this name resolves to the module-level\n", | |
"        # dask ``shuffle_group`` function, not to this method.\n", | |
"        groups = shuffle_group(\n", | |
"            ray.get(df_ref),\n", | |
"            by,\n", | |
"            stage,\n", | |
"            k,\n", | |
"            npartitions,\n", | |
"            ignore_index,\n", | |
"            nfinal,\n", | |
"        )\n", | |
"        # Put distinct groups separately so each output partition only\n", | |
"        # fetches the pieces it needs.\n", | |
"        return {group_id: ray.put(group) for group_id, group in groups.items()}\n", | |
"\n", | |
"    def collect_group(self, group_id_refs, partition_id):\n", | |
"        \"\"\"Concatenate every input partition's piece for ``partition_id``.\n", | |
"\n", | |
"        ``group_id_refs`` holds one reference per input partition, each\n", | |
"        resolving to a dict as returned by ``shuffle_group``.\n", | |
"        \"\"\"\n", | |
"        group_id_dicts = ray.get(group_id_refs)\n", | |
"\n", | |
"        # Skip inputs with no piece for this partition *before* fetching:\n", | |
"        # ray.get(None) raises rather than returning None.\n", | |
"        group_refs = [\n", | |
"            group_id_dict[partition_id]\n", | |
"            for group_id_dict in group_id_dicts\n", | |
"            if partition_id in group_id_dict\n", | |
"        ]\n", | |
"        # One batched ray.get lets Ray fetch all pieces concurrently.\n", | |
"        dfs = ray.get(group_refs)\n", | |
"\n", | |
"        df = cudf.concat(dfs)\n", | |
"        return ray.put(df)\n", | |
"\n", | |
"    def len(self, partition_ref):\n", | |
"        \"\"\"Return the number of rows in the partition behind ``partition_ref``.\"\"\"\n", | |
"        # ``len`` below is the builtin: class scope is not visible in methods.\n", | |
"        return len(ray.get(partition_ref))\n", | |
"\n", | |
"\n", | |
"def shuffle(partition_refs, by, workers):\n", | |
"    \"\"\"Shuffle the partitions in ``partition_refs`` by columns ``by``.\n", | |
"\n", | |
"    Tasks are placed round-robin: item i runs on ``workers[i % len(workers)]``.\n", | |
"    The output has as many partitions as the input. Returns one object\n", | |
"    reference per output partition.\n", | |
"    \"\"\"\n", | |
"    worker_count = len(workers)\n", | |
"    # Output partition count currently matches the input partition count.\n", | |
"    nshards = len(partition_refs)\n", | |
"\n", | |
"    # Phase 1: split every input partition into ``nshards`` pieces.\n", | |
"    split_refs = []\n", | |
"    for idx, part_ref in enumerate(partition_refs):\n", | |
"        actor = workers[idx % worker_count]\n", | |
"        split_refs.append(actor.shuffle_group.remote(part_ref, by, nshards))\n", | |
"\n", | |
"    # Phase 2: output partition j gathers piece j from every split.\n", | |
"    return [\n", | |
"        workers[out_id % worker_count].collect_group.remote(split_refs, out_id)\n", | |
"        for out_id in range(nshards)\n", | |
"    ]\n", | |
"\n", | |
"\n", | |
"def report_lens(partition_refs, workers):\n", | |
"    \"\"\"Return the row count of each partition, in input order.\n", | |
"\n", | |
"    Uses the same round-robin worker assignment as ``shuffle``.\n", | |
"    \"\"\"\n", | |
"    count = len(workers)\n", | |
"    pending = [\n", | |
"        workers[i % count].len.remote(ref)\n", | |
"        for i, ref in enumerate(partition_refs)\n", | |
"    ]\n", | |
"    # Block until every length task has finished.\n", | |
"    return ray.get(pending)\n", | |
"\n", | |
"\n", | |
"# Create initial workers.\n", | |
"# Each GPUWorker requests num_gpus=1, so this presumably needs at least\n", | |
"# n_workers GPUs visible to Ray (extra actors would wait) -- confirm.\n", | |
"n_workers = 8\n", | |
"workers = [GPUWorker.remote() for _ in range(n_workers)]" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"id": "cb597532-1425-4a30-bcea-dcc461f2ccdb", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"CPU times: user 70.3 ms, sys: 38.9 ms, total: 109 ms\n", | |
"Wall time: 7.46 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"\n", | |
"# Create initial partition data: two partitions per worker, placed\n", | |
"# round-robin; partition n has `size` rows with column b == n.\n", | |
"size = 1_000_000\n", | |
"n_partitions = n_workers * 2\n", | |
"partition_refs = [workers[n % n_workers].sample_df.remote(n, size=size) for n in range(n_partitions)]\n", | |
"# Wait for all n_partitions refs with no timeout, so the cell timing\n", | |
"# includes partition creation.\n", | |
"status = ray.wait(partition_refs, num_returns=n_partitions, timeout=None)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"id": "23d0af0f-1050-42c8-9645-963b4fe89e1a", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"shuffled partition lens:\n", | |
"[1002656, 994576, 1003312, 1000240, 998304, 1001872, 1004880, 996272, 989872, 997824, 999296, 1004576, 1002192, 1000592, 1002464, 1001072]\n", | |
"CPU times: user 95.6 ms, sys: 51.7 ms, total: 147 ms\n", | |
"Wall time: 319 ms\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"\n", | |
"# Shuffle the data by column \"a\" and report final partition lens\n", | |
"shuffled_partitions = shuffle(partition_refs, by=[\"a\"], workers=workers)\n", | |
"shuffled_lens = report_lens(shuffled_partitions, workers)\n", | |
"print(f\"Shuffled partition lens:\\n{shuffled_lens}\")\n", | |
"# Shuffling only moves rows between partitions, so the total must match.\n", | |
"assert sum(shuffled_lens) == size * n_partitions" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"id": "a2bd66c1-efd1-4b1c-8b5a-6e8cb7b91ac7", | |
"metadata": {}, | |
"outputs": [], | |
"source": [] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3 (ipykernel)", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.11.8" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 5 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment