Created
September 10, 2016 22:42
import time
from collections import deque

try:
    from Queue import Queue  # Python 2
except ImportError:
    from queue import Queue  # Python 3


# Adding some control to the Queue class
class ProcessQueue(Queue):
    """Extends Queue with a few control methods.

    Assumes only blocking put and get will be used.

    There are two common patterns for consuming items from a Queue
    that are noisy and/or error-prone:

    1. Using a message to indicate the end to the consumer:

        Producer:
            q.put("END")
        Consumer:
            while True:
                msg = q.get()
                if msg == "END":
                    break
                else:
                    process(msg)
                    q.task_done()

       This is verbose and assumes only one consumer (otherwise the
       producers have to know the number of consumers and send "END"
       to each one).

    2. Checking for emptiness:

        Consumer:
            while not q.empty():
                msg = q.get()
                process(msg)
                q.task_done()

       There are situations where the queue is empty but the producers
       may still put data on it. This leads to q.join() blocking
       forever, because the consumers have already stopped consuming.

    Also, relying on q.join() alone has the same problem as checking
    for an empty queue: all consumers may be idle, waiting for items,
    while the producers are blocked on I/O and will put data on the
    queue afterwards. In this scenario join() returns and the main
    thread may exit.

    Using ProcessQueue avoids these common errors. Use it this way:

        Producer:
            # register() and finish() bracket the production;
            # finish() signals that this producer is done
            q.register()
            produce_several_messages(q)
            q.finish()
        Consumer:
            while not q.finished():
                msg = q.get()
                process(msg)
                q.task_done()
        Main thread:
            # busy wait for now
            q.wait_finish(interval=0.1)
    """
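
    def _init(self, maxsize):
        # Called by Queue.__init__; same storage as the base class,
        # plus a counter of producers that are still active.
        self.queue = deque()
        self.producers = 0

    def register(self):
        """Announce one more active producer."""
        with self.mutex:
            self.producers += 1

    def finish(self):
        """Announce that one producer is done producing."""
        with self.mutex:
            self.producers -= 1

    def finished(self):
        """Return True once no producer is active and the queue is empty."""
        return self.producers <= 0 and self.empty()

    def wait_finish(self, interval=1.0):
        """Poll finished() every `interval` seconds until it returns True."""
        while True:
            if self.finished():
                return
            time.sleep(interval)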
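
The usage pattern from the docstring can be sketched end to end as below. This is a minimal illustration, not part of the original gist: it is written for Python 3 and repeats a compact copy of `ProcessQueue` so that it runs on its own. The names `producer`, `consumer` and `run_demo` are hypothetical. It makes two assumptions that go beyond the docstring. First, the main thread calls `register()` for every producer before any thread starts, so a consumer cannot see `finished()` return `True` too early. Second, the consumers call `get()` with a timeout, because a plain blocking `get()` could wait forever if the last item is taken between the `finished()` check and the `get()` call.

```python
import threading
import time
from collections import deque
from queue import Queue, Empty  # Python 3 module name (the gist uses Python 2's "Queue")


class ProcessQueue(Queue):
    """Compact copy of the class above so this sketch is self-contained."""

    def _init(self, maxsize):
        self.queue = deque()
        self.producers = 0

    def register(self):
        with self.mutex:
            self.producers += 1

    def finish(self):
        with self.mutex:
            self.producers -= 1

    def finished(self):
        return self.producers <= 0 and self.empty()

    def wait_finish(self, interval=1.0):
        while not self.finished():
            time.sleep(interval)


def producer(q, items):
    # register() was already called by the main thread for this producer.
    try:
        for item in items:
            q.put(item)
    finally:
        q.finish()  # always signal completion, even if producing fails


def consumer(q, results, lock):
    while not q.finished():
        try:
            # Assumption: a short timeout instead of a plain blocking get(),
            # so the loop can re-check finished() when the queue stays empty.
            msg = q.get(timeout=0.05)
        except Empty:
            continue
        with lock:
            results.append(msg * 2)  # stand-in for process(msg)
        q.task_done()


def run_demo():
    q = ProcessQueue()
    results = []
    lock = threading.Lock()
    batches = [range(0, 5), range(5, 10)]

    # Register every producer up front, before any thread runs.
    for _ in batches:
        q.register()

    threads = [threading.Thread(target=consumer, args=(q, results, lock))
               for _ in range(3)]
    threads += [threading.Thread(target=producer, args=(q, batch))
                for batch in batches]
    for t in threads:
        t.start()

    q.wait_finish(interval=0.01)  # busy wait, as in the docstring
    for t in threads:
        t.join()  # let consumers finish the item they are still processing
    return sorted(results)


if __name__ == "__main__":
    print(run_demo())
```

Note that `wait_finish()` only tells the main thread that the queue has drained and no producer is active; a consumer may still be processing the last item it took, which is why the sketch also joins the threads before reading `results`.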