# Raft Leader Election
#
# Originally published by @matteobertozzi as GitHub gist
# 5f24ce4f181cd4245820 (created December 24, 2015).
from random import randint, shuffle
from uuid import uuid4
from time import time, sleep
import logging
FORMAT = '%(asctime)s %(levelname)s %(funcName)s():%(lineno)d - %(message)s'
logging.basicConfig(format=FORMAT)
LOG = logging.getLogger('raft')

# The three roles a Raft server can hold.
RAFT_ROLE_FOLLOWER = 1
RAFT_ROLE_CANDIDATE = 2
RAFT_ROLE_LEADER = 3

# Timeouts in milliseconds: Raft.on_timeout compares them against elapsed
# time() seconds multiplied by 1000.
RAFT_HEARTBEAT_TIMEOUT = 5000
RAFT_ELECTION_TIMEOUT_MIN = 2 * RAFT_HEARTBEAT_TIMEOUT
RAFT_ELECTION_TIMEOUT_MAX = 3 * RAFT_HEARTBEAT_TIMEOUT
class RaftVoteReq(object):
    """RequestVote message broadcast by a candidate.

    Only the candidate's current term is copied from ``raft``; the
    log-position fields are always zero because no replicated log is
    modeled in this file.
    """

    def __init__(self, raft):
        self.last_log_term = 0
        self.last_log_index = 0
        self.term = raft.term
class RaftVoteResp(object):
    """Reply to a RaftVoteReq.

    Carries the responder's current term and whether it granted its vote.
    """

    def __init__(self, raft, granted):
        responder_term = raft.term
        self.term = responder_term
        self.granted = granted
class RaftAppendReq(object):
    """AppendEntries message; used here only as the leader's heartbeat.

    The leader id and term come from ``raft``; the log-position and commit
    fields are always zero (no replicated log is modeled in this file).
    """

    def __init__(self, raft):
        self.term = raft.term
        self.leader_uid = raft.leader_uid
        self.commit_index = 0
        self.prev_log_term = 0
        self.prev_log_index = 0
class RaftAppendResp(object):
    """Reply to a RaftAppendReq.

    Carries the responder's current term and a success flag.
    """

    def __init__(self, raft, success):
        responder_term = raft.term
        self.term = responder_term
        self.success = success
class RaftNode(object):
    """Minimal interface for a peer that can be handed Raft messages."""

    def send(self, node_uid, msg):
        """Deliver ``msg`` to this node; ``node_uid`` identifies the sender.

        The base implementation silently discards the message.
        """
        return None
class Raft(object):
    """Leader-election part of the Raft consensus protocol for one server.

    The instance reacts to periodic ticks (``on_timeout``) and to incoming
    messages (``on_vote_request``, ``on_vote_response``,
    ``on_append_request``, ``on_append_response``). Peers are kept in
    ``self.ensemble``; each peer must expose a ``uid`` attribute and a
    ``send(sender_uid, msg)`` method. No replicated log is modeled.
    """

    def __init__(self):
        # raft state
        self.role = RAFT_ROLE_FOLLOWER
        self.my_uuid = uuid4()
        # time() of the last reset by _step_down(): leader heartbeat,
        # vote granted, or newer term discovered.
        self.last_contact_ts = 0
        # raft-object state (persistent)
        self.term = 0
        self.voted_for = None
        self.leader_uid = None
        # candidate-state
        self.election_start_ts = 0
        self.election_timeout = 0      # milliseconds, randomized per election
        self.granted_votes = set()     # uids of peers that granted us their vote
        self.votes_needed = 0          # peer votes required, not counting our own
        self.ensemble = set()          # peer nodes (this server excluded)
        # config (milliseconds)
        self.election_timeout_min = RAFT_ELECTION_TIMEOUT_MIN
        self.election_timeout_max = RAFT_ELECTION_TIMEOUT_MAX
        self.heartbeat_timeout = RAFT_HEARTBEAT_TIMEOUT

    def on_timeout(self):
        """Advance the role's state machine; meant to be called periodically.

        - Follower: start an election if no contact within the election timeout.
        - Candidate: re-send vote requests, or restart the election once the
          election timeout expires.
        - Leader: send heartbeats to every peer.
        """
        if self.role == RAFT_ROLE_FOLLOWER:
            elapsed = (time() - self.last_contact_ts) * 1000
            # Use the randomized timeout (set in _become_follower), not the
            # fixed maximum, so followers don't all time out together and
            # split the vote.
            if elapsed > self.election_timeout:
                # got no heartbeats, leader is probably dead.
                # establish candidacy and run for election
                LOG.debug('%s: Heartbeat timeout reached, starting election' % self.my_uuid)
                return self._become_candidate()
        elif self.role == RAFT_ROLE_CANDIDATE:
            elapsed = (time() - self.election_start_ts) * 1000
            if elapsed < self.election_timeout:
                # we are in an election and haven't won:
                # re-send the vote request to the peers.
                return self._campaign()
            # the election timeout has expired and we still haven't won or
            # lost: call a new election (which picks a fresh random timeout).
            LOG.debug('%s Election timeout reached, restarting election' % self.my_uuid)
            return self._become_candidate()
        elif self.role == RAFT_ROLE_LEADER:
            return self._send_heartbeat()

    def on_vote_request(self, node, vote):
        """Handle a RequestVote from candidate ``node`` and reply to it.

        The vote is granted when the request is for our current term (after
        adopting a newer term if needed) and we have not voted for anyone
        else in that term.
        """
        # if the caller's term is newer than ours, adopt it and step down
        if vote.term > self.term:
            LOG.debug('%s: Caller has newer term, updating. Ours was %d, theirs is %d' % (self.my_uuid, self.term, vote.term))
            self._step_down(vote.term)
        # At this point, if leaderId != 0, we could tell the caller to step down.
        # However, this is just an optimization that does not affect correctness
        # or really even efficiency, so it's not worth the trouble.
        if vote.term == self.term and self.voted_for is None:
            # _step_down also resets last_contact_ts, so granting a vote
            # postpones our own election.
            self._step_down(self.term)
            self.voted_for = node.uid
            self._log_state_update()
        granted = (vote.term == self.term and self.voted_for == node.uid)
        node.send(self.my_uuid, RaftVoteResp(self, granted))

    def on_vote_response(self, node, vote):
        """Tally a vote reply from ``node``; become leader on reaching quorum."""
        # A newer term must always make us step down, whatever our role.
        # (Checked first: the term-mismatch guard below would otherwise hide it.)
        if vote.term > self.term:
            return self._step_down(vote.term)
        if self.term != vote.term:
            # we don't care about result of RPC
            LOG.debug("%s: Ignore vote response, the term don't match %d vs %d" % (self.my_uuid, self.term, vote.term))
            return
        if self.role != RAFT_ROLE_CANDIDATE:
            # we don't care about result of RPC
            LOG.debug("%s: Ignore vote response, we are not a candidate" % self.my_uuid)
            return
        if vote.granted:
            self.granted_votes.add(node.uid)
            LOG.debug('%s: Vote granted by %s. Tally: %s' % (self.my_uuid, node.uid, self.granted_votes))
            # check if we've become the leader
            if len(self.granted_votes) >= self.votes_needed:
                LOG.info("%s: Election won. Tally: %u Term: %u Total-Time: %s",
                         self.my_uuid, len(self.granted_votes), self.term, time() - self.election_start_ts)
                # TODO: Adjust the timeout based on this the elapsed time
                self._become_leader()

    def on_append_request(self, node, append):
        """Handle a heartbeat (AppendEntries) from ``node`` and reply to it."""
        # If the caller's term is stale, just return our term to it
        if append.term < self.term:
            LOG.debug("%s: caller(%s) is stale. Our term is %d, theirs is %d" % (self.my_uuid, node.uid, self.term, append.term))
            return node.send(self.my_uuid, RaftAppendResp(self, False))
        if append.term > self.term:
            LOG.debug("%s: Caller(%s) has newer term, updating. Ours was %d, theirs is %d" % (self.my_uuid, node.uid, self.term, append.term))
        # This request is a sign of life from the current leader.
        # Update our term and convert to follower if necessary.
        self._step_down(append.term)
        if self.leader_uid is None:
            self.leader_uid = append.leader_uid
            LOG.info("%s: All hail leader %s for term %d" % (self.my_uuid, self.leader_uid, self.term))
        return node.send(self.my_uuid, RaftAppendResp(self, False))

    def on_append_response(self, node, append):
        """Handle a heartbeat reply; step down if a newer term is reported."""
        # Checked first: a newer term means we are no longer the legitimate
        # leader, and the guard below would otherwise discard it.
        if append.term > self.term:
            return self._step_down(append.term)
        if self.term != append.term or self.role != RAFT_ROLE_LEADER:
            # we don't care about result of RPC
            return
        # Nothing else to do: no log replication is modeled.

    # Raft candidate actions
    # ----------------------------------------------------
    def _become_candidate(self):
        """Start a new term, vote for ourselves and request peer votes."""
        self.role = RAFT_ROLE_CANDIDATE
        self.term += 1
        self.leader_uid = None
        self.voted_for = self.my_uuid
        self._log_state_update()
        self._set_election_timeout()
        self.granted_votes.clear()
        # A majority of the full cluster (peers + us) is size // 2 + 1; our
        # own vote is implicit, so we need size // 2 votes from peers.
        cluster_size = len(self.ensemble) + 1
        self.votes_needed = cluster_size // 2
        self.election_start_ts = time()
        return self._campaign()

    def _campaign(self):
        """Send a vote request to every peer, unless the quorum is already met."""
        if len(self.granted_votes) >= self.votes_needed:
            # e.g. a single-server cluster needs no peer votes: our own vote
            # is already a majority, so take leadership now.
            LOG.debug('%s: no need for a campaign, we have already all the votes' % self.my_uuid)
            self._become_leader()
            return 0
        vote_req = RaftVoteReq(self)
        for node in self.ensemble:
            # A reply handled during an earlier send() (delivery may be
            # synchronous) can already have ended the candidacy.
            if self.role != RAFT_ROLE_CANDIDATE:
                break
            node.send(self.my_uuid, vote_req)

    # Raft follower actions
    # ----------------------------------------------------
    def _become_follower(self):
        """Switch to follower with a freshly randomized election timeout."""
        self.role = RAFT_ROLE_FOLLOWER
        self._set_election_timeout()

    def _step_down(self, new_term):
        """Adopt ``new_term`` if it is newer, reset contact time, become follower."""
        assert self.term <= new_term, 'term %d new-term %d' % (self.term, new_term)
        if self.term < new_term:
            LOG.warning('%s: newer term discovered, fallback to follower' % self.my_uuid)
            self.term = new_term
            self.leader_uid = None
            self.voted_for = None
            self._log_state_update()
        self.last_contact_ts = time()
        return self._become_follower()

    # Raft leader actions
    # ----------------------------------------------------
    def _become_leader(self):
        """Take leadership of the current term."""
        self.role = RAFT_ROLE_LEADER
        self.leader_uid = self.my_uuid
        LOG.info("%s: I'm the Leader" % self.my_uuid)

    def _send_heartbeat(self):
        """Send one heartbeat (empty AppendEntries) to every peer."""
        # Build the request once: if a reply makes us step down mid-loop, we
        # must not start announcing a new term with no leader id.
        append_req = RaftAppendReq(self)
        for node in self.ensemble:
            if self.role != RAFT_ROLE_LEADER:
                break
            LOG.debug('%s: send ping to %s' % (self.my_uuid, node.uid))
            node.send(self.my_uuid, append_req)

    def _set_election_timeout(self):
        """Pick a random election timeout (ms) in the configured range."""
        self.election_timeout = randint(self.election_timeout_min, self.election_timeout_max)

    def _log_state_update(self):
        """Hook for persisting term/vote state; a no-op here."""
        pass
_raft_nodes = {}
class RaftServer(RaftNode):
    """A Raft peer living in this process.

    Wraps a ``Raft`` state machine and feeds it every message another server
    "sends" to it. Senders are resolved through the module-level
    ``_raft_nodes`` registry (uid -> RaftServer).
    """

    def __init__(self):
        state = Raft()
        self.raft = state
        self.uid = state.my_uuid

    def send(self, node_uid, msg):
        """Deliver ``msg`` from server ``node_uid`` to this server.

        The matching ``Raft.on_*`` handler runs before this call returns;
        unsupported message types are logged as errors.
        """
        sender = _raft_nodes[node_uid]
        # Checked in order with isinstance(), like an if/elif chain.
        routes = (
            (RaftAppendReq, self.raft.on_append_request),
            (RaftAppendResp, self.raft.on_append_response),
            (RaftVoteReq, self.raft.on_vote_request),
            (RaftVoteResp, self.raft.on_vote_response),
        )
        for msg_type, handler in routes:
            if isinstance(msg, msg_type):
                handler(sender, msg)
                return
        LOG.error('%s: unsupported msg %s from %s' % (self.uid, msg, node_uid))

    def __repr__(self):
        return 'RaftServer(%s)' % (self.uid,)
if __name__ == '__main__':
    # Simulate a three-server cluster inside a single process.
    LOG.setLevel(logging.INFO)
    for _ in range(3):  # range works on Python 2 and 3 (xrange is 2-only)
        node = RaftServer()
        _raft_nodes[node.uid] = node
    # Each server's ensemble is every *other* server.
    for uid, node in _raft_nodes.items():
        node.raft.ensemble |= {peer for peer_uid, peer in _raft_nodes.items() if peer_uid != uid}
    while True:
        # shuffle() needs a mutable sequence; dict.values() is a view on Python 3.
        ensemble = list(_raft_nodes.values())
        shuffle(ensemble)
        for node in ensemble:
            node.raft.on_timeout()
        # randint() requires integer bounds (floats are rejected on newer
        # Pythons); floor division keeps the original 1..17 s range.
        sleep(randint(1, 2 + RAFT_ELECTION_TIMEOUT_MAX // 1000))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment