-
-
Save PatWie/b55a0ccd85238e725a7adb59e8e7c7c1 to your computer and use it in GitHub Desktop.
Edited version of an example script that farms out image saves to worker processes via a ZeroMQ ventilator.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy | |
import zmq | |
def send_array(socket, A, flags=0, copy=True, track=False):
    """Send a numpy array as two frames: JSON metadata, then the raw data.

    The first frame is a JSON object ``{"dtype": ..., "shape": ...}`` sent
    with ``SNDMORE``. The second frame is the array's bytes in C order,
    which is the layout ``recv_array`` assumes when it reshapes.

    :param socket: zmq socket to send on.
    :param A: numpy array to send.
    :param flags: zmq send flags, applied to both frames.
    :param copy: passed through to ``socket.send`` for the data frame.
    :param track: passed through to ``socket.send`` for the data frame.
    :returns: whatever ``socket.send`` returns for the data frame.
    """
    # Metadata describes the array as the caller passed it.
    md = dict(
        dtype=str(A.dtype),
        shape=A.shape,
    )
    # The receiver rebuilds the array with frombuffer + reshape, i.e. it
    # reads the bytes in C order. A transposed or strided view is either
    # rejected by the buffer protocol or would be reassembled in the wrong
    # element order, so normalise to a C-contiguous array first. This returns
    # A itself (no copy) when A is already C-contiguous.
    data = numpy.ascontiguousarray(A)
    socket.send_json(md, flags | zmq.SNDMORE)
    return socket.send(data, flags, copy=copy, track=track)
def recv_array(socket, flags=0, copy=True, track=False):
    """Receive a numpy array sent by ``send_array``.

    Reads the JSON metadata frame (``dtype`` and ``shape``), then the raw
    data frame, and rebuilds the array on top of the received buffer.

    :param socket: zmq socket to receive on.
    :param flags: zmq receive flags, applied to both frames.
    :param copy: passed through to ``socket.recv`` for the data frame.
    :param track: passed through to ``socket.recv`` for the data frame.
    :returns: numpy array with the sent dtype and shape. It shares memory
        with the received message (and is read-only when that is ``bytes``).
    """
    md = socket.recv_json(flags=flags)
    msg = socket.recv(flags=flags, copy=copy, track=track)
    # Both bytes and zmq.Frame expose the buffer protocol, so frombuffer can
    # consume them directly. The Python-2-only ``buffer()`` builtin is not
    # needed (and does not exist on Python 3).
    A = numpy.frombuffer(msg, dtype=md['dtype'])
    return A.reshape(md['shape'])
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
###############
#
# (c) Vasco Tenner 2012
#
# Defines ParallelWrite, which writes images in parallel with image capture
#
##############
import zmq | |
import random | |
import time | |
import sys | |
import numpy as np | |
# local | |
from npzmq import send_array | |
from worker import run_worker | |
class ParallelWrite:
    """ Write images parallel to capture of images.

    Ventilator/sink built on four zmq sockets:

    * ``toworker`` (PUSH, binds :5557) sends write jobs to the workers;
    * ``sink`` (PULL, binds :5558) receives worker "READY" announcements and
      one ``'.'`` confirmation per finished write;
    * ``tosink`` (PUSH, connects to :5558) lets this object post messages to
      its own sink;
    * ``controller`` (PUB, binds :5559) broadcasts control messages such as
      ``"KILL"`` to every worker.

    ``numtowrite`` counts writes sent with ``write_img`` whose confirmation
    has not yet been consumed from the sink.
    """

    def __init__(self, nworkers=0, nmessages=1000):
        """
        :param nworkers: number of workers ``wait_for_workers`` waits for;
            with 0 it still waits for one (presumably started separately).
        :param nmessages: number of images ``ventilator`` sends and
            ``do_sink`` waits to have confirmed.
        """
        self.nworkers = nworkers
        self.num = nmessages
        self.numtowrite = 0
        # Also set by ventilator(); initialised here so do_sink() can report
        # an elapsed time even if ventilator() was never called.
        self.tstart = time.time()
        context = zmq.Context()
        # Socket to send write jobs to the workers on
        self.toworker = context.socket(zmq.PUSH)
        self.toworker.hwm = 100
        self.toworker.bind("tcp://*:5557")
        # Socket with direct access to the sink:
        # used to synchronize start of batch
        self.tosink = context.socket(zmq.PUSH)
        self.tosink.connect("tcp://localhost:5558")
        # Socket to receive worker announcements and confirmations on
        self.sink = context.socket(zmq.PULL)
        self.sink.bind("tcp://*:5558")
        # Socket for worker control
        self.controller = context.socket(zmq.PUB)
        self.controller.bind("tcp://*:5559")

    def cleanup(self, poll_ms=100):
        """Wait for outstanding writes, stop the workers, close the sockets.

        Any write still counted in ``numtowrite`` is waited for by consuming
        ``'.'`` confirmations from the sink. Then ``"KILL"`` is broadcast to
        the workers and all four sockets are closed.

        :param poll_ms: how long, in milliseconds, each loop iteration waits
            for a confirmation before printing another progress dot.
        """
        last_reported = None
        while self.numtowrite > 0:
            if self.numtowrite != last_reported:
                print('Waiting for %i files to be written: ' % self.numtowrite)
                last_reported = self.numtowrite
            # Nothing else in this process decrements numtowrite while
            # cleanup() runs, so sleeping and re-reading it would never end.
            # Consume the confirmations here instead.
            if self.sink.poll(poll_ms):
                if self.sink.recv_multipart() == [b'.']:
                    self.numtowrite -= 1
            sys.stdout.write('.')
            sys.stdout.flush()
        # Send kill signal to workers
        self.controller.send(b"KILL")
        # Finished
        self.toworker.close()
        self.tosink.close()
        self.sink.close()
        self.controller.close()

    def wait_for_workers(self):
        """Block until the expected number of workers have sent "READY".

        At least one worker is waited for, even when ``nworkers`` is 0.
        Other messages received on the sink meanwhile are discarded.
        """
        n = max(self.nworkers, 1)
        print("Waiting for %i workers" % n)
        ready = 0
        while ready < n:
            msg = self.sink.recv_multipart()
            if msg[0] == b"READY":
                ready += 1

    def ventilator(self, fname_template='g:/python_programming/test/test%i.tif',
                   report_every=1):
        """Send ``self.num`` copies of a synthetic test image to the workers.

        :param fname_template: ``%``-style template, formatted with the task
            number, giving the file name each write job carries.
        :param report_every: print the task number every this many tasks
            (1 prints every task; 0 disables the per-task output).
        """
        print("Sending tasks to workers...")
        random.seed()
        data = np.fromfunction(lambda x, y: np.cos(x) * y, (1000, 2000),
                               dtype=np.uint16)
        # Start our clock now
        self.tstart = time.time()
        # Announce the batch size to the sink and, on the control channel,
        # to the workers. do_sink() skips anything that is not a '.'.
        batch = str(self.num).encode('ascii')
        self.tosink.send(batch)
        self.controller.send(batch)
        for task_nbr in range(self.num):
            # Each task sends its own copy of the image.
            d = data.copy()
            self.write_img(d, fname_template % task_nbr, normalize=-1)
            del d
            if report_every and task_nbr % report_every == 0:
                print(task_nbr)
        print('Finished sending')
        total_msec = (time.time() - self.tstart) * 1000
        print("Total elapsed time: %d msec" % total_msec)

    def write_img(self, data, fname, *args, **kwargs):
        """ Save image using separate process, See Imagerw.write_img

        Sends one job to the workers. The job is a JSON header holding
        ``fname`` plus the extra positional/keyword arguments (so these must
        be JSON-serialisable), followed by the frames ``send_array``
        produces for ``data``. The write counts as outstanding in
        ``numtowrite`` until its confirmation is consumed by ``do_sink`` or
        ``cleanup``.
        """
        self.numtowrite += 1
        md = dict(fname=fname, args=args, kwargs=kwargs)
        self.toworker.send_json(md, zmq.SNDMORE)
        send_array(self.toworker, data, copy=False)

    def do_sink(self):
        """Block until ``self.num`` write confirmations arrive on the sink.

        Each ``'.'`` message decrements ``numtowrite``; any other message
        (e.g. the batch-size announcement) is skipped. Prints a progress mark
        per confirmation (``:`` every tenth, ``.`` otherwise) and finally
        the elapsed time since ``self.tstart``.
        """
        task_nbr = 0
        while task_nbr < self.num:
            msg = self.sink.recv_multipart()
            if msg != [b'.']:
                # only count "I got one!" messages
                continue
            task_nbr += 1
            self.numtowrite -= 1
            sys.stdout.write(":" if task_nbr % 10 == 0 else ".")
            sys.stdout.flush()
        # Calculate and report duration of batch
        total_msec = (time.time() - self.tstart) * 1000
        print("Total elapsed time: %d msec" % total_msec)
if __name__ == '__main__':
    import argparse
    from multiprocessing import Process

    parser = argparse.ArgumentParser()
    parser.add_argument("-w", type=int, default=0,
                        help="The number of workers to start")
    parser.add_argument("-n", type=int, default=1000,
                        help="The number of messages to send")
    opts = parser.parse_args()

    print("starting %i workers" % opts.w)
    # Local workers are started first; wait_for_workers() then blocks until
    # they (or, with -w 0, one externally started worker) report READY.
    workers = [Process(target=run_worker) for _ in range(opts.w)]
    for w in workers:
        w.start()

    pw = ParallelWrite(opts.w, opts.n)
    pw.wait_for_workers()
    pw.ventilator()
    pw.do_sink()
    # cleanup() broadcasts KILL, which is what lets the workers exit.
    pw.cleanup()

    for w in workers:
        w.join()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import zmq | |
class Worker(object):
    """Worker that consumes image-write jobs from ParallelWrite over zmq.

    Sockets:

    * ``fromvent`` (PULL, connects to :5557) receives write jobs;
    * ``tosink`` (PUSH, connects to :5558) sends "READY" and one ``'.'``
      confirmation per job;
    * ``tocontroller`` (SUB, connects to :5559, subscribed to everything)
      receives control messages such as ``"KILL"``.
    """

    def __init__(self):
        context = zmq.Context()
        # Socket to receive write jobs on
        self.fromvent = context.socket(zmq.PULL)
        self.fromvent.connect("tcp://localhost:5557")
        # Socket with direct access to the sink:
        # used to synchronize start of batch
        self.tosink = context.socket(zmq.PUSH)
        self.tosink.connect("tcp://localhost:5558")
        # Socket for worker control
        self.tocontroller = context.socket(zmq.SUB)
        self.tocontroller.connect("tcp://localhost:5559")
        self.tocontroller.setsockopt(zmq.SUBSCRIBE, b'')

    def log(self, msg):
        """Print ``msg`` prefixed with this process's pid."""
        print("[%i] %s" % (os.getpid(), msg))

    def close(self):
        """Close the job, sink and control sockets."""
        self.fromvent.close()
        self.tosink.close()
        self.tocontroller.close()

    def process(self):
        """Announce readiness, then handle jobs until "KILL" arrives.

        Sends ``["READY", <pid>]`` to the sink, then polls the job and
        control sockets; each incoming job is handled by ``process_task``.
        The sockets are closed when the loop ends, including on error.
        """
        self.tosink.send_multipart([b"READY", str(os.getpid()).encode('ascii')])
        poller = zmq.Poller()
        poller.register(self.fromvent, zmq.POLLIN)
        poller.register(self.tocontroller, zmq.POLLIN)
        self.log("begin processing")
        try:
            while True:
                events = dict(poller.poll())
                if self.fromvent in events:
                    self.process_task()
                if self.tocontroller in events:
                    msg = self.tocontroller.recv_multipart()
                    self.log("control message: %s" % msg)
                    if msg[0] == b"KILL":
                        break
        finally:
            self.close()
        self.log("done processing")

    def process_task(self):
        """Consume one job and confirm it to the sink.

        A job is a JSON header (with at least ``fname``) followed by further
        data frames. The data is received but not written anywhere in this
        example; its size is logged and ``'.'`` is sent to the sink.

        :raises ValueError: if the header is not followed by more frames.
        """
        md = self.fromvent.recv_json(zmq.NOBLOCK)
        # Explicit check instead of ``assert`` so it still runs under -O.
        if not self.fromvent.rcvmore:
            raise ValueError("expected more parts")
        # receive into aether:
        data = self.fromvent.recv_multipart(zmq.NOBLOCK)
        MB = sum(len(part) for part in data) / (1024. * 1024.)
        self.log("writing %s (%.2f MB)" % (md['fname'], MB))
        self.tosink.send(b'.')
def run_worker():
    """Process entry point: construct a Worker and run its processing loop."""
    Worker().process()
if __name__ == '__main__':
    import argparse
    from multiprocessing import Process

    parser = argparse.ArgumentParser()
    parser.add_argument("-n", type=int, default=1,
                        help="The number of workers to start")
    n = parser.parse_args().n

    print("starting %i workers" % n)
    workers = [Process(target=run_worker) for _ in range(n)]
    # Plain loops: these calls are made for their side effects only.
    for w in workers:
        w.start()
    for w in workers:
        w.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment