Created
September 22, 2011 02:56
-
-
Save progrium/1233917 to your computer and use it in GitHub Desktop.
A distributed group membership module
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""A distributed group membership module

This provides distributed group membership for easily building clustered
applications with gevent. Using this in your app, you just provide the address
of another node in the cluster and your node will receive the names of all
nodes in the cluster (their IPs, or the hostname a node chose to announce).
When a node joins or drops from the cluster, all other nodes find out
immediately.

The roster is managed by a leader. When you create a cluster, you make the
first node the leader by not giving it a leader address. As you add nodes,
you can point them to the leader or to any other node. If a node is not the
leader, it redirects the connection to the leader. All nodes also maintain a
keepalive with the leader.

If the leader drops from the cluster, the nodes will dumbly pick a new leader
by taking the remaining node list, sorting it, and picking the first node. If
a node happens to pick a different leader, as long as that node is in the
cluster, it will redirect the connection to the right leader.
"""
| import gevent.monkey; gevent.monkey.patch_all(thread=False) | |
| import logging | |
| import socket | |
| import json | |
| import gevent | |
| import gevent.server | |
| import gevent.socket | |
| from gevent_tools import util | |
| from gevent_tools import service | |
# Interval after which a follower sends a keepalive line to the leader.
SERVER_KEEPALIVE_SECONDS = 5
# Silence (no line received) after which the leader hangs up on a follower.
# Kept larger than SERVER_KEEPALIVE_SECONDS so a live follower is never dropped.
CLIENT_TIMEOUT_SECONDS = 10
def logger(obj):
    """Return the logger named '<module>.<ClassName>' for the instance *obj*."""
    return logging.getLogger('{0}.{1}'.format(obj.__module__,
                                              obj.__class__.__name__))
class ClusterError(Exception):
    """Raised when this node cannot take part in the cluster (e.g. leader unreachable)."""
class NewLeader(Exception):
    """Internal signal: the client must reconnect to a different leader."""
class ClusterManager(service.Service):
    """Runs this node's PeerServer and, when following, its PeerClient.

    Without a ``leader_address`` this node starts as the leader; otherwise it
    joins the cluster through that address. ``callback`` (may be None) is
    handed a copy of the roster set each time ``trigger_callback`` runs.
    """

    def __init__(self, callback, listen_address, leader_address=None,
                 client_hostname=None):
        self.server = PeerServer(self, listen_address)
        self.client = PeerClient(self, leader_address, client_hostname)
        self.cluster = set()
        self._callback = callback

        self.add_service(self.server)
        follows_someone = bool(leader_address)
        if follows_someone:
            self.add_service(self.client)
        # A node with nobody to follow is the leader.
        self.is_leader = not follows_someone

    def trigger_callback(self):
        """Pass a snapshot of the roster to the user callback, if one was given."""
        callback = self._callback
        if not callback:
            return
        callback(self.cluster.copy())
class PeerServer(service.Service):
    """Accepting side of the cluster protocol.

    On the leader, each incoming connection is a node joining. It is sent the
    current roster, registered under the name it announces, acked on every
    keepalive line, and removed from the roster when it drops. On a non-leader,
    each connection gets a redirect to the leader and is closed.

    Every message is one newline-terminated line: a JSON object
    (``{"cluster": [...]}`` or ``{"leader": host, "port": port}``) or a bare
    newline used as a keepalive.
    """

    def __init__(self, manager, address):
        self.logger = logger(self)
        self.manager = manager
        self.address = address
        # Members connected to this leader: announced name -> socket.
        self.clients = {}
        self.server = gevent.server.StreamServer(address,
            handle=self.handle, spawn=self.spawn)
        self.add_service(self.server)

    def do_start(self):
        """Put the leader itself on the roster, then yield once to the hub."""
        if self.manager.is_leader:
            self.manager.cluster.add(self.address[0])
            self.manager.trigger_callback()
        gevent.sleep(0)

    def handle(self, socket, address):
        """
        If not a leader, a node will simply return a single item list pointing
        to the leader. Otherwise, it will add the host of the connected client
        to the cluster roster, broadcast to all nodes the new roster, and wait
        for keepalives. If no keepalive within timeout or the client drops, it
        drops it from the roster and broadcasts to all remaining nodes.
        """
        self.logger.debug('New connection from %s:%s' % address)
        if not self.manager.is_leader:
            self._redirect(socket)
        else:
            self._serve_member(socket, address)

    def _redirect(self, socket):
        """Tell a connecting node where the leader is, then hang up."""
        leader_address = self.manager.client.leader_address
        # Newline-terminated like every other message of the line protocol,
        # so line-based readers see a complete line.
        socket.sendall('%s\n' % json.dumps({'leader': leader_address[0],
                                             'port': leader_address[1]}))
        socket.close()
        self.logger.debug("Redirected to %s:%s" % leader_address)

    def _serve_member(self, socket, address):
        """Leader side: register the connecting node and track it until it drops."""
        socket.sendall(self._cluster_message())
        sockfile = socket.makefile()
        try:
            name = sockfile.readline()
            if not name:
                return  # hung up before announcing a name
            # A blank announcement means "name me by my connecting IP".
            name = name.strip() or address[0]
            self._update(add={'host': name, 'socket': socket})
            try:
                self._read_keepalives(socket, sockfile, address)
            finally:
                self.logger.debug("Client disconnected from %s:%s" % address)
                # Only drop the name if this connection still owns it: a newer
                # connection that announced the same name must not be evicted.
                if self.clients.get(name) is socket:
                    self._update(remove=name)
        finally:
            sockfile.close()

    def _read_keepalives(self, socket, sockfile, address):
        """Ack each line the member sends; a silent member is shut down."""
        # TODO: Use TCP keepalives
        timeout = self._client_timeout(socket)
        try:
            for _ in util.line_protocol(sockfile, strip=False):
                timeout.kill()
                timeout = self._client_timeout(socket)
                socket.sendall('\n')
                self.logger.debug("Keepalive from %s:%s" % address)
        except IOError:
            # A reset/broken connection is just another way of leaving.
            self.logger.debug("Connection error from %s:%s" % address)
        finally:
            # Don't leave a timer armed against a finished connection.
            timeout.kill()

    def _client_timeout(self, socket):
        """Schedule a shutdown of ``socket`` after CLIENT_TIMEOUT_SECONDS."""
        def shutdown(socket):
            try:
                socket.shutdown(0)
            except IOError:
                pass
        return self.spawn_later(CLIENT_TIMEOUT_SECONDS,
            lambda: shutdown(socket))

    def _cluster_message(self):
        """Return the roster as one newline-terminated JSON line."""
        return '%s\n' % json.dumps({'cluster': list(self.manager.cluster)})

    def _update(self, add=None, remove=None):
        """ Used by leader to manage and broadcast roster.

        ``add`` is a dict with ``'host'`` and ``'socket'`` keys; ``remove`` is
        a host name. The resulting roster is sent to every connected member
        and the manager's callback is triggered.
        """
        if add is not None:
            host = add['host']
            self.manager.cluster.add(host)
            self.clients[host] = add['socket']
            self.logger.debug("Added to cluster: %s" % host)
        if remove is not None:
            # Tolerate hosts that are already gone instead of raising KeyError.
            self.manager.cluster.discard(remove)
            self.clients.pop(remove, None)
            self.logger.debug("Removed from cluster: %s" % remove)
        message = self._cluster_message()
        # Iterate over a snapshot: sendall may yield to other greenlets that
        # change self.clients while we are broadcasting.
        for host, client_socket in list(self.clients.items()):
            try:
                client_socket.sendall(message)
            except IOError:
                # One dead peer must not abort the broadcast; its own handler
                # notices the disconnect and removes it.
                self.logger.debug("Could not send roster to %s" % host)
        self.manager.trigger_callback()
class PeerClient(service.Service):
    """Joining side of the cluster protocol.

    Connects to ``leader_address``, announces this node's name and keeps the
    manager's roster in sync with what the leader broadcasts. Redirects and
    leader elections update ``leader_address`` and cause a reconnect.
    """

    def __init__(self, manager, leader_address, client_hostname=None):
        self.logger = logger(self)
        self.manager = manager
        self.leader_address = leader_address
        self.client_hostname = client_hostname
        # For connection retries. None means default
        self._max_retries = 5
        self._delay = None
        self._max_delay = None

    def do_start(self):
        self.spawn(self.connect)
        return service.NOT_READY

    def connect(self):
        """Connect to the current leader and follow it, reconnecting as needed.

        Raises:
            ClusterError: if the leader cannot be reached within the retry
                budget (``_max_retries``, ``_delay``, ``_max_delay``).
        """
        while True:
            self.logger.debug("Connecting to leader at %s:%s" %
                self.leader_address)
            try:
                socket = util.connect_and_retry(self.leader_address,
                    max_retries=self._max_retries, delay=self._delay,
                    max_delay=self._max_delay)
            except IOError:
                raise ClusterError("Unable to connect to leader %s:%s" %
                    self.leader_address)
            self.handle(socket)
            if self.manager.is_leader:
                # handle() promoted this node; there is no leader to follow.
                return

    def handle(self, socket):
        """Announce this node to the leader and follow its roster updates.

        Returns once this node has to connect somewhere else (redirect or
        election). If the new leader is this node itself, it is promoted:
        ``manager.is_leader`` is set and this client service is stopped.
        """
        self.set_ready()
        self.logger.debug("Connected to leader")
        client_address = self.client_hostname or socket.getsockname()[0]
        socket.sendall('%s\n' % client_address)
        # TODO: Use TCP keepalives
        keepalive = self._server_keepalive(socket)
        try:
            try:
                for line in util.line_protocol(socket, strip=False):
                    if line == '\n':
                        # Keepalive ack from leader: arm the next keepalive.
                        keepalive.kill()
                        keepalive = self._server_keepalive(socket)
                        continue
                    cluster = json.loads(line)
                    if 'leader' in cluster:
                        # Means you have the wrong leader, redirect
                        host = cluster['leader']
                        port = cluster.get('port', self.leader_address[1])
                        self.leader_address = (host, port)
                        self.logger.info("Redirected to %s:%s..." %
                            self.leader_address)
                        raise NewLeader()
                    if client_address in cluster['cluster']:
                        # Only report cluster once I'm a member
                        self.manager.cluster = set(cluster['cluster'])
                        self.manager.trigger_callback()
            except IOError:
                # A reset connection means the leader dropped, same as EOF.
                self.logger.info("Lost connection to leader")
            self._leader_election()
        except NewLeader:
            self.manager.trigger_callback()
            # NOTE: compares host only; nodes sharing a host are not told apart.
            if self.leader_address[0] == client_address:
                self.manager.is_leader = True
                self.stop()
        finally:
            # Cancel the pending keepalive and release the old connection.
            keepalive.kill()
            socket.close()

    def _server_keepalive(self, socket):
        """Schedule one keepalive line to the leader after SERVER_KEEPALIVE_SECONDS."""
        return self.spawn_later(SERVER_KEEPALIVE_SECONDS,
            lambda: socket.sendall('\n'))

    def _leader_election(self):
        """Replace a dropped leader with the first remaining node by sort order.

        The old leader's host is dropped from the roster. The new leader is
        contacted on the old leader's port, because the roster only carries
        host names.

        Raises:
            NewLeader: always, after ``self.leader_address`` has been updated.
            ClusterError: if no node is left to elect.
        """
        old_host, port = self.leader_address[0], self.leader_address[1]
        # NOTE: only effective if the leader's roster name equals the host we
        # connected to; otherwise it stays a candidate.
        self.manager.cluster.discard(old_host)
        candidates = sorted(self.manager.cluster)
        if not candidates:
            raise ClusterError("No remaining nodes to elect as leader")
        self.leader_address = (candidates[0], port)
        self.logger.info("New leader %s:%s..." % self.leader_address)
        # TODO: if i end up thinking i'm the leader when i'm not
        # then i will not rejoin the cluster
        raise NewLeader()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| from gevent.hub import greenlet | |
| import gevent | |
| import gevent.event | |
| import gevent.socket | |
| import nose.tools | |
| from gevent_tools.toys import cluster | |
port = 6000


class ClusterManagerMock(object):
    """Minimal stand-in for cluster.ClusterManager used by the peer tests."""
    is_leader = True

    class client(object):
        # Leader that a non-leader PeerServer redirects new connections to.
        leader_address = ('127.0.0.1', port)

    def __init__(self):
        self.cluster = set()
        # Per-instance client: tests reassign ``cm.client.leader_address``,
        # and that must not leak into the shared class (and later tests).
        self.client = type(self).client()

    def trigger_callback(self):
        pass
def yield_(n=1):
    """Let other greenlets run by switching to the gevent hub ``n`` times."""
    for _ in xrange(n):
        gevent.sleep(0)
| # cluster.PeerServer | |
def test_leader_first_returns_existing_cluster():
    """A leader sends the current roster as the first line of a connection."""
    cm = ClusterManagerMock()
    cm.cluster = set(['a', 'b', 'c'])
    with cluster.PeerServer(cm, ('127.0.0.1', port)):
        socket = gevent.socket.create_connection(('127.0.0.1', port))
        sockfile = socket.makefile()
        try:
            line = sockfile.readline()
            obj = json.loads(line.strip())
            assert set(obj['cluster']) == cm.cluster, \
                "First line didn't report correct cluster: %s" % line
        finally:
            # Close explicitly so the descriptor doesn't leak into later tests.
            sockfile.close()
            socket.close()
def test_nonleader_redirects_to_leader_with_port():
    """A non-leader answers a new connection with the leader's host and port."""
    cm = ClusterManagerMock()
    cm.is_leader = False
    with cluster.PeerServer(cm, ('127.0.0.1', port)):
        socket = gevent.socket.create_connection(('127.0.0.1', port))
        sockfile = socket.makefile()
        try:
            line = sockfile.readline()
            obj = json.loads(line.strip())
            assert obj['leader'] == cm.client.leader_address[0], \
                "First line didn't point to leader: %s" % line
            assert 'port' in obj, "Did not include port"
            assert obj['port'] == cm.client.leader_address[1], \
                "Redirect carried the wrong port: %s" % line
        finally:
            # Close explicitly so the descriptor doesn't leak into later tests.
            sockfile.close()
            socket.close()
def test_leader_adds_node_to_cluster_by_name():
    """A node announcing a name is registered under that name."""
    node_name = 'node-a'
    cm = ClusterManagerMock()
    cm.is_leader = True
    with cluster.PeerServer(cm, ('127.0.0.1', port)):
        socket = gevent.socket.create_connection(('127.0.0.1', port))
        sockfile = socket.makefile()
        try:
            sockfile.readline()  # cluster list pre-join
            socket.send("%s\n" % node_name)
            line = sockfile.readline()  # cluster list post-join
            assert node_name in cm.cluster, \
                "Leader did not add name to cluster: %s" % line
        finally:
            # Close explicitly instead of relying on `del` + refcounting.
            sockfile.close()
            socket.close()
def test_leader_adds_node_to_cluster_by_hostip():
    """A node announcing an empty name is registered under its connecting IP."""
    cm = ClusterManagerMock()
    cm.is_leader = True
    with cluster.PeerServer(cm, ('127.0.0.1', port)) as s:
        socket = gevent.socket.create_connection(('127.0.0.1', port))
        sockfile = socket.makefile()
        try:
            sockfile.readline()  # cluster list pre-join
            socket.send("\n")
            line = sockfile.readline()  # cluster list post-join
            assert '127.0.0.1' in cm.cluster, \
                "Leader did not add host to cluster: %s" % line
            # The leader itself is already on the roster as 127.0.0.1, so also
            # check that the connection was registered under that name.
            assert '127.0.0.1' in s.clients, \
                "Leader did not register the connection under its IP"
        finally:
            # Close explicitly so the descriptor doesn't leak into later tests.
            sockfile.close()
            socket.close()
def test_drop_node_on_disconnect():
    """A node that disconnects is removed from the leader's roster."""
    node_name = 'node'
    cm = ClusterManagerMock()
    cm.is_leader = True
    with cluster.PeerServer(cm, ('127.0.0.1', port)):
        socket = gevent.socket.create_connection(('127.0.0.1', port))
        sockfile = socket.makefile()
        try:
            sockfile.readline()  # cluster list pre-join
            socket.send("%s\n" % node_name)
            line = sockfile.readline()  # cluster list post-join
            assert node_name in cm.cluster, \
                "Leader did not add host to cluster: %s" % line
            # shutdown(0) (SHUT_RD) tells the peer nothing; SHUT_RDWR sends a
            # FIN right away, whatever references to the socket remain.
            socket.shutdown(gevent.socket.SHUT_RDWR)
        finally:
            # Make sure to cleanup file descriptors:
            sockfile.close()
            socket.close()
        yield_(2)  # Yield to let PeerServer catch the disconnect
        assert node_name not in cm.cluster, \
            "Leader did not remove host from cluster."
| # cluster.PeerClient | |
def test_joins_cluster():
    """A PeerClient without a hostname joins under its interface address."""
    expected_name = '127.0.0.1'  # I guess we'll just assume this
    manager = ClusterManagerMock()
    manager.is_leader = True
    manager.cluster = set()
    with cluster.PeerServer(manager, ('0.0.0.0', port)) as server:
        assert len(manager.cluster) == 1, "Cluster is not only leader"
        # Entering blocks until the client is "ready", i.e. connected.
        with cluster.PeerClient(manager, server.address):
            yield_(2)  # Yield to let PeerServer update cluster roster
            assert len(manager.cluster) > 1, "Client did not join cluster"
            assert expected_name in manager.cluster, \
                "Client did not join cluster using interface as name"
def test_joins_cluster_with_named_host():
    """A PeerClient given client_hostname joins under that name."""
    name = 'localhost'
    cm = ClusterManagerMock()
    cm.is_leader = True
    cm.cluster = set()
    with cluster.PeerServer(cm, ('0.0.0.0', port)) as s:
        assert len(cm.cluster) == 1, "Cluster is not only leader"
        with cluster.PeerClient(cm, s.address, client_hostname=name):
            # Enter will block until "ready", in this case connected
            yield_(2)  # Yield to let PeerServer update cluster roster
            assert len(cm.cluster) > 1, "Client did not join cluster"
            assert name in cm.cluster, \
                "Client did not join cluster using client_hostname as name"
def test_follows_redirect_to_leader():
    """A client pointed at a follower ends up joining on the real leader."""
    leader_port = 6001
    follower = ClusterManagerMock()
    follower.is_leader = False
    follower.client.leader_address = ('127.0.0.1', leader_port)
    leader = ClusterManagerMock()
    leader.is_leader = True
    joiner = ClusterManagerMock()
    joiner.is_leader = False
    with cluster.PeerServer(follower, ('0.0.0.0', port)):
        with cluster.PeerServer(leader, ('127.0.0.1', leader_port)):
            with cluster.PeerClient(joiner, ('127.0.0.1', port)):
                yield_(6)
                # A real follower would not have an empty roster, but an
                # empty one here shows we did not join on the wrong node.
                assert len(follower.cluster) == 0, "Joined on the wrong node: %s" % follower.cluster
                assert len(leader.cluster) > 0, "Didn't join on the right node"
@nose.tools.raises(cluster.ClusterError)
def test_raise_when_cant_connect():
    """An unreachable leader surfaces as ClusterError once retries run out."""
    manager = ClusterManagerMock()
    manager.is_leader = False
    client = cluster.PeerClient(manager, ('', 16666))
    # Keep the retry loop short so the test stays fast.
    client._max_retries = 1
    client._delay = 0.1
    client._max_delay = 0.5
    client.catch(cluster.ClusterError, lambda e, g: g.throw(e))
    client.start()
    client.stop()
def test_leader_election_on_disconnect():
    """Placeholder for leader-election coverage."""
    # An empty body would report a pass for behaviour that is never checked;
    # skipping keeps the missing coverage visible in the test report.
    raise nose.SkipTest("leader election on disconnect is not tested yet")
| # cluster.ClusterManager (integration/functional testing) | |
def test_cluster_manager_as_leader():
    """A ClusterManager started without a leader address lists itself."""
    roster = []
    updated = gevent.event.Event()

    def callback(members):
        roster[:] = members
        updated.set()

    with cluster.ClusterManager(callback,
                                listen_address=('127.0.0.1', port)):
        updated.wait(timeout=1)
        assert '127.0.0.1' in roster, "Node is not in cluster"
def test_cluster_manager_as_follower():
    """A following ClusterManager receives the leader's full roster."""
    roster = []
    updated = gevent.event.Event()

    def callback(members):
        roster[:] = members
        updated.set()

    leader = ClusterManagerMock()
    leader.is_leader = True
    follower_kwargs = dict(listen_address=('127.0.0.1', port + 1),
                           leader_address=('127.0.0.1', port),
                           client_hostname="localhost")
    with cluster.PeerServer(leader, ('127.0.0.1', port)):
        with cluster.ClusterManager(callback, **follower_kwargs):
            updated.wait(timeout=1)
            assert set(roster) == set([u'127.0.0.1', u'localhost']), \
                "Follower did not get full roster: %s" % roster
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment