Created
April 30, 2025 04:57
-
-
Save kevinbin/7f3e87c283f55ea17c446be9652e61dd to your computer and use it in GitHub Desktop.
deploy and test innodb cluster set
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env mysqlsh --file | |
import os | |
import sys | |
import time | |
import json | |
import argparse | |
import datetime | |
import socket | |
import signal | |
from pathlib import Path | |
class MySQLConnection: | |
"""MySQL连接管理类""" | |
def __init__(self, host='localhost', port=None, user='root', password='root123'): | |
self.host = host | |
self.port = port | |
self.user = user | |
self.password = password | |
self.session = None | |
def connect(self): | |
"""建立MySQL连接""" | |
try: | |
shell.connect({ | |
'host': self.host, | |
'port': self.port, | |
'user': self.user, | |
'password': self.password | |
}) | |
self.session = shell.get_session() | |
return True | |
except Exception as e: | |
print(f"连接失败 ({self.host}:{self.port}): {str(e)}") | |
return False | |
def disconnect(self): | |
"""断开连接""" | |
if self.session: | |
self.session = None | |
shell.disconnect() | |
def execute_query(self, query): | |
"""执行SQL查询""" | |
try: | |
result = self.session.run_sql(query) | |
return result.fetch_all() | |
except Exception as e: | |
print(f"执行查询失败: {str(e)}") | |
return None | |
def execute_statement(self, query): | |
"""执行SQL语句(不返回结果)""" | |
try: | |
self.session.run_sql(query) | |
return True | |
except Exception as e: | |
print(f"执行语句失败: {str(e)}") | |
return False | |
class InstanceManager: | |
"""MySQL实例管理类""" | |
def __init__(self): | |
self.sandbox_dir = self._get_sandbox_dir() | |
def _get_sandbox_dir(self): | |
"""获取沙箱目录""" | |
home_dir = str(Path.home()) | |
sandbox_dir = os.path.join(home_dir, "mysql-sandboxes") | |
os.makedirs(sandbox_dir, exist_ok=True) | |
return sandbox_dir | |
def deploy_instance(self, port): | |
"""部署新实例""" | |
try: | |
print(f"部署沙箱实例 {port}...") | |
dba.deploy_sandbox_instance(port, { | |
'password': 'root123', | |
'portx': port + 10, | |
'sandboxDir': self.sandbox_dir, | |
'ignoreSslError': True, | |
'allowRootFrom': '%' | |
}) | |
self._enable_general_log(port) | |
return True | |
except Exception as e: | |
print(f"部署实例失败 {port}: {str(e)}") | |
return False | |
def start_instance(self, port): | |
"""启动实例""" | |
try: | |
dba.start_sandbox_instance(port) | |
print(f"✅ 已启动实例 {port}") | |
return True | |
except Exception as e: | |
print(f"❌ 启动实例失败 {port}: {str(e)}") | |
return False | |
def stop_instance(self, port): | |
"""停止实例""" | |
# try: | |
# dba.stop_sandbox_instance(port, {'sandboxDir': self.sandbox_dir}) | |
# print(f"✅ 已停止实例 {port}") | |
# return True | |
# except Exception as e: | |
try: | |
# 如果正常停止失败,尝试强制终止 | |
pid_file = f"{self.sandbox_dir}/{port}/{port}.pid" | |
if os.path.exists(pid_file): | |
with open(pid_file, 'r') as f: | |
pid = int(f.read().strip()) | |
os.kill(pid, signal.SIGKILL) | |
print(f"✅ 已强制停止实例 {port}") | |
return True | |
except Exception as e2: | |
print(f"❌ 停止实例失败 {port}: {str(e2)}") | |
return False | |
def kill_instance(self, port): | |
"""强制终止实例进程""" | |
try: | |
pid_file = f"{self.sandbox_dir}/{port}/{port}.pid" | |
if os.path.exists(pid_file): | |
with open(pid_file, 'r') as f: | |
pid = int(f.read().strip()) | |
os.kill(pid, signal.SIGKILL) | |
print(f"✅ 已强制终止实例 {port}") | |
return True | |
else: | |
print(f"❌ 找不到PID文件: {pid_file}") | |
return False | |
except Exception as e: | |
print(f"❌ 终止实例失败 {port}: {str(e)}") | |
return False | |
def _enable_general_log(self, port): | |
"""启用general log""" | |
conn = MySQLConnection(port=port) | |
if conn.connect(): | |
instance_dir = f"{self.sandbox_dir}/{port}" | |
conn.execute_statement(f"SET GLOBAL general_log_file = '{instance_dir}/general.log'") | |
conn.execute_statement("SET GLOBAL general_log = 1") | |
conn.disconnect() | |
def delete_instance(self, port): | |
"""删除沙箱实例""" | |
try: | |
dba.delete_sandbox_instance(port, {'sandboxDir': self.sandbox_dir}) | |
print(f"✅ 已删除实例 {port}") | |
return True | |
except Exception as e: | |
print(f"❌ 删除实例失败 {port}: {str(e)}") | |
return False | |
def cleanup_instances(self, ports): | |
"""清理多个实例""" | |
for port in ports: | |
try: | |
self.stop_instance(port) | |
except Exception: | |
pass | |
try: | |
self.delete_instance(port) | |
except Exception: | |
pass | |
return True | |
class ClusterManager: | |
"""InnoDB集群管理类""" | |
def __init__(self, name, ports): | |
self.name = name | |
self.ports = ports | |
self.instance_mgr = InstanceManager() | |
self.cluster = None | |
def create_cluster(self): | |
"""创建InnoDB集群""" | |
try: | |
# 连接到第一个实例 | |
conn = MySQLConnection(port=self.ports[0]) | |
if not conn.connect(): | |
print(f"无法连接到实例 {self.ports[0]},创建集群失败") | |
return False | |
# 创建集群 | |
print(f"创建集群 '{self.name}'...") | |
self.cluster = dba.create_cluster(self.name, {'gtidSetIsComplete': True}) | |
# 添加其他实例 | |
for port in self.ports[1:]: | |
print(f"添加实例 {port} 到集群...") | |
self.cluster.add_instance(f"root:root123@localhost:{port}", {'recoveryMethod': 'clone'}) | |
conn.disconnect() | |
print(f"✅ 集群 '{self.name}' 创建成功") | |
return True | |
except Exception as e: | |
print(f"❌ 创建集群失败: {str(e)}") | |
return False | |
def get_cluster(self, port=None): | |
"""获取集群对象""" | |
try: | |
if port is None: | |
port = self.ports[0] | |
conn = MySQLConnection(port=port) | |
if not conn.connect(): | |
return None | |
self.cluster = dba.get_cluster(self.name) | |
return self.cluster | |
except Exception as e: | |
print(f"获取集群失败: {str(e)}") | |
return None | |
def get_primary_port(self): | |
"""获取当前主节点端口""" | |
for port in self.ports: | |
conn = MySQLConnection(port=port) | |
if conn.connect(): | |
try: | |
result = conn.execute_query( | |
"SELECT member_host, member_port, member_role FROM performance_schema.replication_group_members " | |
"WHERE member_id = @@server_uuid" | |
) | |
if result and result[0][2] == "PRIMARY": | |
conn.disconnect() | |
return port | |
conn.disconnect() | |
except Exception: | |
conn.disconnect() | |
return None | |
def get_cluster_status(self, port=None): | |
"""获取集群状态""" | |
cluster = self.get_cluster(port) | |
if cluster: | |
return cluster.status() | |
return None | |
def print_cluster_status(self, port=None): | |
"""打印集群状态""" | |
status = self.get_cluster_status(port) | |
if status: | |
print("\n当前集群状态:") | |
for instance in status.get('defaultReplicaSet', {}).get('topology', {}): | |
addr = status['defaultReplicaSet']['topology'][instance] | |
print(f"实例: {addr['address']}, 状态: {addr['status']}, 模式: {addr['mode']}, 角色: {addr['memberRole']}") | |
return True | |
return False | |
def switch_primary(self, target_port): | |
"""切换主节点""" | |
try: | |
primary_port = self.get_primary_port() | |
if primary_port == target_port: | |
print(f"端口 {target_port} 已经是主节点") | |
return True | |
conn = MySQLConnection(port=primary_port) | |
if conn.connect(): | |
cluster = self.get_cluster(primary_port) | |
if cluster: | |
print(f"切换主节点到 {target_port}...") | |
cluster.set_primary_instance(f"root:root123@localhost:{target_port}") | |
conn.disconnect() | |
print(f"✅ 已将 {target_port} 设置为主节点") | |
return True | |
conn.disconnect() | |
except Exception as e: | |
print(f"❌ 切换主节点失败: {str(e)}") | |
return False | |
def rejoin_instance(self, port): | |
"""将实例重新加入集群""" | |
try: | |
print(f"将实例 {port} 重新加入集群...") | |
# 确保实例已启动 | |
self.instance_mgr.start_instance(port) | |
# 获取集群对象 | |
primary_port = self.get_primary_port() | |
if not primary_port: | |
print("找不到主节点,无法重新加入实例") | |
return False | |
# 从主节点连接集群 | |
cluster = self.get_cluster(primary_port) | |
if not cluster: | |
print("获取集群对象失败") | |
return False | |
# 重新加入实例 | |
try: | |
cluster.rejoin_instance(f"root:root123@localhost:{port}") | |
print(f"✅ 节点 {port} 已成功重新加入集群") | |
time.sleep(5) # 等待节点状态更新 | |
return True | |
except Exception as e: | |
print(f"常规重新加入失败: {str(e)}") | |
try: | |
# 如果常规加入失败,尝试克隆方式 | |
print("尝试使用clone方式恢复...") | |
cluster.rejoin_instance(f"root:root123@localhost:{port}", { | |
'recoveryMethod': 'clone' | |
}) | |
print(f"✅ 节点 {port} 已通过克隆方式重新加入集群") | |
time.sleep(10) | |
return True | |
except Exception as e2: | |
print(f"❌ 重新加入集群失败: {str(e2)}") | |
return False | |
except Exception as e: | |
print(f"❌ 重新加入过程中出错: {str(e)}") | |
return False | |
class ClusterSetManager: | |
"""ClusterSet管理类""" | |
def __init__(self, primary_cluster, replica_cluster): | |
self.primary_cluster = primary_cluster | |
self.replica_cluster = replica_cluster | |
self.cluster_set = None | |
def create_cluster_set(self, name="myClusterSet"): | |
"""创建ClusterSet""" | |
try: | |
# 获取主集群 | |
primary_port = self.primary_cluster.get_primary_port() | |
if not primary_port: | |
print("找不到主集群的主节点,创建ClusterSet失败") | |
return False | |
# 连接到主集群 | |
conn = MySQLConnection(port=primary_port) | |
if not conn.connect(): | |
print(f"无法连接到主集群主节点 {primary_port}") | |
return False | |
# 创建ClusterSet | |
cluster = self.primary_cluster.get_cluster(primary_port) | |
if not cluster: | |
print("获取主集群对象失败") | |
conn.disconnect() | |
return False | |
print(f"创建ClusterSet '{name}'...") | |
self.cluster_set = cluster.create_cluster_set(name) | |
conn.disconnect() | |
print(f"✅ ClusterSet '{name}' 创建成功") | |
return True | |
except Exception as e: | |
print(f"❌ 创建ClusterSet失败: {str(e)}") | |
return False | |
def add_replica_cluster(self): | |
"""添加从集群到ClusterSet""" | |
try: | |
# 获取主集群连接 | |
primary_port = self.primary_cluster.get_primary_port() | |
if not primary_port: | |
print("找不到主集群的主节点") | |
return False | |
# 连接到主集群 | |
conn = MySQLConnection(port=primary_port) | |
if not conn.connect(): | |
print(f"无法连接到主集群主节点 {primary_port}") | |
return False | |
# 获取ClusterSet | |
cluster = self.primary_cluster.get_cluster(primary_port) | |
if not cluster: | |
print("获取主集群对象失败") | |
conn.disconnect() | |
return False | |
self.cluster_set = cluster.get_cluster_set() | |
# 添加从集群 - 从独立实例创建,而非从现有集群 | |
print(f"将独立实例添加为从集群 '{self.replica_cluster.name}'...") | |
replica_port = self.replica_cluster.ports[0] | |
replica_cluster = self.cluster_set.create_replica_cluster( | |
f'root:root123@localhost:{replica_port}', | |
self.replica_cluster.name, | |
{'recoveryMethod': 'clone'} | |
) | |
# 向副本集群添加其余节点 | |
print("向副本集群添加其他节点...") | |
for port in self.replica_cluster.ports[1:]: | |
replica_cluster.add_instance( | |
f'root:root123@localhost:{port}', | |
{'recoveryMethod': 'clone'} | |
) | |
print(f"✅ 从集群 '{self.replica_cluster.name}' 已添加到ClusterSet") | |
conn.disconnect() | |
return True | |
except Exception as e: | |
print(f"❌ 添加从集群失败: {str(e)}") | |
return False | |
def get_cluster_set(self): | |
"""获取ClusterSet对象,自动尝试所有节点""" | |
# 尝试连接到主集群的任意可用节点 | |
for port in self.primary_cluster.ports: | |
try: | |
conn = MySQLConnection(port=port) | |
if conn.connect(): | |
cluster = dba.get_cluster(self.primary_cluster.name) | |
self.cluster_set = cluster.get_cluster_set() | |
conn.disconnect() | |
return self.cluster_set | |
except Exception: | |
continue | |
print("❌ 无法连接到任何主集群节点") | |
return None | |
def get_status(self): | |
"""获取ClusterSet状态""" | |
cluster_set = self.get_cluster_set() | |
if cluster_set: | |
return cluster_set.status() | |
return None | |
def print_status(self): | |
"""打印ClusterSet状态""" | |
status = self.get_status() | |
if status: | |
# print("\nClusterSet状态:") | |
# print(json.dumps(status, indent=2)) | |
if "clusters" in status: | |
print("\nClusterSet状态摘要:") | |
for cluster_name, cluster_info in status["clusters"].items(): | |
cluster_role = cluster_info.get("clusterRole", "未知") | |
global_status = cluster_info.get("globalStatus", "未知") | |
print(f"集群: {cluster_name}, 角色: {cluster_role}, 状态: {global_status}") | |
# 额外显示主要信息 | |
print(f"全局主实例: {status.get('globalPrimaryInstance', '未知')}") | |
print(f"总体状态: {status.get('status', '未知')} - {status.get('statusText', '')}") | |
return True | |
return False | |
class FailoverTester: | |
"""故障转移测试类""" | |
def __init__(self, primary_cluster, replica_cluster, clusterset_mgr, instance_mgr): | |
self.primary_cluster = primary_cluster | |
self.replica_cluster = replica_cluster | |
self.clusterset_mgr = clusterset_mgr | |
self.instance_mgr = instance_mgr | |
def test_failover_node(self, port): | |
"""测试单个节点的故障转移""" | |
print(f"\n\n========== 开始测试节点 {port} 的故障转移 ==========") | |
# 1. 确保测试节点是主集群中的主节点 | |
primary_port = self.primary_cluster.get_primary_port() | |
if primary_port != port: | |
print(f"节点 {port} 不是当前的主节点,测试需要主节点") | |
if not self.primary_cluster.switch_primary(port): | |
print(f"无法将 {port} 切换为主节点,跳过测试") | |
return False | |
# 打印测试前的集群状态 | |
self.primary_cluster.print_cluster_status() | |
# 2. 连接到主节点并创建测试数据 | |
print("\n=== 开始测试ClusterSet故障转移 ===") | |
print("步骤1: 在主集群创建测试数据") | |
conn = MySQLConnection(port=port) | |
if not conn.connect(): | |
print(f"无法连接到主节点 {port}") | |
return False | |
current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
try: | |
# 创建测试数据库和表 | |
conn.execute_statement("CREATE DATABASE IF NOT EXISTS failover_test") | |
conn.execute_statement("CREATE TABLE IF NOT EXISTS failover_test.failover (id INT PRIMARY KEY AUTO_INCREMENT, message VARCHAR(255), ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)") | |
# 插入一条基准测试数据,添加端口信息 | |
conn.execute_statement(f"INSERT INTO failover_test.failover (message) VALUES ('故障转移前数据 - 旧主节点[{port}] - {current_time}')") | |
print(f"已在主节点 {port} 上创建测试数据库和初始数据") | |
conn.disconnect() | |
except Exception as e: | |
print(f"创建测试数据失败: {str(e)}") | |
conn.disconnect() | |
return False | |
# 3. 等待数据同步到从集群 | |
print("\n步骤2: 等待数据同步到从集群") | |
time.sleep(5) | |
# 4. 验证从集群中的数据 | |
print("\n步骤3: 验证从集群中的数据") | |
replica_port = self.replica_cluster.ports[0] | |
replica_conn = MySQLConnection(port=replica_port) | |
if not replica_conn.connect(): | |
print(f"无法连接到从集群 {replica_port}") | |
return False | |
try: | |
rows = replica_conn.execute_query("SELECT * FROM failover_test.failover ORDER BY id DESC LIMIT 1") | |
if rows: | |
print(f"从集群中的数据: ID={rows[0][0]}, 消息='{rows[0][1]}'") | |
else: | |
print("从集群中未找到数据") | |
replica_conn.disconnect() | |
except Exception as e: | |
print(f"验证从集群数据失败: {str(e)}") | |
replica_conn.disconnect() | |
return False | |
# 5. 停止主节点 | |
print(f"\n步骤4: 停止主节点 {port}") | |
if not self.instance_mgr.kill_instance(port): | |
print(f"无法停止主节点 {port}") | |
return False | |
# 6. 等待故障转移 | |
print("\n步骤5: 等待故障转移") | |
time.sleep(30) | |
# 7. 确认新的主节点 | |
print("\n步骤6: 确认新的主节点") | |
new_primary_port = None | |
for test_port in self.primary_cluster.ports: | |
if test_port != port: | |
try: | |
test_conn = MySQLConnection(port=test_port) | |
if test_conn.connect(): | |
result = test_conn.execute_query( | |
"SELECT member_host, member_port, member_role FROM performance_schema.replication_group_members " | |
"WHERE member_id = @@server_uuid" | |
) | |
if result and result[0][2] == "PRIMARY": | |
new_primary_port = test_port | |
print(f"找到新的主节点: {new_primary_port}") | |
test_conn.disconnect() | |
break | |
test_conn.disconnect() | |
except Exception: | |
pass | |
if not new_primary_port: | |
print("无法找到新的主节点") | |
return False | |
# 8. 在新主节点上插入数据 | |
print(f"\n步骤7: 在新主节点 {new_primary_port} 上插入新数据") | |
new_conn = MySQLConnection(port=new_primary_port) | |
if not new_conn.connect(): | |
print(f"无法连接到新主节点 {new_primary_port}") | |
return False | |
failover_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
try: | |
new_conn.execute_statement(f"INSERT INTO failover_test.failover (message) VALUES ('故障转移后数据 - 新主节点[{new_primary_port}] - {failover_time}')") | |
print(f"已在新主节点 {new_primary_port} 上插入数据") | |
new_conn.disconnect() | |
except Exception as e: | |
print(f"插入新数据失败: {str(e)}") | |
new_conn.disconnect() | |
return False | |
# 9. 等待数据同步到从集群 | |
print("\n步骤8: 等待新数据同步到从集群") | |
time.sleep(10) | |
# 10. 验证从集群中的新数据 | |
print("\n步骤9: 验证从集群中的新数据") | |
replica_conn = MySQLConnection(port=replica_port) | |
if not replica_conn.connect(): | |
print(f"无法连接到从集群 {replica_port}") | |
return False | |
try: | |
rows = replica_conn.execute_query("SELECT * FROM failover_test.failover ORDER BY id DESC LIMIT 2") | |
if rows and len(rows) >= 2: | |
print("✅ 故障转移成功! 节点宕机后的新数据已成功复制到从集群") | |
for row in rows: | |
print(f"ID={row[0]}, 消息='{row[1]}'") | |
else: | |
print("❌ 复制故障转移失败: 在从集群中未找到新数据") | |
replica_conn.disconnect() | |
except Exception as e: | |
print(f"❌ 复制故障转移失败: {str(e)}") | |
replica_conn.disconnect() | |
return False | |
# 11. 检查ClusterSet状态 | |
print("\n步骤10: 检查ClusterSet状态") | |
self.clusterset_mgr.print_status() | |
# 12. 启动并重新加入故障节点 | |
print(f"\n步骤11: 重新启动并加入故障节点 {port}") | |
# if not self.instance_mgr.start_instance(port): | |
# print(f"无法启动实例 {port}") | |
if not self.primary_cluster.rejoin_instance(port): | |
print(f"无法将实例 {port} 重新加入集群") | |
print("\n=== ClusterSet故障转移测试完成 ===") | |
return True | |
def test_all_nodes(self): | |
"""测试主集群中所有节点的故障转移""" | |
print("\n====== 开始对所有节点进行故障转移测试 ======") | |
# 首先验证ClusterSet状态 | |
if not self.clusterset_mgr.print_status(): | |
print("无法获取ClusterSet状态,测试失败") | |
return False | |
# 测试每个节点 | |
for i, port in enumerate(self.primary_cluster.ports): | |
print(f"\n\n========== 开始测试节点 {port} 的故障转移 [{i+1}/{len(self.primary_cluster.ports)}] ==========") | |
# 确认集群状态 | |
self.primary_cluster.print_cluster_status() | |
# 对当前节点进行测试 | |
print(f"\n开始对该节点进行故障转移测试...") | |
self.test_failover_node(port) | |
# 如果不是最后一个节点,等待一段时间再进行下一个测试 | |
if i < len(self.primary_cluster.ports) - 1: | |
print("\n等待一段时间后进行下一个节点的测试...") | |
time.sleep(15) | |
print("\n\n====== 所有节点的故障转移测试完成 ======") | |
return True | |
def main(): | |
# 解析命令行参数 | |
parser = argparse.ArgumentParser(description='MySQL InnoDB Cluster/ClusterSet 管理工具') | |
parser.add_argument('--test-failover', action='store_true', help='测试故障转移') | |
parser.add_argument('--create', action='store_true', help='创建新的 ClusterSet, 仅支持单主模式') | |
parser.add_argument('--cleanup', action='store_true', help='清理环境') | |
parser.add_argument('--show-status', action='store_true', help='显示状态') | |
args = parser.parse_args() | |
# 如果没有指定任何操作,显示帮助 | |
if not (args.test_failover or args.create or args.cleanup or args.show_status): | |
parser.print_help() | |
return | |
# 定义端口 | |
primary_ports = [4306, 4307, 4308] | |
replica_ports = [5306, 5307, 5308] | |
# 创建管理器实例 | |
instance_mgr = InstanceManager() | |
primary_cluster = ClusterManager('primaryCluster', primary_ports) | |
replica_cluster = ClusterManager('replicaCluster', replica_ports) | |
clusterset_mgr = ClusterSetManager(primary_cluster, replica_cluster) | |
failover_tester = FailoverTester(primary_cluster, replica_cluster, clusterset_mgr, instance_mgr) | |
# 显示状态 | |
if args.show_status: | |
print("\n=== 显示当前状态 ===") | |
primary_cluster.print_cluster_status() | |
clusterset_mgr.print_status() | |
return | |
# 清理环境 | |
if args.cleanup: | |
print("\n=== 清理环境 ===") | |
instance_mgr.cleanup_instances(primary_ports + replica_ports) | |
print("✅ 环境清理完成") | |
return | |
# 如果只要测试故障转移,尝试在已有环境中测试 | |
if args.test_failover and not args.create_clusterset: | |
print("\n=== 对所有主集群节点进行故障转移测试 ===") | |
# 连接到主集群的一个节点 | |
conn = MySQLConnection(port=primary_ports[0]) | |
if conn.connect(): | |
try: | |
cluster = dba.get_cluster('primaryCluster') | |
cluster_set = cluster.get_cluster_set() | |
print("检测到现有ClusterSet,开始对所有节点进行测试...") | |
conn.disconnect() | |
failover_tester.test_all_nodes() | |
return | |
except Exception as e: | |
print(f"验证现有ClusterSet时出错: {str(e)}") | |
print("请先创建ClusterSet然后再测试") | |
conn.disconnect() | |
return | |
# 创建新的ClusterSet | |
if args.create_clusterset: | |
try: | |
# 清理现有环境 | |
print("\n=== 清理现有环境 ===") | |
instance_mgr.cleanup_instances(primary_ports + replica_ports) | |
# 部署主集群实例 | |
print("\n=== 部署主集群实例 ===") | |
for port in primary_ports: | |
instance_mgr.deploy_instance(port) | |
# 部署从集群实例 - 但不创建集群 | |
print("\n=== 部署从集群实例 ===") | |
for port in replica_ports: | |
instance_mgr.deploy_instance(port) | |
# 创建主集群 | |
print("\n=== 创建主集群 ===") | |
primary_cluster.create_cluster() | |
# 创建ClusterSet | |
print("\n=== 创建ClusterSet ===") | |
clusterset_mgr.create_cluster_set() | |
# 添加从集群到ClusterSet | |
print("\n=== 添加从集群到ClusterSet ===") | |
clusterset_mgr.add_replica_cluster() | |
# 显示ClusterSet状态 | |
clusterset_mgr.print_status() | |
print("\n✅ ClusterSet创建完成") | |
# 如果同时要求测试故障转移,进行测试 | |
if args.test_failover: | |
print("\n=== 进行故障转移测试 ===") | |
failover_tester.test_all_nodes() | |
except Exception as e: | |
print(f"\n❌ 创建ClusterSet过程中出错: {str(e)}") | |
print("\n=== 操作完成 ===") | |
if __name__ == "__main__": | |
# dba变量将由MySQL Shell在运行过程中提供 | |
if 'shell' not in globals(): | |
import mysqlsh | |
shell = mysqlsh.globals.shell | |
dba = mysqlsh.globals.dba | |
session = mysqlsh.globals.session | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment