Skip to content

Instantly share code, notes, and snippets.

@tav
Created October 14, 2009 22:33
Show Gist options
  • Select an option

  • Save tav/210461 to your computer and use it in GitHub Desktop.

Select an option

Save tav/210461 to your computer and use it in GitHub Desktop.
# this is seriously old and should not be used for anything...
"""An implementation of the DISCO (DIStributed COncurrency) Model."""
import atexit
import pickle
import select
import signal
import socket
import sys,os
import settings
from collections import deque
from copy import deepcopy #
from errno import EINTR
from heapq import heappop, heappush, heapify, heapreplace
from inspect import getargs, formatargspec #
from itertools import repeat
from sys import _getframe as get_frame
pimp('builtin/dict', 'AttrDict', 'CatDict', 'OrderedDict')
pimp('builtin/symbol', 'Any', 'Null', 'Event', 'Idling', 'Failed', 'Active',
'Finished', 'Create', 'Pause', 'Kill', 'Retry', 'Blank', 'Symbol') #
pimp('builtin/type', 'GeneratorType', 'ClassType', 'ClassicClassType',
'SequenceType')
pimp('builtin/uuid', 'UUID')
pimp('builtin/weakref', 'weakref', 'WeakSet', 'saferef', 'weakpointer') #
pimp('datetime/time', 'sleep', 'time')
pimp('error', 'Error')
pimp('functional', 'partial')
pimp('io/stringio', 'StringIO')
pimp('metaclass/selfsuper', 'SelfSuper')
pimp('os/lockfile', 'lock_file', 'unlock_file')
pimp('threading', 'allocate_lock', 'start_new_thread')
# ------------------------------------------------------------------------------
# some konstants
# ------------------------------------------------------------------------------
__export__ = {
'*': [
# utility
'clear_list', 'safely_set_object_in_namespace',
'reassign_function_arguments', 'get_class_mro', 'TypeChecker', 'is_type',
# error
'DiscoError', 'StopEventPropogation', 'StoreLimitReached',
'DuplicateServiceRegistration',
# some main klasses
'DiscoObject', 'DiscoOwner', 'DiscoDealer', 'DopeDealer', 'CokeDealer',
'DiscoClub',
# utility
'get_disco_owner', 'add_disco_worker', 'shutdown_disco',
# namespaces
'Namespace', 'ImmutableNamespace',
# worker utitlies
'create', 'create_service',
# worker main classes/utilities
'DiscoWorkerBase', 'is_a_disco_worker', 'worker', 'waiter', 'dancer'
],
':api': [
'get_disco_owner', 'add_disco_worker', 'shutdown_disco',
'create', 'create_service',
'Namespace', 'ImmutableNamespace',
'is_a_disco_worker', 'worker', 'waiter', 'dancer'
]
}
# Python 2: make every ``class`` statement in this module new-style by default.
__metaclass__ = type
# Create the shared owner registry the first time it is found unset.
if settings.disco_owners is None:
    settings.disco_owners = WeakSet()
# Global switch: when False, ``DiscoOwner.run`` forces ``using_threads`` off.
THREADING_ENABLED = True
# Default for ``DiscoOwner``'s ``initial`` argument (number of clubs to create).
DEFAULT_CLUBS = 1
# Return the first positional argument when any were given, otherwise the
# first keyword value.  A real conditional expression is used because the
# ``a and b or c`` idiom fell through to the keyword branch whenever the first
# positional argument was falsy (``SINGLE(0)`` raised IndexError).  With no
# arguments at all this still raises IndexError.
SINGLE = lambda *args, **kwargs: args[0] if args else list(kwargs.values())[0] # @/@ ?
# Marker symbols.
# NOTE(review): neither is referenced in the visible part of this file.
FIXED = Symbol('FIXED')
POP = Symbol('POP')
# Reusable keyword-option bundles.  ``MEMO + DROP`` relies on CatDict
# supporting ``+`` (presumably a merge of both dicts -- confirm in builtin/dict).
MEMO = CatDict(memoise=True)
DROP = CatDict(drop_non_applicable_method=True)
MEMODROP = MEMO + DROP
OVERRIDE = CatDict(override=True)
REGISTER = CatDict(register=True)
# Keep an already-bound ``__refresh__`` flag; otherwise default it to False.
try:
    __refresh__
except NameError:
    __refresh__ = False
# ------------------------------------------------------------------------------
# utility funktions
# ------------------------------------------------------------------------------
def clear_list(list):
    """Empty ``list`` in place, like ``set.clear``, and return the same object."""
    del list[:]
    return list
# Serialises the "find a free name, then bind it" step below.
NAMESPACE_LOCK = allocate_lock()


def safely_set_object_in_namespace(
    object, namespace=Blank, depth=1, frame=None
):
    """Set the given ``object`` within a specified or calling ``namespace``.

    The object is stored under its ``__name__`` (``'anon_func'`` for lambdas);
    if that name is already taken a numeric suffix (``name1``, ``name2``, ...)
    is appended until a free one is found.

    :param object: the object to bind.  A string is returned unchanged and
        nothing is bound.
    :param namespace: mapping to bind into; when left as ``Blank`` the globals
        of the frame ``depth`` levels up the call stack are used.
    :param depth: how many frames up to look when ``namespace`` is ``Blank``.
    :param frame: only used as a local slot; any value passed in is ignored.
    :returns: the name the object was finally bound to.
    """
    if isinstance(object, basestring):
        return object
    if namespace is Blank:
        frame = get_frame(depth)
        namespace = frame.f_globals
    try:
        original_name = name = object.__name__
        if original_name == '<lambda>':
            original_name = name = 'anon_func'
        n = 0
        NAMESPACE_LOCK.acquire()
        try:
            while name in namespace:
                n += 1
                name = '%s%d' % (original_name, n)
            namespace[name] = object
        finally:
            # always release, even if the namespace lookup/assignment raises
            NAMESPACE_LOCK.release()
        return name
    finally:
        # drop the frame reference so no reference cycle is kept alive
        del frame, namespace
# ------------------------------------------------------------------------------
# typing-related utility funktions and klasses
# ------------------------------------------------------------------------------
def get_class_mro(klass):
    """Return the provided ``klass``'s lookup MRO (Method Resolution Order).

    New-style classes report their ``__mro__``.  Classic classes get a
    depth-first, left-to-right walk over ``__bases__`` in which every class is
    listed once, at its first occurrence.  Anything else raises ``TypeError``.
    """
    # new-style mro
    if isinstance(klass, type):
        return klass.__mro__
    # oops, not a klass type
    if not isinstance(klass, ClassicClassType):
        raise TypeError("Unknown class type %r" % type(klass))
    # old-skool mro: explicit stack instead of recursion, same pre-order
    ordered = []
    pending = [klass]
    while pending:
        current = pending.pop()
        if current in ordered:
            continue
        ordered.append(current)
        # push right-to-left so the left-most base is visited first
        pending.extend(reversed(current.__bases__))
    return tuple(ordered)
class TypeChecker:
    """Utility class for use as a predicate for type-checks.

    ``TypeChecker(A, B)(ob)`` is true when ``ob`` is an instance of any of the
    given classes.
    """

    #__slots__ = ('types',)

    # NOTE(review): ``mro_map`` and the class-level lock are not used by any
    # method visible here -- presumably reserved for ``build_type_map``.
    mro_map = {}

    __lock = allocate_lock()
    acquire_lock = __lock.acquire_lock
    release_lock = __lock.release_lock

    def __init__(self, *types):
        """Remember ``types`` in order, dropping duplicates.

        :raises TypeError: if any argument is not a ``ClassType`` instance.
        """
        filtered_types = []
        for obtype in types:
            if not isinstance(obtype, ClassType):
                # report the offending argument (the original named an
                # undefined variable here and died with NameError instead)
                raise TypeError("Unknown class type %r" % type(obtype))
            if obtype not in filtered_types:
                filtered_types.append(obtype)
        self.types = filtered_types

    @classmethod
    def build_type_map(klass, object):
        """Placeholder -- not implemented."""
        pass

    def __call__(self, object):
        """Return True if ``object`` is an instance of any registered type."""
        return isinstance(object, tuple(self.types))
# is_type = TypeChecker
def is_type(*types):
    """Build a one-argument predicate testing ``isinstance(ob, types)``."""
    def predicate(ob):
        return isinstance(ob, types)
    return predicate
def get_locals():
    """Return the caller's local variables wrapped in an ``AttrDict``."""
    caller = get_frame(1)
    try:
        snapshot = AttrDict(caller.f_locals)
    finally:
        # never keep the frame reference around
        del caller
    return snapshot
# ------------------------------------------------------------------------------
# error klasses
# ------------------------------------------------------------------------------
class DiscoError(Error):
    """Base class of the DISCO-specific errors defined in this module."""
class StopEventPropogation(DiscoError):
    """Signal that an event should not be delivered to further receivers.

    NOTE(review): the only handler in this file sits in the currently
    unreachable dispatch loop of ``send_event``.
    """
class DuplicateServiceRegistration(DiscoError):
    """Error for registering a service twice.

    NOTE(review): meaning inferred from the name; nothing in the visible part
    of the file raises it.
    """
class StoreLimitReached(DiscoError):
    """Raised by ``put_on`` when a worker's store limit has been reached."""
# ------------------------------------------------------------------------------
# utility klasses
# ------------------------------------------------------------------------------
class Params:
    """Captured call arguments: a positional tuple plus a keyword dict.

    ``get_on`` unpacks a ``Params`` payload as ``definition(*args, **kwargs)``.
    """

    __slots__ = ('args', 'kwargs')

    def __init__(self, *args, **kwargs):
        self.args, self.kwargs = args, kwargs
# ------------------------------------------------------------------------------
# da base disko object
# ------------------------------------------------------------------------------
class DiscoObject(object):
    """The fundamental class from which all other Disco classes derive from.

    Every instance gets a fresh ``UUID`` as ``id`` and an empty ``name``.
    """

    __metaclass__ = SelfSuper

    def __init__(self, *args, **kwargs):
        # extra arguments are accepted and ignored
        self.id = UUID()
        self.name = ''

    def __repr__(self):
        label = self.name
        class_name = self.__class__.__name__
        if not label:
            return '<%s: %s>' % (class_name, self.id)
        if label.count(':'):
            # the name already carries a ':'-separated qualifier
            return '<%s: %s>' % (label, self.id)
        return '<%s:%s: %s>' % (label, class_name, self.id)

    __str__ = __repr__
# ------------------------------------------------------------------------------
# disko owners -- poor bastards ;p
# ------------------------------------------------------------------------------
class DiscoOwner(DiscoObject):
    """A ``DiscoOwner`` runs various ``DiscoClub``s.

    The owner keeps its clubs in ``self.clubs`` (club -> ``[club, priority]``)
    and a round-robin ``club_pool`` used by ``add_worker``.  Clubs are either
    started in their own threads or stepped cooperatively from ``run``.
    """

    def __new__(klass, *args, **kwargs):
        # still undecided -- only one owner per python process or many?
        # NOTE(review): entries of ``settings.disco_owners`` are *called* to
        # obtain the owner -- presumably weak references; confirm against
        # WeakSet.  Python will still run ``__init__`` on a reused instance.
        for disco_owner in settings.disco_owners:
            return disco_owner()
        return super(DiscoOwner, klass).__new__(klass, *args, **kwargs)

    def __init__(self, initial=DEFAULT_CLUBS, using_threads=THREADING_ENABLED):
        """Create the owner and its initial clubs.

        :param initial: either a number of clubs to create (named ``club1``,
            ``club2``, ...) or an iterable of club names / ``DiscoClub``s.
        :param using_threads: default threading mode used by ``run``.
        """
        self.__super.__init__()
        self.__lock = allocate_lock()
        self.acquire_lock = self.__lock.acquire
        self.release_lock = self.__lock.release
        self.is_running = False
        self.is_paused = False
        self.is_migrating = False
        self.using_threads = using_threads
        self.club_pool = []
        self.club_pool_marker = -1
        self.clubs = {}
        if isinstance(initial, int):
            initial = ('club%i' % i for i in range(1, initial+1))
        for club in initial:
            self.add_club(club, 1, True)
        settings.disco_owners.add(self)
        settings.disco_master = self # weakref(self)
        atexit.register(shutdown_disco)

    def add_club(self, club=None, priority=1, add_to_club_pool=False):
        """Register ``club`` (an instance, or a name to build one from).

        :raises KeyError: if an equal club is already registered.
        """
        if not isinstance(club, DiscoClub):
            club = DiscoClub(club)
        club_name = club.name
        self.acquire_lock()
        try:
            if club in self.clubs:
                raise KeyError("%s already has a club %r." % (self, club_name))
            self.clubs[club] = [club, priority] # da priority level
            if add_to_club_pool:
                self.club_pool.append(club)
            if self.is_running and self.using_threads:
                club.run()
        finally:
            self.release_lock()

    def get_club(self, club):
        """Return the registered club equal to ``club`` (KeyError if none)."""
        self.acquire_lock()
        try:
            if club not in self.clubs:
                raise KeyError("%s has no club %r" % (self, club))
            return self.clubs[club][0]
        finally:
            self.release_lock()

    def remove_club(self, club):
        """Shut ``club`` down and forget it."""
        club = self.get_club(club)
        self.acquire_lock()
        try:
            club.shutdown()
            if club in self.club_pool:
                self.club_pool.remove(club)
            del self.clubs[club]
        finally:
            self.release_lock()

    def run(self, delay=1.5, using_threads=Blank):
        """Start every club.

        With threads each club is started in its own thread and this call
        returns.  Without threads the clubs' runner generators are stepped in
        turn, in this thread, until the owner stops, is paused, or a runner is
        exhausted.
        """
        if using_threads is Blank:
            using_threads = self.using_threads
        if not THREADING_ENABLED:
            using_threads = False
        if self.is_running:
            if settings.debug:
                print("!!ALERT!!\n%r is already running" % self)
            return
        self.is_running = True
        if using_threads:
            for club in self.clubs:
                club.run(delay, using_threads=True)
            return
        # @/@ need to modify this to microthread properly
        # @/@ also, need to handle signal gracefully if threading status
        # @/@ switches from False to True during runtime
        signal.signal(signal.SIGINT, self.shutdown)
        runners = {}
        try:
            while self.is_running and not self.is_paused:
                for club in self.clubs:
                    if club not in runners:
                        runners[club] = club.run(delay, False)
                # iterate over a snapshot: entries are deleted inside the
                # loop, which is an error while iterating the dict itself
                for club in list(runners):
                    if club not in self.clubs:
                        del runners[club]
                    else:
                        runners[club].next()
        except StopIteration:
            for club in self.clubs:
                club.shutdown()

    def pause(self):
        """Flag the owner and every club as paused."""
        self.is_paused = True
        for club in self.clubs:
            club.pause()

    def unpause(self):
        """Clear the paused flag on the owner and every club."""
        self.is_paused = False
        for club in self.clubs:
            club.unpause()

    # @/@ there should be *komplete* shutdown method that also removes the klubs
    # @/@ and uses threading.Condition to ensure that everything is safe.

    def shutdown(self, graceful=True, kill=False):
        """Stop running; when ``graceful`` also shut every club down.

        Also installed as the SIGINT handler by ``run``, in which case the two
        parameters receive the signal number and the interrupted frame.
        """
        if not self.is_running:
            return
        self.is_running = False
        if settings.debug:
            print("[%sshutting-down %r]" % (
                "gracefully-" if graceful else "", self
            ))
        if graceful:
            for club in self.clubs:
                club.shutdown(kill)

    def __del__(self):
        self.shutdown(graceful=True, kill=True)
        if settings.debug:
            print("[terminating-owner %r]" % self)

    # @/@ check klubs to see how busy they are and add worker appropriately?
    # @/@ would randomly adding to klubs work just as well?

    def add_worker_to_club(self, club, worker, *args, **kwargs):
        """Add ``worker`` to one specific club."""
        if settings.debug:
            print('[adding-worker-to-club %s %s]' % (club, worker))
        self.get_club(club).add_worker(worker, *args, **kwargs)

    def migrate_worker(self, worker, from_club, to_club):
        """Move a live worker from one club to another without killing it."""
        self.get_club(to_club).add_worker(
            self.get_club(from_club).remove_worker(worker, keep_alive=True),
            already_instantiated=True
        )

    def add_worker(self, worker, *args, **kwargs):
        """Add ``worker`` to the next club of the pool (round-robin).

        :raises RuntimeError: if the club pool is empty.
        """
        self.acquire_lock()
        try:
            if not self.club_pool:
                # previously surfaced as a ZeroDivisionError from the modulo
                raise RuntimeError(
                    "%s has no clubs in its pool to add a worker to." % self
                )
            self.club_pool_marker += 1
            self.club_pool_marker %= len(self.club_pool)
            club = self.club_pool[self.club_pool_marker]
            club.add_worker(worker, *args, **kwargs)
        finally:
            self.release_lock()
# ------------------------------------------------------------------------------
# dealers
# ------------------------------------------------------------------------------
class DiscoDealer:
    """A ``Club`` is run by a ``dealer``. Each Club has its own dealer.

    A dealer decides how many consecutive steps ("timeslice") each client gets
    per round.  ``deal`` yields ``(client, timeslice)`` pairs; ``pay`` is told
    the status a client reported after a step.
    """

    __slots__ = 'clients'

    def __init__(self, clients):
        self.clients = clients

    def deal(self):
        """Yield ``(client, timeslice)`` pairs -- to be provided by subclasses."""
        raise NotImplementedError

    def pay(self, client, payment):
        """Record the status ``payment`` reported by ``client``."""
        raise NotImplementedError


class DopeDealer(DiscoDealer):
    """A fair and balanced dealer."""

    def deal(self):
        # Work on a snapshot so clients added while a round is being dealt
        # cannot invalidate the iteration.
        for client in list(self.clients):
            yield client, 1

    def pay(self, client, payment):
        pass


class CokeDealer(DiscoDealer):
    """A dealer whose clients tend to want priority over others.

    ``clients`` maps each client to a score that ``pay`` raises for ``Active``
    steps and lowers for ``Idling`` ones; ``deal`` hands out that score as the
    timeslice, clamped to ``[min, max]``.
    """

    def __init__(self, clients, min=1, max=10):
        self.clients = clients
        self.min_boundary = min
        self.max_boundary = max

    def deal(self):
        lower = self.min_boundary
        upper = self.max_boundary
        # ``items()`` on a snapshot list: safe if clients are added mid-round
        # and, unlike ``iteritems()``, available on every Python version.
        for client, share in list(self.clients.items()):
            if share < lower:
                share = lower
            elif share > upper:
                share = upper
            yield client, share

    def pay(self, client, payment):
        if payment is Idling:
            self.clients[client] -= 1
        elif payment is Active:
            self.clients[client] += 1
# ------------------------------------------------------------------------------
# disko klubs -- the klub always wins.
# ------------------------------------------------------------------------------
class DiscoClub(DiscoObject):
    """A ``DiscoClub`` where there are various workers, waiters, and dancers.

    A club owns an ordered mapping of workers and a dealer class.  Its runner
    is a generator that repeatedly asks the dealer for ``(worker, timeslice)``
    pairs, steps each worker that many times, and yields ``Active`` /
    ``Idling`` / ``Finished`` to whoever drives it.
    """

    def __init__(self, name=None, dealer=DopeDealer):
        self.__super.__init__()
        self.name = name or str(self.id) # 'unnamed'
        self.is_running = False
        self.is_paused = False
        self.__lock = allocate_lock()
        self.acquire_lock = self.__lock.acquire
        self.release_lock = self.__lock.release
        self.dealer = dealer # cannot be replased whilst running
        self.workers = OrderedDict() # perhaps with priority ?

    # --------------------------------------------------------------------------
    # komparisons
    # --------------------------------------------------------------------------

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        # a club also compares equal to its bare name, which is what lets
        # ``DiscoOwner.get_club('club1')`` find it
        if isinstance(other, DiscoClub):
            return (self.name, self.workers) == (other.name, other.workers)
        else:
            return self.name == other

    def __cmp__(self, other):
        return cmp((self.name, self.workers), (other.name, other.workers))

    # --------------------------------------------------------------------------
    # the heart of darkness -- at least until punk kame along
    # --------------------------------------------------------------------------

    def run(self, delay=1.5, using_threads=True):
        """Start the club.

        With threads the runner is driven in a new thread and None is
        returned; otherwise the (unstarted) runner generator is returned for
        the caller to step.
        """
        if using_threads:
            start_new_thread(self.__run, (delay,))
        else:
            return self.__runner(delay)

    def __run(self, delay):
        """Drive the runner generator to exhaustion (thread entry point)."""
        runner = self.__runner(delay)
        try:
            while 1:
                runner.next()
        except StopIteration:
            pass

    def __select_log(self, logger):
        """Return the logging callable matching the current debug level.

        ``settings.debug == 1`` prints, any other truthy level writes to the
        club's log file, and with debugging off messages are dropped.
        """
        if settings.debug == 1:
            def log(x):
                print(x)
            return log
        if settings.debug:
            return logger.send
        return lambda x: None

    def __runner(self, delay):
        """Generator implementing the club's scheduling loop."""
        self.acquire_lock()
        self.is_running = True
        self.release_lock()
        club_dealer = self.dealer
        remove_worker = self.remove_worker
        logger = self.create_logger(); logger.next()
        workers = self.workers
        last_iteration = time()
        # Always bind ``log``: the sleep paths below use it even when
        # debugging is off, which used to raise NameError.
        log = self.__select_log(logger)
        try:
            dealer = club_dealer(workers)
            while self.is_running and not self.is_paused:
                log = self.__select_log(logger)
                if settings.debug:
                    log("[running-club %s]" % self)
                remove = set()
                add_to_remove = remove.add
                yield Active
                for worker, timeslice in dealer.deal():
                    # read once so a toggle mid-slice cannot leave
                    # ``last_timeslice`` undefined
                    slow_motion = settings.disco_in_slow_motion
                    if slow_motion:
                        last_timeslice = time()
                    for i in repeat(0, timeslice):
                        if settings.debug:
                            log("[running-worker %s]" % worker)
                        if not worker.is_alive:
                            add_to_remove(worker)
                            break
                        worker_status = worker.next()
                        if worker_status is Finished:
                            add_to_remove(worker)
                            break
                        dealer.pay(worker, worker_status)
                    if slow_motion:
                        this_timeslice = time() - last_timeslice
                        if this_timeslice < delay:
                            sleep_time = delay - this_timeslice
                            log("[timeslice-sleeping for %s]" % sleep_time)
                            # sleep only the remainder of the slot, as logged
                            sleep(sleep_time)
                    yield Active
                # removal is deferred until the dealer has finished its round
                for worker in remove:
                    remove_worker(worker)
                this_iteration = time() - last_iteration
                if this_iteration < delay:
                    sleep_time = delay - this_iteration
                    log("[sleeping for %s]" % sleep_time)
                    sleep(sleep_time)
                    yield Idling
                last_iteration = time()
            if settings.debug:
                log("[terminating-club %s]" % self)
        except StopIteration:
            # NOTE(review): presumably raised by an exhausted worker's
            # ``next()``; it ends the whole club, not just that worker.
            yield Finished
        logger.close()
        self.shutdown()

    # --------------------------------------------------------------------------
    # nooooo! we want disko! let the musik play! don't shut us down!
    # --------------------------------------------------------------------------

    def shutdown(self, kill=False):
        """Stop the runner loop; with ``kill`` also kill every worker."""
        if settings.debug:
            print("[terminating-club %s]" % self)
        self.acquire_lock()
        try:
            self.is_running = False
        finally:
            self.release_lock()
        if kill:
            # sleep a bit to give the klub a chanse to shutdown properly
            # @/@ will not suffise if a worker takes too long ...
            sleep(0.5)
            for worker in self.workers:
                worker.kill()

    def replace_dealer(self, dealer):
        """Swap the dealer class; refused while the club is running."""
        if self.is_running:
            raise RuntimeError(
                "Sorry, you cannot replace the dealer in a running club."
            )
        self.dealer = dealer

    def pause(self):
        self.is_paused = True

    def unpause(self):
        self.is_paused = False

    # --------------------------------------------------------------------------
    # manage workers
    # --------------------------------------------------------------------------

    def add_worker(self, worker, *args, **kwargs):
        """Register ``worker``, instantiating it first if it is a factory.

        ``args``/``kwargs`` are only used when instantiating.  Returns the
        value stored for the worker in ``self.workers`` (1 for a new one).
        """
        if settings.debug:
            print('[adding-worker-to-club %s %s]' % (worker, self))
        # if kwargs.get('already_instantiated', False):
        if not isinstance(worker, DiscoWorkerBase):
            worker = worker(*args, **kwargs)
        return self.workers.setdefault(worker, 1)

    def remove_worker(self, worker, keep_alive=False):
        """Forget ``worker`` (killing it unless ``keep_alive``) and return it."""
        if settings.debug:
            print('[removing-worker-from-club %s %s]' % (worker, self))
        if not keep_alive:
            worker.kill()
        del self.workers[worker]
        return worker

    # --------------------------------------------------------------------------
    # log it!
    # --------------------------------------------------------------------------

    def create_logger(self, filename='%s.%s.log', format=True):
        """Generator-based logger: ``send(text)`` appends a line to the file.

        When ``format`` is true ``filename`` is filled in with the club name
        and the process id.  The file is locked while the generator is alive
        and unlocked and closed when the generator is closed.
        """
        if format:
            filename = filename % (self.name, os.getpid())
        log = open(filename, 'a')
        lock_file(log)
        try:
            while 1:
                log.write((yield) + '\n')
                log.flush()
        finally:
            unlock_file(log)
            log.close()
# ------------------------------------------------------------------------------
# utility funktions
# ------------------------------------------------------------------------------
def get_disco_owner(*args, **kwargs):
    """Get a ``DiscoOwner``.

    Returns ``settings.disco_master`` when an owner is already registered.
    Otherwise a new ``DiscoOwner(*args)`` is created, unless the keyword
    ``return_none`` is passed with a true value, in which case None is
    returned instead.
    """
    if settings.disco_owners:
        return settings.disco_master
    # honour the flag's value -- ``return_none=False`` used to return None too
    if kwargs.get('return_none'):
        return None
    return DiscoOwner(*args)
def add_disco_worker(worker):
    """Add a ``worker`` to the current ``DiscoOwner()``.

    Raises ``RuntimeError`` while the owner is flagged as migrating.
    """
    current_owner = get_disco_owner()
    if not current_owner.is_migrating: # shutdown ? aktually migrate ?
        current_owner.add_worker(worker)
        return
    raise RuntimeError(
        "Sorry, we are currently migrating -- not accepting new workers."
    )
def shutdown_disco():
    """Shutdown DISCO.

    Shuts down the existing owner, if any; never creates one.
    """
    existing = get_disco_owner(return_none=True)
    if not existing:
        return
    existing.shutdown()
# ------------------------------------------------------------------------------
# kapability namespases
# ------------------------------------------------------------------------------
class Namespace:
    """A normal namespace wrapper.

    Positional arguments are stored under their ``__name__``; keyword
    arguments under their key.  Unless supplied explicitly, ``__disco_type__``
    is set to the name of the function that created the namespace.
    """

    def __init__(self, *args, **kwargs):
        for value in args:
            kwargs[value.__name__] = value
        # raise InvalidNamespace
        if '__disco_type__' not in kwargs:
            caller = get_frame(1)
            try:
                kwargs['__disco_type__'] = caller.f_code.co_name
            finally:
                del caller
        # write straight into the instance dict, bypassing ``__setattr__``
        vars(self).update(kwargs)

    def __str__(self):
        disco_type = self.__dict__['__disco_type__']
        return "<Namespace: %s>" % disco_type

    __repr__ = __str__
class ImmutableNamespace(Namespace):
    """A restricted namespace wrapper that doesn't let new attributes be set.

    Any attribute assignment or deletion raises ``AttributeError``.
    Construction still works because ``Namespace.__init__`` fills the instance
    ``__dict__`` directly.
    """

    def __setattr__(self, attribute, value):
        # no attribute may be bound or rebound
        raise AttributeError('Read-only mode.')

    def __delattr__(self, attribute):
        # ... nor removed
        raise AttributeError('Read-only mode.')
# ------------------------------------------------------------------------------
# disko worker kore utility funktions
# ------------------------------------------------------------------------------
def create(factory, *args, **kwargs):
    """Call ``factory`` to build a worker, register it, and return it."""
    new_worker = factory(*args, **kwargs)
    add_disco_worker(new_worker)
    return new_worker
def create_service(service_name, *args, **kwargs):
    """Obtain a worker via ``get_service``, register it, and return it.

    NOTE(review): ``get_service`` is not defined in the visible part of the
    file.
    """
    service_worker = get_service(service_name, *args, **kwargs)
    add_disco_worker(service_worker)
    return service_worker
# ------------------------------------------------------------------------------
# disko worker kore utility funktions
# ------------------------------------------------------------------------------
# kare should be taken when broadkasting an event
# Sentinel meaning "nothing was produced"; distinct from any real payload.
_marker = object()


def get_on(worker, store_name, definition=None, blocking=True, with_txn=False):
    """Move one item from a worker's store onto its input stream.

    :param worker: object exposing ``stores``, ``_input_stream``,
        ``_callbacks``, ``is_paused`` and ``last_transaction``.
    :param store_name: key into ``worker.stores``; the store is a heap of
        ``(txn, payload)`` pairs.
    :param definition: optional callable applied to the payload (``Params``
        payloads are unpacked into it).  A string is turned into a lambda with
        that parameter list which returns its arguments as an ``AttrDict``.
    :param blocking: when false and the store is empty, a default value is
        delivered instead of waiting.
    :param with_txn: deliver ``(txn, result)`` tuples rather than bare results.
    :returns: ``Active`` if something was delivered, ``Idling`` if the worker
        was paused to wait for input.
    """
    if isinstance(definition, basestring):
        # NOTE(review): ``eval`` of the definition string in the caller's
        # scope -- only safe for trusted, programmer-supplied text.
        frame = get_frame(1)
        definition = eval(
            "lambda %s: AttrDict(locals())" %
            definition, frame.f_globals, frame.f_locals
        )
        del frame
    store = worker.stores.setdefault(store_name, [])

    def callback():
        # Invoked once immediately and, if it had to wait, again later from
        # the worker's callback queue (``put_on`` pops and calls it).
        result = _marker
        if store:
            txn, result = heappop(store)
            if definition:
                if isinstance(result, Params):
                    result = definition(*result.args, **result.kwargs)
                else:
                    result = definition(result)
        elif not blocking:
            # nothing queued and we must not wait: hand out a default
            txn = with_txn and UUID()
            if definition:
                result = definition()
            else:
                result = Null # @/@ Blank ?
        if result is not _marker:
            worker.last_transaction = txn
            if with_txn:
                worker._input_stream.append((txn, result))
            else:
                worker._input_stream.append(result)
            if worker.is_paused:
                print "[unpausing-worker %s]" % worker
                worker.is_paused = False
            return Active
        # empty store in blocking mode: pause and re-queue ourselves
        print "[pausing-worker %s]" % worker
        worker.is_paused = True
        worker._callbacks.append(callback)
        return Idling

    return callback()
def get(worker, definition=None, blocking=True, with_txn=False):
    """``get_on`` for the worker's ``'inbox'`` store."""
    return get_on(
        worker, 'inbox',
        definition=definition, blocking=blocking, with_txn=with_txn
    )
def catch(worker, definition=None, blocking=True, with_txn=False):
    """``get_on`` for the worker's ``'exception'`` store."""
    # worker.default_exception_handling = False
    return get_on(
        worker, 'exception',
        definition=definition, blocking=blocking, with_txn=with_txn
    )
def call_if_get(worker, namespace, blocking=False, suppress_exceptions=True):
    """Dispatch items from the worker's ``'call'`` store to ``namespace``.

    Each item is treated as ``(method_name, *args, **kwargs)`` and routed to
    ``getattr(namespace, method_name)``.

    :param suppress_exceptions: when true, errors raised while looking up or
        calling the method are swallowed and ``Active`` is returned instead.
    :returns: whatever ``worker.get_on('call', handler, blocking)`` returns.
    """
    def handler(*args, **kwargs):
        method_name = args[0]
        args = args[1:]
        try:
            return getattr(namespace, method_name)(*args, **kwargs)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
            # and SystemExit are never silently swallowed
            if not suppress_exceptions:
                raise
        return Active
    return worker.get_on('call', handler, blocking)
# ------------------------------------------------------------------------------
# disko worker kore utility funktions
# ------------------------------------------------------------------------------
def put_on(worker, store_name, input, txn=Blank):
    """Queue ``input`` on one of a worker's stores and wake a waiting reader.

    :param worker: a worker instance, a worker ``UUID`` (resolved through
        ``get_worker``) or anything else (resolved through ``get_service``).
    :param store_name: name of the store (heap) to push onto.
    :param input: the payload.
    :param txn: transaction id to queue the payload under; a fresh ``UUID``
        when left as ``Blank``.
    :raises StoreLimitReached: if the store has a non-zero limit and the
        worker's store size has reached it.
    :returns: ``Active``.
    """
    if not isinstance(worker, DiscoWorkerBase):
        if isinstance(worker, UUID):
            worker = get_worker(worker)
        else:
            worker = get_service(worker)
        # raise TypeError("%r is not a valid disco worker." % worker)
    store_limit = worker.store_limits.get(store_name, 0)
    if store_limit and worker.get_store_size() >= store_limit:
        raise StoreLimitReached(
            "Reached a limit of %r in %s" % (store_limit, worker)
        )
    if txn is Blank:
        txn = UUID()
    heappush(worker.stores.setdefault(store_name, []), (txn, input))
    worker.acquire_lock()
    try:
        if worker._callbacks:
            callback = worker._callbacks.popleft()
            callback()
    finally:
        # a failing callback must not leave the worker locked forever
        worker.release_lock()
    return Active
def put(worker, input, txn=Blank):
    """``put_on`` for the worker's ``'inbox'`` store."""
    return put_on(worker, 'inbox', input, txn=txn)
# ------------------------------------------------------------------------------
# disko worker kore utility funktions
# ------------------------------------------------------------------------------
def pipeline(*workers):
    """Link ``workers`` into a chain and return the chain's result object.

    Each worker is linked to the one after it, and the last worker is linked
    to a fresh ``DiscoResult``, which is returned.

    :raises ValueError: if no workers are given.
    """
    if not workers:
        raise ValueError("pipeline() needs at least one worker.")
    previous = workers[0]
    for worker in workers[1:]:
        link(previous, worker)
        # advance, so the chain is w1 -> w2 -> w3 rather than w1 -> everything
        previous = worker
    result = DiscoResult()
    # ``previous`` is the last worker, also when only one worker was given
    link(previous, result)
    return result
# example match
# def foo():
# while match((yield Get, 'name', 'age')) is None:
# pass
# disco_sleep / until / timeout / alarm / call_at
# ------------------------------------------------------------------------------
# events
# ------------------------------------------------------------------------------
# Same places, same faces, same times, same crimes. Way too easy to fall into a
# rut with resyk waiting at its end -- Judge Dredd
class DiscoEvent(DiscoObject):
    """An event travelling from a sender to its receivers.

    The ``__slots__`` entries double as the whitelist of public attributes:
    ``__setattr__`` rejects any name that is private or not defined on the
    class.
    """

    # each name listed once ('name' used to appear twice)
    __slots__ = (
        'id', 'sender', 'stop_propagation', 'payload', 'name', 'counter',
        'max_hops', 'max_time'
    )

    def __init__(
        self, sender, name, payload=None, counter=0, max_hops=0, max_time=0
    ):
        """Create an event.

        :param sender: the event source; a worker is replaced by its ``id``.
        :param name: the event name.
        :param payload: event context; defaults to a fresh ``AttrDict``.
        :param counter: starting value of the hop counter.
        :param max_hops: hop limit, 0 for unlimited.
        :param max_time: age limit checked by ``is_valid``, 0 for unlimited.
        """
        self.__super.__init__()
        self.name = name
        # the event sourse
        if isinstance(sender, DiscoWorkerBase):
            sender = sender.id
        self.sender = sender
        # to propagate or not to propagate flag
        self.stop_propagation = False
        # our payload -- usually the event kontext
        if payload is None:
            payload = AttrDict()
        self.payload = payload
        # ttl (time to live) related variables
        self.counter = counter
        self.max_hops = max_hops
        self.max_time = max_time

    def __setattr__(self, attribute, value):
        if attribute.startswith('_') or not hasattr(type(self), attribute):
            raise TypeError("%r is not a public attribute of %r objects."
                            % (attribute, type(self).__name__))
        super(DiscoEvent, self).__setattr__(attribute, value)

    def is_valid(self):
        """Return whether an event is still valid."""
        # if ttl limits have been exseeded or stop_propagation has been set ...
        if ((self.max_hops and self.counter > self.max_hops) or
            self.stop_propagation):
            return False
        # @/@ ehm ... how does max_time work over a network ? ntpd ?
        # @/@ hmz, i think we need to send out the max time with the kommit
        # @/@ event -- in fakt the kommit and validate events are the same?
        # @/@ just resent with max time altered somehow and some other flag
        # @/@ set?
        # @/@ seems on the right trak... kan't think. talk later.
        # @/@ k
        if self.max_time:
            # raises KeyError if no handled-time was recorded for this event
            if (time() - get_event_handled_time(self.id)) > self.max_time:
                return False
        # @/@ ehm ... isn't this mixing applikation logic with the events
        # @/@ system? surely checking max time should be something the
        # @/@ applikation should do?
        return True

    @classmethod
    def get_receivers(klass, event_name, sender):
        # NOTE(review): ``_subscriptions`` is not defined on this class in the
        # visible code -- confirm where it is expected to come from.
        return klass._subscriptions.get(event_name, {}).get(id(sender), ())

    def __getstate__(self):
        # NOTE(review): ``name`` is not part of the pickled state.
        return (
            self.id,
            self.sender, # getattr(self.sender, 'id', None),
            self.stop_propagation,
            self.payload,
            self.counter,
            self.max_hops,
            self.max_time,
        )

    def __setstate__(self, state):
        (self.id,
         self.sender,
         self.stop_propagation,
         self.payload,
         self.counter,
         self.max_hops,
         self.max_time,
         ) = state
# ------------------------------------------------------------------------------
# disko worker kore utility funktions
# ------------------------------------------------------------------------------
def send_event(sender, event_name, payload, txn=Blank):
    """Announce an event; currently a stub that prints and returns ``Active``.

    ``txn`` is accepted but unused.
    """
    print "Sending:", event_name, "on", sender, 'with', payload
    return Active
    # NOTE(review): everything below is unreachable because of the ``return``
    # above.  It also refers to ``event`` and ``klass``, which are not defined
    # in this function -- it looks like a dispatch loop kept from an earlier
    # design.
    set_event_handled_time(event.id)
    for receiver in list(klass.get_receivers(event_name, event.sender)):
        event.counter += 1
        if not event.is_valid():
            break
        try:
            receiver, link = receiver
            if link:
                link(receiver, event)
            else:
                receiver(event)
        except StopEventPropogation:
            break
        except:
            event.stop_propagation = True
            raise
def send(sender, payload, txn=Blank):
    """Send ``payload`` as an ``'output'`` event from ``sender``."""
    return send_event(sender, 'output', payload, txn=txn)
def send_txn(sender, payload):
    """Send an ``'output'`` event tagged with ``sender.last_transaction``."""
    current_txn = sender.last_transaction
    return send_event(sender, 'output', payload, current_txn)
def throw(sender, exception):
    """Send ``exception`` as an ``'exception'`` event from ``sender``.

    The payload is ``Params(type, value, traceback)``.  The traceback is
    included when ``exception`` is the one currently being handled, otherwise
    it is None.

    The previous body could never succeed: it used undefined names
    (``exc_info``, ``t``, ``v``, ``tb``) and ended in an unreachable call that
    passed ``send_event`` more arguments than it accepts.

    :returns: the result of ``send_event``.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    if exc_value is not exception:
        # not inside the matching ``except`` block: no traceback available
        exc_type, exc_value, exc_tb = type(exception), exception, None
    try:
        return send_event(
            sender, 'exception', Params(exc_type, exc_value, exc_tb)
        )
    finally:
        # do not keep the traceback (and its frames) alive through locals
        exc_type = exc_value = exc_tb = None
# root | root['event name'] -- both have "default" subscribers
# ------------------------------------------------------------------------------
# disko worker kore utility funktions
# ------------------------------------------------------------------------------
def localise(kwargs):
    """Update the calling frame's ``f_locals`` mapping with ``kwargs``.

    Always returns ``Active``.
    """
    caller = get_frame(1)
    try:
        caller.f_locals.update(kwargs)
    finally:
        del caller
    return Active
# for k, v in kwargs.iteritems(): exec('%s=v' % k)
# for k, v in kwargs.iteritems():
# exec('%s=v' % k, locals(), frame.f_globals)
# worker._main.gi_frame.f_locals.update(kwargs)
def match():
    """Placeholder: always returns the pair ``(Retry, "")``.

    An unreachable second ``return Active`` has been removed.
    """
    return Retry, ""
def transact(*workers): # atomic
    """Reject I/O-monad workers; nothing else is implemented yet.

    Raises ``TypeError`` for the first worker whose ``io_monad`` is true.
    """
    for candidate in workers:
        if not candidate.io_monad:
            continue
        raise TypeError("Cannot deal with an I/O Monad: %r" % candidate)
# commit
# undo
# ------------------------------------------------------------------------------
# disko worker kore utility funktions
# ------------------------------------------------------------------------------
class DiscoLinkMap:
    """Registry of links (subscriptions) between event sources and receivers.

    NOTE(review): this class looks half-way through a refactoring.  ``link``,
    ``unlink`` and ``_unlink`` refer to names that are not defined in their
    scope (``sender``, ``receiver``, ``event_name``, ``klass``), and
    ``_unsubscribe`` is not defined in the visible code, so those methods
    cannot currently run to completion.
    """

    # NOTE(review): 'registry' is declared but never assigned in this class.
    __slots__ = (
        'subscriptions', '__lock', 'acquire_lock', 'release_lock', 'registry'
    )

    def __init__(self, seed_data=None):
        """Create the map, optionally on top of an existing ``seed_data`` dict."""
        self.__lock = allocate_lock()
        self.acquire_lock = self.__lock.acquire_lock
        self.release_lock = self.__lock.release_lock
        if seed_data is None:
            self.subscriptions = {}
        else:
            self.subscriptions = seed_data
        # empty the map at interpreter exit
        atexit.register(self.subscriptions.clear)

    def simple_link(self, source, destination):
        """Placeholder -- not implemented."""
        pass

    def link(
        self, source, destination, source_namespace=Blank,
        destination_namespace=Blank, matchers=Blank, connector=Blank,
        defaults=Blank, mutators=Blank, predicates=Blank, bouncer=Blank,
        destination_store='inbox', hold=False,
        ):
        """Link ``source`` to ``destination``.

        When a namespace is given, the matching endpoint is looked up on it by
        attribute name.  Most keyword parameters are not used by the body.
        """
        if source_namespace is not Blank:
            source = getattr(source_namespace, source)
        if destination_namespace is not Blank:
            destination = getattr(destination_namespace, destination)
        # (klass, event_name, sender, receiver, link=None, hold=False):
        # NOTE(review): from here on the body uses the names of the old
        # signature quoted above; none of them is bound in this method.
        sender_id = id(sender)
        if hold:
            rcv = receiver
        else:
            # keep weak receiver in a separate variable, so receiver can't
            # get GC'd before we finish
            rcv = saferef(
                receiver, lambda ref: klass._unsubscribe(sender_id, rcv)
            )
        subscriptions = klass._subscriptions
        if event_name not in subscriptions:
            subscriptions[event_name] = {}
        event_subscriptions = subscriptions[event_name]
        if sender_id in event_subscriptions:
            # don't create another weak sender if one's already registered
            receivers = event_subscriptions[sender_id]
            # don't add receivers we've already subscribed
            for x_receiver, x_link in receivers:
                if x_receiver == rcv:
                    return
            else:
                receivers.append((rcv, link))
            return
        ws = weakpointer(
            sender, lambda ref: klass._unsubscribe(event_name, ws, None)
        )
        # setdefault here is for thread-safety, in case two threads
        # add a subscriber for the same sender (i.e., it's not guaranteed
        # that 'sender_id not in klass._receivers' still holds at this point)
        receivers = event_subscriptions.setdefault(ws, [])
        for x_receiver, x_link in receivers:
            if x_receiver == rcv:
                return
        else:
            receivers.append((rcv, link))
        # bidirectional=False):
        #self.link(source, sink)
        #if bidirectional:
        #    self.link(sink, source)

    # NOTE(review): no ``self`` parameter -- when called on an instance,
    # ``source`` receives the instance.  ``klass`` is undefined here.
    def unlink(source, event_name, sender, receiver=None):
        """Stop sending events of this type from `sender` to `receiver`"""
        klass._unsubscribe(event_name, id(sender), receiver)

    # NOTE(review): written like a classmethod but not decorated as one, so
    # ``klass`` receives the instance.
    def _unlink(klass, event_name, sender_id, receiver=None):
        """Remove one receiver, or all of them when ``receiver`` is None."""
        try:
            receivers = klass._subscriptions[event_name][sender_id]
        except KeyError:
            return
        if receiver is None:
            clear_list(receivers)
            del klass._subscriptions[event_name][sender_id]
            return
        # the receiver may be stored directly (held) ...
        try:
            receivers.remove(receiver)
        except ValueError:
            pass
        # ... or wrapped in a safe reference
        try:
            receivers.remove(saferef(receiver))
        except ValueError:
            pass

    def intercept(self):
        """Placeholder -- not implemented."""
        pass

    def clone(self):
        """Placeholder -- not implemented."""
        pass

    def clear(self):
        """Drop every subscription."""
        self.subscriptions.clear()
# @/@ clean up with WeakSet / WeakList
# The module-wide link map and function-style shortcuts to its methods.
disco_link_map = DiscoLinkMap()
link = disco_link_map.link
unlink = disco_link_map.unlink
# was bound to ``disco_link_map.unlink`` (copy/paste slip)
intercept = disco_link_map.intercept
clone = disco_link_map.clone
# non_monadic = True
# default_event_name = 'output'
# Abort | Retry | Idling | Active | Pause | Create
# race
# ------------------------------------------------------------------------------
# servises
# ------------------------------------------------------------------------------
def service_manager(activity, event):
    """Hand ``event`` to the service registered for ``activity``.

    Returns the service's result.  If no service is registered, the event's
    own ``on_error`` hook is called with the event, when it has one.

    NOTE(review): ``services`` is not defined in the visible code (its
    initialisation is commented out below).
    """
    # services = {}
    if activity in services:
        handler = services[activity]
        return handler(event)
    if not hasattr(event, 'on_error'):
        return None
    event.on_error(event)
#def echo(event):
# print "Got:", event
# register_service('hi', echo)
# Event.subscribe('a', 'hi', service_manager)
# import pickle
# d = pickle.dumps(Event('a'), protocol=2)
# print pickle.loads(d)
# on error
# Event(pattern=Any ... )
#def subscribe(name, publisher, subscriber):
# return Event.subscribe(name, publisher, subscriber)
# adapter adaptor
# coupler junction
# ------------------------------------------------------------------------------
# keep trak of event initialisation times
# ------------------------------------------------------------------------------
# event id -> time the event was first (or, with ``force``, last) handled
disco_event_timekeeper = {}
disco_event_timekeeper_lock = allocate_lock()


def get_event_handled_time(event_id):
    """Return the recorded handling time of ``event_id`` (KeyError if none)."""
    return disco_event_timekeeper[event_id]


def set_event_handled_time(event_id, force=False):
    """Record "now" as the handling time of ``event_id``.

    An existing entry is kept unless ``force`` is true.
    """
    disco_event_timekeeper_lock.acquire()
    try:
        if force or event_id not in disco_event_timekeeper:
            disco_event_timekeeper[event_id] = time()
    finally:
        # e.g. an unhashable id must not leave the lock held forever
        disco_event_timekeeper_lock.release()
# ------------------------------------------------------------------------------
# disko base
# ------------------------------------------------------------------------------
class DiscoWorkerBase(DiscoObject):
    """Base class for ``DiscoWorker``s.

    Adds no behaviour of its own; it exists so that workers can be recognised
    with ``isinstance``.
    """
def is_a_disco_worker(worker):
    """Return True if ``worker`` is an instance of ``DiscoWorkerBase``."""
    verdict = isinstance(worker, DiscoWorkerBase)
    return verdict
# Lookup table of workers.  ``put_on`` resolves ``UUID``s through
# ``get_worker``, so the keys are presumably worker ids -- TODO confirm;
# nothing in the visible code adds entries.
disco_workers_roster = {}
# plain dict lookup: raises KeyError for an unknown key
get_worker = disco_workers_roster.__getitem__
# ------------------------------------------------------------------------------
# our disko workers!
# ------------------------------------------------------------------------------
def _worker(
    function, service_name=None, default_args=None, default_kwargs=None,
    wrap_function=False, is_method=False, io_monad=False, auto_finish=True,
    setup=Blank, signature=None, overridable_error_handler=False,
    override_existing_service=True,
    ):
    """Turn a normal looking function/coroutine into a ``DiscoWorker``.

    ``function`` is either a generator function taking the worker as its
    first argument, a ``DiscoWorkerBase`` instance (its ``_coroutine`` is
    reused), or -- with ``wrap_function`` -- a plain function that gets
    wrapped in a generator yielding ``Active``, the function's result and
    ``Finished``.

    ``default_args`` / ``default_kwargs`` are prepended to / merged under the
    arguments given when a worker is instantiated.  ``io_monad`` is stored as
    a flag on the class.  When ``service_name`` is given the class is
    registered through ``register_service``, with ``override`` set from
    ``override_existing_service``.

    ``is_method``, ``auto_finish``, ``setup``, ``signature`` and
    ``overridable_error_handler`` are accepted but not used by this body.

    Returns the newly created ``DiscoWorker`` class.
    """

    # --------------------------------------------------------------------------
    # grab the underlying DiscoWorker implementation if we are an instance
    # --------------------------------------------------------------------------

    if isinstance(function, DiscoWorkerBase):
        function = function._coroutine

    # --------------------------------------------------------------------------
    # wrap a normal function inside a generator
    # --------------------------------------------------------------------------

    elif wrap_function:

        # Keep the original callable under its own name: ``function`` is
        # rebound to the wrapper below, so closing over ``function`` would
        # make the wrapper call itself instead of the wrapped callable.
        wrapped = function

        def wrapper(self, *args, **kwargs):
            yield Active
            yield wrapped(self, *args, **kwargs)
            yield Finished

        function = wrapper

    # --------------------------------------------------------------------------
    # the worker class
    # --------------------------------------------------------------------------

    class DiscoWorker(DiscoWorkerBase):
        """DiscoWorkers of the world unite!"""

        default_stores = ['inbox', 'call', 'control', 'exception']
        default_args = ()
        default_kwargs = {}

        io_monad = False
        last_transaction = Blank

        def __init__(self, *args, **kwargs):

            self.__super.__init__()

            self.is_alive = True # 'conceived' | alive | dead
            self.is_paused = False

            self.__lock = allocate_lock()
            self.acquire_lock = self.__lock.acquire
            self.release_lock = self.__lock.release

            self._callbacks = deque()
            self._input_stream = deque()

            coroutine = self._coroutine
            self.name = "%s():DiscoWorker" % coroutine.__name__

            # currying-like support: class-level defaults come first and
            # instance keyword arguments override default ones
            args = self.default_args + args
            if kwargs and self.default_kwargs:
                _kwargs = self.default_kwargs.copy()
                _kwargs.update(kwargs)
            else:
                _kwargs = kwargs or self.default_kwargs

            # remembered so that ``revive`` can build an equivalent worker
            self.initialisers = (args, _kwargs)
            self._main = coroutine(*args, **_kwargs)

            self.store_limits = {}
            self.stores = {}
            set_store = self.stores.__setitem__
            for store_name in self.default_stores:
                set_store(store_name, [])

            disco_workers_roster[self.id] = self
            send(self.__class__, 'created', self)

            self.next = self.__iter__().next

        @classmethod
        def update(klass, coroutine):
            """Replace the coroutine used by workers of this class."""
            klass._coroutine = coroutine

        # ----------------------------------------------------------------------
        # control
        # ----------------------------------------------------------------------

        def kill(self):
            """Mark the worker dead, drop it from the roster, close its coroutine."""
            if settings.debug and self.is_alive:
                print("[terminating-worker %s]" % self)
            self.is_alive = False
            # ``pop`` rather than ``del``: ``__iter__`` calls ``kill`` again
            # after an explicit kill, and that must not raise ``KeyError``
            disco_workers_roster.pop(self.id, None)
            self._main.close()

        def revive(self): # this wise?
            """Return ``self`` if alive, else a new worker with the same arguments."""
            if self.is_alive:
                return self
            args, kwargs = self.initialisers
            return self.__class__(*args, **kwargs)

        # ----------------------------------------------------------------------
        # iterate
        # ----------------------------------------------------------------------

        def __iter__(self):
            """Drive the coroutine, yielding a status symbol per step.

            Yields ``Idling`` while paused, passes through ``Active`` and
            ``Idling`` from the coroutine, publishes any other value as an
            ``'output'`` event, and finally kills the worker and yields
            ``Finished`` once the coroutine stops.
            """

            send = self._main.send
            send_event = self.send_event
            has_input = self._input_stream.__len__
            pop_input = self._input_stream.popleft

            try:
                while 1:
                    if self.is_paused:
                        yield Idling
                        continue
                    # explicit branch instead of ``a and b or c`` so that
                    # falsy inputs (0, '', []) reach the coroutine unchanged
                    if has_input():
                        value = pop_input()
                    else:
                        value = None
                    try:
                        output = send(value)
                        if output in (Active, Idling):
                            yield output
                        elif output is Finished:
                            raise StopIteration("Received Finished.")
                        else:
                            yield send_event('output', output)
                            yield Active
                    except StopIteration:
                        send_event('control', Finished)
                        raise
                    # any other exception propagates to the caller untouched
            except StopIteration:
                self.kill()
                yield Finished # Dead

        # ----------------------------------------------------------------------
        # get/put from stores
        # ----------------------------------------------------------------------

        get = get
        put = put

        def __call__(self, *args, **kwargs):
            self.put(*args, **kwargs)
            return self # we return self for aiding in pipelining

        send = send
        send_event = send_event
        connect = link = subscribe = link

        # ----------------------------------------------------------------------
        # get them stores! store exists? not empty? full?
        # ----------------------------------------------------------------------

        def get_store(self, name='inbox'):
            """Return the named store, creating an empty one if needed."""
            return self.stores.setdefault(name, [])

        def clear_store(self, name='inbox'):
            """Empty the named store in place, if it exists."""
            if name in self.stores:
                self.stores[name][:] = []

        def get_store_size(self, name='inbox'):
            """Return the number of items in the named store (0 if missing)."""
            return len(self.stores.get(name, []))

        def set_store_limit(self, name, limit):
            """Record ``limit`` for the named store."""
            self.store_limits[name] = limit

        inbox = property(lambda self: self.get_store('inbox'))

        @property
        def control(self):
            return self.get_store('control')

        # ----------------------------------------------------------------------
        # pattern
        # ----------------------------------------------------------------------

        def on(self, *args, **kwargs):
            # placeholder: returns an identity decorator
            return lambda x: x

        def when(self, *args, **kwargs):
            # placeholder: returns an identity decorator
            return lambda x: x

        # ----------------------------------------------------------------------
        # unions -- used for ``|`` pipelining
        # ----------------------------------------------------------------------

        __or__ = pipeline

        def __ror__(self, other):
            # placeholder: returns a descriptive tuple, no pipeline is built
            return 'WANNA PIPELINE:', self, other

    # --------------------------------------------------------------------------
    # attach the coroutine and its metadata to the new class
    # --------------------------------------------------------------------------

    DiscoWorker.update(function)
    DiscoWorker.__name__ = "%s:DiscoWorker" % function.__name__
    DiscoWorker.__doc__ = function.__doc__

    # --------------------------------------------------------------------------
    # apply the per-class options
    # --------------------------------------------------------------------------

    if default_args:
        DiscoWorker.default_args = default_args

    if default_kwargs:
        DiscoWorker.default_kwargs = default_kwargs

    # used in transactions
    if io_monad:
        DiscoWorker.io_monad = True

    if service_name:
        DiscoWorker.service_name = service_name
        register_service(
            service_name, DiscoWorker, default_args or (), default_kwargs or {},
            override=override_existing_service,
            # suppress_duplicate_registration_error=True
            )

    return DiscoWorker
# ------------------------------------------------------------------------------
# our kore funktions
# ------------------------------------------------------------------------------
def worker(**kwargs):
    """Return a decorator that applies ``_worker`` with ``kwargs`` pre-bound."""
    decorator = partial(_worker, **kwargs)
    return decorator
def waiter(**kwargs):
    """Like ``worker`` but always passes ``wrap_function=True`` to ``_worker``."""
    kwargs['wrap_function'] = True
    decorator = partial(_worker, **kwargs)
    return decorator
def dancer(**kwargs):
    """Return a ``_worker`` decorator; currently adds no options of its own."""
    extra_options = {}
    kwargs.update(extra_options)
    decorator = partial(_worker, **kwargs)
    return decorator
# ------------------------------------------------------------------------------
# pipelined output
# ------------------------------------------------------------------------------
@worker()
def DiscoResult(self):
    """An object for getting results from DiscoWorkers.

    Until ``Finished`` shows up in the ``control`` store this yields the
    payload of each ``inbox`` entry (entries are popped with ``heappop`` and
    their second item is the payload), raises the payload of the first
    ``exception`` entry, and otherwise sleeps with an exponential back-off
    capped at 0.05s.  Whatever is left in ``inbox`` afterwards is drained.
    """

    # DiscoError -- need to include traceback in the object ...
    # if exception ...

    control = self.stores['control']
    inbox = self.stores['inbox']
    exception = self.stores['exception']

    initial_delay = 0.0005
    delay = initial_delay

    while Finished not in control:
        if exception:
            raise heappop(exception)[1]
        elif inbox:
            yield heappop(inbox)[1]
            # something arrived: poll eagerly again
            delay = initial_delay
        else:
            delay = min(delay * 2, 0.05)
            sleep(delay)

    # Drain the remainder the same way as above; the stores are plain lists
    # used as heaps, so there is no ``popleft`` to call on them.
    while inbox:
        yield heappop(inbox)[1]
def head(result):
    """Return the next item of ``result`` by calling its ``next()`` method."""
    first = result.next()
    return first
def tail(result):
    """Discard the first item of ``result`` and return what remains.

    ``result`` itself is returned after one ``next()`` call; an exhausted
    ``result`` gives an empty iterator instead.
    """
    try:
        result.next()
    except StopIteration:
        return iter([])
    return result
# ------------------------------------------------------------------------------
# disko servise registry
# ------------------------------------------------------------------------------
@worker()
def ServiceManager(self, seed_data=None):
    """Coroutine flavour of the disco service registry.

    Commands are sent in as tuples and the reply is yielded back:

    * ``('get', factory_or_name[, args][, kwargs])`` replies with the live
      service of that name, creating it through ``create`` when needed;
    * ``('register', name, factory, args, kwargs)`` stores a factory and
      replies with ``None``.

    Any exception raised while handling a command is yielded as the reply
    instead of being raised.
    """

    registry = {} if seed_data is None else seed_data
    instantiated = {}
    service = None

    while 1:

        command = yield service

        try:

            command_name = command[0]

            if command_name == 'get':

                target = command[1]
                args, kwargs = (), {}

                if isinstance(target, basestring):
                    factory, service_name = None, target
                else:
                    factory = target
                    service_name = getattr(
                        factory, 'service_name', factory.__name__
                        )

                if service_name in instantiated:
                    service = instantiated[service_name]
                    if service.is_alive:
                        # reply with the cached, still running service
                        continue
                    del instantiated[service_name]

                if factory:
                    extra = command[2:4]
                    extra_count = len(extra)
                    if extra_count == 2:
                        args, kwargs = extra
                    elif extra_count == 1:
                        # a lone extra item is args if a tuple, else kwargs
                        lone = extra[0]
                        if isinstance(lone, tuple):
                            args = lone
                        else:
                            kwargs = lone
                elif service_name in registry:
                    factory, args, kwargs = registry[service_name]
                else:
                    raise KeyError("No factories found for %r" % service_name)

                service = create(factory, *args, **kwargs)
                instantiated[service_name] = service

            elif command_name == 'register':

                name, factory, args, kwargs = command[1:]

                if name in registry and '_override' not in kwargs:
                    raise DuplicateServiceRegistration(
                        "Service %r has already been registered." % name
                        )

                registry[name] = (factory, args, kwargs)
                service = None

            else:
                raise RuntimeError("Invalid command: %r" % command_name)

        except Exception as reason:
            service = reason
# hold a referense to the default servise registry
# if not settings.disco_services:
# settings.disco_services = create(ServiceManager)
def register_service(name, worker_factory, *args, **kwargs): # args, kwargs
    """Send a ``register`` command to ``settings.disco_services``.

    An exception object returned by the manager is raised here, except for a
    ``DuplicateServiceRegistration`` when the caller passed a true
    ``_suppress_duplicate_registration_error`` keyword.
    """
    command = ('register', name, worker_factory, args, kwargs)
    reply = settings.disco_services.send(command)
    if isinstance(reply, DuplicateServiceRegistration):
        if kwargs.get('_suppress_duplicate_registration_error'):
            return
        raise reply
    if isinstance(reply, Exception):
        raise reply
def get_service(name, *args, **kwargs):
    """Send a ``get`` command to ``settings.disco_services``.

    Returns the manager's reply, or raises it when it is an exception.
    """
    command = ('get', name, args, kwargs)
    reply = settings.disco_services.send(command)
    if not isinstance(reply, Exception):
        return reply
    raise reply
# ------------------------------------------------------------------------------
# disko servise registry -- take 2 (or was that take 1?)
# ------------------------------------------------------------------------------
class ServiceManager:
    """Manage disco services.

    Keeps a ``registry`` mapping service names to ``(factory, args, kwargs)``
    and an ``instantiated`` mapping of names to created services.  Access to
    both mappings is serialised with an internal lock.
    """

    __slots__ = (
        '__lock', 'acquire_lock', 'release_lock', 'registry', 'instantiated'
        )

    def __init__(self, seed_data=None):
        """Create a manager, optionally starting from the ``seed_data`` registry."""
        self.__lock = allocate_lock()
        self.acquire_lock = self.__lock.acquire_lock
        self.release_lock = self.__lock.release_lock
        if seed_data is None:
            self.registry = {}
        else:
            self.registry = seed_data
        self.instantiated = {}

    def get_service(self, name, *args, **kwargs):
        """Return the live service for ``name``, creating it if necessary.

        ``name`` is either a service name or a factory; for a factory the
        name is its ``service_name`` attribute, falling back to ``__name__``.
        A factory passed in is called with the given ``args``/``kwargs``; a
        bare name uses the factory and arguments stored in the registry.

        Raises ``KeyError`` when a bare name has no registered factory.
        """

        self.acquire_lock()

        try:

            instantiated = self.instantiated
            registry = self.registry

            if isinstance(name, basestring):
                factory = None
                service_name = name
            else:
                factory = name
                service_name = getattr(
                    factory, 'service_name', factory.__name__
                    )

            if service_name in instantiated:
                service = instantiated[service_name]
                if service.is_alive:
                    return service
                # forget the dead instance and build a new one below
                del instantiated[service_name]

            if factory:
                pass
            elif service_name in registry:
                factory, args, kwargs = registry[service_name]
            else:
                raise KeyError("No factories found for %r" % service_name)

        finally:
            self.release_lock()

        if settings.debug:
            print('[creating-service %r]' % service_name)

        # the lock is not held while the factory runs
        service = create(factory, *args, **kwargs)

        self.acquire_lock()
        try:
            instantiated[service_name] = service
            return service
        finally:
            self.release_lock()

    def register_service(
        self, name, factory, args, kwargs,
        override=False, suppress_duplicate_registration_error=False
        ):
        """Register ``factory`` with its ``args``/``kwargs`` under ``name``.

        When ``name`` is already registered and ``override`` is false, the
        existing registration is kept: ``DuplicateServiceRegistration`` is
        raised unless ``suppress_duplicate_registration_error`` is true, in
        which case the call returns silently.
        """

        self.acquire_lock()

        try:

            registry = self.registry

            if name in registry and not override:
                # the error is raised when it has NOT been suppressed
                if suppress_duplicate_registration_error:
                    return
                raise DuplicateServiceRegistration(
                    "Service %r has already been registered." % name
                    )

            if settings.debug:
                print('[registering-service %r]' % name)

            registry[name] = (factory, args, kwargs)

        finally:
            self.release_lock()

    def prune(self):
        """Drop every instantiated service whose ``is_alive`` is false."""

        self.acquire_lock()

        try:

            instantiated = self.instantiated

            # collect first: the dict cannot be changed while iterating it
            dead = [
                name for name, worker in instantiated.iteritems()
                if not worker.is_alive
                ]

            for name in dead:
                if settings.debug:
                    print('[removing-service %r]' % name)
                del instantiated[name]

        finally:
            self.release_lock()
# If settings does not already provide a service manager, create a default
# ``ServiceManager`` and store it on ``settings`` for everybody to share.
if not settings.disco_services:
    settings.disco_services = ServiceManager()

# Module-level shortcuts bound to the active manager.  These assignments
# rebind ``get_service`` and ``register_service``, replacing the functions of
# the same names defined earlier in this module.
disco_services = settings.disco_services
get_service = disco_services.get_service
register_service = disco_services.register_service
# ------------------------------------------------------------------------------
# selekt servise
# ------------------------------------------------------------------------------
@worker(service_name="core/factory")
def FactoryService(self):
    """Stub service registered as ``core/factory``; yields once and stops."""
    factories = dict()
    yield None
@worker(service_name="core/registry")
def RegistryService(self):
    """Hand out per-name registries, registered as ``core/registry``.

    Each round receives an ``(event, service)`` pair.  A string ``service``
    selects (creating on first use) the dict kept under that name; the
    currently selected dict is then sent back through ``event.send``.
    """
    registries_by_name = {}
    current = None
    while True:
        event, service = yield self.receive()
        if isinstance(service, basestring):
            current = registries_by_name.setdefault(service, {})
        yield event.send(self, current)
# get_registry
# instantiate_service = get_service
# ------------------------------------------------------------------------------
# selekt servise
# ------------------------------------------------------------------------------
# handle_error()
@worker(service_name="core/poll select", io_monad=True)
def poll_select(self, selector=select.select, default_timeout=0.0):
    """Let's listen to them file descriptors (sockets) !

    Keeps a registry of file descriptors with optional ``readable`` /
    ``writable`` predicates.  On every round the descriptors whose predicate
    is missing or returns true are passed to ``selector`` and the ready ones
    are announced through ``self.send`` as ``"readable"``, ``"writable"`` or
    ``"exceptional"``.

    ``register``, ``deregister`` and ``set_timeout`` are exposed to callers
    through the namespace handed to ``call_if_get``.
    """

    registry = {}
    # one-element list so that ``set_timeout`` can rebind the value
    timeout = [default_timeout]

    def register(file_descriptor, readable=None, writable=None):
        registry[file_descriptor] = (readable, writable)

    def deregister(file_descriptor):
        # unknown descriptors are ignored
        registry.pop(file_descriptor, None)

    def set_timeout(new_timeout):
        timeout[0] = new_timeout

    namespace = ImmutableNamespace(register, deregister, set_timeout)

    reader_sockets = []
    writer_sockets = []
    except_sockets = []

    append_to_reader_sockets = reader_sockets.append
    append_to_writer_sockets = writer_sockets.append

    while 1:

        yield call_if_get(self, namespace)

        reader_sockets[:] = []
        writer_sockets[:] = []
        except_sockets[:] = []

        yield Active

        # iterate over a snapshot so that a predicate may (de)register
        # descriptors without breaking the loop
        for socket_fileno, (readable, writable) in list(registry.items()):
            if readable is None or readable():
                append_to_reader_sockets(socket_fileno)
            if writable is None or writable():
                append_to_writer_sockets(socket_fileno)

        # all three lists are equal only when they are all empty
        if except_sockets == reader_sockets == writer_sockets:
            continue

        try:
            ready_to_read, ready_to_write, exceptional = selector(
                reader_sockets, writer_sockets, except_sockets, timeout[0]
                )
        except select.error as reason:
            # an interrupted call is simply retried on the next round
            if reason[0] not in [EINTR]:
                yield Error, reason # throw(reason)
            continue

        for socket_fileno in ready_to_read:
            yield self.send("readable", socket_fileno)

        for socket_fileno in ready_to_write:
            yield self.send("writable", socket_fileno)

        for socket_fileno in exceptional: # out of band data
            yield self.send("exceptional", socket_fileno)
            # readable(socket_fileno)
# readable(socket_fileno)
# ------------------------------------------------------------------------------
# demo workers and waiters
# ------------------------------------------------------------------------------
# Demonstration workers, only defined when ``settings.debug`` is true.
# NOTE(review): indentation was lost in this copy of the file; the decorated
# definitions below are assumed to be nested under this guard -- confirm.
# ``echo``, ``square`` and ``counter`` are each defined several times; every
# later definition rebinds the name, so only the last of each one survives.
if settings.debug:

    # variant 1: fetch a value with ``self.get(SINGLE)`` and yield it back
    @worker()
    def echo(self):
        while 1:
            value = yield self.get(SINGLE)
            yield value

    # variant 2: the same thing as a single nested yield expression
    @worker()
    def echo(self):
        while 1:
            yield (yield self.get(SINGLE))

    # variant 3: binds every key of the received mapping as a local name
    # through the ``exec`` statement, then yields ``value``
    @worker()
    def echo(self):
        while 1:
            for k, v in (yield self.get('value')).iteritems():
                exec("%s=v" % k)
            yield value

    # variant 4: hands the received mapping to ``localise``
    # NOTE(review): ``localise`` is not defined in this part of the file;
    # presumably it injects the mapping into the caller's locals -- confirm.
    @worker()
    def echo(self):
        while 1:
            localise((yield self.get('value'))) # can we lose a set of () here ?
            exec '' # can we get rid of this ?
            yield value
        # everything below follows the endless loop above and is never reached
        input = yield self.get('value, name="tom", age=27')
        # input would be:
        # {'value': 1700,
        # 'name': 'tom',
        # 'age': 27}
        "%s is age %s" % (name, age)

    # variant 5: ``self.get()`` answers with an ``(args, kwargs)`` pair and
    # the single positional argument is unpacked as ``value``
    @worker()
    def echo(self):
        while 1:
            (value,), kwargs = yield self.get()
            # here, one could do:
            # name = kwargs.get('name', 'default value goes here')
            yield value

    # variant 6: last definition, the one left bound to ``echo``
    @worker()
    def echo(self):
        """A demonstration ``echo`` disco worker."""
        while 1:
            value = yield self.get()
            yield value

    # variant 1: squares the received value and passes it to ``self.send``
    @worker()
    def square(self):
        while 1:
            yield self.send((yield self.get(SINGLE)) ** 2)

    # variant 2: last definition, yields the square directly
    @worker(setup=MEMO)
    def square(self):
        """A demonstration ``square`` disco worker."""
        while 1:
            n = yield self.get()
            yield n ** 2

    # variant 1: hand-written counting loops for both step directions
    @worker()
    def counter(self, start=0, stop=Blank, step=1):
        # with a single argument it is taken as ``stop`` and counting starts at 0
        if stop is Blank:
            stop, start = start, 0
        if step > 0:
            while start < stop:
                yield start
                start += step
        else:
            while start > stop:
                yield start
                start += step

    # variant 2: last definition, delegates the counting to ``xrange``
    @worker()
    def counter(self, start=0, stop=Blank, step=1):
        """A demonstration ``counter`` disco worker."""
        # with a single argument it is taken as ``stop`` and counting starts at 0
        if stop is Blank:
            stop, start = start, 0
        for n in xrange(start, stop, step):
            yield n
def counter2(self):
    # Sketch of a transaction-aware counter; it is not decorated as a worker.
    # NOTE(review): ``n`` is assigned below (``n += 1``) and is therefore a
    # local that is never bound before ``while n < stop`` reads it, and
    # ``stop`` is not defined in this function -- presumably both were meant
    # to come out of the ``get('n', stop=10)`` call; confirm.
    txn = yield get('n', stop=10)
    while n < stop:
        n += 1
        yield send('output', self, n, txn=txn)
    # NOTE(review): placed after the loop; the original nesting is not
    # recoverable from this copy of the file.
    yield send('end transaction', self, txn)
def test_transact(n):
    # Scratch function: the ``range`` result is discarded, the increment only
    # rebinds the local ``n``, and nothing is returned.
    range(n, n + 3)
    n += 3
# intercept
# capture
# event = txn = yield get('a')
# around
# exceptions / deffered
# counter()(2) | square()
# counter(2) | square
# counter(2, 10) | echo() | square() | sum()
# before / after / around / mcombine (:progn :and :or :standard)
# don't calculate ancestor lists for objects with only one parent
# calculated initial values
# defgeneric / undefmethod(str, str)
# :before :after
# :method-combination (+ and append list max min nconc or progn)
# yield Finished -- break
# get('n') not match ({'Finished':True})
# lazy map / cat / filter
def fibonacci(self):
    # Sketch only: ``n`` is not defined in this function.
    # NOTE(review): presumably meant to re-queue ``n - 1`` on the worker
    # itself -- confirm.
    put(self, n-1)
# max recursive depth reached
# Reference snippet kept as a plain string: a recursive fibonacci in OCaml.
# The string is only bound to ``ocaml`` here; nothing in view evaluates it.
ocaml = """
let rec fibonacci2 = function
0 -> 1
| 1 -> 1
| n -> fibonacci2 (n - 1) + fibonacci2 (n - 2);;
"""
def memoise(function):
    """Return a caching wrapper around ``function``.

    Results are remembered per tuple of positional arguments.  The wrapper
    carries the wrapped function's ``__name__`` and exposes the result dict
    as its ``cache`` attribute.
    """
    cache = {}

    def memoised_function(*args):
        try:
            return cache[args]
        except KeyError:
            pass
        result = function(*args)
        # ``setdefault`` keeps a value stored by a nested call, if any
        return cache.setdefault(args, result)

    memoised_function.cache = cache
    memoised_function.__name__ = function.__name__
    return memoised_function
@memoise
def fibonacci(n):
    """Return the n-th fibonacci number; values below 2 are returned as is."""
    return n if n < 2 else fibonacci(n - 1) + fibonacci(n - 2)
# 1.61803398875
# def fib_non_recu(n):
def fibonacci(n):
    """Generator sketch: yields ``(OUTPUT, n)`` for ``n`` below 2, else nothing."""
    if not n < 2:
        return
    yield OUTPUT, n
# philosophers
# multimethod combine example
def factorial(self):
    # Sketch of a continuation-passing factorial worker.
    # NOTE(review): ``n`` and ``c`` are not defined in this function;
    # presumably ``localise`` was meant to bind them from the received
    # message -- confirm.  The nesting of the last two calls under ``else``
    # is assumed, the original indentation is not recoverable from this copy.
    while 1:
        event = localise((yield get('n')))
        if n == 1:
            event.send(self, 'output', 1)
        else:
            event.subscribe(self, 'call', self)
            event.send('call', self, n-1, lambda k: send(c, n * k))
            event.send('output', self, n-1, lambda k: send(c, n * k))
# @match(int, continuation)
# @match('n', 'c')
# def factorial(message):
# if n == 1:
# send(c, 1)
# else:
# send(factorial, n-1, lambda k: send(c, n * k))
# send(factorial, 3, wantsanswer)
def pointmaker(x, y):
    """Build a closure-based point exposing ``toX``, ``toY``, ``__add__``
    and ``__repr__`` through a ``Namespace``."""

    def __repr__():
        return u"<%s,%s>" % (x, y)

    def __add__(other):
        # component-wise sum, returned as a new point
        summed_x = x + other.toX()
        summed_y = y + other.toY()
        return pointmaker(summed_x, summed_y)

    def toY():
        return y

    def toX():
        return x

    return Namespace(toX, toY, __add__, __repr__)
def countermaker(start=0):
    """Coroutine-based counter.

    After priming, send it ``'get_count'`` to receive the current count or
    ``'incr'`` to add one (the reply is ``None``).  Each reply has to be
    followed by a plain ``next()`` before the next command is sent.

    An unknown command raises ``KeyError``.
    """
    # A one-element list serves as the shared cell: Python 2 has no
    # ``nonlocal`` and ``global`` would target a module-level name instead
    # of this counter.
    count = [start]

    def get_count():
        return count[0]

    def incr():
        count[0] += 1

    commands = {'get_count': get_count, 'incr': incr}

    while 1:
        dispatch = yield
        yield commands[dispatch]()
def countermaker(init=0):
    """Closure-based counter exposing ``get_count`` and ``incr`` through a
    ``Namespace``."""
    # A one-element list serves as the shared cell: assigning to ``init``
    # inside ``incr`` would make it a local of ``incr`` and raise
    # ``UnboundLocalError`` instead of updating the counter.
    count = [init]

    def get_count():
        return count[0]

    def incr():
        count[0] += 1

    return Namespace(get_count, incr)
def factorial(x):
    """Return ``x!`` computed recursively.

    ``factorial(0)`` and ``factorial(1)`` are both 1.  A negative ``x``
    raises ``ValueError`` instead of recursing without end.
    """
    if x < 0:
        raise ValueError("factorial() is not defined for negative values")
    if x in (0, 1):
        return 1
    return x * factorial(x - 1)
def fact2(x):
    # Generator sketch of a factorial.
    # NOTE(review): ``fact2(x - 1)`` is a generator object, so the
    # multiplication below multiplies ``x`` by a generator rather than by a
    # number; the base case also yields 0, not 1 -- confirm the intent
    # before relying on this.
    if x == 1:
        yield 0
    else:
        yield x * fact2(x - 1)
def create_logger(self, filename='club_%i.log', format=True):
    """Coroutine appending every received ``'text'`` to a locked log file.

    With ``format`` true, ``filename`` is interpolated with ``self.name``.
    The file is locked with ``lock_file`` for as long as the coroutine runs
    and is unlocked and closed when the coroutine is closed or fails.
    """
    path = filename % self.name if format else filename
    f = file(path, 'a')
    lock_file(f)
    # namespace = ImmutableNamespace(close)
    # self.link('Finished', close)
    # self.register_on_finish(close)
    try:
        while 1:
            # yield call_if_get(self, namespace)
            text = yield self.get('text')
            f.write(text)
            f.flush()
    finally:
        unlock_file(f)
        f.close()
# ------------------------------------------------------------------------------
# default s11n of python objekts
# ------------------------------------------------------------------------------
class Unpickler(pickle.Unpickler):
    """Unpickler that falls back to ``pimp`` for modules it cannot import."""

    def find_class(self, module, name):
        """Return attribute ``name`` of ``module``, loading the module first.

        The module is loaded with ``__import__``; on ``ImportError`` it is
        loaded through ``pimp`` instead.
        """
        try:
            __import__(module)
        except ImportError:
            pimp(module)
        owner = sys.modules[module]
        return getattr(owner, name)
# bound before ``pickle`` below rebinds the module's name to a function
Pickler = pickle.Pickler


def pickle(obj, stream=Blank, retval=False):
    """Serialise ``obj`` to ``stream`` with pickle protocol 2.

    Without a ``stream`` a fresh ``StringIO`` is used and its content is
    returned.  With a ``stream`` the content is returned only when
    ``retval`` is true; otherwise the result is ``None``.
    """
    want_value = retval
    if stream is Blank:
        stream = StringIO()
        want_value = True
    pickler = Pickler(stream, 2)
    pickler.dump(obj)
    if want_value:
        return stream.getvalue()
def unpickle(stream):
    """Load one pickled object from ``stream``.

    ``stream`` may be a file-like object or a string; a string is wrapped
    in a ``StringIO`` first.
    """
    source = StringIO(stream) if isinstance(stream, basestring) else stream
    loader = Unpickler(source)
    return loader.load()
# ------------------------------------------------------------------------------
# them funky dansers!
# ------------------------------------------------------------------------------
@worker(service_name="core/unix pipe")
def UnixPipe(self, name):
    """Experimental pipe worker built on a named pipe (FIFO) at ``name``.

    Creates the FIFO unless it already exists, then probes
    ``/tmp/disco.<i>.socket`` for the first path a unix socket can be bound
    to and unlinks that path again.  The nested ``send`` and ``receive``
    helpers are defined but not used or exposed by this body.
    """

    from errno import EEXIST

    try:
        os.mkfifo(name)
    except OSError as e:
        # compare the error code rather than the message text, which
        # depends on the platform and locale
        if e.errno != EEXIST:
            raise

    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    i = 0
    while i < 100:
        try:
            s.bind('/tmp/disco.%i.socket' % i)
        except socket.error:
            i += 1
        else:
            break

    # NOTE(review): placed after the loop; the original nesting is not
    # recoverable from this copy of the file.
    try:
        os.unlink('/tmp/disco.%i.socket' % i)
    except OSError:
        pass

    def send(text):
        f = file(name, 'a')
        try:
            f.write(text + '\n')
        finally:
            # close explicitly so the handle is not leaked
            f.close()

    def receive():
        f = file(name, 'r')
        try:
            return f.readline()
        finally:
            f.close()
# p = UnixPipe('hello')
# print p.receive()
# print 'fo'
# p.send('foo')
# print 'fo2'
# print p.receive()
@worker(service_name="core/delayed pipe")
def DelayedSender(self):
    # Placeholder: registered under "core/delayed pipe", no behaviour yet.
    pass
@worker(service_name="core/socket pipe")
def SocketPipe(self, port):
    # Placeholder: registered under "core/socket pipe"; ``port`` is accepted
    # but not used yet.
    pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment