Created
September 30, 2012 16:58
-
-
Save edwardgeorge/3807592 to your computer and use it in GitHub Desktop.
Consumer for blocking on multiple eventlet queues.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Consumer for blocking on multiple eventlet queues. | |
Ensures that you never take more items from the queues than just | |
the first item you receive, and only when explicitly waiting for | |
and item. | |
""" | |
import eventlet | |
from eventlet.event import Event | |
from eventlet.green import Queue | |
class MultiQueueConsumer(object): | |
def __init__(self, queues): | |
self.cancelled = False | |
self.event = Event() | |
self.queues = queues | |
class Waiter(object): | |
def __init__(self, consumer, queue): | |
self.consumer = consumer | |
self.queue = queue | |
@property | |
def cancelled(self): | |
return self.consumer.cancelled | |
def switch(self, item): | |
if self.cancelled or self.consumer.event.ready(): | |
self.queue.queue.appendleft(item) | |
self.queue._schedule_unlock() | |
else: | |
self.consumer.event.send((self.queue, item)) | |
def kill(self, *exc_info): | |
if not self.cancelled and not self.consumer.event.ready(): | |
self.consumer.event.send(exc=exc_info) | |
def wait(self, timeout=None, return_queue=False): | |
empty_queues = [] | |
for q in self.queues: | |
try: | |
if return_queue: | |
return q, q.get_nowait() | |
else: | |
return q.get_nowait() | |
except Queue.Empty: | |
empty_queues.append(q) | |
for q in empty_queues: | |
q.getters.add(self.Waiter(self, q)) | |
self.cancelled = False | |
try: | |
with eventlet.Timeout(timeout, exception=Queue.Empty): | |
if return_queue: | |
return self.event.wait() | |
else: | |
return self.event.wait()[1] | |
finally: | |
self.cancelled = True |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment