Created
January 25, 2016 21:53
-
-
Save almarklein/2b6ca182a3e94d532931 to your computer and use it in GitHub Desktop.
Multi-process communication via a message pool implemented using UDP multicast
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2016 (C) Almar Klein. Consider this 2-clause BSD licensed. | |
""" | |
I initially wrote this to be included in Flexx, so that multiple processes
on the same machine could communicate; more specifically, to allow a Flexx
CLI to get info on, and terminate, running server processes. I tested it
on Windows and Linux. I eventually discarded this approach because I don't
fully understand how UDP multicast works. It is important that this only
works on localhost, and I am not sure whether it does, or how hard it
would be to compromise. I went with a simpler approach based on HTTP
requests using Tornado.
---- | |
Functionality to connect multiple processes in a multicast environment.
A channel name (which is hashed to a port number) defines a pool through
which nodes can send messages. Messages can be sent to a specific node,
or broadcast to any node that's interested. This provides a small and
lightweight implementation for inter-process communication that supports
publish-subscribe (pub-sub) and request-reply (req-rep) patterns.
Each node in the pool has an id that can be used to address messages
to it. Each message also has a topic, which nodes use to filter messages.
Each message is a UTF-8 encoded string that looks like this:
'<header> <channel> <body>'
Here <header> is the pool's channel name, and <channel> contains the
sender, the recipient and a "topic", separated by slashes:
* A/B/X: a message with topic X is sent by A to B.
* A/*/X: a message with topic X is broadcast (i.e. published) by A.
A topic that starts with "req_" is a request: a node that handles topic
"req_X" sends its reply back to the requester on topic "X". By default,
all nodes support the topic "req_identify", which is answered with an
empty message on topic "identify". This can be used to get an overview
of all nodes in the pool.
""" | |
import os | |
import sys | |
import time | |
import socket | |
import logging | |
import weakref | |
import threading | |
# ANY: the wildcard IPv4 address; used below both as the bind address and
# as the interface for the multicast group membership.
# LOCAL: the IPv4 loopback address.
ANY, LOCAL = '0.0.0.0', '127.0.0.1'
def port_hash(name):
    """ Map a string deterministically onto a port number in 49152..65535.

    That range (2**14 possibilities) is the range for dynamic and/or
    private (ephemeral) ports specified by iana.org. The same name always
    yields the same port, so processes that agree on a name agree on a port.
    """
    multiplier = 0xd2d84a61
    acc = 0
    for code in map(ord, name):
        # Mix in the previous state (shifted) plus the scaled character.
        acc = acc + (acc >> 3) + code * multiplier
    # Finally mix in the length of the name.
    acc = acc + (acc >> 3) + len(name) * multiplier
    low_bits = acc % (1 << 14)
    return 49152 + low_bits
class PoolNode:
    """ A node in a pool of connections.

    All nodes created with the same ``channel`` name bind the same UDP
    port (derived from the name via ``port_hash``) and join the multicast
    group ``mcast_ip``. A ``PoolNodeThread`` passes incoming datagrams to
    ``_respond``, which either collects them for a pending ``request()`` /
    ``receive()`` call, or dispatches them to an ``on_<topic>`` method.

    Parameters:
        id: identifier of this node, used as sender/recipient in messages.
            Converted to str; must be non-empty and contain no space or "/".
        channel: name of the pool. Converted to str; must be non-empty and
            contain no space.
        mcast_ip: multicast group address to join and send to.

    Raises:
        ValueError: if ``id`` or ``channel`` is missing or malformed.
    """

    def __init__(self, id=None, channel=None, mcast_ip='239.255.24.24'):
        for name, val in [('id', id), ('channel', channel)]:
            # Only reject missing/empty values; a falsy id such as 0 is fine.
            if val is None or str(val) == '':
                raise ValueError('%s needs an %s as input' % (self.__class__.__name__, name))
        self._id = str(id)
        self._header = str(channel)
        # Messages look like '<header> <from>/<to>/<topic> <body>' and are
        # parsed by splitting on ' ' and '/', so these characters would make
        # this node's messages unparseable.
        if ' ' in self._header:
            raise ValueError('channel must not contain spaces: %r' % self._header)
        if ' ' in self._id or '/' in self._id:
            raise ValueError('id must not contain spaces or "/": %r' % self._id)
        self._headerb = self._header.encode()  # read by PoolNodeThread
        self._port = port_hash(self._header)
        self._mcast_ip = mcast_ip
        self._lock = threading.RLock()
        # Notified by _respond() whenever a message is collected, so that
        # _receive() wakes up immediately instead of polling.
        self._cond = threading.Condition(self._lock)
        self._collect_topic = ''
        self._collected_messagages = []
        self._sock = self._create_socket()
        self._thread = PoolNodeThread(self)
        self._thread.start()

    def __del__(self):
        # No need to stop thread; it stops when we die
        try:
            self._sock.close()
        except Exception:
            pass

    def _create_socket(self):
        """ The multicast magic is here.

        Returns a UDP socket bound to the pool port on all interfaces that
        has joined the multicast group. If any setup step fails, the socket
        is closed before the error is re-raised.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            # Allow several nodes (processes) to bind the same port.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((ANY, self._port))
            # Short timeout so the listener thread regularly gets a chance
            # to notice that this node has been garbage collected.
            sock.settimeout(0.1)
            # TTL 0 is intended to keep multicast packets on this host.
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 0)
            # Deliver our own multicasts locally (the usual default, made
            # explicit because same-host delivery is the whole point).
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP,
                            socket.inet_aton(self._mcast_ip) + socket.inet_aton(ANY))
        except Exception:
            sock.close()
            raise
        return sock

    def send(self, to, topic, msg):
        """ Send a message into the pool. To broadcast, use "*" for ``to``.
        """
        channel = '%s/%s/%s' % (self._id, to, topic)
        txt = '%s %s %s' % (self._header, channel, msg)
        self._sock.sendto(txt.encode(), (self._mcast_ip, self._port))

    def request(self, to, topic, msg, timeout=0.1):
        """ Make a request. This sends a message on topic "req_X" and waits
        for a reply, directed at us with topic "X". If the to field is "*",
        multiple messages are collected over the full ``timeout``. Otherwise
        the function returns as soon as one reply has been received.

        Returns a list of ``(sender_id, body)`` tuples.
        Raises ValueError if ``topic`` does not start with "req_".
        """
        if not topic.startswith('req_'):
            raise ValueError('A request should have a topic that starts with "req_".')
        count = None if to == '*' else 1
        return self._collect(topic[4:], count, timeout,
                             lambda: self.send(to, topic, msg))

    def receive(self, filter_topic, count=None, timeout=0.1):
        """ Receive messages for a given topic.

        Waits until ``count`` messages arrived or ``timeout`` seconds passed
        (the full timeout when ``count`` is None). Returns a list of
        ``(sender_id, body)`` tuples.
        """
        return self._collect(filter_topic, count, timeout)

    def _collect(self, topic, count, timeout, before=None):
        """ Arm the collector for ``topic``, optionally call ``before`` (e.g.
        to send a request), wait via _receive, and return what was collected.
        The collector is always disarmed afterwards, even on error.
        """
        with self._cond:
            self._collected_messagages = []
            # Arm before sending, so even an immediate reply is collected.
            self._collect_topic = topic
            try:
                if before is not None:
                    before()
                self._receive(count, timeout)
            finally:
                self._collect_topic = ''
            return self._collected_messagages[:]

    def _receive(self, count, timeout):
        """ Block until ``count`` messages were collected or ``timeout``
        seconds passed. With ``count=None``, always wait the full timeout.
        """
        with self._cond:  # re-entrant: fine if the caller already holds it
            if count is None:
                # The predicate is never true, so wait_for only returns once
                # the timeout expires, collecting everything meanwhile.
                self._cond.wait_for(lambda: False, timeout)
            else:
                self._cond.wait_for(
                    lambda: len(self._collected_messagages) >= count, timeout)

    def _respond(self, data):
        """ The thread calls this. Here we dispatch the handling of a message.

        ``data`` is the raw datagram (bytes). Messages for another channel
        or another recipient are ignored. Malformed messages raise
        ValueError/UnicodeDecodeError, which the listener thread logs.
        """
        data = data.decode()
        header, channel, msg = data.split(' ', 2)
        if header != self._header:
            # The listener only checks a prefix, so e.g. channel "FLEXX"
            # would otherwise leak into a node on channel "FLEX".
            return
        fro, to, topic = channel.split('/')
        if to not in (self._id, '*'):
            return
        # Are we waiting for it?
        with self._cond:
            if self._collect_topic and topic == self._collect_topic:
                self._collected_messagages.append((fro, msg))
                self._cond.notify_all()
                return
        # Process normally
        method = getattr(self, 'on_' + topic, None)
        if method:
            res = method(msg)
            # For requests, a non-None return value is sent back as reply.
            if res is not None and topic.startswith('req_'):
                self.send(fro, topic[4:], res)

    def on_req_identify(self, msg):
        """ Reply to "req_identify" with an empty message on "identify". """
        return ''
class PoolNodeThread(threading.Thread):
    """ The thread for the PoolNode that listens for incoming messages.

    Holds only a weak reference to its node, so the node can be garbage
    collected; the loop then exits on its next iteration. The loop also
    exits when ``_do_stop`` is set to True or when the socket is closed.
    Datagrams that start with the node's header bytes are handed to the
    node's ``_respond`` method.
    """

    # Large enough for any UDP datagram. With a smaller buffer, long
    # messages get truncated (or, on Windows, recvfrom raises instead).
    _BUFSIZE = 65536

    def __init__(self, responder):
        super().__init__()
        self._responder = weakref.ref(responder)
        self._headerb = responder._headerb
        self._do_stop = False
        # Thread.setDaemon() is deprecated since Python 3.10.
        self.daemon = True

    def run(self):
        try:
            self._run()
        except Exception:
            # Unexpected failure, possibly during interpreter shutdown when
            # module globals may already be gone: report it if we still can,
            # but never let an exception escape the thread.
            try:
                logging.exception('PoolNodeThread stopped unexpectedly')
            except Exception:
                pass

    def _run(self):
        while not self._do_stop:
            responder = self._responder()
            if responder is None:
                break  # the node was garbage collected
            sock = responder._sock
            # Drop the strong reference before blocking, so the node can be
            # collected while we wait for data.
            del responder
            try:
                data, _addr = sock.recvfrom(self._BUFSIZE)
            except socket.timeout:  # must precede OSError (it's a subclass)
                continue
            except OSError:
                break  # socket closed (e.g. by PoolNode.__del__) or unusable
            if not data.startswith(self._headerb):
                continue
            responder = self._responder()
            if responder is None:
                continue
            try:
                responder._respond(data)
            except Exception as err:
                # A bad message or a failing handler must not kill the loop.
                logging.exception('PoolNodeThread error: %s', err)
if __name__ == '__main__':

    class MyReplier(PoolNode):
        """ Demo node that additionally handles topic "req_exit" by sending
        SIGINT to its own process (and returning no reply). """

        def on_req_exit(self, msg):
            import signal
            os.kill(os.getpid(), signal.SIGINT)

    # Two nodes in the same "FLEXX" pool, with distinct ids (pid and pid+1).
    # NOTE(review): the script does nothing further with them; presumably
    # meant to be explored interactively (e.g. ``python -i``) - confirm.
    pid = os.getpid()
    res = MyReplier(pid, 'FLEXX')
    req = MyReplier(pid + 1, 'FLEXX')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment