Skip to content

Instantly share code, notes, and snippets.

@nickva
Created July 20, 2017 18:44
Show Gist options
  • Save nickva/185fb975bcfd5d943491defc2cee35aa to your computer and use it in GitHub Desktop.
Save nickva/185fb975bcfd5d943491defc2cee35aa to your computer and use it in GitHub Desktop.
Scripts which makes a lot of parallel requests to views in a single db. It is used to stress ddoc_cache component in CouchDB
#!/usr/bin/env python
#
# To install:
# $ virtualenv ./venv && . ./venv/bin/activate && pip install CouchDB
#
# To run:
# $ /stampede_ddocs.py -p 64 -w 10 -t 1000 -u https://dbhost
#
# Where:
# -p 64 : Start 64 OS processes
# -w 10 : Each with 10 thread workers
# -t 1000 : Each worker running the "execute_fun" 1000 times in a row
import copy
import sys
import time
import threading
import os
import argparse
import uuid
import traceback
import random
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import Pool
import couchdb
DB_URLS = [
'http://adm:pass@localhost:15984',
'http://adm:pass@localhost:25984',
'http://adm:pass@localhost:35984'
]
DB_NAME = 'db'
def log(*args):
msg = " ".join(["%s" % a for a in args])
sys.stderr.write(msg + '\n')
sys.stderr.flush()
def pick_server(urls):
if isinstance(urls, list):
return random.choice(urls)
return urls
def get_design_doc(docid):
return {
"_id": docid,
"views": {
"v1": {
"map": 'function(d){emit(1,1);}',
}
},
}
def wait(args):
sleep = 0
if args.random_wait > 0.0:
sleep = random.randrange(args.random_wait * 1000) / 1000.0
elif args.fixed_wait > 0.0:
sleep = args.fixed_wait
if sleep > 0:
time.sleep(sleep)
def execute_fun(args, pid, tid, db, i):
dname = '%s_%s_%s' % (pid, tid, i)
_id = '_design/' + dname
doc = get_design_doc(_id)
db.save(doc)
list(db.view(dname+'/v1', limit=1))
wait(args)
db.delete(doc)
if i % 10 == 0:
log(" *** process:", pid, "tid", tid, "i:", i)
def thread_worker(args):
tid = args.tid
url = pick_server(args.urls)
srv = couchdb.Server(url)
dbname = args.dbname
tries = args.tries
db = srv[dbname]
pid = os.getpid()
for i in xrange(tries):
try:
execute_fun(args, pid, tid, db, i)
except Exception as e:
log(" >>> Worker exception caught", e)
traceback.print_exc(file=sys.stderr)
raise
return tid
def set_worker_id(args, tid):
args = copy.deepcopy(args)
args.tid = tid
return args
def process_worker(args):
wcount = args.worker_count
pool = ThreadPool(wcount)
worker_args = [set_worker_id(args, i) for i in xrange(wcount)]
res = pool.map(thread_worker, worker_args)
def main(args):
if args.urls == []:
args.urls = DB_URLS
url = pick_server(args.urls)
srv = couchdb.Server(url)
if args.dbname in srv:
srv.delete(args.dbname)
if args.dbname not in srv:
srv.create(args.dbname)
pool = Pool(processes=args.processes)
pool_args = [args for pnum in xrange(args.processes)]
pool.map(process_worker, pool_args)
def _args():
description = "Do a few crud operations as a stampede"
p = argparse.ArgumentParser(description = description)
p.add_argument('-u', '--urls', action="append", default=[], help = "Server URL(s)")
p.add_argument('-d', '--dbname', default=DB_NAME, help = "DB name")
p.add_argument('-w', '--worker-count', type=int, default=1)
p.add_argument('-t', '--tries', type=int, default=1)
p.add_argument('-r', '--random-wait', type=float, default=0)
p.add_argument('-f', '--fixed-wait', type=float, default=0)
p.add_argument('-p', '--processes', type=int, default=1)
return p.parse_args()
if __name__=='__main__':
args = _args()
main(_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment