Skip to content

Instantly share code, notes, and snippets.

@mahadirz
Created January 23, 2022 04:35
Show Gist options
  • Save mahadirz/b8d7a1623b588dc7d43a3131a6c4aa18 to your computer and use it in GitHub Desktop.
Save mahadirz/b8d7a1623b588dc7d43a3131a6c4aa18 to your computer and use it in GitHub Desktop.
mysq-binlog-tutorial
import traceback
import pendulum
import pymysql
from pymysql.protocol import MysqlPacket
from pymysqlreplication.binlogstream import MYSQL_EXPECTED_ERROR_CODES
from pymysqlreplication.constants import *
from pymysqlreplication.event import *
from pymysqlreplication.packet import BinLogPacketWrapper
from pymysqlreplication.row_event import TableMapEvent, UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent
# Path of the binlog file on local disk that this script parses.
bin_log_file = "mysql-bin-changelog.000050.bin"

# Connection to the live MySQL server. It is used for the information_schema
# lookups and is handed to BinLogPacketWrapper further down. DictCursor makes
# every fetched row a dict keyed by column name.
connection = pymysql.Connection(
    host="database-test.cto4n7gnn8z3.ap-southeast-1.rds.amazonaws.com",
    user="USER",
    password="PASSWORD",
    cursorclass=pymysql.cursors.DictCursor,
)
def get_table_information(schema, table):
    """Return the column metadata of ``schema.table`` from information_schema.

    The query runs on the module-level ``connection``. If it fails with one of
    the error codes listed in ``MYSQL_EXPECTED_ERROR_CODES`` the connection is
    re-established and the query is tried one more time.

    Args:
        schema: Name of the database that owns the table.
        table: Name of the table to describe.

    Returns:
        The rows fetched by the cursor, one per column, ordered by
        ``ORDINAL_POSITION``.

    Raises:
        pymysql.OperationalError: Immediately for an error code outside
            ``MYSQL_EXPECTED_ERROR_CODES``, or after the second failed attempt
            for an expected one (instead of silently returning ``None``).
    """
    last_error = None
    for _attempt in range(2):
        try:
            if last_error is not None:
                # The previous attempt lost the connection; retrying on the
                # same dead socket would only fail again.
                connection.ping(reconnect=True)
            # The context manager closes the cursor even when execute() fails.
            with connection.cursor() as cur:
                cur.execute(
                    """
                    SELECT
                        COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
                        COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY, ORDINAL_POSITION
                    FROM
                        information_schema.columns
                    WHERE
                        table_schema = %s AND table_name = %s
                    ORDER BY ORDINAL_POSITION
                    """,
                    (schema, table),
                )
                return cur.fetchall()
        except pymysql.OperationalError as error:
            # Do not assume args is exactly (code, message).
            code = error.args[0] if error.args else None
            if code not in MYSQL_EXPECTED_ERROR_CODES:
                raise
            last_error = error
    # Both attempts failed with an expected error: surface it to the caller.
    raise last_error


# Expose the lookup as a private-looking attribute on the connection object.
# NOTE(review): presumably pymysqlreplication calls this attribute on the
# control connection while decoding table-map events - confirm against the
# installed library version.
connection._get_table_information = get_table_information
class Header:
    """Fixed-size header read in front of every event in the binlog file.

    The fields are filled by :meth:`unpack` from a 19-byte little-endian
    record (format ``<IBIIIH``).
    """

    def __init__(self):
        """Create a header with every field at its empty default."""
        self.timestamp = 0
        self.event_type = None
        self.server_id = None
        self.event_size = 0
        self.log_pos = 0
        self.flag = 0

    def unpack(self, data):
        """Decode ``data`` and store the six header fields on this instance.

        ``data`` must be exactly 19 bytes long; ``struct.unpack`` raises
        ``struct.error`` for any other length.
        """
        # u32 timestamp, u8 event type, u32 server id, u32 event size,
        # u32 log position, u16 flags - all little-endian.
        (
            self.timestamp,
            self.event_type,
            self.server_id,
            self.event_size,
            self.log_pos,
            self.flag,
        ) = struct.unpack('<IBIIIH', data)

    def __repr__(self):
        """Return a multi-line, human-readable dump of the header fields."""
        readable = pendulum.from_timestamp(self.timestamp).to_datetime_string()
        lines = (
            f"timestamp:{self.timestamp} ({readable})",
            f"event_type:{self.event_type}",
            f"server_id: {self.server_id}",
            f"event_size: {self.event_size}",
            f"next_log_pos: {self.log_pos}",
        )
        return "\n".join(lines)
# Main script: walk the binlog file event by event, print every event header
# and dump the row events (insert / update / delete).

# Every binlog file must start with this 4-byte magic number.
expected_binlog_magic = b'\xfebin'
# Size in bytes of the fixed event header decoded by Header.unpack().
headerlength = 19
# Event classes handed to BinLogPacketWrapper; for any other event type the
# script sees ``binlog_event.event`` as None and skips it. Built once because
# it never changes between events.
allowed_events = [
    TableMapEvent, RotateEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
    DeleteRowsEvent,
]

try:
    # The file is only read, so it is opened read-only ('rb', not 'rb+').
    with open(bin_log_file, 'rb') as f:
        try:
            pos = f.tell()
            true_binlog_magic = f.read(4)
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            if true_binlog_magic != expected_binlog_magic:
                raise ValueError(
                    f"{bin_log_file} is not a binlog file: "
                    f"unexpected magic number {true_binlog_magic!r}"
                )
            pos += len(true_binlog_magic)

            # table_id -> table object, filled from table-map events.
            table_map = {}
            while True:
                header = f.read(headerlength)
                if not header:
                    # Clean end of file.
                    break
                if len(header) < headerlength:
                    raise ValueError(
                        f"truncated event header at offset {pos}: "
                        f"got {len(header)} of {headerlength} bytes"
                    )
                h = Header()
                h.unpack(header)
                pos += len(header)

                body_length = h.event_size - headerlength
                if body_length < 0:
                    # A negative size would make read() consume the whole
                    # rest of the file.
                    raise ValueError(
                        f"invalid event size {h.event_size} in the header "
                        f"ending at offset {pos}"
                    )
                body = f.read(body_length)
                if len(body) < body_length:
                    raise ValueError(
                        f"truncated event body at offset {pos}: "
                        f"got {len(body)} of {body_length} bytes"
                    )
                pos += len(body)
                print(h)

                # One placeholder byte is put in front of the raw event
                # (same bytes as struct.pack("<c", b'0')).
                # NOTE(review): presumably BinLogPacketWrapper skips a leading
                # status byte - confirm against the library.
                packet = MysqlPacket(b'0' + header + body, 'utf')
                # NOTE(review): the positional arguments follow the constructor
                # signature of the pymysqlreplication version this script was
                # written for - verify before upgrading the library.
                binlog_event = BinLogPacketWrapper(
                    packet, table_map,
                    connection,
                    True,
                    allowed_events,
                    None,
                    None,
                    None,
                    None,
                    False,
                    False,
                )
                event = binlog_event.event
                if event is None:
                    continue

                # cache
                if binlog_event.event_type == TABLE_MAP_EVENT:
                    table_map[event.table_id] = event.get_table()

                if binlog_event.event_type not in (TABLE_MAP_EVENT, ROTATE_EVENT):
                    event.dump()
                    print(event.rows)
        except Exception:
            # Best effort, as before: report the failure and fall through to
            # the cleanup. KeyboardInterrupt / SystemExit are no longer
            # swallowed.
            traceback.print_exc()
finally:
    # Close exactly once, on the success path and on every failure path
    # (including a failed open()).
    connection.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment