Last active
September 5, 2017 06:39
-
-
Save parity3/10890d36b098e40a8cae24081d78e614 to your computer and use it in GitHub Desktop.
trio wait on multiple queues
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Stdlib pieces used throughout: collections/operator for the dispatcher
# internals, datetime for log timestamps, importlib for the dynamic trio
# import below.
import collections
import datetime
import importlib
import operator

if 0:
    # Dead branch: hand-written stubs so PyCharm can resolve the trio API.
    # (`module()` is deliberately undefined -- this code never executes.)
    trio = module()
    trio.run = None  # type: callable
    trio.WouldBlock = None
    trio.Abort = None  # type: abort_class
    trio.current_task = None  # type: callable

    class abort_class:
        SUCCEEDED = None

    class ParkingLot_class:
        park = None  # type: callable

    hazmat = module()
    hazmat.ParkingLot = None  # type: callable -> ParkingLot_class
    hazmat.yield_indefinitely = None  # type: callable
    hazmat.reschedule = None  # type: callable
else:
    # Real import path: bind trio and its hazmat namespace as module globals
    # dynamically so the stub branch above can shadow them for the IDE.
    _mod = importlib.import_module("trio")
    globals()['trio'] = _mod
    globals()['hazmat'] = _mod.hazmat
    del _mod
class QueueNotOpenException(Exception):
    """Raised by get() when the target queue was never registered or has been closed."""
class WaiterEntry(object):
    """Bookkeeping record for one task parked inside Dispatcher.get().

    A put() on any subscribed queue fills this record via
    assign_data_and_wakeup() and reschedules the owning task.
    """
    __slots__ = ('task', 'still_waiting', 'data', 'queue', 'aborted')

    def __init__(self):
        # Remember the current task so a producer can reschedule it later.
        self.task = trio.current_task()
        self.still_waiting = True  # cleared once a payload is assigned
        self.data = None           # payload handed over by a put()
        self.queue = None          # the queue that supplied the payload
        # Pessimistically assume cancellation; get() flips this to False
        # only when it actually receives the data.
        self.aborted = True

    def assign_data_and_wakeup(self, data, q: "DispatcherQueue"):
        """Attach *data* (delivered by queue *q*) to this entry and wake the parked task."""
        self.queue = q
        self.data = data
        self.still_waiting = False
        hazmat.reschedule(self.task)
class Dispatcher:
    """
    Factory for queues which allows for a get() to be run on multiple queues, prioritizing any ready data to the first
    passed-in queue.
    Queues created by this factory have one caveat: they MUST be close()'d after they are no longer in use.
    Calling put() on a closed queue will generate a warning. Calling get() on a closed queue will raise a QueueNotOpenException.
    A race-condition can happen where a canceled task can have data ready for it that must be requeued. Because of this,
    the order (usually FIFO) of the queue data cannot be guaranteed.
    """
    # NOTE(review): MAX_EVENT_WAITS is never referenced in this class (or in
    # DispatcherQueue, which repeats it) -- looks vestigial; confirm before removing.
    MAX_EVENT_WAITS = 10000
    # Hard cap on simultaneously-open queues; queue() raises once this is hit.
    MAX_QUEUES = 1000
    # Re-exported so callers holding only the dispatcher can catch it as
    # dispatcher.QueueNotOpenException.
    QueueNotOpenException = QueueNotOpenException
    def __init__(self):
        # Maps each open DispatcherQueue -> list of WaiterEntry records for
        # tasks currently parked in get() on that queue. Membership here is
        # what distinguishes an "open" queue from a closed/unknown one.
        self.waiters_for_queues = {}
    def log_func_off(self, *args):
        # No-op sink; the default target of log_func.
        pass
    def log_func_on(self, *args):
        # Timestamped stdout logger used for warnings (and debug, when enabled).
        print("{} - Dispatcher:".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')), *args)
    # Debug logging is off by default; warnings always print.
    log_func = log_func_off
    warn_func = log_func_on
    def queue(self) -> "DispatcherQueue":
        """Create and register a new DispatcherQueue.

        Raises RuntimeError when MAX_QUEUES queues are already open.
        The returned queue MUST eventually be close()'d.
        """
        q = DispatcherQueue(self)
        num_queues = len(self.waiters_for_queues)
        if num_queues >= self.MAX_QUEUES:
            raise RuntimeError(f"number of queues exceeded, please close some: {num_queues+1} > {self.MAX_QUEUES}")
        self.waiters_for_queues[q] = []
        # NOTE(review): logged after registration, but num_queues was captured
        # before it -- the reported count is one lower than the actual total.
        self.log_func(f"queue() created new, now at: {num_queues}")
        return q
    def close_queue(self, q: "DispatcherQueue"):
        # must be called when queue no longer in use
        # Idempotent: a second close only logs a warning line.
        rsp = self.waiters_for_queues.pop(q, None)
        if rsp is None:
            self.log_func(f"WARNING: close_queue() NOT removed, at: {len(self.waiters_for_queues)}")
        else:
            self.log_func(f"close_queue() removed, now at: {len(self.waiters_for_queues)}")
    def get_nowait(self, *queues):
        """Pop from the first of *queues* that has buffered data, else raise trio.WouldBlock.

        NOTE(review): unlike DispatcherQueue.get_nowait(), this path does not
        unpark the queue's has_free_slots lot after freeing a slot of a full
        queue -- a parked put() may stall; confirm whether intended.
        """
        try:
            # filter(None, ...) keeps only non-empty ready_data deques;
            # next() takes the first, honoring the caller's priority order.
            ready_data = next(filter(None, map(operator.attrgetter('ready_data'), queues)))
        except StopIteration:
            raise trio.WouldBlock("queues are empty")
        return ready_data.popleft()
    async def get(self, *queues):
        """Wait for one item from any of *queues*, preferring earlier queues.

        Raises QueueNotOpenException if any queue is unknown/closed.
        May deliver items out of FIFO order after a cancellation (see class docstring).
        """
        if not queues:
            raise AssertionError("must be called with at least 1 queue")
        try:
            # Fast path: some queue already has buffered data.
            ready_data = next(filter(None, map(operator.attrgetter('ready_data'),queues)))
        except StopIteration:
            pass
        else:
            return ready_data.popleft()
        entry = WaiterEntry()
        # get the waiter entry lists for each queue
        try:
            qlists = list(map(self.waiters_for_queues.__getitem__, queues))  # type: list
        except KeyError:
            raise QueueNotOpenException('cannot get() on unknown or closed queue')
        # register ourselves as a waiter entry for each of the queues we are interested in
        # (deque with maxlen=0 just drains the map at C speed, discarding results)
        collections.deque(map(operator.methodcaller('append', entry), qlists), 0)
        try:
            # have faith we will be woken up by a put or put_nowait
            # reschedule() on this task can only be called once so there is no need to do anything synchronously in the abort_fn callback
            await hazmat.yield_indefinitely(abort_fn=lambda *args: trio.Abort.SUCCEEDED)
            if entry.still_waiting:
                raise AssertionError("somehow rescheduled but no data waiting for me")
            entry.aborted = False  # if we got here everything is ok
            return entry.data
        finally:
            # Runs on both success and cancellation. aborted==True with data
            # assigned means a producer handed us data but we were cancelled
            # before consuming it -- put it back so it is not lost.
            if entry.aborted and not entry.still_waiting:
                self.warn_func(f"WARNING: we were canceled and assigned data was never delivered!")
                # Make sure the data is at least requeued so it is not lost.
                self.requeue(entry.queue,entry.data)
            # unregister myself from queues I added myself to
            try:
                collections.deque(map(operator.methodcaller('remove', entry), qlists), 0)
            except ValueError:
                raise AssertionError("someone else removed me from queue's waiter entries")
    def requeue(self,q : "DispatcherQueue", data):
        """hazmat territory.
        Something must have gone wrong in the caller so we have to put this data somewhere so it isn't lost
        Try to ensure it isn't out of order by putting it in front of queue."""
        try:
            waiters = self.waiters_for_queues[q]  # type: list
        except KeyError:
            # since we are being called from get(), if queue is closed we are done
            return
        for wentry in waiters:  # type: WaiterEntry
            if wentry.still_waiting:
                # get() will guarantee this data isn't lost.
                # Assign it to the task, wake it up and move on.
                # It will handle unregistration.
                wentry.assign_data_and_wakeup(data,q)
                break
        else:
            # (for/else: no waiter took the data)
            # no waiters need more data. save it for later.
            # also skip the usual size check; we are in sync land and we can't raise WouldBlock.
            q.ready_data.appendleft(data)
    def assign_to_waiter(self, q: "DispatcherQueue", data):
        """Deliver *data* for queue *q*: hand it to a parked waiter if one exists, otherwise buffer it.

        Called synchronously from DispatcherQueue.put()/put_nowait().
        Data sent to a closed queue is dropped with a warning.
        """
        if q.ready_data:
            # There are no waiters if we haven't delivered previous ready_data
            q.ready_data.append(data)
        else:
            try:
                waiters = self.waiters_for_queues[q]  # type: list
            except KeyError:
                # NOTE(review): a raise was considered here but deliberately
                # softened to a warning; put() on a closed queue discards data.
                # raise QueueNotOpenException("cannot put() to unknown or closed queue")
                self.warn_func(f"WARNING: put() on closed queue; throwing away data: {data}")
                return
            for wentry in waiters:  # type: WaiterEntry
                if wentry.still_waiting:
                    # get() will guarantee this data isn't lost.
                    # Assign it to the task, wake it up and move on.
                    # It will handle unregistration.
                    wentry.assign_data_and_wakeup(data,q)
                    break
            else:
                # (for/else: every registered waiter already has data)
                # no waiters need more data. save it for later.
                q.ready_data.append(data)
class DispatcherQueue:
    """A bounded queue created by Dispatcher.queue().

    put() data is delivered to a task parked in Dispatcher.get() when one
    exists, otherwise buffered in ``ready_data`` (capacity ``max_size``).
    Per the Dispatcher contract, instances MUST be close()'d when no longer
    in use.
    """
    # NOTE(review): not referenced in this class (duplicated from Dispatcher)
    # -- looks vestigial; confirm before removing.
    MAX_EVENT_WAITS = 10000

    def __init__(self, dispatcher: "Dispatcher"):
        # When this queue fills up, a get is responsible for triggering this
        # event once the queue has room for more data.
        self.has_free_slots = hazmat.ParkingLot()
        # The actual data on this queue that was put here but remains to be consumed.
        self.ready_data = collections.deque()
        self.max_size = 10
        self.dispatcher = dispatcher

    def __len__(self):
        """Number of buffered (not yet consumed) items."""
        return len(self.ready_data)

    def _pop_ready(self):
        """Pop the oldest buffered item (raises IndexError when empty).

        If that pop just took a full queue below capacity, wake one parked
        putter. Shared by get() and get_nowait().
        """
        rd = self.ready_data.popleft()
        if len(self.ready_data) == self.max_size - 1:
            # We just freed the slot of a previously-full queue.
            self.has_free_slots.unpark()
        return rd

    async def put(self, data):
        """Deliver *data*, parking until a slot frees when the queue is full."""
        if len(self.ready_data) >= self.max_size:
            # have faith we will be resurrected later by a get() or get_nowait()
            await self.has_free_slots.park()
        self.dispatcher.assign_to_waiter(self, data)

    def put_nowait(self, data):
        """Deliver *data* immediately; raise trio.WouldBlock when the queue is full."""
        if len(self.ready_data) >= self.max_size:
            raise trio.WouldBlock("max size reached")
        self.dispatcher.assign_to_waiter(self, data)

    async def get(self):
        """Return the next item, waiting via the dispatcher when none is buffered."""
        try:
            return self._pop_ready()
        except IndexError:
            pass  # no data is immediately available. Subscribe and wait.
        return await self.dispatcher.get(self)

    def get_nowait(self):
        """Return the next buffered item or raise trio.WouldBlock if empty."""
        try:
            return self._pop_ready()
        except IndexError:
            # `from None` hides the internal IndexError; WouldBlock is the contract.
            raise trio.WouldBlock("no ready_data for queue") from None

    def qsize(self):
        """Same as len(self): count of buffered items."""
        return len(self.ready_data)

    def close(self):
        """Unregister from the dispatcher. MUST be called when done with the queue."""
        self.dispatcher.close_queue(self)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment