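# Benchmark MongoDB oplog tailing: producer threads insert randomly generated
# events into a target namespace while consumer threads tail local.oplog.rs
# for the subset of event names they subscribe to, persisting a resume marker
# as they go. Touch a file named "stop" in the working directory to shut the
# benchmark down. Example invocation (script name, URI, and event names are
# illustrative assumptions, not part of the original gist; a replica set is
# required, since only replica sets have an oplog):
#
#   python oplog-tail-bench.py \
#       --mongouri "mongodb://localhost:27017/?replicaSet=rs0" \
#       --namespace test.events \
#       --events click,view,purchase \
#       --create --verbose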
import os
import sys
import argparse
import datetime
import pymongo
from pymongo import MongoClient, CursorType
import uuid
import time
import json
import random
import string
import threading
from threading import Thread
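
# Note: written against the pymongo 3.x API current when this gist was
# posted (2016). collection_names() was later removed in favor of
# list_collection_names(), and in 3.x find(..., oplog_replay=True) is the
# friendlier spelling of the add_option(8) call used below.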
class Consumer:
    """Consumer of certain oplog documents"""

    def __init__(self, name, args):
        self.name = name
        self.args = args
        self.all_events = args.events.split(',')
        # Select a random, non-empty subset of events for this
        # consumer to be interested in.
        self.events = random.sample(self.all_events,
                                    random.randint(1, len(self.all_events)))
        self.log("events=" + str(self.events))
        self.event_ns = args.namespace
        self.event_db, self.event_coll = args.namespace.split('.')
        self.marker_coll = self.event_coll + ".markers"
        self.counter_coll = self.event_coll + ".counter"

    def log(self, msg, force=False):
        if self.args.verbose or force:
            print("consumer[" + self.name + "] " + msg)
    def consume(self):
        conn = pymongo.MongoClient(self.args.mongouri)
        self.conn = conn
        # Check whether the marker collection is there.
        collections = conn[self.event_db].collection_names()
        # Filter to this namespace and to only the events this consumer
        # cares about ("o" holds the inserted document in an oplog entry).
        oplog_query = {"ns": self.event_ns, "o.event": {"$in": self.events}}
        self.event_count = 0
        if self.marker_coll in collections:
            try:
                query = {"name": self.name}
                marker = conn[self.event_db][self.marker_coll].find(query).limit(1).next()
                # Resume from the last timestamp this consumer acknowledged.
                oplog_query["ts"] = {"$gt": marker['ts']}
            except StopIteration:
                self.log(self.event_db + "." + self.marker_coll + " exists but no marker found")
        else:
            self.log("No marker collection!")
            conn[self.event_db].create_collection(self.marker_coll)
            conn[self.event_db][self.marker_coll].create_index([("name", pymongo.ASCENDING)])
        self.log(str(oplog_query))
        oplog = conn['local']['oplog.rs'].find(oplog_query,
                                               cursor_type=CursorType.TAILABLE_AWAIT)
        if 'ts' in oplog_query:
            oplog.add_option(8)  # OPLOG_REPLAY: fast-forward to the ts filter
        self.update_count()
        while oplog.alive:
            try:
                if os.path.exists("./stop"):
                    self.log("Got stop command", force=True)
                    oplog.close()
                    return
                event = oplog.next()
                self.try_update_marker(conn, event, self.event_count)
                self.event_count = self.event_count + 1
                self.log("event_count=" + str(self.event_count))
            except StopIteration:
                time.sleep(self.args.sleep)
    def update_count(self):
        if os.path.exists("./stop"):
            self.log("Got stop command update_count thread", force=True)
            return
        for i in range(5):
            try:
                coll = self.conn[self.event_db][self.counter_coll]
                coll.insert_one({"name": self.name,
                                 "count": self.event_count,
                                 "ts": datetime.datetime.now()})
                # Re-arm the timer: snapshot the counter roughly once per second.
                threading.Timer(1, self.update_count).start()
                return
            except pymongo.errors.AutoReconnect:
                self.log("AutoReconnect error attempt #" + str(i))
                time.sleep(pow(2, i))
    def try_update_marker(self, conn, event, event_count):
        # Retry with exponential backoff on transient connection errors.
        for i in range(5):
            try:
                self.log(str(event['ts']))
                coll = conn[self.event_db][self.marker_coll]
                coll.update_one({"name": self.name},
                                {"$set": {"ts": event['ts'], "count": event_count}},
                                upsert=True)
                return
            except pymongo.errors.AutoReconnect:
                self.log("AutoReconnect error attempt #" + str(i))
                time.sleep(pow(2, i))
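
# For reference, an oplog insert entry this consumer matches looks roughly
# like the following (a sketch in shell-style notation; values are
# illustrative, and fields such as "h" and "v" are omitted):
#
#   { "ts" : Timestamp(1470340800, 1),   # saved as the resume marker
#     "op" : "i",                        # insert
#     "ns" : "test.events",              # matches --namespace
#     "o"  : { "name": "...", "event": "click",
#              "ts": ISODate(...), "data": "..." } }
#
# "o" is the inserted document itself, which is why the consumer filters on
# "o.event" and records "ts" so it can resume after a restart.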
class Producer:
    """Create events"""

    def __init__(self, name, args):
        self.name = name
        self.args = args
        self.events = args.events.split(',')
        self.event_ns = args.namespace
        self.event_db, self.event_coll = args.namespace.split('.')
        self.counter_coll = self.event_coll + ".pcounter"

    def log(self, msg, force=False):
        if self.args.verbose or force:
            print("producer[" + self.name + "] " + msg)
    def produce(self):
        self.event_count = 0
        N = 100  # length of the random payload string
        conn = pymongo.MongoClient(self.args.mongouri)
        self.conn = conn
        self.update_count()
        while True:
            if os.path.exists("./stop"):
                self.log("Got stop command", force=True)
                return
            self.log("producing " + str(self.event_count))
            event = random.choice(self.events)
            data = ''.join(random.choice(string.ascii_uppercase + string.digits)
                           for _ in range(N))
            self.try_insert_event(conn, event, data)
            self.event_count = self.event_count + 1
            time.sleep(float(self.args.producerPause))
    def update_count(self):
        if os.path.exists("./stop"):
            self.log("Got stop command update_count thread", force=True)
            return
        for i in range(5):
            try:
                coll = self.conn[self.event_db][self.counter_coll]
                coll.insert_one({"name": self.name,
                                 "count": self.event_count,
                                 "ts": datetime.datetime.now()})
                # Re-arm the timer: snapshot the counter roughly once per second.
                threading.Timer(1, self.update_count).start()
                return
            except pymongo.errors.AutoReconnect:
                self.log("AutoReconnect error attempt #" + str(i))
                time.sleep(pow(2, i))
    def try_insert_event(self, conn, event, data):
        # Retry with exponential backoff on transient connection errors.
        for i in range(5):
            try:
                coll = conn[self.event_db][self.event_coll]
                coll.insert_one({"name": self.name, 'event': event,
                                 'ts': datetime.datetime.now(), 'data': data})
                return
            except pymongo.errors.AutoReconnect:
                self.log("AutoReconnect error attempt #" + str(i))
                time.sleep(pow(2, i))
parser = argparse.ArgumentParser(description="benchmark oplog tailing")
required = parser.add_argument_group('required named arguments')
required.add_argument("--mongouri"
                      ,help='Connection string to MongoDB'
                      ,required=True)
required.add_argument("--namespace"
                      ,help='Namespace to write events into'
                      ,required=True)
required.add_argument("--events"
                      ,help='Comma-delimited string of event names'
                      ,required=True)
parser.add_argument("--producers"
                    ,help='Number of producer threads, default 1'
                    ,default=1,type=int)
parser.add_argument("--consumers"
                    ,help='Number of consumer threads, default 2'
                    ,default=2,type=int)
parser.add_argument("--create", action='store_true'
                    ,default=False
                    ,help='Set this flag to generate fresh worker names (writes ./producers and ./consumers)')
parser.add_argument("--sleep"
                    ,default=5,type=float
                    ,help='Number of seconds consumers should sleep waiting for more events')
parser.add_argument("--producerPause"
                    ,default=0.1,type=float
                    ,help='Number of seconds producers should pause between sending events')
parser.add_argument("--verbose", action='store_true', default=False
                    ,help='enable verbose output for troubleshooting')
parsed_args = parser.parse_args()
print(vars(parsed_args))

threads = []
producer_names = []
consumer_names = []
if parsed_args.create:
    # Generate fresh worker names and persist them so a later run
    # (without --create) can resume with the same names and markers.
    for i in range(parsed_args.producers):
        producer_names.append(str(uuid.uuid4()))
    pf = open("./producers", "w+")
    pf.write(json.dumps(producer_names))
    pf.close()
    for i in range(parsed_args.consumers):
        consumer_names.append(str(uuid.uuid4()))
    cf = open("./consumers", "w+")
    cf.write(json.dumps(consumer_names))
    cf.close()
else:
    if not os.path.exists("./producers"):
        print("No 'producers' file found, do you want the --create flag?")
        sys.exit(1)
    if not os.path.exists("./consumers"):
        print("No 'consumers' file found, do you want the --create flag?")
        sys.exit(1)
    pf = open("./producers", "r")
    cf = open("./consumers", "r")
    producer_names = json.loads(pf.read())
    consumer_names = json.loads(cf.read())
    pf.close()
    cf.close()
print(producer_names)
print(consumer_names)

# start up producers
for i in range(parsed_args.producers):
    p = Producer(producer_names[i], parsed_args)
    t = Thread(target=p.produce)
    t.daemon = True
    threads.append(t)
# start up consumers
for i in range(parsed_args.consumers):
    c = Consumer(consumer_names[i], parsed_args)
    t = Thread(target=c.consume)
    t.daemon = True
    threads.append(t)
for t in threads:
    t.start()

while True:
    # Poll the workers in 20-second slices and watch for the stop file.
    for t in threads:
        t.join(20)
    threads = [t for t in threads if t.is_alive()]
    if os.path.exists("./stop"):
        time.sleep(5)  # Give workers time to clean up
        print("[Main] Got stop command")
        break
# restart logic
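
# Reading the results (a sketch layered on top of the gist, not part of the
# benchmark itself): each worker inserts one counter snapshot per second into
# <coll>.counter (consumers) or <coll>.pcounter (producers), as documents of
# the form {"name": ..., "count": ..., "ts": ...}. Assuming --namespace
# test.events, a rough events/sec figure per worker could be computed with an
# aggregation like:
#
#   conn = pymongo.MongoClient(mongouri)  # mongouri as passed to the script
#   pipeline = [
#       {"$sort": {"ts": 1}},
#       {"$group": {"_id": "$name",
#                   "first": {"$first": "$ts"},
#                   "last": {"$last": "$ts"},
#                   "count": {"$max": "$count"}}},
#   ]
#   for doc in conn["test"]["events.counter"].aggregate(pipeline):
#       secs = (doc["last"] - doc["first"]).total_seconds() or 1
#       print(doc["_id"], doc["count"] / secs, "events/sec")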