Created
April 17, 2012 21:27
-
-
Save lebedov/2409173 to your computer and use it in GitHub Desktop.
Bidirectional communication between two processes with pyzmq
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Bidirectional communication between two processes. | |
""" | |
import numpy as np | |
import zmq | |
import multiprocessing as mp | |
# Set this to True to use durable sockets: | |
durable = False | |
def is_poll_in(sock, poller, timeout=100): | |
""" | |
Check whether a poller detects incoming data on a specified | |
socket. | |
""" | |
socks = dict(poller.poll(timeout)) | |
if sock in socks and socks[sock] == zmq.POLLIN: | |
return True | |
else: | |
return False | |
class Module(mp.Process): | |
def __init__(self, *args, **kwargs): | |
self.id = kwargs.pop('id') | |
self.local_ports = kwargs.pop('local_ports') | |
self.remote_ports = kwargs.pop('remote_ports') | |
mp.Process.__init__(self, *args, **kwargs) | |
def run(self): | |
np.random.seed(self.id) | |
# Set up connections: | |
self.ctx = zmq.Context() | |
self.poller = zmq.Poller() | |
self.in_sock = self.ctx.socket(zmq.SUB) | |
if durable: | |
self.in_sock.setsockopt(zmq.IDENTITY, str(self.id) + '_data') | |
self.in_sock.setsockopt(zmq.SUBSCRIBE, '') | |
self.in_sock.connect('tcp://localhost:%i' % self.local_ports[0]) | |
self.poller.register(self.in_sock, zmq.POLLIN) | |
self.in_ctrl_sock = self.ctx.socket(zmq.SUB) | |
if durable: | |
self.in_ctrl_sock.setsockopt(zmq.IDENTITY, str(self.id) + '_ctrl') | |
self.in_ctrl_sock.setsockopt(zmq.SUBSCRIBE, '') | |
self.in_ctrl_sock.connect('tcp://localhost:%i' % self.local_ports[1]) | |
self.poller.register(self.in_ctrl_sock, zmq.POLLIN) | |
self.out_sock = self.ctx.socket(zmq.PUB) | |
self.out_sock.bind('tcp://*:%i' % self.remote_ports[0]) | |
if durable: | |
self.out_sock.setsockopt(zmq.HWM, 1) | |
self.out_ctrl_sock = self.ctx.socket(zmq.PUB) | |
self.out_ctrl_sock.bind('tcp://*:%i' % self.remote_ports[1]) | |
if durable: | |
self.out_ctrl_sock.setsockopt(zmq.HWM, 1) | |
remote_det = False | |
sync_ack = False | |
while True: | |
# Send phony data until the remote module sends an acknowledgment: | |
if not sync_ack: | |
self.out_sock.send_pyobj('init') | |
# Check for sync signal: | |
if is_poll_in(self.in_ctrl_sock, self.poller): | |
_ = self.in_ctrl_sock.recv_pyobj() | |
sync_ack = True | |
# Send an acknowledgment when phony data is first detected: | |
socks = dict(self.poller.poll(100)) | |
if is_poll_in(self.in_sock, self.poller): | |
if self.in_sock.recv_pyobj() == 'init': | |
self.out_ctrl_sock.send_pyobj('') | |
remote_det = True | |
# Move on after synchronization has been achieved: | |
if remote_det and sync_ack: | |
break | |
# Transmit actual data between modules: | |
for i in xrange(100): | |
if is_poll_in(self.in_sock, self.poller): | |
# Read input from remote module: | |
in_data = self.in_sock.recv_pyobj() | |
print 'module %i in : %s ' % (self.id, str(in_data)) | |
# Send output to remote module: | |
out_data = np.random.rand(3) | |
print 'module %i out: %s ' % (self.id, str(out_data)) | |
self.out_sock.send_pyobj(out_data) | |
if __name__ == '__main__': | |
m1 = Module(id=0, local_ports=(5000, 5001), remote_ports=(5002, 5003)) | |
m2 = Module(id=1, local_ports=(5002, 5003), remote_ports=(5000, 5001)) | |
m1.start() | |
m2.start() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment