Skip to content

Instantly share code, notes, and snippets.

@lyhapple
Created July 4, 2013 07:13
Show Gist options
  • Save lyhapple/5925558 to your computer and use it in GitHub Desktop.
Save lyhapple/5925558 to your computer and use it in GitHub Desktop.
相同库中,表结构类似的两个表的数据复制
#coding=utf-8
# Created with PyCharm.
# Author: liyinhui
# Date: 13-7-3
# Time: 上午10:33
#
from _mysql import IntegrityError
import logging
import traceback
import MySQLdb
import datetime
#======================== Database connection settings ================
# NOTE(review): the credentials below are hard-coded in source; consider
# loading them from the environment or from a config file kept out of
# version control.
HOST = "192.168.1.240"
USER = "portal"
PWD = "portal.rzrk"
DB = "ttmgrportal"
# Tables to copy, as (source table, target table) pairs. Before running,
# check that every table listed here already exists in the database.
SOURCE_TARGET = [
("details_cftaccountdetail", "idata_caccountdetail"),
("details_cftdealdetail", "idata_cdealdetail"),
("details_cftorderdetail", "idata_corderdetail"),
("details_cftpositiondetail", "idata_cpositiondetail"),
("details_cftpositionstatics", "idata_cpositionstatics"),
]
# Maximum number of rows fetched from a source table in one query.
MAX_LIMIT = 10000
#===================== Logger configuration ===================
# Create the logger used by this script.
logger = logging.getLogger('mylogger')
# Logger level; possible values include logging.DEBUG, logging.INFO, logging.ERROR.
# With INFO here, logger.debug() records are dropped before reaching any handler.
logger.setLevel(logging.INFO)
# The original note on this line read "disable console logging", but 0 is
# falsy and leaves the logger enabled; a truthy value would make the logger
# discard every record (file and console alike).
logger.disabled = 0
# Handler that writes log records to the file copy_data.log.
fh = logging.FileHandler('copy_data.log')
fh.setLevel(logging.DEBUG)
# Second handler that writes log records to the console.
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# Attach both handlers to the logger.
logger.addHandler(fh)
logger.addHandler(ch)
# Separator placed between the two parts of a generated idata primary key.
KEY_SPLITTER = "____"
# Template for listing the column names of one table, sorted by column name.
# The schema name is filled in from DB once, right here, so it can never
# drift from the database the script actually connects to. The remaining
# '%s' is the table name, supplied by the caller via `COLUMNS_SQL % table_name`.
COLUMNS_SQL = (
    "select COLUMN_NAME from information_schema.columns where table_name='%%s' AND "
    "TABLE_SCHEMA='%s' order by COLUMN_NAME ASC;" % DB
)
# Template for insert statements: (table name, column list, value list).
INSERT_SQL = "insert into %s (%s) values (%s)"
def get_conn_and_cursor():
    """Open a database connection and create a cursor on it.

    The connection is made with the module-level HOST, USER, PWD and DB
    settings and the utf8 character set.

    Returns a (connection, cursor) tuple.
    """
    connection = MySQLdb.connect(
        host=HOST,
        user=USER,
        passwd=PWD,
        db=DB,
        charset="utf8",
    )
    db_cursor = connection.cursor()
    return connection, db_cursor
def close_conn(cursor, conn):
    """Close the cursor and then the connection, on a best-effort basis.

    cursor: cursor to close, or None if none was created
    conn:   connection to close, or None if none was opened

    Each handle is closed independently: a missing cursor does not prevent
    the connection from being closed, and an error raised while closing one
    handle is printed and does not stop the other from being closed.
    Nothing is raised to the caller.
    """
    # The cursor goes first because it belongs to the connection.
    for handle in (cursor, conn):
        if handle is None:
            continue
        try:
            handle.close()
        except Exception:
            # Cleanup must never mask the error that led here; just report it.
            traceback.print_exc()
def get_time():
    """Return the current time as a 'YYYY-MM-DD HH:MM:SS' string."""
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    now = datetime.datetime.now()
    return now.strftime(timestamp_format)
class DetailClass(object):
    """Plain record for one source-table row: every keyword becomes an attribute."""

    def __init__(self, **kwargs):
        """Store each keyword argument as an instance attribute of the same name."""
        # dict.items() exists on both Python 2 and Python 3, whereas
        # iteritems() is Python 2 only.
        for name, value in kwargs.items():
            setattr(self, name, value)
class IdataClass(object):
    """One row destined for an idata table, derived from a source-row object.

    On construction every target column becomes an attribute of the instance,
    with its value taken from the source row (see set_attrs).
    """

    def __init__(self, table, columns, detail):
        """
        table:   name of the target table
        columns: list of target column names
        detail:  source-row object whose attributes are read by column name
        """
        self.table = table
        self.columns = columns
        self.columns_str = ",".join(columns)
        self.detail = detail
        self.set_attrs()

    def gen_idata_key(self):
        """Build the idata primary key: m_strTagKey + KEY_SPLITTER + m_strTradingDay.

        Both parts are read from the source row and converted with str(); a
        missing attribute contributes an empty string. Returns None when the
        source row itself is falsy.
        """
        source = self.detail
        if not source:
            return None
        tag_key = str(getattr(source, 'm_strTagKey', ''))
        trading_day = str(getattr(source, 'm_strTradingDay', ''))
        return tag_key + KEY_SPLITTER + trading_day

    def set_attrs(self):
        """Populate one attribute per target column from the source row.

        Special columns are handled here (add further special cases in this
        method): m_priKey_tag receives the generated key, and m_strBrokerID is
        copied from the source attribute m_nBrokerID. Every other column takes
        the source attribute of the same name, or None when it is absent.

        Raises Exception when the generated key is empty.
        """
        for col in self.columns:
            if col == "m_priKey_tag":
                value = self.gen_idata_key()
                if not value:
                    raise Exception("idata key gen error!!!!!!")
            elif col == "m_strBrokerID":
                value = getattr(self.detail, "m_nBrokerID", None)
            else:
                value = getattr(self.detail, col, None)
            setattr(self, col, value)

    def insert_sql(self):
        """Return the complete insert statement for this row as a string.

        None is rendered as null; any other value is wrapped in single quotes
        exactly as '%s' formats it. Values are not escaped.
        """
        rendered = []
        for col in self.columns:
            value = getattr(self, col, None)
            rendered.append("null" if value is None else "'%s'" % value)
        return INSERT_SQL % (self.table, self.columns_str, ",".join(rendered))
class Main(object):
    """Copy every row of one source table into its target (idata) table.

    Create an instance and call copy_to_idata() to run the copy.
    """

    def __init__(self, conn, cursor, source_table, target_table):
        """Remember the connection and load the column lists of both tables.

        conn:         open database connection, used to commit each batch
        cursor:       cursor on that connection, used for every statement
        source_table: name of the table rows are read from
        target_table: name of the table rows are inserted into
        """
        self.conn, self.cursor = conn, cursor
        self.source_table = source_table
        self.target_table = target_table
        self.source_columns = self.get_columns(self.source_table)
        self.target_columns = self.get_columns(self.target_table)
        self.query_columns = ",".join(self.source_columns)

    def get_columns(self, table_name):
        """Return the column names of table_name as a list.

        The names come from the COLUMNS_SQL query, in the order it yields them.
        """
        self.cursor.execute(COLUMNS_SQL % table_name)
        return [col for col, in self.cursor.fetchall()]

    def query_source(self, start, rows):
        """Fetch one page of rows from the source table, ordered by id.

        start: offset of the first row to fetch
        rows:  maximum number of rows to fetch

        Returns a list of row tuples whose values follow self.source_columns.
        """
        self.cursor.execute("select %s from %s order by id limit %d, %d" %
                            (self.query_columns, self.source_table, start, rows))
        return list(self.cursor.fetchall())

    def make_detail_instances(self, start, rows):
        """Return one page of source rows as a list of DetailClass objects.

        Empty rows are skipped; each remaining row is turned into a
        DetailClass whose attributes are named after the source columns.
        """
        return [
            DetailClass(**dict(zip(self.source_columns, row)))
            for row in self.query_source(start, rows)
            if row
        ]

    def count_total(self):
        """Return the number of rows in the source table (count of id)."""
        self.cursor.execute("select count(id) from %s" % self.source_table)
        return self.cursor.fetchone()[0]

    def limits(self, max_limit=None):
        """Return (total, offsets): the row count and each page's start offset.

        max_limit: page size; defaults to the module-level MAX_LIMIT.

        The offsets are 0, max_limit, 2 * max_limit, ... for as long as they
        stay below total, so the last page may be shorter than the others and
        an empty table yields no offsets at all.
        """
        if max_limit is None:
            max_limit = MAX_LIMIT
        total = self.count_total()
        # range() with a step yields exactly the page starts and does not
        # depend on how `/` divides integers (Python 2 vs Python 3).
        return total, list(range(0, total, max_limit))

    def _insert_statement(self, idata):
        """Return (sql, params) for inserting idata with driver-side binding."""
        placeholders = ",".join(["%s"] * len(idata.columns))
        sql = INSERT_SQL % (idata.table, idata.columns_str, placeholders)
        params = tuple(getattr(idata, col, None) for col in idata.columns)
        return sql, params

    def copy_to_idata(self):
        """Copy the source table into the target table, page by page.

        Each page of at most MAX_LIMIT rows is read, converted and inserted
        row by row, then committed. A row rejected with IntegrityError
        (duplicate primary key) is logged and skipped; any other error
        propagates to the caller.
        """
        start_time = get_time()
        logger.info("source_table: %s \ntarget_table: %s\nstart at %s" %
                    (self.source_table, self.target_table, start_time))
        total, limits = self.limits()
        logger.info("total: %s " % int(total))
        for p, start in enumerate(limits):
            logger.info("%s. processing: %s --- %s, please wait....." % (p, start, start + MAX_LIMIT))
            details = self.make_detail_instances(start, MAX_LIMIT)
            for i, detail in enumerate(details):
                idata = IdataClass(self.target_table, self.target_columns, detail)
                # Let the driver quote and escape the values: data containing
                # a quote or backslash would otherwise yield malformed SQL.
                sql, params = self._insert_statement(idata)
                try:
                    self.cursor.execute(sql, params)
                except IntegrityError:
                    logger.error(" Warning: Duplicate key 'PRIMARY, ignore this sql : %s " % idata.insert_sql())
                else:
                    # Render the full statement only when it will actually be logged.
                    if logger.isEnabledFor(logging.DEBUG):
                        logger.debug("Success: %s, sql: %s" % (i, idata.insert_sql()))
            # One commit per page.
            self.conn.commit()
        end_time = get_time()
        logger.info("Done.....\nsource_table: %s\ntarget_table: %s\nend at %s" %
                    (self.source_table, self.target_table, end_time))
# Script entry point: copy every (source, target) pair listed in SOURCE_TARGET
# over a single shared connection.
if __name__ == "__main__":
    conn, cursor, target_table = None, None, None
    try:
        conn, cursor = get_conn_and_cursor()
        for source, target in SOURCE_TARGET:
            # Remember the table being filled so a failure can be cleaned up.
            target_table = target
            m = Main(conn, cursor, source, target)
            m.copy_to_idata()
    except Exception:
        traceback.print_exc()
        if target_table and conn and cursor:
            # WARNING: this empties the target table that was being filled
            # when the error occurred, including any rows it held before
            # this run started.
            cursor.execute("truncate table %s" % target_table)
    finally:
        # close_conn is declared as close_conn(cursor, conn): pass the
        # handles in that order so the cursor is closed before its connection.
        close_conn(cursor, conn)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment