Created
January 23, 2022 04:35
-
-
Save mahadirz/b8d7a1623b588dc7d43a3131a6c4aa18 to your computer and use it in GitHub Desktop.
mysq-binlog-tutorial
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import traceback | |
import pendulum | |
import pymysql | |
from pymysql.protocol import MysqlPacket | |
from pymysqlreplication.binlogstream import MYSQL_EXPECTED_ERROR_CODES | |
from pymysqlreplication.constants import * | |
from pymysqlreplication.event import * | |
from pymysqlreplication.packet import BinLogPacketWrapper | |
from pymysqlreplication.row_event import TableMapEvent, UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent | |
# Path of the binary log file (downloaded locally) that this script decodes.
bin_log_file = "mysql-bin-changelog.000050.bin"

# Control connection to the source database. In this script it is used by
# get_table_information() to read column metadata from information_schema.
# DictCursor makes every fetched row a dict keyed by column name.
connection = pymysql.Connection(
    host="database-test.cto4n7gnn8z3.ap-southeast-1.rds.amazonaws.com",
    user="USER",
    password="PASSWORD",
    cursorclass=pymysql.cursors.DictCursor,
)
def get_table_information(schema, table):
    """Fetch column metadata for ``schema.table`` from information_schema.

    The query runs on the module-level ``connection``. If it fails with a
    ``pymysql.OperationalError`` whose code is listed in
    ``MYSQL_EXPECTED_ERROR_CODES``, the connection is re-established and the
    query is attempted one more time (two attempts in total).

    Args:
        schema: Name of the database (``table_schema``) to look up.
        table: Name of the table (``table_name``) to look up.

    Returns:
        The result of ``cursor.fetchall()``: one row per column, ordered by
        ORDINAL_POSITION, with the keys COLUMN_NAME, COLLATION_NAME,
        CHARACTER_SET_NAME, COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY and
        ORDINAL_POSITION.

    Raises:
        pymysql.OperationalError: Immediately for an unexpected error code,
            or after the last attempt for an expected one (instead of
            silently returning ``None``).
    """
    query = """
        SELECT
            COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
            COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY, ORDINAL_POSITION
        FROM
            information_schema.columns
        WHERE
            table_schema = %s AND table_name = %s
        ORDER BY ORDINAL_POSITION
    """
    last_error = None
    for _attempt in range(2):
        try:
            # The context manager closes the cursor even when execute() fails.
            with connection.cursor() as cur:
                cur.execute(query, (schema, table))
                return cur.fetchall()
        except pymysql.OperationalError as error:
            # Index instead of tuple-unpacking: args is not guaranteed to be
            # exactly (code, message).
            code = error.args[0] if error.args else None
            if code not in MYSQL_EXPECTED_ERROR_CODES:
                raise
            last_error = error
            # Retrying on a dropped connection is pointless without
            # reconnecting first.
            connection.ping(reconnect=True)
    raise last_error


# Expose the lookup on the connection object under the attribute name
# `_get_table_information`.
# NOTE(review): presumably pymysqlreplication calls this attribute on its
# control connection when it decodes table-map events — confirm against the
# installed library version.
connection._get_table_information = get_table_information
class Header:
    """Decoded form of the fixed-size header that starts each binlog event.

    The packed layout is ``'<IBIIIH'``: little-endian uint32, uint8, uint32,
    uint32, uint32, uint16, which is 19 bytes in total.
    """

    def __init__(self):
        # Neutral placeholders; unpack() overwrites all of them.
        self.timestamp = 0
        self.event_type = None
        self.server_id = None
        self.event_size = 0
        self.log_pos = 0
        self.flag = 0

    def unpack(self, data):
        """Fill every field from ``data``, the packed 19-byte header.

        Raises:
            struct.error: If ``data`` does not have the size the format
                string requires.
        """
        (
            self.timestamp,
            self.event_type,
            self.server_id,
            self.event_size,
            self.log_pos,
            self.flag,
        ) = struct.unpack('<IBIIIH', data)

    def __repr__(self):
        """Return a multi-line summary of the header (``flag`` is omitted)."""
        readable = pendulum.from_timestamp(self.timestamp).to_datetime_string()
        lines = (
            f"timestamp:{self.timestamp} ({readable})",
            f"event_type:{self.event_type}",
            f"server_id: {self.server_id}",
            f"event_size: {self.event_size}",
            f"next_log_pos: {self.log_pos}",
        )
        return "\n".join(lines)
# Size in bytes of the fixed header in front of every event; it matches the
# '<IBIIIH' layout that Header.unpack() decodes.
_EVENT_HEADER_LENGTH = 19

# The four bytes every binlog file is expected to start with.
_BINLOG_MAGIC = b'\xfebin'

# Event classes handed to BinLogPacketWrapper; built once rather than on
# every loop iteration.
_ALLOWED_EVENTS = [
    TableMapEvent, RotateEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
    DeleteRowsEvent,
]


def _read_raw_events(stream):
    """Yield ``(Header, header_bytes, body_bytes)`` for each event in *stream*.

    *stream* must be positioned just after the magic bytes. Iteration ends
    cleanly at end of file.

    Raises:
        ValueError: If the file ends in the middle of a header or body, or
            if a header declares an event smaller than the header itself
            (a negative read size would make ``read()`` consume the rest of
            the file).
    """
    while True:
        offset = stream.tell()
        raw_header = stream.read(_EVENT_HEADER_LENGTH)
        if not raw_header:
            return
        if len(raw_header) < _EVENT_HEADER_LENGTH:
            raise ValueError(
                f"truncated event header at offset {offset}: "
                f"got {len(raw_header)} of {_EVENT_HEADER_LENGTH} bytes"
            )
        parsed = Header()
        parsed.unpack(raw_header)
        body_length = parsed.event_size - _EVENT_HEADER_LENGTH
        if body_length < 0:
            raise ValueError(
                f"invalid event_size {parsed.event_size} at offset {offset}"
            )
        body = stream.read(body_length)
        if len(body) < body_length:
            raise ValueError(
                f"truncated event body at offset {offset}: "
                f"got {len(body)} of {body_length} bytes"
            )
        yield parsed, raw_header, body


# Decode the binlog file event by event and print what was found.
# Any failure (including a missing file) is reported as a traceback, and the
# control connection is closed exactly once in all cases.
try:
    # Read-only binary mode: the script never writes to the binlog.
    with open(bin_log_file, 'rb') as f:
        true_binlog_magic = f.read(len(_BINLOG_MAGIC))
        # Explicit check instead of `assert`, which is stripped under -O.
        if true_binlog_magic != _BINLOG_MAGIC:
            raise ValueError(
                f"{bin_log_file} is not a binlog file: "
                f"bad magic {true_binlog_magic!r}"
            )

        # Table metadata cache keyed by table id, filled from table-map
        # events and passed to the decoder for later events.
        table_map = {}
        for h, header, body in _read_raw_events(f):
            print(h)
            # One placeholder byte (the same b'0' the script always used)
            # goes in front of the raw event.
            # NOTE(review): presumably BinLogPacketWrapper skips a leading
            # status byte as it would on a replication stream — confirm.
            packet = MysqlPacket(b'0' + header + body, 'utf')
            # NOTE(review): positional arguments follow the signature of the
            # installed pymysqlreplication version; presumably
            # use_checksum=True, the allowed events, four table/schema
            # filters (None), then two boolean options — confirm before
            # upgrading the library.
            binlog_event = BinLogPacketWrapper(
                packet, table_map,
                connection,
                True,
                _ALLOWED_EVENTS,
                None,
                None,
                None,
                None,
                False,
                False,
            )
            event = binlog_event.event
            if event is None:
                # Nothing was decoded for this packet.
                continue
            if binlog_event.event_type == TABLE_MAP_EVENT:
                # Cache the table so later events can look it up by id.
                table_map[event.table_id] = event.get_table()
            elif binlog_event.event_type != ROTATE_EVENT:
                event.dump()
                print(event.rows)
except Exception:
    # Top-level boundary of the script: report the failure and fall through
    # to the cleanup below.
    traceback.print_exc()
finally:
    connection.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment