Created
October 14, 2024 17:02
-
-
Save nickva/8f1f5705adf6a1e1c88b16c16d878bbe to your computer and use it in GitHub Desktop.
Create and monitor Apache CouchDB replication documents
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# Run with an n=1 dev/run test cluster
# Needs python 3.7 minimum
#
# $ virtualenv -p python3 venv3
# $ . venv3/bin/activate
# $ pip install requests
# $ make && ./dev/run -n 3 --with-haproxy --admin=adm:pass
#
# with this replicator config section:
# [replicator]
# interval = 5000
# worker_processes = 1
# worker_batch_size = 1
# update_docs = true
# cluster_start_period = 0
import base64 | |
import time | |
import json | |
from datetime import datetime | |
from urllib.parse import quote_plus | |
import requests | |
# Admin credentials used for every request and embedded in the replication
# documents (the header comment starts the cluster with --admin=adm:pass).
USER = 'adm'
PASS = 'pass'

# Shared HTTP session. Extend the default headers instead of replacing them,
# and reuse USER/PASS so the credentials are defined in exactly one place.
SESS = requests.session()
SESS.headers.update({"Content-Type": "application/json"})
SESS.auth = (USER, PASS)

# Base URL that db_url() prefixes to database names (note the trailing slash).
URL = "http://127.0.0.1:5984/"
# Database that holds the replication documents.
RDB = "_replicator"
# Number of source databases, and therefore replication documents, to create.
REPS = 1
# Number of empty documents written into each newly created source database.
DOCS = 1000
def maybe_delete_db(db):
    """Delete database *db* if a GET on it succeeds; otherwise do nothing.

    Raises requests.HTTPError if the DELETE itself fails.
    """
    target = db_url(db)
    if not SESS.get(target).ok:
        # Nothing to delete.
        return
    SESS.delete(target).raise_for_status()
def create_db(db):
    """Create database *db* with the query parameter q=8.

    Raises requests.HTTPError if the PUT fails.
    """
    response = SESS.put(db_url(db), params={'q': '8'})
    response.raise_for_status()
def create_source_dbs(num):
    """Ensure source databases tst_0 .. tst_{num-1} exist and return their names.

    A database that is missing is created and filled via add_some_docs();
    one that already exists is left untouched.
    """
    names = [f"tst_{idx}" for idx in range(num)]
    for name in names:
        if SESS.get(db_url(name)).ok:
            # Already present: keep whatever it contains.
            continue
        create_db(name)
        add_some_docs(name)
    return names
def add_some_docs(db):
    """Bulk-insert DOCS empty documents into *db* and return the response.

    Raises requests.HTTPError if the _bulk_docs request fails.
    """
    payload = json.dumps({'docs': [{} for _ in range(DOCS)]})
    response = SESS.post(db_url(db) + '/_bulk_docs', data=payload)
    response.raise_for_status()
    return response
def create_reps(source_dbs):
    """Post one replication document per source database to RDB.

    The document for the i-th source gets _id str(i) and replicates the
    source into "<source>_t". Both endpoints carry a basic-auth header
    built from USER/PASS.

    Raises requests.HTTPError if any POST fails.
    """
    auth_headers = {'Authorization': ba_header(USER, PASS)}
    for idx, source in enumerate(source_dbs):
        target = source + '_t'
        body = json.dumps({
            '_id': str(idx),
            'source': {'url': db_url(source), 'headers': auth_headers},
            'target': {'url': db_url(target), 'headers': auth_headers},
            'continuous': False,
            'create_target': True,
            'worker_processes': 1
        })
        response = SESS.post(db_url(RDB), headers=auth_headers, data=body)
        response.raise_for_status()
def ba_header(user, password):
    """Return the HTTP Basic ``Authorization`` header value for the credentials.

    The result is "Basic " followed by base64("user:password"), with the
    credentials encoded as UTF-8.
    """
    credentials = ':'.join((user, password)).encode('utf-8')
    token = base64.b64encode(credentials).decode('utf-8')
    return 'Basic ' + token
def db_url(db):
    """Return the URL for *db*.

    A value that already starts with http:// or https:// (case-insensitive)
    is returned unchanged; anything else is treated as a database name,
    escaped with quote_plus() and appended to URL.
    """
    if db.lower().startswith(('http://', 'https://')):
        return db
    return URL + quote_plus(db)
def create_all_reps():
    """Recreate RDB, ensure REPS source databases exist, then create the replications."""
    # Start from an empty replicator database.
    maybe_delete_db(RDB)
    create_db(RDB)

    sources = create_source_dbs(REPS)
    print(f" ** Creating {REPS} replications")
    create_reps(sources)
def job_states():
    """Fetch _scheduler/docs for RDB and return {doc_id: {'state': state}}.

    Raises requests.HTTPError if the request fails.
    """
    response = SESS.get(URL + '_scheduler/docs/' + RDB)
    response.raise_for_status()
    return {
        entry['doc_id']: {'state': entry['state']}
        for entry in response.json()["docs"]
    }
def doc_states():
    """Read every document in RDB and return {id: {'rev': rev, 'state': state}}.

    The state is the document's _replication_state field, or '?' when the
    field is absent.

    Raises requests.HTTPError if the request fails.
    """
    response = SESS.get(URL + RDB + '/_all_docs?include_docs=true&r=3')
    response.raise_for_status()
    result = {}
    for row in response.json()['rows']:
        body = row['doc']
        revision = body['_rev']
        result[row['id']] = {
            'rev': revision,
            'state': body.get('_replication_state', '?'),
        }
    return result
def sample_states():
    """Loop forever: print scheduler and document states, delete completed docs.

    Each pass prints a blank separator line, the scheduler view, then the
    document view, deletes the documents reported as completed and sleeps
    for one second. The function never returns on its own.
    """
    while True:
        print()
        scheduler_view = job_states()
        print_jobs(scheduler_view)
        current_docs = doc_states()
        print_docs(current_docs)
        delete_completed(current_docs)
        time.sleep(1.0)
# Debug logging patch for CouchDB's fabric_doc_update.erl, apparently used
# alongside this script:
#
# +++ b/src/fabric/src/fabric_doc_update.erl
# @@ -194,6 +194,7 @@ force_reply(Doc, [FirstReply | _] = Replies, {Health, W, Acc}) ->
# ok -> accepted;
# _ -> Health
# end,
# + couch_log:error(" ++XXXXXXXXXXXXXXXXXXXXX+++++ ~p:~p@~B~p", [?MODULE, ?FUNCTION_NAME, ?LINE, {accepted, Replies, NewHealth}]),
# {NewHealth, W, [{Doc, {accepted, AcceptedRev}} | Acc]}
# end
# end.
#
# Example log line in that format (the replies are two conflicts and one ok,
# and the resulting health is accepted):
# [error] 2024-10-14T16:39:08.668451Z [email protected] <0.20958.0> -------- ++XXXXXXXXXXXXXXXXXXXXX+++++ fabric_doc_update:force_reply@197{accepted,[conflict,conflict,{ok,{4,<<20,236,231,42,87,211,218,121,12,181,243,237,240,168,248,175>>}}],accepted}
def delete_completed(docs):
    """Delete every replication document whose state is 'completed'.

    *docs* maps document id to {'rev': ..., 'state': ...}, as returned by
    doc_states(). Each completed document is deleted from RDB at the listed
    revision, and the HTTP status of the delete is printed before it is
    checked, so the status is visible even when the request failed.

    Raises requests.HTTPError on the first failed DELETE.
    """
    for doc_id, info in docs.items():
        if info['state'] != 'completed':
            continue
        rev = info['rev']
        resp = SESS.delete(db_url(RDB) + '/' + doc_id, params={'rev': rev})
        # The original message had a stray, unbalanced quote after the rev.
        print(f"Doc {doc_id} completed deleting @rev {rev} : {resp.status_code}")
        resp.raise_for_status()
def print_jobs(jobs):
    """Print one "sched: <doc_id> <state>" line per entry of *jobs*."""
    for doc_id, state in jobs.items():
        print(f"sched: {doc_id} {state}")
def print_docs(docs):
    """Print one "doc: <doc_id> <state>" line per entry of *docs*."""
    for doc_id, state in docs.items():
        print(f"doc: {doc_id} {state}")
def main():
    """Create the replication documents, then monitor them indefinitely."""
    create_all_reps()
    # sample_states() loops forever; interrupt the process to stop it.
    sample_states()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment