|
import time |
|
import redis |
|
|
|
|
|
class RedisNode:
    """A single redis-server endpoint together with its client connection.

    Recognised keyword arguments: ``node_ip``, ``node_port`` (required,
    converted with ``int``), ``node_second_port``, ``node_id``,
    ``master_id``, ``slots``, ``node_type`` and ``connected_nodes``.
    Every keyword argument is additionally mirrored onto the instance
    under its own name.
    """

    def __init__(self, **kwargs):
        """
        Store the node description and open a client for it

        :param kwargs: node description, see the class docstring
        """
        self.ip = kwargs.get('node_ip')
        self.port = int(kwargs.get('node_port'))

        # (attribute name, kwarg key) pairs for the optional fields
        optional_fields = (
            ('node_second_port', 'node_second_port'),
            ('node_id', 'node_id'),
            ('master_node_id', 'master_id'),
            ('slots', 'slots'),
            ('type', 'node_type'),
            ('connected_nodes', 'connected_nodes'),
        )
        for attribute, key in optional_fields:
            setattr(self, attribute, kwargs.get(key))

        # Mirror the raw kwargs as attributes too; this runs last so a
        # kwarg whose name collides with a field above overrides it.
        for name, value in kwargs.items():
            setattr(self, name, value)

        self.client = redis.StrictRedis(
            host=self.ip, port=self.port, decode_responses=True
        )

    def __repr__(self):
        return 'RedisNode({}:{})'.format(self.ip, self.port)

    def execute_command(self, command):
        """
        Run a command on this node and return the client's reply

        The reply is echoed to stdout unless the command text contains
        ``NODES`` or ``ADDSLOTS``.

        :param command: full command line as a single string
        :return: whatever the underlying client returns
        """
        reply = self.client.execute_command(command)

        suppress_echo = 'NODES' in command or 'ADDSLOTS' in command
        if not suppress_echo:
            print('[{}:{}]({}): {}'.format(self.ip, self.port, command, reply))

        return reply
|
|
|
|
|
class RedisCluster:
    """Bootstrap a Redis Cluster out of individually started servers.

    Typical use: build the object from a server map, call :meth:`reset`
    to wipe previous cluster state, then :meth:`create` to join the
    servers, set up replication and distribute the hash slots.

    ``masters``, ``slaves`` and ``nodes`` are per-instance lists of
    ``RedisNode`` objects (``nodes`` is masters followed by slaves).
    """

    __servers_map__ = None

    def __init__(self, servers_map):
        """
        Establish connection to redis-servers

        :param servers_map: iterable of dicts with the keys ``host``,
            ``port`` and ``master`` (truthy for a master, falsy or
            missing for a slave)
        :raises Exception: when no slave or fewer than 3 masters are given
        """
        self.__servers_map__ = servers_map

        # Created per instance: class-level lists would be shared by every
        # RedisCluster object because add_node() mutates them in place.
        self.nodes = []
        self.slaves = []
        self.masters = []

        # Create RedisNode objects and connect to them
        for server in servers_map:
            self.add_node(
                node_ip=server.get('host'), node_port=server.get('port'),
                master=server.get('master')
            )

        self.nodes = self.masters + self.slaves

        if not self.slaves:
            raise Exception('Please add slave servers')

        if len(self.masters) < 3:
            raise Exception('Please add master servers (minimum 3)')

        print('masters: ', self.masters)
        print('slaves: ', self.slaves)

    def add_node(self, master=True, **kwargs):
        """
        Create new RedisNode object and connect to it

        :param master: whether this server must be a master; a falsy
            value registers it as a slave
        :param kwargs: forwarded unchanged to ``RedisNode``
        :return: None
        """
        new_node = RedisNode(**kwargs)
        target = self.masters if master else self.slaves
        target.append(new_node)

    def reset(self):
        """
        Send ``CLUSTER RESET HARD`` to every slave and every master

        Masters receive ``FLUSHALL`` first.
        """
        # Reset slave servers
        for node in self.slaves:
            node.execute_command('CLUSTER RESET HARD')

        # Reset master servers
        for node in self.masters:
            # a master cannot be reset while it still stores keys
            node.execute_command('FLUSHALL')
            node.execute_command('CLUSTER RESET HARD')

    def meet_servers(self):
        """
        Introduce every node to every other node and wait for handshakes

        Polls ``CLUSTER NODES`` on the first node every 0.1 s until no
        entry carries the ``handshake`` flag. There is no timeout.
        """
        # Meet servers
        for node in self.nodes:
            for connecting_node in self.nodes:
                if node == connecting_node:
                    continue

                node.execute_command('CLUSTER MEET {} {}'.format(
                    connecting_node.ip, connecting_node.port
                ))

        # Check connection status and wait until the `handshake` flag
        # disappears from every entry
        while True:
            cluster_info = self.nodes[0].execute_command('CLUSTER NODES')
            handshaking = any(
                'handshake' in node_info.get('flags')
                for node_info in cluster_info.values()
            )
            if not handshaking:
                break

            print('Servers hanshaking now...')
            time.sleep(0.1)

    def create(self):
        """
        Join the servers, attach slaves to masters and assign hash slots

        Slaves are distributed over the masters round-robin; the 16384
        slots are split into one contiguous range per master.
        """
        # Meet our servers and wait for their initialization
        self.meet_servers()

        # Now every node has its proper id, so refresh our nodes list
        cluster_info = self.nodes[0].execute_command('CLUSTER NODES')
        self.nodes_from_object(cluster_info)

        # Setup replication. The modulo wraps the index back to the first
        # master, so having more slaves than masters no longer runs past
        # the end of the masters list.
        masters_count = len(self.masters)
        for index, slave in enumerate(self.slaves):
            master_node = self.masters[index % masters_count]
            slave.execute_command(
                'CLUSTER REPLICATE {}'.format(master_node.node_id)
            )

        # Generate slot ranges for master servers
        slots_ranges = self.__generate_ranges__(16384, masters_count)

        # And assign these slots. CLUSTER ADDSLOTS takes any number of
        # slots, so each master gets its whole range in one command
        # instead of one round trip per slot.
        print('Adding slots...')

        for master_node, (first_slot, end_slot) in zip(self.masters,
                                                       slots_ranges):
            slot_list = ' '.join(
                str(slot) for slot in range(first_slot, end_slot)
            )
            master_node.execute_command(
                'CLUSTER ADDSLOTS {}'.format(slot_list)
            )
        print('Custer prepared successfully!')

    def nodes_from_object(self, cluster_info):
        """
        Rebuild ``masters``, ``slaves`` and ``nodes`` from cluster info

        :param cluster_info: mapping of node address to a dict with the
            keys ``node_id``, ``master_id``, ``slots``, ``flags`` and
            ``connected``. The address is ``ip:port`` with an optional
            ``@bus-port`` suffix.
        """
        self.nodes = []
        self.slaves = []
        self.masters = []

        for node_key, node_info in cluster_info.items():
            # Split on the LAST colon and treat the bus port as optional,
            # so keys without "@port" (or with extra colons in the host
            # part) no longer fail to unpack.
            ip, _, ports = node_key.rpartition(':')
            first_port, _, second_port = ports.partition('@')
            port = int(first_port)

            # Role comes from the configured server map; nodes that are
            # not listed there default to master.
            master = True
            for server in self.__servers_map__:
                if server.get('host') == ip and server.get('port') == port:
                    master = server.get('master')
                    break

            self.add_node(
                master=master,
                node_ip=ip,
                node_port=port,
                node_second_port=int(second_port) if second_port else None,
                node_id=node_info.get('node_id'),
                master_id=node_info.get('master_id'),
                slots=node_info.get('slots'),
                node_type=node_info.get('flags'),
                connected_nodes=node_info.get('connected')
            )

        self.nodes = self.masters + self.slaves

    @staticmethod
    def __generate_ranges__(total, parts):
        """
        Split ``range(total)`` into ``parts`` contiguous half-open ranges

        Every range has ``total // parts`` items; the last one also takes
        the remainder. Integer arguments are expected.

        :param total: number of items to distribute
        :param parts: number of ranges to produce
        :return: generator of ``[start, stop]`` lists (``stop`` exclusive)
        """
        # Floor division keeps the arithmetic exact for large integers,
        # unlike int(total / parts) which goes through a float.
        part_amount = total // parts
        diff = total - parts * part_amount

        for part_index in range(parts):
            start = part_index * part_amount
            stop = start + part_amount
            if part_index == parts - 1:
                stop += diff
            yield [start, stop]
|
|
|
|
|
# Cluster layout handed to RedisCluster: three masters and one slave.
# The host values look like placeholders; substitute real addresses.
nodes_config = [
    {'host': "XX.XX.XX.XX", 'port': 7000, 'master': True},
    {'host': "XX.XX.XX.XX", 'port': 7001, 'master': True},
    {'host': "XX.XX.XX.XX", 'port': 7002, 'master': True},
    {'host': "XX.XX.XX.XX", 'port': 7003, 'master': False},
]

# Connect to the servers, wipe any previous cluster state, then build
# the cluster from scratch.
cluster = RedisCluster(nodes_config)
cluster.reset()
cluster.create()