Skip to content

Instantly share code, notes, and snippets.

@abg
Created July 15, 2015 17:11
Show Gist options
  • Save abg/0040457bd003dca2de04 to your computer and use it in GitHub Desktop.
Save abg/0040457bd003dca2de04 to your computer and use it in GitHub Desktop.
"""
Iterative table checksum of tables with composite keys.

This uses an algorithm similar to mk-table-sync's Nibble
algorithm, but is intended purely for checking whether a
master and its slave are in sync.
"""
import os, sys
import time
import warnings
import optparse
import logging
from string import Template
from textwrap import dedent
from operator import itemgetter
import MySQLdb
def quote_identifier(name):
    """Return *name* wrapped in backticks for use as a MySQL identifier.

    Backticks embedded in the name are doubled so that they cannot end
    the quoted identifier early.
    """
    escaped = name.replace('`', '``')
    return '`' + escaped + '`'
def quote_string(value):
    """Return *value* as a single-quoted MySQL string literal.

    Backslashes are escaped before single quotes.  Without that step a
    value ending in a backslash (or containing ``\\'``) would swallow the
    closing quote and produce a broken or unterminated literal.

    NOTE(review): this assumes the server's default sql_mode, where
    backslash is an escape character (i.e. NO_BACKSLASH_ESCAPES is off).

    :param value: text to quote; must support ``str.replace``
    :returns: the quoted and escaped literal
    """
    # Order matters: escaping quotes first would double the backslashes
    # that the quote escaping itself introduces.
    escaped = value.replace('\\', '\\\\').replace("'", "\\'")
    return "'%s'" % escaped
class Table(tuple):
    """A table described as ``(schema, name, engine, fields, indexes)``.

    ``fields`` is a sequence of objects exposing ``field`` and ``null``
    attributes and ``indexes`` a sequence of objects exposing ``key_name``
    and ``column_name``; the computed properties below rely on exactly
    those attributes.
    """
    __slots__ = ()

    # Positional accessors into the underlying tuple.
    schema = property(lambda self: self[0])
    name = property(lambda self: self[1])
    engine = property(lambda self: self[2])
    fields = property(lambda self: self[3])
    indexes = property(lambda self: self[4])

    @property
    def identifier(self):
        """Fully qualified, backtick-quoted ``schema.name``."""
        quoted_schema = quote_identifier(self.schema)
        quoted_name = quote_identifier(self.name)
        return '%s.%s' % (quoted_schema, quoted_name)

    @property
    def pk(self):
        """Generate the quoted column names of the PRIMARY index.

        Columns come out in the order they appear in ``indexes``.
        """
        for index in self.indexes:
            if index.key_name != 'PRIMARY':
                continue
            yield quote_identifier(index.column_name)

    @property
    def columns(self):
        """Generate the quoted name of every column in ``fields``."""
        for column in self.fields:
            yield quote_identifier(column.field)

    @property
    def null_columns(self):
        """Generate the quoted names of columns whose ``null`` is 'YES'."""
        for column in self.fields:
            if column.null != 'YES':
                continue
            yield quote_identifier(column.field)

    @property
    def null_bitmap(self):
        """SQL expression concatenating ``ISNULL(col)`` per nullable column.

        Evaluates to the literal ``NULL`` when no column is nullable.
        """
        flags = ['ISNULL(%s)' % column for column in self.null_columns]
        if not flags:
            return 'NULL'
        return 'CONCAT(%s)' % ', '.join(flags)
class Column(tuple):
    """One result row of ``SHOW FIELDS``, with named access to its items.

    Positions map to ``(field, column_type, null, key, default, extra)``.
    """
    __slots__ = ()

    field = property(lambda self: self[0])
    column_type = property(lambda self: self[1])
    null = property(lambda self: self[2])
    key = property(lambda self: self[3])
    default = property(lambda self: self[4])
    extra = property(lambda self: self[5])
class Index(tuple):
    """One result row of ``SHOW INDEX``: a single column of a MySQL index.

    :attr table: name of the table this index belongs to
    :attr non_unique: whether the index may contain duplicate values
    :attr key_name: name of the index (``PRIMARY`` for the primary key)
    :attr seq_in_index: position of the column within the index
    :attr column_name: name of the column within the index
    :attr collation: how the column is sorted in the index
    :attr cardinality: estimated number of unique values in the index
    :attr sub_part: indexed prefix length, if only a prefix is indexed
    :attr packed: how the key is packed, if at all
    :attr null: whether this column within the index can be NULL
    :attr index_type: type of the index
    :attr comment: comment on the index
    :attr index_comment: comment supplied when the index was created
    """
    __slots__ = ()

    table = property(lambda self: self[0])
    non_unique = property(lambda self: self[1])
    key_name = property(lambda self: self[2])
    seq_in_index = property(lambda self: self[3])
    column_name = property(lambda self: self[4])
    collation = property(lambda self: self[5])
    cardinality = property(lambda self: self[6])
    sub_part = property(lambda self: self[7])
    packed = property(lambda self: self[8])
    null = property(lambda self: self[9])
    index_type = property(lambda self: self[10])
    comment = property(lambda self: self[11])
    index_comment = property(lambda self: self[12])
class Checksum(tuple):
    """Result of checksumming one chunk of a table.

    Positions map to
    ``(pk_values, chunk_number, crc, row_count, runtime)``.
    """
    __slots__ = ()

    pk_values = property(lambda self: self[0])
    chunk_number = property(lambda self: self[1])
    crc = property(lambda self: self[2])
    row_count = property(lambda self: self[3])
    runtime = property(lambda self: self[4])
class Client(object):
    """Thin wrapper around a MySQLdb connection with schema helpers.

    Attributes not defined here are delegated to the wrapped connection,
    so a ``Client`` can be used wherever the raw connection was.
    """

    def __init__(self, *args, **kwargs):
        """Connect to MySQL; arguments go straight to ``MySQLdb.connect``."""
        self.connection = MySQLdb.connect(*args, **kwargs)

    def __getattr__(self, name):
        """Delegate unknown attributes to the underlying connection."""
        # Without this guard, looking up ``connection`` on an instance
        # that never got one would re-enter __getattr__ forever.
        if name == 'connection':
            raise AttributeError(name)
        return getattr(self.connection, name)

    def show_databases(self):
        """Return the list of database names, or None if there are none."""
        cursor = self.cursor()
        try:
            if cursor.execute('SHOW DATABASES'):
                return [name for name, in cursor]
            return None
        finally:
            cursor.close()

    def show_tables(self, db, tbl):
        """Yield a ``Table`` for each table in *db* matching pattern *tbl*.

        Rows whose engine is NULL are skipped.  Fields and indexes are
        looked up using the name the server reported for each row, not
        the LIKE pattern, so wildcard patterns resolve correctly.

        :param db: unquoted database name
        :param tbl: LIKE pattern for the table name
        """
        query = "SHOW TABLE STATUS FROM %s LIKE %s" % (
            quote_identifier(db),
            quote_string(tbl),
        )
        cursor = self.cursor()
        try:
            if not cursor.execute(query):
                return
            names = [c[0].lower() for c in cursor.description]
            # Fetch everything up front so the cursor is released even if
            # the caller abandons this generator part-way through.
            rows = cursor.fetchall()
        finally:
            cursor.close()

        for row in rows:
            row_dict = dict(zip(names, row))
            if row_dict['engine'] is None:
                continue
            table_name = row_dict['name']
            yield Table([db,
                         table_name,
                         row_dict['engine'],
                         self.show_fields(db, table_name),
                         self.show_indexes(db, table_name)])

    def show_fields(self, db, tbl):
        """Return the ``SHOW FIELDS`` rows of ``db.tbl`` as ``Column``s."""
        cursor = self.cursor()
        try:
            cursor.execute("SHOW FIELDS FROM %s.%s" % (
                quote_identifier(db), quote_identifier(tbl)))
            return [Column(row) for row in cursor]
        finally:
            cursor.close()

    def show_indexes(self, db, tbl):
        """Return the ``SHOW INDEX`` rows of ``db.tbl`` as ``Index``es."""
        cursor = self.cursor()
        try:
            cursor.execute("SHOW INDEX FROM %s.%s" % (
                quote_identifier(db), quote_identifier(tbl)))
            return [Index(row) for row in cursor]
        finally:
            cursor.close()
class TableChecksumOption(object):
    """Settings for one table checksum run.

    :attr table: the table to checksum
    :attr limit: maximum number of rows per chunk
    :attr replicate_table: table that checksum rows are written to, or
                           None to only compute them
    """

    def __init__(self, table, limit, replicate_table=None):
        self.table, self.limit = table, limit
        self.replicate_table = replicate_table
class ChecksumError(Exception):
    """Raised when a chunk checksum cannot be retrieved or recorded."""


def _sql_literal(value):
    """Render *value* as a single-quoted, escaped SQL string literal.

    Accepts non-string values (primary-key values may be numbers), and
    escapes backslashes before quotes so the result cannot terminate
    early.

    NOTE(review): assumes backslash is an escape character on the server
    (NO_BACKSLASH_ESCAPES off).
    """
    text = '%s' % (value,)
    return "'%s'" % text.replace('\\', '\\\\').replace("'", "\\'")


class TableChecksum(object):
    """Checksum a table chunk by chunk, walking its primary key.

    Each chunk covers up to ``options.limit`` rows in primary-key order.
    The next chunk starts strictly after the last primary-key value seen
    by the previous one.
    """

    def __init__(self, connection, options):
        """
        :param connection: object providing ``cursor()``
        :param options: object with ``table``, ``limit`` and
                        ``replicate_table`` attributes
        """
        self.connection = connection
        self.options = options
        self._last_checksum = None   # result of the previous chunk
        self._chunk_number = 0       # zero-based index of the next chunk
        self._start = None           # time.time() when the chunk began

    def checksum(self):
        """Yield a ``Checksum`` per chunk until a short chunk is seen.

        Iteration stops after the first chunk holding fewer than
        ``options.limit`` rows.  When a replicate table is configured,
        the chunk's row there is finalized before the checksum is
        yielded.

        :raises ChecksumError: if a chunk's result cannot be read back or
                               its replicate row cannot be finalized
        """
        while True:
            query = self._checksum_create_query()
            logging.debug(query)
            cursor = self.connection.cursor()
            self._start = time.time()
            try:
                cursor.execute(query)
            finally:
                cursor.close()
            checksum = self._retrieve_checksum()
            if checksum is None:
                raise ChecksumError(
                    "Failed to retrieve checksum for chunk %d"
                    % self._chunk_number)
            self._last_checksum = checksum
            logging.debug("checksum: %r", checksum)
            if self.options.replicate_table:
                self._update_checksum()
            self._chunk_number += 1
            yield checksum
            if checksum.row_count < self.options.limit:
                break

    def _retrieve_checksum(self):
        """Read the session variables set by the chunk query.

        :returns: a ``Checksum``, or None if the fetch returned no row
        """
        query = self._checksum_fetch_query()
        pk_count = len(list(self.options.table.pk))
        logging.debug(query)
        cursor = self.connection.cursor()
        try:
            if not cursor.execute(query):
                return None
            row = cursor.fetchone()
        finally:
            cursor.close()
        elapsed = time.time() - self._start
        # The first pk_count items are the last primary-key values seen;
        # the rest are chunk number, crc and row count.
        return Checksum((row[:pk_count],) + tuple(row[pk_count:]) +
                        (elapsed,))

    def _update_checksum(self):
        """Update the checksum table to set the master_{crc,cnt} for the
        last checksum.

        :returns: number of rows updated (always 1 on success)
        :raises ChecksumError: if the update did not touch exactly one row
        """
        query = self._checksum_update_query()
        logging.debug(query)
        cursor = self.connection.cursor()
        try:
            if cursor.execute(query) != 1:
                raise ChecksumError(
                    "Failed to finalize last checksum record")
            return cursor.rowcount
        finally:
            cursor.close()

    def _checksum_query_boundaries(self):
        """Build the WHERE condition selecting rows after the last chunk.

        For a key ``(a, b, c)`` last seen at ``(x, y, z)`` this yields
        ``((a = x AND b = y AND c > z) OR (a = x AND b > y) OR (a > x))``.
        Returns ``''`` before the first chunk.
        """
        checksum = self._last_checksum
        if not checksum:
            return ''
        pks = list(self.options.table.pk)
        values = checksum.pk_values
        clauses = []
        # Longest key prefix first; each clause pins the leading columns
        # and advances only the last column of the prefix.
        for width in range(len(pks), 0, -1):
            terms = []
            for i in range(width):
                op = '>' if i == width - 1 else '='
                terms.append(
                    '%s %s %s' % (pks[i], op, _sql_literal(values[i])))
            clauses.append('(' + ' AND '.join(terms) + ')')
        return '(%s)' % ' OR '.join(clauses)

    def _checksum_create_query(self):
        """Build the statement that checksums the next chunk.

        The query stores the crc and row count in ``@checksum`` and
        ``@row_count`` and the last primary-key values seen in ``@c_N``.
        With a replicate table configured the result is also written
        there via ``REPLACE INTO ... SELECT``.
        """
        table = self.options.table
        pk_incr = ', '.join(['@c_%d := %s' % (i, col)
                             for i, col in enumerate(table.pk)])
        boundaries = self._checksum_query_boundaries()

        # Optional fragments carry their own newline so no blank lines
        # have to be stripped from the finished SQL afterwards.  A global
        # strip would also rewrite literal values containing blank lines.
        where = ''
        if boundaries:
            where = '\n    WHERE ' + boundaries
        replace = ''
        if self.options.replicate_table:
            replace = (
                'REPLACE INTO %s '
                '(db, tbl, chunk, boundaries, this_crc, this_cnt, ts)\n'
            ) % self.options.replicate_table.identifier

        template = Template(dedent('''
            /*${comment}*/
            ${replace}SELECT SQL_NO_CACHE
                ${db},
                ${tbl},
                ${chunk_number},
                ${boundaries},
                @checksum := COALESCE(
                    HEX(
                        BIT_XOR(
                            CAST(
                                CRC32(
                                    CONCAT_WS('#', ${columns},
                                              ${null_bitmap})
                                ) AS UNSIGNED
                            )
                        )
                    ), 0) AS `crc`,
                @row_count := COUNT(*) AS `row_count`,
                NOW()
            FROM (
                SELECT ${pk_incr}, ${columns}
                FROM ${table} FORCE INDEX (PRIMARY)${where}
                ORDER BY ${pk}
                LIMIT ${limit}
            ) AS `checksum_chunk`
        ''').strip())
        return template.substitute(
            db=_sql_literal(table.schema),
            tbl=_sql_literal(table.name),
            chunk_number=self._chunk_number,
            boundaries=_sql_literal(boundaries),
            replace=replace,
            columns=', '.join(table.columns),
            null_bitmap=table.null_bitmap,
            pk_incr=pk_incr,
            table=table.identifier,
            where=where,
            pk=', '.join(table.pk),
            limit=self.options.limit,
            comment='chunk : %d' % self._chunk_number,
        )

    def _checksum_fetch_query(self):
        """Build the query reading back the chunk's session variables."""
        last_pk = ', '.join(['@c_%d AS %s' % (i, name)
                             for i, name in enumerate(self.options.table.pk)])
        template = Template(dedent('''
            SELECT ${last_pk},
                   ${chunk_number},
                   @checksum AS `checksum`, @row_count AS `row_count`;
        ''').strip())
        return template.substitute(last_pk=last_pk,
                                   chunk_number=self._chunk_number)

    def _checksum_update_query(self):
        """Build the UPDATE copying the last crc/count into master_*."""
        template = Template(dedent('''
            UPDATE ${checksum_table}
            SET master_crc = ${master_crc},
                master_cnt = ${master_cnt}
            WHERE db = ${db}
              AND tbl = ${tbl}
              AND chunk = ${chunk_number}
        ''').strip())
        return template.substitute(
            checksum_table=self.options.replicate_table.identifier,
            master_crc=_sql_literal(self._last_checksum.crc),
            master_cnt=self._last_checksum.row_count,
            db=_sql_literal(self.options.table.schema),
            tbl=_sql_literal(self.options.table.name),
            chunk_number=self._chunk_number,
        )
def create_replicate_table(connection, tbl):
    """Create the checksum results table if needed and describe it.

    :param connection: object providing ``cursor()`` and ``show_tables()``
    :param tbl: unquoted table name in ``DATABASE.TABLE`` form
    :returns: the single table yielded by ``connection.show_tables``
    :raises ValueError: if *tbl* is not of the form ``DATABASE.TABLE``
    """
    parts = tbl.split('.')
    if len(parts) != 2:
        # Fail with a message naming the bad value rather than an opaque
        # tuple-unpacking error.
        raise ValueError(
            "replicate table must be given as DATABASE.TABLE, got %r"
            % (tbl,))
    db, tbl = parts
    query = Template(dedent('''
        CREATE TABLE IF NOT EXISTS ${checksum_table} (
            db char(64) NOT NULL,
            tbl char(64) NOT NULL,
            chunk int NOT NULL,
            boundaries char(255) NOT NULL,
            this_crc char(40) NOT NULL,
            this_cnt int NOT NULL,
            master_crc char(40) NULL,
            master_cnt int NULL,
            ts timestamp NOT NULL,
            PRIMARY KEY (db, tbl, chunk)
        );
    ''').strip()).substitute(
        checksum_table='%s.%s' % (quote_identifier(db), quote_identifier(tbl))
    )
    cursor = connection.cursor()
    try:
        cursor.execute(query)
    finally:
        cursor.close()
    replicate_table, = connection.show_tables(db, tbl)
    return replicate_table
def initialize_connection(connection):
    """Prepare the session state the checksum queries rely on.

    Resets the ``@checksum`` and ``@row_count`` user variables and, inside
    a version-conditional comment (``/*!50108 ... */``), sets the session
    binlog format to STATEMENT.

    :param connection: object providing ``cursor()``
    """
    cursor = connection.cursor()
    try:
        cursor.execute('SET @checksum := 0, '
                       '@row_count := 0 '
                       '/*!50108 ,@@binlog_format := STATEMENT */;')
    finally:
        # Release the cursor even when the SET statement is rejected.
        cursor.close()
def main(args=None):
    """Command-line entry point: checksum each ``--table`` chunk by chunk.

    One tab-separated line is printed per chunk, followed by a per-table
    row total.

    :param args: argument list to parse; None means ``sys.argv[1:]``
    :returns: 0 on success, 1 on any reported error
    """
    warnings.filterwarnings('ignore')

    parser = optparse.OptionParser()
    parser.add_option('--table', action='append', default=[])
    parser.add_option('--replicate')
    parser.add_option('--limit', type=int, dest='limit', default=5000)
    parser.add_option('--wait', type=float, default=0)
    parser.add_option('--debug', action='store_true', default=False)
    # Honour the caller-supplied argument list instead of always reading
    # sys.argv.
    opts, args = parser.parse_args(args)

    log_level = logging.DEBUG if opts.debug else logging.INFO
    logging.basicConfig(format='%(levelname)s %(message)s',
                        level=log_level)

    try:
        connection = Client(read_default_group='client', charset='utf8')
        initialize_connection(connection)
    except MySQLdb.DatabaseError as exc:
        logging.error("Failed to connect to MySQL [%d] %s", *exc.args)
        return 1

    try:
        replicate = None
        if opts.replicate:
            replicate = create_replicate_table(connection, opts.replicate)

        header = (
            'DATABASE',
            'TABLE',
            'CHUNK',
            'HOST',
            'ENGINE',
            'COUNT',
            'CHECKSUM',
            'TIME',
            'WAIT',
            'STAT',
            'LAG'
        )
        print('\t'.join(header))
        fmt = '\t'.join(['%-4s'] * len(header))

        for spec in opts.table:
            parts = spec.split('.')
            if len(parts) != 2:
                logging.error("Invalid --table %r: expected DATABASE.TABLE",
                              spec)
                return 1
            tables = list(connection.show_tables(*parts))
            if len(tables) != 1:
                logging.error("Expected exactly one table matching %s, "
                              "found %d", spec, len(tables))
                return 1
            table = tables[0]

            options = TableChecksumOption(table, opts.limit, replicate)
            table_checksum = TableChecksum(connection, options)
            total_rows = 0
            for checksum in table_checksum.checksum():
                print(fmt % (
                    table.schema,
                    table.name,
                    checksum.chunk_number,
                    'localhost',
                    table.engine,
                    checksum.row_count,
                    checksum.crc,
                    '%.2fms' % (checksum.runtime * 1000.0),
                    0,
                    'NULL',
                    'NULL',
                ))
                total_rows += checksum.row_count
                if opts.wait > 0:
                    logging.debug("Sleeping %.4f seconds", opts.wait)
                    time.sleep(opts.wait)
            print("# %s.%s total rows: %d" % (
                table.schema,
                table.name,
                total_rows
            ))
        return 0
    except MySQLdb.DatabaseError as exc:
        logging.error("[%d] %s", *exc.args)
        return 1
    finally:
        # Closing is best-effort: a failure here must not mask the real
        # outcome, but it is logged rather than silently discarded.
        try:
            connection.close()
        except Exception:
            logging.debug("Ignoring error while closing connection",
                          exc_info=True)
# Script entry point: run main() and turn its result, or any error that
# escapes it, into the process exit status.
if __name__ == '__main__':
    try:
        sys.exit(main())
    except MySQLdb.DatabaseError as exc:
        # ``except ... as`` is valid on Python 2.6+ and Python 3; the
        # comma form only parses on Python 2.
        logging.error("[%d] %s", *exc.args)
        sys.exit(1)
    except KeyboardInterrupt:
        logging.error("Interrupted")
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment