Skip to content

Instantly share code, notes, and snippets.

@lbjay
Last active April 5, 2016 18:18
Show Gist options
  • Save lbjay/f8dee7643e592e0880b2f306ea79afba to your computer and use it in GitHub Desktop.
Save lbjay/f8dee7643e592e0880b2f306ea79afba to your computer and use it in GitHub Desktop.
mh_user_action inserter
import sys
import time
import boto3
import logging
import argparse
from faker import Factory
from random import choice, shuffle
from multiprocessing import Process, JoinableQueue
from mysql.connector import connect
# Send all log records (INFO and above) to stdout with a timestamp prefix.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
)


def _positive_int(value):
    """argparse ``type=`` callable: parse *value* as an int that must be >= 1.

    Raises:
        argparse.ArgumentTypeError: if the parsed integer is smaller than 1.
        ValueError: if *value* is not an integer literal (argparse turns this
            into its usual "invalid value" usage error).
    """
    number = int(value)
    if number < 1:
        raise argparse.ArgumentTypeError(
            "must be a positive integer, got %r" % (value,))
    return number


# Command line interface. The parsed result is kept in the module-level
# ``options`` object, which the rest of the script reads directly.
parser = argparse.ArgumentParser()
parser.add_argument("--host", default=None, help="The hostname of the MySQL node to connect to")
parser.add_argument("--port", default=None, type=int, help="The port of the MySQL node to connect to")
parser.add_argument("--user", default="root", help="The user of the MySQL node to connect to")
parser.add_argument("--password", default="", help="The password of the MySQL node to connect to")
parser.add_argument("--database", default="matterhorn", help="The database to use")
parser.add_argument("--table", default="mh_user_action", help="The user action table to insert to")
parser.add_argument("--aws-profile", default="test", help="AWS profile to use for cloudwatch metrics")
# With zero workers nothing would ever consume the work queue, so joining it
# would block forever; reject that up front.
parser.add_argument("--num-workers", type=_positive_int, default=10, help="The number of insert threads")
parser.add_argument("--num-inserts", type=int, default=10000, help="The total number of insert to execute")
# The workers pick their pause with choice(range(interval)), which raises on
# an empty range; require at least one second.
parser.add_argument("--interval", type=_positive_int, default=30, help="The number of seconds each worker waits between inserts")
options = parser.parse_args()
class Worker(Process):
    """Process that inserts fake user-action rows for ids taken from a queue.

    On construction the worker opens its own MySQL connection and registers
    one fake row in ``mh_user_session``. In ``run`` it inserts one row into
    ``options.table`` per action id pulled from the work queue and reports
    how long each insert took through the supplied metric object.
    """

    def __init__(self, work_queue, metric):
        """Open the database connection and insert this worker's session row.

        Args:
            work_queue: queue providing ``get()`` and ``task_done()``; it
                yields action ids, with ``None`` as the shutdown sentinel.
            metric: object whose ``put_data(MetricData=[...])`` method
                receives the duration of every insert.
        """
        Process.__init__(self)
        self.work_queue = work_queue
        self.metric = metric

        connect_args = dict(
            user=options.user,
            password=options.password,
            host=options.host,
            database=options.database,
        )
        # --port used to be parsed but silently ignored. Forward it only when
        # it was given so the connector's default applies otherwise.
        if options.port is not None:
            connect_args['port'] = options.port
        self.con = connect(**connect_args)

        self.fake = Factory.create()
        self.session_id = self.fake.pystr(max_chars=25)

        session_insert = ("INSERT INTO mh_user_session "
                          "(session_id,user_ip,user_agent,user_id) "
                          "VALUES (%(session_id)s, %(user_ip)s, %(user_agent)s, 'anonymous')")
        c = self.con.cursor()
        try:
            c.execute(session_insert, {
                'session_id': self.session_id,
                'user_ip': self.fake.ipv4(),
                'user_agent': self.fake.user_agent(),
            })
            self.con.commit()
        finally:
            # Release the cursor even when the insert or commit fails.
            c.close()

    def _insert_action(self, action_insert, action_id):
        """Insert one fake action row and publish how long the insert took.

        Args:
            action_insert: the parameterised INSERT statement to execute.
            action_id: value stored in the row's ``id`` column.
        """
        log = logging.getLogger()
        # Log before starting the clock so logging I/O is not counted in the
        # reported insert duration.
        log.info("%s inserting action %d", self.name, action_id)
        c = self.con.cursor()
        try:
            start = time.time()
            c.execute(action_insert, {
                'id': action_id,
                'inpoint': self.fake.pyint(),
                'outpoint': self.fake.pyint(),
                'mediapackage': self.fake.uuid4(),
                'session_id': self.session_id,
                'created': self.fake.date_time(),
                'type': choice(['PLAY', 'PAUSE', 'SEEK', 'HEARTBEAT']),
            })
            self.con.commit()
            end = time.time()
        finally:
            c.close()
        self.metric.put_data(
            MetricData=[
                dict(
                    MetricName='UserActionInsert-%s' % options.table,
                    Unit='Seconds',
                    Value=round(end - start, 3),
                )
            ])

    def run(self):
        """Consume action ids until the ``None`` sentinel is received.

        Every item taken from the queue is acknowledged with ``task_done()``,
        including items whose insert failed.
        """
        log = logging.getLogger()
        # The statement text does not change between iterations; build it once.
        # The table name comes from the command line and cannot be bound as a
        # query parameter, so it is concatenated into the statement.
        action_insert = (
            "INSERT INTO " + options.table + " "
            "(id,inpoint,outpoint,mediapackage,session_id,created,length,type,playing) "
            "VALUES (%(id)s, %(inpoint)s, %(outpoint)s, %(mediapackage)s, "
            "%(session_id)s, %(created)s, 0, %(type)s, 1)"
        )
        while True:
            action_id = self.work_queue.get()
            if action_id is None:
                log.info("Nothing left to do for worker %s", self.name)
                self.work_queue.task_done()
                self.con.close()
                break
            try:
                self._insert_action(action_insert, action_id)
                # Pause a random whole number of seconds in [1, interval];
                # skip the pause for a non-positive interval, where
                # choice() would raise on the empty range.
                if options.interval > 0:
                    time.sleep(choice(range(options.interval)) + 1)
            except Exception:
                # Keep the worker alive: if it died here, the items and the
                # sentinel it would have consumed would never be
                # acknowledged and a join() on the queue would block forever.
                log.exception("%s failed to insert action %s", self.name, action_id)
            finally:
                self.work_queue.task_done()
def main():
    """Run the insert benchmark described by the module-level ``options``.

    Steps:
      1. Read ``MAX(id) + 1`` from ``options.table`` to find the first free id
         (1 when the query yields NULL).
      2. Create the CloudWatch metric handle and start ``options.num_workers``
         ``Worker`` processes sharing one joinable queue.
      3. Enqueue ``options.num_inserts`` consecutive ids in random order,
         followed by one ``None`` sentinel per worker.
      4. Wait for the queue to drain and for every worker to exit.
    """
    log = logging.getLogger()

    connect_args = dict(
        user=options.user,
        password=options.password,
        host=options.host,
        database=options.database,
    )
    # --port used to be parsed but silently ignored. Forward it only when it
    # was given so the connector's default applies otherwise.
    if options.port is not None:
        connect_args['port'] = options.port

    con = connect(**connect_args)
    try:
        c = con.cursor()
        # The table name comes from the command line and cannot be bound as a
        # query parameter, so it is interpolated into the statement.
        c.execute("SELECT MAX(id) + 1 FROM %s" % options.table)
        (next_id,) = c.fetchone()
    finally:
        # Close the connection even if the query fails.
        con.close()
    # NULL (no usable MAX) maps to 1; int() guards against the driver handing
    # back a non-int numeric type, which range() below would reject.
    next_id = int(next_id or 1)

    boto3.setup_default_session(profile_name=options.aws_profile)
    cloudwatch = boto3.resource('cloudwatch')
    metric = cloudwatch.Metric('mh-user-action-bench', 'UserActionInsert-%s' % options.table)

    work_queue = JoinableQueue()

    log.info("starting %d workers", options.num_workers)
    # range() instead of xrange() so this also runs on Python 3.
    workers = [Worker(work_queue, metric) for _ in range(options.num_workers)]
    for w in workers:
        w.start()

    # shuffle() needs a mutable sequence; on Python 3 range() is not one.
    action_ids = list(range(next_id, next_id + options.num_inserts))
    shuffle(action_ids)
    for action_id in action_ids:
        work_queue.put(action_id)

    log.info("poisoning the work queue")
    # One None sentinel per worker tells each of them to stop.
    for _ in range(options.num_workers):
        work_queue.put(None)

    log.info("joining the work queue")
    work_queue.join()

    log.info("joining the work threads")
    for w in workers:
        w.join()

    log.info("all work complete")
# Start the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment