Created
April 16, 2012 18:41
-
-
Save lebedov/2400604 to your computer and use it in GitHub Desktop.
Interprocess communication with pyzmq and multiprocessing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Pass data between processes started through the multiprocessing module | |
using pyzmq and process them with PyCUDA | |
""" | |
import numpy as np | |
import zmq | |
import multiprocessing as mp | |
# Index of the CUDA device that the worker process initializes:
gpu = 0


def worker():
    """
    Serve GPU processing requests sent by the master process.

    Creates a CUDA context on device `gpu`, connects a zmq REP socket to
    tcp://localhost:5555, and then loops: each received array is copied
    to the GPU, negated there, and the result is sent back as the reply.
    The loop ends when the string 'quit' is received; no reply is sent
    for that final message.
    """

    # PyCUDA is imported inside the function, so only a process that
    # actually runs worker() loads it:
    import pycuda.driver as drv
    import pycuda.gpuarray as gpuarray

    # Initialize device:
    drv.init()
    dev = drv.Device(gpu)
    ctx = dev.make_context()

    # The context is released in a finally clause rather than through
    # atexit: children started by multiprocessing with fork leave via
    # os._exit(), which skips atexit handlers.
    try:
        print(dev.name())

        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.connect("tcp://localhost:5555")
        try:
            # Process data sent to worker until a quit signal is
            # transmitted:
            while True:
                data = socket.recv_pyobj()
                print("Worker %i: %s" % (gpu, data))

                # Check the type before comparing: on recent NumPy
                # versions `ndarray == 'quit'` compares elementwise and
                # the truth value of the resulting array is ambiguous.
                if isinstance(data, str) and data == 'quit':
                    break

                # Do something with the data on the GPU:
                data_gpu = gpuarray.to_gpu(data)
                result_gpu = -data_gpu
                socket.send_pyobj(result_gpu.get())
        finally:
            socket.close()
            context.term()
    finally:
        ctx.pop()
def master():
    """
    Send work to the worker process and print what it returns.

    Binds a zmq REQ socket to tcp://*:5555, sends four random 4x4 arrays
    one at a time, prints the reply to each, and finally sends the
    string 'quit' so that the worker leaves its processing loop.
    """

    # Data to send to worker (a real list, so it can be iterated on both
    # Python 2 and Python 3):
    data_list = [np.random.rand(4, 4) for _ in range(4)]

    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.bind("tcp://*:5555")
    try:
        # Send data out for processing and get back the results:
        for data in data_list:
            socket.send_pyobj(data)
            result = socket.recv_pyobj()
            print("Master:  %s" % (result,))

        # The worker does not reply to the quit message, so nothing is
        # received after this send:
        socket.send_pyobj('quit')
    finally:
        socket.close()
        context.term()
if __name__ == '__main__':
    # Keep the process object under its own name. Rebinding the
    # module-level name `worker` would hide the function, and start
    # methods that pickle the target by name (e.g. spawn) then fail to
    # resolve it.
    worker_proc = mp.Process(target=worker)
    worker_proc.start()
    master()

    # Wait for the worker to handle the quit message and exit:
    worker_proc.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Does this work if `master()` is called before starting the worker? I've faced this issue with Python multiprocessing: if the publisher is started from the main thread before starting the subscriber (which is run in another process, as you show), then the subscriber does not receive any messages. However, if I start the publisher in another process (it's important that the `zmq.bind` be done in the new process), then things work as expected, i.e., the publisher can start before the subscriber.