Skip to content

Instantly share code, notes, and snippets.

@lucindo
Created September 10, 2016 22:42
Show Gist options
  • Save lucindo/f68278b210c247d95082dfd76e7b924a to your computer and use it in GitHub Desktop.
Save lucindo/f68278b210c247d95082dfd76e7b924a to your computer and use it in GitHub Desktop.
import time
from collections import deque
from Queue import Queue
# Adding some control to Queue class
class ProcessQueue(Queue):
""" Extends Queue to add some control methods
Assumes only blocking put and get will be used
There's 2 common patterns to consume items from Queue
that are noisy and/or error prone:
- 1. Using a message to indicate end for consumer:
Producer:
q.put("END")
Consumer:
while True:
msg = q.get()
if msg == "END":
break
else:
process(msg)
q.task_done()
This scenario is verbose and assumes only one consumer
(or producer/s has to be aware of the number of consumers
to send "END" to each one).
- 2. Checking for emptiness:
Consumer:
while not q.empty():
msg = q.get()
process(msg)
q.task_done()
There are situations where the queue will be empty but the
producers may still put data on queue. This leads to q.join()
blocking forever as consumers stopped consuming.
- Also: Use q.join():
Same problem as checking for empty queue, all consumers may
be waiting to process and producers are blocked on I/O but
will put data on queue after that. In this scenario join()
will return and main thread may exit.
Using ProcessQueue avoid these common errors. Use it this way:
Producer:
# to signal stop of processing:
q.register()
produce_several_messages(q)
q.finish()
Consumer:
while not q.finished():
msg = q.get()
process(msg)
q.task_done()
Main thread:
# busy wait for now
q.wait_finish(interval=0.1)
"""
def _init(self, maxsize):
self.queue = deque()
self.producers = 0
def register(self):
with self.mutex:
self.producers += 1
def finish(self):
with self.mutex:
self.producers -= 1
def finished(self):
return self.producers <= 0 and self.empty()
def wait_finish(self, interval=1.0):
while True:
if self.finished():
return
time.sleep(interval)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment