Skip to content

Instantly share code, notes, and snippets.

@nickva
Created October 14, 2024 17:02
Show Gist options
  • Save nickva/8f1f5705adf6a1e1c88b16c16d878bbe to your computer and use it in GitHub Desktop.
Save nickva/8f1f5705adf6a1e1c88b16c16d878bbe to your computer and use it in GitHub Desktop.
Create and monitor Apache CouchDB replication documents
#!/usr/bin/env python
# Run against a dev/run test cluster (the example command below starts 3 nodes)
# Needs python 3.7 minimum
#
# $ virtualenv -p python3 venv3
# $ . venv3/bin/activate
# $ pip install requests
# make && ./dev/run -n 3 --with-haproxy --admin=adm:pass
# [replicator]
# interval = 5000
# worker_processes = 1
# worker_batch_size = 1
# update_docs = true
# cluster_start_period = 0
import base64
import time
import json
from datetime import datetime
from urllib.parse import quote_plus
import requests
# Admin credentials; these match the --admin=adm:pass shown in the header.
USER = 'adm'
PASS = 'pass'

# Shared HTTP session used for every request in this script. It sends JSON
# bodies and authenticates with the same USER/PASS pair that is embedded in
# the replication documents, so the two can never drift apart.
SESS = requests.Session()
SESS.headers = {"Content-Type": "application/json"}
SESS.auth = (USER, PASS)

# Base URL of the cluster endpoint (trailing slash is relied on by db_url).
URL = "http://127.0.0.1:5984/"
# Database that holds the replication documents.
RDB = "_replicator"
# Number of source databases / replication documents to create.
REPS = 1
# Number of empty documents written into each newly created source database.
DOCS = 1000
def maybe_delete_db(db):
    """Delete database *db* if a GET on its URL succeeds.

    Does nothing when the probe response is not OK. A failed DELETE raises
    via ``raise_for_status()``.
    """
    target = db_url(db)
    if not SESS.get(target).ok:
        return
    SESS.delete(target).raise_for_status()
def create_db(db):
    """PUT database *db* with the query parameter ``q=8``; raise on failure."""
    query = {'q': '8'}
    response = SESS.put(db_url(db), params=query)
    response.raise_for_status()
def create_source_dbs(num):
    """Ensure source databases ``tst_0`` .. ``tst_<num-1>`` exist.

    A database that does not answer OK to a GET is created and filled with
    documents; one that already exists is left untouched.

    Returns the list of database names, in index order.
    """
    # NOTE(review): assumes documents are only added to databases created
    # here, not to ones that already existed -- confirm against the original.
    names = []
    for idx in range(num):
        name = "tst_" + str(idx)
        already_there = SESS.get(db_url(name)).ok
        if not already_there:
            create_db(name)
            add_some_docs(name)
        names.append(name)
    return names
def add_some_docs(db):
    """POST ``DOCS`` empty documents to *db* in one ``_bulk_docs`` request.

    Returns the response object; raises via ``raise_for_status()`` when the
    request did not succeed.
    """
    payload = json.dumps({'docs': [{} for _ in range(DOCS)]})
    endpoint = db_url(db) + '/_bulk_docs'
    response = SESS.post(endpoint, data=payload)
    response.raise_for_status()
    return response
def create_reps(source_dbs):
    """Write one replication document per entry of *source_dbs*.

    The document for the i-th source has ``_id`` ``str(i)`` and replicates
    ``<src>`` to ``<src>_t`` with ``continuous`` false, ``create_target``
    true and a single worker process. Source and target both carry a basic
    auth header built from USER/PASS. Raises on any non-success response.
    """
    auth = {'Authorization': ba_header(USER, PASS)}
    replicator_url = db_url(RDB)
    for idx, src in enumerate(source_dbs):
        source = {'url': db_url(src), 'headers': auth}
        target = {'url': db_url(src + '_t'), 'headers': auth}
        # Key order is kept so the serialized document is unchanged.
        rdoc = dict(
            _id=str(idx),
            source=source,
            target=target,
            continuous=False,
            create_target=True,
            worker_processes=1,
        )
        response = SESS.post(replicator_url, headers=auth, data=json.dumps(rdoc))
        response.raise_for_status()
def ba_header(user, password):
    """Return the ``Basic <base64(user:password)>`` authorization value."""
    credentials = ':'.join((user, password))
    token = base64.b64encode(credentials.encode('utf-8'))
    return 'Basic ' + token.decode('utf-8')
def db_url(db):
    """Return the URL for *db*.

    A value that already starts with ``http://`` or ``https://`` (checked
    case-insensitively) is returned unchanged; anything else is treated as a
    database name, URL-quoted and appended to ``URL``.
    """
    if db.lower().startswith(('http://', 'https://')):
        return db
    return URL + quote_plus(db)
def create_all_reps():
    """Reset the replicator database and create ``REPS`` replications."""
    # Start from an empty replicator database.
    maybe_delete_db(RDB)
    create_db(RDB)

    sources = create_source_dbs(REPS)
    print(f" ** Creating {REPS} replications")
    create_reps(sources)
def job_states():
    """Return ``{doc_id: {'state': state}}`` from ``_scheduler/docs``.

    Reads the ``docs`` array of the ``_scheduler/docs/<RDB>`` response and
    raises via ``raise_for_status()`` when the request did not succeed.
    """
    response = SESS.get(URL + '_scheduler/docs/' + RDB)
    response.raise_for_status()
    return {
        entry['doc_id']: {'state': entry['state']}
        for entry in response.json()["docs"]
    }
def doc_states():
    """Return ``{doc_id: {'rev': rev, 'state': state}}`` for the replicator db.

    Documents are fetched through ``_all_docs`` with ``include_docs=true``
    and ``r=3``. ``state`` is the document's ``_replication_state`` field,
    or ``'?'`` when the field is absent.
    """
    response = SESS.get(URL + RDB + '/_all_docs?include_docs=true&r=3')
    response.raise_for_status()
    states = {}
    for row in response.json()['rows']:
        body = row['doc']
        states[row['id']] = {
            'rev': body['_rev'],
            'state': body.get('_replication_state', '?'),
        }
    return states
def sample_states():
    """Poll forever, once per second, printing scheduler and document states.

    Each round prints a blank line, the scheduler job states and the
    replication document states, then deletes the documents seen as
    completed in that same snapshot. Never returns.
    """
    while True:
        print()
        print_jobs(job_states())
        snapshot = doc_states()
        print_docs(snapshot)
        # Delete using the revisions captured in the snapshot just printed.
        delete_completed(snapshot)
        time.sleep(1.0)
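# Debug logging patch applied to CouchDB's fabric_doc_update.erl, followed by a log line it produced:
# +++ b/src/fabric/src/fabric_doc_update.erl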
# @@ -194,6 +194,7 @@ force_reply(Doc, [FirstReply | _] = Replies, {Health, W, Acc}) ->
# ok -> accepted;
# _ -> Health
# end,
# + couch_log:error(" ++XXXXXXXXXXXXXXXXXXXXX+++++ ~p:~p@~B~p", [?MODULE, ?FUNCTION_NAME, ?LINE, {accepted, Replies, NewHealth}]),
# {NewHealth, W, [{Doc, {accepted, AcceptedRev}} | Acc]}
# end
# end.
# [error] 2024-10-14T16:39:08.668451Z [email protected] <0.20958.0> -------- ++XXXXXXXXXXXXXXXXXXXXX+++++ fabric_doc_update:force_reply@197{accepted,[conflict,conflict,{ok,{4,<<20,236,231,42,87,211,218,121,12,181,243,237,240,168,248,175>>}}],accepted}
def delete_completed(docs):
    """Delete every replication document whose state is ``'completed'``.

    *docs* maps document id to ``{'rev': ..., 'state': ...}`` as returned by
    ``doc_states()``. Each completed document is deleted at the recorded
    revision, the HTTP status is printed, and a non-success response raises
    via ``raise_for_status()``.
    """
    # Imported here so the block does not depend on a file-level import.
    from urllib.parse import quote

    base = db_url(RDB)
    for doc_id, rev_state in docs.items():
        if rev_state['state'] != 'completed':
            continue
        rev = rev_state['rev']
        # Escape the id so characters such as '/', '?' or '#' cannot change
        # which resource the request addresses.
        doc_path = base + '/' + quote(doc_id, safe='')
        resp = SESS.delete(doc_path, params={'rev': rev})
        status = resp.status_code
        print(f"Doc {doc_id} completed deleting @rev {rev} : {status}")
        resp.raise_for_status()
def print_jobs(jobs):
    """Print one ``sched: <doc_id> <state>`` line per entry of *jobs*."""
    for doc_id, state in jobs.items():
        print(f"sched: {doc_id} {state}")
def print_docs(docs):
    """Print one ``doc: <doc_id> <state>`` line per entry of *docs*."""
    for doc_id, state in docs.items():
        print(f"doc: {doc_id} {state}")
def main():
    """Create the replications, then monitor them until interrupted."""
    create_all_reps()
    # Polls in an endless loop; control does not return from this call.
    sample_states()
# Run the script only when executed directly, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment