Skip to content

Instantly share code, notes, and snippets.

@kevinbin
Created April 30, 2025 04:57
Show Gist options
  • Save kevinbin/7f3e87c283f55ea17c446be9652e61dd to your computer and use it in GitHub Desktop.
Save kevinbin/7f3e87c283f55ea17c446be9652e61dd to your computer and use it in GitHub Desktop.
Deploy and test a MySQL InnoDB ClusterSet
#!/usr/bin/env mysqlsh --file
import os
import sys
import time
import json
import argparse
import datetime
import socket
import signal
from pathlib import Path
class MySQLConnection:
    """Small wrapper around a MySQL Shell session.

    ``connect`` goes through ``shell.connect``, so it replaces the shell's
    global session; ``disconnect`` closes that global session again.  The
    ``shell`` object itself is expected to exist as a module-level global.

    Args:
        host: Server host name.
        port: Server port; when None the port is left out of the
            connection options.
        user: Account name.
        password: Account password (the default matches the password the
            sandbox instances in this script are deployed with).
    """

    def __init__(self, host='localhost', port=None, user='root', password='root123'):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        # Session returned by shell.get_session(); None while disconnected.
        self.session = None

    def connect(self):
        """Open the connection.

        Returns:
            True on success, False (after printing the error) on failure.
        """
        options = {
            'host': self.host,
            'user': self.user,
            'password': self.password,
        }
        # Only pass a port that was actually supplied instead of handing the
        # shell an explicit None.
        if self.port is not None:
            options['port'] = self.port
        try:
            shell.connect(options)
            self.session = shell.get_session()
            return True
        except Exception as e:
            print(f"连接失败 ({self.host}:{self.port}): {str(e)}")
            return False

    def disconnect(self):
        """Drop the session reference and close the shell's global session.

        Does nothing when this object never connected.
        """
        if self.session:
            self.session = None
            shell.disconnect()

    def execute_query(self, query):
        """Run ``query`` and return all rows.

        Returns:
            The result of ``fetch_all()``, or None when not connected or when
            the query fails (the error is printed).
        """
        if self.session is None:
            # Report the real cause rather than an AttributeError on None.
            print("执行查询失败: 尚未建立连接")
            return None
        try:
            result = self.session.run_sql(query)
            return result.fetch_all()
        except Exception as e:
            print(f"执行查询失败: {str(e)}")
            return None

    def execute_statement(self, query):
        """Run a statement whose result set is not needed.

        Returns:
            True on success, False when not connected or when the statement
            fails (the error is printed).  This method never raises.
        """
        if self.session is None:
            print("执行语句失败: 尚未建立连接")
            return False
        try:
            self.session.run_sql(query)
            return True
        except Exception as e:
            print(f"执行语句失败: {str(e)}")
            return False
class InstanceManager:
    """Deploy, start, stop and delete MySQL Shell sandbox instances.

    All sandboxes live under ``~/mysql-sandboxes`` (created on construction).
    The AdminAPI ``dba`` object is expected to exist as a module-level global.
    """

    def __init__(self):
        self.sandbox_dir = self._get_sandbox_dir()

    def _get_sandbox_dir(self):
        """Return the sandbox root directory, creating it if necessary."""
        home_dir = str(Path.home())
        sandbox_dir = os.path.join(home_dir, "mysql-sandboxes")
        os.makedirs(sandbox_dir, exist_ok=True)
        return sandbox_dir

    def _pid_file(self, port):
        """Return the path of the PID file for the sandbox on ``port``."""
        return f"{self.sandbox_dir}/{port}/{port}.pid"

    def _kill_pid_from_file(self, pid_file):
        """Read a PID from ``pid_file`` and send SIGKILL to that process."""
        with open(pid_file, 'r') as f:
            pid = int(f.read().strip())
        os.kill(pid, signal.SIGKILL)

    def deploy_instance(self, port):
        """Deploy a new sandbox on ``port`` and turn on its general log.

        Returns:
            True on success, False (after printing the error) on failure.
        """
        try:
            print(f"部署沙箱实例 {port}...")
            dba.deploy_sandbox_instance(port, {
                'password': 'root123',
                'portx': port + 10,
                'sandboxDir': self.sandbox_dir,
                'ignoreSslError': True,
                'allowRootFrom': '%'
            })
            self._enable_general_log(port)
            return True
        except Exception as e:
            print(f"部署实例失败 {port}: {str(e)}")
            return False

    def start_instance(self, port):
        """Start the sandbox on ``port``.

        Returns:
            True on success, False (after printing the error) on failure.
        """
        try:
            # Pass the same sandboxDir used by deploy/delete so all three
            # operations agree on where the sandbox lives.
            dba.start_sandbox_instance(port, {'sandboxDir': self.sandbox_dir})
            print(f"✅ 已启动实例 {port}")
            return True
        except Exception as e:
            print(f"❌ 启动实例失败 {port}: {str(e)}")
            return False

    def stop_instance(self, port):
        """Stop the sandbox on ``port`` by killing its process.

        The instance is stopped with SIGKILL via its PID file.  A missing PID
        file or an already-dead process counts as "stopped".

        Returns:
            True when the instance is not running afterwards, False when the
            PID file could not be used (the error is printed).
        """
        try:
            pid_file = self._pid_file(port)
            if not os.path.exists(pid_file):
                # Nothing was killed, so do not claim a forced stop.
                print(f"实例 {port} 未在运行(找不到PID文件),无需停止")
                return True
            try:
                self._kill_pid_from_file(pid_file)
            except ProcessLookupError:
                # Stale PID file: the process is already gone.
                print(f"实例 {port} 的进程已不存在,无需停止")
                return True
            print(f"✅ 已强制停止实例 {port}")
            return True
        except Exception as e2:
            print(f"❌ 停止实例失败 {port}: {str(e2)}")
            return False

    def kill_instance(self, port):
        """Force-kill the sandbox process on ``port``.

        Unlike ``stop_instance`` a missing PID file is an error here, because
        callers use this to simulate a crash of a running node.

        Returns:
            True when SIGKILL was sent, False otherwise (the reason is printed).
        """
        try:
            pid_file = self._pid_file(port)
            if not os.path.exists(pid_file):
                print(f"❌ 找不到PID文件: {pid_file}")
                return False
            self._kill_pid_from_file(pid_file)
            print(f"✅ 已强制终止实例 {port}")
            return True
        except Exception as e:
            print(f"❌ 终止实例失败 {port}: {str(e)}")
            return False

    def _enable_general_log(self, port):
        """Point the general log at the sandbox directory and enable it."""
        conn = MySQLConnection(port=port)
        if conn.connect():
            instance_dir = f"{self.sandbox_dir}/{port}"
            conn.execute_statement(f"SET GLOBAL general_log_file = '{instance_dir}/general.log'")
            conn.execute_statement("SET GLOBAL general_log = 1")
            conn.disconnect()

    def delete_instance(self, port):
        """Delete the sandbox on ``port``.

        Returns:
            True on success, False (after printing the error) on failure.
        """
        try:
            dba.delete_sandbox_instance(port, {'sandboxDir': self.sandbox_dir})
            print(f"✅ 已删除实例 {port}")
            return True
        except Exception as e:
            print(f"❌ 删除实例失败 {port}: {str(e)}")
            return False

    def cleanup_instances(self, ports):
        """Stop and delete every sandbox in ``ports`` (best effort).

        Individual failures are reported by ``stop_instance`` and
        ``delete_instance`` themselves and do not abort the loop.

        Returns:
            Always True.
        """
        for port in ports:
            self.stop_instance(port)
            self.delete_instance(port)
        return True
class ClusterManager:
    """Manage one InnoDB Cluster built from local sandbox instances.

    Args:
        name: Cluster name.
        ports: Ports of the member instances; the first one seeds the cluster.
    """

    def __init__(self, name, ports):
        self.name = name
        self.ports = ports
        self.instance_mgr = InstanceManager()
        # Last cluster handle obtained through create_cluster/get_cluster.
        self.cluster = None

    def create_cluster(self):
        """Create the cluster on the first port and add the remaining ports.

        Returns:
            True on success, False (after printing the error) on failure.
        """
        conn = None
        try:
            # Connect to the seed instance.
            conn = MySQLConnection(port=self.ports[0])
            if not conn.connect():
                print(f"无法连接到实例 {self.ports[0]},创建集群失败")
                return False
            print(f"创建集群 '{self.name}'...")
            self.cluster = dba.create_cluster(self.name, {'gtidSetIsComplete': True})
            # Add the other instances, provisioning them via clone.
            for port in self.ports[1:]:
                print(f"添加实例 {port} 到集群...")
                self.cluster.add_instance(f"root:root123@localhost:{port}", {'recoveryMethod': 'clone'})
            print(f"✅ 集群 '{self.name}' 创建成功")
            return True
        except Exception as e:
            print(f"❌ 创建集群失败: {str(e)}")
            return False
        finally:
            # Close the session on every path, including failures.
            if conn is not None:
                conn.disconnect()

    def get_cluster(self, port=None):
        """Connect to ``port`` (default: first port) and fetch the cluster.

        The connection is intentionally left open for the caller.
        NOTE(review): presumably the returned handle relies on that session;
        confirm before adding a disconnect here.

        Returns:
            The cluster handle, or None on failure (the error is printed).
        """
        try:
            if port is None:
                port = self.ports[0]
            conn = MySQLConnection(port=port)
            if not conn.connect():
                return None
            self.cluster = dba.get_cluster(self.name)
            return self.cluster
        except Exception as e:
            print(f"获取集群失败: {str(e)}")
            return None

    def get_primary_port(self):
        """Return the port of the member currently reporting PRIMARY.

        Each port is probed in order; unreachable members are skipped.

        Returns:
            The primary's port, or None when no member reports PRIMARY.
        """
        for port in self.ports:
            conn = MySQLConnection(port=port)
            if not conn.connect():
                continue
            try:
                result = conn.execute_query(
                    "SELECT member_host, member_port, member_role FROM performance_schema.replication_group_members "
                    "WHERE member_id = @@server_uuid"
                )
                if result and result[0][2] == "PRIMARY":
                    return port
            except Exception:
                # Best-effort probe: a member we cannot inspect is skipped.
                continue
            finally:
                conn.disconnect()
        return None

    def get_cluster_status(self, port=None):
        """Return ``cluster.status()`` or None when the cluster is unavailable."""
        cluster = self.get_cluster(port)
        if cluster:
            return cluster.status()
        return None

    def print_cluster_status(self, port=None):
        """Print one line per instance in the cluster topology.

        Returns:
            True when a status was obtained and printed, False otherwise.
        """
        status = self.get_cluster_status(port)
        if not status:
            return False
        print("\n当前集群状态:")
        topology = status.get('defaultReplicaSet', {}).get('topology', {})
        for instance in topology:
            addr = topology[instance]
            # Tolerate entries that lack a field instead of raising KeyError.
            address = addr.get('address', instance)
            state = addr.get('status', '未知')
            mode = addr.get('mode', '未知')
            role = addr.get('memberRole', '未知')
            print(f"实例: {address}, 状态: {state}, 模式: {mode}, 角色: {role}")
        return True

    def switch_primary(self, target_port):
        """Make the instance on ``target_port`` the primary.

        Returns:
            True when the target is (now) the primary, False otherwise.
        """
        try:
            primary_port = self.get_primary_port()
            if primary_port == target_port:
                print(f"端口 {target_port} 已经是主节点")
                return True
            if primary_port is None:
                # Without a primary there is nothing to connect to.
                print("找不到主节点,无法切换主节点")
                return False
            conn = MySQLConnection(port=primary_port)
            if not conn.connect():
                return False
            try:
                cluster = self.get_cluster(primary_port)
                if not cluster:
                    return False
                print(f"切换主节点到 {target_port}...")
                cluster.set_primary_instance(f"root:root123@localhost:{target_port}")
                print(f"✅ 已将 {target_port} 设置为主节点")
                return True
            finally:
                conn.disconnect()
        except Exception as e:
            print(f"❌ 切换主节点失败: {str(e)}")
        return False

    def rejoin_instance(self, port):
        """Start the instance on ``port`` and rejoin it to the cluster.

        A plain rejoin is tried first; if it fails, the rejoin is retried
        with clone-based recovery.

        Returns:
            True when the instance rejoined, False otherwise.
        """
        try:
            print(f"将实例 {port} 重新加入集群...")
            # Make sure the instance is up; the result is deliberately not
            # checked here and any failure is reported by start_instance.
            self.instance_mgr.start_instance(port)
            primary_port = self.get_primary_port()
            if not primary_port:
                print("找不到主节点,无法重新加入实例")
                return False
            # Fetch the cluster handle through the primary.
            cluster = self.get_cluster(primary_port)
            if not cluster:
                print("获取集群对象失败")
                return False
            target = f"root:root123@localhost:{port}"
            try:
                cluster.rejoin_instance(target)
            except Exception as e:
                print(f"常规重新加入失败: {str(e)}")
            else:
                print(f"✅ 节点 {port} 已成功重新加入集群")
                time.sleep(5)  # give the member state time to update
                return True
            try:
                # Fall back to clone-based recovery.
                print("尝试使用clone方式恢复...")
                cluster.rejoin_instance(target, {
                    'recoveryMethod': 'clone'
                })
                print(f"✅ 节点 {port} 已通过克隆方式重新加入集群")
                time.sleep(10)
                return True
            except Exception as e2:
                print(f"❌ 重新加入集群失败: {str(e2)}")
                return False
        except Exception as e:
            print(f"❌ 重新加入过程中出错: {str(e)}")
            return False
class ClusterSetManager:
    """Create and inspect a ClusterSet spanning two clusters.

    Args:
        primary_cluster: Manager of the cluster that becomes the ClusterSet
            primary (must offer ``name``, ``ports``, ``get_primary_port`` and
            ``get_cluster``).
        replica_cluster: Manager describing the replica cluster (``name`` and
            ``ports`` are read).
    """

    def __init__(self, primary_cluster, replica_cluster):
        self.primary_cluster = primary_cluster
        self.replica_cluster = replica_cluster
        # Last ClusterSet handle obtained, or None.
        self.cluster_set = None

    def create_cluster_set(self, name="myClusterSet"):
        """Create a ClusterSet named ``name`` from the primary cluster.

        Returns:
            True on success, False (after printing the reason) on failure.
        """
        conn = None
        try:
            primary_port = self.primary_cluster.get_primary_port()
            if not primary_port:
                print("找不到主集群的主节点,创建ClusterSet失败")
                return False
            # Connect to the primary member of the primary cluster.
            conn = MySQLConnection(port=primary_port)
            if not conn.connect():
                print(f"无法连接到主集群主节点 {primary_port}")
                return False
            cluster = self.primary_cluster.get_cluster(primary_port)
            if not cluster:
                print("获取主集群对象失败")
                return False
            print(f"创建ClusterSet '{name}'...")
            self.cluster_set = cluster.create_cluster_set(name)
            print(f"✅ ClusterSet '{name}' 创建成功")
            return True
        except Exception as e:
            print(f"❌ 创建ClusterSet失败: {str(e)}")
            return False
        finally:
            # Close the session on every path, including failures.
            if conn is not None:
                conn.disconnect()

    def add_replica_cluster(self):
        """Create the replica cluster inside the ClusterSet.

        The replica cluster is built from standalone instances: the first
        replica port seeds it and the remaining ports are added afterwards,
        all provisioned via clone.

        Returns:
            True on success, False (after printing the reason) on failure.
        """
        conn = None
        try:
            primary_port = self.primary_cluster.get_primary_port()
            if not primary_port:
                print("找不到主集群的主节点")
                return False
            # Connect to the primary member of the primary cluster.
            conn = MySQLConnection(port=primary_port)
            if not conn.connect():
                print(f"无法连接到主集群主节点 {primary_port}")
                return False
            cluster = self.primary_cluster.get_cluster(primary_port)
            if not cluster:
                print("获取主集群对象失败")
                return False
            self.cluster_set = cluster.get_cluster_set()
            # Build the replica cluster from a standalone instance rather
            # than from an existing cluster.
            print(f"将独立实例添加为从集群 '{self.replica_cluster.name}'...")
            replica_port = self.replica_cluster.ports[0]
            replica_cluster = self.cluster_set.create_replica_cluster(
                f'root:root123@localhost:{replica_port}',
                self.replica_cluster.name,
                {'recoveryMethod': 'clone'}
            )
            # Add the remaining members to the replica cluster.
            print("向副本集群添加其他节点...")
            for port in self.replica_cluster.ports[1:]:
                replica_cluster.add_instance(
                    f'root:root123@localhost:{port}',
                    {'recoveryMethod': 'clone'}
                )
            print(f"✅ 从集群 '{self.replica_cluster.name}' 已添加到ClusterSet")
            return True
        except Exception as e:
            print(f"❌ 添加从集群失败: {str(e)}")
            return False
        finally:
            if conn is not None:
                conn.disconnect()

    def get_cluster_set(self):
        """Return the ClusterSet handle, trying each primary-cluster port.

        Returns:
            The ClusterSet handle from the first port that works, or None
            when no port yields one.
        """
        for port in self.primary_cluster.ports:
            conn = MySQLConnection(port=port)
            if not conn.connect():
                continue
            try:
                cluster = dba.get_cluster(self.primary_cluster.name)
                self.cluster_set = cluster.get_cluster_set()
                return self.cluster_set
            except Exception:
                # This member could not serve the request; try the next one.
                continue
            finally:
                conn.disconnect()
        print("❌ 无法连接到任何主集群节点")
        return None

    def get_status(self):
        """Return ``cluster_set.status()`` or None when unavailable."""
        cluster_set = self.get_cluster_set()
        if cluster_set:
            return cluster_set.status()
        return None

    def print_status(self):
        """Print a summary of the ClusterSet status.

        Returns:
            True when a status was obtained, False otherwise.
        """
        status = self.get_status()
        if not status:
            return False
        if "clusters" in status:
            print("\nClusterSet状态摘要:")
            for cluster_name, cluster_info in status["clusters"].items():
                cluster_role = cluster_info.get("clusterRole", "未知")
                global_status = cluster_info.get("globalStatus", "未知")
                print(f"集群: {cluster_name}, 角色: {cluster_role}, 状态: {global_status}")
        # Overall information about the whole ClusterSet.
        print(f"全局主实例: {status.get('globalPrimaryInstance', '未知')}")
        print(f"总体状态: {status.get('status', '未知')} - {status.get('statusText', '')}")
        return True
class FailoverTester:
    """Kill primary members one by one and check that replication survives.

    Args:
        primary_cluster: Manager of the primary cluster.
        replica_cluster: Manager of the replica cluster (its first port is
            used to read replicated data).
        clusterset_mgr: ClusterSet manager used to print status.
        instance_mgr: Instance manager used to kill the node under test.
    """

    def __init__(self, primary_cluster, replica_cluster, clusterset_mgr, instance_mgr):
        self.primary_cluster = primary_cluster
        self.replica_cluster = replica_cluster
        self.clusterset_mgr = clusterset_mgr
        self.instance_mgr = instance_mgr

    def _find_new_primary(self, failed_port):
        """Return the port of the member reporting PRIMARY, skipping ``failed_port``."""
        for candidate in self.primary_cluster.ports:
            if candidate == failed_port:
                continue
            try:
                conn = MySQLConnection(port=candidate)
                if not conn.connect():
                    continue
                try:
                    result = conn.execute_query(
                        "SELECT member_host, member_port, member_role FROM performance_schema.replication_group_members "
                        "WHERE member_id = @@server_uuid"
                    )
                finally:
                    conn.disconnect()
            except Exception:
                # Best-effort probe: skip members that cannot be inspected.
                continue
            if result and result[0][2] == "PRIMARY":
                print(f"找到新的主节点: {candidate}")
                return candidate
        return None

    def _verify_after_failover(self, port, replica_port):
        """Run steps 5-9: wait, find the new primary, write, verify on the replica.

        Returns:
            True when the row written on the new primary is found on the
            replica cluster, False otherwise.
        """
        print("\n步骤5: 等待故障转移")
        time.sleep(30)

        print("\n步骤6: 确认新的主节点")
        new_primary_port = self._find_new_primary(port)
        if not new_primary_port:
            print("无法找到新的主节点")
            return False

        print(f"\n步骤7: 在新主节点 {new_primary_port} 上插入新数据")
        new_conn = MySQLConnection(port=new_primary_port)
        if not new_conn.connect():
            print(f"无法连接到新主节点 {new_primary_port}")
            return False
        failover_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        message = f"故障转移后数据 - 新主节点[{new_primary_port}] - {failover_time}"
        try:
            inserted = new_conn.execute_statement(
                f"INSERT INTO failover_test.failover (message) VALUES ('{message}')"
            )
        finally:
            new_conn.disconnect()
        # execute_statement signals failure through its return value, so the
        # result has to be checked explicitly.
        if not inserted:
            print("插入新数据失败")
            return False
        print(f"已在新主节点 {new_primary_port} 上插入数据")

        print("\n步骤8: 等待新数据同步到从集群")
        time.sleep(10)

        print("\n步骤9: 验证从集群中的新数据")
        replica_conn = MySQLConnection(port=replica_port)
        if not replica_conn.connect():
            print(f"无法连接到从集群 {replica_port}")
            return False
        try:
            rows = replica_conn.execute_query("SELECT * FROM failover_test.failover ORDER BY id DESC LIMIT 2")
        finally:
            replica_conn.disconnect()
        # The table keeps rows from earlier runs, so a row count proves
        # nothing; look for the exact message that was just written.
        if rows and any(row[1] == message for row in rows):
            print("✅ 故障转移成功! 节点宕机后的新数据已成功复制到从集群")
            for row in rows:
                print(f"ID={row[0]}, 消息='{row[1]}'")
            return True
        print("❌ 复制故障转移失败: 在从集群中未找到新数据")
        return False

    def test_failover_node(self, port):
        """Test failover for the node on ``port``.

        The node is made primary if needed, test data is written and checked
        on the replica cluster, the node is killed, a new row is written on
        the new primary and looked up on the replica cluster.  Once the node
        has been killed it is always restarted and rejoined, even when a
        verification step fails, so later tests start from a full cluster.

        Returns:
            True when every step succeeded, False otherwise.
        """
        print(f"\n\n========== 开始测试节点 {port} 的故障转移 ==========")
        # 1. The node under test must be the primary of the primary cluster.
        primary_port = self.primary_cluster.get_primary_port()
        if primary_port != port:
            print(f"节点 {port} 不是当前的主节点,测试需要主节点")
            if not self.primary_cluster.switch_primary(port):
                print(f"无法将 {port} 切换为主节点,跳过测试")
                return False
        self.primary_cluster.print_cluster_status()

        # 2. Create test data on the primary.
        print("\n=== 开始测试ClusterSet故障转移 ===")
        print("步骤1: 在主集群创建测试数据")
        conn = MySQLConnection(port=port)
        if not conn.connect():
            print(f"无法连接到主节点 {port}")
            return False
        current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        setup_statements = (
            "CREATE DATABASE IF NOT EXISTS failover_test",
            "CREATE TABLE IF NOT EXISTS failover_test.failover (id INT PRIMARY KEY AUTO_INCREMENT, message VARCHAR(255), ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)",
            f"INSERT INTO failover_test.failover (message) VALUES ('故障转移前数据 - 旧主节点[{port}] - {current_time}')",
        )
        try:
            # Stop at the first failing statement; the statement error itself
            # has already been printed by execute_statement.
            if not all(conn.execute_statement(stmt) for stmt in setup_statements):
                print("创建测试数据失败")
                return False
            print(f"已在主节点 {port} 上创建测试数据库和初始数据")
        finally:
            conn.disconnect()

        # 3. Give replication time to reach the replica cluster.
        print("\n步骤2: 等待数据同步到从集群")
        time.sleep(5)

        # 4. Look at the data on the replica cluster.
        print("\n步骤3: 验证从集群中的数据")
        replica_port = self.replica_cluster.ports[0]
        replica_conn = MySQLConnection(port=replica_port)
        if not replica_conn.connect():
            print(f"无法连接到从集群 {replica_port}")
            return False
        try:
            rows = replica_conn.execute_query("SELECT * FROM failover_test.failover ORDER BY id DESC LIMIT 1")
            if rows:
                print(f"从集群中的数据: ID={rows[0][0]}, 消息='{rows[0][1]}'")
            else:
                print("从集群中未找到数据")
        except Exception as e:
            print(f"验证从集群数据失败: {str(e)}")
            return False
        finally:
            replica_conn.disconnect()

        # 5. Kill the primary.
        print(f"\n步骤4: 停止主节点 {port}")
        if not self.instance_mgr.kill_instance(port):
            print(f"无法停止主节点 {port}")
            return False

        # 6-10. Failover, write on the new primary, verify on the replica.
        success = self._verify_after_failover(port, replica_port)

        # 11. Show the ClusterSet status.
        print("\n步骤10: 检查ClusterSet状态")
        self.clusterset_mgr.print_status()

        # 12. Restart the killed node and rejoin it (rejoin_instance starts it).
        print(f"\n步骤11: 重新启动并加入故障节点 {port}")
        if not self.primary_cluster.rejoin_instance(port):
            print(f"无法将实例 {port} 重新加入集群")
            success = False
        print("\n=== ClusterSet故障转移测试完成 ===")
        return success

    def test_all_nodes(self):
        """Run ``test_failover_node`` for every port of the primary cluster.

        Returns:
            False when the ClusterSet status cannot be obtained or any node
            test fails, True when every node test succeeds.
        """
        print("\n====== 开始对所有节点进行故障转移测试 ======")
        # The ClusterSet must be reachable before anything is killed.
        if not self.clusterset_mgr.print_status():
            print("无法获取ClusterSet状态,测试失败")
            return False
        ports = self.primary_cluster.ports
        all_passed = True
        for i, port in enumerate(ports):
            print(f"\n\n========== 开始测试节点 {port} 的故障转移 [{i+1}/{len(ports)}] ==========")
            self.primary_cluster.print_cluster_status()
            print("\n开始对该节点进行故障转移测试...")
            if not self.test_failover_node(port):
                all_passed = False
            # Let the cluster settle before the next node is tested.
            if i < len(ports) - 1:
                print("\n等待一段时间后进行下一个节点的测试...")
                time.sleep(15)
        print("\n\n====== 所有节点的故障转移测试完成 ======")
        return all_passed
def main(argv=None):
    """Command-line entry point for the ClusterSet deploy/test tool.

    Args:
        argv: Argument list to parse; None (the default) makes argparse read
            ``sys.argv[1:]``.

    Actions, checked in this order: ``--show-status``, ``--cleanup``,
    ``--test-failover`` alone (against an existing ClusterSet), ``--create``
    (optionally followed by the failover test).  With no action the help
    text is printed.
    """
    parser = argparse.ArgumentParser(description='MySQL InnoDB Cluster/ClusterSet 管理工具')
    parser.add_argument('--test-failover', action='store_true', help='测试故障转移')
    parser.add_argument('--create', action='store_true', help='创建新的 ClusterSet, 仅支持单主模式')
    parser.add_argument('--cleanup', action='store_true', help='清理环境')
    parser.add_argument('--show-status', action='store_true', help='显示状态')
    args = parser.parse_args(argv)

    # No action requested: show the help text.
    if not any((args.test_failover, args.create, args.cleanup, args.show_status)):
        parser.print_help()
        return

    primary_ports = [4306, 4307, 4308]
    replica_ports = [5306, 5307, 5308]

    instance_mgr = InstanceManager()
    primary_cluster = ClusterManager('primaryCluster', primary_ports)
    replica_cluster = ClusterManager('replicaCluster', replica_ports)
    clusterset_mgr = ClusterSetManager(primary_cluster, replica_cluster)
    failover_tester = FailoverTester(primary_cluster, replica_cluster, clusterset_mgr, instance_mgr)

    if args.show_status:
        print("\n=== 显示当前状态 ===")
        primary_cluster.print_cluster_status()
        clusterset_mgr.print_status()
        return

    if args.cleanup:
        print("\n=== 清理环境 ===")
        instance_mgr.cleanup_instances(primary_ports + replica_ports)
        print("✅ 环境清理完成")
        return

    # Failover test only: run against an already existing ClusterSet.
    # The flag is declared as --create, so the parsed attribute is
    # args.create.
    if args.test_failover and not args.create:
        print("\n=== 对所有主集群节点进行故障转移测试 ===")
        conn = MySQLConnection(port=primary_ports[0])
        if not conn.connect():
            # connect() has already printed the connection error.
            print("请先创建ClusterSet然后再测试")
            return
        try:
            # Both calls raise when the cluster or ClusterSet does not exist.
            cluster = dba.get_cluster('primaryCluster')
            cluster.get_cluster_set()
        except Exception as e:
            print(f"验证现有ClusterSet时出错: {str(e)}")
            print("请先创建ClusterSet然后再测试")
            return
        finally:
            conn.disconnect()
        print("检测到现有ClusterSet,开始对所有节点进行测试...")
        failover_tester.test_all_nodes()
        return

    # Build a fresh ClusterSet from scratch.
    if args.create:
        try:
            print("\n=== 清理现有环境 ===")
            instance_mgr.cleanup_instances(primary_ports + replica_ports)

            print("\n=== 部署主集群实例 ===")
            for port in primary_ports:
                if not instance_mgr.deploy_instance(port):
                    raise RuntimeError(f"部署实例 {port} 失败")

            # Replica instances are only deployed here; the replica cluster
            # itself is created when it is added to the ClusterSet.
            print("\n=== 部署从集群实例 ===")
            for port in replica_ports:
                if not instance_mgr.deploy_instance(port):
                    raise RuntimeError(f"部署实例 {port} 失败")

            # Each step reports failure through its return value; stop at
            # the first failure instead of announcing success regardless.
            print("\n=== 创建主集群 ===")
            if not primary_cluster.create_cluster():
                raise RuntimeError("创建主集群失败")

            print("\n=== 创建ClusterSet ===")
            if not clusterset_mgr.create_cluster_set():
                raise RuntimeError("创建ClusterSet失败")

            print("\n=== 添加从集群到ClusterSet ===")
            if not clusterset_mgr.add_replica_cluster():
                raise RuntimeError("添加从集群失败")

            clusterset_mgr.print_status()
            print("\n✅ ClusterSet创建完成")

            if args.test_failover:
                print("\n=== 进行故障转移测试 ===")
                failover_tester.test_all_nodes()
        except Exception as e:
            print(f"\n❌ 创建ClusterSet过程中出错: {str(e)}")
    print("\n=== 操作完成 ===")
if __name__ == "__main__":
    # MySQL Shell normally provides shell/dba/session as globals while it
    # runs the script; when they are absent, take them from the mysqlsh
    # module instead.
    if 'shell' not in globals():
        import mysqlsh
        shell, dba, session = (
            mysqlsh.globals.shell,
            mysqlsh.globals.dba,
            mysqlsh.globals.session,
        )
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment