Last active
August 18, 2016 13:25
-
-
Save jimfulton/2ae9ec911059888e4d61103baf7e7aa2 to your computer and use it in GitHub Desktop.
A zodb-shootout-like script for measuring ZEO server performance.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import logging | |
import multiprocessing | |
import os | |
import persistent | |
import re | |
import time | |
import ZODB | |
from BTrees.IOBTree import BTree | |
import ZEO | |
#logging.basicConfig(level='DEBUG') | |
class P(persistent.Persistent):
    """Persistent object used both as a data slot and as a stored child.

    The benchmark appends new ``P`` instances to a slot's ``children``
    tuple and stores the payload string on each child's ``data`` attribute.
    """

    # Class-level default: a slot starts out with no children.
    children = tuple()
# Command-line interface of the benchmark.
#
# The summary text is passed as ``description``: the first positional
# argument of ArgumentParser is ``prog``, so passing the text positionally
# would make the usage line show the whole sentence as the program name.
parser = argparse.ArgumentParser(
    description="zodbshootout-inspired performance exercise")
parser.add_argument(
    '--concurrency', '-c', type=int, default=1,
    help="number of concurrent workers; with more than one, each worker "
         "runs in its own process (default: %(default)s)")
parser.add_argument(
    '--object-size', '-o', type=int, default=999,
    help="length of the string stored on each object "
         "(default: %(default)s)")
parser.add_argument(
    '--transaction-size', '-t', type=int, default=3,
    help="number of objects written or read per transaction "
         "(default: %(default)s)")
parser.add_argument(
    '--repetitions', '-r', type=int, default=1000,
    help="number of data slots, i.e. transactions per phase, for each "
         "worker (default: %(default)s)")
parser.add_argument(
    '--demo-storage-baseline', '-d', action='store_true',
    help="measure against an in-process ZODB.DB(None) instead of a "
         "ZEO server")
parser.add_argument(
    '--address', '-a',
    help="address of an already running ZEO server, as HOST:PORT or just "
         "PORT (host 127.0.0.1); when omitted a server is started")
parser.add_argument(
    '--ssl', action='store_true',
    help="use the server and client SSL settings from ZEO.tests.testssl")
parser.add_argument(
    '--server-sync', action='store_true',
    help="open the ZEO client database with server_sync enabled")
def shoot():
    """Run the zodbshootout-inspired performance exercise.

    Reads its settings from the command line via the module-level
    ``parser``.  Unless ``--demo-storage-baseline`` is given, it connects to
    the ZEO server named by ``--address`` (or starts one on ``perf.fs``),
    fills ``root.speedtest`` with one BTree of ``P`` slots per worker and
    packs the database.  It then drives ``run_test`` -- in this process for
    a concurrency of 1, otherwise in one daemon process per worker -- and
    prints one ``summarize`` line per measured phase.

    The server stop function is called on every exit path, so a server
    started here is not left running when a worker times out or setup
    fails.
    """
    options = parser.parse_args()
    concurrency = options.concurrency
    object_size = options.object_size
    transaction_size = options.transaction_size
    repetitions = options.repetitions

    if options.ssl:
        from ZEO.tests.testssl import server_config, client_ssl
    else:
        server_config = None

        def client_ssl():
            return None

    def stop():
        """Do nothing; rebound below when this run starts its own server."""

    try:
        if options.demo_storage_baseline:
            # Workers build their own in-process database; only the first
            # three phases are reported.
            db_factory = None
            headings = ('add', 'update', 'read')
        else:
            if options.address:
                addr = options.address
                if ':' in addr:
                    # Split on the last colon only, so a host part that
                    # itself contains colons does not break unpacking.
                    host, port = addr.rsplit(':', 1)
                    addr = host, int(port)
                else:
                    addr = '127.0.0.1', int(addr)
            else:
                # Start from a fresh storage file for a server of our own.
                if os.path.exists('perf.fs'):
                    os.remove('perf.fs')
                try:
                    addr, stop = ZEO.server(
                        threaded=False, path='perf.fs',
                        zeo_conf=server_config)
                except TypeError:
                    # ZEO 4
                    addr, stop = ZEO.server()

            def db_factory():
                return ZEO.DB(
                    addr,
                    ssl=client_ssl(),
                    wait_timeout=9999,
                    server_sync=options.server_sync)

            # Initialize database
            db = db_factory()
            try:
                with db.transaction() as conn:
                    conn.root.speedtest = speedtest = BTree()
                    for ic in range(concurrency):
                        speedtest[ic] = data = BTree()
                        for ir in range(repetitions):
                            data[ir] = P()
                db.pack()
            finally:
                db.close()
            headings = ('add', 'update', 'cached', 'read', 'prefetch')

        print('Times per operation in microseconds (o=%s, t=%s, c=%s)' % (
            object_size, transaction_size, concurrency))
        print(' %12s' * 5 % ('op', 'min', 'mean', 'median', 'max'))

        # One (input, output) queue pair per worker: a token on the input
        # queue releases the next phase, the output queue carries 'ready'
        # followed by the elapsed time of each phase.
        queues = [(multiprocessing.Queue(), multiprocessing.Queue())
                  for _ in range(concurrency)]

        if concurrency > 1:
            processes = [
                multiprocessing.Process(
                    target=run_test,
                    args=(db_factory, ip, queues[ip][0], queues[ip][1],
                          object_size, transaction_size, repetitions),
                )
                for ip in range(concurrency)
            ]
            for proc in processes:
                proc.daemon = True
                proc.start()

            for _, oqueue in queues:
                oqueue.get(timeout=9)  # ready?

            for name in headings:
                # Release every worker into the phase, then collect.
                for iqueue, _ in queues:
                    iqueue.put(None)
                summarize(name,
                          [oqueue.get(timeout=999) / repetitions
                           for _, oqueue in queues])

            for proc in processes:
                proc.join(1)
        else:
            [(iqueue, oqueue)] = queues
            # Running in-process: queue all release tokens up front so
            # run_test never blocks waiting for one.
            for name in headings:
                iqueue.put(None)
            run_test(db_factory, 0, iqueue, oqueue,
                     object_size, transaction_size, repetitions)
            oqueue.get(timeout=9)  # ready?
            for name in headings:
                summarize(name, [oqueue.get(timeout=999) / repetitions])
    finally:
        stop()
def summarize(name, results):
    """Print one result line for a measured phase.

    ``name`` is the phase label and ``results`` a non-empty sequence of
    per-operation times in seconds, one entry per worker.  Times are
    truncated to whole microseconds.  With several workers the line shows
    min, mean, median and max; with a single worker only its value.

    The values are sorted before the median is taken; picking the middle
    element of the list in arrival order would report an arbitrary worker.
    For an even count the upper of the two middle values is used.
    """
    micros = sorted(int(t * 1000000) for t in results)
    count = len(micros)
    if count > 1:
        print(' %12s' * 5 % (
            name, micros[0], round(sum(micros) / count),
            micros[count // 2], micros[-1],
        ))
    else:
        print(' %12s' * 2 % (name, micros[0]))
def _reload_slots(conn, slots):
    """Minimize the connection's object cache, then touch every slot again.

    Reading ``slot.children`` is meant to get the slots themselves loaded
    before a timed read phase starts.
    """
    conn.cacheMinimize()
    for slot in slots:
        _ = slot.children


def run_test(db_factory, process_index, iqueue, oqueue,
             object_size, transaction_size, repetitions):
    """Run the timed phases for one worker.

    Parameters:
      db_factory -- callable returning the database to test, or None to
          build a local ``ZODB.DB(None)`` baseline with ``repetitions``
          slots.
      process_index -- key of this worker's slot tree in ``root.speedtest``.
      iqueue -- queue from which one token is read before each timed phase.
      oqueue -- queue that receives ``'ready'`` and then the elapsed
          seconds of each phase, in order.
      object_size -- length of the string stored on each child object.
      transaction_size -- number of child objects handled per transaction.
      repetitions -- number of slots to create; only used for the baseline.

    Phases: add, update, read with the connection cache minimized, and --
    only when ``db_factory`` is given -- read after clearing the storage
    cache, without and then with ``storage.prefetch``.

    The connection and database are closed before returning on both the
    baseline and the full path.
    """
    if db_factory is None:
        # No factory. Compute baseline numbers using DemoStorage
        db = ZODB.DB(None)
        with db.transaction() as conn:
            conn.root.speedtest = speedtest = BTree()
            speedtest[process_index] = data = BTree()
            for ir in range(repetitions):
                data[ir] = P()
    else:
        db = db_factory()

    conn = db.open()
    data = conn.root.speedtest[process_index]
    txn_manager = conn.transaction_manager

    # add test
    slots = list(data.values())  # Get data slots loaded
    oqueue.put('ready')
    iqueue.get()
    start = time.time()
    for slot in slots:
        txn_manager.begin()
        for _ in range(transaction_size):
            p = P()
            p.data = 'x' * object_size
            slot.children += (p, )
        txn_manager.commit()
    # NOTE(review): the original issues two consecutive commits at this
    # point; they are kept as one commit per slot plus this trailing one.
    # Confirm the intended placement.
    txn_manager.commit()
    oqueue.put(time.time() - start)
    iqueue.get()

    # update test
    start = time.time()
    for slot in slots:
        txn_manager.begin()
        for it in range(transaction_size):
            slot.children[it].data = 'y' * object_size
        txn_manager.commit()
    oqueue.put(time.time() - start)
    iqueue.get()

    # hot, With db cache, but not object cache
    _reload_slots(conn, slots)
    start = time.time()
    for slot in slots:
        txn_manager.begin()
        for it in range(transaction_size):
            # The assert is what reads the object; under ``python -O`` it
            # is stripped and this phase would time an empty loop.
            assert slot.children[it].data == 'y' * object_size
        txn_manager.commit()
    oqueue.put(time.time() - start)

    if db_factory is None:
        # Baseline stops here; release the connection and database too.
        conn.close()
        db.close()
        return

    storage = db.storage

    # cold, With no cache
    # Relies on the private ``_cache`` attribute of the client storage.
    storage._cache.clear()
    _reload_slots(conn, slots)
    iqueue.get()
    start = time.time()
    for slot in slots:
        txn_manager.begin()
        for it in range(transaction_size):
            assert slot.children[it].data == 'y' * object_size
        txn_manager.commit()
    oqueue.put(time.time() - start)

    # cold, With no cache, but with prefetch
    # Collect the child oids per slot before the caches are cleared.
    oids = [
        [slot.children[it]._p_oid for it in range(transaction_size)]
        for slot in slots
    ]
    storage._cache.clear()
    _reload_slots(conn, slots)
    iqueue.get()
    start = time.time()
    for slot, soids in zip(slots, oids):
        txn_manager.begin()
        # Relies on the private ``_storage._start`` attribute of the
        # connection -- NOTE(review): verify against the ZODB version used.
        storage.prefetch(soids, conn._storage._start)
        for it in range(transaction_size):
            assert slot.children[it].data == 'y' * object_size
        txn_manager.commit()
    oqueue.put(time.time() - start)

    conn.close()
    db.close()
# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    shoot()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment