Created
July 15, 2015 17:11
-
-
Save abg/0040457bd003dca2de04 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Iterative table checksum of tables with composite keys | |
This uses an algorithm similar to mk-table-sync's Nibble | |
algorithm but intended purely for checking if a master/slave | |
are in sync. | |
""" | |
import os, sys | |
import time | |
import warnings | |
import optparse | |
import logging | |
from string import Template | |
from textwrap import dedent | |
from operator import itemgetter | |
import MySQLdb | |
def quote_identifier(name):
    """Return *name* wrapped in backticks as a MySQL identifier.

    Any backtick inside *name* is escaped by doubling it.
    """
    escaped = name.replace('`', '``')
    return '`' + escaped + '`'
def quote_string(value):
    """Return *value* as a single-quoted MySQL string literal.

    Backslashes are escaped first and single quotes second. Escaping only
    the quote character is not enough: a value ending in a backslash, or a
    value that already contains an escaped quote (as happens when a
    generated WHERE fragment is quoted again for storage), would otherwise
    terminate the literal early.

    Assumes the server is not running with the NO_BACKSLASH_ESCAPES SQL
    mode.
    """
    escaped = value.replace('\\', '\\\\').replace("'", "\\'")
    return "'%s'" % escaped
class Table(tuple):
    """A table as returned by ``Client.show_tables``.

    Stored as the tuple ``(schema, name, engine, fields, indexes)``, where
    *fields* holds ``Column`` rows and *indexes* holds ``Index`` rows; the
    properties below give named access to those slots.
    """
    __slots__ = ()

    schema = property(itemgetter(0))
    name = property(itemgetter(1))
    engine = property(itemgetter(2))
    fields = property(itemgetter(3))
    indexes = property(itemgetter(4))

    @property
    def identifier(self):
        """Backtick-quoted ``schema.table`` identifier."""
        return '%s.%s' % (
            quote_identifier(self.schema),
            quote_identifier(self.name)
        )

    @property
    def pk(self):
        """Yield the quoted PRIMARY KEY column names in key order.

        The columns are sorted by ``seq_in_index`` instead of trusting the
        order the index rows arrived in: the chunk queries ORDER BY these
        columns and build range predicates from them, so the order must
        match the key definition.
        """
        primary = [idx for idx in self.indexes if idx.key_name == 'PRIMARY']
        primary.sort(key=lambda idx: idx.seq_in_index)
        for idx in primary:
            yield quote_identifier(idx.column_name)

    @property
    def columns(self):
        """Yield every column name, quoted, in field order."""
        for col in self.fields:
            yield quote_identifier(col.field)

    @property
    def null_columns(self):
        """Yield the quoted names of the columns whose ``null`` is 'YES'."""
        for col in self.fields:
            if col.null == 'YES':
                yield quote_identifier(col.field)

    @property
    def null_bitmap(self):
        """SQL expression concatenating ISNULL() of each nullable column.

        Mixed into the row CRC so that NULL and an empty value (which
        CONCAT_WS would otherwise treat alike) hash differently. Returns the
        literal ``'NULL'`` when the table has no nullable columns.
        """
        null_columns = list(self.null_columns)
        if null_columns:
            return 'CONCAT(%s)' % ', '.join(['ISNULL(%s)' % col
                                             for col in null_columns])
        else:
            return 'NULL'
class Column(tuple):
    """One row of ``SHOW FIELDS`` output with named, read-only accessors.

    Slot order: field, column_type, null, key, default, extra.
    """
    __slots__ = ()

    field = property(lambda self: self[0])
    column_type = property(lambda self: self[1])
    null = property(lambda self: self[2])
    key = property(lambda self: self[3])
    default = property(lambda self: self[4])
    extra = property(lambda self: self[5])
class Index(tuple):
    """One row of ``SHOW INDEX`` output: a single column of a MySQL index.

    SQL NULL values arrive as None.

    :attr table: name of the table this index belongs to
    :attr non_unique: 0 if the index cannot contain duplicates, 1 if it can
    :attr key_name: name of the index ('PRIMARY' for the primary key)
    :attr seq_in_index: position of this column within the index, from 1
    :attr column_name: name of the indexed column
    :attr collation: how the column is sorted in the index ('A' ascending,
        'D' descending, or None if not sorted)
    :attr cardinality: estimated number of unique values in the index
    :attr sub_part: number of indexed characters for a prefix index, or
        None if the whole column is indexed
    :attr packed: how the key is packed, or None if it is not packed
    :attr null: 'YES' if the column may contain NULL, '' otherwise
    :attr index_type: index method, e.g. BTREE, HASH, FULLTEXT, RTREE
    :attr comment: information about the index not described in its own
        column (e.g. 'disabled')
    :attr index_comment: comment given with the index's COMMENT attribute

    If the server reports additional columns, they are kept in the tuple
    but have no named accessor here.
    """
    __slots__ = ()
    table = property(itemgetter(0))
    non_unique = property(itemgetter(1))
    key_name = property(itemgetter(2))
    seq_in_index = property(itemgetter(3))
    column_name = property(itemgetter(4))
    collation = property(itemgetter(5))
    cardinality = property(itemgetter(6))
    sub_part = property(itemgetter(7))
    packed = property(itemgetter(8))
    null = property(itemgetter(9))
    index_type = property(itemgetter(10))
    comment = property(itemgetter(11))
    index_comment = property(itemgetter(12))
class Checksum(tuple):
    """Result of checksumming one chunk, as built by ``TableChecksum``.

    Slots, in order:

    * ``pk_values`` -- primary key values read back from the ``@c_N``
      session variables; the next chunk starts after them
    * ``chunk_number`` -- zero-based index of the chunk
    * ``crc`` -- the chunk checksum (value of ``@checksum``)
    * ``row_count`` -- rows in the chunk (value of ``@row_count``)
    * ``runtime`` -- seconds from starting the chunk query to reading back
      its result
    """
    __slots__ = ()

    pk_values = property(lambda self: self[0])
    chunk_number = property(lambda self: self[1])
    crc = property(lambda self: self[2])
    row_count = property(lambda self: self[3])
    runtime = property(lambda self: self[4])
class Client(object):
    """Thin wrapper around a MySQLdb connection with SHOW helpers.

    Attributes not defined here (``cursor``, ``close``, ...) are looked up
    on the underlying connection.
    """

    def __init__(self, *args, **kwargs):
        self.connection = MySQLdb.connect(*args, **kwargs)

    def __getattr__(self, name):
        # __getattr__ only runs for attributes not found normally. If
        # ``connection`` itself is missing (connect() raised in __init__,
        # or the instance was built without __init__), delegating here
        # would recurse forever -- fail with AttributeError instead.
        if name == 'connection':
            raise AttributeError(name)
        return getattr(self.connection, name)

    def show_databases(self):
        """Return the list of database names, or None if none are listed."""
        cursor = self.cursor()
        try:
            if cursor.execute('SHOW DATABASES'):
                return [name for name, in cursor]
            return None
        finally:
            cursor.close()

    def show_tables(self, db, tbl):
        """Yield a ``Table`` for the table named exactly *tbl* in *db*.

        Tables whose engine is reported as NULL (e.g. views) are skipped.
        The name is matched with ``=`` rather than LIKE, so ``_`` and ``%``
        in *tbl* are not wildcards; callers unpack the result as exactly
        one table.
        """
        query = 'SHOW TABLE STATUS FROM %s WHERE `Name` = %s' % (
            quote_identifier(db), quote_string(tbl))
        cursor = self.cursor()
        try:
            if not cursor.execute(query):
                return
            names = [c[0].lower() for c in cursor.description]
            rows = [dict(zip(names, row)) for row in cursor]
        finally:
            cursor.close()
        for row_dict in rows:
            if row_dict['engine'] is None:
                continue
            name = row_dict['name']
            yield Table([db,
                         name,
                         row_dict['engine'],
                         self.show_fields(db, name),
                         self.show_indexes(db, name)])

    def show_fields(self, db, tbl):
        """Return a ``Column`` for each row of SHOW FIELDS on ``db.tbl``."""
        cursor = self.cursor()
        try:
            cursor.execute('SHOW FIELDS FROM %s.%s' % (
                quote_identifier(db), quote_identifier(tbl)))
            return [Column(row) for row in cursor]
        finally:
            cursor.close()

    def show_indexes(self, db, tbl):
        """Return an ``Index`` for each row of SHOW INDEX on ``db.tbl``."""
        cursor = self.cursor()
        try:
            cursor.execute('SHOW INDEX FROM %s.%s' % (
                quote_identifier(db), quote_identifier(tbl)))
            return [Index(row) for row in cursor]
        finally:
            cursor.close()
class TableChecksumOption(object):
    """Settings for one ``TableChecksum`` run.

    :param table: the ``Table`` to checksum
    :param limit: maximum number of rows per chunk
    :param replicate_table: optional ``Table`` that each chunk result is
        written into with REPLACE INTO; None disables that
    """

    def __init__(self, table, limit, replicate_table=None):
        self.table, self.limit, self.replicate_table = (
            table, limit, replicate_table)
class ChecksumError(Exception):
    """Raised when a chunk checksum cannot be read back or recorded."""


class TableChecksum(object):
    """Checksum a table in PRIMARY KEY ordered chunks ("nibbles").

    Each chunk selects at most ``options.limit`` rows in primary key order,
    starting after the key values of the previous chunk, and aggregates them
    into a hex-encoded BIT_XOR of per-row CRC32 values. The key values are
    captured in ``@c_N`` session variables and read back to bound the next
    chunk. With ``options.replicate_table`` set, each chunk result is also
    stored with REPLACE INTO that table and its master_crc/master_cnt
    columns are then filled in.
    """

    def __init__(self, connection, options):
        self.connection = connection
        self.options = options
        self._last_checksum = None
        self._chunk_number = 0
        self._start = None

    def checksum(self):
        """Yield a ``Checksum`` per chunk until a short chunk is seen.

        Iteration stops after the first chunk with fewer than
        ``options.limit`` rows (possibly zero).

        :raises ChecksumError: if a chunk result cannot be read back or the
            replicate table row cannot be updated
        """
        while True:
            query = self._checksum_create_query()
            logging.debug(query)
            cursor = self.connection.cursor()
            try:
                self._start = time.time()
                cursor.execute(query)
            finally:
                cursor.close()
            checksum = self._retrieve_checksum()
            if checksum is None:
                raise ChecksumError("Could not read back checksum for chunk %d"
                                    % self._chunk_number)
            self._last_checksum = checksum
            logging.debug("checksum: %r", checksum)
            if self.options.replicate_table:
                self._update_checksum()
            self._chunk_number += 1
            yield checksum
            if checksum.row_count < self.options.limit:
                break

    def _retrieve_checksum(self):
        """Read back the session variables set by the last chunk query.

        :returns: a ``Checksum``, or None if the query returned no row
        """
        query = self._checksum_fetch_query()
        npk = len(list(self.options.table.pk))
        logging.debug(query)
        cursor = self.connection.cursor()
        try:
            if not cursor.execute(query):
                return None
            row = tuple(cursor.fetchone())
        finally:
            cursor.close()
        # Row layout: pk values..., chunk_number, checksum, row_count.
        return Checksum((row[:npk],) + row[npk:] +
                        (time.time() - self._start,))

    def _update_checksum(self):
        """Set master_crc/master_cnt on the replicate row of the last chunk.

        :returns: the number of rows updated
        :raises ChecksumError: if the UPDATE did not affect exactly one row
        """
        query = self._checksum_update_query()
        logging.debug(query)
        cursor = self.connection.cursor()
        try:
            if cursor.execute(query) != 1:
                raise ChecksumError("Failed to finalize last checksum record")
            return cursor.rowcount
        finally:
            cursor.close()

    def _checksum_query_boundaries(self):
        """Return a predicate selecting rows after the previous chunk.

        For key columns (a, b, c) with last values (x, y, z) this builds
        ``((a = x AND b = y AND c > z) OR (a = x AND b > y) OR (a > x))``,
        the expanded form of ``(a, b, c) > (x, y, z)``. Values are quoted
        as string literals. Returns '' before the first chunk.
        """
        checksum = self._last_checksum
        if checksum is None:
            return ''
        pks = list(self.options.table.pk)
        values = [quote_string('%s' % (value,))
                  for value in checksum.pk_values]
        disjuncts = []
        for width in range(len(pks), 0, -1):
            terms = ['%s = %s' % (pks[i], values[i])
                     for i in range(width - 1)]
            terms.append('%s > %s' % (pks[width - 1], values[width - 1]))
            disjuncts.append('(%s)' % ' AND '.join(terms))
        return '(%s)' % ' OR '.join(disjuncts)

    def _checksum_create_query(self):
        """Build the SQL that checksums the next chunk.

        The inner SELECT reads up to ``options.limit`` rows in PRIMARY KEY
        order after the previous chunk, assigning each row's key columns to
        ``@c_0 .. @c_N``; the outer SELECT aggregates them into ``@checksum``
        and ``@row_count``. With a replicate table, the aggregate row is
        stored via REPLACE INTO.
        """
        table = self.options.table
        pks = list(table.pk)
        pk_incr = ', '.join(['@c_%d := %s' % (i, col)
                             for i, col in enumerate(pks)])
        boundaries = self._checksum_query_boundaries()
        where = ''
        if boundaries:
            where = 'WHERE ' + boundaries
        replace = ''
        if self.options.replicate_table:
            replace = ('REPLACE INTO %s '
                       '(db, tbl, chunk, boundaries, this_crc, this_cnt, ts)'
                       % self.options.replicate_table.identifier)
        sql = Template(dedent('''
            /*${comment}*/
            ${replace}
            SELECT SQL_NO_CACHE
                ${db},
                ${tbl},
                ${chunk_number},
                ${boundaries},
                @checksum := COALESCE(
                    HEX(
                        BIT_XOR(
                            CAST(
                                CRC32(
                                    CONCAT_WS('#', ${columns},
                                              ${null_bitmap})
                                ) AS UNSIGNED
                            )
                        )
                    ), 0) AS `crc`,
                @row_count := COUNT(*) AS `row_count`,
                NOW()
            FROM (
                SELECT ${pk_incr}, ${columns}
                FROM ${table} FORCE INDEX (PRIMARY)
                ${where}
                ORDER BY ${pk}
                LIMIT ${limit}
            ) AS `checksum_chunk`
        ''').strip()).substitute(
            db=quote_string(table.schema),
            tbl=quote_string(table.name),
            chunk_number=self._chunk_number,
            boundaries=quote_string(boundaries),
            replace=replace,
            columns=', '.join(table.columns),
            null_bitmap=table.null_bitmap,
            pk_incr=pk_incr,
            table=table.identifier,
            where=where,
            pk=', '.join(pks),
            limit=self.options.limit,
            comment='chunk : %d' % self._chunk_number,
        )
        # Drop the empty line left when ${replace} is unused.
        return sql.replace("\n\n", "\n")

    def _checksum_fetch_query(self):
        """Build the SELECT that reads back the chunk's session variables."""
        last_pk = ', '.join(['@c_%d AS %s' % (i, name)
                             for i, name in enumerate(self.options.table.pk)])
        return Template(dedent('''
            SELECT ${last_pk},
                   ${chunk_number},
                   @checksum AS `checksum`, @row_count AS `row_count`;
        ''').strip()).substitute(last_pk=last_pk,
                                 chunk_number=self._chunk_number)

    def _checksum_update_query(self):
        """Build the UPDATE recording master_crc/master_cnt for this chunk."""
        return Template(dedent('''
            UPDATE ${checksum_table}
               SET master_crc = '${master_crc}',
                   master_cnt = ${master_cnt}
             WHERE db = ${db}
               AND tbl = ${tbl}
               AND chunk = ${chunk_number}
        ''').strip()).substitute(
            checksum_table=self.options.replicate_table.identifier,
            master_crc=self._last_checksum.crc,
            master_cnt=self._last_checksum.row_count,
            db=quote_string(self.options.table.schema),
            tbl=quote_string(self.options.table.name),
            chunk_number=self._chunk_number
        )
def create_replicate_table(connection, tbl):
    """Create the checksum results table if needed and return it as a Table.

    :param connection: a ``Client``
    :param tbl: results table name in ``db.table`` form
    :returns: the ``Table`` describing the results table
    :raises ValueError: if *tbl* is not exactly ``db.table``, or the table
        cannot be found after creation
    """
    try:
        db_name, tbl_name = tbl.split('.')
    except ValueError:
        raise ValueError("replicate table must be given as db.table, got %r"
                         % (tbl,))
    query = Template(dedent('''
        CREATE TABLE IF NOT EXISTS ${checksum_table} (
            db char(64) NOT NULL,
            tbl char(64) NOT NULL,
            chunk int NOT NULL,
            boundaries char(255) NOT NULL,
            this_crc char(40) NOT NULL,
            this_cnt int NOT NULL,
            master_crc char(40) NULL,
            master_cnt int NULL,
            ts timestamp NOT NULL,
            PRIMARY KEY (db, tbl, chunk)
        );
    ''').strip()).substitute(
        checksum_table='%s.%s' % (quote_identifier(db_name),
                                  quote_identifier(tbl_name))
    )
    cursor = connection.cursor()
    try:
        cursor.execute(query)
    finally:
        cursor.close()
    replicate_table, = connection.show_tables(db_name, tbl_name)
    return replicate_table
def initialize_connection(connection):
    """Prepare a session for checksumming.

    Resets the ``@checksum`` and ``@row_count`` user variables. On servers
    that honour the ``/*!50108 ... */`` versioned comment (5.1.8 and later),
    it also sets the session ``binlog_format`` to STATEMENT -- presumably
    so replicas re-run the checksum statements themselves; confirm.
    """
    cursor = connection.cursor()
    try:
        cursor.execute('SET @checksum := 0, '
                       '@row_count := 0 '
                       '/*!50108 ,@@binlog_format := STATEMENT */;')
    finally:
        cursor.close()
def _close_quietly(connection):
    """Close *connection*, logging (not raising) any error.

    Cleanup is best effort: a failure here must not replace the exit status
    or an exception already leaving ``main``.
    """
    try:
        connection.close()
    except Exception:
        logging.debug("Ignoring error while closing connection",
                      exc_info=True)


def _lookup_table(connection, name):
    """Resolve a ``db.table`` command line value to a single Table.

    Logs an error and returns None if *name* is malformed or does not
    match exactly one table.
    """
    parts = name.split('.')
    if len(parts) != 2:
        logging.error("--table must be given as db.table, got %r", name)
        return None
    matches = list(connection.show_tables(*parts))
    if len(matches) != 1:
        logging.error("Expected exactly one table named %s, found %d",
                      name, len(matches))
        return None
    return matches[0]


def _checksum_table(connection, table, opts, replicate, fmt):
    """Checksum *table* chunk by chunk, printing one *fmt* line per chunk.

    Sleeps ``opts.wait`` seconds between chunks when positive.
    Returns the total number of rows checksummed.
    """
    options = TableChecksumOption(table, opts.limit, replicate)
    total_rows = 0
    for checksum in TableChecksum(connection, options).checksum():
        print(fmt % (
            table.schema,
            table.name,
            checksum.chunk_number,
            'localhost',
            table.engine,
            checksum.row_count,
            checksum.crc,
            '%.2fms' % (checksum.runtime * 1000.0),
            0,
            'NULL',
            'NULL',
        ))
        total_rows += checksum.row_count
        if opts.wait > 0:
            logging.debug("Sleeping %.4f seconds", opts.wait)
            time.sleep(opts.wait)
    return total_rows


def main(args=None):
    """Checksum every ``--table`` chunk by chunk and print the results.

    Connects using the ``[client]`` option-file group.

    :param args: argument list to parse; None means ``sys.argv[1:]``
    :returns: process exit status (0 on success, 1 on error)
    """
    warnings.filterwarnings('ignore')
    parser = optparse.OptionParser()
    parser.add_option('--table', action='append', default=[])
    parser.add_option('--replicate')
    parser.add_option('--limit', type=int, dest='limit', default=5000)
    parser.add_option('--wait', type=float, default=0)
    parser.add_option('--debug', action='store_true', default=False)
    opts, _ = parser.parse_args(args)
    if opts.limit < 1:
        # A chunk of LIMIT 0 always returns 0 rows and 0 < 0 never ends
        # the chunk loop, so reject it up front.
        parser.error('--limit must be a positive integer')

    log_level = logging.INFO
    if opts.debug:
        log_level = logging.DEBUG
    logging.basicConfig(format='%(levelname)s %(message)s',
                        level=log_level)

    try:
        connection = Client(read_default_group='client', charset='utf8')
    except MySQLdb.DatabaseError as exc:
        logging.error("Failed to connect to MySQL [%d] %s", *exc.args)
        return 1

    try:
        try:
            initialize_connection(connection)
        except MySQLdb.DatabaseError as exc:
            logging.error("Failed to connect to MySQL [%d] %s", *exc.args)
            return 1

        replicate = None
        if opts.replicate:
            replicate = create_replicate_table(connection, opts.replicate)

        header = (
            'DATABASE',
            'TABLE',
            'CHUNK',
            'HOST',
            'ENGINE',
            'COUNT',
            'CHECKSUM',
            'TIME',
            'WAIT',
            'STAT',
            'LAG'
        )
        print('\t'.join(header))
        fmt = '\t'.join(['%-4s' for _ in header])
        for name in opts.table:
            table = _lookup_table(connection, name)
            if table is None:
                return 1
            total_rows = _checksum_table(connection, table, opts,
                                         replicate, fmt)
            print("# %s.%s total rows: %d" % (
                table.schema,
                table.name,
                total_rows
            ))
        return 0
    except MySQLdb.DatabaseError as exc:
        logging.error("[%d] %s", *exc.args)
        return 1
    finally:
        _close_quietly(connection)
if __name__ == '__main__':
    # Translate the expected failure modes into a log line and exit
    # status 1; anything else (including SystemExit) propagates unchanged.
    try:
        status = main()
    except MySQLdb.DatabaseError as exc:
        logging.error("[%d] %s", *exc.args)
        status = 1
    except KeyboardInterrupt:
        logging.error("Interrupted")
        status = 1
    sys.exit(status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment