Created
September 18, 2014 23:17
-
-
Save rabernat/ea06ef3ca436d05cc151 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"worksheets": [ | |
{ | |
"cells": [ | |
{ | |
"metadata": {}, | |
"cell_type": "markdown", | |
"source": "# Lazy Map Reduce #\n\nIn order to make map reduce truly scalable. It needs to be lazy at every stage. There are two opportunities for laziness.\n\nThe most obvious is at the reduce stage: reduction can begin as soon as there are two elements to work on. The mapping stage doesn't have to be finished, and for a huge job, one can't afford to populate a complete list full of the individual map results before reducing. \n\nA more subtle stage is laziness in mapping. Here we consider inputs to map that are constructed by a lazy generator. The input list doesn't have to be completely build in order for map to get to work.\n\nI have found that IPython parallel can be lazy at the reduce stage but not at the map stage. For some reason, it waits for the input list to be populated completely before starting to reduce. This is not scalable. Is it a feature or a bug? I can't find a work-around.\n\n## demonstration with regular (non-parallel) python ##" | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "code", | |
"input": "from itertools import imap\nglobal verbose\nverbose = True \n\n# equivalent to range\ndef gen_list(n):\n res = []\n i=0\n while i<n:\n if verbose:\n print 'generating', i\n res.append(i)\n i += 1\n return res\n\n# equivalent to xrange\ndef gen_list_lazy(n):\n i=0\n while i<n:\n if verbose:\n print 'generating', i\n yield i\n i += 1\n if verbose:\n print 'gen_list_lazy finished'\n\ndef map_fun(x):\n if verbose:\n print 'mapping', x\n return x**2\n\ndef reduce_fun(a,b):\n if verbose:\n print 'reducing', a, b\n return a + b\n\ndef default_map_reduce(n):\n return reduce(reduce_fun, map(map_fun, gen_list(n)))\n\ndef lazy_map_reduce(n):\n reduce(reduce_fun, imap(map_fun, gen_list_lazy(4)))", | |
"prompt_number": 1, | |
"outputs": [], | |
"language": "python", | |
"trusted": true, | |
"collapsed": false | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "markdown", | |
"source": "## Default Map Reduce ##\nThis does not use any lazy evaluation. The generate, map, and reduce steps are performed in sequence." | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "code", | |
"input": "default_map_reduce(4)", | |
"prompt_number": 2, | |
"outputs": [ | |
{ | |
"output_type": "stream", | |
"text": "generating 0\ngenerating 1\ngenerating 2\ngenerating 3\nmapping 0\nmapping 1\nmapping 2\nmapping 3\nreducing 0 1\nreducing 1 4\nreducing 5 9\n", | |
"stream": "stdout" | |
}, | |
{ | |
"text": "14", | |
"output_type": "pyout", | |
"metadata": {}, | |
"prompt_number": 2 | |
} | |
], | |
"language": "python", | |
"trusted": true, | |
"collapsed": false | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "markdown", | |
"source": "## Lazy Map Reduce ##\nIn this case, the map and reduce operations begin as soon as there are enough elements present. The generator function is lazy, and so is the map function. This approach can scale to an infinitely large number of elements because it never stores the whole sequence in memory." | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "code", | |
"input": "lazy_map_reduce(4)", | |
"prompt_number": 58, | |
"outputs": [ | |
{ | |
"output_type": "stream", | |
"stream": "stdout", | |
"text": "generating 0\nmapping 0\ngenerating 1\nmapping 1\nreducing 0 1\ngenerating 2\nmapping 2\nreducing 1 4\ngenerating 3\nmapping 3\nreducing 5 9\ngen_list_lazy finished\n" | |
} | |
], | |
"language": "python", | |
"trusted": true, | |
"collapsed": false | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "markdown", | |
"source": "Even for a relatively small n, we can see that the lazy evaluation is much faster." | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "code", | |
"input": "verbose=False\n%timeit default_map_reduce(1000)\n%timeit lazy_map_reduce(1000)", | |
"prompt_number": 4, | |
"outputs": [ | |
{ | |
"output_type": "stream", | |
"text": "1000 loops, best of 3: 424 µs per loop\n100000 loops, best of 3: 2 µs per loop", | |
"stream": "stdout" | |
}, | |
{ | |
"output_type": "stream", | |
"text": "\n", | |
"stream": "stdout" | |
} | |
], | |
"language": "python", | |
"trusted": true, | |
"collapsed": false | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "markdown", | |
"source": "## IPython Parallel Map Reduce ##\nNow let's implement a parallel map reduce using IPython." | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "code", | |
"input": "# already started a cluster using the IPython control panel\nfrom IPython.parallel import Client\nc = Client()\ndview = c[:]\ndview.execute('verbose=True')\nlview = c.load_balanced_view()", | |
"prompt_number": 8, | |
"outputs": [], | |
"language": "python", | |
"trusted": true, | |
"collapsed": false | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "markdown", | |
"source": "Let's try to see if lazy evaluation is being used in the parallel map." | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "code", | |
"input": "verbose=True\nreduce(reduce_fun, lview.imap(map_fun, gen_list_lazy(6), ordered=False))", | |
"prompt_number": 9, | |
"outputs": [ | |
{ | |
"output_type": "stream", | |
"text": "generating 0\ngenerating 1\ngenerating 2\ngenerating 3\ngenerating 4\ngenerating 5\ngen_list_lazy finished\nreducing 1 0\nreducing 1 4\nreducing 5 16\nreducing 21 9\nreducing 30 25\n", | |
"stream": "stdout" | |
}, | |
{ | |
"text": "55", | |
"output_type": "pyout", | |
"metadata": {}, | |
"prompt_number": 9 | |
} | |
], | |
"language": "python", | |
"trusted": true, | |
"collapsed": false | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "markdown", | |
"source": "Here we can't see the output from the map function, but I am pretty sure that reducing starts as soon as imap returns. That is definitely what is suggested by [the documentation](http://ipython.org/ipython-doc/stable/parallel/asyncresult.html). However, it is clear that *imap waits to populate the whole list before starting to map*. The same behavior happens with lview.map and lview.map_async.\n\nThis is a problem for me, because I want to use this approach to processs an enormous dataset, and I can't afford to even put all the input into a list." | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "code", | |
"input": "", | |
"outputs": [], | |
"language": "python", | |
"trusted": true, | |
"collapsed": false | |
} | |
], | |
"metadata": {} | |
} | |
], | |
"metadata": { | |
"name": "", | |
"signature": "sha256:f37d4933adaf164a7088643c29cab76deeb3ba1ad97ef5279af4ab2ebf4c066d" | |
}, | |
"nbformat": 3 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment