@dantin
Created October 9, 2017 06:55
# -*- coding: utf-8 -*-
import argparse
import logging
import sys
import threading
from datetime import datetime

import pymysql.cursors

DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

def db_conn(host, port, username, password, database, charset='utf8'):
    # Open a PyMySQL connection that returns rows as dicts.
    return pymysql.connect(
        host=host,
        port=port,
        user=username,
        password=password,
        db=database,
        charset=charset,
        cursorclass=pymysql.cursors.DictCursor)

class CleanTask(threading.Thread):
    """Delete rows whose CREATE_TIME falls in [from_dt, to_dt), in batches."""

    def __init__(self, from_dt, to_dt):
        threading.Thread.__init__(self)
        self.start_dt = from_dt
        self.end_dt = to_dt
    def run(self):
        # TiDB database
        host = '10.30.1.4'
        port = 4000
        username = 'root'
        password = ''
        database = 'lock_event_log'
        conn = db_conn(host, port, username, password, database)
        batch_size = 1000
        # Delete in batches of `batch_size` rows so each transaction stays
        # small; values are bound by the driver instead of interpolated.
        _dml = '''
            DELETE FROM `mbk_lock_event`
            WHERE CREATE_TIME >= %s
              AND CREATE_TIME < %s
            LIMIT %s
        '''
        _start_dt = self.start_dt.strftime(DATE_FORMAT)
        _end_dt = self.end_dt.strftime(DATE_FORMAT)
        count = 0
        try:
            with conn.cursor() as cursor:
                # Repeat until a batch deletes no rows.
                result = cursor.execute(_dml, (_start_dt, _end_dt, batch_size))
                conn.commit()
                count += result
                while result > 0:
                    result = cursor.execute(_dml, (_start_dt, _end_dt, batch_size))
                    conn.commit()
                    count += result
        finally:
            conn.close()
        logging.info('delete %d records, time range [%s, %s)',
                     count, _start_dt, _end_dt)

def dispatch_task(start_dt, end_dt, chunks):
    logging.info('Start data processing...')
    # Split [start_dt, end_dt) into `chunks` equal sub-ranges, one thread each.
    tasks = []
    slot = (end_dt - start_dt) / chunks
    n = 0
    for b, e in [(i * slot + start_dt, (i + 1) * slot + start_dt) for i in range(chunks)]:
        logging.info('Task %d, time range [%s, %s)', n,
                     b.strftime(DATE_FORMAT), e.strftime(DATE_FORMAT))
        task = CleanTask(b, e)
        tasks.append(task)
        n += 1
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    logging.info('Finish data processing')
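# Worked example (hypothetical values, not from the gist):
# start_dt=2017-10-01 00:00:00, end_dt=2017-10-09 00:00:00 and chunks=4
# give a slot of 2 days, so the four tasks cover
# [10-01, 10-03), [10-03, 10-05), [10-05, 10-07), [10-07, 10-09).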

def main():
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('start_time', type=str, help='start time')
    arg_parser.add_argument('end_time', type=str, help='end time')
    arg_parser.add_argument('--chunks', type=int, default=1, help='# of chunks')
    args = arg_parser.parse_args()
    start_dt = datetime.strptime(args.start_time, DATE_FORMAT)
    end_dt = datetime.strptime(args.end_time, DATE_FORMAT)
    dispatch_task(start_dt, end_dt, args.chunks)

if __name__ == '__main__':
    # define logging
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    root.addHandler(ch)
    main()
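For reference, a typical invocation given the arguments defined above (the filename clean_lock_event.py is an assumption; the gist does not name the file). Both timestamps must match DATE_FORMAT, and --chunks sets how many threads split the range:

python clean_lock_event.py '2017-10-01 00:00:00' '2017-10-09 00:00:00' --chunks 4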