Created
February 12, 2013 13:45
-
-
Save erikkaplun/4769978 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ... snip | |
class Cell(_BaseCell): # TODO: inherit from Greenlet? | |
uri = None | |
hub = None | |
impl = None | |
proc = None | |
stash = None | |
stopped = False | |
inbox = None | |
_ref = None | |
child_name_gen = None | |
watchers = None | |
watchees = None | |
def __init__(self, parent, factory, uri, hub): | |
if not callable(factory): # pragma: no cover | |
raise TypeError("Provide a callable (such as a class, function or Props) as the factory of the new actor") | |
self.factory = factory | |
self.parent = parent | |
self.uri = uri | |
self.hub = hub | |
self.queue = gevent.queue.Queue() | |
self.inbox = deque() | |
self.worker = gevent.spawn(self.work) | |
@logstring(u'←') | |
def receive(self, message): | |
self.queue.put(message) | |
@logstring(u'↻') | |
def work(self): | |
def _restart(): | |
self.shutdown() | |
self.worker = gevent.spawn(self.work) | |
gevent.getcurrent().kill() | |
def _stop(): | |
self.shutdown() | |
self.destroy() | |
gevent.getcurrent().kill() | |
def _resume_children(): | |
for x in self.children: | |
x.send('_resume') | |
dbg(u"►►") | |
try: | |
self.impl = self.construct() | |
except: | |
self.report() | |
stash = deque() | |
while True: | |
m = self.queue.get() | |
if m in ('_resume', '_stop'): | |
self.queue.queue.extendleft(stash) | |
_stop() | |
elif m == '_restart': | |
self.queue.queue.extendleft(stash) | |
_restart() | |
else: | |
stash.appendleft(m) | |
else: | |
processing, has_ever_wanted_a_message = (True, False) if self.impl.run else (False, True) | |
error = suspended = stopping = restarting = False | |
while True: | |
# consume the queue, handle system messages and internal signals from receive()/run(), and collect letters to the inbox | |
if suspended or processing or error or stopping or not self.inbox: | |
self.queue.peek() | |
should_restart = False | |
suspend_or_resume = None | |
while True: | |
try: | |
m = self.queue.get_nowait() | |
except gevent.queue.Empty: | |
break | |
if m == '__done': | |
has_ever_wanted_a_message = True | |
processing = False | |
if stopping: | |
if self.proc: | |
self.proc.kill() | |
_stop() | |
elif restarting: | |
should_restart = True | |
elif m == ('__error', ANY, ANY): | |
_, exc, tb = m | |
error = True | |
processing = False | |
if stopping: | |
Events.log(ErrorIgnored(self.ref, exc, tb)) | |
if self.proc: | |
self.proc.kill() | |
_stop() | |
elif restarting: | |
should_restart = True | |
else: | |
self._suspend_children() | |
self.report((exc, tb)) | |
elif m == '_stop': | |
if not processing or not has_ever_wanted_a_message: | |
if self.proc: | |
self.proc.kill() | |
_stop() | |
else: | |
stopping = True | |
elif m == '_restart': | |
if not processing or not has_ever_wanted_a_message: | |
should_restart = True | |
else: | |
restarting = True | |
elif m in ('_suspend', '_resume'): | |
suspend_or_resume = m | |
elif m == '_ignore': | |
assert error, "unexpected _ignore received" | |
if self.impl.run: | |
_stop() | |
else: | |
error = False | |
elif m == ('_watched', ANY): | |
self._watched(m[1]) | |
elif m == ('_unwatched', ANY): | |
self.watchers.discard(m[1]) | |
else: | |
if m == ('_node_down', ANY): | |
_, node = m | |
self.inbox.extend(('terminated', x) for x in (self.watchees or []) if x.uri.node == node) | |
else: | |
self.inbox.append(m) | |
if should_restart: | |
_restart() | |
elif suspend_or_resume: | |
suspended_new = (suspend_or_resume == '_suspend') | |
if suspended != suspended_new: | |
suspended = suspended_new | |
if suspended: | |
self._suspend_children() | |
else: | |
if self.proc and self.proc.exception: | |
self.destroy() | |
gevent.getcurrent().kill() | |
else: | |
_resume_children() | |
# process the normal letters (i.e. the regular, non-system/non-special messages) | |
while not processing and not suspended and not error and not stopping and self.queue.empty() and self.inbox: | |
m = self.inbox.popleft() | |
if m == ('_error', ANY, ANY, ANY): # error handling is viewed as user-land logic | |
_, sender, exc, tb = m | |
self.catch_exc(self.supervise, sender, exc, tb) | |
break | |
if m == ('terminated', ANY): | |
_, actor = m | |
if actor in self.watchees: | |
self.watchees.remove(actor) | |
self._unwatch(actor, silent=True) | |
elif m == ('_child_terminated', ANY): | |
self._child_gone(m[1]) | |
break | |
if self.impl.receive: | |
processing = True | |
self.proc = gevent.spawn(self.catch_exc, self.catch_unhandled, self.impl.receive, m) | |
gevent.idle() | |
elif self.impl.run: | |
assert self.ch.balance == -1 | |
if self.get_pt == m: | |
processing = True | |
self.ch.put(m) | |
self.inbox.extendleft(reversed(self.stash)) | |
self.stash.clear() | |
else: | |
self.stash.append(m) | |
else: | |
self.catch_exc(self.unhandled, m) | |
def _suspend_children(self): | |
for x in self.children: | |
x.send('_suspend') | |
def catch_exc(self, fn, *args, **kwargs): | |
try: | |
fn(*args, **kwargs) | |
except Exception: | |
self.queue.put(('__error', sys.exc_info()[1], sys.exc_info()[2])) | |
else: | |
self.queue.put('__done') | |
def catch_unhandled(self, fn, m): | |
try: | |
fn(m) | |
except Unhandled: | |
self.unhandled(m) | |
# processes (== Actors with a run() method) | |
def get(self, *patterns): | |
self.get_pt = OR(*patterns) | |
self.queue.put('__done') | |
return self.ch.get() | |
def flush(self): | |
while self.stash: | |
self.unhandled(self.stash.popleft()) | |
# TODO: haven't figured out yet how to cleanly fit an implementation of this to the existing state machine | |
# def escalate(self): | |
# _, exc, tb = sys.exc_info() | |
# if not (exc and tb): | |
# raise InvalidEscalation("Process.escalate must be called in an exception context") | |
# self.report((exc, tb)) | |
# # TODO: don't know what--this code is from the old Process class which has now been merged into this class | |
# birth & death | |
def construct(self): | |
factory = self.factory | |
try: | |
impl = factory() | |
except Exception: | |
raise CreateFailed("Constructing actor failed", factory) | |
impl._parent = self.parent | |
impl._set_cell(self) | |
if hasattr(impl, 'pre_start'): | |
pre_start = impl.pre_start | |
args, kwargs = impl.args, impl.kwargs | |
try: | |
pre_start(*args, **kwargs) | |
except Exception: | |
raise CreateFailed("Actor failed to start", impl) | |
self.ch = gevent.queue.Channel() | |
if impl.run: | |
if impl.receive: | |
raise TypeError("actor should implement only run() or receive() but not both") | |
self.proc = gevent.spawn(self.wrap_run, impl.run) | |
self.stash = deque() | |
return impl | |
def wrap_run(self, fn): | |
try: | |
ret = fn(*self.impl.args, **self.impl.kwargs) | |
except Exception: | |
self.queue.put(('__error', sys.exc_info()[1], sys.exc_info()[2])) | |
else: | |
if ret is not None: | |
warnings.warn("Process.run should not return anything--it's ignored") | |
self.queue.put('__done') | |
self.queue.put('_stop') | |
def shutdown(self, term_msg='_stop'): | |
if hasattr(self.impl, 'post_stop'): | |
try: | |
self.impl.post_stop() | |
except Exception: | |
Events.log(ErrorIgnored(self.ref, sys.exc_info()[1], sys.exc_info()[2])) | |
while self.watchees: | |
self._unwatch(self.watchees.pop()) | |
for child in self.children: | |
child << term_msg | |
stash = deque() | |
while self.children: | |
m = self.queue.get() | |
if m == ('_child_terminated', ANY): | |
self._child_gone(m[1]) | |
else: | |
stash.appendleft(m) | |
self.queue.queue.extendleft(stash) | |
def destroy(self): | |
ref = self.ref # grab the ref before we stop, otherwise ref() returns a dead ref | |
self.stopped = True | |
self.queue.queue.extendleft(reversed(self.inbox)) | |
while True: | |
try: | |
m = self.queue.get_nowait() | |
except gevent.queue.Empty: | |
break | |
if m == ('_watched', ANY): | |
self._watched(m[1]) | |
elif m == ('_error', ANY, ANY, ANY): | |
_, sender, exc, tb = m | |
Events.log(ErrorIgnored(sender, exc, tb)) | |
elif m == ('__error', ANY, ANY): | |
_, exc, tb = m | |
Events.log(ErrorIgnored(ref, exc, tb)) | |
elif not (m == ('terminated', ANY) or m == ('_unwatched', ANY) or m == ('_node_down', ANY) or m == '_restart' or m == '_stop' or m == '_suspend' or m == '_resume' or m == '__done'): | |
Events.log(DeadLetter(ref, m)) | |
self.parent.send(('_child_terminated', ref)) | |
for watcher in (self.watchers or []): | |
watcher << ('terminated', ref) | |
self.impl = self.inbox = self.queue = ref._cell = self.parent = None | |
# unhandled | |
def unhandled(self, m): | |
if ('terminated', ANY) == m: | |
raise UnhandledTermination(watcher=self.ref, watchee=m[1]) | |
else: | |
Events.log(UnhandledMessage(self.ref, m)) | |
# supervision | |
def supervise(self, child, exc, tb): | |
if child not in self.children: | |
Events.log(ErrorIgnored(child, exc, tb)) | |
return | |
supervise = getattr(self.impl, 'supervise', None) | |
if supervise: | |
decision = supervise(exc) | |
if not supervise or decision == Default: | |
decision = default_supervise(exc) | |
dbg(u"%r → %r" % (decision, child)) | |
if not isinstance(decision, Decision): | |
raise BadSupervision("Bad supervisor decision: %s" % (decision,), exc, tb) | |
# TODO: make these always async? | |
if decision == Ignore: | |
child << '_ignore' | |
elif decision == Restart(ANY, ANY): | |
child << '_restart' | |
elif decision == Stop: | |
child << '_stop' | |
else: | |
raise exc, None, tb | |
@logstring("report") | |
def report(self, exc_and_tb=None): | |
if not exc_and_tb: | |
_, exc, tb = sys.exc_info() | |
else: | |
exc, tb = exc_and_tb | |
if not isinstance(exc, UnhandledTermination): | |
if isinstance(tb, str): | |
exc_fmt = tb | |
else: | |
exc_fmt = ''.join(traceback.format_exception(type(exc), exc, tb)) | |
if isinstance(exc, WrappingException): | |
inner_exc_fm = traceback.format_exception(type(exc.cause), exc.cause, exc.tb) | |
inner_exc_fm = ''.join(' ' + line for line in inner_exc_fm) | |
exc_fmt += "-------\n" + inner_exc_fm | |
fail('\n\n', exc_fmt) | |
else: | |
fail("Died because a watched actor (%r) died" % (exc.watchee,)) | |
Events.log(Error(self.ref, exc, tb)), | |
self.parent << ('_error', self.ref, exc, tb) | |
# death watch | |
def watch(self, actor, *actors, **kwargs): | |
actors = (actor,) + actors | |
actors = [self.spawn(x, **kwargs) if isinstance(x, (type, Props)) else x for x in actors] | |
for other in actors: | |
node = self.uri.node | |
if other != self.ref: | |
assert not other.is_local if other.uri.node and other.uri.node != node else True, (other.uri.node, node) | |
if not self.watchees: | |
self.watchees = set() | |
if other not in self.watchees: | |
self.watchees.add(other) | |
if not other.is_local: | |
self.hub.watch_node(other.uri.node, report_to=self.ref) | |
other << ('_watched', self.ref) | |
return actors[0] if len(actors) == 1 else actors | |
def unwatch(self, actor, *actors): | |
actors = (actor,) + actors | |
if self.watchees: | |
for actor in actors: | |
try: | |
self.watchees.remove(actor) | |
except KeyError: | |
pass | |
else: | |
self._unwatch(actor) | |
def _unwatch(self, other, silent=False): | |
if not silent: | |
other << ('_unwatched', self.ref) | |
if not other.is_local: | |
self.hub.unwatch_node(other.uri.node, report_to=self.ref) | |
def _watched(self, other): | |
if not self.watchers: | |
self.watchers = set() | |
self.watchers.add(other) | |
def _unwatched(self, other): | |
if self.watchers: | |
self.watchers.discard(other) | |
# misc | |
@property | |
def ref(self): | |
if self.stopped: | |
return Ref(cell=None, uri=self.uri) | |
if not self._ref or not self._ref(): | |
ref = Ref(self, self.uri) # must store in a temporary variable to avoid immediate collection | |
self._ref = weakref.ref(ref) | |
return self._ref() | |
@property | |
def root(self): | |
from spinoff.actor.guardian import Guardian | |
return self.parent if isinstance(self.parent, Guardian) else self.parent._cell.root | |
def __repr__(self): | |
return "<cell:%s@%s>" % (type(self.impl).__name__ if self.impl else (self.factory.__name__ if isinstance(self.factory, type) else self.factory.cls.__name__), self.uri.path,) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from types import GeneratorType | |
import zmq.green as zmq | |
from gevent import sleep, spawn | |
from gevent.socket import gethostbyname | |
from spinoff.actor.events import Events, DeadLetter | |
from spinoff.actor.remoting.hublogic import HubLogic, Connect, Disconnect, SendPacket, Deliver, NotifySendFailed, NotifyNodeDown | |
# Public API of this module: `from ... import *` exports only the Hub facade.
__all__ = ["Hub"]
class Hub(object):
    """Handles traffic between actors on different nodes.

    The wire transport is a single ZeroMQ ROUTER socket whose identity is this
    node's id.  All protocol decisions are delegated to a HubLogic instance; its
    streams of actions are carried out by ``_execute``.
    """
    __doc_HEARTBEAT_INTERVAL__ = (
        "Time on seconds after which to send out a heartbeat signal to all known nodes. Regular messages can be "
        "subsituted by the framework for heartbeats to save network bandwidth.")
    HEARTBEAT_INTERVAL = 1.0
    ALLOWED_HEARTBEAT_DELAY = HEARTBEAT_INTERVAL * 0.2
    HEARTBEAT_MAX_SILENCE = 15.0

    def __init__(self, node_id, owner):
        """
        :param node_id: this node's id (``"host:port"``), used as the socket identity.
        :param owner: object whose ``deliver(msg_bytes, sender_node_id)`` receives incoming messages.
        """
        self._owner = owner
        self._watched_nodes = {}  # node_id -> set of watchers to notify with ('_node_down', node_id)
        self._logic = HubLogic()
        # HubLogic.heartbeat() reads this attribute; make it follow our configured limit.
        self._logic.heartbeat_max_silence = self.HEARTBEAT_MAX_SILENCE
        self._ctx = zmq.Context()
        self._ctx.linger = 0
        self._sock = self._ctx.socket(zmq.ROUTER)
        self._sock.identity = node_id
        # NOTE(review): the socket is never bound here, so this node cannot accept
        # connections it did not initiate -- confirm whether binding happens elsewhere.
        self._listener = spawn(self._listen)
        self._heartbeater = spawn(self._send_recv_heartbeat)

    def _listen(self):
        """Greenlet: feed every incoming packet into HubLogic and execute the result."""
        recv, message_received, execute, t = self._sock.recv_multipart, self._logic.message_received, self._execute, time.time
        while True:
            # a ROUTER socket yields [sender identity, payload] as one multipart message
            sender_node_id, msg_bytes = recv()
            execute(message_received(sender_node_id, msg_bytes, t()))

    def _send_recv_heartbeat(self):
        """Greenlet: run HubLogic's heartbeat every HEARTBEAT_INTERVAL seconds."""
        interval, heartbeat, execute, t = self.HEARTBEAT_INTERVAL, self._logic.heartbeat, self._execute, time.time
        while True:
            execute(heartbeat(t()))
            sleep(interval)

    def send_message(self, node_id, msg_handle):
        """Send (or queue until the channel is active) *msg_handle* to *node_id*."""
        self._execute(self._logic.send_message(node_id, msg_handle))

    def watch_node(self, node_id, watcher=None, report_to=None):
        """Register *watcher* to receive ``('_node_down', node_id)`` when the node is lost.

        ``report_to`` is accepted as an alias for ``watcher`` (that is the keyword
        actor cells pass).  The first watcher of a node makes HubLogic connect to it.

        :raises TypeError: if neither *watcher* nor *report_to* is given.
        """
        if watcher is None:
            watcher = report_to
        if watcher is None:
            raise TypeError("watch_node() requires a watcher (or report_to)")
        watchers = self._watched_nodes.get(node_id)
        if watchers is None:
            self._watched_nodes[node_id] = {watcher}
            self._execute(self._logic.watch_new_node(node_id))
        else:
            watchers.add(watcher)

    def unwatch_node(self, node_id, watcher=None, report_to=None):
        """Unregister *watcher* (or *report_to*) for *node_id*; unknown nodes/watchers are ignored."""
        if watcher is None:
            watcher = report_to
        watchers = self._watched_nodes.get(node_id)
        if watchers is not None:
            watchers.discard(watcher)

    def stop(self):
        """Stop both greenlets, say goodbye to all known nodes and release ZeroMQ resources."""
        self._listener.kill()
        self._heartbeater.kill()
        self._execute(self._logic.shutdown())
        sleep(.01)  # XXX: needed? presumably gives the DISCONNECT packets a chance to leave before close()
        self._sock.close()
        self._ctx.term()

    def _execute(self, actions):
        """Carry out the (possibly nested) stream of actions produced by HubLogic.

        :raises AssertionError: on an action tag this Hub does not know.
        """
        for action in flatten(actions):
            cmd = action[0]
            if cmd is SendPacket:
                _, node_id, msg_handle = action
                # HubLogic yields raw packets (pings, DISCONNECT_SIGNAL) as well as callable message handles
                packet = msg_handle() if callable(msg_handle) else msg_handle
                self._sock.send_multipart([node_id, packet])
            elif cmd is Connect:
                _, node_id = action
                self._sock.connect(node_id_to_zmq_endpoint(node_id))
            elif cmd is Disconnect:
                _, node_id = action
                try:
                    self._sock.disconnect(node_id_to_zmq_endpoint(node_id))
                except zmq.ZMQError:
                    pass
            elif cmd is Deliver:
                _, msg_bytes, sender_node_id = action
                self._owner.deliver(msg_bytes, sender_node_id)
            elif cmd is NotifySendFailed:
                _, msg_handle = action
                # NOTE(review): DeadLetter is constructed elsewhere as DeadLetter(ref, message) -- confirm this signature.
                Events.log(DeadLetter(msg_handle.get()))
            elif cmd is NotifyNodeDown:
                _, node_id = action
                for watcher in self._watched_nodes.pop(node_id, ()):
                    watcher << ('_node_down', node_id)
            else:
                raise AssertionError("unknown command: %r" % (cmd,))
def node_id_to_zmq_endpoint(node_id):
    """Translate a ``"host:port"`` node id into a ZeroMQ TCP endpoint string.

    The host part is resolved with ``gethostbyname``.  A ValueError is raised
    when *node_id* does not split into exactly two ``:``-separated parts.
    """
    host, port = node_id.split(':')
    address = gethostbyname(host)
    return 'tcp://{0}:{1}'.format(address, port)
def flatten(gen):
    """Yield the items of *gen*, expanding items that are themselves generators.

    HubLogic methods yield action tuples and may also yield sub-generators of
    actions (e.g. ``yield self._channel_lost(node_id)``).  Nested generators are
    expanded recursively, so any nesting depth works; every other item,
    including tuples, is yielded unchanged and in order.
    """
    for x in gen:
        if isinstance(x, GeneratorType):
            for y in flatten(x):
                yield y
        else:
            yield x
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import struct | |
# Packet kinds. NOTE(review): DISCONNECT, MESSAGE and NO_CHANGE are not referenced elsewhere in this module.
DISCONNECT = 1
MESSAGE = 2
# Every packet starts with a 4-byte network-order unsigned version number.
MSG_HEADER_FORMAT = '!I'
NO_CHANGE = object()
# A header-only packet carrying version 0; HubLogic versions start at 1, so this cannot be a ping.
DISCONNECT_SIGNAL = '\x00' * 4
# signals from HubLogic to Hub or test code; unique sentinels compared by identity
Connect, Disconnect, SendPacket, Deliver, NotifySendFailed, NotifyNodeDown = (object() for _ in range(6))
class HubLogic(object):
    """I/O-free state machine behind Hub.

    Every public method is a generator of action tuples tagged with one of the
    module-level sentinels (Connect, Disconnect, SendPacket, Deliver,
    NotifySendFailed, NotifyNodeDown); items may also be nested generators of
    such tuples.  The caller performs the actual I/O.

    State:
      channels  -- node ids we hold a (possibly not yet confirmed) channel to
      versions  -- last header version received per node; presence marks the channel active
      last_seen -- time of the last packet received per node
      queues    -- message handles waiting for a channel to become active
    """

    def __init__(self, heartbeat_max_silence=15.0):
        """
        :param heartbeat_max_silence: seconds without any packet from a node after
            which heartbeat() disconnects it and reports it down.
        """
        self.channels = set()
        self.last_seen = {}
        self.versions = {}
        self.queues = {}
        self.heartbeat_max_silence = heartbeat_max_silence
        # we start from 1 not 0 so that a 0 header (DISCONNECT_SIGNAL) can be identified as a DISCONNECT
        self.version = 1

    def send_message(self, recipient_node_id, msg_handle):
        """Send *msg_handle* directly over an active channel, otherwise queue it.

        A channel to an unknown node is opened first (Connect + ping); the queue
        is flushed once the peer answers (see message_received).
        """
        if recipient_node_id in self.channels and self._is_connection_active(recipient_node_id):
            yield SendPacket, recipient_node_id, msg_handle
        else:
            if recipient_node_id not in self.channels:
                yield Connect, recipient_node_id
                yield SendPacket, recipient_node_id, self._make_ping_packet()
                self._channel_opened(recipient_node_id)
            # opened but not yet active: keep what is already queued and append
            self.queues.setdefault(recipient_node_id, []).append(msg_handle)

    def message_received(self, sender_node_id, msg_bytes, current_time):
        """React to a packet from *sender_node_id*.

        DISCONNECT_SIGNAL closes the channel and reports the node down.  Any
        other packet: first contact is answered with a ping; the first answer on
        a pending channel flushes its queue; a non-increasing version on an
        active channel is treated as a peer restart (node down + back up).
        Liveness info is updated and a non-empty body is delivered.
        """
        if msg_bytes == DISCONNECT_SIGNAL:
            if sender_node_id in self.channels:
                yield Disconnect, sender_node_id
                self._forget_node(sender_node_id)
                yield self._channel_lost(sender_node_id)
        else:
            # struct.unpack always returns a tuple, even for a single field
            version, = struct.unpack(MSG_HEADER_FORMAT, msg_bytes[:4])
            msg_body_bytes = msg_bytes[4:]
            if sender_node_id not in self.channels:
                yield SendPacket, sender_node_id, self._make_ping_packet()
                self._channel_opened(sender_node_id)
            elif sender_node_id in self.queues:
                for msg_handle in self.queues.pop(sender_node_id, []):
                    yield SendPacket, sender_node_id, msg_handle
            elif not (version > self.versions.get(sender_node_id, 0)):
                # version has been reset--he has restarted, so emulate a node-down-node-back-up event pair:
                yield self._channel_lost(sender_node_id)
                self._channel_opened(sender_node_id)
            self.versions[sender_node_id] = version
            self.last_seen[sender_node_id] = current_time
            if msg_body_bytes:
                yield Deliver, msg_body_bytes, sender_node_id

    def heartbeat(self, current_time):
        """Ping every channel, dropping those silent for longer than heartbeat_max_silence.

        Bumps ``self.version`` once the stream has been fully consumed.
        """
        t_gone = current_time - self.heartbeat_max_silence
        # iterate over a snapshot: lost channels are removed while we go
        for node_id in list(self.channels):
            # a channel we opened but have not heard from yet starts its silence clock now
            last_seen = self.last_seen.setdefault(node_id, current_time)
            if last_seen < t_gone:
                yield Disconnect, node_id
                self._forget_node(node_id)
                yield self._channel_lost(node_id)
            else:
                yield SendPacket, node_id, self._make_ping_packet()
        self.version += 1

    def watch_new_node(self, node_id):
        """Connect and ping *node_id* so that its liveness gets tracked."""
        yield Connect, node_id
        yield SendPacket, node_id, self._make_ping_packet()
        self._channel_opened(node_id)

    def shutdown(self):
        """Tell every channel we are leaving and fail all queued messages."""
        for node_id in list(self.channels):
            yield SendPacket, node_id, DISCONNECT_SIGNAL
            yield self._channel_lost(node_id)

    # private:
    def _make_ping_packet(self):
        """A header-only packet carrying the current version."""
        return struct.pack(MSG_HEADER_FORMAT, self.version)

    def _channel_opened(self, node_id):
        """Register a channel to *node_id* with an empty pending-message queue."""
        self.channels.add(node_id)
        self.queues[node_id] = []

    def _forget_node(self, node_id):
        """Drop the channel and the liveness info of a lost node, so a later reconnect starts fresh."""
        self.channels.discard(node_id)
        self.versions.pop(node_id, None)
        self.last_seen.pop(node_id, None)

    def _channel_lost(self, node_id):
        """Fail every queued message for *node_id*, then report the node down."""
        for msg_handle in self.queues.pop(node_id, []):
            yield NotifySendFailed, msg_handle
        yield NotifyNodeDown, node_id

    def _is_connection_active(self, node_id):
        """True once at least one packet from *node_id* has been received."""
        return node_id in self.versions
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment