Created December 24, 2015 05:23
-
-
Save matteobertozzi/5f24ce4f181cd4245820 to your computer and use it in GitHub Desktop.
Raft Leader Election
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
from random import randint, shuffle
from time import time, sleep
from uuid import uuid4

# Log line layout: timestamp, level, emitting function():line, message.
FORMAT = '%(asctime)s %(levelname)s %(funcName)s():%(lineno)d - %(message)s'
logging.basicConfig(format=FORMAT)
LOG = logging.getLogger('raft')

# Server roles.
RAFT_ROLE_FOLLOWER, RAFT_ROLE_CANDIDATE, RAFT_ROLE_LEADER = 1, 2, 3

# Timeouts in milliseconds. Election timeouts are drawn at random between
# two and three heartbeat periods.
RAFT_HEARTBEAT_TIMEOUT = 5000
RAFT_ELECTION_TIMEOUT_MIN = 2 * RAFT_HEARTBEAT_TIMEOUT
RAFT_ELECTION_TIMEOUT_MAX = 3 * RAFT_HEARTBEAT_TIMEOUT
class RaftVoteReq(object):
    """RequestVote RPC sent by a candidate.

    Only the candidate's term carries information: there is no replicated
    log, so both log-position fields are fixed at 0.
    """

    def __init__(self, raft):
        candidate_term = raft.term
        self.term = candidate_term
        self.last_log_index, self.last_log_term = 0, 0
class RaftVoteResp(object):
    """Reply to a RequestVote RPC: the voter's current term and its verdict."""

    def __init__(self, raft, granted):
        self.term, self.granted = raft.term, granted
class RaftAppendReq(object):
    """AppendEntries RPC sent by the leader.

    With no replicated log it only acts as a heartbeat, so the log-position
    and commit fields are always 0.
    """

    def __init__(self, raft):
        self.leader_uid, self.term = raft.leader_uid, raft.term
        self.prev_log_index = self.prev_log_term = self.commit_index = 0
class RaftAppendResp(object):
    """Reply to an AppendEntries RPC: the responder's term and the outcome."""

    def __init__(self, raft, success):
        self.term, self.success = raft.term, success
class RaftNode(object):
    """A peer that Raft messages can be sent to.

    This base implementation accepts every message and silently drops it.
    """

    def send(self, node_uid, msg):
        """Deliver ``msg`` on behalf of sender ``node_uid`` (no-op here)."""
        return None
class Raft(object):
    """Per-server state machine for Raft leader election (no log replication).

    The owner calls ``on_timeout()`` periodically and routes incoming
    messages to ``on_vote_request`` / ``on_vote_response`` /
    ``on_append_request`` / ``on_append_response``.  Peers live in
    ``self.ensemble``: objects with a ``uid`` attribute and a
    ``send(sender_uid, msg)`` method.  The ensemble is expected to contain
    the OTHER servers only, so our own vote is never in ``granted_votes``
    and ``votes_needed`` counts peer votes.

    A transport may deliver replies before ``send()`` returns (re-entrantly),
    so the send loops re-check the role after every send.
    """

    def __init__(self):
        # raft state
        self.role = RAFT_ROLE_FOLLOWER
        self.my_uuid = uuid4()
        # time() of the last leader contact / granted vote.  0 means "never",
        # so a fresh follower starts an election on its first tick.
        self.last_contact_ts = 0

        # raft-object state (meant to be persistent; _log_state_update() is
        # the hook for that and is a no-op here)
        self.term = 0
        self.voted_for = None
        self.leader_uid = None

        # candidate-state
        self.election_start_ts = 0
        self.election_timeout = 0     # msec, re-randomized by _set_election_timeout()
        self.granted_votes = set()    # uids of peers that granted us their vote
        self.votes_needed = 0         # peer votes (ours excluded) needed to win
        self.ensemble = set()         # peer nodes, excluding ourselves

        # config (msec)
        self.election_timeout_min = RAFT_ELECTION_TIMEOUT_MIN
        self.election_timeout_max = RAFT_ELECTION_TIMEOUT_MAX
        self.heartbeat_timeout = RAFT_HEARTBEAT_TIMEOUT

    def on_timeout(self):
        """Run the role-specific timer logic; meant to be called periodically.

        * follower: start an election when no leader contact has been seen
          for longer than the randomized election timeout;
        * candidate: re-poll peers while the election is open, otherwise
          start a new election;
        * leader: send a heartbeat to every peer.
        """
        if self.role == RAFT_ROLE_FOLLOWER:
            elapsed = (time() - self.last_contact_ts) * 1000
            # Compare against the *randomized* timeout, not the fixed maximum:
            # the randomization keeps followers from all becoming candidates
            # at the same moment and splitting the vote.
            if elapsed > self.election_timeout:
                # got no heartbeats, leader is probably dead.
                # establish candidacy and run for election
                LOG.debug('%s: Heartbeat timeout reached, starting election', self.my_uuid)
                return self._become_candidate()
        elif self.role == RAFT_ROLE_CANDIDATE:
            elapsed = (time() - self.election_start_ts) * 1000
            if elapsed < self.election_timeout:
                # we are in an election and haven't won.
                # repoll peers that haven't granted their vote yet.
                return self._campaign()
            # the election timeout has expired, and we still haven't won or
            # lost: call a new election (which draws a fresh random timeout).
            LOG.debug('%s Election timeout reached, restarting election', self.my_uuid)
            return self._become_candidate()
        elif self.role == RAFT_ROLE_LEADER:
            return self._send_heartbeat()

    def on_vote_request(self, node, vote):
        """Handle a RequestVote RPC from candidate ``node`` and reply to it.

        The vote is granted when the request is for our current term and we
        have not voted for someone else in that term.  There is no log, so
        no up-to-date check is performed.
        """
        # A newer term means our view is stale: adopt it and become follower
        # (this also clears voted_for, so we may vote in the new term).
        if vote.term > self.term:
            LOG.debug('%s: Caller has newer term, updating. Ours was %d, theirs is %d',
                      self.my_uuid, self.term, vote.term)
            self._step_down(vote.term)
        # At this point, if leader_uid is not None, we could tell the caller
        # to step down. However, this is just an optimization that does not
        # affect correctness or really even efficiency, so it's not worth the
        # trouble.
        if vote.term == self.term and self.voted_for is None:
            # _step_down() also restarts our election timer
            self._step_down(self.term)
            self.voted_for = node.uid
            self._log_state_update()
        granted = (vote.term == self.term and self.voted_for == node.uid)
        node.send(self.my_uuid, RaftVoteResp(self, granted))

    def on_vote_response(self, node, vote):
        """Tally a RequestVote reply from ``node``; become leader on a majority."""
        # A newer term wins regardless of role.  This must be tested before
        # the term-mismatch filter below, which would otherwise swallow it and
        # leave us campaigning in a stale term.
        if vote.term > self.term:
            return self._step_down(vote.term)
        if self.term != vote.term:
            # we don't care about result of RPC
            LOG.debug("%s: Ignore vote response, the term don't match %d vs %d",
                      self.my_uuid, self.term, vote.term)
            return
        if self.role != RAFT_ROLE_CANDIDATE:
            # we don't care about result of RPC (election already decided)
            LOG.debug("%s: Ignore vote response, we are not a candidate", self.my_uuid)
            return
        if vote.granted:
            self.granted_votes.add(node.uid)
            LOG.debug('%s: Vote granted by %s. Tally: %s',
                      self.my_uuid, node.uid, self.granted_votes)
            # check if we've become the leader
            if len(self.granted_votes) >= self.votes_needed:
                LOG.info("%s: Election won. Tally: %u Term: %u Total-Time: %s",
                         self.my_uuid, len(self.granted_votes), self.term,
                         time() - self.election_start_ts)
                # TODO: Adjust the timeout based on the elapsed time
                self._become_leader()

    def on_append_request(self, node, append):
        """Handle an AppendEntries RPC (a heartbeat here) from ``node``."""
        # If the caller's term is stale, reject and return our term so that
        # the stale leader learns about it and steps down.
        if append.term < self.term:
            LOG.debug("%s: Caller(%s) is stale. Our term is %d, theirs is %d",
                      self.my_uuid, node.uid, self.term, append.term)
            return node.send(self.my_uuid, RaftAppendResp(self, False))
        if append.term > self.term:
            LOG.debug("%s: Caller(%s) has newer term, updating. Ours was %d, theirs is %d",
                      self.my_uuid, node.uid, self.term, append.term)
        # This request is a sign of life from the current leader: adopt its
        # term if newer, convert to follower if we were a same-term candidate,
        # and restart the election timer (_step_down refreshes
        # last_contact_ts).  This must also run for equal terms; otherwise
        # followers would never register heartbeats and would keep starting
        # elections.
        self._step_down(append.term)
        # Record the leader id as a hint for clients.
        if self.leader_uid is None:
            self.leader_uid = append.leader_uid
            LOG.info("%s: All hail leader %s for term %d",
                     self.my_uuid, self.leader_uid, self.term)
        # There is no log: prev_log_index/prev_log_term are always 0 and
        # trivially match, so the heartbeat is accepted.
        return node.send(self.my_uuid, RaftAppendResp(self, True))

    def on_append_response(self, node, append):
        """Handle a reply to one of our heartbeats."""
        # A peer with a newer term means we are a stale leader: step down.
        # (Checked first; the filter below would otherwise discard it.)
        if append.term > self.term:
            return self._step_down(append.term)
        if self.term != append.term or self.role != RAFT_ROLE_LEADER:
            # we don't care about result of RPC
            return
        # Nothing else to do in this election-only implementation.

    # Raft candidate actions
    # ----------------------------------------------------
    def _become_candidate(self):
        """Start a new election: bump the term, vote for ourselves, poll peers."""
        self.role = RAFT_ROLE_CANDIDATE
        self.term += 1
        self.leader_uid = None
        self.voted_for = self.my_uuid
        self._log_state_update()
        self._set_election_timeout()
        self.granted_votes.clear()
        # Peer votes needed for a strict majority of the whole cluster
        # (peers + ourselves), our own vote excluded:
        #   3 servers -> 1, 4 servers -> 2, 5 servers -> 2, 1 server -> 0.
        # Floor division keeps it an int on Python 3 and correct for even
        # cluster sizes.
        self.votes_needed = (len(self.ensemble) + 1) // 2
        self.election_start_ts = time()
        return self._campaign()

    def _campaign(self):
        """Ask every peer that hasn't granted its vote yet to vote for us."""
        if len(self.granted_votes) >= self.votes_needed:
            # Our own vote is already a majority (e.g. a single-server
            # cluster): take leadership now instead of staying a candidate
            # forever.
            LOG.debug('%s: no need for a campaign, we have already all the votes', self.my_uuid)
            return self._become_leader()
        vote_req = RaftVoteReq(self)
        for node in self.ensemble:
            # A reply may already have decided the election (we won or
            # stepped down); stop polling then.
            if self.role != RAFT_ROLE_CANDIDATE:
                break
            if node.uid in self.granted_votes:
                continue
            node.send(self.my_uuid, vote_req)

    # Raft follower actions
    # ----------------------------------------------------
    def _become_follower(self):
        """Switch to follower with a freshly randomized election timeout."""
        self.role = RAFT_ROLE_FOLLOWER
        self._set_election_timeout()

    def _step_down(self, new_term):
        """Adopt ``new_term`` (must be >= ours) and revert to follower.

        A strictly newer term also clears the known leader and our vote.
        In every case "now" is recorded as the last contact, which restarts
        the follower's election timer.
        """
        assert self.term <= new_term, 'term %d new-term %d' % (self.term, new_term)
        if self.term < new_term:
            LOG.warning('%s: newer term discovered, fallback to follower', self.my_uuid)
            self.term = new_term
            self.leader_uid = None
            self.voted_for = None
            self._log_state_update()
        self.last_contact_ts = time()
        return self._become_follower()

    # Raft leader actions
    # ----------------------------------------------------
    def _become_leader(self):
        """Take leadership and announce it to every peer right away."""
        self.role = RAFT_ROLE_LEADER
        self.leader_uid = self.my_uuid
        LOG.info("%s: I'm the Leader", self.my_uuid)
        # Send an initial heartbeat instead of waiting for the next tick, so
        # peers reset their election timers and learn the leader id at once.
        return self._send_heartbeat()

    def _send_heartbeat(self):
        """Send an empty AppendEntries (heartbeat) to every peer."""
        # Built once so every peer gets the same term/leader snapshot.
        append_req = RaftAppendReq(self)
        for node in self.ensemble:
            # A reply carrying a newer term may already have made us step down.
            if self.role != RAFT_ROLE_LEADER:
                break
            LOG.debug('%s: send ping to %s', self.my_uuid, node.uid)
            node.send(self.my_uuid, append_req)

    def _set_election_timeout(self):
        """Draw a random election timeout (msec) in [min, max]."""
        self.election_timeout = randint(self.election_timeout_min, self.election_timeout_max)

    def _log_state_update(self):
        """Hook for persisting term/voted_for; a no-op in this implementation."""
        pass
# uid -> RaftServer: in-process registry that stands in for the network.
_raft_nodes = {}


class RaftServer(RaftNode):
    """In-process Raft peer: owns a Raft instance and feeds it every message
    by direct method call."""

    def __init__(self):
        self.raft = Raft()
        self.uid = self.raft.my_uuid

    def send(self, node_uid, msg):
        """Deliver ``msg``, sent by the server registered as ``node_uid``,
        to the matching handler of this server's Raft instance."""
        sender = _raft_nodes[node_uid]
        raft = self.raft
        routes = (
            (RaftAppendReq, raft.on_append_request),
            (RaftAppendResp, raft.on_append_response),
            (RaftVoteReq, raft.on_vote_request),
            (RaftVoteResp, raft.on_vote_response),
        )
        for msg_type, handler in routes:
            if isinstance(msg, msg_type):
                handler(sender, msg)
                return
        LOG.error('%s: unsupported msg %s from %s' % (self.uid, msg, node_uid))

    def __repr__(self):
        return 'RaftServer(%s)' % self.uid
if __name__ == '__main__':
    # Demo: three in-process servers.  Every loop iteration ticks each
    # server's timer in random order, then sleeps a random number of seconds.
    # Written to run on both Python 2 and Python 3.
    LOG.setLevel(logging.INFO)
    for _ in range(3):
        node = RaftServer()
        _raft_nodes[node.uid] = node
    for uid, node in _raft_nodes.items():
        # each server's ensemble is every OTHER server
        node.raft.ensemble |= set(peer for peer_uid, peer in _raft_nodes.items()
                                  if peer_uid != uid)
    while True:
        # list(): on Python 3 dict.values() is a view and cannot be shuffled
        ensemble = list(_raft_nodes.values())
        shuffle(ensemble)
        for node in ensemble:
            node.raft.on_timeout()
        # randint() needs integer bounds (a float bound raises TypeError on
        # Python 3.12+); same 1..17 second range as before.
        sleep(randint(1, 2 + RAFT_ELECTION_TIMEOUT_MAX // 1000))
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.