Skip to content

Instantly share code, notes, and snippets.

@progrium
Created September 22, 2011 02:56
Show Gist options
  • Select an option

  • Save progrium/1233917 to your computer and use it in GitHub Desktop.

Select an option

Save progrium/1233917 to your computer and use it in GitHub Desktop.
A distributed group membership module
"""A distributed group membership module
This provides distributed group membership for building clustered
applications with gevent. In your app, you give a node the address of another
node in the cluster and it will receive the names (IP addresses by default)
of all nodes in the cluster. When a node joins or drops from the cluster, the
leader broadcasts the updated roster to all remaining nodes. A drop is noticed
when the node's connection closes or when its keepalives stop arriving within
the client timeout.
The roster is managed by a leader. When you create a cluster, you tell the
first node it is the leader (by simply pointing it to its own IP). As you
add nodes, you can point them to the leader or any other node. If a node
is not the leader, it will redirect the connection to the leader. All nodes
also maintain a keepalive with the leader.
If the leader drops from the cluster, the nodes naively pick a new leader by
taking the remaining node list, sorting it, and picking the first node. If a
node ends up connected to a node that is not the leader, it is redirected to
the actual leader, as long as that node is still in the cluster.
"""
import gevent.monkey; gevent.monkey.patch_all(thread=False)
import logging
import socket
import json
import gevent
import gevent.server
import gevent.socket
from gevent_tools import util
from gevent_tools import service
# Seconds the leader waits for any line (keepalive) from a follower before
# shutting down that follower's connection; the timer restarts on every line.
CLIENT_TIMEOUT_SECONDS = 10
# Seconds a follower waits before sending its next keepalive; the timer is
# started on connect and restarted on each keepalive ack from the leader.
SERVER_KEEPALIVE_SECONDS = 5
def logger(obj):
    """Return the logger named ``<module>.<ClassName>`` for *obj*.

    The module part is read from ``obj.__module__`` and the class part from
    ``obj.__class__.__name__``, so every instance of a class shares a logger.
    """
    module_name = obj.__module__
    class_name = obj.__class__.__name__
    qualified = '%s.%s' % (module_name, class_name)
    return logging.getLogger(qualified)
class ClusterError(Exception):
    """Raised by PeerClient when it cannot connect to the cluster leader."""
class NewLeader(Exception):
    """Internal signal from PeerClient that the leader address changed.

    Raised on a redirect or after a leader election so the client
    reconnects to ``leader_address``.
    """
class ClusterManager(service.Service):
    """Top-level service combining a PeerServer and, for followers, a PeerClient.

    Args:
        callback: called with a copy of the roster set whenever the roster is
            reported; may be None/falsy to disable notifications.
        listen_address: address the PeerServer listens on (host, port).
        leader_address: address of a node to join. When omitted/falsy this
            node starts out as the leader and no PeerClient is run.
        client_hostname: name to register in the roster instead of the IP
            of the local end of the leader connection.
    """

    def __init__(self, callback, listen_address, leader_address=None, client_hostname=None):
        self.server = PeerServer(self, listen_address)
        self.client = PeerClient(self, leader_address, client_hostname)
        self.cluster = set()
        self._callback = callback
        self.add_service(self.server)
        following = bool(leader_address)
        if following:
            # Only followers need the client that connects to the leader.
            self.add_service(self.client)
        self.is_leader = not following

    def trigger_callback(self):
        """Hand a snapshot of the roster to the user callback, if any."""
        if not self._callback:
            return
        # Pass a copy so the callback cannot mutate the live roster.
        self._callback(self.cluster.copy())
class PeerServer(service.Service):
    """Listening side of the membership protocol.

    On the leader, every incoming connection is a follower joining the
    roster. On any other node, incoming connections are redirected to the
    leader. All protocol messages are newline-terminated lines:

      * leader -> follower: the JSON roster ``{"cluster": [...]}`` on connect
        and after every membership change, and a bare newline as keepalive ack;
      * follower -> leader: its name (an empty line means "use my IP"), then
        any line as a keepalive;
      * non-leader -> peer: one JSON ``{"leader": host, "port": port}`` line,
        after which the connection is closed.
    """

    def __init__(self, manager, address):
        self.logger = logger(self)
        self.manager = manager
        self.address = address
        # Roster name -> socket of each connected follower (used on the leader).
        self.clients = {}
        self.server = gevent.server.StreamServer(address,
            handle=self.handle, spawn=self.spawn)
        self.add_service(self.server)

    def do_start(self):
        """Add the leader's own listen host to the roster, then yield once."""
        if self.manager.is_leader:
            self.manager.cluster.add(self.address[0])
            self.manager.trigger_callback()
        gevent.sleep(0)

    def handle(self, socket, address):
        """Serve one incoming connection.

        If not a leader, a node simply returns a single JSON line pointing to
        the leader and closes the connection. Otherwise it sends the current
        roster, reads the client's name (an empty/whitespace line means "use
        the peer IP"), adds it to the roster, broadcasts the new roster and
        then acks keepalives. If no keepalive arrives within the timeout, or
        the client drops, the node is removed from the roster and the
        remaining nodes are notified.
        """
        self.logger.debug('New connection from %s:%s' % address)
        if not self.manager.is_leader:
            leader_address = self.manager.client.leader_address
            # Newline-terminated like every other protocol message, so
            # line-oriented readers see a complete line.
            socket.send('%s\n' % json.dumps({'leader': leader_address[0],
                'port': leader_address[1]}))
            socket.close()
            self.logger.debug("Redirected to %s:%s" % leader_address)
            return

        socket.send(self._cluster_message())
        sockfile = socket.makefile()
        name = sockfile.readline()
        if not name:
            return
        # A blank name line (including '\r\n') means "use my IP address".
        name = name.strip() or address[0]

        timeout = None
        try:
            self._update(add={'host': name, 'socket': socket})
            # TODO: Use TCP keepalives
            timeout = self._client_timeout(socket)
            for line in util.line_protocol(sockfile, strip=False):
                timeout.kill()
                timeout = self._client_timeout(socket)
                socket.send('\n')
                self.logger.debug("Keepalive from %s:%s" % address)
            self.logger.debug("Client disconnected from %s:%s" % address)
        finally:
            # Runs on send errors too, so the node never lingers in the roster.
            if timeout is not None:
                timeout.kill()
            # If the node reconnected under the same name, the new connection
            # owns the roster entry; only remove it if this one still does.
            if self.clients.get(name) is socket:
                self._update(remove=name)

    def _client_timeout(self, socket):
        """Schedule a read-side shutdown of *socket* after CLIENT_TIMEOUT_SECONDS.

        Returns the scheduled greenlet so the caller can kill and reschedule
        it whenever a keepalive arrives.
        """
        def shutdown(socket):
            try:
                # Shutting down reading should end the handler's read loop,
                # which then drops the node.
                socket.shutdown(0)
            except IOError:
                # Connection already gone; nothing left to do.
                pass
        return self.spawn_later(CLIENT_TIMEOUT_SECONDS,
            lambda: shutdown(socket))

    def _cluster_message(self):
        """Return the roster as one newline-terminated JSON line."""
        return '%s\n' % json.dumps({'cluster': list(self.manager.cluster)})

    def _update(self, add=None, remove=None):
        """Used by the leader to manage and broadcast the roster.

        Args:
            add: optional ``{'host': name, 'socket': sock}`` to register.
            remove: optional roster name to drop; unknown names are ignored.
        """
        if add is not None:
            self.manager.cluster.add(add['host'])
            self.clients[add['host']] = add['socket']
            self.logger.debug("Added to cluster: %s" % add['host'])
        if remove is not None:
            self.manager.cluster.discard(remove)
            self.clients.pop(remove, None)
            self.logger.debug("Removed from cluster: %s" % remove)
        # Iterate over a snapshot: send() may yield to other greenlets that
        # add or remove clients. The message is rebuilt per client so a
        # concurrent update never leaves a client with a stale final roster.
        for host, client_socket in list(self.clients.items()):
            try:
                client_socket.send(self._cluster_message())
            except socket.error:
                # That client's own handler will notice the disconnect and
                # remove it; keep notifying the others.
                self.logger.debug("Unable to send roster to %s" % host)
        self.manager.trigger_callback()
class PeerClient(service.Service):
    """Follower side of the membership protocol.

    Connects to the leader, registers this node's name, sends keepalives,
    mirrors the leader's roster into ``manager.cluster``, follows redirects,
    and runs a leader election when the leader connection closes.
    """

    def __init__(self, manager, leader_address, client_hostname=None):
        self.logger = logger(self)
        self.manager = manager
        self.leader_address = leader_address
        self.client_hostname = client_hostname
        # For connection retries. None means default
        self._max_retries = 5
        self._delay = None
        self._max_delay = None

    def do_start(self):
        # Readiness is signalled from handle() once connected to the leader.
        self.spawn(self.connect)
        return service.NOT_READY

    def connect(self):
        """Connect to ``self.leader_address`` and follow it, reconnecting
        whenever handle() returns after a redirect or election.

        Raises:
            ClusterError: if the leader cannot be reached after retries.
        """
        while True:
            self.logger.debug("Connecting to leader at %s:%s" %
                self.leader_address)
            try:
                socket = util.connect_and_retry(self.leader_address,
                    max_retries=self._max_retries, delay=self._delay,
                    max_delay=self._max_delay)
            except IOError:
                raise ClusterError("Unable to connect to leader %s:%s" %
                    self.leader_address)
            self.handle(socket)

    def handle(self, socket):
        """Follow the leader over *socket* until the leader changes.

        Returns after a redirect or a leader election. At that point
        ``self.leader_address`` names the node to connect to next. The
        keepalive greenlet is stopped and the socket closed on the way out.
        """
        self.set_ready()
        self.logger.debug("Connected to leader")
        client_address = self.client_hostname or socket.getsockname()[0]
        socket.send('%s\n' % client_address)
        # TODO: Use TCP keepalives
        keepalive = self._server_keepalive(socket)
        try:
            for line in util.line_protocol(socket, strip=False):
                if line == '\n':
                    # Keepalive ack from leader
                    keepalive.kill()
                    keepalive = self._server_keepalive(socket)
                    continue
                cluster = json.loads(line)
                if 'leader' in cluster:
                    # Means you have the wrong leader, redirect
                    host = cluster['leader']
                    port = cluster.get('port', self.leader_address[1])
                    self.leader_address = (host, port)
                    self.logger.info("Redirected to %s:%s..." %
                        self.leader_address)
                    raise NewLeader()
                if client_address in cluster.get('cluster', ()):
                    # Only report cluster once I'm a member
                    self.manager.cluster = set(cluster['cluster'])
                    self.manager.trigger_callback()
            # The leader closed the connection: choose a replacement.
            self._leader_election()
        except NewLeader:
            self.manager.trigger_callback()
            if self.leader_address[0] == client_address:
                self.manager.is_leader = True
                self.stop()
        finally:
            # Don't leave a pending keepalive that would write to a dead socket.
            keepalive.kill()
            socket.close()

    def _server_keepalive(self, socket):
        """Schedule one keepalive line to the leader after SERVER_KEEPALIVE_SECONDS."""
        return self.spawn_later(SERVER_KEEPALIVE_SECONDS,
            lambda: socket.send('\n'))

    def _leader_election(self):
        """Pick a replacement leader after losing the current one.

        Drops the lost leader from the local roster and picks the smallest
        remaining name after sorting, so followers holding the same roster
        choose the same node. If no candidate remains, the current address
        is kept and the reconnect simply retries it.

        Raises:
            NewLeader: always, so handle() returns and connect() reconnects.
        """
        lost_leader = self.leader_address[0]
        # The lost leader is no longer a member; don't report it or elect it.
        self.manager.cluster.discard(lost_leader)
        candidates = sorted(self.manager.cluster)
        if candidates:
            self.manager.leader = candidates[0]
            # NOTE(review): assumes every node listens on the same port as
            # the lost leader; the roster only carries host names.
            self.leader_address = (candidates[0], self.leader_address[1])
            self.logger.info("New leader %s:%s..." % self.leader_address)
        else:
            self.logger.info("No leader candidates, retrying %s:%s..." %
                self.leader_address)
        # TODO: if i end up thinking i'm the leader when i'm not
        # then i will not rejoin the cluster
        raise NewLeader()
import json
from gevent.hub import greenlet
import gevent
import gevent.event
import gevent.socket
import nose.tools
from gevent_tools.toys import cluster
# Base TCP port for the test servers; some tests also use port+1 and 6001.
port = 6000
class ClusterManagerMock(object):
    """Minimal stand-in for cluster.ClusterManager in these tests.

    Provides the attributes PeerServer/PeerClient use here: ``is_leader``,
    ``client.leader_address``, ``cluster`` and ``trigger_callback()``.
    """
    is_leader = True

    class client:
        # Leader address a non-leader PeerServer redirects to by default.
        leader_address = ('127.0.0.1', port)

    def __init__(self):
        self.cluster = set()
        # Give each mock its own client object. Tests assign
        # ``mock.client.leader_address``, which would otherwise rebind the
        # shared nested class attribute and leak into every other mock.
        self.client = self.__class__.client()

    def trigger_callback(self):
        pass
def yield_(n=1):
    """Yield to the gevent hub *n* times so other greenlets get to run."""
    done = 0
    while done < n:
        gevent.sleep(0)
        done += 1
# cluster.PeerServer
def test_leader_first_returns_existing_cluster():
    """The leader's first line to a new connection is the current roster."""
    manager = ClusterManagerMock()
    manager.cluster = set(['a', 'b', 'c'])
    with cluster.PeerServer(manager, ('127.0.0.1', port)):
        conn = gevent.socket.create_connection(('127.0.0.1', port))
        reader = conn.makefile()
        first_line = reader.readline()
        roster = json.loads(first_line.strip())
        assert set(roster['cluster']) == manager.cluster, (
            "First line didn't report correct cluster: %s" % first_line)
def test_nonleader_redirects_to_leader_with_port():
    """A non-leader answers with the leader's host and a port."""
    manager = ClusterManagerMock()
    manager.is_leader = False
    with cluster.PeerServer(manager, ('127.0.0.1', port)):
        conn = gevent.socket.create_connection(('127.0.0.1', port))
        reader = conn.makefile()
        reply = reader.readline()
        redirect = json.loads(reply.strip())
        expected_host = manager.client.leader_address[0]
        assert redirect['leader'] == expected_host, (
            "First line didn't point to leader: %s" % reply)
        assert 'port' in redirect, "Did not include port"
def test_leader_adds_node_to_cluster_by_name():
    """A follower that sends a name is registered under that name."""
    node_name = 'node-a'
    manager = ClusterManagerMock()
    manager.is_leader = True
    with cluster.PeerServer(manager, ('127.0.0.1', port)):
        conn = gevent.socket.create_connection(('127.0.0.1', port))
        reader = conn.makefile()
        reader.readline()  # roster sent before joining
        conn.send("%s\n" % node_name)
        reply = reader.readline()  # roster broadcast after joining
        assert node_name in manager.cluster, (
            "Leader did not add name to cluster: %s" % reply)
        del conn, reader
def test_leader_adds_node_to_cluster_by_hostip():
    """A follower that sends an empty name is registered under its IP."""
    manager = ClusterManagerMock()
    manager.is_leader = True
    with cluster.PeerServer(manager, ('127.0.0.1', port)):
        conn = gevent.socket.create_connection(('127.0.0.1', port))
        reader = conn.makefile()
        reader.readline()  # roster sent before joining
        conn.send("\n")
        reply = reader.readline()  # roster broadcast after joining
        assert '127.0.0.1' in manager.cluster, (
            "Leader did not add host to cluster: %s" % reply)
def test_drop_node_on_disconnect():
    """The leader removes a follower from the roster once it disconnects."""
    node_name = 'node'
    manager = ClusterManagerMock()
    manager.is_leader = True
    with cluster.PeerServer(manager, ('127.0.0.1', port)):
        conn = gevent.socket.create_connection(('127.0.0.1', port))
        reader = conn.makefile()
        reader.readline()  # cluster list pre-join
        conn.send("%s\n" % node_name)
        joined = reader.readline()  # cluster list post-join
        assert node_name in manager.cluster, (
            "Leader did not add host to cluster: %s" % joined)
        conn.shutdown(0)
        # Drop every reference so the file descriptors get cleaned up.
        del reader, conn
        yield_(2)  # Yield to let PeerServer catch the disconnect
        assert node_name not in manager.cluster, (
            "Leader did not remove host from cluster.")
# cluster.PeerClient
def test_joins_cluster():
    """Without a hostname, a client joins under its interface address."""
    interface = '127.0.0.1'  # I guess we'll just assume this
    manager = ClusterManagerMock()
    manager.is_leader = True
    manager.cluster = set()
    with cluster.PeerServer(manager, ('0.0.0.0', port)) as server:
        assert len(manager.cluster) == 1, "Cluster is not only leader"
        # Entering blocks until the client is "ready", i.e. connected.
        with cluster.PeerClient(manager, server.address):
            yield_(2)  # Yield to let PeerServer update cluster roster
            assert len(manager.cluster) > 1, "Client did not join cluster"
            assert interface in manager.cluster, (
                "Client did not join cluster using interface as name")
def test_joins_cluster_with_named_host():
    """A client given client_hostname joins the roster under that name."""
    name = 'localhost'
    cm = ClusterManagerMock()
    cm.is_leader = True
    cm.cluster = set()
    with cluster.PeerServer(cm, ('0.0.0.0', port)) as s:
        assert len(cm.cluster) == 1, "Cluster is not only leader"
        with cluster.PeerClient(cm, s.address, client_hostname=name) as c:
            # Enter will block until "ready", in this case connected
            yield_(2)  # Yield to let PeerServer update cluster roster
            assert len(cm.cluster) > 1, "Client did not join cluster"
            # Message previously (copy-paste) claimed the interface was used.
            assert name in cm.cluster, \
                "Client did not join cluster using client_hostname as name"
def test_follows_redirect_to_leader():
    """A client that dials a non-leader is redirected and joins the leader."""
    follower = ClusterManagerMock()
    follower.is_leader = False
    follower.client.leader_address = ('127.0.0.1', 6001)
    with cluster.PeerServer(follower, ('0.0.0.0', port)):
        leader = ClusterManagerMock()
        leader.is_leader = True
        with cluster.PeerServer(leader, ('127.0.0.1', 6001)):
            joiner = ClusterManagerMock()
            joiner.is_leader = False
            with cluster.PeerClient(joiner, ('127.0.0.1', port)):
                yield_(6)
                # A real follower would not have an empty roster, but an
                # empty one here shows we connected to the right node.
                assert len(follower.cluster) == 0, (
                    "Joined on the wrong node: %s" % follower.cluster)
                assert len(leader.cluster) > 0, "Didn't join on the right node"
@nose.tools.raises(cluster.ClusterError)
def test_raise_when_cant_connect():
    """PeerClient surfaces ClusterError when the leader is unreachable."""
    manager = ClusterManagerMock()
    manager.is_leader = False
    client = cluster.PeerClient(manager, ('', 16666))
    # Keep the retry loop short so the test finishes quickly.
    client._max_retries = 1
    client._delay = 0.1
    client._max_delay = 0.5
    # Presumably routes the connect greenlet's ClusterError back to the
    # caller via g.throw (Service.catch semantics not visible here).
    client.catch(cluster.ClusterError, lambda error, glet: glet.throw(error))
    client.start()
    client.stop()
def test_leader_election_on_disconnect():
    """Placeholder for leader-election coverage.

    Reported as skipped instead of silently passing, so the missing test is
    visible in the test run output.
    """
    import unittest
    raise unittest.SkipTest("leader election on disconnect is not tested yet")
# cluster.ClusterManager (integration/functional testing)
def test_cluster_manager_as_leader():
    """A ClusterManager without leader_address lists itself in the roster."""
    roster = []
    updated = gevent.event.Event()

    def callback(members):
        # Replace the captured roster in place, then signal the waiter.
        roster[:] = members
        updated.set()

    with cluster.ClusterManager(callback, listen_address=('127.0.0.1', port)):
        updated.wait(timeout=1)
        assert '127.0.0.1' in roster, "Node is not in cluster"
def test_cluster_manager_as_follower():
    """A follower ClusterManager receives the full roster from the leader."""
    roster = []
    updated = gevent.event.Event()

    def callback(members):
        # Replace the captured roster in place, then signal the waiter.
        roster[:] = members
        updated.set()

    leader = ClusterManagerMock()
    leader.is_leader = True
    with cluster.PeerServer(leader, ('127.0.0.1', port)):
        follower_manager = cluster.ClusterManager(callback,
            listen_address=('127.0.0.1', port + 1),
            leader_address=('127.0.0.1', port),
            client_hostname="localhost")
        with follower_manager:
            updated.wait(timeout=1)
            assert set(roster) == set([u'127.0.0.1', u'localhost']), (
                "Follower did not get full roster: %s" % roster)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment