Skip to content

Instantly share code, notes, and snippets.

@shuxiang
Last active August 29, 2015 14:04
Show Gist options
  • Save shuxiang/20e72107bb6424cae7ee to your computer and use it in GitHub Desktop.
pull mysql log to rethinkdb
#coding=utf8
# Copy MySQL log tables into RethinkDB using a gevent thread pool.
import uvent
uvent.install()  # install the libuv-backed event loop before gevent is configured
import gevent
from gevent import monkey
monkey.patch_all()# must patch first, then from gevent import sth else
from gevent.threadpool import ThreadPool
import time
import rethinkdb as r
import pymysql as MySQLdb  # pymysql is drop-in API-compatible with MySQLdb here
# [host, port] of the target RethinkDB server
rdb = ['localhost', 28015]
# [host, user, password, database] of the source MySQL server
mdb = ['192.168.1.139', 'backprocess', 'backprocess', 'slog']
def dictfetchall(cursor):
    """Return every remaining row from *cursor* as a list of dicts keyed by column name."""
    column_names = [column[0] for column in cursor.description]
    return [
        dict(zip(column_names, values))
        for values in cursor.fetchall()
    ]
class Gdbm(object):
    """General-purpose MySQL connection manager (general database manager).

    Thin wrapper around one pymysql connection plus one shared cursor.
    Not thread-safe: each worker thread should create its own instance.
    Can be used as a context manager; on exception the transaction is
    rolled back before the connection is closed.
    """

    def __init__(self, host, user, passwd, db, charset="utf8", cursorclass=MySQLdb.cursors.DictCursor):
        # Connection parameters are kept as attributes so callers can inspect them.
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.conn = MySQLdb.connect(host=self.host, user=self.user, passwd=self.passwd,
                                    db=self.db, charset=charset, cursorclass=cursorclass)
        self.cursor = self.conn.cursor()

    def query(self, sql):
        """Execute *sql* and return all rows (row shape depends on the cursor class)."""
        self.cursor.execute(sql)
        return self.cursor.fetchall()

    def execute(self, sql):
        """Execute a write statement, commit, and return the last AUTO_INCREMENT id."""
        self.cursor.execute(sql)
        pk = self.conn.insert_id()
        self.conn.commit()
        return pk

    def execute_no_commit(self, sql):
        """Execute one of many SQL statements; caller commits by hand via commit()."""
        self.cursor.execute(sql)

    def query_as_dict(self, sql):
        """Execute *sql* and return rows as [dict, ...] regardless of cursor class.

        Bug fix: with the default DictCursor the rows are ALREADY dicts, and
        zipping column names against a dict (i.e. its keys) produced bogus
        {column_name: key_name} mappings. Tuple rows are still converted via
        the cursor description as before.
        """
        self.cursor.execute(sql)
        rows = self.cursor.fetchall()
        if rows and isinstance(rows[0], dict):
            return list(rows)
        desc = self.cursor.description
        return [dict(zip([col[0] for col in desc], row)) for row in rows]

    def commit(self):
        self.conn.commit()

    def rollback(self):
        self.conn.rollback()

    def close(self):
        self.cursor.close()
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Roll back uncommitted work on error, then always release resources.
        if exc_type is not None:
            self.conn.rollback()
        self.close()
        return False
def table_fetch(table, db_name='slog', table_name='slog_xm', batch_size=1000):
    """Copy every row of the MySQL table *table* into a RethinkDB table.

    Rows are read in batches of *batch_size* ordered by the `time` column and
    bulk-inserted with durability='soft' (acknowledged before disk flush) for
    throughput. Fixes vs. the original: connections are closed even when an
    exception occurs, the loop uses `<` instead of `<=` (which issued one
    final empty query/insert whenever the row count was a multiple of the
    batch size), and the Python-2-only `print` statement is replaced by the
    `print(...)` form already used elsewhere in this file.

    NOTE(review): assumes every source table has a `time` column — confirm.
    Table names come from `show tables` (trusted), so string interpolation
    into the SQL is acceptable here.
    """
    conn = r.connect(*rdb)
    gdb = Gdbm(*mdb)
    try:
        tb = r.db(db_name).table(table_name)
        # Total row count so we know when paging is done.
        count = gdb.query("select count(1) as num from `%s`" % table)[0]['num']
        offset = 0
        while offset < count:
            sql = "select * from `%s` order by time limit %s, %s" % (table, offset, batch_size)
            offset += batch_size
            # insert() accepts a single dict or a list of dicts.
            print(tb.insert(gdb.query_as_dict(sql)).run(conn, durability='soft'))
    finally:
        conn.close()
        gdb.close()
if __name__ == '__main__':
    # Ensure the target RethinkDB database/table exist before workers start.
    # r.connect(...).repl() is only safe in a single-threaded environment, so
    # a connection object is passed to run() explicitly instead.
    conn = r.connect(*rdb)
    try:
        if 'slog' not in r.db_list().run(conn):
            r.db_create('slog').run(conn)
        if 'slog_xm' not in r.db('slog').table_list().run(conn):
            r.db('slog').table_create('slog_xm').run(conn)
    finally:
        # Fix: the connection is now released even if the checks above raise.
        conn.close()

    # Enumerate all source tables in the MySQL `slog` database.
    gdb = Gdbm(*mdb)
    try:
        tables = [row['Tables_in_slog'] for row in gdb.query('show tables')]
    finally:
        gdb.close()

    # Fan the per-table copy jobs out over a small thread pool.
    pool = ThreadPool(5)
    start = time.time()
    jobs = [pool.spawn(table_fetch, t) for t in tables]
    gevent.wait(jobs)

    delay = time.time() - start
    print('Running 5 threads. Take about %.3f seconds' % delay)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment