Skip to content

Instantly share code, notes, and snippets.

@wapiflapi
Created February 16, 2015 10:43
Show Gist options
  • Select an option

  • Save wapiflapi/9f12d672c594505222df to your computer and use it in GitHub Desktop.

Select an option

Save wapiflapi/9f12d672c594505222df to your computer and use it in GitHub Desktop.
Binglide v2 IPC BXMDP00 / Draft POC
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import collections
import zmq
import ipc
class RoundRobin(object):
    """Fair task queue that serves clients in turn, one task per turn.

    ``clients`` maps each client that has pending work to its own FIFO of
    tasks; ``queue`` holds ``(client, client_queue)`` pairs in service order.
    """

    def __init__(self):
        # A plain dict rather than a defaultdict: defaultdict invokes its
        # factory without arguments, so it cannot pass the missing key on
        # to ``add_client``.
        self.clients = {}
        self.queue = collections.deque()

    def add_client(self, client):
        """Register *client* at the back of the rotation.

        Returns the client's task queue. Calling this for a client that is
        already registered returns its existing queue unchanged.
        """
        try:
            return self.clients[client]
        except KeyError:
            pass
        client_queue = collections.deque()
        self.clients[client] = client_queue
        self.queue.append((client, client_queue))
        return client_queue

    def add(self, client, task):
        """Append *task* to *client*'s queue, registering the client if needed."""
        self.add_client(client).append(task)

    def pop(self):
        """Return ``(client, task)`` for the client at the front of the rotation.

        A client that still has tasks goes to the back of the rotation;
        one that is drained is forgotten.

        Raises:
            IndexError: if no task is pending.
        """
        client, client_queue = self.queue.popleft()
        task = client_queue.popleft()
        if client_queue:
            self.queue.append((client, client_queue))
        else:
            self.clients.pop(client)
        return client, task
class GreedySetDict(collections.defaultdict):
    """``defaultdict(set)`` whose keys disappear once their set is empty.

    ``popfrom`` and ``removefrom`` keep the invariant that a key is present
    only while its set has members, so ``key in d`` can be used to test
    whether anything is stored under *key*.
    """

    def __init__(self):
        super().__init__(set)

    def _members(self, key):
        """Return the set stored under *key* without creating it.

        Raises:
            KeyError: if *key* is absent.
        """
        # dict.get bypasses default_factory, so a lookup for a missing key
        # does not leave an empty set behind.
        members = self.get(key)
        if members is None:
            raise KeyError(key)
        return members

    def popfrom(self, key, *args):
        """Remove and return an arbitrary member of the set under *key*.

        Extra positional arguments are forwarded to ``set.pop``.

        Raises:
            KeyError: if *key* is absent or its set is empty.
        """
        members = self._members(key)
        try:
            return members.pop(*args)
        finally:
            if not members:
                del self[key]

    def removefrom(self, key, *args):
        """Remove a member from the set under *key* via ``set.remove(*args)``.

        Raises:
            KeyError: if *key* is absent or the member is not in its set.
        """
        members = self._members(key)
        try:
            members.remove(*args)
        finally:
            if not members:
                del self[key]
class Broker(ipc.Dispatcher):
    """ROUTER-socket broker that hands client requests to idle workers.

    Every handler receives the list of frames returned by
    ``recv_multipart`` and indexes it as: ``msg[0]`` peer identity,
    ``msg[1]`` command, ``msg[2]`` service name or return address,
    ``msg[3]`` request id, ``msg[4]`` client id.

    NOTE(review): ``bxmdp`` supplies the command constants used below but
    is not among the imports at the top of this file -- confirm where it
    is meant to come from.
    """

    def __init__(self, zmqctx, dealer_config):
        """Create the ROUTER socket and the bookkeeping tables.

        Args:
            zmqctx: ZeroMQ context used to create the socket.
            dealer_config: callable invoked with the new ROUTER socket
                (presumably an endpoint object that binds or connects
                it -- TODO confirm).
        """
        super().__init__()
        self.zmqctx = zmqctx
        self.router = self.zmqctx.socket(zmq.ROUTER)
        dealer_config(self.router)
        # service -> RoundRobin of requests waiting for a worker
        self.requests = collections.defaultdict(RoundRobin)
        # worker identity -> service announced in its READY message
        self.workers = {}
        # service -> set of idle worker identities
        self.idleworkers = GreedySetDict()
        # worker identity -> (request id, client id) it is working on
        self.activeworkers = {}
        # (request id, client id) -> set of workers handling that job
        self.jobs = GreedySetDict()

    @staticmethod
    def _discard(table, key, member):
        """Drop *member* from the set at ``table[key]``; remove emptied sets.

        Missing keys and missing members are ignored.
        """
        members = table.get(key)
        if members is None:
            return
        members.discard(member)
        if not members:
            del table[key]

    def _finish_job(self, worker):
        """Forget the job *worker* was running, if it had one."""
        job = self.activeworkers.pop(worker, None)
        if job is not None:
            self._discard(self.jobs, job, worker)

    def run(self):
        """Receive messages forever, dispatching each on its command frame."""
        while True:
            msg = self.router.recv_multipart()
            self.dispatch(msg[1], msg)

    def issue_work(self, worker, task):
        """Rewrite the request *task* in place as an XREQUEST and send it to *worker*."""
        task[2] = task[0]  # the requesting peer becomes the return address
        task[1] = bxmdp.XREQUEST
        task[0] = worker
        job = (task[3], task[4])
        self.jobs[job].add(worker)
        self.activeworkers[worker] = job
        self.router.send_multipart(task)

    def match_worker(self, service):
        """Give the next pending request for *service* to an idle worker.

        Does nothing when there is no idle worker or no pending request.
        """
        idle = self.idleworkers.get(service)
        if not idle:
            return
        pending = self.requests.get(service)
        if pending is None:
            return
        try:
            _client, task = pending.pop()
        except (IndexError, KeyError):
            # Popping from an empty deque raises IndexError.
            return
        # Take the worker straight from the idle set so its identity is in
        # hand, and drop the service key once no idle worker is left.
        worker = idle.pop()
        if not idle:
            del self.idleworkers[service]
        self.issue_work(worker, task)

    @ipc.bind(bxmdp.REQUEST)
    def on_request(self, msg):
        """Queue a client request and try to hand it to a worker right away."""
        if not msg[4]:
            # This client is the original client.
            msg[4] = msg[0]
        self.requests[msg[2]].add(msg[4], msg)
        # We need to match in case there is already a worker waiting.
        self.match_worker(msg[2])

    @ipc.bind(bxmdp.CANCEL)
    def on_cancel(self, msg):
        """Forward a cancellation as XCANCEL to every worker on that job.

        NOTE(review): requests still waiting in ``self.requests`` are not
        removed here -- confirm whether that is intended.
        """
        sender = msg[0]
        if not msg[4]:
            # This client is the original client.
            msg[4] = sender
        job = (msg[3], msg[4])
        # The return address is captured before the loop: msg[0] is
        # overwritten with a worker identity on every iteration.
        msg[2] = sender
        msg[1] = bxmdp.XCANCEL
        # .get() keeps unknown jobs from creating empty entries.
        for worker in tuple(self.jobs.get(job, ())):
            msg[0] = worker
            self.router.send_multipart(msg)

    @ipc.bind(bxmdp.READY)
    def on_ready(self, msg):
        """Record a worker as idle for its service and clear its finished job."""
        worker, service = msg[0], msg[2]
        self.workers[worker] = service
        self.idleworkers[service].add(worker)
        # A worker announcing itself for the first time has no active job.
        self._finish_job(worker)
        # We need to match in case there is already a task waiting.
        self.match_worker(service)

    @ipc.bind(bxmdp.XREPORT)
    def on_xreport(self, msg):
        """Relay a worker's XREPORT to the return address as a REPORT.

        Raises:
            KeyError: if the reporting worker never sent READY.
        """
        service = self.workers[msg[0]]
        msg[0] = msg[2]
        msg[1] = bxmdp.REPORT
        msg[2] = service
        self.router.send_multipart(msg)

    @ipc.bind(bxmdp.DISCONNECT)
    def on_disconnect(self, msg):
        """Forget a peer: drop it from the worker, idle and job tables."""
        worker = msg[0]
        service = self.workers.pop(worker, None)
        if service is not None:
            self._discard(self.idleworkers, service, worker)
        self._finish_job(worker)
def versions():
    """Print the libzmq and pyzmq versions reported by the ``zmq`` module."""
    libzmq_line = "libzmq %s" % zmq.zmq_version()
    print(libzmq_line)
    pyzmq_line = "pyzmq %s" % zmq.__version__
    print(pyzmq_line)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import abc
class Endpoint(metaclass=abc.ABCMeta):
    """Callable socket configurator: optionally sets an identity, then attaches.

    Subclasses decide how the socket is attached to ``endpoint`` by
    implementing ``set_endpoint``.
    """

    def __init__(self, endpoint, identity=None):
        """Store the endpoint address and the optional socket identity."""
        self.endpoint = endpoint
        self.identity = identity

    @abc.abstractmethod
    def set_endpoint(self, socket):
        """Attach *socket* to ``self.endpoint``."""

    def __call__(self, socket):
        """Configure *socket* and return it.

        The identity is assigned before ``set_endpoint`` runs, and only
        when one was given.
        """
        if self.identity is not None:
            socket.identity = self.identity
        self.set_endpoint(socket)
        return socket
class Bind(Endpoint):
    """Endpoint that attaches the socket by calling its ``bind`` method."""

    def set_endpoint(self, socket):
        """Call ``socket.bind`` with the stored endpoint."""
        address = self.endpoint
        socket.bind(address)
class Connect(Endpoint):
    """Endpoint that attaches the socket by calling its ``connect`` method."""

    def set_endpoint(self, socket):
        """Call ``socket.connect`` with the stored endpoint."""
        address = self.endpoint
        socket.connect(address)
class bind(object):
    """Decorator that tags a function with the event keys it handles.

    The keys are stored on the function as ``_handles``.
    """

    def __init__(self, *args):
        """Remember the event keys given as positional arguments."""
        self.keys = args

    def __call__(self, f):
        """Set ``f._handles`` to the stored keys and return *f* itself."""
        f._handles = self.keys
        return f
class DispatcherMeta(type):
    """Metaclass that builds a class-level ``dispatchtable`` from tagged attributes.

    Every attribute in the class body carrying a ``_handles`` iterable is
    appended to the handler list of each key it names.
    """

    def __new__(cls, name, bases, attrs):
        """Fill in ``bind`` and ``dispatchtable`` in *attrs*, then create the class.

        Raises:
            TypeError: if the class body supplies a ``bind`` that is not a
                subclass of the module-level ``bind``.
        """
        # Not sure if we want this or not.
        binder = attrs.setdefault('bind', bind)
        if not issubclass(binder, bind):
            raise TypeError("bind should be subclass of %r" % bind)

        table = attrs.setdefault('dispatchtable', {})
        for member in attrs.values():
            handled_keys = getattr(member, '_handles', [])
            for event_key in handled_keys:
                table.setdefault(event_key, []).append(member)

        return super().__new__(cls, name, bases, attrs)
class Dispatcher(metaclass=DispatcherMeta):
    """Base class that routes event keys to handlers listed in ``dispatchtable``."""

    def __init__(self, assertive=False):
        """
        Args:
            assertive: when true, dispatching a key with no handler calls
                ``handle_unknown`` instead of silently doing nothing.
        """
        self.assertive = assertive

    def dispatch(self, key, *args, **kwargs):
        """Call the handler registered for *key* and return its result.

        Returns None when no handler exists and the dispatcher is not
        assertive.
        """
        callbacks = self.dispatchtable.get(key, [])
        for callback in callbacks:
            # The table holds the plain functions taken from the class
            # body, so the instance is passed explicitly.
            # NOTE(review): returning here means only the first handler
            # for a key ever runs -- confirm whether all should be called.
            return callback(self, *args, **kwargs)
        if self.assertive and not callbacks:
            return self.handle_unknown(key, *args, **kwargs)
        return None

    def handle_unknown(self, key, *args, **kwargs):
        """Handle a key with no registered handler; raises NotImplementedError."""
        raise NotImplementedError("Unknown event key <%s>." % key)

%VERSION% = BXMDP00

When receiving messages a ROUTER socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application. When sending messages a ROUTER socket shall remove the first part of the message and use it to determine the identity of the peer the message shall be routed to.

This extra frame is not shown in the sub-protocol commands explained below.

REQUEST

from CLIENT

  1. %VERSION%.REQUEST
  2. service name; SHOULD be utf8 encoded string.
  3. request id; opaque binary
  4. client id, should be empty if not set by broker
  5. request body; opaque binary

to WORKER

  1. %VERSION%.XREQUEST
  2. return address; envelope stack
  3. request id; opaque binary
  4. client id, should be empty if not set by broker
  5. request body; opaque binary

CANCEL

from CLIENT

  1. %VERSION%.CANCEL
  2. service name; SHOULD be utf8 encoded string.
  3. request id; opaque binary
  4. client id, should be empty if not set by broker
  5. request body; opaque binary

to WORKER

  1. %VERSION%.XCANCEL
  2. return address; envelope stack
  3. request id; opaque binary
  4. client id, should be empty if not set by broker
  5. request body; opaque binary

REPORT

from WORKER

  1. %VERSION%.XREPORT
  2. return address; envelope stack
  3. request id; opaque binary
  4. client id, should be empty if not set by broker
  5. report body; opaque binary

to CLIENT

  1. %VERSION%.REPORT
  2. service name; SHOULD be utf8 encoded string.
  3. request id; opaque binary
  4. client id, should be empty if not set by broker
  5. report body; opaque binary

READY (from WORKER)

  1. %VERSION%.READY
  2. service name; SHOULD be utf8 encoded string.

DISCONNECT (from * to BROKER, from BROKER to *)

  1. %VERSION%.DISCONNECT
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment