Created
May 13, 2016 16:35
-
-
Save dlecocq/857dfe77e9bb884b2a926a80a15e18db to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
from gevent import monkey | |
monkey.patch_all() | |
import time | |
import ujson as json | |
from functools import wraps | |
import gevent | |
import gevent.queue | |
from drop import BdbQueue, FlatQueue | |
def timer(func):
    '''Decorate ``func`` so that every completed call reports its duration.

    After each call that returns normally, one line of the form
    ``name(*, **kwargs) => seconds`` is printed to stdout: positional
    arguments are elided as ``*``, keyword arguments are shown as a dict and
    the wall-clock duration is formatted as ``%10.5f``.

    The wrapped function's return value is passed through unchanged. If
    ``func`` raises, the exception propagates and nothing is printed.
    '''
    @wraps(func)
    def decorated(*args, **kwargs):
        started = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - started
        # A single parenthesised argument prints the same text under the
        # Python 2 print statement and the Python 3 print function.
        print('%s(*, **%s) => %10.5f' % (func.__name__, kwargs, elapsed))
        return result
    return decorated
# Fixed sample payload that the benchmark pushes through every queue.
data = {
    'url': 'https://moz.com/',
    'referrer': 'http://moz.com/',
}
@timer
def time_puts(queue, count):
    '''Put the module-level ``data`` payload onto ``queue`` ``count`` times.'''
    enqueue = queue.put
    for _ in xrange(count):
        enqueue(data)
@timer
def time_pops(queue, count):
    '''Call ``queue.get()`` ``count`` times, discarding every result.'''
    dequeue = queue.get
    for _ in xrange(count):
        dequeue()
# Each entry pairs a zero-argument queue factory with the batch sizes to time.
# Factories are used so that every queue is only created right before its own
# runs, in the order listed here.
_BENCHMARKS = (
    (gevent.queue.Queue, (10, 100, 1000, 10000, 100000)),
    (lambda: BdbQueue('/tmp/queue/bar'), (10, 100, 1000, 10000)),
    (lambda: FlatQueue('/tmp/queue/foo'), (10, 100, 1000, 10000)),
)

for make_queue, counts in _BENCHMARKS:
    queue = make_queue()
    for count in counts:
        # Fill the queue with ``count`` items, then drain the same number.
        time_puts(queue, count=count)
        time_pops(queue, count=count)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
import Queue | |
from bsddb3 import DB | |
import gevent.event | |
import simplejson as json | |
class BdbQueue(object):
    '''FIFO queue whose items are stored in a Berkeley DB B-tree file.

    Every item is JSON-serialised and written under an 8-byte, network-order
    (big-endian) unsigned counter key. ``get`` always removes the first
    record the B-tree cursor returns, so items come back in key order.
    The database is synced after every ``put`` and every successful ``get``.
    '''

    def __init__(self, path):
        '''Open (creating it if needed) the database file at ``path``.

        The entry count and the next key are recovered from whatever the
        database already contains, so a previously used file is resumed.
        '''
        self._db = DB.DB()
        self._db.open(path, dbtype=DB.DB_BTREE, flags=DB.DB_CREATE)
        # Synchronization that allows us to blockingly wait for results
        self._event = gevent.event.Event()
        # Properly initialize the number of entries
        self._length = self._db.stat()['ndata']
        # Get the value of the last key as our starting point. The cursor is
        # closed explicitly so it does not stay open for the queue's lifetime.
        cursor = self._db.cursor()
        try:
            last = cursor.last()
        finally:
            cursor.close()
        if last:
            last = self.unpack(last[0]) + 1
        else:
            last = 0
        self._keys = iter(self.keys_generator(last))

    def pack(self, count):
        '''Return ``count`` encoded as an 8-byte big-endian unsigned key.'''
        return struct.pack('!Q', count)

    def unpack(self, packed):
        '''Return the integer encoded in an 8-byte key made by ``pack``.'''
        return struct.unpack('!Q', packed)[0]

    def keys_generator(self, count=0):
        '''Yield packed keys for ``count``, ``count + 1``, ... without end.'''
        while True:
            yield self.pack(count)
            count += 1

    def sync(self):
        '''Ask the database to sync its contents.'''
        self._db.sync()

    def close(self):
        '''Close the underlying database handle.

        The queue must not be used after this call.
        '''
        self._db.close()

    def put(self, item):
        '''Append the JSON-serialisable ``item`` and wake any waiting getter.'''
        self._db.put(next(self._keys), json.dumps(item))
        self.sync()
        self._length += 1
        self._event.set()

    def get(self):
        '''Pop off a result.

        Blocks on the internal event while the queue is empty, then removes
        and returns the first stored item.

        Raises:
            Queue.Empty: if any ordinary error (``Exception``) occurs while
                waiting for or reading an item. ``KeyboardInterrupt``,
                ``SystemExit`` and other ``BaseException``-only signals
                propagate unchanged instead of being reported as "empty".
        '''
        try:
            while True:
                while not self._length:
                    self._event.wait()
                    self._event.clear()
                cursor = self._db.cursor()
                try:
                    record = cursor.first()
                    if record:
                        # Decode before deleting so that a record which fails
                        # to parse is not removed from the database.
                        item = json.loads(record[1])
                        cursor.delete()
                finally:
                    # Always release the cursor, even when decoding fails.
                    cursor.close()
                if record:
                    self.sync()
                    self._length -= 1
                    return item
        except Exception:
            raise Queue.Empty()

    def __iter__(self):
        '''Yield items from ``get`` until it raises ``Queue.Empty``.

        ``get`` blocks while the queue is empty, so iteration only ends when
        ``get`` reports an error.
        '''
        while True:
            try:
                yield self.get()
            except Queue.Empty:
                break
