Created
February 24, 2020 11:48
-
-
Save igorbrigadir/3bcc5170a93c531fa3973c5d9df24e62 to your computer and use it in GitHub Desktop.
Resumable, Multiprocessing apply for Pandas Dataframes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "cells": [ | |
| { | |
| "cell_type": "code", | |
| "execution_count": 1, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Split a pandas DataFrame into chunks,\n", | |
| "# apply a function to every row of each chunk in parallel,\n", | |
| "# concatenate the results back together, tracking progress and\n", | |
| "# storing finished chunks on disk so the job can be resumed." | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 2, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "import pandas as pd\n", | |
| "from tqdm.notebook import tqdm\n", | |
| "from multiprocessing import Pool\n", | |
| "import shelve" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 16, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "def _df_split_apply(tup_arg):\n", | |
| " split_index, subset, func = tup_arg\n", | |
| " r = subset.apply(func, axis=1)\n", | |
| " return (split_index, r)\n", | |
| "\n", | |
| "def df_multiprocess(df, processes, chunk_size, func, dataset_name):\n", | |
| " # NOTE: shelve writes every processed chunk to disk, so the temp file ends up holding a copy of the entire result dataframe.\n", | |
| " with shelve.open('%s_%s_%s_results.tmp' % (dataset_name, func.__name__, chunk_size)) as results:\n", | |
| " \n", | |
| " pbar = tqdm(total=len(df), position=0)\n", | |
| "\n", | |
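| " # Resume (disabled): enable this block instead of the No Resume default below to skip chunks already saved in the shelf.\n", | |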
| " #finished_chunks = set([int(k) for k in results.keys()])\n", | |
| " #pbar.desc = \"Resuming\"\n", | |
| " #for k in results.keys():\n", | |
| " # pbar.update(len(results[str(k)][1]))\n", | |
| "\n", | |
| " # No Resume:\n", | |
| " finished_chunks = set()\n", | |
| "\n", | |
| " pool_data = ((index, df[i:i + chunk_size], func) for index, i in enumerate(range(0, len(df), chunk_size)) if index not in finished_chunks)\n", | |
| " \n", | |
| " print(int(len(df) / chunk_size), \"parts.\", chunk_size, \"per part.\", \"Using\", processes, \"processes\")\n", | |
| " pbar.desc = \"Working\" # Progress Bar Description\n", | |
| " \n", | |
| " with Pool(processes) as pool:\n", | |
| " for i, result in enumerate(pool.imap_unordered(_df_split_apply, pool_data, 2)):\n", | |
| " # Save the finished chunk in the shelf, keyed by its chunk index\n", | |
| " results[str(result[0])] = result \n", | |
| " pbar.update(len(result[1]))\n", | |
| " pbar.close()\n", | |
| "\n", | |
| " print(\"Saving...\")\n", | |
| " \n", | |
| " keylist = sorted([int(k) for k in results.keys()])\n", | |
| " df = pd.concat([results[str(k)][1] for k in keylist], sort=True)\n", | |
| " \n", | |
| " print(\"Done.\")\n", | |
| " return df" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 25, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Load Data \n", | |
| "df = pd.read_csv(\"data/sample.tar.xz\", dtype=str) # 50,000 records" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 26, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Some function that's applied to every row:\n", | |
| "\n", | |
| "def dummy_func(row):\n", | |
| " row['new_field'] = \"foo\"\n", | |
| " return row" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 27, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "CPU times: user 1min 1s, sys: 813 ms, total: 1min 1s\n", | |
| "Wall time: 1min 1s\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "%%time \n", | |
| "\n", | |
| "df_new = df.apply(dummy_func, axis=1)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 28, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "application/vnd.jupyter.widget-view+json": { | |
| "model_id": "da9f874f87c444698ca5f4bd8b0cffed", | |
| "version_major": 2, | |
| "version_minor": 0 | |
| }, | |
| "text/plain": [ | |
| "HBox(children=(FloatProgress(value=0.0, max=50001.0), HTML(value='')))" | |
| ] | |
| }, | |
| "metadata": {}, | |
| "output_type": "display_data" | |
| }, | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "500 parts. 100 per part. Using 8 processes\n", | |
| "\n", | |
| "Saving...\n", | |
| "Done.\n", | |
| "CPU times: user 4.89 s, sys: 905 ms, total: 5.8 s\n", | |
| "Wall time: 16.7 s\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "%%time\n", | |
| "\n", | |
| "df_new = df_multiprocess(df=df, processes=8, chunk_size=100, func=dummy_func, dataset_name=\"df\")" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "import dask\n", | |
| "import dask.dataframe as dd\n", | |
| "from dask.distributed import Client\n", | |
| "client = Client(n_workers=4, threads_per_worker=2, processes=False, memory_limit='5GB')\n", | |
| "client" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "ddf = dask.dataframe.from_pandas(df, chunksize=100, sort=False, name=\"df\")" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "%%time\n", | |
| "\n", | |
| "df_new = ddf.apply(dummy_func, axis=1).compute()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [] | |
| } | |
| ], | |
| "metadata": { | |
| "kernelspec": { | |
| "display_name": "Python 3", | |
| "language": "python", | |
| "name": "python3" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 3 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython3", | |
| "version": "3.7.4" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 2 | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment