Resumable, Multiprocessing apply for Pandas Dataframes
@igorbrigadir, created February 24, 2020
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Split up a pandas dataframe into chunks,\n",
"# perform some function on each row, \n",
"# stick it back together, keeping track of progress and \n",
"# making it resumable."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from tqdm.notebook import tqdm\n",
"from multiprocessing import Pool\n",
"import shelve"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"def _df_split_apply(tup_arg):\n",
" split_index, subset, func = tup_arg\n",
" r = subset.apply(func, axis=1)\n",
" return (split_index, r)\n",
"\n",
"def df_multiprocess(df, processes, chunk_size, func, dataset_name):\n",
" # shelve saves a copy of the entire dataframe to disk, so be aware of that.\n",
" with shelve.open('%s_%s_%s_results.tmp' % (dataset_name, func.__name__, chunk_size)) as results:\n",
" \n",
" pbar = tqdm(total=len(df), position=0)\n",
"\n",
" # Resume\n",
" #finished_chunks = set([int(k) for k in results.keys()])\n",
" #pbar.desc = \"Resuming\"\n",
" #for k in results.keys():\n",
" # pbar.update(len(results[str(k)][1]))\n",
"\n",
" # No Resume:\n",
" finished_chunks = set()\n",
"\n",
" pool_data = ((index, df[i:i + chunk_size], func) for index, i in enumerate(range(0, len(df), chunk_size)) if index not in finished_chunks)\n",
" \n",
" print(int(len(df) / chunk_size), \"parts.\", chunk_size, \"per part.\", \"Using\", processes, \"processes\")\n",
" pbar.desc = \"Working\" # Progress Bar Description\n",
" \n",
" with Pool(processes) as pool:\n",
" for i, result in enumerate(pool.imap_unordered(_df_split_apply, pool_data, 2)):\n",
" #Save\n",
" results[str(result[0])] = result \n",
" pbar.update(len(result[1]))\n",
" pbar.close()\n",
"\n",
" print(\"Saving...\")\n",
" \n",
" keylist = sorted([int(k) for k in results.keys()])\n",
" df = pd.concat([results[str(k)][1] for k in keylist], sort=True)\n",
" \n",
" print(\"Done.\")\n",
" return df"
]
},
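{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `# Resume` block above ships commented out. A minimal sketch of wiring it up, not part of the original gist: open the same shelve file that `df_multiprocess` writes and collect the chunk indices already saved, so `pool_data` skips them on a rerun. The helper name `finished_chunk_ids` is hypothetical."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical helper (assumption): list the chunk indices already stored\n",
"# in the shelve file, using the same naming convention as df_multiprocess.\n",
"def finished_chunk_ids(dataset_name, func_name, chunk_size):\n",
"    with shelve.open('%s_%s_%s_results.tmp' % (dataset_name, func_name, chunk_size)) as results:\n",
"        return set(int(k) for k in results.keys())\n",
"\n",
"# e.g. finished_chunk_ids(\"df\", \"dummy_func\", 100) after a partial run"
]
},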
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"# Load Data \n",
"df = pd.read_csv(\"data/sample.tar.xz\", dtype=str) # 50,000 records, "
]
},
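{
"cell_type": "markdown",
"metadata": {},
"source": [
"`data/sample.tar.xz` is not included with the gist. If it is missing, a synthetic string-typed frame of the same size (50,001 rows, matching the progress bar below) is enough to try the machinery; the column name `text` is an assumption, not the original schema."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"# Fallback stand-in for the sample data (assumption: any string-typed\n",
"# DataFrame works with df_multiprocess above).\n",
"if not os.path.exists(\"data/sample.tar.xz\"):\n",
"    df = pd.DataFrame({\"text\": [\"row %d\" % i for i in range(50001)]}, dtype=str)"
]
},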
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [],
"source": [
"# Some function that's applied to every row:\n",
"\n",
"def dummy_func(row):\n",
" row['new_field'] = \"foo\"\n",
" return row"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1min 1s, sys: 813 ms, total: 1min 1s\n",
"Wall time: 1min 1s\n"
]
}
],
"source": [
"%%time \n",
"\n",
"df_new = df.apply(dummy_func, axis=1)"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "da9f874f87c444698ca5f4bd8b0cffed",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"HBox(children=(FloatProgress(value=0.0, max=50001.0), HTML(value='')))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"500 parts. 100 per part. Using 8 processes\n",
"\n",
"Saving...\n",
"Done.\n",
"CPU times: user 4.89 s, sys: 905 ms, total: 5.8 s\n",
"Wall time: 16.7 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"df_new = df_multiprocess(df=df, processes=8, chunk_size=100, func=dummy_func, dataset_name=\"df\")"
]
},
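{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check, not in the original gist: since the chunks are re-assembled in ascending key order, the multiprocess result should match a plain `apply`. `pd.concat(..., sort=True)` can reorder the columns, so align column order before comparing."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Recompute the single-process baseline and compare (slow, but exact).\n",
"baseline = df.apply(dummy_func, axis=1)\n",
"assert df_new.sort_index(axis=1).equals(baseline.sort_index(axis=1))"
]
},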
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"import dask.dataframe as dd\n",
"from dask.distributed import Client\n",
"client = Client(n_workers=4, threads_per_worker=2, processes=False, memory_limit='5GB')\n",
"client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf = dask.dataframe.from_pandas(df, chunksize=100, sort=False, name=\"df\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"df_new = ddf.apply(dummy_func, axis=1).compute()"
]
},
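{
"cell_type": "markdown",
"metadata": {},
"source": [
"Without a `meta` argument, dask has to infer the output schema of `apply` and warns about it. A sketch of passing `meta` explicitly, assuming the `new_field` column that `dummy_func` adds:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# meta: a zero-row frame with df's columns plus the column dummy_func adds.\n",
"meta = df.head(0).assign(new_field=\"\")\n",
"df_new = ddf.apply(dummy_func, axis=1, meta=meta).compute()"
]
}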
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}