{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from quilt.data.spatialucr.census import tracts_2010"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from quilt.data.spatialucr import census"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/knaaptime/anaconda3/envs/scratch/lib/python3.6/site-packages/sklearn/externals/six.py:31: DeprecationWarning: The module is deprecated in version 0.21 and will be removed in version 0.23 since we've dropped support for Python 2.7. Please rely on the official version of six (https://pypi.org/project/six/).\n",
" \"(https://pypi.org/project/six/).\", DeprecationWarning)\n",
"/Users/knaaptime/anaconda3/envs/scratch/lib/python3.6/site-packages/sklearn/externals/joblib/__init__.py:15: DeprecationWarning: sklearn.externals.joblib is deprecated in 0.21 and will be removed in 0.23. Please import this functionality directly from joblib, which can be installed with: pip install joblib. If this warning is raised when loading pickled models, you may need to re-serialize those models with scikit-learn 0.21+.\n",
" warnings.warn(msg, category=DeprecationWarning)\n"
]
}
],
"source": [
"from geosnap.util import convert_gdf"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import dask.dataframe as dd\n",
"import geopandas as gpd\n",
"\n",
"def listdir_fullpath(d):\n",
"    return [os.path.join(d, f) for f in os.listdir(d)]"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"# load all state-level block dataframes in parallel\n",
"blocks = dd.read_parquet(listdir_fullpath('/Users/knaaptime/projects/tiger/blocks_2000'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Test: dask vs. vanilla pandas"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"tracts = dd.from_pandas(tracts_2010(), npartitions=50)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"27 s ± 5.91 s per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"# vanilla pandas\n",
"\n",
"convert_gdf(census.tracts_2010())"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/knaaptime/anaconda3/envs/scratch/lib/python3.6/site-packages/distributed/worker.py:3101: UserWarning: Large object of size 30.62 MB detected in task graph: \n",
" ( GEOID ... 9c977ee0cca2e')\n",
"Consider scattering large objects ahead of time\n",
"with client.scatter to reduce scheduler burden and \n",
"keep data on workers\n",
"\n",
" future = client.submit(func, big_data) # bad\n",
"\n",
" big_future = client.scatter(big_data) # good\n",
" future = client.submit(func, big_future) # good\n",
" % (format_bytes(len(b)), s)\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"16.5 s ± 617 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"# parallelizing with dask threads\n",
"\n",
"c = tracts.map_partitions(convert_gdf, meta={'GEOID': 'O', 'geometry': 'O'})\n",
"gpd.GeoDataFrame(c.compute())"
]
},
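{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `UserWarning` above comes from the `dask.distributed` scheduler. Below is a minimal sketch of the scatter pattern it recommends, assuming `dask.distributed` is installed and a local `Client` is started as shown; this cell was not part of the original timings:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client\n",
"\n",
"client = Client()  # start a local cluster (assumption: dask.distributed installed)\n",
"\n",
"# scatter the large dataframe to the workers first, then submit against the future\n",
"big_future = client.scatter(tracts_2010())\n",
"future = client.submit(convert_gdf, big_future)\n",
"result = future.result()"
]
},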
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Test: parallel approaches"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Rocklin [says](https://stackoverflow.com/questions/31361721/python-dask-dataframe-support-for-trivially-parallelizable-row-apply):\n",
"> As of version 0.6.0 dask.dataframes parallelizes with threads. Custom Python functions will not receive much benefit from thread-based parallelism. You could try processes instead"
]
},
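{
"cell_type": "markdown",
"metadata": {},
"source": [
"Besides passing `scheduler=` to `.compute()` as in the next cell, the default scheduler can be set once globally. A sketch (assuming dask >= 0.18, where `dask.config.set` replaced `dask.set_options`):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"\n",
"# pick the default scheduler once instead of per-compute\n",
"dask.config.set(scheduler='threads')        # thread pool (default for dataframes)\n",
"# dask.config.set(scheduler='processes')    # process pool\n",
"# dask.config.set(scheduler='synchronous')  # single-threaded, handy for debugging"
]
},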
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"39 s ± 348 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"# parallelizing with dask processes\n",
"\n",
"c = tracts.map_partitions(convert_gdf, meta={'GEOID': 'O', 'geometry': 'O'})\n",
"gpd.GeoDataFrame(c.compute(scheduler='processes'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Test: how many partitions"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Rocklin [says](https://stackoverflow.com/questions/46645477/what-is-the-role-of-npartitions-in-a-dask-dataframe):\n",
"> Generally you want a few times more partitions than you have cores. Every task takes up a few hundred microseconds in the scheduler."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Datashader [says](http://datashader.org/user_guide/10_Performance.html):\n",
"> With dask on a single machine, a rule of thumb for the number of partitions to use is multiprocessing.cpu_count(), which allows Dask to use one thread per core for parallelizing computations"
]
},
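{
"cell_type": "markdown",
"metadata": {},
"source": [
"Putting those two rules of thumb into code, a sketch (not part of the timed runs below) that derives `npartitions` from the core count; `repartition` is the standard way to change the partitioning of an existing dask dataframe:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import multiprocessing\n",
"\n",
"n_cores = multiprocessing.cpu_count()\n",
"\n",
"# one partition per core, per the datashader guidance\n",
"tracts_cores = dd.from_pandas(tracts_2010(), npartitions=n_cores)\n",
"\n",
"# or a few times more partitions than cores, per Rocklin\n",
"tracts_cores = tracts_cores.repartition(npartitions=n_cores * 4)"
]
},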
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"8"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import multiprocessing\n",
"multiprocessing.cpu_count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This machine has 8 cores, so let's try a few multiples."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using 32 partitions"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"tracts32 = dd.from_pandas(tracts_2010(), npartitions=32)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"15.8 s ± 163 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"c = tracts32.map_partitions(convert_gdf, meta={'GEOID': 'O', 'geometry': 'O'})\n",
"gpd.GeoDataFrame(c.compute())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using 16 partitions"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"tracts16 = dd.from_pandas(tracts_2010(), npartitions=16)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"15.6 s ± 496 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"c = tracts16.map_partitions(convert_gdf, meta={'GEOID': 'O', 'geometry': 'O'})\n",
"gpd.GeoDataFrame(c.compute())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using 8 partitions"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"tracts8 = dd.from_pandas(tracts_2010(), npartitions=8)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"15.7 s ± 116 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"c = tracts8.map_partitions(convert_gdf, meta={'GEOID': 'O', 'geometry': 'O'})\n",
"gpd.GeoDataFrame(c.compute())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Results"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"- dask is faster than vanilla pandas by ~40% (16.5 s vs. 27 s)\n",
"- threads are much faster than processes for this task (16.5 s vs. 39 s), likely because shipping the dataframe between processes is expensive\n",
"- partition counts of 8, 16, and 32 perform almost identically (15.6-15.8 s); overshooting the core count adds only a small scheduling overhead (16.5 s at 50 partitions)"
]
},
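{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, the partition sweep above could be consolidated into a single loop. A rough sketch using `time.perf_counter` rather than `%%timeit`, so each size runs once instead of the original seven runs:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"for n in [8, 16, 32]:\n",
"    ddf = dd.from_pandas(tracts_2010(), npartitions=n)\n",
"    start = time.perf_counter()\n",
"    c = ddf.map_partitions(convert_gdf, meta={'GEOID': 'O', 'geometry': 'O'})\n",
"    gpd.GeoDataFrame(c.compute())\n",
"    print(f'{n} partitions: {time.perf_counter() - start:.1f}s')"
]
},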
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [conda env:scratch]",
"language": "python",
"name": "conda-env-scratch-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
} |