Skip to content

Instantly share code, notes, and snippets.

@argv0
Created April 6, 2010 18:37
Show Gist options
  • Save argv0/357928 to your computer and use it in GitHub Desktop.
Save argv0/357928 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
import threading
import uuid
import sys, os, getopt, time, random
sys.path.append("./lib")
from riak_pbc import RiakPbcClient
from riak import RiakClient as RiakHTTPClient
HTTP_PORT = 8098
PB_PORT = 8087
RAW_NAME = "riak"
BUCKET = "anonymous"
# change these to suit your cluster's layout
DB_HOSTS = ['192.168.1.2', '192.168.1.3', '192.168.1.4']
DB_HOSTS = ['127.0.0.1']
# this is a default that can be changed on the command line
TOTAL_REQS = 100000
THREADS_PER_HOST = 5
PROGRESS = 0
def generate_key():
return str(uuid.uuid1())
class TesterThread(threading.Thread):
def __init__(self, host, nreqs, mode):
threading.Thread.__init__(self)
self.host = host
self.nreqs = nreqs
if mode == "http":
print "Using Riak HTTP client against http://%s:%s/%s" % (host,HTTP_PORT)
self.client = RiakHTTPClient(host, HTTP_PORT, RAW_NAME)
else:
print "Using Riak Protocol Buffers client against %s:%s" % (host,PB_PORT)
self.client = RiakPbcClient(host, PB_PORT)
self.bucket = self.client.bucket(BUCKET)
def run(self):
global PROGRESS
for i in xrange(0, self.nreqs):
PROGRESS += 1
key = generate_key()
obj = {'ip' : '0.0.0.0', 'node' : 0}
obj = self.bucket.new(key, obj)
start_time = time.time()
obj.store()
# uncomment line below to see individual store times - printing each store
# time will have a negative impact on performance.
#print "store time (seconds):", time.time() - start_time
if __name__ == "__main__":
mode = "protobufs"
tot_reqs = TOTAL_REQS
print "Performing %s write requests" % tot_reqs
try:
if (len(sys.argv) > 1):
mode = sys.argv[1]
if (len(sys.argv) > 2):
tot_reqs = int(sys.argv[2])
except:
print "usage: python riak_tester.py [http|protobufs] [nrequests]"
print
sys.exit(1)
start_time = time.time()
threads = list()
reqs_per_thread = tot_reqs / (len(DB_HOSTS) * THREADS_PER_HOST)
print "running test of %s requests" % tot_reqs
for d in DB_HOSTS:
host_threads = []
for i in range(0,THREADS_PER_HOST):
host_thread = TesterThread(d, reqs_per_thread, mode)
host_threads.append(host_thread)
host_thread.start()
threads.extend(host_threads)
start_time = time.time()
while True:
try:
time.sleep(1)
elapsed_time = time.time() - start_time
ops_per_sec = PROGRESS / elapsed_time
print "Progress: %s requests (%s reqs/second)" % (PROGRESS,ops_per_sec)
[t.join(0) for t in threads]
if [t for t in threads if t.isAlive()]: continue
break
except:
print sys.exc_info()
sys.exit(1)
end_time = time.time()
total_time = end_time - start_time
print
print "total time", total_time
print "average req time (seconds)", (end_time - start_time) / float(tot_reqs)
print "writes/sec", tot_reqs / total_time
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment