Created
November 27, 2012 08:00
-
-
Save elprup/4153049 to your computer and use it in GitHub Desktop.
MySQL tables svn-like sync between server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python | |
''' | |
MySQL tables svn-like sync between server | |
@ through ssh connect | |
@ incremental sync | |
@ can retry and redo | |
@ svn-style mode | |
@ timeout | |
version 1.2 | |
modified 2012-12-11 | |
usage: | |
checkout set up initial mysql table information, check from remote mysql server to local | |
update update table information | |
''' | |
import re | |
import os | |
import logging | |
import subprocess | |
import threading | |
# Lower the root logger's threshold to DEBUG as soon as this module is
# imported, so the debug/info calls made throughout this file are not filtered
# out by level.
logging.getLogger().setLevel(logging.DEBUG)
class TableExistedExeption(Exception):
    '''Signals that a table is already present and must not be overwritten.'''

    def __init__(self, table):
        # Remember the table name so that __str__ can report it.
        self.table = table

    def __str__(self):
        message = '%s already existed.' % self.table
        return message
class ConfigFileNotSetException(Exception):
    '''Signals that a config was asked to persist itself without a file name.'''

    def __init__(self):
        # Takes no arguments: the message is fixed.
        Exception.__init__(self)

    def __str__(self):
        message = 'config file name not set'
        return message
class ConfigNotFoundException(Exception):
    '''Signals that no config entry exists for the requested key.'''

    def __init__(self, key):
        # Remember the key that was looked up so that __str__ can report it.
        self.key = key

    def __str__(self):
        message = 'config not found: %s' % self.key
        return message
class CommandTimeoutException(Exception):
    '''Signals that a shell command did not finish within its timeout.'''

    def __init__(self):
        # Takes no arguments: the message is fixed.
        Exception.__init__(self)

    def __str__(self):
        message = 'command timeout'
        return message
class Command(object):
    '''
    Run a shell command and give up on it after a timeout.

    refer jcollado code:http://stackoverflow.com/questions/1191374/subprocess-with-timeout

    Attributes:
        cmd: the command line, passed to the shell as a single string.
        process: the subprocess.Popen object of the latest run (None before
            the first run).
        out: captured stdout of the latest completed run (None until a run
            has completed).
    '''

    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None
        # Defined up front so the attribute can always be read, even if a run
        # never completes.
        self.out = None

    def run(self, timeout):
        '''Execute the command and return whatever it wrote to stdout.

        timeout is the number of seconds to wait for the command; None waits
        indefinitely. If the command is still running when the timeout
        expires, the process is terminated and CommandTimeoutException is
        raised.
        '''
        # Forget the output of any previous run so a stale value is never
        # returned.
        self.out = None
        # Start the process in the calling thread: self.process is then
        # guaranteed to be set before a timeout can call terminate() on it,
        # and a failure to start surfaces here instead of inside the worker.
        self.process = subprocess.Popen(self.cmd, shell=True, stdout=subprocess.PIPE)

        def target():
            logging.debug('Thread started')
            self.out = self.process.communicate()[0]
            logging.debug('Thread finished')

        thread = threading.Thread(target=target)
        thread.start()
        thread.join(timeout)
        if thread.is_alive():
            logging.debug('Terminating process')
            self.process.terminate()
            # Wait for communicate() to return so the worker does not outlive
            # this call.
            thread.join()
            raise CommandTimeoutException
        return self.out
def shell_exec(command, timeout=None, dryrun=False):
    '''Log a shell command, run it unless dryrun is set, and return its output.

    command is the command line string; timeout is forwarded to Command.run.
    With dryrun=True nothing is executed and None is returned. The result is
    logged at debug level in both cases.
    '''
    logging.info('shell_exec command:' + command)
    if dryrun:
        result = None
    else:
        runner = Command(command)
        result = runner.run(timeout=timeout)
    logging.debug('shell_exec result: %s' % result)
    return result
def ssh_command(param, command=''):
    '''Build the shell command line that runs command on a remote host via ssh.

    param is inserted verbatim after ``ssh`` (it is not quoted, so it may
    carry several ssh arguments). command is wrapped in double quotes with
    every character that is still special inside double quotes escaped, so
    the local shell hands it to ssh unchanged.

    Returns the complete command line as a string.
    '''
    def quote(s):
        # Inside double quotes the shell still interprets \ " $ and `.
        # The backslash must be handled first so the escapes added for the
        # other characters are not escaped again.
        for special in ('\\', '"', '$', '`'):
            s = s.replace(special, '\\' + special)
        return s

    return ' '.join(['ssh', param, '"%s"' % quote(command)])
def mysql_command(conn, sql=''):
    '''Build the shell command line for the ``mysql`` client.

    conn supplies user, host, database and password attributes; the -p option
    is only added when the password is non-empty. When sql is not the empty
    string it is passed with -e, wrapped in double quotes with every
    character that is still special inside double quotes escaped. With an
    empty sql the client reads its statements from stdin.

    NOTE: user, host, database and password are inserted unquoted, and the
    password becomes part of the command line.

    Returns the complete command line as a string.
    '''
    def quote(s):
        # Inside double quotes the shell still interprets \ " $ and `
        # (the backtick matters: it is MySQL's identifier quote).
        # The backslash must be handled first so the escapes added for the
        # other characters are not escaped again.
        for special in ('\\', '"', '$', '`'):
            s = s.replace(special, '\\' + special)
        return s

    command = 'mysql -u%s -h%s %s' % (conn.user, conn.host, conn.database)
    if conn.password:
        command += ' -p%s' % conn.password
    if sql != '':
        command += ' -e "%s"' % quote(sql)
    return command
def check_column_type(conn, table, column, ssh_param=None):
    '''Return the data_type that information_schema reports for a column.

    conn describes the MySQL server to query. When ssh_param is given the
    mysql client is run through ssh with those parameters; otherwise it is
    run locally.

    Raises IndexError when the client output has fewer than two lines (for
    example when the column does not exist).
    '''
    sql = r'''select data_type from information_schema.columns where table_name = "%s" and table_schema="%s" and column_name="%s"'''\
        % (table, conn.database, column)
    command = mysql_command(conn, sql)
    if ssh_param:
        # Wrap with the ssh parameters passed to this function (the previous
        # code referenced an undefined name here).
        command = ssh_command(ssh_param, command)
    output = shell_exec(command, dryrun=False)
    # The value is taken from the second output line; the first is presumably
    # the column-header line printed by the mysql client.
    columntype = output.split('\n')[1].strip()
    logging.debug('check table %s field %s : %s' % (table, column, columntype))
    return columntype
def mysqldump_command(conn, table, create=False, where=None, drop=False):
    '''Build the shell command line that dumps one table with ``mysqldump``.

    conn supplies user, host, database and password attributes; the -p option
    is only added when the password is non-empty.

    create: when false, --no-create-info is added so only rows are dumped.
    drop:   when true, --add-drop-table is added.
    where:  optional row filter, passed with --where inside double quotes
            with every character that is still special inside double quotes
            escaped.

    NOTE: user, host, database, table and password are inserted unquoted, and
    the password becomes part of the command line.

    Returns the complete command line as a string.
    '''
    def quote(s):
        # Inside double quotes the shell still interprets \ " $ and `.
        # The backslash must be handled first so the escapes added for the
        # other characters are not escaped again.
        for special in ('\\', '"', '$', '`'):
            s = s.replace(special, '\\' + special)
        return s

    command = 'mysqldump --compact --compress -u%s -h%s %s %s' % (conn.user, conn.host, conn.database, table)
    if conn.password:
        command += ' -p%s' % conn.password
    if not create:
        command += ' --no-create-info'
    if drop:
        command += ' --add-drop-table'
    if where:
        command += ' --where "%s"' % quote(where)
    return command
class Connection(object):
    '''MySQL connection settings: host, user, password and database.

    The textual form is ``user:password@host:database``; __repr__ produces it
    and load_from_string reads it back.
    '''

    def __init__(self, host=None, user=None, password=None, database=None):
        self.host = host
        self.user = user
        self.password = password
        self.database = database

    def __repr__(self):
        fields = (self.user, self.password, self.host, self.database)
        return '%s:%s@%s:%s' % fields

    def load_from_string(self, s):
        '''Fill the four attributes from a ``user:password@host:database`` string.

        The string is split on every ':' and '@'; the first four pieces are
        used in that order. Raises IndexError when there are fewer than four.
        '''
        pieces = re.split('[:@]', s)
        # Index all four pieces before assigning, so nothing is modified when
        # the string is too short.
        values = (pieces[0], pieces[1], pieces[2], pieces[3])
        self.user, self.password, self.host, self.database = values
class Config(object):
    '''Table sync records kept in a whitespace-separated text file.

    key: (local_conn, table)
    value: [remote_conn, pk, last_pk]

    Each record is one line of five whitespace-separated fields:
    local_conn table remote_conn pk last_pk. Lines starting with '#' and
    lines that do not have exactly five fields are ignored when loading.
    '''

    def __init__(self, filename=None):
        '''Create a config; when filename is given, load its records at once.'''
        self.filename = filename
        self.configdict = {}
        if filename is not None:
            self._load()

    def _load(self):
        '''Read self.filename into configdict; a missing file gives an empty config.'''
        try:
            f = open(self.filename, 'r')
        except IOError:
            logging.info('config file not found, use empty config.')
            return
        # The with-block closes the file even if parsing raises. Iterating the
        # file object directly replaces the deprecated xreadlines().
        with f:
            for line in f:
                if line.startswith('#'):
                    continue
                row = line.split()
                if len(row) == 5:
                    self.configdict[(row[0], row[1])] = [row[2], row[3], row[4]]

    def get(self, key):
        '''Return the value stored for key, or None when there is none.'''
        return self.configdict.get(key, None)

    def set(self, key, value):
        '''Store value under key (in memory only; call store() to persist).'''
        self.configdict[key] = value

    def store(self, filename=None):
        '''Write all records to filename, or to self.filename when omitted.

        Every field is converted with str(). Raises ConfigFileNotSetException
        when neither file name is available.
        '''
        store_file = self.filename if filename is None else filename
        if store_file is None:
            raise ConfigFileNotSetException()
        lines = [
            ' '.join([str(k[0]), str(k[1]), str(v[0]), str(v[1]), str(v[2])]) + '\n'
            for k, v in self.configdict.items()
        ]
        with open(store_file, 'w') as f:
            f.writelines(lines)
def checkout_table(remote_conn, local_conn, table, ssh_param=None, pk='id', dryrun=False, overwrite=False):
    '''Copy a table (structure and rows) from the remote server to the local one.

    remote_conn / local_conn: connection settings of the source and target
        MySQL servers.
    table: name of the table to copy.
    ssh_param: when given, mysqldump is run through ssh with these parameters.
    pk: column whose maximum value is recorded as the sync position.
    dryrun: when true, the copy command is only logged and nothing is recorded
        (the local "show tables" check is still executed).
    overwrite: when true, an existing local table is dropped and recreated.

    Raises TableExistedExeption when the table already exists locally and
    overwrite is false.
    '''
    # check if table exists
    tables_result = shell_exec(command=mysql_command(local_conn, "show tables"), dryrun=False)
    # The first output line is skipped; empty lines are dropped.
    local_tables = [row for row in tables_result.split('\n')[1:] if len(row) > 0]
    logging.debug('local tables:%s' % local_tables)
    drop = False
    if table in local_tables:
        if not overwrite:
            raise TableExistedExeption(table)
        logging.warning('Table %s already existed, now overwrite it.' % table)
        drop = True
    # use pipe to redirect mysqldump from remote to local
    remote_command = mysqldump_command(remote_conn, table, create=True, drop=drop)
    local_command = mysql_command(local_conn)
    if ssh_param:
        remote_command = ssh_command(ssh_param, remote_command)
    command = remote_command + ' | ' + local_command
    shell_exec(command, timeout=600, dryrun=dryrun)
    # record relationship
    if dryrun:
        return
    config = Config('.mysqlsvn')
    # test pk type
    last_pk = shell_exec(mysql_command(local_conn, "SELECT max(%s) FROM %s" % (pk, table))).split('\n')[1].strip()
    # Key the record by the connection's string form, which is what the config
    # file loader and update_table use. Keying by the Connection object would
    # leave any record already loaded for this table in place and write two
    # lines for the same table.
    config.set((str(local_conn), table), (str(remote_conn), pk, last_pk))
    config.store()
def update_table(local_conn, table, ssh_param=None, dryrun=False):
    '''Pull rows newer than the recorded sync position into the local table.

    The remote connection, the key column and the last synced key value are
    read from the '.mysqlsvn' config file. Rows whose key is greater than the
    recorded value are dumped from the remote server and piped into the local
    one; afterwards the recorded position is refreshed.

    ssh_param: when given, mysqldump is run through ssh with these parameters.
    dryrun: when true, the transfer command is only logged and the config is
        left untouched.

    Raises ConfigNotFoundException when no record exists for
    (str(local_conn), table).
    '''
    # search config file to get remote conn
    config = Config('.mysqlsvn')
    configname = (str(local_conn), table)
    entry = config.get(configname)
    if entry is None:
        raise ConfigNotFoundException(str(configname))
    remote_conn_string, pk, last_pk = entry[0], entry[1], entry[2]

    # generate remote conn
    remote_conn = Connection()
    remote_conn.load_from_string(remote_conn_string)

    # doing update: a varchar key is compared against a quoted value
    if check_column_type(local_conn, table, pk) == 'varchar':
        where_clause = "%s > \"%s\"" % (pk, last_pk)
    else:
        where_clause = "%s > %s" % (pk, last_pk)
    remote_command = mysqldump_command(remote_conn, table, create=False, where=where_clause)
    if ssh_param:
        remote_command = ssh_command(ssh_param, remote_command)
    command = remote_command + ' | ' + mysql_command(local_conn)
    shell_exec(command, timeout=60, dryrun=dryrun)
    if dryrun:
        return

    # record the new sync position
    max_pk_output = shell_exec(mysql_command(local_conn, "SELECT max(%s) FROM %s" % (pk, table)))
    last_pk = max_pk_output.split('\n')[1].strip()
    config.set((str(local_conn), table), (remote_conn, pk, last_pk))
    logging.debug(config.configdict)
    config.store()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment