{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img src=\"http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg\"\n",
" align=\"right\"\n",
" width=\"30%\"\n",
" alt=\"Dask logo\">\n",
"\n",
"\n",
"\n",
"## Dask reuses existing Python APIs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from dask.distributed import Client\n",
"client = Client('localhost:8786')\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### NumPy/Pandas"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import dask.array as da\n",
"\n",
"x = da.random.random((10000, 10000), chunks=(1000, 1000)).persist()\n",
"x"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"(x + x.T - x.mean())[::2, ::2].std(axis=0).compute()"
]
},
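{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dask DataFrame mirrors the Pandas API in the same way. A minimal sketch (the random frame below is illustrative, not part of the original demo):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"import dask.dataframe as dd\n",
"\n",
"# Partition an in-memory Pandas frame across the cluster\n",
"pdf = pd.DataFrame({'x': np.random.random(10000),\n",
"                    'y': np.random.random(10000)})\n",
"df = dd.from_pandas(pdf, npartitions=8)\n",
"\n",
"# Familiar Pandas syntax, evaluated lazily until .compute()\n",
"df[df.x > 0.5].y.mean().compute()"
]
},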
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### SKLearn APIs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Code source: Gaël Varoquaux\n",
"# Modified for documentation by Jaques Grobler\n",
"# License: BSD 3 clause\n",
"\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"\n",
"from sklearn import linear_model, decomposition, datasets\n",
"from sklearn.pipeline import Pipeline\n",
"# from sklearn.model_selection import GridSearchCV\n",
"from dask_searchcv import GridSearchCV # <<---- We add this line\n",
"\n",
"logistic = linear_model.LogisticRegression()\n",
"\n",
"pca = decomposition.PCA()\n",
"pipe = Pipeline(steps=[('pca', pca), ('logistic', logistic)])\n",
"\n",
"digits = datasets.load_digits()\n",
"X_digits = digits.data\n",
"y_digits = digits.target"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%time\n",
"n_components = [20, 40, 64]\n",
"Cs = np.logspace(-4, 4, 3)\n",
"\n",
"#Parameters of pipelines can be set using ‘__’ separated parameter names:\n",
"\n",
"estimator = GridSearchCV(pipe,\n",
" dict(pca__n_components=n_components,\n",
" logistic__C=Cs))\n",
"estimator.fit(X_digits, y_digits)"
]
},
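{
"cell_type": "markdown",
"metadata": {},
"source": [
"`dask_searchcv.GridSearchCV` is intended as a drop-in replacement, so the fitted estimator should expose the usual scikit-learn attributes. A minimal sketch:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Same attributes as scikit-learn's GridSearchCV\n",
"estimator.best_score_, estimator.best_params_"
]
},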
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### SKLearn with Joblib\n",
"\n",
"Scikit-learn code parallelizes internally with [Joblib](https://pythonhosted.org/joblib/parallel.html). Dask can hijack Joblib and take over as parallel computing system. This allows plain scikit-learn code to be parallelized across a cluster easily by wrapping existing code with a context manager."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%time\n",
"from sklearn.model_selection import GridSearchCV # Back to sklearn\n",
"\n",
"from sklearn.externals import joblib\n",
"import distributed.joblib # hijack sklearn's existing parallel system\n",
"\n",
"n_components = [20, 40, 64]\n",
"Cs = np.logspace(-4, 4, 30)\n",
"\n",
"with joblib.parallel_backend('dask.distributed', \n",
" scheduler_host='localhost:8786'):\n",
" estimator = GridSearchCV(pipe,\n",
" dict(pca__n_components=n_components,\n",
" logistic__C=Cs))\n",
" estimator.fit(X_digits, y_digits)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Concurrent.futures (PEP 3148)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import time, random\n",
"\n",
"def inc(x):\n",
" time.sleep(random.random())\n",
" return x + 1\n",
"\n",
"\n",
"def dec(x):\n",
" time.sleep(random.random())\n",
" return x - 1\n",
"\n",
"\n",
"def add(x, y):\n",
" time.sleep(random.random())\n",
" return x + y"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from dask.distributed import as_completed\n",
"\n",
"data = range(100)\n",
"\n",
"futures = []\n",
"for x in data:\n",
" if x % 2 == 0:\n",
" future = client.submit(inc, x)\n",
" else:\n",
" future = client.submit(dec, x)\n",
" futures.append(future)\n"
]
},
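{
"cell_type": "markdown",
"metadata": {},
"source": [
"These futures follow the `concurrent.futures` (PEP 3148) interface, so blocking on a single result works as it does in the standard library. A minimal sketch:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Block until the first submitted task finishes, as with\n",
"# concurrent.futures.Future.result()\n",
"futures[0].result()"
]
},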
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"done = as_completed(futures)\n",
"\n",
"while True:\n",
" try:\n",
" a = next(done)\n",
" b = next(done)\n",
" except StopIteration:\n",
" break\n",
" \n",
" future = client.submit(add, a, b)\n",
" done.add(future)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Async/Await"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"async def f():\n",
" total = 0\n",
" async with Client('localhost:8786', start=False, set_as_default=False) as client:\n",
" futures = client.map(inc, range(100))\n",
" async for future in as_completed(futures):\n",
" result = await future\n",
" total += result\n",
" print(total)\n",
" \n",
"from tornado.ioloop import IOLoop\n",
"IOLoop.current().add_callback(f)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Conclusions\n",
"\n",
"By reusing existing Python APIs and protocols, Dask enables the parallization of existing codebases with minimal rewriting and retraining."
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [default]",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 1
}