Last active
December 29, 2015 22:17
-
-
Save wolever/a97c4e8f6f3e8777170f to your computer and use it in GitHub Desktop.
Playing with implementing the Raft consensus algorithm in Haskell (probably terrible)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
*.class | |
*.o | |
*.pyc | |
*.sqlite3 | |
*.sw[a-z] | |
*~ | |
.DS_Store | |
bin-debug/* | |
bin-release/* | |
bin/* | |
tags | |
*.beam | |
*.dump | |
env/ | |
.env/ | |
*egg-info* | |
misc/ | |
dist/ | |
Icon? | |
node_modules/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Build raft.hs with the threaded runtime and profiling enabled, then run it.
raft: raft.hs
	# Four banner lines to visually separate successive builds in the terminal.
	@echo "================================================================"
	@echo "================================================================"
	@echo "================================================================"
	@echo "================================================================"
	ghc -threaded -prof -auto-all -caf-all raft.hs
	# Alternative run line kept for debugging (-xc; presumably for exception traces, confirm against the GHC RTS docs).
	#./raft +RTS -xc
	# Run the freshly built binary with the RTS option -N4.
	./raft +RTS -N4

# Re-run the `raft` target via watchman-make for files matching *.hs.
watch:
	watchman-make -p *.hs -t raft
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE RecordWildCards #-} | |
{-# LANGUAGE NamedFieldPuns #-} | |
{-# LANGUAGE DisambiguateRecordFields #-} | |
import Debug.Trace | |
-- import Control.Lens | |
import System.Timeout | |
import Control.Concurrent.MVar | |
import qualified System.Random as R | |
import qualified Control.Concurrent.Chan as C | |
import Control.Concurrent ( forkIO, forkFinally, threadDelay ) | |
import Data.Time.Clock ( getCurrentTime, diffUTCTime ) | |
-- | Envelope written to a server's channel: the payload together with an
-- optional return address.
data ChanMsg = ChanMsg
  { chanMsg :: Message         -- ^ the message being delivered
  , sender  :: Maybe ServerID  -- ^ originator; 'Nothing' when sent via 'sendNoRecv'
  }
  deriving (Show)
-- | Identity of a participant: a display name plus the channel that
-- messages addressed to it are written to.
data ServerID = ServerID
  { serverName :: String          -- ^ name used when rendering the server
  , wChan      :: C.Chan ChanMsg  -- ^ inbox; senders write 'ChanMsg' values here
  }
  deriving (Eq)
-- | Renders a server as its name wrapped in angle brackets, e.g. @<Server1>@.
instance Show ServerID where
  show sid = concat ["<", serverName sid, ">"]
-- | The role a server is currently playing; 'serverLoop' dispatches on it.
data ServerRole
  = Leader
  | Follower
  | Candidate
  | Shutdown
  deriving (Show, Eq)
-- | One entry of the replicated log.
data LogEntry = LogEntry
  { eTerm :: Int     -- ^ term recorded for the entry
  , eIdx  :: Int     -- ^ index recorded for the entry
  , eKey  :: String  -- ^ key carried by the entry
  , eVal  :: String  -- ^ value carried by the entry
  }
  deriving (Show)
-- | Everything a server carries from one step of its loop to the next.
data ServerState = ServerState
  { serverID         :: ServerID        -- ^ this server's own identity and inbox
  , serverPeers      :: [ServerID]      -- ^ the other servers ('broadcast' targets)
  , serverRole       :: ServerRole      -- ^ role that 'serverLoop' dispatches on
  , serverLeader     :: Maybe ServerID  -- ^ leader clients are redirected to, if known
  , currentTerm      :: Int             -- ^ incremented each time the server becomes a candidate
  , votedFor         :: Maybe ServerID  -- ^ server this one has granted its vote to, if any
  , logEntries       :: [LogEntry]      -- ^ the local log
  , commitIndex      :: Int             -- ^ starts at 0; copied into outgoing heartbeats
  , lastAppliedIndex :: Int             -- ^ starts at 0; not read anywhere in this file
  }
-- | A server state is displayed as just its 'ServerID'.
instance Show ServerState where
  show = show . serverID
-- | Every payload that can travel inside a 'ChanMsg'.
data Message
  -- Client read request and its answer.
  = MsgRead { rKey :: String }
  | MsgReadResSuccess { rKey :: String, rVal :: Maybe String }
  -- Client write request and its answer.
  | MsgWrite { wKey :: String, wVal :: String }
  | MsgWriteRes { success :: Bool }
  -- Redirect sent by a follower in reply to a client request.
  | MsgSeeLeader { leader :: ServerID }
  -- Written to every server by the querier once it has finished.
  | MsgShutdown
  -- AppendEntries RPC (also used as the heartbeat) and its reply.
  | MsgAppendEntries
      { term              :: Int
      , leaderID          :: ServerID
      , prevLogIndex      :: Int
      , prevLogTerm       :: Int
      , newEntries        :: [LogEntry]
      , leaderCommitIndex :: Int
      }
  | MsgAppendEntriesRes { term :: Int, success :: Bool }
  -- RequestVote RPC and its reply.
  | MsgRequestVote
      { term         :: Int
      , candidateID  :: ServerID
      , lastLogIndex :: Int
      , lastLogTerm  :: Int
      }
  | MsgRequestVoteRes
      { term        :: Int
      , voteGranted :: Bool
      }
  deriving (Show)
-- | Sentinel returned when a log position does not exist: term and index
-- are both @-1@ and the key/value are empty strings.
emptyLogEntry :: LogEntry
emptyLogEntry = LogEntry (-1) (-1) "" ""
-- | The final entry of a log, or 'emptyLogEntry' when the log is empty.
lastLogEntry :: [LogEntry] -> LogEntry
lastLogEntry entries
  | null entries = emptyLogEntry
  | otherwise    = last entries
-- | The entry at list position @idx@, or 'emptyLogEntry' when @idx@ is
-- negative or past the end of the log.
getLogEntry :: [LogEntry] -> Int -> LogEntry
getLogEntry entries idx =
  if idx >= 0 && idx < length entries
    then entries !! idx
    else emptyLogEntry
-- | The inbox channel belonging to a server state's own 'ServerID'.
sChan :: ServerState -> C.Chan ChanMsg
sChan = wChan . serverID
-- | Add one to @x@ when the flag is 'True'; otherwise return @x@ unchanged.
incrIf :: Num a => a -> Bool -> a
incrIf x flag = if flag then x + 1 else x
-- | Run @expr arg@ under a 'timeout' of @delay@ microseconds and report how
-- much of the budget is left afterwards.
--
-- Returns @(remaining, result)@, where @result@ is the action's result, or
-- @arg@ itself when the action timed out. @remaining@ is @delay@ minus the
-- measured wall-clock time in microseconds, so it can be negative.
timedTimeout :: Int -> a -> (a -> IO a) -> IO (Int, a)
timedTimeout delay arg expr = do
  startedAt  <- getCurrentTime
  outcome    <- timeout delay (expr arg)
  finishedAt <- getCurrentTime
  let elapsedMicros = round (1000000 * diffUTCTime finishedAt startedAt)
      remaining     = delay - elapsedMicros
  return (remaining, maybe arg id outcome)
-- | Convert a duration in seconds to whole microseconds (rounded).
seconds :: Float -> Int
seconds = round . (* 1000000)
-- | Two seconds, in microseconds. Used as the vote-collection timeout, as
-- the follower's wait for an RPC, and (halved) as the leader's wait.
waitVotesTimeout :: Int
waitVotesTimeout = seconds 2.0
-- | A random delay between 0 and 0.3 seconds, in microseconds, that a new
-- candidate waits before nominating itself.
elxnWait :: IO Int
elxnWait = fmap seconds (R.randomRIO (0.0, 0.3))
-- | Trace and deliver @msg@ to @s@ without a return address: the envelope's
-- 'sender' is 'Nothing'. The first argument is only used for the trace line.
sendNoRecv :: Show a => a -> ServerID -> Message -> IO ()
sendNoRecv from s msg = do
  traceIO $ concat [" msg: ", show from, " -> ", show s, ": ", show msg]
  C.writeChan (wChan s) (ChanMsg msg Nothing)
-- | Trace and deliver @msg@ to @s@, attaching the originating server as the
-- envelope's 'sender' so the receiver can reply.
sendrecv :: ServerID -> ServerID -> Message -> IO ()
sendrecv from s msg = do
  traceIO $ concat [" msg: ", show from, " -> ", show s, ": ", show msg]
  C.writeChan (wChan s) (ChanMsg msg (Just from))
-- | Send @msg@ from @srv@ to each of its peers, in list order, via 'sendrecv'.
broadcast :: ServerState -> Message -> IO ()
broadcast srv msg = mapM_ deliver (serverPeers srv)
  where
    deliver peer = sendrecv (serverID srv) peer msg
-- | Process an AppendEntries RPC and reply to @sender@.
--
-- The request is rejected when its term is older than ours, or when our
-- entry at @prevLogIndex@ does not carry @prevLogTerm@. The reply always
-- reports our own current term.
--
-- Returns the state unchanged on rejection. On acceptance the server
-- becomes a 'Follower' of @sender@, clears its vote, and appends the
-- request's entries to its log.
--
-- @msg@ must be a 'MsgAppendEntries'; the field pattern below is partial.
handleAppendEntries :: ServerState -> Message -> ServerID -> IO ServerState
handleAppendEntries srv msg sender = do
  let MsgAppendEntries { term, prevLogIndex, prevLogTerm, newEntries } = msg
      localEntry   = getLogEntry (logEntries srv) prevLogIndex
      isConflict   = eTerm localEntry /= prevLogTerm
      shouldReject = term < currentTerm srv || isConflict
      -- TODO:
      -- - Handle leaderCommitIndex
      -- - Only append entries which aren't already in the log
      --
      -- This must NOT be named 'newEntries': a Haskell 'let' is recursive,
      -- so reusing the name would define the list in terms of itself
      -- instead of in terms of the entries carried by the message.
      keptEntries
        | isConflict = take prevLogIndex (logEntries srv)
        | otherwise  = logEntries srv
      updatedEntries = keptEntries ++ newEntries
  sendNoRecv srv sender MsgAppendEntriesRes
    { term    = currentTerm srv
    , success = not shouldReject
    }
  return $
    if shouldReject
      then srv
      else srv
        { serverRole   = Follower
        , serverLeader = Just sender
        , votedFor     = Nothing
        , logEntries   = updatedEntries
        }
-- | Process a RequestVote RPC and reply to @sender@.
--
-- The vote is denied when the request's term is older than ours, when we
-- have already voted for a different server, or when our own log is more
-- up to date than the candidate's. "More up to date" follows the Raft
-- rule: compare the terms of the last entries first, and only compare the
-- indices when those terms are equal.
--
-- Returns the state unchanged on denial, otherwise with 'votedFor' set to
-- @sender@.
--
-- @msg@ must be a 'MsgRequestVote'; the field pattern below is partial.
handleRequestVote :: ServerState -> Message -> ServerID -> IO ServerState
handleRequestVote srv msg sender = do
  let MsgRequestVote { term, lastLogIndex, lastLogTerm } = msg
      lastEntry = lastLogEntry (logEntries srv)
      votedForSomeoneElse = maybe False (/= sender) (votedFor srv)
      candidateLogIsBehind =
        lastLogTerm < eTerm lastEntry ||
        (lastLogTerm == eTerm lastEntry && lastLogIndex < eIdx lastEntry)
      shouldReject =
        term < currentTerm srv || votedForSomeoneElse || candidateLogIsBehind
  sendNoRecv srv sender MsgRequestVoteRes
    { term        = currentTerm srv
    , voteGranted = not shouldReject
    }
  return $ if shouldReject then srv else srv { votedFor = Just sender }
-- | Become a candidate for the next term.
--
-- Bumps 'currentTerm', then spends a random 'elxnWait' serving incoming
-- RPCs. If the server is still a candidate afterwards it nominates itself;
-- otherwise the updated state is returned as-is.
runCandidate :: ServerState -> IO ServerState
runCandidate oldCfg = do
  let candidate = oldCfg
        { serverRole  = Candidate
        , currentTerm = currentTerm oldCfg + 1
        }
  wait <- elxnWait
  traceIO (show candidate ++ ": now a candidate (timeout: " ++ show wait ++ ")")
  settled <- _candidateLeaderWait candidate wait
  if serverRole settled == Candidate
    then _candidateElectSelf settled
    else return settled
-- | Ask every peer for its vote and act on the outcome.
--
-- * still a 'Candidate' (not enough votes in time): start over with
--   'runCandidate', which moves to the next term;
-- * 'Leader': the election was won, return the new state;
-- * 'Follower': '_waitVotes' saw a reply from a newer term and stepped
--   down; return the state so 'serverLoop' can run the follower logic
--   (previously this was reported as an invalid state and crashed);
-- * 'Shutdown': never produced by the vote wait, so still an error.
_candidateElectSelf :: ServerState -> IO ServerState
_candidateElectSelf srv = do
  traceIO (show srv ++ ": nominating self as leader")
  let lastEntry = lastLogEntry (logEntries srv)
  broadcast srv MsgRequestVote
    { term         = currentTerm srv
    , candidateID  = serverID srv
    , lastLogIndex = eIdx lastEntry
    , lastLogTerm  = eTerm lastEntry
    }
  result <- _candidateWaitVotes srv
  case serverRole result of
    Candidate -> do
      traceIO (show result ++ ": election failed, trying again")
      runCandidate result
    Leader   -> return result
    Follower -> return result
    Shutdown -> error ("uh oh invalid server state: " ++ show result)
-- | Serve incoming RPCs for up to @remaining@ microseconds, stopping early
-- as soon as the server is no longer a 'Candidate'.
_candidateLeaderWait :: ServerState -> Int -> IO ServerState
_candidateLeaderWait srv remaining =
  if remaining <= 0 || serverRole srv /= Candidate
    then return srv
    else do
      (nextRem, updated) <- timedTimeout remaining srv _candidateLeaderWaitRead
      _candidateLeaderWait updated nextRem
-- | Block for one message on the server's inbox and handle it as a waiting
-- candidate: vote requests and AppendEntries are processed, anything else
-- is an error. The sender is only demanded by the two handled cases.
_candidateLeaderWaitRead :: ServerState -> IO ServerState
_candidateLeaderWaitRead srv = do
  envelope <- C.readChan (sChan srv)
  let msg = chanMsg envelope
      actualSender =
        maybe (error "uh oh: message didn't come with a sender") id (sender envelope)
  case msg of
    MsgRequestVote {}   -> handleRequestVote srv msg actualSender
    MsgAppendEntries {} -> handleAppendEntries srv msg actualSender
    _ -> error ("uh oh bad message waiting for a leader: " ++ show msg)
-- | Collect votes for at most 'waitVotesTimeout', starting from one vote
-- (the candidate's own). On timeout the state is returned unchanged, i.e.
-- still a candidate.
_candidateWaitVotes :: ServerState -> IO ServerState
_candidateWaitVotes srv =
  fmap (maybe srv id) (timeout waitVotesTimeout tally)
  where
    tally = _waitVotes srv (sChan srv) 1
-- | Read vote replies from @chan@ until a majority is reached.
--
-- A majority is more than half of the cluster (the peers plus this
-- server). Replies from an older term are skipped, replies from the
-- current term are counted when granted, and a reply from a newer term
-- makes the server step down to 'Follower' with no known leader. Any
-- message other than a vote reply is an error.
_waitVotes :: ServerState -> C.Chan ChanMsg -> Int -> IO ServerState
_waitVotes srv chan voteCount
  | voteCount >= majority =
      trace (show srv ++ ": now the leader with " ++ show voteCount ++ " votes!") $
        return srv { serverRole = Leader, votedFor = Nothing }
  | otherwise = do
      envelope <- C.readChan chan
      case chanMsg envelope of
        MsgRequestVoteRes { term, voteGranted }
          | term < currentTerm srv ->
              trace ("old term:" ++ show term) $
                _waitVotes srv chan voteCount
          | term == currentTerm srv ->
              _waitVotes srv chan (incrIf voteCount voteGranted)
          | otherwise ->
              trace ("new term:" ++ show term) $
                return srv { serverRole = Follower, serverLeader = Nothing }
        other -> error ("uh oh bad message waiting for votes: " ++ show other)
  where
    clusterSize = 1 + length (serverPeers srv)
    majority    = clusterSize `div` 2 + 1
-- | Follower loop: keep handling RPCs for as long as one arrives within
-- 'waitVotesTimeout'; when none does, become a candidate.
runFollower :: ServerState -> IO ServerState
runFollower srv =
  timeout waitVotesTimeout (followerWaitRPC srv)
    >>= maybe (runCandidate srv) runFollower
-- | Block for one message on the follower's inbox and handle it.
--
-- * AppendEntries and RequestVote are passed to their handlers (a follower
--   previously could not vote at all: a vote request hit a missing case);
-- * replies to RPCs this server sent in an earlier role are stale and are
--   dropped;
-- * client reads and writes are answered with a redirect to the known
--   leader;
-- * anything else (including 'MsgShutdown') raises an error, which ends
--   the server's thread just as the missing case did before.
--
-- The sender and the leader are only demanded by the cases that use them,
-- so their absence is an error only for those messages.
followerWaitRPC :: ServerState -> IO ServerState
followerWaitRPC srv = do
  ChanMsg { chanMsg = msg, sender } <- C.readChan (sChan srv)
  let actualSender = case sender of
        Just x -> x
        _ -> error "uh oh: message didn't come with a sender"
      actualLeader = case serverLeader srv of
        Just x -> x
        _ -> error "uh oh: needed a server leader but we don't have one yet"
      redirect = do
        sendNoRecv srv actualSender MsgSeeLeader { leader = actualLeader }
        return srv
  case msg of
    MsgAppendEntries {}    -> handleAppendEntries srv msg actualSender
    MsgRequestVote {}      -> handleRequestVote srv msg actualSender
    MsgRequestVoteRes {}   -> return srv
    MsgAppendEntriesRes {} -> return srv
    MsgRead {}             -> redirect
    MsgWrite {}            -> redirect
    _ -> error ("uh oh bad message waiting as a follower: " ++ show msg)
-- | Leader loop: broadcast an empty AppendEntries heartbeat, then serve
-- incoming messages.
--
-- When no message arrives within half of 'waitVotesTimeout' the heartbeat
-- is sent again. When handling a message leaves the server in any role
-- other than 'Leader' (for example 'handleAppendEntries' accepted another
-- leader), the new state is returned so 'serverLoop' can switch roles.
-- Previously the role was never checked, so a demoted leader kept sending
-- heartbeats forever.
runLeader :: ServerState -> IO ServerState
runLeader srv = do
  let lastEntry = lastLogEntry (logEntries srv)
  broadcast srv MsgAppendEntries
    { term              = currentTerm srv
    , leaderID          = serverID srv
    , prevLogIndex      = eIdx lastEntry
    , prevLogTerm       = eTerm lastEntry
    , newEntries        = []
    , leaderCommitIndex = commitIndex srv
    }
  loop srv
  where
    loop cur = do
      res <- timeout (waitVotesTimeout `div` 2) (leaderWaitRPC cur)
      case res of
        Just next
          | serverRole next == Leader -> loop next
          | otherwise                 -> return next
        Nothing -> runLeader cur
-- | Block for one message on the leader's inbox and handle it.
--
-- * AppendEntries goes to 'handleAppendEntries' (which may demote us);
-- * AppendEntries replies are currently ignored;
-- * vote replies that arrive after the election was already won are
--   dropped (previously they hit a missing case and killed the leader);
-- * a vote request for our own term or an older one is denied, since this
--   server won that term's election; a request for a newer term goes to
--   'handleRequestVote';
-- * reads are answered with the placeholder value @"42"@ and writes are
--   acknowledged without being stored;
-- * anything else (including 'MsgShutdown') raises an error, which ends
--   the server's thread just as the missing case did before.
--
-- The sender is only demanded by the cases that reply to it.
leaderWaitRPC :: ServerState -> IO ServerState
leaderWaitRPC srv = do
  ChanMsg { chanMsg = msg, sender } <- C.readChan (sChan srv)
  let actualSender = case sender of
        Just x -> x
        _ -> error "uh oh: message didn't come with a sender"
  case msg of
    MsgAppendEntries {}    -> handleAppendEntries srv msg actualSender
    MsgAppendEntriesRes {} -> return srv
    MsgRequestVoteRes {}   -> return srv
    MsgRequestVote { term }
      | term <= currentTerm srv -> do
          sendNoRecv srv actualSender MsgRequestVoteRes
            { term        = currentTerm srv
            , voteGranted = False
            }
          return srv
      | otherwise -> handleRequestVote srv msg actualSender
    MsgRead { rKey } -> do
      sendNoRecv srv actualSender MsgReadResSuccess
        { rKey = rKey
        , rVal = Just "42"
        }
      return srv
    MsgWrite {} -> do
      sendNoRecv srv actualSender MsgWriteRes { success = True }
      return srv
    _ -> error ("uh oh bad message while leading: " ++ show msg)
-- | Entry point for a server thread: announce the server, then hand over
-- to 'serverLoop'.
runServer :: ServerState -> IO a
runServer srv = traceIO ("Starting " ++ show srv) >> serverLoop srv
-- | Run the logic for the server's current role, then repeat with whatever
-- state that logic returns. A 'Shutdown' role raises the error @"Done"@.
serverLoop :: ServerState -> IO a
serverLoop srv = step >>= serverLoop
  where
    step = case serverRole srv of
      Candidate -> runCandidate srv
      Leader    -> runLeader srv
      Follower  -> runFollower srv
      Shutdown  -> error "Done"
-- | Create a server identity with the given name and a fresh inbox channel.
newServerID :: String -> IO ServerID
newServerID name = fmap (ServerID name) C.newChan
-- | Initial state for a server: a candidate in term 0 with no known
-- leader, no vote cast, an empty log, and both indices at 0.
newServerState :: ServerID -> [ServerID] -> ServerState
newServerState ident peers = ServerState
  { serverID         = ident
  , serverPeers      = peers
  , serverRole       = Candidate
  , serverLeader     = Nothing
  , currentTerm      = 0
  , votedFor         = Nothing
  , logEntries       = []
  , commitIndex      = 0
  , lastAppliedIndex = 0
  }
-- | Pick a uniformly random element of a non-empty list.
--
-- 'R.randomRIO' draws from a closed range, so the upper bound has to be
-- the last valid index (@length l - 1@). Using @length l@ could select an
-- index one past the end and crash in '!!'.
--
-- Calling this with an empty list is a programming error and raises one.
randomChoice :: [a] -> IO a
randomChoice [] = error "randomChoice: cannot choose from an empty list"
randomChoice l = do
  idx <- R.randomRIO (0, length l - 1)
  return $ l !! idx
-- | Create servers named @Server1@ .. @Server\<n\>@, each knowing every
-- other server as a peer.
makeServers :: (Show a, Num a, Enum a) => a -> IO [ServerState]
makeServers n = do
  ids <- mapM (\k -> newServerID ("Server" ++ show k)) [1 .. n]
  let stateFor self = newServerState self (filter (/= self) ids)
  return (map stateFor ids)
-- | Client driver: issue @n@ random queries, then tell every server to
-- shut down.
--
-- Each round sleeps half a second, builds a random read or write (reads
-- are listed twice among the three choices), and sends it to a random
-- server. If the answer is a redirect, the query is retried once against
-- the server named in it; a second redirect is only traced.
runQueries :: [ServerID] -> Int -> IO ()
runQueries serverIDs 0 = mapM_ notify serverIDs
  where
    shutdownMsg = ChanMsg { chanMsg = MsgShutdown, sender = Nothing }
    notify s = C.writeChan (wChan s) shutdownMsg
runQueries serverIDs n = do
  traceIO ("Querier starting with n = " ++ show n)
  slept <- threadDelay (seconds 0.5)
  traceIO ("Querier 2! " ++ show slept)
  key <- randomChoice ["a", "b", "c", "d", "e"]
  traceIO ("Querier 3! " ++ show key)
  val <- fmap show (R.randomRIO (1, 100) :: IO Int)
  traceIO ("Querier 4! " ++ show val)
  let readMsg  = MsgRead { rKey = key }
      writeMsg = MsgWrite { wKey = key, wVal = val }
  msg <- randomChoice [readMsg, readMsg, writeMsg]
  traceIO ("Querier 5! " ++ show msg)
  target <- randomChoice serverIDs
  traceIO ("Querier sending A -> " ++ show target)
  firstReply <- _sendQuery msg target
  case firstReply of
    Nothing -> next
    Just redirect -> do
      traceIO ("Querier sending B...")
      secondReply <- _sendQuery msg redirect
      case secondReply of
        Nothing -> next
        Just _  -> trace ("wtf? got another redirect?") next
  where
    next = runQueries serverIDs (n - 1)
-- | Send one client message to @target@ and wait up to half a second for
-- the reply on a throwaway channel.
--
-- Returns @Just leader@ when the reply is a redirect, and 'Nothing' for a
-- read or write answer or when the wait times out.
_sendQuery :: Message -> ServerID -> IO (Maybe ServerID)
_sendQuery msg target = do
  traceIO ("HERE1")
  replyChan <- C.newChan
  traceIO ("HERE2")
  let replyTo = ServerID { serverName = "dummy", wChan = replyChan }
  traceIO ("sending: " ++ show target ++ " <- " ++ show msg)
  sendrecv replyTo target msg
  reply <- timeout (seconds 0.5) (C.readChan replyChan)
  case reply of
    Nothing ->
      trace "timeout sending!" (return Nothing)
    Just (ChanMsg { chanMsg = MsgSeeLeader { leader } }) ->
      return (Just leader)
    Just (ChanMsg { chanMsg = MsgReadResSuccess {} }) ->
      return Nothing
    Just (ChanMsg { chanMsg = MsgWriteRes {} }) ->
      return Nothing
-- | Fork @io@ onto a new thread and return an action that blocks until that
-- thread has finished, whether it returned normally or died with an
-- exception.
spawn :: IO () -> IO (IO ())
spawn io = do
  done <- newEmptyMVar
  _ <- forkFinally io (const (putMVar done ()))
  return (takeMVar done)
-- | Start two servers and a querier that issues ten queries. Wait for the
-- querier to finish first, then for every server thread.
main :: IO ()
main = do
  servers     <- makeServers 2
  serverWaits <- mapM (spawn . runServer) servers
  queryWait   <- spawn (runQueries (map serverID servers) 10)
  queryWait
  sequence_ serverWaits
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment