Created
May 15, 2017 18:46
-
-
Save mrocklin/af211edc878fba454661829c7a96b3c8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"<img src=\"http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg\"\n", | |
" align=\"right\"\n", | |
" width=\"30%\"\n", | |
" alt=\"Dask logo\">\n", | |
"\n", | |
"\n", | |
"\n", | |
"## Dask reuses existing Python APIs" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"from dask.distributed import Client\n", | |
"client = Client('localhost:8786')\n", | |
"client" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### NumPy/Pandas" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"import dask.array as da\n", | |
"\n", | |
"x = da.random.random((10000, 10000), chunks=(1000, 1000)).persist()\n", | |
"x" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"(x + x.T - x.mean())[::2, ::2].std(axis=0).compute()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### SKLearn APIs" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"# Code source: Gaël Varoquaux\n", | |
"# Modified for documentation by Jaques Grobler\n", | |
"# License: BSD 3 clause\n", | |
"\n", | |
"import numpy as np\n", | |
"import matplotlib.pyplot as plt\n", | |
"\n", | |
"from sklearn import linear_model, decomposition, datasets\n", | |
"from sklearn.pipeline import Pipeline\n", | |
"# from sklearn.model_selection import GridSearchCV\n", | |
"from dask_searchcv import GridSearchCV # <<---- We add this line\n", | |
"\n", | |
"logistic = linear_model.LogisticRegression()\n", | |
"\n", | |
"pca = decomposition.PCA()\n", | |
"pipe = Pipeline(steps=[('pca', pca), ('logistic', logistic)])\n", | |
"\n", | |
"digits = datasets.load_digits()\n", | |
"X_digits = digits.data\n", | |
"y_digits = digits.target" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"%%time\n", | |
"n_components = [20, 40, 64]\n", | |
"Cs = np.logspace(-4, 4, 3)\n", | |
"\n", | |
"# Parameters of pipelines can be set using '__'-separated parameter names:\n", | |
"\n", | |
"estimator = GridSearchCV(pipe,\n", | |
" dict(pca__n_components=n_components,\n", | |
" logistic__C=Cs))\n", | |
"estimator.fit(X_digits, y_digits)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### SKLearn with Joblib\n", | |
"\n", | |
"Scikit-learn code parallelizes internally with [Joblib](https://joblib.readthedocs.io/en/latest/parallel.html). Dask can hijack Joblib and take over as the parallel computing system. This allows plain scikit-learn code to be parallelized across a cluster easily by wrapping existing code with a context manager." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"%%time\n", | |
"from sklearn.model_selection import GridSearchCV # Back to sklearn\n", | |
"\n", | |
"from sklearn.externals import joblib\n", | |
"import distributed.joblib # hijack sklearn's existing parallel system\n", | |
"\n", | |
"n_components = [20, 40, 64]\n", | |
"Cs = np.logspace(-4, 4, 30)\n", | |
"\n", | |
"with joblib.parallel_backend('dask.distributed', \n", | |
" scheduler_host='localhost:8786'):\n", | |
" estimator = GridSearchCV(pipe,\n", | |
" dict(pca__n_components=n_components,\n", | |
" logistic__C=Cs))\n", | |
" estimator.fit(X_digits, y_digits)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Concurrent.futures (PEP 3148)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"import time, random\n", | |
"\n", | |
"def inc(x):\n", | |
" time.sleep(random.random())\n", | |
" return x + 1\n", | |
"\n", | |
"\n", | |
"def dec(x):\n", | |
" time.sleep(random.random())\n", | |
" return x - 1\n", | |
"\n", | |
"\n", | |
"def add(x, y):\n", | |
" time.sleep(random.random())\n", | |
" return x + y" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"from dask.distributed import as_completed\n", | |
"\n", | |
"data = range(100)\n", | |
"\n", | |
"futures = []\n", | |
"for x in data:\n", | |
" if x % 2 == 0:\n", | |
" future = client.submit(inc, x)\n", | |
" else:\n", | |
" future = client.submit(dec, x)\n", | |
" futures.append(future)\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"done = as_completed(futures)\n", | |
"\n", | |
"while True:\n", | |
" try:\n", | |
" a = next(done)\n", | |
" b = next(done)\n", | |
" except StopIteration:\n", | |
" break\n", | |
" \n", | |
" future = client.submit(add, a, b)\n", | |
" done.add(future)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Async/Await" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"async def f():\n", | |
" total = 0\n", | |
" async with Client('localhost:8786', start=False, set_as_default=False) as client:\n", | |
" futures = client.map(inc, range(100))\n", | |
" async for future in as_completed(futures):\n", | |
" result = await future\n", | |
" total += result\n", | |
" print(total)\n", | |
" \n", | |
"from tornado.ioloop import IOLoop\n", | |
"IOLoop.current().add_callback(f)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Conclusions\n", | |
"\n", | |
"By reusing existing Python APIs and protocols, Dask enables the parallelization of existing codebases with minimal rewriting and retraining." | |
] | |
} | |
], | |
"metadata": { | |
"anaconda-cloud": {}, | |
"kernelspec": { | |
"display_name": "Python [default]", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.5.2" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 1 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment