Created
May 5, 2026 06:10
-
-
Save nickva/2a49f6e624208c45dc0dafdd935a4aae to your computer and use it in GitHub Desktop.
Benchmark replication
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| replicator benchmark | |
| # 1kb docs, single replication | |
| rep_bench.py --ndocs 50000 --doc-size 1024 | |
| # faster polling, don't recreate the source | |
| rep_bench.py --keep-source --poll-interval 1 \ | |
| --worker-batch-size 1000 --worker-processes 8 --http-connections 30 | |
| # source/target on different nodes | |
| rep_bench.py --source-url http://localhost:15984 --target-url http://localhost:25984 --keep-source | |
| """ | |
| import argparse | |
| import concurrent.futures | |
| import statistics | |
| import time | |
| import uuid | |
| import requests | |
| DEFAULT_HTTP_TIMEOUT = 600 | |
| DEFAULT_POLL_INTERVAL = 2.0 | |
| AUTH = ('adm', 'pass') | |
| SOURCE_URL = 'http://localhost:15984' | |
| TARGET_URL = None # default is source-url | |
| SOURCE_DB = 'rep_bench_source' | |
| TARGET_DB_PREFIX = 'rep_bench_target' | |
| Q = '2' | |
| JITTER_MS = 0 | |
| REP_PARAMS = [ | |
| 'worker_batch_size', | |
| 'worker_processes', | |
| 'http_connections', | |
| 'connection_timeout', | |
| 'checkpoint_interval', | |
| ] | |
def make_doc(counter, body_size):
    """Build one synthetic document for the source database.

    The ``_id`` is ``counter`` zero-padded to ten digits. When
    ``body_size - 30`` is positive, a ``body`` field holding that many ``'x'``
    characters is added; otherwise the document carries only its ``_id``.
    The 30 is a fixed allowance taken off the requested size (presumably for
    the id and JSON framing -- the code does not say).
    """
    filler_len = body_size - 30
    if filler_len <= 0:
        return {'_id': f'{counter:010d}'}
    return {'_id': f'{counter:010d}', 'body': 'x' * filler_len}
class Server:
    """Minimal HTTP client for one CouchDB endpoint.

    Wraps a ``requests.Session`` that carries the basic-auth credentials and
    applies a default timeout to every request. ``get``/``post``/``put``/
    ``delete`` raise ``requests.HTTPError`` on an error status and return the
    decoded JSON body; ``head`` returns only the status code.
    """

    def __init__(self, url, auth=AUTH, timeout=DEFAULT_HTTP_TIMEOUT):
        self.sess = requests.Session()
        self.sess.auth = auth
        self.url = url.rstrip('/')
        self.timeout = timeout

    def _to(self, kw):
        """Return ``kw`` with the default timeout filled in if absent."""
        if 'timeout' not in kw:
            kw['timeout'] = self.timeout
        return kw

    def _none_on_404(self, verb, path, **kw):
        """Call ``verb(path, **kw)``; map a 404 response to ``None``.

        Any other HTTP error is re-raised unchanged.
        """
        try:
            return verb(path, **kw)
        except requests.HTTPError as e:
            if e.response.status_code == 404:
                return None
            raise

    def get(self, path='', **kw):
        r = self.sess.get(f'{self.url}/{path}', **self._to(kw))
        r.raise_for_status()
        return r.json()

    def post(self, path, **kw):
        r = self.sess.post(f'{self.url}/{path}', **self._to(kw))
        r.raise_for_status()
        return r.json()

    def put(self, path, **kw):
        r = self.sess.put(f'{self.url}/{path}', **self._to(kw))
        r.raise_for_status()
        return r.json()

    def delete(self, path, **kw):
        r = self.sess.delete(f'{self.url}/{path}', **self._to(kw))
        r.raise_for_status()
        return r.json()

    def head(self, path, **kw):
        """Issue a HEAD request and return the status code without raising."""
        r = self.sess.head(f'{self.url}/{path}', **self._to(kw))
        return r.status_code

    def __contains__(self, dbname):
        """Support ``dbname in server``: True on 200, False on 404.

        Raises:
            RuntimeError: for any other HEAD status.
        """
        code = self.head(dbname)
        if code == 200:
            return True
        if code == 404:
            return False
        raise RuntimeError(f"unexpected HEAD status: {code}")

    def __str__(self):
        return f"<Server:{self.url}>"

    def membership(self):
        return self.get('_membership')

    def create_db(self, dbname, **kw):
        """Create ``dbname`` if it does not exist yet.

        The module-level ``Q`` is sent as the ``q`` query parameter; entries
        in ``kw['params']`` are merged on top and may override it.
        """
        if dbname not in self:
            self.put(dbname, params={'q': Q, **kw.get('params', {})})

    def delete_db_if_exists(self, dbname):
        if dbname in self:
            self.delete(dbname)

    def db_info(self, dbname):
        return self.get(dbname)

    def bulk_docs(self, dbname, docs):
        """POST ``docs`` to ``_bulk_docs`` with the ``w=3`` query parameter."""
        return self.post(f'{dbname}/_bulk_docs?w=3', json={'docs': docs})

    def config_set(self, section, key, val, node='_local'):
        """Set a config value on ``node``; returns the decoded response.

        ``val`` is converted with ``str()`` and sent as a JSON string.
        Passing it through ``json=`` lets requests do the encoding, so values
        containing quotes or backslashes are escaped correctly instead of
        producing an invalid request body.
        """
        url = f'_node/{node}/_config/{section}/{key}'
        headers = {'x-couch-persist': 'true'}
        return self.put(url, json=str(val), headers=headers)

    def config_get(self, section, key, node='_local'):
        """Return the config value, or ``None`` when the key is not set."""
        return self._none_on_404(self.get, f'_node/{node}/_config/{section}/{key}')

    def config_delete(self, section, key, node='_local'):
        """Delete a config key; returns ``None`` when it was not set."""
        url = f'_node/{node}/_config/{section}/{key}'
        headers = {'x-couch-persist': 'true'}
        return self._none_on_404(self.delete, url, headers=headers)

    def ensure_replicator_db(self):
        if '_replicator' not in self:
            self.put('_replicator')

    def replicator_put(self, doc_id, body):
        return self.put(f'_replicator/{doc_id}', json=body)

    def replicator_get(self, doc_id):
        """Return the replication document, or ``None`` if it is missing."""
        return self._none_on_404(self.get, f'_replicator/{doc_id}')

    def replicator_delete(self, doc_id):
        """Delete the replication document at its current revision.

        Returns ``None`` when the document is missing or has no ``_rev``.
        """
        meta = self.replicator_get(doc_id)
        if meta and meta.get('_rev'):
            return self.delete(f'_replicator/{doc_id}', params={'rev': meta['_rev']})
        return None

    def scheduler_doc(self, doc_id, db='_replicator'):
        """Return the ``_scheduler/docs`` entry, or ``None`` on a 404."""
        return self._none_on_404(self.get, f'_scheduler/docs/{db}/{doc_id}')
def load_source(srv, db, ndocs, doc_size, batch, recreate=True):
    """Prepare the source database and return its info document.

    With ``recreate`` false the database is created only if missing and is
    never loaded: the existing document count is reported (with a warning
    when it is below ``ndocs``) and the current ``db_info`` is returned.

    Otherwise the database is dropped, recreated and filled with ``ndocs``
    generated documents in ``_bulk_docs`` batches of ``batch``. The function
    then polls ``db_info`` for up to 30 seconds until ``doc_count`` reaches
    ``ndocs``, prints the load time and file size, and returns the last info.
    """
    if not recreate:
        srv.create_db(db)
        existing = srv.db_info(db)
        have = existing['doc_count']
        if have >= ndocs:
            print(f"source has {have} docs; skipping load")
        else:
            print(f" WARN: keep-source, got {have} docs < {ndocs} requested, using existing {have} docs")
        return existing

    srv.delete_db_if_exists(db)
    srv.create_db(db)
    print(f" loading {ndocs} docs into {db} (doc_size={doc_size}, batch={batch})...")
    started = time.monotonic()
    for first in range(0, ndocs, batch):
        last = min(first + batch, ndocs)
        srv.bulk_docs(db, [make_doc(n, doc_size) for n in range(first, last)])

    # Give the clustered doc_count time to settle: per the original author,
    # bulk_docs can return before every shard copy has the docs, so the
    # aggregated count may briefly read low until internal replication
    # catches up. Stop waiting after 30 seconds either way.
    give_up_at = time.monotonic() + 30
    info = srv.db_info(db)
    while info['doc_count'] < ndocs and time.monotonic() <= give_up_at:
        time.sleep(0.5)
        info = srv.db_info(db)

    dt = time.monotonic() - started
    size_mb = info['sizes']['file'] / 1e6
    print(f" loaded {info['doc_count']} docs in {dt:.1f}s, size {size_mb:.1f}MB")
    return info
def build_repl_body(source_ref, target_ref, args):
    """Assemble the body of a replication document.

    Always contains ``source`` and ``target``. Each name listed in
    ``REP_PARAMS`` is copied from ``args`` when the attribute exists and is
    not ``None``; unset options are left out of the body entirely.
    """
    candidates = ((name, getattr(args, name, None)) for name in REP_PARAMS)
    return {
        "source": source_ref,
        "target": target_ref,
        **{name: value for name, value in candidates if value is not None},
    }
def replicate(coord_srv, doc_id, body, poll_interval):
    """Run one replication through the ``_replicator`` db and time it.

    Writes ``body`` (plus ``_id``) as replication document ``doc_id``, then
    polls ``coord_srv.scheduler_doc`` every ``poll_interval`` seconds until
    the job reports a final state. The clock starts just before the document
    is written.

    Returns:
        ``(elapsed_seconds, info)`` once the state is ``completed``; ``info``
        is the scheduler's ``info`` field, or ``{}`` when it is empty.

    Raises:
        RuntimeError: when the state is ``failed`` or ``crashing``.

    The replication document is deleted on the way out in every case;
    request errors during that cleanup are deliberately ignored.
    """
    rep_doc = {**body, '_id': doc_id}
    started = time.monotonic()
    coord_srv.replicator_put(doc_id, rep_doc)
    try:
        while True:
            status = coord_srv.scheduler_doc(doc_id)
            # A missing scheduler entry means the job was not picked up yet;
            # treat it like any other non-final state and keep polling.
            state = status.get('state') if status is not None else None
            if state == 'completed':
                took = time.monotonic() - started
                return took, status.get('info') or {}
            if state in ('failed', 'crashing'):
                took = time.monotonic() - started
                detail = status.get('info') or status.get('error_count')
                raise RuntimeError(
                    f"replication {doc_id} ended in state={state} after {took:.1f}s: {detail}"
                )
            time.sleep(poll_interval)
    finally:
        try:
            coord_srv.replicator_delete(doc_id)
        except requests.RequestException:
            pass
def _with_creds(url):
    """Return ``url`` without trailing slashes and with ``AUTH`` embedded.

    The URL is returned unchanged (apart from the stripped slashes) when
    ``AUTH`` is empty, when it has no ``scheme://`` prefix, or when its
    authority part already contains userinfo (an ``@`` before the first
    ``/``). Otherwise the user and password from ``AUTH`` are inserted as
    ``scheme://user:password@rest``.

    Both credentials are percent-encoded so that characters such as ``@``,
    ``:`` or ``/`` cannot corrupt the URL structure; plain alphanumeric
    credentials come out unchanged.
    """
    from urllib.parse import quote

    base = url.rstrip('/')
    # Check the stripped form: a bare "scheme://" loses its slashes above and
    # must not reach the split below.
    if not AUTH or '://' not in base:
        return base
    scheme, rest = base.split('://', 1)
    authority = rest.split('/', 1)[0]
    if '@' in authority:
        return base
    user, password = AUTH
    return f"{scheme}://{quote(user, safe='')}:{quote(password, safe='')}@{rest}"
class CoordinatorTuning:
    """Context manager that applies config overrides and restores them.

    ``overrides`` maps ``(section, key)`` to the string value to set on
    ``coord_srv``. On entry the current value of each key is read and
    remembered, then the override is written. On exit every remembered key is
    set back to its previous value, or deleted when it had none. Request
    failures in either direction are reported as warnings and do not abort
    the run; exceptions from the ``with`` body are never suppressed.
    """

    def __init__(self, coord_srv, overrides):
        self.srv = coord_srv
        self.overrides = overrides  # (section, key) -> str_value
        self.saved = {}  # (section, key) -> previous value, None if unset

    def __enter__(self):
        applied = {}
        for (section, key), val in self.overrides.items():
            try:
                # Remember the old value before writing, so a restore is
                # attempted even if the write itself reports an error.
                self.saved[(section, key)] = self.srv.config_get(section, key)
                self.srv.config_set(section, key, val)
            except requests.RequestException as e:
                print(f" WARNING: could not set {section}/{key}: {e}")
            else:
                applied[(section, key)] = val
        # Report only what was actually written, not everything requested.
        if applied:
            print(f" setting applied: {applied}")
        return self

    def __exit__(self, exc_type, exc, tb):
        for (section, key), prev in self.saved.items():
            try:
                if prev is None:
                    self.srv.config_delete(section, key)
                else:
                    self.srv.config_set(section, key, prev)
            except requests.RequestException as e:
                print(f" WARNING: could not restore {section}/{key}: {e}")
def run_iteration(args, source_srv, target_srv, coord_srv, target_dbs, run_tag):
    """Run ``args.n_jobs`` replications from the source into fresh targets.

    Every database in ``target_dbs`` is dropped and recreated first. Job
    ``i`` replicates the source into ``target_dbs[i]`` using a replication
    document named ``rep_bench-<run_tag>-<i>``. A single job runs in the
    calling thread; several jobs run concurrently on a thread pool.

    Returns:
        ``(wall, timings)`` where ``wall`` is the elapsed time for the whole
        batch and ``timings`` is a list of ``(idx, elapsed, info)`` tuples
        sorted by job index.
    """
    for name in target_dbs:
        target_srv.delete_db_if_exists(name)
        target_srv.create_db(name)

    target_base = args.target_url or args.source_url
    source_ref = f"{_with_creds(args.source_url)}/{args.source_db}"
    target_refs = [f"{_with_creds(target_base)}/{name}" for name in target_dbs]

    def run_job(job_no):
        """Replicate into target ``job_no`` and return its timing tuple."""
        doc_id = f"rep_bench-{run_tag}-{job_no:02d}"
        rep_body = build_repl_body(source_ref, target_refs[job_no], args)
        elapsed, info = replicate(coord_srv, doc_id, rep_body, args.poll_interval)
        return job_no, elapsed, info

    began = time.monotonic()
    if args.n_jobs == 1:
        results = [run_job(0)]
    else:
        with concurrent.futures.ThreadPoolExecutor(max_workers=args.n_jobs) as pool:
            pending = [pool.submit(run_job, n) for n in range(args.n_jobs)]
            # result() re-raises the first job failure in the caller.
            results = [done.result() for done in concurrent.futures.as_completed(pending)]
    wall = time.monotonic() - began
    return wall, sorted(results)
def fmt_info(h):
    """Format replication counters from a scheduler ``info`` dict.

    Shows ``docs_read``, ``docs_written`` and the missing-revisions count,
    each right-aligned in seven columns and defaulting to 0 when absent. The
    missing count is read from ``missing_revisions_found`` and falls back to
    ``missing_found``.
    """
    missing = h.get('missing_found', 0)
    missing = h.get('missing_revisions_found', missing)
    columns = (
        ('docs_read', h.get('docs_read', 0)),
        ('docs_written', h.get('docs_written', 0)),
        ('missing', missing),
    )
    return " ".join(f"{name}={value:>7}" for name, value in columns)
def summary(label, values):
    """Print one line of descriptive statistics for ``values``.

    Prints the sample count, minimum, median, mean, maximum and sample
    standard deviation (0.0 for a single value), all as seconds. Nothing is
    printed when ``values`` is empty.
    """
    if values:
        count = len(values)
        low, high = min(values), max(values)
        middle = statistics.median(values)
        average = statistics.fmean(values)
        spread = statistics.stdev(values) if count > 1 else 0.0
        cells = [
            f" {label:<20s}",
            f"n={count:<3d}",
            f"min={low:6.2f}s",
            f"median={middle:6.2f}s",
            f"mean={average:6.2f}s",
            f"max={high:6.2f}s",
            f"stdev={spread:5.2f}s",
        ]
        print(" ".join(cells))
def main(args):
    """Run the benchmark described by the parsed command-line ``args``.

    Steps: validate the arguments, print the run configuration, prepare the
    source database, make sure ``_replicator`` exists on the coordinator,
    apply the coordinator config overrides for the duration of the
    replications, then print the wall time, per-job results and throughput.

    Raises:
        SystemExit: for ``--n-jobs`` below 1 or a malformed ``--coord-config``
            entry. Both are checked before any database is touched.
    """
    if args.n_jobs < 1:
        raise SystemExit(f"--n-jobs must be >= 1, got {args.n_jobs}")

    # Parse the overrides first so a typo cannot fail the run only after the
    # source database has already been dropped and reloaded.
    overrides = {
        ("replicator", "startup_jitter"): str(JITTER_MS),
        ("replicator", "interval"): str(args.scheduler_interval_ms),
    }
    for kv in args.coord_config or []:
        if '=' not in kv:
            raise SystemExit(f"--coord-config expects section.key=value, got {kv!r}")
        sk, val = kv.split('=', 1)
        if '.' not in sk:
            raise SystemExit(f"--coord-config key needs section.key, got {sk!r}")
        sec, key = sk.split('.', 1)
        overrides[(sec, key)] = val

    source_srv = Server(args.source_url, timeout=args.http_timeout)
    target_srv = Server(args.target_url or args.source_url, timeout=args.http_timeout)
    coord_url = args.coordinator_url or args.source_url
    coord_srv = Server(coord_url, timeout=args.http_timeout)
    set_params = {k: getattr(args, k) for k in REP_PARAMS if getattr(args, k) is not None}

    print(f"source: {args.source_url}/{args.source_db}")
    print(f"target: {args.target_url or args.source_url} (db prefix: {args.target_db_prefix})")
    print(f"coordinator: {coord_url} poll: {args.poll_interval}s http_timeout: {args.http_timeout}s")
    print(f"docs: {args.ndocs} doc_size: {args.doc_size} n_jobs: {args.n_jobs}")
    if set_params:
        print(f"replication params: {set_params}")

    print("== source ==")
    source_info = load_source(source_srv, args.source_db, args.ndocs, args.doc_size,
                              args.batch, recreate=not args.keep_source)
    # With --keep-source the existing database is replicated as-is and may
    # hold fewer or more documents than --ndocs, so throughput has to be
    # based on what is really there. After a fresh load --ndocs is exact.
    ndocs_replicated = source_info['doc_count'] if args.keep_source else args.ndocs

    coord_srv.ensure_replicator_db()
    target_dbs = [f"{args.target_db_prefix}_{i:02d}" for i in range(args.n_jobs)]

    with CoordinatorTuning(coord_srv, overrides):
        print("== bench ==")
        run_tag = f"{int(time.time())}-{uuid.uuid4().hex[:6]}"
        wall, timings = run_iteration(args, source_srv, target_srv, coord_srv,
                                      target_dbs, run_tag)

    per_job = [t for _, t, _ in timings]
    print(f"\nwall={wall:.2f}s")
    for idx, elapsed, hist in timings:
        print(f" job {idx:02d} elapsed={elapsed:6.2f}s {fmt_info(hist)}")
    if args.n_jobs > 1:
        print("\n== summary ==")
        summary("per-job elapsed", per_job)
    med = statistics.median(per_job)
    if med:  # guard against a zero median before dividing
        print(f" per-job docs/s (median): {ndocs_replicated / med:,.0f}")
if __name__ == '__main__':
    # Command-line entry point: declare the options, parse them and hand the
    # resulting namespace to main(). The module docstring doubles as the
    # --help description, so it is rendered verbatim (raw formatter).
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    add = parser.add_argument

    # Where to replicate from / to, and which node coordinates the jobs.
    add('--source-url', default=SOURCE_URL,
        help='source URL (default: %(default)s)')
    add('--target-url', default=TARGET_URL,
        help='target URL; defaults to source-url (same cluster)')
    add('--coordinator-url', default=None,
        help='coordinator URL default=source-url')
    add('--source-db', default=SOURCE_DB,
        help='source DB (default: %(default)s)')
    add('--target-db-prefix', default=TARGET_DB_PREFIX,
        help='target DB prefix, jobs use prefix_NN (default: %(default)s)')

    # Workload shape.
    add('--ndocs', type=int, default=50000, help='docs count')
    add('--doc-size', type=int, default=1024,
        help='approx doc size in bytes (default: %(default)s)')
    add('--batch', type=int, default=500, help='load bulk_docs batch size')
    add('--n-jobs', type=int, default=1,
        help='concurrent replications jobs (default: %(default)s)')
    add('--keep-source', action='store_true',
        help='skip source load if DB already has >= ndocs')

    # Client-side and coordinator-side timing knobs.
    add('--http-timeout', type=float, default=DEFAULT_HTTP_TIMEOUT,
        help='client HTTP timeout (sec) (default: %(default)s)')
    add('--poll-interval', type=float, default=DEFAULT_POLL_INTERVAL,
        help='poll period for _scheduler/docs in seconds (default: %(default)s)')
    add('--scheduler-interval-ms', type=int, default=1000,
        help='replicator.interval override on coordinator; lower = faster '
             'pickup (default: %(default)s)')
    add('--coord-config', action='append', default=None, metavar='SECTION.KEY=VAL',
        help='extra coord config overrides, restored on exit; '
             'repeat if needed ... --coord-config replicator.max_jobs=2000')

    # Per-replication parameters, copied into the replication document only
    # when given. argparse derives the attribute name from the flag
    # (--worker-batch-size -> worker_batch_size), matching REP_PARAMS.
    add('--worker-batch-size', type=int, default=None)
    add('--worker-processes', type=int, default=None)
    add('--http-connections', type=int, default=None)
    add('--connection-timeout', type=int, default=None, help='timeout in ms')
    add('--checkpoint-interval', type=int, default=None,
        help='replicator checkpoint interval in ms')

    main(parser.parse_args())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment