{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from quilt.data.spatialucr.census import tracts_2010"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from quilt.data.spatialucr import census"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/knaaptime/anaconda3/envs/scratch/lib/python3.6/site-packages/sklearn/externals/six.py:31: DeprecationWarning: The module is deprecated in version 0.21 and will be removed in version 0.23 since we've dropped support for Python 2.7. Please rely on the official version of six (https://pypi.org/project/six/).\n",
" \"(https://pypi.org/project/six/).\", DeprecationWarning)\n",
"/Users/knaaptime/anaconda3/envs/scratch/lib/python3.6/site-packages/sklearn/externals/joblib/__init__.py:15: DeprecationWarning: sklearn.externals.joblib is deprecated in 0.21 and will be removed in 0.23. Please import this functionality directly from joblib, which can be installed with: pip install joblib. If this warning is raised when loading pickled models, you may need to re-serialize those models with scikit-learn 0.21+.\n",
" warnings.warn(msg, category=DeprecationWarning)\n"
]
}
],
"source": [
"from geosnap.util import convert_gdf"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"def listdir_fullpath(d):\n",
" return [os.path.join(d, f) for f in os.listdir(d)]"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"# load all state-level block dfs in parallel \n",
"blocks = dd.read_parquet(listdir_fullpath('/Users/knaaptime/projects/tiger/blocks_2000'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Test: dask vs. vanilla pandas"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"tracts = dd.from_pandas(tracts_2010(),npartitions = 50)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"27 s ± 5.91 s per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"# vanilla pandas\n",
"\n",
"convert_gdf(census.tracts_2010())"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/knaaptime/anaconda3/envs/scratch/lib/python3.6/site-packages/distributed/worker.py:3101: UserWarning: Large object of size 30.62 MB detected in task graph: \n",
" ( GEOID ... 9c977ee0cca2e')\n",
"Consider scattering large objects ahead of time\n",
"with client.scatter to reduce scheduler burden and \n",
"keep data on workers\n",
"\n",
" future = client.submit(func, big_data) # bad\n",
"\n",
" big_future = client.scatter(big_data) # good\n",
" future = client.submit(func, big_future) # good\n",
" % (format_bytes(len(b)), s)\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"16.5 s ± 617 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"# parallelizing with dask threads\n",
"\n",
"c = tracts.map_partitions(convert_gdf,meta={'GEOID':'O', 'geometry': 'O'})\n",
"gpd.GeoDataFrame(c.compute())"
]
},
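{
"cell_type": "markdown",
"metadata": {},
"source": [
"The warning above suggests scattering large objects to the workers ahead of time. A minimal sketch of that pattern, assuming a `dask.distributed` client is available (not timed here):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# sketch of the scatter pattern recommended in the warning above\n",
"from dask.distributed import Client\n",
"\n",
"client = Client()  # or reuse an existing client\n",
"big_future = client.scatter(tracts_2010())  # ship the frame to workers once\n",
"future = client.submit(convert_gdf, big_future)  # submit work against the future\n",
"result = future.result()"
]
},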
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Test: parallel approaches"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Rocklin [says](https://stackoverflow.com/questions/31361721/python-dask-dataframe-support-for-trivially-parallelizable-row-apply) \n",
">As of version 0.6.0 dask.dataframes parallelizes with threads. Custom Python functions will not receive much benefit from thread-based parallelism. You could try processes instead"
]
},
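{
"cell_type": "markdown",
"metadata": {},
"source": [
"Aside from passing `scheduler=` to each `.compute()` call (as in the next cell), the scheduler can also be set for a whole block of code via `dask.config`; a small sketch:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# sketch: choose the scheduler once instead of per compute() call\n",
"import dask\n",
"\n",
"with dask.config.set(scheduler='processes'):\n",
"    c = tracts.map_partitions(convert_gdf, meta={'GEOID': 'O', 'geometry': 'O'})\n",
"    gdf = gpd.GeoDataFrame(c.compute())"
]
},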
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"39 s ± 348 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"# parallelizing with dask processes\n",
"\n",
"c = tracts.map_partitions(convert_gdf,meta={'GEOID':'O', 'geometry': 'O'})\n",
"gpd.GeoDataFrame(c.compute(scheduler='processes'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Test: how many partitions"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Rocklin [says](https://stackoverflow.com/questions/46645477/what-is-the-role-of-npartitions-in-a-dask-dataframe) \n",
">Generally you want a few times more partitions than you have cores. Every task takes up a few hundred microseconds in the scheduler."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Datashader [says](http://datashader.org/user_guide/10_Performance.html) \n",
">With dask on a single machine, a rule of thumb for the number of partitions to use is multiprocessing.cpu_count(), which allows Dask to use one thread per core for parallelizing computations"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"8"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import multiprocessing\n",
"multiprocessing.cpu_count()"
]
},
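{
"cell_type": "markdown",
"metadata": {},
"source": [
"Per the quotes above, reasonable candidates are small multiples of the core count; a quick sketch (the multipliers here are illustrative, not prescriptive):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# candidate partition counts: small multiples of the core count\n",
"[m * multiprocessing.cpu_count() for m in (1, 2, 4)]"
]
},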
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This machine has 8 cores, so lets try a few multiples"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using 32 partitions"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"tracts32 = dd.from_pandas(tracts_2010(),npartitions = 32)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"15.8 s ± 163 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"c = tracts32.map_partitions(convert_gdf,meta={'GEOID':'O', 'geometry': 'O'})\n",
"gpd.GeoDataFrame(c.compute())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using 16 partitions"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"tracts16 = dd.from_pandas(tracts_2010(),npartitions = 16)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"15.6 s ± 496 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"c = tracts16.map_partitions(convert_gdf,meta={'GEOID':'O', 'geometry': 'O'})\n",
"gpd.GeoDataFrame(c.compute())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using 8 partitions"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"tracts8 = dd.from_pandas(tracts_2010(),npartitions = 8)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"15.7 s ± 116 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"c = tracts8.map_partitions(convert_gdf,meta={'GEOID':'O', 'geometry': 'O'})\n",
"gpd.GeoDataFrame(c.compute())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Results"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"- dask is faster than pandas by ~40%\n",
"- threads are way faster than processes for this task\n",
"- there's not much performance difference between different partitioning based on number of cores, but there's some small overhead when overshooting"
]
},
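{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, the timings above (mean ± std. dev. of 7 runs, 1 loop each):\n",
"\n",
"| approach | partitions | time per loop |\n",
"|---|---|---|\n",
"| vanilla pandas | - | 27 s ± 5.91 s |\n",
"| dask, threads | 50 | 16.5 s ± 617 ms |\n",
"| dask, processes | 50 | 39 s ± 348 ms |\n",
"| dask, threads | 32 | 15.8 s ± 163 ms |\n",
"| dask, threads | 16 | 15.6 s ± 496 ms |\n",
"| dask, threads | 8 | 15.7 s ± 116 ms |"
]
},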
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [conda env:scratch]",
"language": "python",
"name": "conda-env-scratch-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}