class FlatQueue(object):
    '''FIFO queue backed by a flat file holding one JSON document per line.

    Writes go through one handle and reads through a second, independent
    handle on the same path, so the read position advances separately from
    the write position. Consumed lines are not removed from the file.
    '''

    def __init__(self, path):
        '''Create (or truncate) the file at ``path`` and open both handles.'''
        self._path = path
        # 'w+' truncates any existing file, which is why the length starts
        # at zero below.
        self._fout = open(self._path, 'w+')
        try:
            self._fin = open(self._path)
        except Exception:
            # Do not leak the write handle if the read handle cannot open.
            self._fout.close()
            raise
        self._length = 0
        self._event = gevent.event.Event()

    def close(self):
        '''Close both file handles.

        The read handle is closed even if closing the write handle raises;
        in that case the write handle's error still propagates.
        '''
        try:
            self._fout.close()
        finally:
            self._fin.close()

    def put(self, item):
        '''Append the JSON-serialisable ``item`` and wake any waiting getter.'''
        self._fout.write(json.dumps(item))
        self._fout.write('\n')
        # Flush so the line is visible to the separate read handle.
        self._fout.flush()
        self._length += 1
        self._event.set()

    def get(self):
        '''Return the next unread item, blocking while the queue is empty.

        Raises:
            Queue.Empty: if any ordinary error (``Exception``) occurs while
                waiting for or reading an item. ``KeyboardInterrupt``,
                ``SystemExit`` and other ``BaseException``-only signals
                propagate unchanged instead of being reported as "empty".
        '''
        try:
            while True:
                while not self._length:
                    self._event.wait()
                    self._event.clear()
                line = self._fin.readline()
                if line:
                    result = json.loads(line)
                    self._length -= 1
                    return result
        except Exception:
            raise Queue.Empty()

    def __iter__(self):
        '''Yield items from ``get`` until it raises ``Queue.Empty``.

        ``get`` blocks while the queue is empty, so iteration only ends when
        ``get`` reports an error.
        '''
        while True:
            try:
                yield self.get()
            except Queue.Empty:
                break
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Gevent Queue
============
time_puts(*, **{'count': 10}) => 0.00002
time_pops(*, **{'count': 10}) => 0.00001
time_puts(*, **{'count': 100}) => 0.00005
time_pops(*, **{'count': 100}) => 0.00009
time_puts(*, **{'count': 1000}) => 0.00044
time_pops(*, **{'count': 1000}) => 0.00053
time_puts(*, **{'count': 10000}) => 0.00437
time_pops(*, **{'count': 10000}) => 0.00511
time_puts(*, **{'count': 100000}) => 0.04923
time_pops(*, **{'count': 100000}) => 0.06889
Bdb Queue
=========
time_puts(*, **{'count': 10}) => 0.00257
time_pops(*, **{'count': 10}) => 0.00217
time_puts(*, **{'count': 100}) => 0.01238
time_pops(*, **{'count': 100}) => 0.01311
time_puts(*, **{'count': 1000}) => 0.17238
time_pops(*, **{'count': 1000}) => 0.14911
time_puts(*, **{'count': 10000}) => 1.28870
time_pops(*, **{'count': 10000}) => 1.23262
Flat Queue
==========
time_puts(*, **{'count': 10}) => 0.00016
time_pops(*, **{'count': 10}) => 0.00007
time_puts(*, **{'count': 100}) => 0.00087
time_pops(*, **{'count': 100}) => 0.00031
time_puts(*, **{'count': 1000}) => 0.00725
time_pops(*, **{'count': 1000}) => 0.00368
time_puts(*, **{'count': 10000}) => 0.08436
time_pops(*, **{'count': 10000}) => 0.03117
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment