Created November 22, 2016 22:47
Gist by mrocklin: ada85ef06d625947f7b34886fd2710f8
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg\"\n",
    "     width=\"30%\"\n",
    "     align=right\n",
    "     alt=\"Dask logo\">\n",
    "\n",
    "Custom Workflows\n",
    "------------------\n",
    "\n",
    "Here we submit tasks directly to the task scheduler. This shows how much flexibility the `submit` function offers when combined with ordinary Python `for` loops.\n",
    "\n",
    "Later on we map functions across Python queues to construct data processing pipelines."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from dask.distributed import Client, progress\n",
    "client = Client('localhost:8786')  # connect to a dask-scheduler already running at this address\n",
    "client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from time import sleep\n",
    "\n",
    "# Each function sleeps for a random fraction of a second to simulate real work.\n",
    "\n",
    "def inc(x):\n",
    "    from random import random\n",
    "    sleep(random())\n",
    "    return x + 1\n",
    "\n",
    "def double(x):\n",
    "    from random import random\n",
    "    sleep(random())\n",
    "    return 2 * x\n",
    "\n",
    "def add(x, y):\n",
    "    from random import random\n",
    "    sleep(random())\n",
    "    return x + y"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "inc(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "future = client.submit(inc, 1)  # returns immediately with a pending future\n",
    "future"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "future  # the scheduler reports progress, so the client keeps this status up to date"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "future.result()"
   ]
  },
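  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The same submit, status and result cycle can be tried without the `localhost:8786` scheduler. The sketch below assumes a recent `dask.distributed` in which `Client(processes=False)` starts a local in-process cluster. `slow_inc` is a hypothetical stand-in for `inc`. `Future.status`, `Future.done()` and `Future.result()` belong to the futures API, but the status shown right after `submit` depends on timing.\n",
    "\n",
    "```python\n",
    "from time import sleep\n",
    "\n",
    "from dask.distributed import Client\n",
    "\n",
    "\n",
    "def slow_inc(x):\n",
    "    # hypothetical helper: sleep briefly so the future is usually still pending\n",
    "    sleep(0.5)\n",
    "    return x + 1\n",
    "\n",
    "\n",
    "# assumption: a local in-process cluster rather than the scheduler used above\n",
    "local_client = Client(processes=False)\n",
    "\n",
    "fut = local_client.submit(slow_inc, 1)  # returns immediately\n",
    "print(fut.status, fut.done())  # typically still pending at this point\n",
    "\n",
    "print(fut.result())  # blocks until the task finishes, then prints 2\n",
    "print(fut.status, fut.done())  # finished True\n",
    "\n",
    "local_client.close()\n",
    "```"
   ]
  },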
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Submit many tasks\n",
    "\n",
    "We submit many tasks that depend on each other from within a normal Python `for` loop."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%%time\n",
    "zs = []\n",
    "for i in range(256):\n",
    "    x = client.submit(inc, i)     # x = inc(i)\n",
    "    y = client.submit(double, x)  # y = double(x)\n",
    "    z = client.submit(add, x, y)  # z = add(x, y)\n",
    "    zs.append(z)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "client.gather(zs)"
   ]
  },
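  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Each `z` is `add(x, double(x))` with `x = inc(i)`, so every gathered value should equal `3 * (i + 1)`. Below is a plain-Python check of that claim, using sleep-free copies of the three functions. The `_local` names are hypothetical, chosen so they do not shadow `inc`, `double` and `add`:\n",
    "\n",
    "```python\n",
    "def inc_local(x):\n",
    "    return x + 1\n",
    "\n",
    "\n",
    "def double_local(x):\n",
    "    return 2 * x\n",
    "\n",
    "\n",
    "def add_local(x, y):\n",
    "    return x + y\n",
    "\n",
    "\n",
    "# same dependency structure as the submit loop, evaluated eagerly in-process\n",
    "expected = []\n",
    "for i in range(256):\n",
    "    x = inc_local(i)\n",
    "    y = double_local(x)\n",
    "    z = add_local(x, y)\n",
    "    expected.append(z)\n",
    "\n",
    "assert expected == [3 * (i + 1) for i in range(256)]\n",
    "print(expected[:4], expected[-1])  # [3, 6, 9, 12] 768\n",
    "print(sum(expected))  # 98688\n",
    "```\n",
    "\n",
    "If the cluster evaluated the same graph, `client.gather(zs) == expected` should be `True`."
   ]
  },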
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Custom computation: Tree summation\n",
    "\n",
    "As an example of a non-trivial algorithm, consider the classic tree reduction. We accomplish this with a `for` loop nested inside a `while` loop and a bit of normal Python logic.\n",
    "\n",
    "```\n",
    "finish          total             single output\n",
    "    ^         /        \\\n",
    "    |       c1          c2        neighbors merge\n",
    "    |      /  \\        /  \\\n",
    "    |    b1    b2    b3    b4     neighbors merge\n",
    "    ^    / \\   / \\   / \\   / \\\n",
    "start   a1 a2 a3 a4 a5 a6 a7 a8   many inputs\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "L = zs\n",
    "while len(L) > 1:\n",
    "    new_L = []\n",
    "    for i in range(0, len(L), 2):\n",
    "        if i + 1 < len(L):\n",
    "            future = client.submit(add, L[i], L[i + 1])  # add neighbors\n",
    "        else:\n",
    "            future = L[i]  # odd element out: carry it up to the next level\n",
    "        new_L.append(future)\n",
    "    L = new_L  # swap old list for new\n",
    "\n",
    "progress(L)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "L"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "client.gather(L)"
   ]
  },
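  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For `n` inputs the tree performs `n - 1` additions spread over roughly `log2(n)` levels. The longest chain of dependent additions is therefore only about `log2(n)` tasks long, even though every addition is a separate task. The pure-Python sketch below mirrors the loop above level by level, without a cluster. `tree_sum` is a hypothetical helper, and carrying an unpaired element up unchanged is one common way to handle odd lengths:\n",
    "\n",
    "```python\n",
    "def tree_sum(values):\n",
    "    # returns the total and the number of additions performed at each level\n",
    "    level = list(values)\n",
    "    adds_per_level = []\n",
    "    while len(level) > 1:\n",
    "        next_level = []\n",
    "        for i in range(0, len(level), 2):\n",
    "            if i + 1 < len(level):\n",
    "                next_level.append(level[i] + level[i + 1])  # merge neighbors\n",
    "            else:\n",
    "                next_level.append(level[i])  # unpaired element moves up as-is\n",
    "        adds_per_level.append(len(level) // 2)\n",
    "        level = next_level\n",
    "    return level[0], adds_per_level\n",
    "\n",
    "\n",
    "total, adds = tree_sum(3 * (i + 1) for i in range(256))\n",
    "print(total)  # 98688\n",
    "print(adds)  # [128, 64, 32, 16, 8, 4, 2, 1]\n",
    "print(sum(adds))  # 255\n",
    "print(tree_sum([1, 2, 3, 4, 5]))  # (15, [2, 1, 1])\n",
    "```\n",
    "\n",
    "With 256 inputs this matches the 255 `add` tasks submitted by the reduction loop."
   ]
  },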
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "Example with data streams\n",
    "----------------------------\n",
    "\n",
    "The client can map functions over lists or queues. This is nothing more than calling `submit` many times. We can chain maps on queues together to construct simple data processing pipelines.\n",
    "\n",
    "All of this logic happens on the client side; none of it is hard-coded into the scheduler. This simple streaming system is a good example of the kind of system that becomes easy for users to build when they have access to custom task scheduling."
   ]
  },
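  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can picture mapping over a queue as a background thread that takes each item, calls `submit`, and puts the resulting future on an output queue. The sketch below illustrates that idea. It assumes the standard library's `concurrent.futures.ThreadPoolExecutor` as a stand-in for the dask client. `map_queue`, `gather_queue` and `_resolve` are hypothetical helpers, not dask APIs. Newer `dask.distributed` releases may no longer accept queues in `map`, `scatter` and `gather`, so a hand-rolled loop like this is one way to keep the pattern.\n",
    "\n",
    "```python\n",
    "from concurrent.futures import Future, ThreadPoolExecutor\n",
    "from queue import Queue\n",
    "from threading import Thread\n",
    "\n",
    "\n",
    "def _resolve(x):\n",
    "    # dask resolves futures passed as arguments; emulate that for the stand-in\n",
    "    return x.result() if isinstance(x, Future) else x\n",
    "\n",
    "\n",
    "def map_queue(executor, func, in_q):\n",
    "    # hypothetical helper: submit func once per item that arrives on in_q\n",
    "    out_q = Queue()\n",
    "\n",
    "    def loop():\n",
    "        while True:\n",
    "            item = in_q.get()\n",
    "            out_q.put(executor.submit(lambda x: func(_resolve(x)), item))\n",
    "\n",
    "    Thread(target=loop, daemon=True).start()\n",
    "    return out_q\n",
    "\n",
    "\n",
    "def gather_queue(in_q):\n",
    "    # hypothetical helper: replace each future with its result, preserving order\n",
    "    out_q = Queue()\n",
    "\n",
    "    def loop():\n",
    "        while True:\n",
    "            out_q.put(in_q.get().result())\n",
    "\n",
    "    Thread(target=loop, daemon=True).start()\n",
    "    return out_q\n",
    "\n",
    "\n",
    "executor = ThreadPoolExecutor(max_workers=4)\n",
    "source = Queue()\n",
    "incremented = map_queue(executor, lambda x: x + 1, source)  # chained maps\n",
    "doubled = map_queue(executor, lambda x: 2 * x, incremented)\n",
    "results = gather_queue(doubled)\n",
    "\n",
    "for i in range(3):\n",
    "    source.put(i)\n",
    "print([results.get() for _ in range(3)])  # [2, 4, 6]\n",
    "```\n",
    "\n",
    "Futures go onto each output queue in the order items arrive, so results come out in input order even when tasks finish out of order."
   ]
  },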
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from queue import Queue\n",
    "from threading import Thread\n",
    "\n",
    "def multiplex(n, q, **kwargs):\n",
    "    \"\"\" Convert one queue into several equivalent queues\n",
    "\n",
    "    >>> q1, q2, q3 = multiplex(3, in_q)\n",
    "    \"\"\"\n",
    "    out_queues = [Queue(**kwargs) for i in range(n)]\n",
    "    def f():\n",
    "        while True:\n",
    "            x = q.get()\n",
    "            for out_q in out_queues:\n",
    "                out_q.put(x)  # every output queue receives every item\n",
    "    t = Thread(target=f)\n",
    "    t.daemon = True  # do not keep the interpreter alive for this thread\n",
    "    t.start()\n",
    "    return out_queues"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```\n",
    "           ----inc---->\n",
    "          /            \\\n",
    "in_q --> q              \\_add__ results\n",
    "          \\             /\n",
    "           ---double-->/\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "in_q = Queue()\n",
    "q = client.scatter(in_q)  # items put on in_q are sent to the cluster; q receives their futures"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "in_q.put(1)\n",
    "q.get()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "q_1, q_2 = multiplex(2, q)\n",
    "\n",
    "inc_q = client.map(inc, q_1)        # queue of futures\n",
    "double_q = client.map(double, q_2)\n",
    "\n",
    "add_q = client.map(add, inc_q, double_q)\n",
    "\n",
    "out_q = client.gather(add_q)        # queue of concrete results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "in_q.put(10)\n",
    "out_q.get()"
   ]
  },
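  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because `double` is applied to the original item rather than to the output of `inc`, each result should be `inc(x) + double(x) = (x + 1) + 2x = 3x + 1`, so putting `10` on `in_q` should yield `31`. Below is a cluster-free sketch of the same diamond using plain threads and queues. `stage` and `tee` are hypothetical helpers, and `tee` plays the role of `multiplex`:\n",
    "\n",
    "```python\n",
    "from queue import Queue\n",
    "from threading import Thread\n",
    "\n",
    "\n",
    "def stage(func, *in_qs):\n",
    "    # hypothetical helper: read one item from each input queue, apply func, emit\n",
    "    out_q = Queue()\n",
    "\n",
    "    def loop():\n",
    "        while True:\n",
    "            args = [q.get() for q in in_qs]\n",
    "            out_q.put(func(*args))\n",
    "\n",
    "    Thread(target=loop, daemon=True).start()\n",
    "    return out_q\n",
    "\n",
    "\n",
    "def tee(in_q, n):\n",
    "    # hypothetical helper: copy every item from in_q onto n output queues\n",
    "    out_qs = [Queue() for _ in range(n)]\n",
    "\n",
    "    def loop():\n",
    "        while True:\n",
    "            x = in_q.get()\n",
    "            for out_q in out_qs:\n",
    "                out_q.put(x)\n",
    "\n",
    "    Thread(target=loop, daemon=True).start()\n",
    "    return out_qs\n",
    "\n",
    "\n",
    "source = Queue()\n",
    "q_a, q_b = tee(source, 2)\n",
    "inc_q = stage(lambda x: x + 1, q_a)\n",
    "double_q = stage(lambda x: 2 * x, q_b)\n",
    "add_q = stage(lambda x, y: x + y, inc_q, double_q)\n",
    "\n",
    "source.put(10)\n",
    "print(add_q.get())  # 31\n",
    "```"
   ]
  },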
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from random import random\n",
    "\n",
    "def feed(q):\n",
    "    for i in range(10000):\n",
    "        sleep(random())\n",
    "        q.put(i)\n",
    "\n",
    "t = Thread(target=feed, args=(q,))\n",
    "t.daemon = True\n",
    "t.start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "out_q.qsize()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  },
  "widgets": {
   "state": {
    "b112c72cc59a4cdc8217351b61931334": {
     "views": [
      {
       "cell_index": 11
      }
     ]
    }
   },
   "version": "1.2.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}