Skip to content

Instantly share code, notes, and snippets.

@rjzamora
Last active April 18, 2024 16:02
Show Gist options
  • Save rjzamora/c94f389717160c7e858dfe17926c6db3 to your computer and use it in GitHub Desktop.
Save rjzamora/c94f389717160c7e858dfe17926c6db3 to your computer and use it in GitHub Desktop.
Simple shuffling example with `cudf` and `ray`
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "2740b37e-7828-4e49-9f9b-d3f5329c0e69",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-04-18 08:58:03,923\tINFO worker.py:1749 -- Started a local Ray instance.\n"
]
}
],
"source": [
"import cudf\n",
"import ray\n",
"import rmm\n",
"\n",
"from dask.dataframe.shuffle import shuffle_group\n",
"\n",
"\n",
"@ray.remote(num_gpus=1)\n",
"class GPUWorker:\n",
"    \"\"\"Ray actor requesting one GPU that builds, splits and reassembles cudf partitions.\n",
"\n",
"    DataFrames are handed around as ``ray.put`` object refs rather than returned directly.\n",
"    \"\"\"\n",
"\n",
"    def __init__(self, pool=True):\n",
"        \"\"\"Reinitialize RMM in this actor; ``pool`` is forwarded as ``pool_allocator``.\"\"\"\n",
"        rmm.reinitialize(pool_allocator=pool)\n",
"\n",
"    def sample_df(self, partition_id, size=1_000):\n",
"        \"\"\"Create a ``size``-row frame and return an object ref to it.\n",
"\n",
"        Column ``a`` holds ``0..size-1`` and column ``b`` repeats ``partition_id``.\n",
"        \"\"\"\n",
"        df = cudf.DataFrame({\"a\": range(size), \"b\": [partition_id] * size})\n",
"        return ray.put(df)\n",
"\n",
"    def shuffle_group(self, df_ref, by, nshards):\n",
"        \"\"\"Split the frame behind ``df_ref`` into groups keyed by output partition.\n",
"\n",
"        Returns a dict mapping each group key to a separately stored object ref.\n",
"        \"\"\"\n",
"        stage = 0\n",
"        k = nshards\n",
"        npartitions = nshards\n",
"        ignore_index = False\n",
"        nfinal = nshards\n",
"        # The bare name below is the module-level dask helper, not this method.\n",
"        groups = shuffle_group(\n",
"            ray.get(df_ref),\n",
"            by,\n",
"            stage,\n",
"            k,\n",
"            npartitions,\n",
"            ignore_index,\n",
"            nfinal,\n",
"        )\n",
"        # Put distinct groups separately so each consumer fetches only its own group\n",
"        return {key: ray.put(group) for key, group in groups.items()}\n",
"\n",
"    def collect_group(self, group_id_refs, partition_id):\n",
"        \"\"\"Concatenate the groups destined for ``partition_id`` from every split result.\n",
"\n",
"        ``group_id_refs`` is a list of refs to the dicts returned by ``shuffle_group``.\n",
"        Returns an object ref to the concatenated frame.\n",
"        \"\"\"\n",
"        group_id_dicts = ray.get(group_id_refs)\n",
"\n",
"        dfs = []\n",
"        for group_id_dict in group_id_dicts:\n",
"            group_id = group_id_dict.get(partition_id)\n",
"            # A split result may have no entry for this partition; ray.get(None) would raise\n",
"            if group_id is None:\n",
"                continue\n",
"            group = ray.get(group_id)\n",
"            if group is not None:\n",
"                dfs.append(group)\n",
"\n",
"        df = cudf.concat(dfs)\n",
"        return ray.put(df)\n",
"\n",
"    def len(self, partition_ref):\n",
"        \"\"\"Return ``len()`` of the frame behind ``partition_ref``.\"\"\"\n",
"        return len(ray.get(partition_ref))\n",
"\n",
"\n",
"def shuffle(partition_refs, by, workers):\n",
"    \"\"\"Shuffle the partitions on ``by`` and return one ref per output partition.\n",
"\n",
"    Work is assigned to ``workers`` round-robin by partition index.\n",
"    \"\"\"\n",
"    worker_count = len(workers)\n",
"    # Assume partition count is preserved for now\n",
"    nshards = len(partition_refs)\n",
"\n",
"    # Split every input partition into ``nshards`` groups\n",
"    split_refs = [\n",
"        workers[idx % worker_count].shuffle_group.remote(ref, by, nshards)\n",
"        for idx, ref in enumerate(partition_refs)\n",
"    ]\n",
"\n",
"    # Gather the matching group from every split into each output partition\n",
"    return [\n",
"        workers[out_id % worker_count].collect_group.remote(split_refs, out_id)\n",
"        for out_id in range(nshards)\n",
"    ]\n",
"\n",
"\n",
"def report_lens(partition_refs, workers):\n",
"    \"\"\"Return the length of every partition, measured remotely on the workers.\"\"\"\n",
"    worker_count = len(workers)\n",
"    # Round-robin the length requests, then block until all of them finish\n",
"    pending = [\n",
"        workers[idx % worker_count].len.remote(ref)\n",
"        for idx, ref in enumerate(partition_refs)\n",
"    ]\n",
"    return ray.get(pending)\n",
"\n",
"\n",
"# Launch the pool of GPUWorker actors\n",
"n_workers = 8\n",
"workers = [\n",
"    GPUWorker.remote()\n",
"    for _ in range(n_workers)\n",
"]"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "cb597532-1425-4a30-bcea-dcc461f2ccdb",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 70.3 ms, sys: 38.9 ms, total: 109 ms\n",
"Wall time: 7.46 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"# Generate two sample partitions per worker, assigned round-robin\n",
"size = 1_000_000\n",
"n_partitions = 2 * n_workers\n",
"partition_refs = [\n",
"    workers[part_id % n_workers].sample_df.remote(part_id, size=size)\n",
"    for part_id in range(n_partitions)\n",
"]\n",
"# Block until every partition exists so the cell timing covers data creation\n",
"status = ray.wait(partition_refs, num_returns=n_partitions, timeout=None)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "23d0af0f-1050-42c8-9645-963b4fe89e1a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"shuffled partition lens:\n",
"[1002656, 994576, 1003312, 1000240, 998304, 1001872, 1004880, 996272, 989872, 997824, 999296, 1004576, 1002192, 1000592, 1002464, 1001072]\n",
"CPU times: user 95.6 ms, sys: 51.7 ms, total: 147 ms\n",
"Wall time: 319 ms\n"
]
}
],
"source": [
"%%time\n",
"\n",
"# Shuffle on column \"a\", then confirm the total row count is unchanged\n",
"shuffled_partitions = shuffle(partition_refs=partition_refs, by=[\"a\"], workers=workers)\n",
"shuffled_lens = report_lens(partition_refs=shuffled_partitions, workers=workers)\n",
"print(f\"Shuffled partition lens:\\n{shuffled_lens}\")\n",
"assert size * n_partitions == sum(shuffled_lens)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a2bd66c1-efd1-4b1c-8b5a-6e8cb7b91ac7",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.8"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment