Created
October 14, 2009 22:33
-
-
Save tav/210461 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # this is seriously old and should not be used for anything... | |
| """An implementation of the DISCO (DIStributed COncurrency) Model.""" | |
| import atexit | |
| import pickle | |
| import select | |
| import signal | |
| import socket | |
| import sys,os | |
| import settings | |
| from collections import deque | |
| from copy import deepcopy # | |
| from errno import EINTR | |
| from heapq import heappop, heappush, heapify, heapreplace | |
| from inspect import getargs, formatargspec # | |
| from itertools import repeat | |
| from sys import _getframe as get_frame | |
| pimp('builtin/dict', 'AttrDict', 'CatDict', 'OrderedDict') | |
| pimp('builtin/symbol', 'Any', 'Null', 'Event', 'Idling', 'Failed', 'Active', | |
| 'Finished', 'Create', 'Pause', 'Kill', 'Retry', 'Blank', 'Symbol') # | |
| pimp('builtin/type', 'GeneratorType', 'ClassType', 'ClassicClassType', | |
| 'SequenceType') | |
| pimp('builtin/uuid', 'UUID') | |
| pimp('builtin/weakref', 'weakref', 'WeakSet', 'saferef', 'weakpointer') # | |
| pimp('datetime/time', 'sleep', 'time') | |
| pimp('error', 'Error') | |
| pimp('functional', 'partial') | |
| pimp('io/stringio', 'StringIO') | |
| pimp('metaclass/selfsuper', 'SelfSuper') | |
| pimp('os/lockfile', 'lock_file', 'unlock_file') | |
| pimp('threading', 'allocate_lock', 'start_new_thread') | |
| # ------------------------------------------------------------------------------ | |
| # some konstants | |
| # ------------------------------------------------------------------------------ | |
| __export__ = { | |
| '*': [ | |
| # utility | |
| 'clear_list', 'safely_set_object_in_namespace', | |
| 'reassign_function_arguments', 'get_class_mro', 'TypeChecker', 'is_type', | |
| # error | |
| 'DiscoError', 'StopEventPropogation', 'StoreLimitReached', | |
| 'DuplicateServiceRegistration', | |
| # some main klasses | |
| 'DiscoObject', 'DiscoOwner', 'DiscoDealer', 'DopeDealer', 'CokeDealer', | |
| 'DiscoClub', | |
| # utility | |
| 'get_disco_owner', 'add_disco_worker', 'shutdown_disco', | |
| # namespaces | |
| 'Namespace', 'ImmutableNamespace', | |
| # worker utitlies | |
| 'create', 'create_service', | |
| # worker main classes/utilities | |
| 'DiscoWorkerBase', 'is_a_disco_worker', 'worker', 'waiter', 'dancer' | |
| ], | |
| ':api': [ | |
| 'get_disco_owner', 'add_disco_worker', 'shutdown_disco', | |
| 'create', 'create_service', | |
| 'Namespace', 'ImmutableNamespace', | |
| 'is_a_disco_worker', 'worker', 'waiter', 'dancer' | |
| ] | |
| } | |
| __metaclass__ = type | |
| if settings.disco_owners is None: | |
| settings.disco_owners = WeakSet() | |
| THREADING_ENABLED = True | |
| DEFAULT_CLUBS = 1 | |
| SINGLE = lambda *args, **kwargs: args and args[0] or kwargs.values()[0] # @/@ ? | |
| FIXED = Symbol('FIXED') | |
| POP = Symbol('POP') | |
| MEMO = CatDict(memoise=True) | |
| DROP = CatDict(drop_non_applicable_method=True) | |
| MEMODROP = MEMO + DROP | |
| OVERRIDE = CatDict(override=True) | |
| REGISTER = CatDict(register=True) | |
| try: | |
| __refresh__ | |
| except NameError: | |
| __refresh__ = False | |
| # ------------------------------------------------------------------------------ | |
| # utility funktions | |
| # ------------------------------------------------------------------------------ | |
| def clear_list(list): | |
| """A clear function for lists that behaves like ``set.clear``.""" | |
| list[:] = [] | |
| return list | |
| NAMESPACE_LOCK = allocate_lock() | |
| def safely_set_object_in_namespace( | |
| object, namespace=Blank, depth=1, frame=None | |
| ): | |
| """Set the given ``object`` within a specified or calling ``namespace``.""" | |
| if isinstance(object, basestring): | |
| return object | |
| if namespace is Blank: | |
| frame = get_frame(depth) | |
| namespace = frame.f_globals | |
| original_name = name = object.__name__ | |
| if original_name == '<lambda>': | |
| original_name = name = 'anon_func' | |
| n = 0 | |
| NAMESPACE_LOCK.acquire_lock() | |
| while name in namespace: | |
| n += 1 | |
| name = '%s%d' % (original_name, n) | |
| namespace[name] = object | |
| NAMESPACE_LOCK.release_lock() | |
| del frame, namespace | |
| return name | |
| # ------------------------------------------------------------------------------ | |
| # typing-related utility funktions and klasses | |
| # ------------------------------------------------------------------------------ | |
| def get_class_mro(klass): | |
| """Return the provided ``klass``'s lookup MRO (Method Resolution Order).""" | |
| # new-style mro | |
| if isinstance(klass, type): | |
| return klass.__mro__ | |
| # old-skool mro | |
| if isinstance(klass, ClassicClassType): | |
| mro = [] | |
| def build_mro(base): | |
| if base not in mro: | |
| mro.append(base) | |
| for base_class in base.__bases__: | |
| build_mro(base_class) | |
| build_mro(klass) | |
| return tuple(mro) | |
| # oops, not a klass type | |
| raise TypeError("Unknown class type %r" % type(klass)) | |
| class TypeChecker: | |
| """Utility class for use as a predicate for type-checks.""" | |
| #__slots__ = ('types',) | |
| mro_map = {} | |
| __lock = allocate_lock() | |
| acquire_lock = __lock.acquire_lock | |
| release_lock = __lock.release_lock | |
| def __init__(self, *types): | |
| mro_map = self.mro_map | |
| filtered_types = [] | |
| append_to_filtered_types = filtered_types.append | |
| for obtype in types: | |
| if not isinstance(obtype, ClassType): | |
| raise TypeError("Unknown class type %r" % type(klass)) | |
| if obtype not in filtered_types: | |
| append_to_filtered_types(obtype) | |
| self.types = filtered_types | |
| @classmethod | |
| def build_type_map(klass, object): | |
| pass | |
| def __call__(self, object): | |
| for type in self.types: | |
| if isinstance(object, type): | |
| pass | |
| # is_type = TypeChecker | |
| def is_type(*types): | |
| """Utility function for use as a predicate for type-checks.""" | |
| return lambda ob : isinstance(ob, types) | |
| def get_locals(): | |
| frame = get_frame(1) | |
| try: | |
| return AttrDict(frame.f_locals) | |
| finally: | |
| del frame | |
| # ------------------------------------------------------------------------------ | |
| # error klasses | |
| # ------------------------------------------------------------------------------ | |
| class DiscoError(Error): | |
| pass | |
| class StopEventPropogation(DiscoError): | |
| pass | |
| class DuplicateServiceRegistration(DiscoError): | |
| pass | |
| class StoreLimitReached(DiscoError): | |
| pass | |
| # ------------------------------------------------------------------------------ | |
| # utility klasses | |
| # ------------------------------------------------------------------------------ | |
| class Params: | |
| __slots__ = 'args', 'kwargs' | |
| def __init__(self, *args, **kwargs): | |
| self.args = args | |
| self.kwargs = kwargs | |
| # ------------------------------------------------------------------------------ | |
| # da base disko object | |
| # ------------------------------------------------------------------------------ | |
| class DiscoObject(object): | |
| """The fundamental class from which all other Disco classes derive from.""" | |
| __metaclass__ = SelfSuper | |
| def __init__(self, *args, **kwargs): | |
| self.id = UUID() | |
| self.name = '' | |
| def __repr__(self): | |
| if self.name: | |
| if self.name.count(':'): | |
| return '<%s: %s>' % (self.name, self.id) | |
| return '<%s:%s: %s>' % (self.name, self.__class__.__name__, | |
| self.id) | |
| return '<%s: %s>' % (self.__class__.__name__, self.id) | |
| __str__ = __repr__ | |
| # ------------------------------------------------------------------------------ | |
| # disko owners -- poor bastards ;p | |
| # ------------------------------------------------------------------------------ | |
| class DiscoOwner(DiscoObject): | |
| """A ``DiscoOwner`` runs various ``DiscoClub``s.""" | |
| def __new__(klass, *args, **kwargs): | |
| # still undecided -- only one owner per python process or many? | |
| for disco_owner in settings.disco_owners: | |
| return disco_owner() | |
| return super(DiscoOwner, klass).__new__(klass, *args, **kwargs) | |
| def __init__(self, initial=DEFAULT_CLUBS, using_threads=THREADING_ENABLED): | |
| self.__super.__init__() | |
| self.__lock = allocate_lock() | |
| self.acquire_lock = self.__lock.acquire | |
| self.release_lock = self.__lock.release | |
| self.is_running = False | |
| self.is_paused = False | |
| self.is_migrating = False | |
| self.using_threads = using_threads | |
| self.club_pool = [] | |
| self.club_pool_marker = -1 | |
| self.clubs = {} | |
| if isinstance(initial, int): | |
| initial = ('club%i' % i for i in range(1, initial+1)) | |
| for club in initial: | |
| self.add_club(club, 1, True) | |
| settings.disco_owners.add(self) | |
| settings.disco_master = self # weakref(self) | |
| atexit.register(shutdown_disco) | |
| def add_club(self, club=None, priority=1, add_to_club_pool=False): | |
| if not isinstance(club, DiscoClub): | |
| club = DiscoClub(club) | |
| club_name = club.name | |
| self.acquire_lock() | |
| try: | |
| if club in self.clubs: | |
| raise KeyError("%s already has a club %r." % (self, club_name)) | |
| self.clubs[club] = [club, priority] # da priority level | |
| if add_to_club_pool: | |
| self.club_pool.append(club) | |
| if self.is_running and self.using_threads: | |
| club.run() | |
| finally: | |
| self.release_lock() | |
| def get_club(self, club): | |
| self.acquire_lock() | |
| try: | |
| if club not in self.clubs: | |
| raise KeyError("%s has no club %r" % (self, club)) | |
| return self.clubs[club][0] | |
| finally: | |
| self.release_lock() | |
| def remove_club(self, club): | |
| club = self.get_club(club) | |
| self.acquire_lock() | |
| try: | |
| club.shutdown() | |
| if club in self.club_pool: | |
| self.club_pool.remove(club) | |
| del self.clubs[club] | |
| finally: | |
| self.release_lock() | |
| def run(self, delay=1.5, using_threads=Blank): | |
| if using_threads is Blank: | |
| using_threads = self.using_threads | |
| if not THREADING_ENABLED: | |
| using_threads = False | |
| if not self.is_running: | |
| self.is_running = True | |
| if using_threads: | |
| for club in self.clubs: | |
| club.run(delay, using_threads=True) | |
| # @/@ need to modify this to microthread properly | |
| # @/@ also, need to handle signal gracefully if threading status | |
| # @/@ switches from False to True during runtime | |
| else: | |
| signal.signal(signal.SIGINT, self.shutdown) | |
| runners = {} | |
| try: | |
| while self.is_running and not self.is_paused: | |
| for club in self.clubs: | |
| if club not in runners: | |
| runners[club] = club.run(delay, False) | |
| for club in runners: | |
| if club not in self.clubs: | |
| del runners[club] | |
| else: | |
| runners[club].next() | |
| except StopIteration: | |
| for club in self.clubs: | |
| club.shutdown() | |
| else: | |
| if settings.debug: | |
| print "!!ALERT!!\n%r is already running" % self | |
| def pause(self): | |
| self.is_paused = True | |
| for club in self.clubs: | |
| club.pause() | |
| def unpause(self): | |
| self.is_paused = False | |
| for club in self.clubs: | |
| club.unpause() | |
| # @/@ there should be *komplete* shutdown method that also removes the klubs | |
| # @/@ and uses threading.Condition to ensure that everything is safe. | |
| def shutdown(self, graceful=True, kill=False): | |
| if not self.is_running: | |
| return | |
| self.is_running = False | |
| if settings.debug: | |
| print "[%sshutting-down %r]" % ( | |
| graceful and "gracefully-" or "", self | |
| ) | |
| if graceful: | |
| for club in self.clubs: | |
| club.shutdown(kill) | |
| def __del__(self): | |
| self.shutdown(graceful=True, kill=True) | |
| if settings.debug: | |
| print "[terminating-owner %r]" % self | |
| # @/@ check klubs to see how busy they are and add worker appropriately? | |
| # @/@ would randomly adding to klubs work just as well? | |
| def add_worker_to_club(self, club, worker, *args, **kwargs): | |
| if settings.debug: | |
| print '[adding-worker-to-club %s %s]' % (club, worker) | |
| self.get_club(club).add_worker(worker, *args, **kwargs) | |
| def migrate_worker(self, worker, from_club, to_club): | |
| self.get_club(to_club).add_worker( | |
| self.get_club(from_club).remove_worker(worker, keep_alive=True), | |
| already_instantiated=True | |
| ) | |
| def add_worker(self, worker, *args, **kwargs): | |
| self.acquire_lock() | |
| try: | |
| self.club_pool_marker += 1 | |
| self.club_pool_marker %= len(self.club_pool) | |
| club = self.club_pool[self.club_pool_marker] | |
| club.add_worker(worker, *args, **kwargs) | |
| finally: | |
| self.release_lock() | |
| # ------------------------------------------------------------------------------ | |
| # dealers | |
| # ------------------------------------------------------------------------------ | |
| class DiscoDealer: | |
| """A ``Club`` is run by a ``dealer``. Each Club has its own dealer.""" | |
| __slots__ = 'clients' | |
| def __init__(self, clients): | |
| self.clients = clients | |
| def deal(self): | |
| raise NotImplementedError | |
| def pay(self, client, payment): | |
| raise NotImplementedError | |
| class DopeDealer(DiscoDealer): | |
| """A fair and balanced dealer.""" | |
| def deal(self): | |
| for client in self.clients: | |
| yield client, 1 | |
| def pay(self, client, payment): | |
| pass | |
| class CokeDealer(DiscoDealer): | |
| """A dealer whose clients tend to want priority over others.""" | |
| def __init__(self, clients, min=1, max=10): | |
| self.clients = clients | |
| self.min_boundary = min | |
| self.max_boundary = max | |
| def deal(self): | |
| min = self.min_boundary | |
| max = self.max_boundary | |
| for client, time in self.clients.iteritems(): | |
| if time < min: | |
| time = min | |
| elif time > max: | |
| time = max | |
| yield client, time | |
| def pay(self, client, payment): | |
| if payment is Idling: | |
| self.clients[client] -= 1 | |
| elif payment is Active: | |
| self.clients[client] += 1 | |
| # ------------------------------------------------------------------------------ | |
| # disko klubs -- the klub always wins. | |
| # ------------------------------------------------------------------------------ | |
| class DiscoClub(DiscoObject): | |
| """A ``DiscoClub`` where there are various workers, waiters, and dancers.""" | |
| def __init__(self, name=None, dealer=DopeDealer): | |
| self.__super.__init__() | |
| self.name = name or str(self.id) # 'unnamed' | |
| self.is_running = False | |
| self.is_paused = False | |
| self.__lock = allocate_lock() | |
| self.acquire_lock = self.__lock.acquire | |
| self.release_lock = self.__lock.release | |
| self.dealer = dealer # cannot be replased whilst running | |
| self.workers = OrderedDict() # perhaps with priority ? | |
| # -------------------------------------------------------------------------- | |
| # komparisons | |
| # -------------------------------------------------------------------------- | |
| def __hash__(self): | |
| return hash(self.name) | |
| def __eq__(self, other): | |
| if isinstance(other, DiscoClub): | |
| return (self.name, self.workers) == (other.name, other.workers) | |
| else: | |
| return self.name == other | |
| def __cmp__(self, other): | |
| return cmp((self.name, self.workers), (other.name, other.workers)) | |
| # -------------------------------------------------------------------------- | |
| # the heart of darkness -- at least until punk kame along | |
| # -------------------------------------------------------------------------- | |
| def run(self, delay=1.5, using_threads=True): | |
| if using_threads: | |
| start_new_thread(self.__run, (delay,)) | |
| else: | |
| return self.__runner(delay) | |
| def __run(self, delay): | |
| runner = self.__runner(delay) | |
| try: | |
| while 1: | |
| runner.next() | |
| except StopIteration: | |
| pass | |
| def __runner(self, delay): | |
| self.acquire_lock() | |
| self.is_running = True | |
| self.release_lock() | |
| club_dealer = self.dealer | |
| remove_worker = self.remove_worker | |
| logger = self.create_logger(); logger.next() | |
| workers = self.workers | |
| last_iteration = time() | |
| try: | |
| dealer = club_dealer(workers) | |
| while self.is_running and not self.is_paused: | |
| if settings.debug: | |
| if settings.debug == 1: | |
| def log(x): | |
| print x | |
| else: | |
| log = logger.send | |
| log("[running-club %s]" % self) | |
| remove = set() | |
| add_to_remove = remove.add | |
| yield Active | |
| for worker, timeslice in dealer.deal(): | |
| if settings.disco_in_slow_motion: | |
| last_timeslice = time() | |
| for i in repeat(0, timeslice): | |
| if settings.debug: | |
| log("[running-worker %s]" % worker) | |
| if not worker.is_alive: | |
| add_to_remove(worker) | |
| break | |
| worker_status = worker.next() | |
| if worker_status is Finished: | |
| add_to_remove(worker) | |
| break | |
| dealer.pay(worker, worker_status) | |
| if settings.disco_in_slow_motion: | |
| this_timeslice = time() - last_timeslice | |
| if this_timeslice < delay: | |
| sleep_time = delay - this_timeslice | |
| log("[timeslice-sleeping for %s]" % sleep_time) | |
| sleep(delay) | |
| yield Active | |
| for worker in remove: | |
| remove_worker(worker) | |
| this_iteration = time() - last_iteration | |
| if this_iteration < delay: | |
| sleep_time = delay - this_iteration | |
| log("[sleeping for %s]" % sleep_time) | |
| sleep(sleep_time) | |
| yield Idling | |
| last_iteration = time() | |
| if settings.debug: | |
| log("[terminating-club %s]" % self) | |
| except StopIteration: | |
| yield Finished | |
| logger.close() | |
| self.shutdown() | |
| # -------------------------------------------------------------------------- | |
| # nooooo! we want disko! let the musik play! don't shut us down! | |
| # -------------------------------------------------------------------------- | |
| def shutdown(self, kill=False): | |
| if settings.debug: | |
| print "[terminating-club %s]" % self | |
| self.acquire_lock() | |
| self.is_running = False | |
| self.release_lock() | |
| if kill: | |
| # sleep a bit to give the klub a chanse to shutdown properly | |
| # @/@ will not suffise if a worker takes too long ... | |
| sleep(0.5) | |
| for worker in self.workers: | |
| worker.kill() | |
| def replace_dealer(self, dealer): | |
| if self.is_running: | |
| raise RuntimeError( | |
| "Sorry, you cannot replace the dealer in a running club." | |
| ) | |
| self.dealer = dealer | |
| def pause(self): | |
| self.is_paused = True | |
| def unpause(self): | |
| self.is_paused = False | |
| # -------------------------------------------------------------------------- | |
| # manage workers | |
| # -------------------------------------------------------------------------- | |
| def add_worker(self, worker, *args, **kwargs): | |
| if settings.debug: | |
| print '[adding-worker-to-club %s %s]' % (worker, self) | |
| # if kwargs.get('already_instantiated', False): | |
| if not isinstance(worker, DiscoWorkerBase): | |
| worker = worker(*args, **kwargs) | |
| return self.workers.setdefault(worker, 1) | |
| def remove_worker(self, worker, keep_alive=False): | |
| if settings.debug: | |
| print '[removing-worker-from-club %s %s]' % (worker, self) | |
| if not keep_alive: | |
| try: | |
| worker.kill() | |
| except: | |
| raise | |
| del self.workers[worker] | |
| return worker | |
| # -------------------------------------------------------------------------- | |
| # log it! | |
| # -------------------------------------------------------------------------- | |
| def create_logger(self, filename='%s.%s.log', format=True): | |
| if format: | |
| filename = filename % (self.name, os.getpid()) | |
| log = file(filename, 'a') | |
| lock_file(log) | |
| try: | |
| while 1: | |
| log.write((yield) + '\n') | |
| log.flush() | |
| finally: | |
| unlock_file(log) | |
| log.close() | |
| # ------------------------------------------------------------------------------ | |
| # utility funktions | |
| # ------------------------------------------------------------------------------ | |
| def get_disco_owner(*args, **kwargs): | |
| """Get a ``DiscoOwner``.""" | |
| if settings.disco_owners: | |
| return settings.disco_master | |
| if 'return_none' in kwargs: | |
| return | |
| return DiscoOwner(*args) | |
| def add_disco_worker(worker): | |
| """Add a ``worker`` to the current ``DiscoOwner()``.""" | |
| owner = get_disco_owner() | |
| if owner.is_migrating: # shutdown ? aktually migrate ? | |
| raise RuntimeError( | |
| "Sorry, we are currently migrating -- not accepting new workers." | |
| ) | |
| owner.add_worker(worker) | |
| def shutdown_disco(): | |
| """Shutdown DISCO.""" | |
| owner = get_disco_owner(return_none=True) | |
| if owner: | |
| owner.shutdown() | |
| del owner | |
| # ------------------------------------------------------------------------------ | |
| # kapability namespases | |
| # ------------------------------------------------------------------------------ | |
| class Namespace: | |
| """A normal namespace wrapper.""" | |
| def __init__(self, *args, **kwargs): | |
| kwargs.update((value.__name__, value) for value in args) | |
| # raise InvalidNamespace | |
| if '__disco_type__' not in kwargs: | |
| frame = get_frame(1) | |
| kwargs['__disco_type__'] = frame.f_code.co_name | |
| del frame | |
| self.__dict__.update(kwargs) | |
| def __str__(self): | |
| return "<Namespace: %s>" % self.__dict__['__disco_type__'] | |
| __repr__ = __str__ | |
| class ImmutableNamespace(Namespace): | |
| """A restricted namespace wrapper that doesn't let new attributes be set.""" | |
| def __setattr__(self, attribute, value): | |
| raise AttributeError('Read-only mode.') | |
| def __delattr__(self, attribute): | |
| raise AttributeError('Read-only mode.') | |
| # ------------------------------------------------------------------------------ | |
| # disko worker kore utility funktions | |
| # ------------------------------------------------------------------------------ | |
| def create(factory, *args, **kwargs): | |
| worker = factory(*args, **kwargs) | |
| add_disco_worker(worker) | |
| return worker | |
| def create_service(service_name, *args, **kwargs): | |
| worker = get_service(service_name, *args, **kwargs) | |
| add_disco_worker(worker) | |
| return worker | |
| # ------------------------------------------------------------------------------ | |
| # disko worker kore utility funktions | |
| # ------------------------------------------------------------------------------ | |
| # kare should be taken when broadkasting an event | |
| _marker = object() | |
| def get_on(worker, store_name, definition=None, blocking=True, with_txn=False): | |
| if isinstance(definition, basestring): | |
| frame = get_frame(1) | |
| definition = eval( | |
| "lambda %s: AttrDict(locals())" % | |
| definition, frame.f_globals, frame.f_locals | |
| ) | |
| del frame | |
| store = worker.stores.setdefault(store_name, []) | |
| def callback(): | |
| result = _marker | |
| if store: | |
| txn, result = heappop(store) | |
| if definition: | |
| if isinstance(result, Params): | |
| result = definition(*result.args, **result.kwargs) | |
| else: | |
| result = definition(result) | |
| elif not blocking: | |
| txn = with_txn and UUID() | |
| if definition: | |
| result = definition() | |
| else: | |
| result = Null # @/@ Blank ? | |
| if result is not _marker: | |
| worker.last_transaction = txn | |
| if with_txn: | |
| worker._input_stream.append((txn, result)) | |
| else: | |
| worker._input_stream.append(result) | |
| if worker.is_paused: | |
| print "[unpausing-worker %s]" % worker | |
| worker.is_paused = False | |
| return Active | |
| print "[pausing-worker %s]" % worker | |
| worker.is_paused = True | |
| worker._callbacks.append(callback) | |
| return Idling | |
| return callback() | |
| def get(worker, definition=None, blocking=True, with_txn=False): | |
| return get_on(worker, 'inbox', definition, blocking, with_txn) | |
| def catch(worker, definition=None, blocking=True, with_txn=False): | |
| # worker.default_exception_handling = False | |
| return get_on(worker, 'exception', definition, blocking, with_txn) | |
| def call_if_get(worker, namespace, blocking=False, suppress_exceptions=True): | |
| def handler(*args, **kwargs): | |
| method_name = args[0] | |
| args = args[1:] | |
| try: | |
| return getattr(namespace, method_name)(*args, **kwargs) | |
| except: | |
| if not suppress_exceptions: | |
| raise | |
| return Active | |
| return worker.get_on('call', handler, blocking) | |
| # ------------------------------------------------------------------------------ | |
| # disko worker kore utility funktions | |
| # ------------------------------------------------------------------------------ | |
| def put_on(worker, store_name, input, txn=Blank): | |
| if not isinstance(worker, DiscoWorkerBase): | |
| try: | |
| if isinstance(worker, UUID): | |
| worker = get_worker(worker) | |
| else: | |
| worker = get_service(worker) | |
| except: | |
| raise | |
| # raise TypeError("%r is not a valid disco worker." % worker) | |
| store_limit = worker.store_limits.get(store_name, 0) | |
| if store_limit and worker.get_store_size() >= store_limit: | |
| raise StoreLimitReached( | |
| "Reached a limit of %r in %s" % (store_limit, worker) | |
| ) | |
| if txn is Blank: | |
| txn = UUID() | |
| heappush(worker.stores.setdefault(store_name, []), (txn, input)) | |
| worker.acquire_lock() | |
| if worker._callbacks: | |
| callback = worker._callbacks.popleft() | |
| callback() | |
| worker.release_lock() | |
| return Active | |
| def put(worker, input, txn=Blank): | |
| return put_on(worker, 'inbox', input, txn) | |
| # ------------------------------------------------------------------------------ | |
| # disko worker kore utility funktions | |
| # ------------------------------------------------------------------------------ | |
| def pipeline(*workers): | |
| workers = iter(workers) | |
| last_worker = workers.next() | |
| for worker in workers: | |
| link(last_worker, worker) | |
| else: | |
| result = DiscoResult() | |
| link(worker, result) | |
| return result | |
| # example match | |
| # def foo(): | |
| # while match((yield Get, 'name', 'age')) is None: | |
| # pass | |
| # disco_sleep / until / timeout / alarm / call_at | |
| # ------------------------------------------------------------------------------ | |
| # events | |
| # ------------------------------------------------------------------------------ | |
| # Same places, same faces, same times, same crimes. Way too easy to fall into a | |
| # rut with resyk waiting at its end -- Judge Dredd | |
| class DiscoEvent(DiscoObject): | |
| __slots__ = ( | |
| 'id', 'sender', 'stop_propagation', 'payload', 'name', 'counter', | |
| 'max_hops', 'max_time', 'name' | |
| ) | |
| def __init__( | |
| self, sender, name, payload=None, counter=0, max_hops=0, max_time=0 | |
| ): | |
| self.__super.__init__() | |
| self.name = name | |
| # the event sourse | |
| if isinstance(sender, DiscoWorkerBase): | |
| sender = sender.id | |
| self.sender = sender | |
| # to propagate or not to propagate flag | |
| self.stop_propagation = False | |
| # our payload -- usually the event kontext | |
| if payload is None: | |
| payload = AttrDict() | |
| self.payload = payload | |
| # ttl (time to live) related variables | |
| self.counter = counter | |
| self.max_hops = max_hops | |
| self.max_time = max_time | |
| def __setattr__(self, attribute, value): | |
| if attribute.startswith('_') or not hasattr(type(self), attribute): | |
| raise TypeError("%r is not a public attribute of %r objects." | |
| % (attribute, type(self).__name__)) | |
| super(DiscoEvent, self).__setattr__(attribute, value) | |
| def is_valid(self): | |
| """Return whether an event is still valid.""" | |
| # if ttl limits have been exseeded or stop_propagation has been set ... | |
| if ((self.max_hops and self.counter > self.max_hops) or | |
| self.stop_propagation): | |
| return False | |
| # @/@ ehm ... how does max_time work over a network ? ntpd ? | |
| # @/@ hmz, i think we need to send out the max time with the kommit | |
| # @/@ event -- in fakt the kommit and validate events are the same? | |
| # @/@ just resent with max time altered somehow and some other flag | |
| # @/@ set? | |
| # @/@ seems on the right trak... kan't think. talk later. | |
| # @/@ k | |
| if self.max_time: | |
| if (time() - get_event_handled_time(self.id)) > self.max_time: | |
| return False | |
| # @/@ ehm ... isn't this mixing applikation logic with the events | |
| # @/@ system? surely checking max time should be something the | |
| # @/@ applikation should do? | |
| return True | |
| @classmethod | |
| def get_receivers(klass, event_name, sender): | |
| return klass._subscriptions.get(event_name, {}).get(id(sender), ()) | |
| def __getstate__(self): | |
| return ( | |
| self.id, | |
| self.sender, # getattr(self.sender, 'id', None), | |
| self.stop_propagation, | |
| self.payload, | |
| self.counter, | |
| self.max_hops, | |
| self.max_time, | |
| ) | |
| def __setstate__(self, state): | |
| (self.id, | |
| self.sender, | |
| self.stop_propagation, | |
| self.payload, | |
| self.counter, | |
| self.max_hops, | |
| self.max_time, | |
| ) = state | |
| # ------------------------------------------------------------------------------ | |
| # disko worker kore utility funktions | |
| # ------------------------------------------------------------------------------ | |
| def send_event(sender, event_name, payload, txn=Blank): | |
| print "Sending:", event_name, "on", sender, 'with', payload | |
| return Active | |
| set_event_handled_time(event.id) | |
| for receiver in list(klass.get_receivers(event_name, event.sender)): | |
| event.counter += 1 | |
| if not event.is_valid(): | |
| break | |
| try: | |
| receiver, link = receiver | |
| if link: | |
| link(receiver, event) | |
| else: | |
| receiver(event) | |
| except StopEventPropogation: | |
| break | |
| except: | |
| event.stop_propagation = True | |
| raise | |
| def send(sender, payload, txn=Blank): | |
| return send_event(sender, 'output', payload, txn) | |
| def send_txn(sender, payload): | |
| return send_event(sender, 'output', payload, sender.last_transaction) | |
| def throw(sender, exception): | |
| try: | |
| type, value, traceback = exc_info() | |
| raise t,v,tb | |
| finally: | |
| type = value = traceback = None | |
| return send_event(sender, 'exception', type, value, traceback) | |
| # root | root['event name'] -- both have "default" subscribers | |
| # ------------------------------------------------------------------------------ | |
| # disko worker kore utility funktions | |
| # ------------------------------------------------------------------------------ | |
| def localise(kwargs): | |
| frame = get_frame(1) | |
| frame.f_locals.update(kwargs) | |
| del frame | |
| return Active | |
| # for k, v in kwargs.iteritems(): exec('%s=v' % k) | |
| # for k, v in kwargs.iteritems(): | |
| # exec('%s=v' % k, locals(), frame.f_globals) | |
| # worker._main.gi_frame.f_locals.update(kwargs) | |
| def match(): | |
| return Retry, "" | |
| return Active | |
| def transact(*workers): # atomic | |
| for worker in workers: | |
| if worker.io_monad: | |
| raise TypeError("Cannot deal with an I/O Monad: %r" % worker) | |
| pass | |
| # commit | |
| # undo | |
| # ------------------------------------------------------------------------------ | |
| # disko worker kore utility funktions | |
| # ------------------------------------------------------------------------------ | |
| class DiscoLinkMap: | |
| __slots__ = ( | |
| 'subscriptions', '__lock', 'acquire_lock', 'release_lock', 'registry' | |
| ) | |
| def __init__(self, seed_data=None): | |
| self.__lock = allocate_lock() | |
| self.acquire_lock = self.__lock.acquire_lock | |
| self.release_lock = self.__lock.release_lock | |
| if seed_data is None: | |
| self.subscriptions = {} | |
| else: | |
| self.subscriptions = seed_data | |
| atexit.register(self.subscriptions.clear) | |
| def simple_link(self, source, destination): | |
| pass | |
| def link( | |
| self, source, destination, source_namespace=Blank, | |
| destination_namespace=Blank, matchers=Blank, connector=Blank, | |
| defaults=Blank, mutators=Blank, predicates=Blank, bouncer=Blank, | |
| destination_store='inbox', hold=False, | |
| ): | |
| if source_namespace is not Blank: | |
| source = getattr(source_namespace, source) | |
| if destination_namespace is not Blank: | |
| destination = getattr(destination_namespace, destination) | |
| # (klass, event_name, sender, receiver, link=None, hold=False): | |
| sender_id = id(sender) | |
| if hold: | |
| rcv = receiver | |
| else: | |
| # keep weak receiver in a separate variable, so receiver can't | |
| # get GC'd before we finish | |
| rcv = saferef( | |
| receiver, lambda ref: klass._unsubscribe(sender_id, rcv) | |
| ) | |
| subscriptions = klass._subscriptions | |
| if event_name not in subscriptions: | |
| subscriptions[event_name] = {} | |
| event_subscriptions = subscriptions[event_name] | |
| if sender_id in event_subscriptions: | |
| # don't create another weak sender if one's already registered | |
| receivers = event_subscriptions[sender_id] | |
| # don't add receivers we've already subscribed | |
| for x_receiver, x_link in receivers: | |
| if x_receiver == rcv: | |
| return | |
| else: | |
| receivers.append((rcv, link)) | |
| return | |
| ws = weakpointer( | |
| sender, lambda ref: klass._unsubscribe(event_name, ws, None) | |
| ) | |
| # setdefault here is for thread-safety, in case two threads | |
| # add a subscriber for the same sender (i.e., it's not guaranteed | |
| # that 'sender_id not in klass._receivers' still holds at this point) | |
| receivers = event_subscriptions.setdefault(ws, []) | |
| for x_receiver, x_link in receivers: | |
| if x_receiver == rcv: | |
| return | |
| else: | |
| receivers.append((rcv, link)) | |
| # bidirectional=False): | |
| #self.link(source, sink) | |
| #if bidirectional: | |
| # self.link(sink, source) | |
| def unlink(source, event_name, sender, receiver=None): | |
| """Stop sending events of this type from `sender` to `receiver`""" | |
| klass._unsubscribe(event_name, id(sender), receiver) | |
| def _unlink(klass, event_name, sender_id, receiver=None): | |
| try: | |
| receivers = klass._subscriptions[event_name][sender_id] | |
| except KeyError: | |
| return | |
| if receiver is None: | |
| clear_list(receivers) | |
| del klass._subscriptions[event_name][sender_id] | |
| return | |
| try: | |
| receivers.remove(receiver) | |
| except ValueError: | |
| pass | |
| try: | |
| receivers.remove(saferef(receiver)) | |
| except ValueError: | |
| pass | |
| def intercept(self): | |
| pass | |
| def clone(self): | |
| pass | |
| def clear(self): | |
| self.subscriptions.clear() | |
| # @/@ clean up with WeakSet / WeakList | |
| disco_link_map = DiscoLinkMap() | |
| link = disco_link_map.link | |
| unlink = disco_link_map.unlink | |
| intercept = disco_link_map.unlink | |
| clone = disco_link_map.clone | |
| # non_monadic = True | |
| # default_event_name = 'output' | |
| # Abort | Retry | Idling | Active | Pause | Create | |
| # race | |
| # ------------------------------------------------------------------------------ | |
| # servises | |
| # ------------------------------------------------------------------------------ | |
| def service_manager(activity, event): | |
| # services = {} | |
| if activity in services: | |
| return services[activity](event) | |
| if hasattr(event, 'on_error'): | |
| event.on_error(event) | |
| #def echo(event): | |
| # print "Got:", event | |
| # register_service('hi', echo) | |
| # Event.subscribe('a', 'hi', service_manager) | |
| # import pickle | |
| # d = pickle.dumps(Event('a'), protocol=2) | |
| # print pickle.loads(d) | |
| # on error | |
| # Event(pattern=Any ... ) | |
| #def subscribe(name, publisher, subscriber): | |
| # return Event.subscribe(name, publisher, subscriber) | |
| # adapter adaptor | |
| # coupler junction | |
| # ------------------------------------------------------------------------------ | |
| # keep trak of event initialisation times | |
| # ------------------------------------------------------------------------------ | |
| disco_event_timekeeper = {} | |
| disco_event_timekeeper_lock = allocate_lock() | |
| def get_event_handled_time(event_id): | |
| return disco_event_timekeeper[event_id] | |
| def set_event_handled_time(event_id, force=False): | |
| disco_event_timekeeper_lock.acquire_lock() | |
| if force or event_id not in disco_event_timekeeper: | |
| disco_event_timekeeper[event_id] = time() | |
| disco_event_timekeeper_lock.release_lock() | |
| # ------------------------------------------------------------------------------ | |
| # disko base | |
| # ------------------------------------------------------------------------------ | |
| class DiscoWorkerBase(DiscoObject): | |
| """Base class for ``DiscoWorker``s.""" | |
| def is_a_disco_worker(worker): | |
| return isinstance(worker, DiscoWorkerBase) | |
| disco_workers_roster = {} | |
| get_worker = disco_workers_roster.__getitem__ | |
| # ------------------------------------------------------------------------------ | |
| # our disko workers! | |
| # ------------------------------------------------------------------------------ | |
| def _worker( | |
| function, service_name=None, default_args=None, default_kwargs=None, | |
| wrap_function=False, is_method=False, io_monad=False, auto_finish=True, | |
| setup=Blank, signature=None, overridable_error_handler=False, | |
| override_existing_service=True, | |
| ): | |
| """Turn a normal looking function/coroutine into a ``DiscoWorker``.""" | |
| # -------------------------------------------------------------------------- | |
| # grab the underlying DiscoWorker implementation if we are an instanse | |
| # -------------------------------------------------------------------------- | |
| if isinstance(function, DiscoWorkerBase): | |
| function = function._coroutine | |
| # -------------------------------------------------------------------------- | |
| # wrap a normal funktion inside a generator | |
| # -------------------------------------------------------------------------- | |
| elif wrap_function: | |
| def wrapper(self, *args, **kwargs): | |
| yield Active | |
| yield function(self, *args, **kwargs) | |
| yield Finished | |
| function = wrapper | |
| # -------------------------------------------------------------------------- | |
| # da sexy working klass | |
| # -------------------------------------------------------------------------- | |
| class DiscoWorker(DiscoWorkerBase): | |
| """DiscoWorkers of the world unite!""" | |
| default_stores = ['inbox', 'call', 'control', 'exception'] | |
| default_args = () | |
| default_kwargs = {} | |
| io_monad = False | |
| last_transaction = Blank | |
| def __init__(self, *args, **kwargs): | |
| self.__super.__init__() | |
| self.is_alive = True # 'conceived' | alive | dead | |
| self.is_paused = False | |
| self.__lock = allocate_lock() | |
| self.acquire_lock = self.__lock.acquire | |
| self.release_lock = self.__lock.release | |
| self._callbacks = deque() | |
| self._input_stream = deque() | |
| coroutine = self._coroutine | |
| self.name = "%s():DiscoWorker" % coroutine.__name__ | |
| # kurrying-like support | |
| args = self.default_args + args | |
| if kwargs and self.default_kwargs: | |
| _kwargs = self.default_kwargs.copy() | |
| _kwargs.update(kwargs) | |
| else: | |
| _kwargs = kwargs or self.default_kwargs | |
| self.initialisers = (args, _kwargs) | |
| self._main = coroutine(*args, **_kwargs) | |
| self.store_limits = {} | |
| self.stores = {} | |
| set = self.stores.__setitem__ | |
| for store_name in self.default_stores: | |
| set(store_name, []) | |
| disco_workers_roster[self.id] = self | |
| send(self.__class__, 'created', self) | |
| self.next = self.__iter__().next | |
| @classmethod | |
| def update(klass, coroutine): | |
| klass._coroutine = coroutine | |
| # ---------------------------------------------------------------------- | |
| # kontrol | |
| # ---------------------------------------------------------------------- | |
| def kill(self): | |
| if settings.debug and self.is_alive: | |
| print "[terminating-worker %s]" % self | |
| try: | |
| self.is_alive = False | |
| del disco_workers_roster[self.id] | |
| self._main.close() | |
| except: | |
| raise | |
| def revive(self): # this wise? | |
| if self.is_alive: | |
| return self | |
| args, kwargs = self.initialisers | |
| return self.__class__(*args, **kwargs) | |
| # ---------------------------------------------------------------------- | |
| # iterate | |
| # ---------------------------------------------------------------------- | |
| def __iter__(self): | |
| send = self._main.send | |
| send_event = self.send_event | |
| has_input = self._input_stream.__len__ | |
| pop_input = self._input_stream.popleft | |
| try: | |
| while 1: | |
| if self.is_paused: | |
| yield Idling | |
| continue | |
| input = has_input() and pop_input() or None | |
| try: | |
| output = send(input) | |
| if output in (Active, Idling): | |
| yield output | |
| elif output is Finished: | |
| raise StopIteration("Received Finished.") | |
| else: | |
| yield send_event('output', output) | |
| yield Active | |
| except StopIteration: | |
| send_event('control', Finished) | |
| raise | |
| except Exception: | |
| # self.throw() | |
| raise | |
| raise StopIteration("Received Exception.") | |
| except StopIteration: | |
| self.kill() | |
| yield Finished # Dead | |
| # ---------------------------------------------------------------------- | |
| # get/put from stores | |
| # ---------------------------------------------------------------------- | |
| get = get | |
| put = put | |
| def __call__(self, *args, **kwargs): | |
| self.put(*args, **kwargs) | |
| return self # we return self for aiding in pipelining | |
| send = send | |
| send_event = send_event | |
| connect = link = subscribe = link | |
| # ---------------------------------------------------------------------- | |
| # get them stores! store exists? not empty? full? | |
| # ---------------------------------------------------------------------- | |
| def get_store(self, name='inbox'): | |
| return self.stores.setdefault(name, []) | |
| def clear_store(self, name='inbox'): | |
| if name in self.stores: | |
| self.stores[name][:] = [] | |
| def get_store_size(self, name='inbox'): | |
| return len(self.stores.get(name, [])) | |
| def set_store_limit(self, name, limit): | |
| self.store_limits[name] = limit | |
| inbox = property(lambda self: self.get_store('inbox')) | |
| @property | |
| def control(self): | |
| return self.get_store('control') | |
| # ---------------------------------------------------------------------- | |
| # pattern | |
| # ---------------------------------------------------------------------- | |
| def on(self, *args, **kwargs): | |
| return lambda x: x | |
| def when(self, *args, **kwargs): | |
| return lambda x: x | |
| # ---------------------------------------------------------------------- | |
| # unions -- used for ``|`` pipelining | |
| # ---------------------------------------------------------------------- | |
| __or__ = pipeline | |
| def __ror__(self, other): | |
| return 'WANNA PIPELINE:', self, other | |
| # -------------------------------------------------------------------------- | |
| # | |
| # -------------------------------------------------------------------------- | |
| DiscoWorker.update(function) | |
| DiscoWorker.__name__ = "%s:DiscoWorker" % function.__name__ | |
| DiscoWorker.__doc__ = function.__doc__ | |
| # -------------------------------------------------------------------------- | |
| # | |
| # -------------------------------------------------------------------------- | |
| if default_args: | |
| DiscoWorker.default_args = default_args | |
| if default_kwargs: | |
| DiscoWorker.default_kwargs = default_kwargs | |
| # used in transaktions | |
| if io_monad: | |
| DiscoWorker.io_monad = True | |
| if service_name: | |
| DiscoWorker.service_name = service_name | |
| register_service( | |
| service_name, DiscoWorker, default_args or (), default_kwargs or {}, | |
| override=override_existing_service, | |
| # suppress_duplicate_registration_error=True | |
| ) | |
| return DiscoWorker | |
| # ------------------------------------------------------------------------------ | |
| # our kore funktions | |
| # ------------------------------------------------------------------------------ | |
| def worker(**kwargs): | |
| return partial(_worker, **kwargs) | |
| def waiter(**kwargs): | |
| kwargs.update(wrap_function=True) | |
| return partial(_worker, **kwargs) | |
| def dancer(**kwargs): | |
| kwargs.update({}) | |
| return partial(_worker, **kwargs) | |
| # ------------------------------------------------------------------------------ | |
| # pipelined output | |
| # ------------------------------------------------------------------------------ | |
| @worker() | |
| def DiscoResult(self): | |
| """An object for getting results from DiscoWorkers.""" | |
| # DiscoError -- need to include traceback in the object ... | |
| # if exception ... | |
| control = self.stores['control'] | |
| inbox = self.stores['inbox'] | |
| exception = self.stores['exception'] | |
| delay = 0.0005 | |
| while Finished not in control: | |
| if exception: | |
| raise heappop(exception)[1] | |
| elif inbox: | |
| yield heappop(inbox)[1] | |
| delay = 0.0005 | |
| else: | |
| delay = min(delay * 2, 0.05) | |
| sleep(delay) | |
| while inbox: | |
| yield inbox.popleft() | |
| def head(result): | |
| return result.next() | |
| def tail(result): | |
| try: | |
| result.next() | |
| except StopIteration: | |
| return iter([]) | |
| else: | |
| return result | |
| # ------------------------------------------------------------------------------ | |
| # disko servise registry | |
| # ------------------------------------------------------------------------------ | |
| @worker() | |
| def ServiceManager(self, seed_data=None): | |
| """Manage disco services.""" | |
| if seed_data is None: | |
| registry = {} | |
| else: | |
| registry = seed_data | |
| instantiated = {} | |
| service = None | |
| while 1: | |
| command = yield service | |
| try: | |
| command_name = command[0] | |
| if command_name == 'get': | |
| factory_or_service_name = command[1] | |
| args, kwargs = (), {} | |
| if isinstance(factory_or_service_name, basestring): | |
| factory = None | |
| service_name = factory_or_service_name | |
| else: | |
| factory = factory_or_service_name | |
| service_name = getattr( | |
| factory, 'service_name', factory.__name__ | |
| ) | |
| if service_name in instantiated: | |
| service = instantiated[service_name] | |
| if service.is_alive: | |
| continue | |
| else: | |
| del instantiated[service_name] | |
| if factory: | |
| extra = command[2:4] | |
| len_extra = len(extra) | |
| if len_extra == 1: | |
| extra = extra[0] | |
| if isinstance(extra, tuple): | |
| args = extra | |
| else: | |
| kwargs = extra | |
| elif len_extra == 2: | |
| args, kwargs = extra | |
| elif service_name in registry: | |
| factory, args, kwargs = registry[service_name] | |
| else: | |
| raise KeyError("No factories found for %r" % service_name) | |
| service = create(factory, *args, **kwargs) | |
| instantiated[service_name] = service | |
| elif command_name == 'register': | |
| name, factory, args, kwargs = command[1:] | |
| if name in registry and '_override' not in kwargs: | |
| raise DuplicateServiceRegistration( | |
| "Service %r has already been registered." % name | |
| ) | |
| registry[name] = (factory, args, kwargs) | |
| service = None | |
| else: | |
| raise RuntimeError("Invalid command: %r" % command_name) | |
| except Exception, reason: | |
| service = reason | |
| # hold a referense to the default servise registry | |
| # if not settings.disco_services: | |
| # settings.disco_services = create(ServiceManager) | |
| def register_service(name, worker_factory, *args, **kwargs): # args, kwargs | |
| value = settings.disco_services.send(( | |
| 'register', name, worker_factory, args, kwargs | |
| )) | |
| if isinstance(value, DuplicateServiceRegistration): | |
| if not kwargs.get('_suppress_duplicate_registration_error'): | |
| raise value | |
| elif isinstance(value, Exception): | |
| raise value | |
| def get_service(name, *args, **kwargs): | |
| service = settings.disco_services.send(('get', name, args, kwargs)) | |
| if isinstance(service, Exception): | |
| raise service | |
| return service | |
| # ------------------------------------------------------------------------------ | |
| # disko servise registry -- take 2 (or was that take 1?) | |
| # ------------------------------------------------------------------------------ | |
| class ServiceManager: | |
| """Manage disco services.""" | |
| __slots__ = ( | |
| '__lock', 'acquire_lock', 'release_lock', 'registry', 'instantiated' | |
| ) | |
| def __init__(self, seed_data=None): | |
| self.__lock = allocate_lock() | |
| self.acquire_lock = self.__lock.acquire_lock | |
| self.release_lock = self.__lock.release_lock | |
| if seed_data is None: | |
| self.registry = {} | |
| else: | |
| self.registry = seed_data | |
| self.instantiated = {} | |
| def get_service(self, name, *args, **kwargs): | |
| self.acquire_lock() | |
| try: | |
| instantiated = self.instantiated | |
| registry = self.registry | |
| if isinstance(name, basestring): | |
| factory = None | |
| service_name = name | |
| else: | |
| factory = name | |
| service_name = getattr( | |
| factory, 'service_name', factory.__name__ | |
| ) | |
| if service_name in instantiated: | |
| service = instantiated[service_name] | |
| if service.is_alive: | |
| return service | |
| else: | |
| del instantiated[service_name] | |
| if factory: | |
| pass | |
| elif service_name in registry: | |
| factory, args, kwargs = registry[service_name] | |
| else: | |
| raise KeyError("No factories found for %r" % service_name) | |
| finally: | |
| self.release_lock() | |
| if settings.debug: | |
| print '[creating-service %r]' % service_name | |
| service = create(factory, *args, **kwargs) | |
| self.acquire_lock() | |
| try: | |
| instantiated[service_name] = service | |
| return service | |
| finally: | |
| self.release_lock() | |
| def register_service( | |
| self, name, factory, args, kwargs, | |
| override=False, suppress_duplicate_registration_error=False | |
| ): | |
| self.acquire_lock() | |
| try: | |
| registry = self.registry | |
| if name in registry and not override: | |
| if suppress_duplicate_registration_error: | |
| raise DuplicateServiceRegistration( | |
| "Service %r has already been registered." % name | |
| ) | |
| if settings.debug: | |
| print '[registering-service %r]' % name | |
| registry[name] = (factory, args, kwargs) | |
| finally: | |
| self.release_lock() | |
| def prune(self): | |
| self.acquire_lock() | |
| try: | |
| remove = set() | |
| add_to_remove = remove.add | |
| for name, worker in self.instantiated.iteritems(): | |
| if not worker.is_alive: | |
| add_to_remove(name) | |
| delitem = self.instantiated.__delitem__ | |
| for name in remove: | |
| if settings.debug: | |
| print '[removing-service %r]' % name | |
| delitem(name) | |
| finally: | |
| self.release_lock() | |
| # if we don't have a predefined alternative manager we kreate a default manager | |
| if not settings.disco_services: | |
| settings.disco_services = ServiceManager() | |
| disco_services = settings.disco_services | |
| get_service = disco_services.get_service | |
| register_service = disco_services.register_service | |
| # ------------------------------------------------------------------------------ | |
| # selekt servise | |
| # ------------------------------------------------------------------------------ | |
| @worker(service_name="core/factory") | |
| def FactoryService(self): | |
| factories = {} | |
| yield | |
| @worker(service_name="core/registry") | |
| def RegistryService(self): | |
| meta_registry = {} | |
| registry = None | |
| while 1: | |
| event, service = yield self.receive() | |
| if isinstance(service, basestring): | |
| registry = meta_registry.setdefault(service, {}) | |
| yield event.send(self, registry) | |
| # get_registry | |
| # instantiate_service = get_service | |
| # ------------------------------------------------------------------------------ | |
| # selekt servise | |
| # ------------------------------------------------------------------------------ | |
| # handle_error() | |
| @worker(service_name="core/poll select", io_monad=True) | |
| def poll_select(self, selector=select.select, default_timeout=0.0): | |
| """Let's listen to them file descriptors (sockets) !""" | |
| registry = {} | |
| timeout = [default_timeout] | |
| def register(file_descriptor, readable=None, writable=None): | |
| registry[fd] = (readable, writable) | |
| def deregister(file_descriptor): | |
| if fd in registry: | |
| del registry[fd] | |
| def set_timeout(new_timeout): | |
| timeout[0] = new_timeout | |
| namespace = ImmutableNamespace(register, deregister, set_timeout) | |
| reader_sockets = [] | |
| writer_sockets = [] | |
| except_sockets = [] | |
| append_to_reader_sockets = reader_sockets.append | |
| append_to_writer_sockets = writer_sockets.append | |
| append_to_except_sockets = except_sockets.append | |
| while 1: | |
| yield call_if_get(self, namespace) | |
| reader_sockets[:] = [] | |
| writer_sockets[:] = [] | |
| except_sockets[:] = [] | |
| yield Active | |
| for socket_fileno in registry: | |
| readable, writable = registry[socket_fileno] | |
| if readable is None or readable(): | |
| append_to_reader_sockets(socket_fileno) | |
| if writable is None or writable(): | |
| append_to_writer_sockets(socket_fileno) | |
| if except_sockets == reader_sockets == writer_sockets: | |
| continue | |
| try: | |
| ready_to_read, ready_to_write, exceptional = selector( | |
| reader_sockets, writer_sockets, except_sockets, timeout[0] | |
| ) | |
| except select.error, reason: | |
| if reason[0] not in [EINTR]: | |
| yield Error, reason # throw(reason) | |
| continue | |
| for socket_fileno in ready_to_read: | |
| yield self.send("readable", socket_fileno) | |
| for socket_fileno in ready_to_write: | |
| yield self.send("writable", socket_fileno) | |
| for socket_fileno in exceptional: # out of band data | |
| yield self.send("exceptional", socket_fileno) | |
| # readable(socket_fileno) | |
| # ------------------------------------------------------------------------------ | |
| # demo workers and waiters | |
| # ------------------------------------------------------------------------------ | |
| if settings.debug: | |
| @worker() | |
| def echo(self): | |
| while 1: | |
| value = yield self.get(SINGLE) | |
| yield value | |
| @worker() | |
| def echo(self): | |
| while 1: | |
| yield (yield self.get(SINGLE)) | |
| @worker() | |
| def echo(self): | |
| while 1: | |
| for k, v in (yield self.get('value')).iteritems(): | |
| exec("%s=v" % k) | |
| yield value | |
| @worker() | |
| def echo(self): | |
| while 1: | |
| localise((yield self.get('value'))) # kan we lose a set of () here ? | |
| exec '' # kan we get rid of this ? | |
| yield value | |
| input = yield self.get('value, name="tom", age=27') | |
| # input would be: | |
| # {'value': 1700, | |
| # 'name': 'tom', | |
| # 'age': 27} | |
| "%s is age %s" % (name, age) | |
| @worker() | |
| def echo(self): | |
| while 1: | |
| (value,), kwargs = yield self.get() | |
| # here, one could do: | |
| # name = kwargs.get('name', 'default value goes here') | |
| yield value | |
| @worker() | |
| def echo(self): | |
| """A demonstration ``echo`` disco worker.""" | |
| while 1: | |
| value = yield self.get() | |
| yield value | |
| @worker() | |
| def square(self): | |
| while 1: | |
| yield self.send((yield self.get(SINGLE)) ** 2) | |
| @worker(setup=MEMO) | |
| def square(self): | |
| """A demonstration ``square`` disco worker.""" | |
| while 1: | |
| n = yield self.get() | |
| yield n ** 2 | |
| @worker() | |
| def counter(self, start=0, stop=Blank, step=1): | |
| if stop is Blank: | |
| stop, start = start, 0 | |
| if step > 0: | |
| while start < stop: | |
| yield start | |
| start += step | |
| else: | |
| while start > stop: | |
| yield start | |
| start += step | |
| @worker() | |
| def counter(self, start=0, stop=Blank, step=1): | |
| """A demonstration ``counter`` disco worker.""" | |
| if stop is Blank: | |
| stop, start = start, 0 | |
| for n in xrange(start, stop, step): | |
| yield n | |
| def counter2(self): | |
| txn = yield get('n', stop=10) | |
| while n < stop: | |
| n += 1 | |
| yield send('output', self, n, txn=txn) | |
| yield send('end transaction', self, txn) | |
| def test_transact(n): | |
| range(n, n + 3) | |
| n += 3 | |
| # intercept | |
| # capture | |
| # event = txn = yield get('a') | |
| # around | |
| # exceptions / deffered | |
| # counter()(2) | square() | |
| # counter(2) | square | |
| # counter(2, 10) | echo() | square() | sum() | |
| # before / after / around / mcombine (:progn :and :or :standard) | |
| # don't calculate ancestor lists for objects with only one parent | |
| # calculated initial values | |
| # defgeneric / undefmethod(str, str) | |
| # :before :after | |
| # :method-combination (+ and append list max min nconc or progn) | |
| # yield Finished -- break | |
| # get('n') not match ({'Finished':True}) | |
| # lazy map / cat / filter | |
| def fibonacci(self): | |
| put(self, n-1) | |
| # max recursive depth reached | |
| ocaml = """ | |
| let rec fibonacci2 = function | |
| 0 -> 1 | |
| | 1 -> 1 | |
| | n -> fibonacci2 (n - 1) + fibonacci2 (n - 2);; | |
| """ | |
| def memoise(function): | |
| cache = {} | |
| def memoised_function(*args): | |
| if args in cache: | |
| return cache[args] | |
| return cache.setdefault(args, function(*args)) | |
| memoised_function.__name__ = function.__name__ | |
| memoised_function.cache = cache | |
| return memoised_function | |
| @memoise | |
| def fibonacci(n): | |
| if n < 2: | |
| return n | |
| return fibonacci(n - 1) + fibonacci(n -2) | |
| # 1.61803398875 | |
| # def fib_non_recu(n): | |
| def fibonacci(n): | |
| if n < 2: | |
| yield OUTPUT, n | |
| # philosophers | |
| # multimethod combine example | |
| def factorial(self): | |
| while 1: | |
| event = localise((yield get('n'))) | |
| if n == 1: | |
| event.send(self, 'output', 1) | |
| else: | |
| event.subscribe(self, 'call', self) | |
| event.send('call', self, n-1, lambda k: send(c, n * k)) | |
| event.send('output', self, n-1, lambda k: send(c, n * k)) | |
| # @match(int, continuation) | |
| # @match('n', 'c') | |
| # def factorial(message): | |
| # if n == 1: | |
| # send(c, 1) | |
| # else: | |
| # send(factorial, n-1, lambda k: send(c, n * k)) | |
| # send(factorial, 3, wantsanswer) | |
| def pointmaker(x, y): | |
| def toX(): | |
| return x | |
| def toY(): | |
| return y | |
| def __add__(other): | |
| return pointmaker(x + other.toX(), y + other.toY()) | |
| def __repr__(): | |
| return u"<%s,%s>" % (x, y) | |
| return Namespace(toX, toY, __add__, __repr__) | |
| def countermaker(start=0): | |
| x = start | |
| def get_count(): | |
| return x | |
| def incr(): | |
| global x | |
| x += 1 | |
| while 1: | |
| dispatch = yield | |
| yield locals()[dispatch]() | |
| def countermaker(init=0): | |
| def get_count(): | |
| return init | |
| def incr(): | |
| init = init | |
| init += 1 | |
| return Namespace(get_count, incr) | |
| def factorial(x): | |
| if x== 1: return 1 | |
| return x * factorial(x-1) | |
| def fact2(x): | |
| if x == 1: | |
| yield 0 | |
| else: | |
| yield x * fact2(x - 1) | |
| def create_logger(self, filename='club_%i.log', format=True): | |
| if format: | |
| filename = filename % self.name | |
| f = file(filename, 'a') | |
| lock_file(f) | |
| def close(): | |
| unlock_file(f) | |
| f.close() | |
| return Finished | |
| # namespace = ImmutableNamespace(close) | |
| # self.link('Finished', close) | |
| # self.register_on_finish(close) | |
| try: | |
| while 1: | |
| # yield call_if_get(self, namespace) | |
| f.write((yield self.get('text'))) | |
| f.flush() | |
| finally: | |
| close() | |
| # ------------------------------------------------------------------------------ | |
| # default s11n of python objekts | |
| # ------------------------------------------------------------------------------ | |
| class Unpickler(pickle.Unpickler): | |
| def find_class(self, module, name): | |
| try: | |
| __import__(module) | |
| except ImportError: | |
| pimp(module) | |
| return getattr(sys.modules[module], name) | |
| Pickler = pickle.Pickler | |
| def pickle(obj, stream=Blank, retval=False): | |
| if stream is Blank: | |
| stream = StringIO() | |
| retval = True | |
| Pickler(stream, 2).dump(obj) | |
| if retval: | |
| return stream.getvalue() | |
| def unpickle(stream): | |
| if isinstance(stream, basestring): | |
| stream = StringIO(stream) | |
| return Unpickler(stream).load() | |
| # ------------------------------------------------------------------------------ | |
| # them funky dansers! | |
| # ------------------------------------------------------------------------------ | |
| @worker(service_name="core/unix pipe") | |
| def UnixPipe(self, name): | |
| try: | |
| os.mkfifo(name) | |
| except OSError, e: | |
| if e.strerror != 'File exists': | |
| raise | |
| s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
| i = 0 | |
| while i < 100: | |
| try: | |
| s.bind('/tmp/disco.%i.socket' % i) | |
| except socket.error: | |
| i += 1 | |
| else: | |
| break | |
| try: | |
| os.unlink('/tmp/disco.%i.socket' % i) | |
| except OSError: | |
| pass | |
| def send(text): | |
| f = file(name, 'a') | |
| f.write(text + '\n') | |
| def receive(): | |
| f = file(name, 'r') | |
| return f.readline() | |
| # p = UnixPipe('hello') | |
| # print p.receive() | |
| # print 'fo' | |
| # p.send('foo') | |
| # print 'fo2' | |
| # print p.receive() | |
| @worker(service_name="core/delayed pipe") | |
| def DelayedSender(self): | |
| pass | |
| @worker(service_name="core/socket pipe") | |
| def SocketPipe(self, port): | |
| pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment