Created
March 11, 2015 19:50
-
-
Save lebedov/e611c786ef36060b5b7a to your computer and use it in GitHub Desktop.
Class for managing a group of queues with per-queue push and simultaneous pop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Group of queues; data can be pushed/popped into each queue separately, but one | |
can also pop off values from all of the queues simultaneously. | |
+-------+ | |
queue a: in -> | |A|B| | |
+-------+ out -> (B,C) | |
queue b: in -> | |C| | |
+-------+ | |
""" | |
from collections import deque | |
class QueueGroup(object): | |
""" | |
Group of queues. | |
""" | |
def __init__(self, names): | |
self.queues = {} | |
for name in names: | |
self.queues[name] = deque() | |
def names(self): | |
""" | |
List queue names. | |
Returns | |
------- | |
n : list | |
List of queue names. | |
""" | |
return self.queues.keys() | |
def add(self, name): | |
""" | |
Add a new queue. | |
Parameters | |
---------- | |
name : hashable | |
Queue name. Attempting to add an existing name will raise an exception. | |
""" | |
if name in self.queues: | |
raise ValueError('queue name already exists') | |
self.queues[name] = deque() | |
def push(self, name, val): | |
""" | |
Push a value into a specific queue. | |
Parameters | |
---------- | |
name : hashable | |
Queue name. | |
val : object | |
Object to queue. | |
""" | |
self.queues[name].appendleft(val) | |
def pop(self, name): | |
""" | |
Pop a value off a specific queue. | |
Parameters | |
---------- | |
name : hashable | |
Queue name. | |
Returns | |
------- | |
result : object | |
Popped object. | |
""" | |
return self.queues[name].pop() | |
def nonempty(self, name=None): | |
""" | |
Check if queue(s) is/are empty. | |
Parameters | |
---------- | |
name : hashable | |
Queue name. | |
Returns | |
------- | |
result : bool | |
If a queue name is specified, return True if it is nonempty. | |
If no queue name is specified, check whether all of the | |
queues are nonempty. | |
""" | |
if name: | |
return len(self.queues[name]) != 0 | |
else: | |
return 0 not in map(len, self.queues.values()) | |
def popall(self): | |
""" | |
Pop all of the queues in the group. | |
Returns | |
------- | |
result : list | |
Values popped off all queues. | |
""" | |
if self.nonempty(): | |
result = [] | |
for name in self.names(): | |
result.append(self.pop(name)) | |
return result | |
else: | |
raise RuntimeError('some queues are empty') | |
def __getitem__(self, name): | |
return self.queues[name] | |
def __repr__(self): | |
out = [] | |
names = self.names() | |
max_name_len = max(map(len, names)) | |
for name in self.names(): | |
out.append(('%-'+('%i' % max_name_len)+'s: %s') % (name, str(self.queues[name]))) | |
return '\n'.join(out) | |
if __name__ == '__main__': | |
from unittest import main, TestCase | |
class test_queue_group(TestCase): | |
def test_add(self): | |
qg = QueueGroup(['a']) | |
qg.add('b') | |
assert set(qg.queues.keys()) == set(['a', 'b']) | |
self.assertRaises(ValueError, qg.add, 'b') | |
def test_push(self): | |
qg = QueueGroup(['a', 'b']) | |
qg.push('a', 0) | |
qg.push('a', 1) | |
assert qg.queues['a'] == deque([1,0]) | |
assert qg.queues['b'] == deque() | |
def test_pop(self): | |
qg = QueueGroup(['a', 'b']) | |
qg.push('a', 0) | |
qg.push('a', 1) | |
qg.push('b', 'x') | |
assert qg.pop('a') == 0 | |
assert qg.pop('b') == 'x' | |
def test_nonempty(self): | |
qg = QueueGroup(['a', 'b']) | |
qg.push('a', 0) | |
assert qg.nonempty('a') | |
assert not qg.nonempty('b') | |
assert not qg.nonempty() | |
def test_popall(self): | |
# All queues nonempty: | |
qg = QueueGroup(['a', 'b']) | |
qg.push('a', 0) | |
qg.push('a', 1) | |
qg.push('b', 'x') | |
assert set(qg.popall()) == set([0, 'x']) | |
# Some queues nonempty: | |
qg = QueueGroup(['a', 'b']) | |
qg.push('a', 0) | |
qg.push('a', 1) | |
self.assertRaises(RuntimeError, qg.popall) | |
def test_getitem(self): | |
qg = QueueGroup(['a', 'b']) | |
qg.push('a', 0) | |
assert qg['a'] == deque([0]) | |
def test_repr(self): | |
qg = QueueGroup(['a', 'xyz']) | |
qg.push('a', 0) | |
qg.push('a', 1) | |
assert str(qg) == 'a : deque([1, 0])\nxyz: deque([])' | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment