Created February 18, 2012 06:31
A persistent Queue implementation in Python that focuses on durability over throughput
Example usage:
from pqueue import PersistentQueue

q1 = PersistentQueue("/tmp/queue_storage_dir")
q1.put("1")
q1.put("2")
q1.put("3")
q1.close()

q2 = PersistentQueue("/tmp/queue_storage_dir")
while not q2.empty():
    print repr(q2.get())
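Because PersistentQueue subclasses a Queue.Queue-style base (ThreadsafeQueueBase in pqueue.py, below), the blocking put/get API shown above is complemented by non-blocking variants. A minimal sketch, assuming an empty directory (the path is illustrative):

from Queue import Empty
from pqueue import PersistentQueue

q = PersistentQueue("/tmp/queue_storage_dir2")
q.put_nowait("job")
print repr(q.peek_nowait())  # prints 'job'; the item stays in the queue
print repr(q.get_nowait())   # prints 'job'; the item is now removed
try:
    q.get_nowait()           # the queue is empty, so...
except Empty:
    print "queue is empty"   # ... Empty is raised
q.close()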
pqueue.py:
import os
import glob
import zlib
import pickle
import struct
import logging
import tempfile
from time import time as _time
from Queue import Empty, Full
from collections import namedtuple, deque

log = logging.getLogger(__name__)
class SafeQueueQueue(object):
    """Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.

    A copy of Python's Queue.Queue with changes that guarantee locks will
    always be released, even if unexpected exceptions are raised (also, it
    inherits from ``object``).
    """

    def __init__(self, maxsize=0):
        import threading
        self.maxsize = maxsize
        self._init(maxsize)
        # mutex must be held whenever the queue is mutating. All methods
        # that acquire mutex must release it before returning. mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)
        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        """
        self.all_tasks_done.acquire()
        try:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished
        finally:
            self.all_tasks_done.release()

    def join(self):
        """Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        self.all_tasks_done.acquire()
        try:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()
        finally:
            self.all_tasks_done.release()

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        with self.mutex:
            return self._qsize()

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!)."""
        with self.mutex:
            return not self._qsize()

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!)."""
        with self.mutex:
            return 0 < self.maxsize == self._qsize()

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        """
        self.not_full.acquire()
        try:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() == self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                else:
                    endtime = _time() + timeout
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()
        finally:
            self.not_full.release()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        """
        self.not_empty.acquire()
        try:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a positive number")
            else:
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item
        finally:
            self.not_empty.release()

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        """
        return self.get(False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self, len=len):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()
class ThreadsafeQueueBase(SafeQueueQueue):
    def peek(self, block=True, timeout=None):
        """Return an item from the queue without removing it.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        """
        self.not_empty.acquire()
        try:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a positive number")
            else:
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._peek()
            self.not_full.notify()
            return item
        finally:
            self.not_empty.release()

    def peek_nowait(self):
        """Return an item from the queue without removing it.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        """
        return self.peek(False)

    def _init(self, maxsize):
        # Queue.Queue's ``_init`` creates a deque, which we don't necessarily
        # want.
        pass

    def _qsize(self, len=len):
        raise NotImplementedError

    def _put(self, item):
        raise NotImplementedError

    def _get(self):
        raise NotImplementedError
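# Subclasses of ``ThreadsafeQueueBase`` are expected to provide ``_qsize``,
# ``_put``, ``_get``, and ``_peek`` (``peek`` above relies on ``_peek``, for
# which no default is defined); ``PersistentQueue`` below implements all four.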
def pack_int(int):
    return struct.pack("!i", int)

def unpack_int(bytes):
    return struct.unpack("!i", bytes)[0]

def pack_uint(uint):
    assert uint >= 0, "%r < 0" %(uint, )
    return struct.pack("!I", uint)

def unpack_uint(bytes):
    return struct.unpack("!I", bytes)[0]
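# For example, pack_uint(5) == "\x00\x00\x00\x05" (four bytes, network byte
# order), and unpack_uint(pack_uint(5)) == 5. The signed variants are used
# for the ``zlib.crc32`` checksums, which are signed ints on Python 2.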
class QueueError(Exception):
    pass

PQRecord = namedtuple("PQRecord", "queue start end type data error")
class JournaledPersistentQueue(object):
    """ A queue that is persisted to a journal file.

    Properties:

    - *Not* thread safe (intended to be used through ``PersistentQueue``,
      which handles thread safety).
    - Records all operations (puts, gets) to a journal file (which will
      only grow).
    - Loading the journal will require time proportional to the number
      of records.
    - Optimized for durability, not throughput.
    - Uses the ``_nowait`` suffix on public methods to make sure that
      they are never used in a context where blocking is expected.
    - Each record has the format::

        char type (either ``RECORD_TYPE_JOURNAL`` or ``RECORD_TYPE_ITEM``)
        uint length (of the ``data``)
        char[length] data (see below)
        int crc32 (the ``crc32`` checksum of ``type . length . data``)

      The ``data`` is either a pickled object (when the ``type`` is
      ``RECORD_TYPE_ITEM``) or, when the ``type`` is
      ``RECORD_TYPE_JOURNAL``, a ``uint`` which is a reference to the end
      of the last "read" item record.
    - If an error (truncated record or checksum failure) is encountered
      while reading from the journal, a ``log.warning`` message will be
      emitted and all further records will be ignored.
    - Exceptions will only be raised when:

      - The ``create`` flag is specified, but the file already exists
      - The ``create`` flag is not specified but the file does not exist
      - The programmer makes a stupid error (see ``assert`` statements)
    """
    RECORD_TYPE_ITEM = "\x01"
    RECORD_TYPE_JOURNAL = "\x02"
    SERIALIZER = pickle

    def __init__(self, filename, create=False):
        self.filename = filename
        self._peeked = None
        self._file = self._open(create)
        self._initialize()

    def _open(self, create):
        exists = os.path.exists(self.filename)
        if create and exists:
            raise QueueError("refusing to create - file already exists: %r"
                             %(self.filename, ))
        elif not (create or exists):
            raise QueueError("file does not exist: %r" %(self.filename, ))
        mode = create and "w+b" or "r+b"
        return open(self.filename, mode)

    def _initialize(self):
        """ Initializes ``self._read_pos`` (the location of the record
        currently being read from), ``self._write_pos`` (the location where
        future writes should go) and ``self._item_count`` (the number of
        active items in this queue).
        """
        self._file.seek(0)
        self._read_pos = self._file.tell()
        self._write_pos = self._read_pos
        self._item_count = 0
        while True:
            record = self._read_one(self._write_pos)
            if record is None:
                break
            if record.error:
                log.warn("error encountered while loading journal; truncated "
                         "record will be overwritten on the next write "
                         "(error record: %r)", record)
                break
            if record.type == self.RECORD_TYPE_JOURNAL:
                self._read_pos = unpack_uint(record.data)
                self._item_count -= 1
            elif record.type == self.RECORD_TYPE_ITEM:
                self._item_count += 1
            else:
                assert False, "_read_one returned unexpected record type " \
                              "without an error: %r" %(record, )
            self._write_pos = record.end
    def _read_one(self, start):
        """ Reads one record from location ``start``.

        Returns either ``None`` (if ``start`` is the end of the file) or
        an instance of ``PQRecord``.

        If any error is encountered while reading the record (truncation
        or checksum failure), ``PQRecord.error`` will be set. Other fields
        of ``PQRecord`` may also be set, but their values should only
        be used for diagnostics and debugging.

        Note that ``self._read_pos`` is not modified.
        """
        HEADER_LEN = 5
        CHECKSUM_LEN = 4
        self._file.seek(start)
        header = self._file.read(HEADER_LEN)
        header_len = len(header)
        if header_len == 0:
            return None
        if header_len < HEADER_LEN:
            return PQRecord(self, start, None, header[0], header[1:],
                            "truncated header: %r" %(header, ))
        type = header[0]
        if not (type == self.RECORD_TYPE_ITEM or type == self.RECORD_TYPE_JOURNAL):
            return PQRecord(self, start, None, header[0], header[1:],
                            "unexpected record type: %r" %(header, ))
        data_checksum_len = unpack_uint(header[1:]) + CHECKSUM_LEN
        end = start + HEADER_LEN + data_checksum_len
        data_checksum = self._file.read(data_checksum_len)
        if len(data_checksum) != data_checksum_len:
            return PQRecord(self, start, end, type, None, "record truncated")
        data = data_checksum[:-CHECKSUM_LEN]
        checksum = data_checksum[-CHECKSUM_LEN:]
        actual_checksum = pack_int(zlib.crc32(header + data))
        if actual_checksum != checksum:
            return PQRecord(self, start, end, type, data,
                            "checksum failed (%r != %r)"
                            %(checksum, actual_checksum))
        return PQRecord(self, start, end, type, data, None)

    def _write_one(self, type, data, active=True):
        """ Writes one record to ``self._write_pos`` and updates
        ``self._write_pos`` to the end of the new record.
        """
        assert type in [self.RECORD_TYPE_ITEM, self.RECORD_TYPE_JOURNAL], \
            "invalid record type: %r" %(type, )
        self._file.seek(self._write_pos)
        data_len = pack_uint(len(data))
        to_write = type + data_len + data
        to_write += pack_int(zlib.crc32(to_write))
        self._file.write(to_write)
        self._file.flush()
        self._write_pos += len(to_write)

    def _peek(self):
        """ Reads and returns an ``(obj, record)`` tuple, where ``record``
        is the next ``RECORD_TYPE_ITEM`` record (starting at
        ``self._read_pos``), and ``obj`` is the deserialized object stored
        in that record.

        Raises ``Empty`` if there are no ``RECORD_TYPE_ITEM`` records left
        in the file.

        If an error is encountered, a warning will be logged and further
        records in the file will be truncated.
        """
        if self._peeked is None:
            while True:
                record = self._read_one(self._read_pos)
                if record is None:
                    break
                if record.error:
                    log.warn("error encountered while reading from journal; "
                             "any subsequent records will be ignored: %r",
                             record)
                    self._write_pos = self._read_pos
                    self._item_count = 0
                    record = None
                    break
                if record.type == self.RECORD_TYPE_ITEM:
                    break
                self._read_pos = record.end
            obj = None
        else:
            record, obj = self._peeked
        if record is None:
            raise Empty
        if obj is None:
            obj = self.SERIALIZER.loads(record.data)
            self._peeked = (record, obj)
        return obj, record

    def put_nowait(self, obj):
        data = self.SERIALIZER.dumps(obj)
        self._write_one(self.RECORD_TYPE_ITEM, data)
        self._item_count += 1

    def get_nowait(self):
        obj, record = self._peek()
        self._read_pos = record.end
        self._write_one(self.RECORD_TYPE_JOURNAL, pack_uint(self._read_pos))
        self._item_count -= 1
        self._peeked = None
        return obj

    def peek_nowait(self):
        obj, _ = self._peek()
        return obj

    def qsize(self):
        return self._item_count

    def empty(self):
        return self._item_count == 0

    def filesize(self):
        """ Returns the end of the journal. Note that this will *almost*
        always be the end of the file, but it may be earlier in the file if
        the file contains invalid records.
        """
        return self._write_pos

    def close(self):
        self._file.close()

    def __repr__(self):
        return "<%s %r filesize=%r qsize=%r>" %(
            type(self).__name__, self.filename, self.filesize(), self.qsize(),
        )
class PersistentQueue(ThreadsafeQueueBase):
    """ A thread safe queue that will persist to multiple, rolling, journaled
    queues.

    Properties:

    - Thread safe.
    - Optimized for durability, not throughput.
    - Stores journal files as ``basedir/pq-data-%016x``.
    - Deletes journal files once they are no longer needed.
    - After a journal file has grown larger than ``max_filesize``, no new
      items will be written to it (subsequent writes will go to a new
      journal).
    - ``qsize()`` will only return ``0`` (the queue is empty) or ``1``
      (the queue is not empty).
    """
    DATAFILE_PREFIX = "pq-data-"
    DATAFILE_TEMPLATE = DATAFILE_PREFIX + "%016x"

    def __init__(self, basedir, max_filesize=1024*1024,
                 queue_class=JournaledPersistentQueue):
        # Because our ``qsize()`` method doesn't return the total size of the
        # queue, setting a ``maxsize`` value here would be redundant.
        super(PersistentQueue, self).__init__(maxsize=0)
        self.basedir = basedir
        self.max_filesize = max_filesize
        self.queue_class = queue_class
        self._initialize()

    def _initialize(self):
        """ Initializes ``_reader`` and ``_writer`` queues, which are used
        by ``_get_reader`` and ``_get_writer`` to track which internal
        queue should be used to handle reads and writes. """
        if not os.path.exists(self.basedir):
            os.mkdir(self.basedir)
        self._load_data_files()
        if len(self._data_files) == 0:
            self._serial = 0
            self._writer = self._create_queue()
            self._reader = self._writer
        else:
            writer_filepath = self._data_files.pop()
            writer_filename = os.path.basename(writer_filepath)
            self._serial = int(writer_filename[len(self.DATAFILE_PREFIX):], 16)
            self._writer = self._load_queue(writer_filepath)
            if len(self._data_files) > 0:
                self._reader = self._load_queue(self._data_files.popleft())
            else:
                self._reader = self._writer

    def _load_data_files(self):
        """ Loads ``self._data_files``.

        ``self._data_files`` is an ordered deque of the available data
        files that are not currently being read from or written to.

        When ``self._writer`` fills up, the file it was writing to will be
        appended to ``self._data_files`` and a new writer will be created
        which will be backed by a new data file.

        When ``self._reader`` reaches the end of the file it is working
        with, the left-most file in ``self._data_files`` will be shifted
        off and read from.
        """
        data_files = glob.glob(self._path(self.DATAFILE_PREFIX + "*"))
        data_files.sort()
        self._data_files = deque(data_files)

    def _path(self, *parts):
        """ Returns a path relative to ``self.basedir``. """
        return os.path.join(self.basedir, *parts)

    def _load_queue(self, filename):
        """ Loads the queue from ``filename``. """
        return self.queue_class(filename)

    def _create_queue(self):
        """ Creates and returns a new queue. """
        self._serial += 1
        filename = self._path(self.DATAFILE_TEMPLATE %(self._serial, ))
        log.info("creating new queue at %r", filename)
        return self.queue_class(filename, create=True)

    def _get_reader(self):
        """ Returns the queue which should be read from. """
        while self._reader is not self._writer and self._reader.empty():
            self._reader.close()
            os.unlink(self._reader.filename)
            if len(self._data_files) > 0:
                self._reader = self._load_queue(self._data_files.popleft())
            else:
                self._reader = self._writer
        return self._reader

    def _get_writer(self):
        """ Returns the queue which should be written to. """
        if self._writer.filesize() > self.max_filesize:
            if self._reader is not self._writer:
                self._writer.close()
                self._data_files.append(self._writer.filename)
            self._writer = self._create_queue()
        return self._writer

    def _peek(self):
        return self._get_reader().peek_nowait()

    def _get(self):
        return self._get_reader().get_nowait()

    def _put(self, obj):
        return self._get_writer().put_nowait(obj)

    def _qsize(self):
        """ Returns ``0`` if a call to ``get`` will block, otherwise returns
        ``1``.

        This is done because it would be very slow to load and read all
        the data files, and ``qsize`` is normally only used to determine
        if the queue is empty.
        """
        return self._get_reader().qsize() > 0 and 1 or 0

    def close(self):
        self._reader.close()
        if self._reader is not self._writer:
            self._writer.close()
if __name__ == "__main__":
    import time
    import shutil
    from itertools import product

    def mk_journaled_queue(basedir):
        return JournaledPersistentQueue(basedir + "/journal", create=True)

    def mk_persistent_queue(basedir):
        return PersistentQueue(basedir)

    queues = [
        ("JournaledPersistentQueue", mk_journaled_queue),
        ("PersistentQueue", mk_persistent_queue),
    ]
    tests = [
        ("1-byte", 20000, "x"),
        ("1k-bytes", 20000, "x" * 1000),
        ("10k-bytes", 20000, "x" * 10000),
    ]
    for (qname, qfactory), (name, num, data) in product(queues, tests):
        basedir = tempfile.mkdtemp(prefix="pqueue-%s-%s-" %(qname, name, ))
        try:
            queue = qfactory(basedir)
            operations = [
                ("put/get", lambda: [queue.put_nowait(data), queue.get_nowait()]),
                ("puts", lambda: queue.put_nowait(data)),
                ("gets", lambda: queue.get_nowait()),
            ]
            for op_name, op in operations:
                start = time.time()
                try:
                    for _ in xrange(num):
                        op()
                finally:
                    end = time.time()
                duration = end - start
                stats = "(%d ops/sec, %0.02f kB/sec)" %(
                    num / duration,
                    (len(data) * num / 1000.0) / duration,
                )
                print qname, name, num, "%s:" %(op_name, ), duration, stats
        finally:
            shutil.rmtree(basedir)
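JournaledPersistentQueue can also be used on its own when a single journal file is enough (this is what the benchmark above does via mk_journaled_queue). A minimal sketch, with an illustrative file path:

from pqueue import JournaledPersistentQueue

q = JournaledPersistentQueue("/tmp/example.journal", create=True)
q.put_nowait({"id": 1})
print q.qsize()               # prints 1
q.close()

# Reopening the same file replays the journal, so the item is still queued.
q = JournaledPersistentQueue("/tmp/example.journal")
print repr(q.get_nowait())    # prints {'id': 1}
q.close()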
Tests:
import os
import random
import shutil
import tempfile
from Queue import Empty, Queue
from StringIO import StringIO

from nose.tools import assert_equal

from pqueue import JournaledPersistentQueue, PersistentQueue, QueueError


def assert_raises(func, exc_type, str_contains=None, repr_contains=None):
    try:
        func()
    except exc_type as e:
        if str_contains is not None and str_contains not in str(e):
            raise AssertionError("%s raised, but %r does not contain %r"
                                 %(exc_type, str(e), str_contains))
        if repr_contains is not None and repr_contains not in repr(e):
            raise AssertionError("%s raised, but %r does not contain %r"
                                 %(exc_type, repr(e), repr_contains))
        return e
    else:
        raise AssertionError("%s not raised" %(exc_type, ))
def run_blackbox_queue_verification(load_test_queue, test_qsize=False,
                                    operation_count=400):
    performed = []

    def perform_operation(queue, operation, arg, is_reference, **kwargs):
        args = operation == "put_nowait" and (arg, ) or ()
        if is_reference:
            performed.append((operation, args))
        if operation == "reload":
            if is_reference:
                return
            else:
                test_queue[0].close()
                test_queue[0] = load_test_queue()
                return
        if operation == "qsize":
            return queue.qsize()
        try:
            return getattr(queue, operation)(*args, **kwargs)
        except Empty:
            return Empty

    base_operations = ["peek_nowait", "get_nowait", "put_nowait", "reload"]
    if test_qsize:
        base_operations += ["qsize", "empty"]

    reference_queue = Queue()

    def peek_nowait(**kwargs):
        if not reference_queue.queue:
            raise Empty()
        return reference_queue.queue[0]
    reference_queue.peek_nowait = peek_nowait

    test_queue = [load_test_queue()]
    try:
        biases = ["grow", "shrink", "grow", "shrink"]
        operation_count = operation_count / len(biases)
        for bias in biases:
            if bias == "grow":
                operations = base_operations + ["put_nowait"]
            else:
                operations = base_operations + ["get_nowait"]
            for num in xrange(operation_count):
                operation = random.choice(operations)
                expected = perform_operation(reference_queue, operation, num, True)
                actual = perform_operation(test_queue[0], operation, num, False)
                assert_equal(expected, actual)
    except:
        print "operations performed:"
        print performed
        raise
class TempdirTestBase(object):
    def setup(self):
        self.dirname = tempfile.mkdtemp(prefix="pqueue-test-")
        self.path = lambda *a: os.path.join(self.dirname, *a)

    def teardown(self):
        shutil.rmtree(self.dirname)


class TestJournaledPersistentQueueFilesystemThings(TempdirTestBase):
    def queue(self, filename=None, create=True):
        return JournaledPersistentQueue(
            filename or self.path("testfile"),
            create=create
        )

    def test_create_file(self):
        queue = self.queue()
        assert_equal(queue.qsize(), 0)
        assert_equal(queue.empty(), True)

    def test_get_empty(self):
        queue = self.queue()
        assert_raises(queue.peek_nowait, Empty)
        assert_raises(queue.get_nowait, Empty)

    def test_put(self):
        queue = self.queue()
        queue.put_nowait(1)
        queue.put_nowait(2)
        assert_equal(queue.peek_nowait(), 1)
        assert_equal(queue.get_nowait(), 1)
        assert_equal(queue.get_nowait(), 2)

    def test_blackbox(self):
        self.queue(create=True).close()
        run_blackbox_queue_verification(lambda: self.queue(create=False),
                                        test_qsize=True)

    def test_create_existing_file(self):
        filename = self.path("testfile")
        open(filename, "w").close()
        assert_raises(lambda: self.queue(filename, create=True),
                      QueueError, str_contains=filename)

    def test_load_existing_file(self):
        q1 = self.queue()
        q1.put_nowait(1)
        q1.put_nowait(2)
        q1.put_nowait(3)
        q1.close()
        q2 = self.queue(q1.filename, create=False)
        assert_equal(q2.get_nowait(), 1)
        assert_equal(q2.get_nowait(), 2)
        assert_equal(q2.get_nowait(), 3)
        assert_raises(q2.get_nowait, Empty)


class JournaledPersistentQueueWithFakeOpen(JournaledPersistentQueue):
    def __init__(self, fake_file, *args, **kwargs):
        self.fake_file = fake_file
        super(JournaledPersistentQueueWithFakeOpen, self).__init__(
            "/does/not/exist", *args, **kwargs
        )

    def _open(self, create):
        return self.fake_file


class TestJournaledPersistentQueueLogicThings(object):
    def queue(self, initial_data="", create=False):
        fake_file = StringIO(initial_data)
        fake_file.seek(0)
        queue = JournaledPersistentQueueWithFakeOpen(fake_file, create=create)
        return fake_file, queue

    def get_file_contents(self, items, active=True):
        data, queue = self.queue(create=True)
        for item in items:
            queue.put_nowait(item)
            if not active:
                queue.get_nowait()
        data.seek(0)
        return data.read()

    def test_get_file_contents(self):
        _, queue = self.queue(self.get_file_contents(["foo", "bar"]))
        assert_equal(queue.get_nowait(), "foo")
        assert_equal(queue.get_nowait(), "bar")
        assert_raises(queue.get_nowait, Empty)

    def test_data_truncated(self):
        data = self.get_file_contents(["foo", "bar", "ohno!"])[:-3]
        _, queue = self.queue(data)
        assert_equal(queue.get_nowait(), "foo")
        assert_equal(queue.get_nowait(), "bar")
        assert_raises(queue.get_nowait, Empty)

    def test_data_truncated_after_load(self):
        file, queue = self.queue()
        queue.put_nowait("foo")
        queue.put_nowait("ohno!")
        old_position = file.tell()
        file.seek(-5, 2)
        file.truncate()
        file.seek(old_position)
        assert_equal(queue.get_nowait(), "foo")
        assert_raises(queue.get_nowait, Empty)

    def test_data_checksum_fail_after_load(self):
        file, queue = self.queue()
        queue.put_nowait("foo")
        queue.put_nowait("ohno!")
        old_position = file.tell()
        file.seek(-5, 2)
        file.write("x")
        file.seek(old_position)
        assert_equal(queue.get_nowait(), "foo")
        assert_raises(queue.get_nowait, Empty)
        queue.put_nowait("bar")
        assert_equal(queue.get_nowait(), "bar")

    def test_fuzzing(self):
        file, queue = self.queue()
        queue.put_nowait("foo")
        queue.put_nowait("bar")
        queue.get_nowait()
        queue.put_nowait("baz")
        queue.get_nowait()
        file.seek(0)
        # A simple fuzzing test. Walks over the file, flipping the least
        # significant bit of each byte, then loads this "bad" file and performs
        # some simple operations to make sure that no exceptions are raised.
        bytes = [ord(x) for x in file.read()]
        for idx, _ in enumerate(bytes):
            bytes[idx] ^= 0x01
            _, queue = self.queue("".join(map(chr, bytes)))
            queue.put_nowait("test")
            for _ in range(queue.qsize()):
                queue.get_nowait()
            queue.put_nowait("test")
            assert_equal(queue.get_nowait(), "test")
            bytes[idx] ^= 0x01


class TestPersistentQueue(TempdirTestBase):
    def test_blackbox(self):
        mk_queue = lambda: PersistentQueue(self.dirname, max_filesize=1000)
        run_blackbox_queue_verification(mk_queue)
Benchmark results (output of the ``__main__`` block in pqueue.py):
JournaledPersistentQueue 1-byte 20000 put/get: 1.82719492912 (10945 ops/sec, 10.95 kB/sec)
JournaledPersistentQueue 1-byte 20000 puts: 0.517249822617 (38666 ops/sec, 38.67 kB/sec)
JournaledPersistentQueue 1-byte 20000 gets: 1.02022385597 (19603 ops/sec, 19.60 kB/sec)
JournaledPersistentQueue 1k-bytes 20000 put/get: 2.29758691788 (8704 ops/sec, 8704.78 kB/sec)
JournaledPersistentQueue 1k-bytes 20000 puts: 0.684365987778 (29224 ops/sec, 29224.13 kB/sec)
JournaledPersistentQueue 1k-bytes 20000 gets: 1.10291314125 (18133 ops/sec, 18133.79 kB/sec)
JournaledPersistentQueue 10k-bytes 20000 put/get: 4.56774711609 (4378 ops/sec, 43785.26 kB/sec)
JournaledPersistentQueue 10k-bytes 20000 puts: 4.80307793617 (4163 ops/sec, 41639.97 kB/sec)
JournaledPersistentQueue 10k-bytes 20000 gets: 2.24041008949 (8926 ops/sec, 89269.37 kB/sec)
PersistentQueue 1-byte 20000 put/get: 2.01486301422 (9926 ops/sec, 9.93 kB/sec)
PersistentQueue 1-byte 20000 puts: 0.596590995789 (33523 ops/sec, 33.52 kB/sec)
PersistentQueue 1-byte 20000 gets: 1.09916114807 (18195 ops/sec, 18.20 kB/sec)
PersistentQueue 1k-bytes 20000 put/get: 2.52385520935 (7924 ops/sec, 7924.38 kB/sec)
PersistentQueue 1k-bytes 20000 puts: 0.840219020844 (23803 ops/sec, 23803.32 kB/sec)
PersistentQueue 1k-bytes 20000 gets: 1.62955617905 (12273 ops/sec, 12273.28 kB/sec)
PersistentQueue 10k-bytes 20000 put/get: 5.63676190376 (3548 ops/sec, 35481.36 kB/sec)
PersistentQueue 10k-bytes 20000 puts: 4.02938818932 (4963 ops/sec, 49635.33 kB/sec)
PersistentQueue 10k-bytes 20000 gets: 3.88762593269 (5144 ops/sec, 51445.28 kB/sec)