# -*- coding: utf-8 -*-
# Created: July 4, 2013 07:13
# Source: GitHub gist lyhapple/5925558
# 相同库中,表结构类似的两个表的数据复制
# (Copy data between two tables with similar structure in the same database.)
#coding=utf-8 | |
# Created with PyCharm. | |
# Author: liyinhui | |
# Date: 13-7-3 | |
# Time: 上午10:33 | |
# | |
from _mysql import IntegrityError | |
import logging | |
import traceback | |
import MySQLdb | |
import datetime | |
# ======================== Database configuration ========================
HOST = "192.168.1.240"
USER = "portal"
PWD = "portal.rzrk"
DB = "ttmgrportal"
# Tables to copy, as (source_table, target_table) pairs. Check that all of
# these tables already exist in the database before running.
SOURCE_TARGET = [
    ("details_cftaccountdetail", "idata_caccountdetail"),
    ("details_cftdealdetail", "idata_cdealdetail"),
    ("details_cftorderdetail", "idata_corderdetail"),
    ("details_cftpositiondetail", "idata_cpositiondetail"),
    ("details_cftpositionstatics", "idata_cpositionstatics"),
]
# Maximum number of rows fetched by a single query (page size).
MAX_LIMIT = 10000

# ======================== Logger configuration ========================
# Create the logger used by the whole script.
logger = logging.getLogger('mylogger')
# Logger level, e.g. logging.DEBUG, logging.INFO, logging.ERROR.
logger.setLevel(logging.INFO)
# Keep the logger enabled; set this to True to silence it completely
# (both the log file and the console).
logger.disabled = False
# Handler that writes to the log file.
fh = logging.FileHandler('copy_data.log')
fh.setLevel(logging.DEBUG)
# Handler that writes to the console (stderr).
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
logger.addHandler(fh)
logger.addHandler(ch)

# Separator placed between the parts of a generated primary key.
KEY_SPLITTER = "____"
# Template that lists a table's column names alphabetically; format it with
# the table name. The schema is filled in from DB here so it always matches
# the connection settings above (``%%s`` survives as the table placeholder).
COLUMNS_SQL = ("select COLUMN_NAME from information_schema.columns "
               "where table_name='%%s' AND TABLE_SCHEMA='%s' "
               "order by COLUMN_NAME ASC;") % DB
# INSERT template: format with (table, comma-joined columns, comma-joined values).
INSERT_SQL = "insert into %s (%s) values (%s)"
def get_conn_and_cursor():
    """Open a MySQL connection using the module-level settings.

    Returns:
        A ``(connection, cursor)`` tuple; the cursor is created from the
        returned connection.
    """
    connection = MySQLdb.connect(
        host=HOST,
        user=USER,
        passwd=PWD,
        db=DB,
        charset="utf8",
    )
    cursor = connection.cursor()
    return connection, cursor
def close_conn(cursor, conn):
    """Close a cursor and its connection, best effort.

    Each object is closed independently: a missing (``None``) cursor or a
    failing ``cursor.close()`` does not prevent the connection from being
    closed. Errors are printed and swallowed because this runs during cleanup.

    Args:
        cursor: The cursor to close, or ``None``.
        conn: The connection to close, or ``None``.
    """
    # Close the cursor first, then the connection that owns it.
    for resource in (cursor, conn):
        if resource is None:
            continue
        try:
            resource.close()
        except Exception:
            traceback.print_exc()
def get_time():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string."""
    now = datetime.datetime.now()
    return now.strftime('%Y-%m-%d %H:%M:%S')
class DetailClass(object):
    """Plain record built from one source-table row.

    Every keyword argument becomes an attribute of the same name, e.g.
    ``DetailClass(id=1).id == 1``.
    """

    def __init__(self, **kwargs):
        # items() works on both Python 2 and 3; iteritems() is Python-2-only.
        for name, value in kwargs.items():
            setattr(self, name, value)
class IdataClass(object):
    """One target-table (idata) row derived from a source-table detail record.

    Attributes:
        table: Target table name.
        columns: Target column names, in the order used by the INSERT.
        columns_str: ``columns`` joined with commas.
        detail: The source record whose attributes supply the values.

    Each target column also becomes an attribute holding the value to insert.
    """

    def __init__(self, table, columns, detail):
        self.table = table
        self.columns = columns
        self.columns_str = ",".join(self.columns)
        self.detail = detail
        self.set_attrs()

    def gen_idata_key(self):
        """Build the key ``m_strTagKey + KEY_SPLITTER + m_strTradingDay``.

        Attributes missing on the detail are treated as empty strings.

        Returns:
            The key string, or ``None`` when there is no (truthy) detail.
        """
        if not self.detail:
            return None
        tag_key = getattr(self.detail, 'm_strTagKey', '')
        trading_day = getattr(self.detail, 'm_strTradingDay', '')
        # %-formatting instead of str() so non-ASCII unicode values do not
        # raise UnicodeEncodeError on Python 2.
        return "%s%s%s" % (tag_key, KEY_SPLITTER, trading_day)

    def set_attrs(self):
        """Copy each target column's value from the detail onto ``self``.

        Special cases (add more here as needed):
          * ``m_priKey_tag`` is generated by ``gen_idata_key``.
          * ``m_strBrokerID`` is taken from the detail's ``m_nBrokerID``.
        Every other column takes the detail attribute of the same name, or
        ``None`` when it is missing.

        Raises:
            Exception: if ``m_priKey_tag`` is a target column but no key can
                be generated.
        """
        for col in self.columns:
            if col == "m_priKey_tag":
                value = self.gen_idata_key()
                if not value:
                    raise Exception("idata key gen error!!!!!!")
            elif col == "m_strBrokerID":
                value = getattr(self.detail, "m_nBrokerID", None)
            else:
                value = getattr(self.detail, col, None)
            setattr(self, col, value)

    @staticmethod
    def _sql_literal(val):
        """Render ``val`` as a MySQL literal: ``null`` or a quoted, escaped string."""
        if val is None:
            return "null"
        text = "%s" % (val,)
        # Escape backslashes first, then double single quotes, so data that
        # contains ' or \ can neither break nor alter the statement.
        # NOTE(review): assumes the server's default sql_mode (backslash
        # escapes enabled, i.e. no NO_BACKSLASH_ESCAPES).
        text = text.replace("\\", "\\\\").replace("'", "''")
        return "'%s'" % text

    def insert_sql(self):
        """Return the INSERT statement for this row.

        Every column value is written as a quoted string literal (escaped),
        and ``None`` becomes ``null``. A parameterized ``cursor.execute(sql,
        params)`` would be preferable, but callers expect a ready SQL string.
        """
        values = ",".join(self._sql_literal(getattr(self, col, None))
                          for col in self.columns)
        return INSERT_SQL % (self.table, self.columns_str, values)
class Main(object):
    """Copy every row of ``source_table`` into ``target_table``.

    Source rows are read in pages of ``MAX_LIMIT`` ordered by ``id``, mapped
    onto the target columns via ``IdataClass`` and inserted one by one; each
    page is committed separately. Call ``copy_to_idata()`` to run the copy.
    """

    def __init__(self, conn, cursor, source_table, target_table):
        """Store the DB handles and read both tables' column lists.

        Args:
            conn: Open database connection (used for commits).
            cursor: Cursor of ``conn`` used for all queries.
            source_table: Table to read from.
            target_table: Table to insert into.
        """
        self.conn, self.cursor = conn, cursor
        self.source_table = source_table
        self.target_table = target_table
        self.source_columns = self.get_columns(self.source_table)
        self.target_columns = self.get_columns(self.target_table)
        self.query_columns = ",".join(self.source_columns)

    def get_columns(self, table_name):
        """Return the column names of ``table_name`` in COLUMNS_SQL order."""
        self.cursor.execute(COLUMNS_SQL % table_name)
        return [col for col, in self.cursor.fetchall()]

    def query_source(self, start, rows):
        """Fetch one page of source rows.

        Args:
            start: Offset of the first row (rows are ordered by ``id``).
            rows: Maximum number of rows to fetch.

        Returns:
            A list of row tuples whose values follow ``self.source_columns``.
        """
        self.cursor.execute("select %s from %s order by id limit %d, %d" %
                            (self.query_columns, self.source_table, start, rows))
        return list(self.cursor.fetchall())

    def make_detail_instances(self, start, rows):
        """Fetch one page and wrap each non-empty row in a ``DetailClass``."""
        return [DetailClass(**dict(zip(self.source_columns, data)))
                for data in self.query_source(start, rows) if data]

    def count_total(self):
        """Return the number of rows in the source table."""
        self.cursor.execute("select count(id) from %s" % self.source_table)
        return self.cursor.fetchone()[0]

    def limits(self):
        """Split the source table into page offsets.

        Returns:
            ``(total, offsets)`` where ``offsets`` is ``[0, MAX_LIMIT,
            2 * MAX_LIMIT, ...]`` covering all ``total`` rows (empty when the
            table is empty).
        """
        total = self.count_total()
        # A stepped range avoids ``/``, which yields a float on Python 3.
        return total, list(range(0, total, MAX_LIMIT))

    def copy_to_idata(self):
        """Copy all source rows into the target table.

        Rows rejected with an IntegrityError (e.g. a duplicate primary key)
        are logged and skipped. Each page is committed after it has been
        processed. Other database errors propagate to the caller.
        """
        logger.info("source_table: %s \ntarget_table: %s\nstart at %s",
                    self.source_table, self.target_table, get_time())
        total, limits = self.limits()
        logger.info("total: %s ", int(total))
        for page, start in enumerate(limits):
            logger.info("%s. processing: %s --- %s, please wait.....",
                        page, start, min(start + MAX_LIMIT, total))
            details = self.make_detail_instances(start, MAX_LIMIT)
            for i, detail in enumerate(details):
                idata = IdataClass(self.target_table, self.target_columns, detail)
                sql = idata.insert_sql()
                try:
                    self.cursor.execute(sql)
                except MySQLdb.IntegrityError as err:
                    # Duplicate keys (or other constraint violations) are
                    # skipped so that re-running the copy is possible.
                    logger.warning("Warning: integrity error (%s), ignore this sql: %s",
                                   err, sql)
                else:
                    # Lazy %-args: the SQL is only formatted if DEBUG is enabled.
                    logger.debug("Success: %s, sql: %s", i, sql)
            self.conn.commit()
        logger.info("Done.....\nsource_table: %s\ntarget_table: %s\nend at %s",
                    self.source_table, self.target_table, get_time())
if __name__ == "__main__":
    conn, cursor, target_table = None, None, None
    try:
        conn, cursor = get_conn_and_cursor()
        for source, target in SOURCE_TARGET:
            # Remember which target is in progress so it can be reset on failure.
            target_table = target
            Main(conn, cursor, source, target).copy_to_idata()
    except Exception:
        # Record the traceback in both the console and copy_data.log.
        logger.exception("copy failed, target_table: %s", target_table)
        # Reset the target table that was being filled. NOTE: TRUNCATE removes
        # ALL rows of that table, including rows that existed before this run;
        # tables finished earlier in the loop are left as they are.
        if target_table and conn and cursor:
            cursor.execute("truncate table %s" % target_table)
    finally:
        # close_conn expects (cursor, conn) in that order.
        close_conn(cursor, conn)
# (GitHub gist page footer: sign up or sign in on GitHub to comment on the original gist.)