Last active
August 29, 2015 14:04
-
-
Save shuxiang/20e72107bb6424cae7ee to your computer and use it in GitHub Desktop.
pull mysql log to rethinkdb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding=utf8 | |
import uvent | |
uvent.install() | |
import gevent | |
from gevent import monkey | |
monkey.patch_all()# must patch first, then from gevent import sth else | |
from gevent.threadpool import ThreadPool | |
import time | |
import rethinkdb as r | |
import pymysql as MySQLdb | |
rdb = ['localhost', 28015] | |
mdb = ['192.168.1.139', 'backprocess', 'backprocess', 'slog'] | |
def dictfetchall(cursor):
    """Return every remaining row from *cursor* as a list of dicts
    keyed by column name (taken from cursor.description)."""
    columns = [column[0] for column in cursor.description]
    return [dict(zip(columns, values)) for values in cursor.fetchall()]
class Gdbm(object):
    """General database manager: a thin wrapper that owns one MySQL
    connection and one cursor (dict rows by default) for its lifetime."""

    def __init__(self, host, user, passwd, db, charset="utf8", cursorclass=MySQLdb.cursors.DictCursor):
        # Remember the connection parameters, then open the connection
        # and the single shared cursor used by every method below.
        self.host, self.user, self.passwd, self.db = host, user, passwd, db
        self.conn = MySQLdb.connect(
            host=host, user=user, passwd=passwd, db=db,
            charset=charset, cursorclass=cursorclass)
        self.cursor = self.conn.cursor()

    def query(self, sql):
        """Execute *sql* and return all fetched rows."""
        self.cursor.execute(sql)
        return self.cursor.fetchall()

    def execute(self, sql):
        """Execute a write statement, commit, and return the last insert id."""
        self.cursor.execute(sql)
        last_id = self.conn.insert_id()
        self.conn.commit()
        return last_id

    def execute_no_commit(self, sql):
        """Execute many sql then commit by hand (via commit())."""
        self.cursor.execute(sql)

    def query_as_dict(self, sql):
        """Execute *sql*; result is [dict{},...] regardless of cursor class."""
        self.cursor.execute(sql)
        return dictfetchall(self.cursor)

    def commit(self):
        self.conn.commit()

    def rollback(self):
        self.conn.rollback()

    def close(self):
        # Release the cursor first, then the underlying connection.
        self.cursor.close()
        self.conn.close()
def table_fetch(table): | |
conn = r.connect(*rdb) | |
tb = r.db('slog').table('slog_xm') | |
gdb = Gdbm(*mdb) | |
# count table rows | |
sql = "select count(1) as num from `%s`" % table | |
count = gdb.query(sql)[0]['num'] | |
# insert data to rethink db 1000 rows per time | |
offset = 0 | |
while offset <= count: | |
sql = "select * from `%s` order by time limit %s, 1000" % (table, offset) | |
offset += 1000 | |
# insert one dict or list of dict | |
print tb.insert(gdb.query_as_dict(sql)).run(conn, durability='soft')#False/noreply=True | |
conn.close() | |
gdb.close() | |
if __name__ == '__main__':
    # connect rethinkdb; r.connect.repl() only use in single thread env-->r.run()
    conn = r.connect(*rdb)
    # Create the target database/table on first run only.
    if 'slog' not in r.db_list().run(conn):
        r.db_create('slog').run(conn)
    if 'slog_xm' not in r.db('slog').table_list().run(conn):
        r.db('slog').table_create('slog_xm').run(conn)
    conn.close()
    # get all slog tables from the source MySQL database
    gdb = Gdbm(*mdb)
    tables = [row['Tables_in_slog'] for row in gdb.query('show tables')]
    gdb.close()
    # setup pool: one table_fetch job per table, 5 worker threads
    pool = ThreadPool(5)
    start = time.time()
    jobs = [pool.spawn(table_fetch, name) for name in tables]
    gevent.wait(jobs)
    # report wall-clock time for the whole transfer
    delay = time.time() - start
    print('Running 5 threads. Take about %.3f seconds' % delay)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment