Skip to content

Instantly share code, notes, and snippets.

@nickva
Created May 5, 2026 06:10
Show Gist options
  • Select an option

  • Save nickva/2a49f6e624208c45dc0dafdd935a4aae to your computer and use it in GitHub Desktop.

Select an option

Save nickva/2a49f6e624208c45dc0dafdd935a4aae to your computer and use it in GitHub Desktop.
Benchmark replication
#!/usr/bin/env python3
"""
replicator benchmark
# 1kb docs, single replication
rep_bench.py --ndocs 50000 --doc-size 1024
# faster polling, don't recreate the source
rep_bench.py --keep-source --poll-interval 1 \
--worker-batch-size 1000 --worker-processes 8 --http-connections 30
# source/target on different nodes
rep_bench.py --source-url http://localhost:15984 --target-url http://localhost:25984 --keep-source
"""
import argparse
import concurrent.futures
import statistics
import time
import uuid
import requests
# Basic-auth credentials: the default for every Server session and the
# userinfo embedded into replication source/target URLs.
AUTH = ('adm', 'pass')

# Endpoint defaults; these feed the matching argparse options.
SOURCE_URL = 'http://localhost:15984'
TARGET_URL = None  # None -> fall back to the source URL
DEFAULT_HTTP_TIMEOUT = 600  # client-side HTTP timeout, seconds
DEFAULT_POLL_INTERVAL = 2.0  # _scheduler/docs polling period, seconds

# Database naming / layout.
SOURCE_DB = 'rep_bench_source'
TARGET_DB_PREFIX = 'rep_bench_target'
Q = '2'  # sent as the ?q= parameter when a database is created

# Value written to replicator.startup_jitter on the coordinator.
JITTER_MS = 0

# argparse destinations that, when set, are copied into the replication doc.
REP_PARAMS = [
    'worker_batch_size',
    'worker_processes',
    'http_connections',
    'connection_timeout',
    'checkpoint_interval',
]
def make_doc(counter, body_size):
    """Build a synthetic document keyed by ``counter``.

    The ``_id`` is ``counter`` zero-padded to 10 digits. A ``body`` string of
    ``'x'`` characters pads the doc to roughly ``body_size`` bytes; 30 bytes
    are set aside for the id and JSON framing, so a ``body_size`` of 30 or
    less yields a doc with only an ``_id``.
    """
    doc = {'_id': format(counter, '010d')}
    filler = body_size - 30
    if filler > 0:
        doc['body'] = 'x' * filler
    return doc
class Server:
    """Minimal ``requests``-based client for one CouchDB HTTP endpoint.

    ``get``/``post``/``put``/``delete`` raise ``requests.HTTPError`` on an
    error status and return the decoded JSON body. ``head`` only returns the
    status code. Each request uses ``self.timeout`` unless the caller passes
    an explicit ``timeout=``.
    """

    def __init__(self, url, auth=AUTH, timeout=DEFAULT_HTTP_TIMEOUT):
        self.sess = requests.Session()
        self.sess.auth = auth
        self.url = url.rstrip('/')
        self.timeout = timeout

    def _to(self, kw):
        """Return ``kw`` with the default timeout filled in if absent."""
        if 'timeout' not in kw:
            kw['timeout'] = self.timeout
        return kw

    def _request(self, method, path, **kw):
        """Send ``method`` to ``path``, raise on an HTTP error, return parsed JSON."""
        r = self.sess.request(method, f'{self.url}/{path}', **self._to(kw))
        r.raise_for_status()
        return r.json()

    @staticmethod
    def _none_on_404(call, *args, **kw):
        """Invoke ``call``; map an HTTP 404 to ``None`` and re-raise anything else."""
        try:
            return call(*args, **kw)
        except requests.HTTPError as e:
            if e.response is not None and e.response.status_code == 404:
                return None
            raise

    @staticmethod
    def _config_path(section, key, node):
        return f'_node/{node}/_config/{section}/{key}'

    def get(self, path='', **kw):
        return self._request('GET', path, **kw)

    def post(self, path, **kw):
        return self._request('POST', path, **kw)

    def put(self, path, **kw):
        return self._request('PUT', path, **kw)

    def delete(self, path, **kw):
        return self._request('DELETE', path, **kw)

    def head(self, path, **kw):
        """Return the status code of ``HEAD path``; does not raise on errors."""
        r = self.sess.head(f'{self.url}/{path}', **self._to(kw))
        return r.status_code

    def __contains__(self, dbname):
        """``dbname in server``: True on 200, False on 404, RuntimeError otherwise."""
        code = self.head(dbname)
        if code == 200:
            return True
        if code == 404:
            return False
        raise RuntimeError(f"unexpected HEAD status: {code}")

    def __str__(self):
        return f"<Server:{self.url}>"

    def membership(self):
        return self.get('_membership')

    def create_db(self, dbname, **kw):
        """Create ``dbname`` with ``q=Q`` (plus any ``params=`` extras) if missing."""
        if dbname not in self:
            self.put(dbname, params={'q': Q, **kw.get('params', {})})

    def delete_db_if_exists(self, dbname):
        if dbname in self:
            self.delete(dbname)

    def db_info(self, dbname):
        return self.get(dbname)

    def bulk_docs(self, dbname, docs):
        """POST ``docs`` to ``_bulk_docs`` with ``w=3``.

        Per-document errors in the response body are not inspected here.
        """
        return self.post(f'{dbname}/_bulk_docs?w=3', json={'docs': docs})

    def config_set(self, section, key, val, node='_local'):
        """Persistently set config ``section/key`` to ``str(val)`` on ``node``."""
        headers = {'x-couch-persist': 'true'}
        # The config API takes a JSON string body; let requests encode it so
        # quotes/backslashes in the value are escaped instead of producing
        # invalid JSON.
        return self.put(self._config_path(section, key, node),
                        json=str(val), headers=headers)

    def config_get(self, section, key, node='_local'):
        """Return config ``section/key`` on ``node``, or None if it is unset."""
        return self._none_on_404(self.get, self._config_path(section, key, node))

    def config_delete(self, section, key, node='_local'):
        """Persistently delete config ``section/key``; None if it was unset."""
        headers = {'x-couch-persist': 'true'}
        return self._none_on_404(self.delete, self._config_path(section, key, node),
                                 headers=headers)

    def ensure_replicator_db(self):
        if '_replicator' not in self:
            self.put('_replicator')

    def replicator_put(self, doc_id, body):
        return self.put(f'_replicator/{doc_id}', json=body)

    def replicator_get(self, doc_id):
        """Return the ``_replicator`` doc ``doc_id``, or None if it does not exist."""
        return self._none_on_404(self.get, f'_replicator/{doc_id}')

    def replicator_delete(self, doc_id):
        """Delete ``_replicator/doc_id`` at its current rev; None if it is absent."""
        meta = self.replicator_get(doc_id)
        if meta and meta.get('_rev'):
            return self.delete(f'_replicator/{doc_id}', params={'rev': meta['_rev']})
        return None

    def scheduler_doc(self, doc_id, db='_replicator'):
        """Return ``_scheduler/docs/{db}/{doc_id}``, or None while it is not known."""
        return self._none_on_404(self.get, f'_scheduler/docs/{db}/{doc_id}')
def load_source(srv, db, ndocs, doc_size, batch, recreate=True, settle_timeout=30):
    """Prepare the source database and return its final ``db_info`` dict.

    With ``recreate=False`` the database is reused as-is (created empty if
    missing) and nothing is written; a warning is printed when it holds fewer
    than ``ndocs`` docs. Otherwise ``db`` is dropped, recreated and filled
    with ``ndocs`` synthetic docs of roughly ``doc_size`` bytes, posted to
    ``_bulk_docs`` ``batch`` docs at a time.

    Args:
        srv: Server that hosts ``db``.
        db: source database name.
        ndocs: number of docs to load (or expected, with ``recreate=False``).
        doc_size: approximate per-doc size in bytes (see ``make_doc``).
        batch: docs per ``_bulk_docs`` request; must be positive.
        recreate: drop and reload the database when True.
        settle_timeout: seconds to wait after loading for ``doc_count`` to
            reach ``ndocs`` before continuing with a warning.
    """
    if not recreate:
        srv.create_db(db)
        info = srv.db_info(db)
        have = info['doc_count']
        if have < ndocs:
            print(f" WARN: keep-source, got {have} docs < {ndocs} requested, using existing {have} docs")
        else:
            print(f"source has {have} docs; skipping load")
        return info

    srv.delete_db_if_exists(db)
    srv.create_db(db)
    print(f" loading {ndocs} docs into {db} (doc_size={doc_size}, batch={batch})...")
    t0 = time.monotonic()
    for start in range(0, ndocs, batch):
        stop = min(start + batch, ndocs)
        srv.bulk_docs(db, [make_doc(j, doc_size) for j in range(start, stop)])

    # The clustered doc_count can briefly read low right after the writes
    # until internal replication between shard copies catches up, so poll
    # until it settles (or the deadline passes).
    deadline = time.monotonic() + settle_timeout
    while True:
        info = srv.db_info(db)
        if info['doc_count'] >= ndocs:
            break
        if time.monotonic() > deadline:
            # Also surfaces _bulk_docs per-doc failures, which are not checked.
            print(f" WARN: doc_count still {info['doc_count']} < {ndocs} "
                  f"after {settle_timeout}s; continuing")
            break
        time.sleep(0.5)

    dt = time.monotonic() - t0
    size_mb = info['sizes']['file'] / 1e6
    print(f" loaded {info['doc_count']} docs in {dt:.1f}s, size {size_mb:.1f}MB")
    return info
def build_repl_body(source_ref, target_ref, args):
    """Assemble a replication document body for ``source_ref`` -> ``target_ref``.

    Every name in ``REP_PARAMS`` that exists on ``args`` with a non-None
    value is copied into the body under the same key. Absent or None
    attributes are left out.
    """
    tuning = {}
    for name in REP_PARAMS:
        value = getattr(args, name, None)
        if value is None:
            continue
        tuning[name] = value
    return {"source": source_ref, "target": target_ref, **tuning}
def replicate(coord_srv, doc_id, body, poll_interval, timeout=None):
    """Run one replication via a ``_replicator`` doc and wait for it to finish.

    Writes ``body`` plus ``_id=doc_id`` to ``_replicator`` on ``coord_srv``,
    then polls ``_scheduler/docs`` every ``poll_interval`` seconds. Whatever
    the outcome, the replication doc is deleted afterwards on a best-effort
    basis.

    Args:
        coord_srv: Server the replication doc is written to and polled on.
        doc_id: id of the ``_replicator`` document.
        body: replication document fields; not mutated.
        poll_interval: seconds between scheduler polls.
        timeout: optional overall limit in seconds; ``None`` waits forever.

    Returns:
        ``(elapsed_seconds, info)``. Elapsed time is measured from just
        before the doc is written; ``info`` is the scheduler's ``info`` dict,
        or ``{}`` if it has none.

    Raises:
        RuntimeError: the scheduler reports state ``failed``, ``crashing``
            or ``error``.
        TimeoutError: ``timeout`` elapsed before the job completed.
    """
    rep_doc = dict(body)
    rep_doc['_id'] = doc_id
    t0 = time.monotonic()
    coord_srv.replicator_put(doc_id, rep_doc)
    try:
        while True:
            data = coord_srv.scheduler_doc(doc_id)
            elapsed = time.monotonic() - t0
            # data is None until the scheduler has picked the doc up.
            if data is not None:
                state = data.get('state')
                if state == 'completed':
                    return elapsed, data.get('info') or {}
                # 'crashing' and 'error' may be retried by the scheduler, but
                # in a benchmark they usually mean misconfiguration; fail fast
                # rather than poll forever.
                if state in ('failed', 'crashing', 'error'):
                    err = data.get('info') or data.get('error_count')
                    raise RuntimeError(
                        f"replication {doc_id} ended in state={state} after {elapsed:.1f}s: {err}"
                    )
            if timeout is not None and elapsed > timeout:
                raise TimeoutError(
                    f"replication {doc_id} did not complete within {timeout}s"
                )
            time.sleep(poll_interval)
    finally:
        # Best-effort cleanup: a failed delete must not mask the result or
        # the exception already in flight.
        try:
            coord_srv.replicator_delete(doc_id)
        except requests.RequestException:
            pass
def _with_creds(url):
if not AUTH or '://' not in url:
return url.rstrip('/')
scheme, rest = url.rstrip('/').split('://', 1)
if '@' in rest.split('/', 1)[0]:
return f"{scheme}://{rest}"
return f"{scheme}://{AUTH[0]}:{AUTH[1]}@{rest}"
class CoordinatorTuning:
    """Context manager that temporarily overrides config on the coordinator.

    On entry, each ``(section, key) -> value`` override is applied after its
    current value has been saved. On exit, saved values are restored; keys
    that were previously unset are deleted. HTTP failures are reported and
    skipped, so one bad key does not abort the benchmark.
    """

    def __init__(self, coord_srv, overrides):
        self.srv = coord_srv
        self.overrides = overrides  # (section, key) -> str_value
        self.saved = {}  # (section, key) -> previous value, None if it was unset

    def __enter__(self):
        applied = {}
        for (section, key), val in self.overrides.items():
            try:
                self.saved[(section, key)] = self.srv.config_get(section, key)
                self.srv.config_set(section, key, val)
            except requests.RequestException as e:
                print(f" WARNING: could not set {section}/{key}: {e}")
            else:
                applied[(section, key)] = val
        # Report only what actually took effect, not every requested override.
        if applied:
            print(f" setting applied: {applied}")
        return self

    def __exit__(self, exc_type, exc, tb):
        # Exceptions from the with-body are not suppressed (returns None).
        for (section, key), prev in self.saved.items():
            try:
                if prev is None:
                    self.srv.config_delete(section, key)
                else:
                    self.srv.config_set(section, key, prev)
            except requests.RequestException as e:
                print(f" WARNING: could not restore {section}/{key}: {e}")
def run_iteration(args, source_srv, target_srv, coord_srv, target_dbs, run_tag):
    """Recreate the target DBs, then run one replication into each of them.

    With ``args.n_jobs == 1`` the single job runs on the calling thread.
    Otherwise ``args.n_jobs`` jobs run concurrently on a thread pool.
    ``source_srv`` is not used by this function.

    Returns ``(wall_seconds, timings)``. ``timings`` is a list of
    ``(job_index, elapsed_seconds, info)`` tuples sorted by job index.
    """
    for name in target_dbs:
        target_srv.delete_db_if_exists(name)
        target_srv.create_db(name)

    src_url = f"{_with_creds(args.source_url)}/{args.source_db}"
    tgt_urls = [f"{_with_creds(args.target_url or args.source_url)}/{name}"
                for name in target_dbs]

    def run_job(job):
        rep_body = build_repl_body(src_url, tgt_urls[job], args)
        elapsed, info = replicate(coord_srv, f"rep_bench-{run_tag}-{job:02d}",
                                  rep_body, args.poll_interval)
        return job, elapsed, info

    started = time.monotonic()
    if args.n_jobs == 1:
        results = [run_job(0)]
    else:
        with concurrent.futures.ThreadPoolExecutor(max_workers=args.n_jobs) as pool:
            pending = [pool.submit(run_job, job) for job in range(args.n_jobs)]
            results = [f.result() for f in concurrent.futures.as_completed(pending)]
    wall = time.monotonic() - started
    return wall, sorted(results)
def fmt_info(h):
    """Format the read/written/missing counters of a scheduler ``info`` dict.

    Each counter is right-aligned in 7 columns, and absent counters print as
    0. ``missing_revisions_found`` takes precedence over ``missing_found``.
    """
    counters = (
        ('docs_read', h.get('docs_read', 0)),
        ('docs_written', h.get('docs_written', 0)),
        ('missing', h.get('missing_revisions_found', h.get('missing_found', 0))),
    )
    return ' '.join(f"{label}={value:>7}" for label, value in counters)
def summary(label, values):
    """Print count, min, median, mean, max and sample stdev of ``values`` (seconds).

    Nothing is printed for an empty sequence; stdev is reported as 0 when
    there is only one value.
    """
    if values:
        count = len(values)
        spread = statistics.stdev(values) if count > 1 else 0.0
        lo, hi = min(values), max(values)
        middle = statistics.median(values)
        avg = statistics.fmean(values)
        print(f" {label:<20s} n={count:<3d} min={lo:6.2f}s median={middle:6.2f}s "
              f"mean={avg:6.2f}s max={hi:6.2f}s stdev={spread:5.2f}s")
def _parse_coord_config(specs):
    """Parse ``SECTION.KEY=VALUE`` strings into ``{(section, key): value}``.

    Later duplicates win. Raises SystemExit with a usage message for a
    malformed item.
    """
    parsed = {}
    for kv in specs or []:
        if '=' not in kv:
            raise SystemExit(f"--coord-config expects section.key=value, got {kv!r}")
        sk, val = kv.split('=', 1)
        if '.' not in sk:
            raise SystemExit(f"--coord-config key needs section.key, got {sk!r}")
        sec, key = sk.split('.', 1)
        parsed[(sec, key)] = val
    return parsed


def main(args):
    """Run one replication benchmark configured by the parsed CLI ``args``.

    Steps:
    1. Validate the CLI input.
    2. Prepare the source DB.
    3. Apply the temporary coordinator config overrides.
    4. Run ``args.n_jobs`` replications into fresh target DBs.
    5. Print per-job timings and throughput; the overrides are restored on
       exit from the tuning context.
    """
    # Validate cheap input first, so mistakes surface before the potentially
    # slow (and, when recreating, destructive) source load.
    if args.n_jobs < 1:
        raise SystemExit(f"--n-jobs must be >= 1, got {args.n_jobs}")
    if not args.keep_source and args.batch < 1:
        raise SystemExit(f"--batch must be >= 1, got {args.batch}")
    extra_overrides = _parse_coord_config(args.coord_config)

    source_srv = Server(args.source_url, timeout=args.http_timeout)
    target_srv = Server(args.target_url or args.source_url, timeout=args.http_timeout)
    coord_url = args.coordinator_url or args.source_url
    coord_srv = Server(coord_url, timeout=args.http_timeout)
    set_params = {k: getattr(args, k) for k in REP_PARAMS if getattr(args, k) is not None}
    print(f"source: {args.source_url}/{args.source_db}")
    print(f"target: {args.target_url or args.source_url} (db prefix: {args.target_db_prefix})")
    print(f"coordinator: {coord_url} poll: {args.poll_interval}s http_timeout: {args.http_timeout}s")
    print(f"docs: {args.ndocs} doc_size: {args.doc_size} n_jobs: {args.n_jobs}")
    if set_params:
        print(f"replication params: {set_params}")

    print("== source ==")
    src_info = load_source(source_srv, args.source_db, args.ndocs, args.doc_size, args.batch,
                           recreate=not args.keep_source)
    # With --keep-source the DB may hold more or fewer docs than --ndocs (and
    # a load can fall short), so rate the run by what the source really has.
    src_docs = src_info['doc_count']

    coord_srv.ensure_replicator_db()
    target_dbs = [f"{args.target_db_prefix}_{i:02d}" for i in range(args.n_jobs)]
    # User-supplied overrides are merged last so they win over the defaults.
    overrides = {
        ("replicator", "startup_jitter"): str(JITTER_MS),
        ("replicator", "interval"): str(args.scheduler_interval_ms),
        **extra_overrides,
    }
    with CoordinatorTuning(coord_srv, overrides):
        print("== bench ==")
        run_tag = f"{int(time.time())}-{uuid.uuid4().hex[:6]}"
        wall, timings = run_iteration(args, source_srv, target_srv, coord_srv,
                                      target_dbs, run_tag)

    per_job = [elapsed for _, elapsed, _ in timings]
    print(f"\nwall={wall:.2f}s")
    for idx, elapsed, hist in timings:
        print(f" job {idx:02d} elapsed={elapsed:6.2f}s {fmt_info(hist)}")
    if args.n_jobs > 1:
        print("\n== summary ==")
        summary("per-job elapsed", per_job)
    # Reported for single-job runs too (the default invocation).
    med = statistics.median(per_job)
    if med:
        print(f" per-job docs/s (median): {src_docs / med:,.0f}")
if __name__ == '__main__':
    # Command-line entry point: build the parser and run the benchmark.
    p = argparse.ArgumentParser(description=__doc__,
                                formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument('--source-url', default=SOURCE_URL,
                   help='source URL (default: %(default)s)')
    p.add_argument('--target-url', default=TARGET_URL,
                   help='target URL; defaults to source-url (same cluster)')
    p.add_argument('--coordinator-url', default=None,
                   help='coordinator URL default=source-url')
    p.add_argument('--source-db', default=SOURCE_DB,
                   help='source DB (default: %(default)s)')
    p.add_argument('--target-db-prefix', default=TARGET_DB_PREFIX,
                   help='target DB prefix, jobs use prefix_NN (default: %(default)s)')
    p.add_argument('--ndocs', type=int, default=50000, help='docs count')
    p.add_argument('--doc-size', type=int, default=1024,
                   help='approx doc size in bytes (default: %(default)s)')
    p.add_argument('--batch', type=int, default=500, help='load bulk_docs batch size')
    p.add_argument('--n-jobs', type=int, default=1,
                   help='concurrent replications jobs (default: %(default)s)')
    # The source is never reloaded with this flag; fewer docs only warns.
    p.add_argument('--keep-source', action='store_true',
                   help='reuse the existing source DB instead of recreating and '
                        'reloading it (only warns if it has fewer than --ndocs docs)')
    p.add_argument('--http-timeout', type=float, default=DEFAULT_HTTP_TIMEOUT,
                   help='client HTTP timeout (sec) (default: %(default)s)')
    p.add_argument('--poll-interval', type=float, default=DEFAULT_POLL_INTERVAL,
                   help='poll period for _scheduler/docs in seconds (default: %(default)s)')
    p.add_argument('--scheduler-interval-ms', type=int, default=1000,
                   help='replicator.interval override on coordinator; lower = faster '
                        'pickup (default: %(default)s)')
    p.add_argument('--coord-config', action='append', default=None, metavar='SECTION.KEY=VAL',
                   help='extra coord config overrides, restored on exit; '
                        'repeat if needed ... --coord-config replicator.max_jobs=2000')
    p.add_argument('--worker-batch-size', type=int, default=None, dest='worker_batch_size')
    p.add_argument('--worker-processes', type=int, default=None, dest='worker_processes')
    p.add_argument('--http-connections', type=int, default=None, dest='http_connections')
    p.add_argument('--connection-timeout', type=int, default=None, dest='connection_timeout', help='timeout in ms')
    p.add_argument('--checkpoint-interval', type=int, default=None, dest='checkpoint_interval', help='replicator checkpoint interval in ms')
    main(p.parse_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment