Created
May 30, 2012 05:30
-
-
Save minrk/2833944 to your computer and use it in GitHub Desktop.
Sample notebook for backgrounding a simulation on IPython engines
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"metadata": { | |
"name": "BackgroundLoop" | |
}, | |
"nbformat": 3, | |
"worksheets": [ | |
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"%pylab inline", | |
"", | |
"import sys, time", | |
"import numpy as np", | |
"" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"Create the Client and View:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"from IPython import parallel", | |
"rc = parallel.Client(profile=\"mpi\")", | |
"dv = rc[:]", | |
"dv.block = True", | |
"dv.activate()", | |
"dv" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"Check for MPI. It's fine for this example if we don't have it, but it will technically be possible for", | |
"simulation nodes to get slightly out of sync." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"%%px", | |
"try:", | |
" from mpi4py import MPI", | |
"except ImportError:", | |
" bcast = lambda buf: buf", | |
" barrier = lambda : None", | |
" rank = 0", | |
" print \"No MPI, dummy sims may get slightly out of sync\"", | |
"else:", | |
" mpi = MPI.COMM_WORLD", | |
" bcast = mpi.bcast", | |
" barrier = mpi.barrier", | |
" rank = mpi.rank", | |
" print \"MPI rank: %i/%i\" % (mpi.rank,mpi.size)" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"Quick test of the broadcast:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"%px bcast(rank)" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"Define a function on the engines that holds the GIL for a period of time", | |
"(like time.sleep, but holding the GIL to simulate blocking computation):" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"%%px", | |
"from cython import inline", | |
"", | |
"def gilsleep(t):", | |
" \"\"\"gil-holding sleep with cython.inline\"\"\"", | |
" code = '\\n'.join([", | |
" 'from posix cimport unistd',", | |
" 'unistd.sleep(t)',", | |
" ])", | |
" inline(code, quiet=True, t=t)", | |
"", | |
"gilsleep(1)" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"And define our dummy simulation, which just refines the appropriate slice of sin(x),", | |
"after grabbing the GIL for a period of time:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"%%px", | |
"import numpy as np", | |
"", | |
"class DummySimulation(object):", | |
" def __init__(self, x1, x2, n, p, dt=1):", | |
" self.x1 = x1", | |
" self.x2 = x2", | |
" self.n = n", | |
" self.p = p", | |
" self.dt = dt", | |
" self.y = None", | |
" self.step = 0", | |
" ", | |
" def advance(self):", | |
" # include mpi barrier, for good measure", | |
" barrier()", | |
" self.step += 1", | |
" n = self.n", | |
" p = self.p", | |
" gilsleep(self.dt)", | |
" npoints = p * self.step", | |
" X = np.linspace(x1, x2, npoints)", | |
" x = X[n * npoints / p : (n+1) * npoints / p]", | |
" self.y = np.sin(x)", | |
" return self.step", | |
"" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"Set up the global state (we are just sampling sin(x) on an interval), and instantiate the Simulation objects:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"x1 = 0", | |
"x2 = 10*np.pi", | |
"dv['x1'] = x1", | |
"dv['x2'] = x2", | |
"", | |
"dv.scatter('n', range(len(dv)), flatten=True)", | |
"dv['p'] = len(dv)", | |
"dv['step'] = 0", | |
"", | |
"%px sim = DummySimulation(x1, x2, n, p, dt=1)" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"Take the first step, to make sure everything is in working order:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"%px sim.advance()" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"This is the function we are going to use to inspect our intermediate result.", | |
"It fetches the current value of the simulation step, and the value of 'y',", | |
"and makes a plot." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"from IPython.core.display import display", | |
"", | |
"def check_y():", | |
" \"\"\"fetch and plot the current state of the simulation\"\"\"", | |
" states = dv.apply_sync(lambda : (sim.step, sim.y))", | |
" steps, ys = zip(*states)", | |
" Y = np.concatenate(ys)", | |
" X = linspace(0, 10*np.pi, len(Y))", | |
" fig = plt.figure()", | |
" plt.title(\"step %i (%i points)\" % (steps[0], len(X)))", | |
" plt.plot(X, Y, 'o-')", | |
" display(fig)", | |
" plt.close(fig)" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"check_y()" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "heading", | |
"level": 2, | |
"source": [ | |
"Now the interesting part" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"Here we start a Thread that advances the simulation *in the background*.", | |
"", | |
"The engines should be responsive on the timescale of the gilsleep threshold (**1 seconds**, here):" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"%%px --block", | |
"from threading import Thread", | |
"", | |
"class BackgroundAdvance(Thread):", | |
" \"\"\"Call a function periodically in a thread, until self.stop() is called\"\"\"", | |
" def __init__(self, advance):", | |
" Thread.__init__(self)", | |
" self._done = False", | |
" self._paused = False", | |
" self._advance = advance", | |
" ", | |
" ", | |
" def stop(self):", | |
" self._done = True", | |
" ", | |
" def pause(self):", | |
" self._paused = True", | |
" ", | |
" def resume(self):", | |
" self._paused = False", | |
" ", | |
" def run(self):", | |
" while True:", | |
" # all stop together:", | |
" self._done = bcast(self._done)", | |
" if self._done:", | |
" return", | |
" if self._paused:", | |
" barrier()", | |
" time.sleep(1)", | |
" else:", | |
" self._advance()", | |
"", | |
"sim_thread = BackgroundAdvance(sim.advance)", | |
"sim_thread.start()", | |
"" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"Note how that cell actually finished running.", | |
"", | |
"Now the simulation is running in a background thread, and we can check in on it with out check_y():" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"for i in range(6):", | |
" check_y()", | |
" time.sleep(2)" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"And we can stop the background thread with a simple call:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"%px sim_thread.stop()" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"And hopefully all the engines are on the same step at the end:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"%px sim.step" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"And hopefully they are all stopped, as well:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"%px sim_thread.is_alive()" | |
], | |
"language": "python", | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"And we can continue to advance the sim manually, as well:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"collapsed": false, | |
"input": [ | |
"for i in range(5):", | |
" %px sim.advance()", | |
" check_y()", | |
" sys.stdout.flush()", | |
"" | |
], | |
"language": "python", | |
"outputs": [] | |
} | |
] | |
} | |
] | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment