Last active
June 2, 2021 01:44
-
-
Save thom-vend/0fe10e1b857aa0d95921fa52a1274feb to your computer and use it in GitHub Desktop.
ZooKeeper read-write test job in Python 3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from datetime import datetime | |
from icecream import ic | |
from kazoo.client import KazooClient | |
from multiprocessing import Pool | |
from pprint import pprint | |
import argparse | |
import logging | |
import random | |
import signal | |
import string | |
import sys | |
import time | |
import traceback | |
# Parsed command-line options shared by every function in this module.
# Stays None until main() (or mp_init() inside a pool worker) assigns it.
args = None
def dataforzktest():
    """Build a fresh payload to store in a test znode.

    Returns:
        bytes: the current local time formatted as ``%Y-%m-%d-%H:%M:%S``,
        followed by ``__`` and 32 random uppercase ASCII letters, encoded
        with ``str.encode()`` defaults.
    """
    letters = [random.choice(string.ascii_uppercase) for _ in range(32)]
    stamp = datetime.today().strftime("%Y-%m-%d-%H:%M:%S")
    payload = "{}__{}".format(stamp, "".join(letters))
    return payload.encode()
def printme(s):
    """Log *s* at INFO level unless the ``noprint`` option is set."""
    if args.noprint:
        return
    logging.info(s)
def zk_write(zk, subpath, name):
    """Write a fresh test payload to the znode ``<subpath>/<name>``.

    When the node does not exist yet it is created (after ensuring that
    ``subpath`` exists); otherwise its data is overwritten with ``zk.set``.

    Args:
        zk: ZooKeeper client object (a ``KazooClient`` in this file).
        subpath: Base path of the znode; trailing "/" characters are ignored
            when the full node path is built.
        name: Node name below ``subpath``.

    Returns:
        Seconds spent inside ``zk.set`` when an existing node was updated,
        or ``-1`` when the node was created by this call (creation is not
        timed).
    """
    # Local import: only this function needs the exception type.
    from kazoo.exceptions import NodeExistsError

    path = "{}/{}".format(subpath.rstrip("/"), name)
    ic(path)
    if not zk.exists(path):
        ic()
        zk.ensure_path(subpath)
        try:
            zk.create(path, dataforzktest())
        except NodeExistsError:
            # Several workers may share the same node name: another one can
            # create the node between our exists() check and create().
            # Treat that as "node already exists" and do a timed update.
            pass
        else:
            return -1
    ic()
    tic = time.perf_counter()
    zk.set(path, dataforzktest())
    return time.perf_counter() - tic
def zk_read(zk, subpath, name):
    """Read the znode ``<subpath>/<name>`` and report its content.

    Args:
        zk: ZooKeeper client object (a ``KazooClient`` in this file).
        subpath: Base path of the znode; trailing "/" characters are ignored
            when the full node path is built.
        name: Node name below ``subpath``.

    Returns:
        Seconds spent inside ``zk.get``, or ``-1`` when the node does not
        exist.
    """
    path = f"{subpath.rstrip('/')}/{name}"
    if not zk.exists(path):
        ic()
        printme(f"Path {path} don't exist")
        return -1

    started = time.perf_counter()
    data, stat = zk.get(path)
    elapsed = time.perf_counter() - started
    ic(stat)
    ic(data)
    datas = data.decode("utf-8")
    printme(f"Version: {stat.version}, data: {datas}")
    return elapsed
def zk_appjob(workernum):
    """Connect to ZooKeeper and run the read and/or write job for one worker.

    Each iteration performs a write (``args.write``) and/or a read
    (``args.read``) on the node ``args.zkpath``/<job name>. Unless
    ``args.nostats`` is set, average latencies are logged about every five
    seconds. With ``args.apploop`` the iteration repeats forever, sleeping
    ``args.delay`` seconds in between when the delay is above 1 ms;
    otherwise a single iteration is run.

    The client is stopped and closed on every way out of the loop,
    including exceptions, so repeated calls do not pile up connections.

    Args:
        workernum: Worker index; appended to the job name when
            ``args.autoname`` is set.
    """
    ic(workernum)
    zk_node = f"{args.jobname}{workernum}" if args.autoname else args.jobname
    write_latencies = []
    read_latencies = []
    # -1 is reported for an average that has never been computed.
    avg_write = -1
    avg_read = -1
    start_time = time.perf_counter()
    zk = KazooClient(hosts=args.host, read_only=(not args.write))
    zk.start()
    try:
        ic()
        while True:
            if args.write:
                latency = zk_write(zk, args.zkpath, zk_node)
                # -1 means "nothing was timed" and must not skew the average.
                if latency != -1 and not args.nostats:
                    write_latencies.append(latency)
            if args.read:
                latency = zk_read(zk, args.zkpath, zk_node)
                if latency != -1 and not args.nostats:
                    read_latencies.append(latency)
            if not args.nostats and (time.perf_counter() - start_time) > 5:
                lwrite_latencies = len(write_latencies)
                lread_latencies = len(read_latencies)
                if lwrite_latencies > 0:
                    # Seconds -> milliseconds, two decimals.
                    avg_write = round(
                        (sum(write_latencies) / lwrite_latencies) * 1000, 2
                    )
                    write_latencies = []
                if lread_latencies > 0:
                    avg_read = round(
                        (sum(read_latencies) / lread_latencies) * 1000, 2
                    )
                    read_latencies = []
                start_time = time.perf_counter()
                logging.info(
                    f"STATS: avg latencies: write={avg_write}ms, read={avg_read}ms"
                    f" -- writes:{lwrite_latencies}, reads:{lread_latencies}"
                )
            if not args.apploop:
                break
            if args.delay > 0.001:
                time.sleep(args.delay)
        ic()
    finally:
        # Release the connection even when the loop is left by an exception
        # (including the GracefulExit raised from the signal handler).
        zk.stop()
        zk.close()
class GracefulExit(Exception):
    """Raised from the signal handler to request a clean shutdown."""
def signal_handler(signum, frame):
    """Signal callback that converts the received signal into GracefulExit.

    Both parameters follow the ``signal.signal`` handler signature and are
    not used. An identical definition further down this module rebinds
    the same name.
    """
    raise GracefulExit
def job_fullloop(workernum):
    """Run ``zk_appjob`` for one worker, optionally reconnecting forever.

    Any ordinary exception raised by the job is logged with its traceback.
    With ``args.fullloop`` the job is then started again (after sleeping
    ``args.delay`` seconds when the delay is above 1 ms); without it the
    function returns after the first run.

    A ``GracefulExit`` raised anywhere in this function, including during
    the delay between two runs, logs a shutdown message and exits the
    process with status 0.

    Args:
        workernum: Worker index forwarded to ``zk_appjob``.
    """
    try:
        ic(workernum)
        while True:
            try:
                zk_appjob(workernum)
            except GracefulExit:
                # GracefulExit derives from Exception: re-raise it first so
                # the generic handler below does not swallow it.
                raise
            except Exception:
                logging.error(traceback.format_exc())
            if not args.fullloop:
                break
            ic()
            if args.delay > 0.001:
                time.sleep(args.delay)
    except GracefulExit:
        logging.info("Gracefull shutdown...")
        sys.exit(0)
def mp_init(_args):
    """Pool initializer: publish the parsed options in this process.

    Stores ``_args`` in the module-level ``args`` and turns off ``ic``
    output unless debug mode is enabled.
    """
    global args
    args = _args
    if args.debug:
        return
    ic.disable()
def multiprocess(args):
    """Run ``job_fullloop`` in ``args.worker`` pool processes and wait for them.

    Every pool process is initialised with ``mp_init`` so that it sees the
    same parsed options. Worker indexes ``0 .. args.worker - 1`` are mapped
    onto ``job_fullloop``.

    If the signal handler raises ``GracefulExit`` in this (parent) process
    while it waits for the workers, a shutdown message is logged and the
    process exits with status 0. Leaving the ``with`` block terminates the
    pool.

    Args:
        args: Parsed command-line options (needs ``worker`` plus everything
            the workers read).
    """
    with Pool(args.worker, initializer=mp_init, initargs=(args,)) as pool:
        ic()
        try:
            pool.map(job_fullloop, range(args.worker))
        except GracefulExit:
            # Same outcome as the single-worker path in job_fullloop.
            logging.info("Gracefull shutdown...")
            sys.exit(0)
def signal_handler(signum, frame):
    """Signal callback that converts the received signal into GracefulExit.

    Both parameters follow the ``signal.signal`` handler signature and are
    not used. NOTE: this repeats an identical definition made earlier in
    the module; being the later one, it is the binding main() registers.
    """
    raise GracefulExit
def main():
    """Parse the command line, set up logging and signals, then run the job.

    The parsed options are stored in the module-level ``args``. The process
    exits with status 42 when neither ``--read`` nor ``--write`` was given.
    With ``--worker`` above 1 the job is run through ``multiprocess``;
    otherwise it runs in this process as worker 0.
    """
    global args

    def switch(dest, text):
        # Keyword arguments common to every boolean on/off option.
        return {"action": "store_true", "dest": dest, "default": False, "help": text}

    # Registration order is kept because it is the order shown by --help.
    options = (
        (
            "--host",
            {"dest": "host", "default": "127.0.0.1:2181", "help": 'Set "host:port"'},
        ),
        ("--write", switch("write", "Write some data")),
        (
            "--read",
            switch(
                "read",
                "Read some data (can be enable with write mode to read just after",
            ),
        ),
        (
            "--zk-path",
            {"dest": "zkpath", "default": "/sremigrationtest/", "help": "ZK base path"},
        ),
        (
            "--job-name",
            {
                "dest": "jobname",
                "default": "test0",
                "help": "Name of the job, entry in ZK",
            },
        ),
        ("--auto-suffix", switch("autoname", "Auto add suffix on the job name")),
        ("--debug", switch("debug", "Enable debug mode")),
        ("--apploop", switch("apploop", "Infinite app style loop")),
        ("--fullloop", switch("fullloop", "Full reconnection loop")),
        (
            "--delay",
            {
                "dest": "delay",
                "type": float,
                "default": 1.0,
                "help": "Pause x seconds before doing it again (loops mode only), 0 for no delay",
            },
        ),
        (
            "--worker",
            {
                "dest": "worker",
                "type": int,
                "default": 1,
                "help": "Amount of worker to do the requested job in //",
            },
        ),
        ("--no-stats", switch("nostats", "Disable stats prints")),
        ("--no-prints", switch("noprint", "Disable priting read values")),
    )

    parser = argparse.ArgumentParser(
        description="zookeeper-check-kazoo dirty tool",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    for flag, spec in options:
        parser.add_argument(flag, **spec)
    args = parser.parse_args()

    if args.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        ic.disable()
        logging.basicConfig(level=logging.INFO)
    ic(args)

    if not (args.read or args.write):
        logging.error(
            "Nothing todo, --read and/or --write need to be passed as argument, -h for help"
        )
        sys.exit(42)

    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, signal_handler)

    if args.worker > 1:
        multiprocess(args)
    else:
        job_fullloop(0)
# Run the tool only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment