@erikkaplun
Created February 12, 2013 13:45

# ... snip

class Cell(_BaseCell):  # TODO: inherit from Greenlet?
    uri = None
    hub = None
    impl = None
    proc = None
    stash = None
    stopped = False
    inbox = None
    _ref = None
    child_name_gen = None
    watchers = None
    watchees = None

    def __init__(self, parent, factory, uri, hub):
        if not callable(factory):  # pragma: no cover
            raise TypeError("Provide a callable (such as a class, function or Props) as the factory of the new actor")
        self.factory = factory
        self.parent = parent
        self.uri = uri
        self.hub = hub
        self.queue = gevent.queue.Queue()
        self.inbox = deque()
        self.worker = gevent.spawn(self.work)
    @logstring(u'←')
    def receive(self, message):
        self.queue.put(message)

    @logstring(u'↻')
    def work(self):
        def _restart():
            self.shutdown()
            self.worker = gevent.spawn(self.work)
            gevent.getcurrent().kill()

        def _stop():
            self.shutdown()
            self.destroy()
            gevent.getcurrent().kill()

        def _resume_children():
            for x in self.children:
                x.send('_resume')
        dbg(u"►►")
        try:
            self.impl = self.construct()
        except:
            self.report()
            stash = deque()
            while True:
                m = self.queue.get()
                if m in ('_resume', '_stop'):
                    self.queue.queue.extendleft(stash)
                    _stop()
                elif m == '_restart':
                    self.queue.queue.extendleft(stash)
                    _restart()
                else:
                    stash.appendleft(m)
        else:
            processing, has_ever_wanted_a_message = (True, False) if self.impl.run else (False, True)
            error = suspended = stopping = restarting = False
            while True:
                # consume the queue, handle system messages and internal signals from receive()/run(), and collect letters to the inbox
                if suspended or processing or error or stopping or not self.inbox:
                    self.queue.peek()
                should_restart = False
                suspend_or_resume = None
                while True:
                    try:
                        m = self.queue.get_nowait()
                    except gevent.queue.Empty:
                        break
                    if m == '__done':
                        has_ever_wanted_a_message = True
                        processing = False
                        if stopping:
                            if self.proc:
                                self.proc.kill()
                            _stop()
                        elif restarting:
                            should_restart = True
                    elif m == ('__error', ANY, ANY):
                        _, exc, tb = m
                        error = True
                        processing = False
                        if stopping:
                            Events.log(ErrorIgnored(self.ref, exc, tb))
                            if self.proc:
                                self.proc.kill()
                            _stop()
                        elif restarting:
                            should_restart = True
                        else:
                            self._suspend_children()
                            self.report((exc, tb))
                    elif m == '_stop':
                        if not processing or not has_ever_wanted_a_message:
                            if self.proc:
                                self.proc.kill()
                            _stop()
                        else:
                            stopping = True
                    elif m == '_restart':
                        if not processing or not has_ever_wanted_a_message:
                            should_restart = True
                        else:
                            restarting = True
                    elif m in ('_suspend', '_resume'):
                        suspend_or_resume = m
                    elif m == '_ignore':
                        assert error, "unexpected _ignore received"
                        if self.impl.run:
                            _stop()
                        else:
                            error = False
                    elif m == ('_watched', ANY):
                        self._watched(m[1])
                    elif m == ('_unwatched', ANY):
                        self.watchers.discard(m[1])
                    else:
                        if m == ('_node_down', ANY):
                            _, node = m
                            self.inbox.extend(('terminated', x) for x in (self.watchees or []) if x.uri.node == node)
                        else:
                            self.inbox.append(m)
                if should_restart:
                    _restart()
                elif suspend_or_resume:
                    suspended_new = (suspend_or_resume == '_suspend')
                    if suspended != suspended_new:
                        suspended = suspended_new
                        if suspended:
                            self._suspend_children()
                        else:
                            if self.proc and self.proc.exception:
                                self.destroy()
                                gevent.getcurrent().kill()
                            else:
                                _resume_children()
                # process the normal letters (i.e. the regular, non-system/non-special messages)
                while not processing and not suspended and not error and not stopping and self.queue.empty() and self.inbox:
                    m = self.inbox.popleft()
                    if m == ('_error', ANY, ANY, ANY):  # error handling is viewed as user-land logic
                        _, sender, exc, tb = m
                        self.catch_exc(self.supervise, sender, exc, tb)
                        break
                    if m == ('terminated', ANY):
                        _, actor = m
                        if actor in self.watchees:
                            self.watchees.remove(actor)
                            self._unwatch(actor, silent=True)
                    elif m == ('_child_terminated', ANY):
                        self._child_gone(m[1])
                        break
                    if self.impl.receive:
                        processing = True
                        self.proc = gevent.spawn(self.catch_exc, self.catch_unhandled, self.impl.receive, m)
                        gevent.idle()
                    elif self.impl.run:
                        assert self.ch.balance == -1
                        if self.get_pt == m:
                            processing = True
                            self.ch.put(m)
                            self.inbox.extendleft(reversed(self.stash))
                            self.stash.clear()
                        else:
                            self.stash.append(m)
                    else:
                        self.catch_exc(self.unhandled, m)
    def _suspend_children(self):
        for x in self.children:
            x.send('_suspend')

    def catch_exc(self, fn, *args, **kwargs):
        try:
            fn(*args, **kwargs)
        except Exception:
            self.queue.put(('__error', sys.exc_info()[1], sys.exc_info()[2]))
        else:
            self.queue.put('__done')

    def catch_unhandled(self, fn, m):
        try:
            fn(m)
        except Unhandled:
            self.unhandled(m)

    # processes (== Actors with a run() method)

    def get(self, *patterns):
        self.get_pt = OR(*patterns)
        self.queue.put('__done')
        return self.ch.get()

    def flush(self):
        while self.stash:
            self.unhandled(self.stash.popleft())
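
    # A minimal usage sketch (not part of the gist): a process-style actor, i.e. one
    # defining run() instead of receive(), drives these get()/flush() calls through
    # spinoff's user-facing API. The `Process` base class and the way it proxies
    # self.get() to this cell are assumed here, not shown in the snippet.
    #
    # class Echoer(Process):
    #     def run(self):
    #         while True:
    #             _, sender, text = self.get(('echo', ANY, ANY))
    #             sender << ('echoed', text)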
    # TODO: haven't figured out yet how to cleanly fit an implementation of this to the existing state machine
    # def escalate(self):
    #     _, exc, tb = sys.exc_info()
    #     if not (exc and tb):
    #         raise InvalidEscalation("Process.escalate must be called in an exception context")
    #     self.report((exc, tb))
    #     # TODO: don't know what--this code is from the old Process class which has now been merged into this class

    # birth & death

    def construct(self):
        factory = self.factory
        try:
            impl = factory()
        except Exception:
            raise CreateFailed("Constructing actor failed", factory)
        impl._parent = self.parent
        impl._set_cell(self)
        if hasattr(impl, 'pre_start'):
            pre_start = impl.pre_start
            args, kwargs = impl.args, impl.kwargs
            try:
                pre_start(*args, **kwargs)
            except Exception:
                raise CreateFailed("Actor failed to start", impl)
        self.ch = gevent.queue.Channel()
        if impl.run:
            if impl.receive:
                raise TypeError("actor should implement only run() or receive() but not both")
            self.proc = gevent.spawn(self.wrap_run, impl.run)
            self.stash = deque()
        return impl
    def wrap_run(self, fn):
        try:
            ret = fn(*self.impl.args, **self.impl.kwargs)
        except Exception:
            self.queue.put(('__error', sys.exc_info()[1], sys.exc_info()[2]))
        else:
            if ret is not None:
                warnings.warn("Process.run should not return anything--it's ignored")
            self.queue.put('__done')
            self.queue.put('_stop')

    def shutdown(self, term_msg='_stop'):
        if hasattr(self.impl, 'post_stop'):
            try:
                self.impl.post_stop()
            except Exception:
                Events.log(ErrorIgnored(self.ref, sys.exc_info()[1], sys.exc_info()[2]))
        while self.watchees:
            self._unwatch(self.watchees.pop())
        for child in self.children:
            child << term_msg
        stash = deque()
        while self.children:
            m = self.queue.get()
            if m == ('_child_terminated', ANY):
                self._child_gone(m[1])
            else:
                stash.appendleft(m)
        self.queue.queue.extendleft(stash)
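
    # Sketch (not part of the gist): the hooks construct() and shutdown() invoke on
    # the user's implementation object. `Actor` as the user-facing base class is
    # assumed from spinoff's API; `open_db` is a hypothetical helper.
    #
    # class CacheActor(Actor):
    #     def pre_start(self, path):    # called by construct() with self.args/self.kwargs
    #         self.db = open_db(path)
    #     def receive(self, message):
    #         ...
    #     def post_stop(self):          # called by shutdown() before the cell is destroyed
    #         self.db.close()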
    def destroy(self):
        ref = self.ref  # grab the ref before we stop, otherwise ref() returns a dead ref
        self.stopped = True
        self.queue.queue.extendleft(reversed(self.inbox))
        while True:
            try:
                m = self.queue.get_nowait()
            except gevent.queue.Empty:
                break
            if m == ('_watched', ANY):
                self._watched(m[1])
            elif m == ('_error', ANY, ANY, ANY):
                _, sender, exc, tb = m
                Events.log(ErrorIgnored(sender, exc, tb))
            elif m == ('__error', ANY, ANY):
                _, exc, tb = m
                Events.log(ErrorIgnored(ref, exc, tb))
            elif not (m == ('terminated', ANY) or m == ('_unwatched', ANY) or m == ('_node_down', ANY) or m == '_restart' or m == '_stop' or m == '_suspend' or m == '_resume' or m == '__done'):
                Events.log(DeadLetter(ref, m))
        self.parent.send(('_child_terminated', ref))
        for watcher in (self.watchers or []):
            watcher << ('terminated', ref)
        self.impl = self.inbox = self.queue = ref._cell = self.parent = None
    # unhandled

    def unhandled(self, m):
        if ('terminated', ANY) == m:
            raise UnhandledTermination(watcher=self.ref, watchee=m[1])
        else:
            Events.log(UnhandledMessage(self.ref, m))

    # supervision

    def supervise(self, child, exc, tb):
        if child not in self.children:
            Events.log(ErrorIgnored(child, exc, tb))
            return
        supervise = getattr(self.impl, 'supervise', None)
        if supervise:
            decision = supervise(exc)
        if not supervise or decision == Default:
            decision = default_supervise(exc)
        dbg(u"%r → %r" % (decision, child))
        if not isinstance(decision, Decision):
            raise BadSupervision("Bad supervisor decision: %s" % (decision,), exc, tb)
        # TODO: make these always async?
        if decision == Ignore:
            child << '_ignore'
        elif decision == Restart(ANY, ANY):
            child << '_restart'
        elif decision == Stop:
            child << '_stop'
        else:
            raise exc, None, tb
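
    # Sketch (not part of the gist): what a user-supplied supervise() might look like.
    # Stop and Default are the Decision objects handled above; CreateFailed is the
    # exception construct() raises when an actor fails to start.
    #
    # class Parent(Actor):
    #     def supervise(self, exc):
    #         return Stop if isinstance(exc, CreateFailed) else Default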
    @logstring("report")
    def report(self, exc_and_tb=None):
        if not exc_and_tb:
            _, exc, tb = sys.exc_info()
        else:
            exc, tb = exc_and_tb
        if not isinstance(exc, UnhandledTermination):
            if isinstance(tb, str):
                exc_fmt = tb
            else:
                exc_fmt = ''.join(traceback.format_exception(type(exc), exc, tb))
            if isinstance(exc, WrappingException):
                inner_exc_fm = traceback.format_exception(type(exc.cause), exc.cause, exc.tb)
                inner_exc_fm = ''.join(' ' + line for line in inner_exc_fm)
                exc_fmt += "-------\n" + inner_exc_fm
            fail('\n\n', exc_fmt)
        else:
            fail("Died because a watched actor (%r) died" % (exc.watchee,))
        Events.log(Error(self.ref, exc, tb))
        self.parent << ('_error', self.ref, exc, tb)
    # death watch

    def watch(self, actor, *actors, **kwargs):
        actors = (actor,) + actors
        actors = [self.spawn(x, **kwargs) if isinstance(x, (type, Props)) else x for x in actors]
        for other in actors:
            node = self.uri.node
            if other != self.ref:
                assert not other.is_local if other.uri.node and other.uri.node != node else True, (other.uri.node, node)
                if not self.watchees:
                    self.watchees = set()
                if other not in self.watchees:
                    self.watchees.add(other)
                    if not other.is_local:
                        self.hub.watch_node(other.uri.node, report_to=self.ref)
                    other << ('_watched', self.ref)
        return actors[0] if len(actors) == 1 else actors

    def unwatch(self, actor, *actors):
        actors = (actor,) + actors
        if self.watchees:
            for actor in actors:
                try:
                    self.watchees.remove(actor)
                except KeyError:
                    pass
                else:
                    self._unwatch(actor)

    def _unwatch(self, other, silent=False):
        if not silent:
            other << ('_unwatched', self.ref)
        if not other.is_local:
            self.hub.unwatch_node(other.uri.node, report_to=self.ref)

    def _watched(self, other):
        if not self.watchers:
            self.watchers = set()
        self.watchers.add(other)

    def _unwatched(self, other):
        if self.watchers:
            self.watchers.discard(other)
    # misc

    @property
    def ref(self):
        if self.stopped:
            return Ref(cell=None, uri=self.uri)
        if not self._ref or not self._ref():
            ref = Ref(self, self.uri)  # must store in a temporary variable to avoid immediate collection
            self._ref = weakref.ref(ref)
        return self._ref()

    @property
    def root(self):
        from spinoff.actor.guardian import Guardian
        return self.parent if isinstance(self.parent, Guardian) else self.parent._cell.root

    def __repr__(self):
        return "<cell:%s@%s>" % (type(self.impl).__name__ if self.impl else (self.factory.__name__ if isinstance(self.factory, type) else self.factory.cls.__name__), self.uri.path)

# =============================================================================
# Hub -- gevent/ZeroMQ transport driver (a separate file in the gist)
# =============================================================================

import time
from types import GeneratorType

import zmq.green as zmq
from gevent import sleep, spawn
from gevent.socket import gethostbyname

from spinoff.actor.events import Events, DeadLetter
from spinoff.actor.remoting.hublogic import HubLogic, Connect, Disconnect, SendPacket, Deliver, NotifySendFailed, NotifyNodeDown


__all__ = ['Hub']


class Hub(object):
    """Handles traffic between actors on different nodes.

    The wire-transport implementation is specified/overridden by the `incoming` and `outgoing` parameters.
    """
    __doc_HEARTBEAT_INTERVAL__ = (
        "Time in seconds after which to send out a heartbeat signal to all known nodes. Regular messages can be "
        "substituted by the framework for heartbeats to save network bandwidth.")
    HEARTBEAT_INTERVAL = 1.0

    ALLOWED_HEARTBEAT_DELAY = HEARTBEAT_INTERVAL * 0.2

    HEARTBEAT_MAX_SILENCE = 15.0
    def __init__(self, node_id, owner):
        self._owner = owner
        self._logic = HubLogic()
        self._ctx = zmq.Context()
        self._ctx.linger = 0
        self._sock = self._ctx.socket(zmq.ROUTER)
        self._sock.identity = node_id
        self._listener = spawn(self._listen)
        self._heartbeater = spawn(self._send_recv_heartbeat)
        self._watched_nodes = {}  # node_id => set of watcher refs (used as a mapping below)

    def _listen(self):
        # ROUTER sockets receive [sender identity, payload] frame pairs, so read multipart messages
        recv, message_received, execute, t = self._sock.recv_multipart, self._logic.message_received, self._execute, time.time
        while True:
            sender_node_id, msg_bytes = recv()
            execute(message_received(sender_node_id, msg_bytes, t()))

    def _send_recv_heartbeat(self):
        interval, heartbeat, execute, t = self.HEARTBEAT_INTERVAL, self._logic.heartbeat, self._execute, time.time
        while True:
            execute(heartbeat(t()))
            sleep(interval)
    def send_message(self, node_id, msg_handle):
        self._execute(self._logic.send_message(node_id, msg_handle))

    def watch_node(self, node_id, report_to):  # keyword name matches Cell.watch's `report_to=` calls
        if node_id not in self._watched_nodes:
            self._watched_nodes[node_id] = set([report_to])
            self._execute(self._logic.watch_new_node(node_id))
        else:
            self._watched_nodes[node_id].add(report_to)

    def unwatch_node(self, node_id, report_to):
        try:
            self._watched_nodes[node_id].discard(report_to)
        except KeyError:
            pass

    def stop(self):
        self._listener.kill()
        self._execute(self._logic.shutdown())
        sleep(.01)  # XXX: needed?
        self._sock.close()
        self._ctx.term()
    def _execute(self, actions):
        for action in flatten(actions):
            cmd = action[0]
            if cmd is SendPacket:
                _, node_id, msg_handle = action
                self._sock.send_multipart([node_id, msg_handle()])
            elif cmd is Connect:
                _, node_id = action
                self._sock.connect(node_id_to_zmq_endpoint(node_id))
            elif cmd is Disconnect:
                _, node_id = action
                try:
                    self._sock.disconnect(node_id_to_zmq_endpoint(node_id))
                except zmq.ZMQError:
                    pass
            elif cmd is Deliver:
                _, msg_bytes, sender_node_id = action
                self._owner.deliver(msg_bytes, sender_node_id)
            elif cmd is NotifySendFailed:
                _, msg_handle = action
                Events.log(DeadLetter(msg_handle.get()))
            elif cmd is NotifyNodeDown:
                _, node_id = action
                for watcher in self._watched_nodes.pop(node_id, []):
                    watcher << ('_node_down', node_id)
            else:
                assert False, "unknown command"

def node_id_to_zmq_endpoint(node_id):
    host, port = node_id.split(':')
    return 'tcp://%s:%s' % (gethostbyname(host), port)
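
# For example (not part of the gist), node_id_to_zmq_endpoint('localhost:5555')
# resolves to something like 'tcp://127.0.0.1:5555'.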

def flatten(gen):
    for x in gen:
        if isinstance(x, GeneratorType):
            for y in x:
                yield y
        else:
            yield x
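
# A minimal wiring sketch (not part of the gist): the `owner` passed to Hub only
# needs a deliver(msg_bytes, sender_node_id) method, per _execute's Deliver branch
# above. The 'host:port' node id format follows node_id_to_zmq_endpoint.
#
# class PrintingOwner(object):
#     def deliver(self, msg_bytes, sender_node_id):
#         print('%s says: %r' % (sender_node_id, msg_bytes))
#
# hub = Hub('127.0.0.1:5555', PrintingOwner())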

# =============================================================================
# HubLogic -- connection-management state machine (a separate file in the gist:
# spinoff.actor.remoting.hublogic)
# =============================================================================

from __future__ import print_function

import struct


DISCONNECT, MESSAGE = 1, 2

MSG_HEADER_FORMAT = '!I'

NO_CHANGE = object()
DISCONNECT_SIGNAL = '\0\0\0\0'

# signals from HubLogic to Hub or test code
Connect = object()
Disconnect = object()
SendPacket = object()
Deliver = object()
NotifySendFailed = object()
NotifyNodeDown = object()


class HubLogic(object):
    def __init__(self, heartbeat_max_silence=15.0):  # default assumed to mirror Hub.HEARTBEAT_MAX_SILENCE
        self.heartbeat_max_silence = heartbeat_max_silence  # referenced by heartbeat() below
        self.channels = set()
        self.last_seen = {}
        self.versions = {}
        self.queues = {}
        # we start from 1 not 0 so that a 0 byte at the beginning of a packet could be identified as a DISCONNECT
        self.version = 1
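        # Concretely: struct.pack('!I', 0) == '\x00\x00\x00\x00' == DISCONNECT_SIGNAL,
        # so a header packed from a version >= 1 can never be mistaken for a disconnect marker.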
    def send_message(self, recipient_node_id, msg_handle):
        # is it a new connection, or an existing but not yet active connection?
        if recipient_node_id in self.channels and self._is_connection_active(recipient_node_id):
            yield SendPacket, recipient_node_id, msg_handle
        else:
            yield Connect, recipient_node_id
            yield SendPacket, recipient_node_id, self._make_ping_packet()
            self._channel_opened(recipient_node_id)
            # queue just the handle; message_received() and _channel_lost() expect bare handles
            self.queues[recipient_node_id].append(msg_handle)
    def message_received(self, sender_node_id, msg_bytes, current_time):
        if msg_bytes == DISCONNECT_SIGNAL:
            if sender_node_id in self.channels:
                yield Disconnect, sender_node_id
                self.channels.remove(sender_node_id)
                yield self._channel_lost(sender_node_id)
        else:
            # the first 4 bytes carry the sender's version counter; the rest is the payload
            version, msg_body_bytes = struct.unpack(MSG_HEADER_FORMAT, msg_bytes[:4])[0], msg_bytes[4:]
            if sender_node_id not in self.channels:
                yield SendPacket, sender_node_id, self._make_ping_packet()
                self._channel_opened(sender_node_id)
            else:
                if sender_node_id in self.queues:
                    for msg_handle in self.queues.pop(sender_node_id, []):
                        yield SendPacket, sender_node_id, msg_handle
                else:
                    if not (version > self.versions.get(sender_node_id, 0)):
                        # version has been reset--the peer has restarted, so emulate a node-down/node-back-up event pair:
                        yield self._channel_lost(sender_node_id)
                        self._channel_opened(sender_node_id)
            self.versions[sender_node_id] = version
            self.last_seen[sender_node_id] = current_time
            if msg_body_bytes:
                yield Deliver, msg_body_bytes, sender_node_id
    def heartbeat(self, current_time):
        t_gone = current_time - self.heartbeat_max_silence
        for node_id in list(self.channels):  # iterate over a copy; entries may be removed below
            if self.last_seen[node_id] < t_gone:
                yield Disconnect, node_id
                self.channels.remove(node_id)
                yield self._channel_lost(node_id)
            else:
                yield SendPacket, node_id, self._make_ping_packet()
        self.version += 1

    def watch_new_node(self, node_id):
        yield Connect, node_id
        yield SendPacket, node_id, self._make_ping_packet()
        self._channel_opened(node_id)

    def shutdown(self):
        for node_id in self.channels:
            yield SendPacket, node_id, DISCONNECT_SIGNAL
            yield self._channel_lost(node_id)
    # private:

    def _make_ping_packet(self):
        return struct.pack(MSG_HEADER_FORMAT, self.version)

    def _channel_opened(self, node_id):
        self.queues[node_id] = []

    def _channel_lost(self, node_id):
        for msg_handle in self.queues.pop(node_id, []):
            yield NotifySendFailed, msg_handle
        yield NotifyNodeDown, node_id

    def _is_connection_active(self, node_id):
        return node_id in self.versions
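
# A minimal sketch (not part of the gist) of driving HubLogic directly, the way the
# "test code" mentioned above might: its methods are generators of action tuples, so
# collecting them shows what the Hub would be asked to do. `msg_handle` is a
# hypothetical stand-in for whatever handle object the Hub passes in.
#
# logic = HubLogic()
# actions = list(logic.send_message('10.0.0.2:5555', msg_handle))
# # -> [(Connect, '10.0.0.2:5555'),
# #     (SendPacket, '10.0.0.2:5555', <4-byte version header>)]
# # and msg_handle is now queued until the peer's first packet arrives.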