wrap matplotlib for piping to main process from subprocess (incomplete) http://stackoverflow.com/q/22819254/2399799
# -*- coding: utf-8 -*-
"""
The purpose of this module is to do monoprocessing for a single package when
using multiprocessing. It is designed with matplotlib in mind, but most of it
is pretty general.

The idea is that you may have a block of code that you want to run in parallel
processes, but because of some "small" thing (i.e. something that isn't
computationally expensive) you can't do it easily without some major, ugly
alterations to your existing code. Monobrow.py injects all the ugly stuff for
you at the point you launch the processes, so you don't need to alter your
base code at all.

WARNING: It currently only works in fairly simple circumstances.

Many, many, many things here were learnt from Stack Overflow at one time or
another.

TODOs:
    Implement more stuff.

    We have to be careful in the way we listen for stuff and hold references
    on the main process. We could in principle, I believe, find out the
    reference count on the other process (for our fake instances) and do
    garbage collection on the main process...but in the short term it will be
    sufficient to just wait for the given process to complete and allow all
    references on the main thread to go out of scope at that point.

    I'm not sure how much of this could be implemented using multiprocessing
    managers.

    Need to take care of using FAKE_OBJ_SUB instances in args/kwargs lists,
    i.e. need to convert them back to a basic dummy class with nothing but an
    _id, and then need to recover that back in the main process.

    Need to find and implement a nice way of transferring large amounts of
    data that won't fit in the pipe...various options, not sure what is
    easiest/best...ideally want a way to make data available as read-only.

    Need to wrap up the stuff for the main process more nicely.

    Would be good if we could return a placeholder object from the piped
    calls, and then only provide values/more fake objs on request, thus we
    could prevent locking.

    In the spawned process, statements like ``import bigThing.subThing``
    fail, saying there is no "subThing", but you can do::

        import bigThing
        bigThing.subThing  # this is fine

    Also, if you have the statement ``import bigThing.subThing`` before the
    ``runWithPiping`` function does its work, then __import__ accepts that
    ``subThing`` exists and will happily call ``getattr`` on ``bigThing``,
    which is what we want...need to fix this.
"""
""" | |
############################################################################### | |
Stuff required by main and spawned processes (roughly speaking) | |
############################################################################### | |
""" | |
def strTrunc(x, at=4):
    s = str(x)
    return s[:at] + (s[at:] and "...")
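# Illustrative behaviour of strTrunc (examples only, not tests):
#   strTrunc("abcdefgh")       -> 'abcd...'
#   strTrunc("abc")            -> 'abc'       (within the limit, left alone)
#   strTrunc([1, 2, 3], at=6)  -> '[1, 2,...'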
""" | |
############################################################################### | |
stuff required in spawned process (roughly speaking) | |
############################################################################### | |
""" | |
import sys

def unwrapThing(thing, socket):
    """This takes "things" which have been received from the main process
    and prepares them for usage in this process."""
    if isinstance(thing, FAKE_OBJ_MAIN):
        return FAKE_OBJ_SUB(thing._id, socket)
    else:
        return thing
def piped_call(method_name):
    def wrapped(self, *args, **kwargs):
        self_id = object.__getattribute__(self, '_id')
        self_socket = object.__getattribute__(self, '_socket')
        print "requesting", method_name, 'args=', strTrunc(args), 'kwargs=', strTrunc(kwargs)
        sys.stdout.flush()
        self_socket.send((method_name, self_id, args, kwargs))
        _, _, ret = self_socket.recv()
        return unwrapThing(ret, self_socket)
    return wrapped
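# The protocol used by piped_call is two plain tuples per operation (matching
# MonobrowProcess.recvSome below); the values shown here are illustrative:
#   request:  (method_name, self_id, args, kwargs)
#             e.g. ('__getattribute__', 'matplotlib', ('pylab',), {})
#   response: (method_name, self_id, ret)
#             where ret is an immutable value or a FAKE_OBJ_MAIN id-carrier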
PIPEABLE_METHODS = [
    '__getattribute__',
    '__getitem__',
    '__setitem__',
    '__delitem__',
    '__iter__',
    '__reversed__',
    '__contains__',
    '__missing__',
    '__call__',
    '__setattr__',
    '__len__',
    '__enter__',
    '__exit__',
    '__dir__',
]  # TODO: grow this list
class FAKE_OBJ_SUB(list, object):
    """This class is a generic wrapper for all objects. It pipes all access
    requests to the main process and gets results back, thus it is able to
    act as though it were an object of the expected type, e.g. a matplotlib
    axes instance.

    TODO: probably want to subclass FAKE_OBJ_SUB with different fake
    baseclasses rather than use list here. Could then get info on the
    baseclass at the point of creating the FAKE_OBJ, and use the relevant
    subclass.
    """
    def __init__(self, _id, socket):
        object.__setattr__(self, '_id', _id)
        object.__setattr__(self, '_socket', socket)
    def __str__(self):
        return "FAKE_OBJ_SUB for real id=%s" % object.__getattribute__(self, '_id')
"""Add all the required methods to FAKE_OBJ_SUB""" | |
for method_name in PIPEABLE_METHODS: | |
setattr(FAKE_OBJ_SUB,method_name,piped_call(method_name)) | |
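# For example, in the spawned process a statement such as ``plt.plot(x, y)``
# decomposes into two round trips through the pipe (illustrative only):
#   __getattribute__('plot')  -> FAKE_OBJ_SUB wrapping the real plot function
#   __call__(x, y)            -> FAKE_OBJ_SUB wrapping plot's return value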
def runWithPiping(socket, targetPackage, func, args, kwargs):
    if targetPackage in sys.modules:
        del sys.modules[targetPackage]
    sys.modules[targetPackage] = FAKE_OBJ_SUB(_id=targetPackage, socket=socket)
    return func(*args, **kwargs)
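# The effect is that, inside ``func``, ``import matplotlib`` now binds the
# name to the FAKE_OBJ_SUB planted in sys.modules, so every subsequent
# attribute access/call on it is piped. Rough sketch (illustrative):
#   runWithPiping(socket, 'matplotlib', func, args, kwargs)
#   # ...and inside func:
#   import matplotlib           # actually gets the FAKE_OBJ_SUB
#   matplotlib.pylab.plot(...)  # piped to the main process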
""" | |
############################################################################### | |
Stuff required in main process (roughly speaking) | |
############################################################################### | |
""" | |
from importlib import import_module
import sys
from multiprocessing import Process, Pipe

class FAKE_OBJ_MAIN(object):
    """This is a dummy class which makes it easy to recognise fake object ids
    at the sub-process end...i.e. it exists purely to be pickled and
    unpickled."""
    def __init__(self, _id):
        self._id = _id
    def __str__(self):
        return "FAKE_OBJ_MAIN for real id=%s" % self._id
class MonobrowProcess(object):
    """Monobrow is exposed through this class. See the bottom of this file
    for an example.

    TODO: it might be better to subclass Process and implement pickling as
    required."""
    def __init__(self, group, target, args=(), kwargs={}, targetPackage='matplotlib'):
        """See multiprocessing.Process for the meaning of the inputs.
        ``targetPackage`` is the name of the package to apply monobrow to."""
        self._nearSocket, farSocket = Pipe()
        self._process = Process(target=runWithPiping,
                                args=(farSocket, targetPackage, target, args, kwargs))
        self._inst_cache = {}
    def start(self):
        self._process.start()
    def _getCachedItem(self, _id):
        if _id not in self._inst_cache:
            return self._getModule(_id)
        else:
            return self._inst_cache[_id]
    def _getModule(self, module_name):
        if module_name not in sys.modules:
            import_module(module_name)
        return sys.modules[module_name]
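    # _id resolution, as used by the two helpers above: an _id is either a
    # dotted module name (e.g. 'matplotlib'), looked up via sys.modules, or
    # the integer id() of a real object previously stashed in _inst_cache by
    # _wrapThing below.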
    def _wrapThing(self, thing, new_id):
        """This prepares things to be sent to the spawned process."""
        IMMUTABLE_TYPES = (float, int, long, str, unicode)
        if isinstance(thing, IMMUTABLE_TYPES):
            return thing
        else:
            new_id = id(thing) if new_id is None else new_id
            self._inst_cache[new_id] = thing
            return FAKE_OBJ_MAIN(new_id)
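    # Illustrative flow (values made up): if the spawned process calls
    # ``plt.figure()``, the real Figure created on this side is mutable, so
    # it is stashed as _inst_cache[id(fig)] and only FAKE_OBJ_MAIN(id(fig))
    # crosses the pipe; a plain float/int/str return value is sent as-is.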
    def recvSome(self):
        while self._nearSocket.poll():
            packet = self._nearSocket.recv()
            magic_name, _id, args, kwargs = packet
            print "doing", magic_name, 'args=', strTrunc(args, 30), 'kwargs=', strTrunc(kwargs)
            sys.stdout.flush()
            obj = self._getCachedItem(_id)
            func = getattr(obj, magic_name)
            ret, new_id = None, None
            try:
                ret = func(*args, **kwargs)
            except AttributeError:  # if obj is a module we may need to import a submodule...
                new_id = obj.__name__ + "." + args[0]
                ret = self._getModule(new_id)
            finally:
                ret = self._wrapThing(ret, new_id)
            self._nearSocket.send((magic_name, _id, ret))
    def is_alive(self):
        return self._process.is_alive()
""" | |
############################################################################### | |
nonesense example usage | |
############################################################################### | |
""" | |
import matplotlib.pylab as plt # this looks stupid, but seems neccessary (see note at top of file) | |
""" this is the thing we want to run in parallel""" | |
def testPlot(v,c): | |
import numpy as np | |
from time import sleep | |
import matplotlib.pylab as plt | |
sleep(v) # give this process a random handicap' | |
plt.hold(True) | |
for ii in np.arange(0,2,0.1): | |
y = np.sin(ii+v) # pretend this is a complex calculation | |
plt.plot([ii],[y],'.'+c) | |
def makeSomePretendArgs(N):
    from numpy.random import rand
    args = [None] * N
    colors = 'rgbkmyc'
    for ii in range(N):
        args[ii] = (rand(1) * 5, colors[ii])
    return args
""" this is how we get it to run """ | |
if __name__ == "__main__": | |
from multiprocessing import freeze_support | |
freeze_support() | |
import matplotlib.pylab as plt | |
N = 4 | |
jobs = [None]*N | |
for ii,args_ii in zip(range(N),makeSomePretendArgs(N)): | |
jobs[ii] = MonobrowProcess(None,testPlot,args=args_ii) | |
jobs[ii].start() | |
while any((jj.is_alive() for jj in jobs)): | |
for jj in jobs: | |
jj.recvSome() | |
plt.pause(0.1) | |
for jj in jobs: # we have to do it one last time | |
jj.recvSome() | |
This is actually working! The plot commands from the multiple processes get applied to a single figure, a single axis in fact.
Of course there are many ways to break it (as mentioned in the TODOs and other unknowns).