import os
import sys
import argparse
import datetime
import pymongo
from pymongo import MongoClient, CursorType
import uuid
import time
import json
import random
import string
import threading
from threading import Thread

class Consumer:
    """Consumer of certain oplog documents"""
    def __init__(self, name, args):
        self.name = name
        self.args = args
        self.all_events = args.events.split(',')
        # select a random subset of events (at least one) for this consumer
        # to be interested in
        self.events = random.sample(self.all_events, random.randint(1, len(self.all_events)))
        self.log("events=" + str(self.events))
        self.event_ns = args.namespace
        self.event_db, self.event_coll = args.namespace.split('.')
        self.marker_coll = self.event_coll + ".markers"
        self.counter_coll = self.event_coll + ".counter"

    def log(self, msg, force=False):
        if self.args.verbose or force:
            print("consumer[" + self.name + "] " + msg)
    def consume(self):
        conn = pymongo.MongoClient(self.args.mongouri)
        self.conn = conn
        # check marker collection is there
        collections = conn[self.event_db].collection_names()
        # TODO need to add filter on only the events this consumer cares about!
        oplog_query = {"ns": self.event_ns, "o.event": {'$in': self.events}}
        self.event_count = 0
        if self.marker_coll in collections:
            try:
                # resume from the last timestamp this consumer recorded
                query = {"name": self.name}
                marker = conn[self.event_db][self.marker_coll].find(query).limit(1).next()
                oplog_query["ts"] = {"$gt": marker['ts']}
            except StopIteration:
                self.log(self.event_db + "." + self.marker_coll + " exists but no marker found")
        else:
            self.log("No marker collection!")
            conn[self.event_db].create_collection(self.marker_coll)
            # TODO get create_index to work
            conn[self.event_db][self.marker_coll].create_index([("name", pymongo.ASCENDING)])
        self.log(str(oplog_query), force=True)
        oplog = conn['local']['oplog.rs'].find(oplog_query, cursor_type=CursorType.TAILABLE_AWAIT)
        if 'ts' in oplog_query:
            oplog.add_option(8)  # oplog replay
        self.update_count()
        while oplog.alive:
            try:
                if os.path.exists("./stop"):
                    self.log("Got stop command", force=True)
                    oplog.close()
                    return
                event = oplog.next()
                self.try_update_marker(conn, event, self.event_count)
                self.event_count = self.event_count + 1
                self.log("event_count=" + str(self.event_count))
            except StopIteration:
                # no new oplog entries yet; wait before polling again
                time.sleep(self.args.sleep)
    def update_count(self):
        if os.path.exists("./stop"):
            self.log("Got stop command update_count thread", force=True)
            return
        for i in range(5):
            try:
                coll = self.conn[self.event_db][self.counter_coll]
                coll.insert_one({"name": self.name, "count": self.event_count, "ts": datetime.datetime.now()})
                # re-schedule this method to record the count again in 1 second
                threading.Timer(1, self.update_count).start()
                return
            except pymongo.errors.AutoReconnect:
                self.log("AutoReconnect error attempt #" + str(i))
                time.sleep(pow(2, i))

    def try_update_marker(self, conn, event, event_count):
        for i in range(5):
            try:
                self.log(str(event['ts']))
                coll = conn[self.event_db][self.marker_coll]
                coll.update_one({"name": self.name}, {"$set": {"ts": event['ts'], "count": event_count}}, upsert=True)
                return
            except pymongo.errors.AutoReconnect:
                self.log("AutoReconnect error attempt #" + str(i))
                time.sleep(pow(2, i))

class Producer:
    """Create events"""
    def __init__(self, name, args):
        self.name = name
        self.args = args
        self.events = args.events.split(',')
        self.event_ns = args.namespace
        self.event_db, self.event_coll = args.namespace.split('.')
        self.counter_coll = self.event_coll + ".pcounter"

    def log(self, msg, force=False):
        if self.args.verbose or force:
            print("producer[" + self.name + "] " + msg)

    def produce(self):
        self.event_count = 0
        N = 100  # length of the random payload attached to each event
        conn = pymongo.MongoClient(self.args.mongouri)
        self.conn = conn
        self.update_count()
        while True:
            if os.path.exists("./stop"):
                self.log("Got stop command", force=True)
                return
            self.log("producing " + str(self.event_count))
            event = random.choice(self.events)
            data = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(N))
            self.try_insert_event(conn, event, data)
            self.event_count = self.event_count + 1
            time.sleep(float(self.args.producerPause))
    def update_count(self):
        if os.path.exists("./stop"):
            self.log("Got stop command update_count thread", force=True)
            return
        for i in range(5):
            try:
                coll = self.conn[self.event_db][self.counter_coll]
                coll.insert_one({"name": self.name, "count": self.event_count, "ts": datetime.datetime.now()})
                # re-schedule this method to record the count again in 1 second
                threading.Timer(1, self.update_count).start()
                return
            except pymongo.errors.AutoReconnect:
                self.log("AutoReconnect error attempt #" + str(i))
                time.sleep(pow(2, i))

    def try_insert_event(self, conn, event, data):
        for i in range(5):
            try:
                coll = conn[self.event_db][self.event_coll]
                coll.insert_one({"name": self.name, 'event': event, 'ts': datetime.datetime.now(), 'data': data})
                return
            except pymongo.errors.AutoReconnect:
                self.log("AutoReconnect error attempt #" + str(i))
                time.sleep(pow(2, i))

parser = argparse.ArgumentParser(description="benchmark oplog tailing")
required = parser.add_argument_group('required named arguments')
required.add_argument("--mongouri"
                      , help='Connection string to MongoDB'
                      , required=True)
required.add_argument("--namespace"
                      , help='Namespace to write events into'
                      , required=True)
required.add_argument("--events"
                      , help='Comma-delimited string of event names'
                      , required=True)
parser.add_argument("--producers"
                    , help='Number of producer threads, default 1'
                    , default=1, type=int)
parser.add_argument("--consumers"
                    , help='Number of consumer threads, default 2'
                    , default=2, type=int)
parser.add_argument("--create", action='store_true'
                    , default=False
                    , help='Set this flag to drop and recreate target collection')
parser.add_argument("--sleep"
                    , default=5, type=float
                    , help='Number of seconds consumers should sleep waiting for more events')
parser.add_argument("--producerPause"
                    , default=0.1, type=float
                    , help='Number of seconds producers should pause between sending events')
parser.add_argument("--verbose", action='store_true', default=False
                    , help='enable verbose output for troubleshooting')
parsed_args = parser.parse_args()
print(vars(parsed_args))
threads = []
# start up producers
producer_names = []
consumer_names = []
if parsed_args.create:
    # generate fresh worker names and persist them so later runs can reuse them
    for i in range(int(parsed_args.producers)):
        producer_names.append(str(uuid.uuid4()))
    with open("./producers", "w+") as pf:
        pf.write(json.dumps(producer_names))
    for i in range(int(parsed_args.consumers)):
        consumer_names.append(str(uuid.uuid4()))
    with open("./consumers", "w+") as cf:
        cf.write(json.dumps(consumer_names))
else:
    if os.path.exists("./producers") is False:
        print("No 'producers' file found, do you want the --create flag?")
        sys.exit(1)
    if os.path.exists("./consumers") is False:
        print("No 'consumers' file found, do you want the --create flag?")
        sys.exit(1)
    with open("./producers", "r") as pf:
        producer_names = json.loads(pf.read())
    with open("./consumers", "r") as cf:
        consumer_names = json.loads(cf.read())
print(producer_names)
print(consumer_names)
for i in range(int(parsed_args.producers)):
    p = Producer(producer_names[i], parsed_args)
    t = Thread(target=p.produce)
    t.daemon = True
    threads.append(t)
# start up consumers
for i in range(int(parsed_args.consumers)):
    c = Consumer(consumer_names[i], parsed_args)
    t = Thread(target=c.consume)
    t.daemon = True
    threads.append(t)
for t in threads:
    t.start()
while True:
    # wait on any workers that are still running, 20 seconds at a time
    for t in threads:
        if t.is_alive():
            t.join(20)
    if os.path.exists("./stop"):
        time.sleep(5)  # Give workers time to cleanup
        print("[Main] Got stop command")
        break
# restart logic
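
# Example invocation (a sketch with illustrative values; the script name,
# connection string, namespace, and event names below are assumptions, not
# part of the original gist). Note the oplog only exists on replica set
# members, so --mongouri must point at a replica set.
#
#   python oplog-tail-bench.py \
#       --mongouri "mongodb://localhost:27017/?replicaSet=rs0" \
#       --namespace test.events \
#       --events "created,updated,deleted" \
#       --producers 2 --consumers 4 --create --verbose
#
# Create a file named "stop" in the working directory to shut the producers
# and consumers down cleanly.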