Skip to content

Instantly share code, notes, and snippets.

@wolever
Last active December 29, 2015 22:17
Show Gist options
  • Save wolever/a97c4e8f6f3e8777170f to your computer and use it in GitHub Desktop.
Save wolever/a97c4e8f6f3e8777170f to your computer and use it in GitHub Desktop.
Playing with implementing RAFT in Haskell (probably terrible)
*.class
*.o
*.pyc
*.sqlite3
*.sw[a-z]
*~
.DS_Store
bin-debug/*
bin-release/*
bin/*
tags
*.beam
*.dump
env/
.env/
*egg-info*
misc/
dist/
Icon?
node_modules/
# `raft` target: depends on raft.hs; its recipe builds the program and then runs it.
raft: raft.hs
# Four banner lines, presumably to visually separate successive builds in the terminal.
@echo "================================================================"
@echo "================================================================"
@echo "================================================================"
@echo "================================================================"
# Compile with the threaded runtime (-threaded) and profiling (-prof -auto-all -caf-all).
ghc -threaded -prof -auto-all -caf-all raft.hs
# Alternative run line, disabled: passes -xc to the runtime instead of -N4.
#./raft +RTS -xc
# Run the freshly built binary, passing -N4 to the GHC runtime system.
./raft +RTS -N4
# `watch` target: invokes watchman-make with the pattern *.hs and the target `raft`
# (presumably rebuilding whenever a matching file changes -- confirm against watchman docs).
watch:
watchman-make -p *.hs -t raft
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE DisambiguateRecordFields #-}
import Debug.Trace
-- import Control.Lens
import System.Timeout
import Control.Concurrent.MVar
import qualified System.Random as R
import qualified Control.Concurrent.Chan as C
import Control.Concurrent ( forkIO, forkFinally, threadDelay )
import Data.Time.Clock ( getCurrentTime, diffUTCTime )
-- | Envelope written to a server's channel: the payload plus, optionally,
-- the identity of whoever sent it.
data ChanMsg = ChanMsg
  { chanMsg :: Message         -- ^ the message being delivered
  , sender  :: Maybe ServerID  -- ^ the originator, when one was supplied
  }
  deriving (Show)
-- | Identity of a server: a display name together with the channel that
-- messages addressed to it are written to.
data ServerID = ServerID
  { serverName :: String          -- ^ name used when showing the server
  , wChan      :: C.Chan ChanMsg  -- ^ channel other parties write to
  }
  deriving (Eq)
-- | A server identity is shown as its name wrapped in angle brackets.
instance Show ServerID where
  show s = concat ["<", serverName s, ">"]
-- | The role a server is currently playing; 'serverLoop' dispatches on it.
data ServerRole
  = Leader
  | Follower
  | Candidate
  | Shutdown
  deriving (Show, Eq)
-- | A single entry of the replicated log: a key/value pair tagged with the
-- term and index it belongs to.
data LogEntry = LogEntry
  { eTerm :: Int     -- ^ term of the entry
  , eIdx  :: Int     -- ^ index of the entry
  , eKey  :: String  -- ^ key carried by the entry
  , eVal  :: String  -- ^ value carried by the entry
  }
  deriving (Show)
-- | Everything a server carries from one step of its loop to the next.
data ServerState = ServerState
  { serverID         :: ServerID        -- ^ this server's own identity
  , serverPeers      :: [ServerID]      -- ^ the other servers it broadcasts to
  , serverRole       :: ServerRole      -- ^ role the server is currently in
  , serverLeader     :: Maybe ServerID  -- ^ the leader it knows about, if any
  , currentTerm      :: Int             -- ^ term counter, bumped by 'runCandidate'
  , votedFor         :: Maybe ServerID  -- ^ who it has granted a vote to, if anyone
  , logEntries       :: [LogEntry]      -- ^ the server's log
  , commitIndex      :: Int             -- ^ sent as 'leaderCommitIndex' by 'runLeader'
  , lastAppliedIndex :: Int             -- ^ starts at 0; not read elsewhere in this file
  }
-- | A server state is shown as just its identity, which keeps trace output short.
instance Show ServerState where
  show = show . serverID
-- | Every message that can travel over a server channel: client queries and
-- their replies, the shutdown signal, and the two RPCs with their responses.
data Message
  = -- | Client request to read a key.
    MsgRead { rKey :: String }
  | -- | Reply to a read, carrying the value if there is one.
    MsgReadResSuccess { rKey :: String, rVal :: Maybe String }
  | -- | Client request to write a key/value pair.
    MsgWrite { wKey :: String, wVal :: String }
  | -- | Reply to a write.
    MsgWriteRes { success :: Bool }
  | -- | Redirect telling the client which server is the leader.
    MsgSeeLeader { leader :: ServerID }
  | -- | Signal sent to every server once the querier has finished.
    MsgShutdown
  | -- | AppendEntries request (also used as the heartbeat by 'runLeader').
    MsgAppendEntries
      { term              :: Int
      , leaderID          :: ServerID
      , prevLogIndex      :: Int
      , prevLogTerm       :: Int
      , newEntries        :: [LogEntry]
      , leaderCommitIndex :: Int
      }
  | -- | Response to 'MsgAppendEntries'.
    MsgAppendEntriesRes { term :: Int, success :: Bool }
  | -- | RequestVote request broadcast by a candidate.
    MsgRequestVote
      { term         :: Int
      , candidateID  :: ServerID
      , lastLogIndex :: Int
      , lastLogTerm  :: Int
      }
  | -- | Response to 'MsgRequestVote'.
    MsgRequestVoteRes
      { term        :: Int
      , voteGranted :: Bool
      }
  deriving (Show)
-- | Sentinel entry standing in for "no entry": term and index are -1 and the
-- key and value are empty strings.
emptyLogEntry :: LogEntry
emptyLogEntry =
  LogEntry
    { eTerm = -1
    , eIdx = -1
    , eKey = ""
    , eVal = ""
    }
-- | The final entry of a log, or 'emptyLogEntry' when the log is empty.
lastLogEntry :: [LogEntry] -> LogEntry
lastLogEntry entries
  | null entries = emptyLogEntry
  | otherwise = last entries
-- | Look up the entry at list position @idx@, falling back to
-- 'emptyLogEntry' when the position is negative or past the end.
getLogEntry :: [LogEntry] -> Int -> LogEntry
getLogEntry entries idx =
  if idx >= 0 && idx < length entries
    then entries !! idx
    else emptyLogEntry
-- | The channel a server receives its messages on.
sChan :: ServerState -> C.Chan ChanMsg
sChan = wChan . serverID
-- | Add one to @x@ when the flag is 'True'; otherwise return @x@ unchanged.
incrIf :: Num a => a -> Bool -> a
incrIf x flag
  | flag = x + 1
  | otherwise = x
-- | Run @expr arg@ under a timeout of @delay@ microseconds and report how
-- much of that budget is left afterwards.
--
-- The result pairs the remaining budget (the delay minus the measured elapsed
-- time, so it can be negative) with the action's result, or with the
-- untouched @arg@ when the action timed out.
timedTimeout :: Int -> a -> (a -> IO a) -> IO (Int, a)
timedTimeout delay arg expr = do
  startedAt <- getCurrentTime
  outcome <- timeout delay (expr arg)
  finishedAt <- getCurrentTime
  let elapsedMicros = round (1000000 * diffUTCTime finishedAt startedAt)
      remaining = delay - elapsedMicros
  return (remaining, maybe arg id outcome)
-- | Convert a duration in seconds to a whole number of microseconds.
seconds :: Float -> Int
seconds = round . (* 1000000)
-- | Two seconds, in microseconds. Used as the timeout while waiting for
-- votes and while a follower waits for an RPC; the leader uses half of it.
waitVotesTimeout :: Int
waitVotesTimeout = seconds 2.0
-- | Draw a random wait between 0 and 0.3 seconds, returned in microseconds.
elxnWait :: IO Int
elxnWait = fmap seconds (R.randomRIO (0.0, 0.3))
-- | Trace a message and write it to @s@'s channel without a sender attached.
--
-- The first argument is only used for the trace line, so anything showable
-- will do.
sendNoRecv :: Show a => a -> ServerID -> Message -> IO ()
sendNoRecv from s msg = do
  traceIO (concat [" msg: ", show from, " -> ", show s, ": ", show msg])
  C.writeChan (wChan s) (ChanMsg msg Nothing)
-- | Trace a message and write it to @s@'s channel with the sender attached,
-- so the receiver knows where to send its reply.
sendrecv :: ServerID -> ServerID -> Message -> IO ()
sendrecv from s msg = do
  traceIO (concat [" msg: ", show from, " -> ", show s, ": ", show msg])
  C.writeChan (wChan s) (ChanMsg msg (Just from))
-- | Send @msg@ to every peer of @srv@, with @srv@ itself as the sender.
broadcast :: ServerState -> Message -> IO ()
broadcast srv msg = mapM_ deliver (serverPeers srv)
  where
    deliver peer = sendrecv (serverID srv) peer msg
-- | Process a 'MsgAppendEntries' request and reply to its sender.
--
-- The request is rejected when its term is older than ours, or when the term
-- of our entry at @prevLogIndex@ differs from @prevLogTerm@. The reply always
-- carries our current term. On acceptance the server becomes a follower of
-- @sender@, forgets its vote and stores the extended log; on rejection the
-- state is returned unchanged.
--
-- @msg@ must be a 'MsgAppendEntries'; the field bindings below are lazy and
-- fail when first used otherwise.
handleAppendEntries srv msg sender = do
  let MsgAppendEntries { term, prevLogIndex, prevLogTerm, newEntries } = msg
      localLog = logEntries srv
      isConflict = eTerm (getLogEntry localLog prevLogIndex) /= prevLogTerm
      shouldReject = term < currentTerm srv || isConflict
      -- TODO:
      -- - Handle leaderCommitIndex
      -- - Only append entries which aren't already in the log
      --
      -- A conflict currently always means rejection, so the truncated
      -- variant is never actually stored; it is kept for when the TODOs
      -- above are addressed.
      keptEntries
        | isConflict = take prevLogIndex localLog
        | otherwise = localLog
      -- This used to be bound to the name `newEntries`, which made the
      -- definition refer to itself instead of to the entries from the
      -- message, so the stored log was cyclic or diverged when forced.
      updatedLog = keptEntries ++ newEntries
  sendNoRecv srv sender MsgAppendEntriesRes
    { term = currentTerm srv
    , success = not shouldReject
    }
  return $
    if shouldReject
      then srv
      else srv
        { serverRole = Follower
        , serverLeader = Just sender
        , votedFor = Nothing
        , logEntries = updatedLog
        }
-- | Process a 'MsgRequestVote' request and reply to its sender.
--
-- The vote is refused when the request's term is older than ours, when we
-- have already voted for somebody other than @sender@, or when the
-- candidate's last log index or last log term is behind our own last entry.
-- The reply always carries our current term. A granted vote is recorded in
-- 'votedFor'; otherwise the state is returned unchanged.
handleRequestVote :: ServerState -> Message -> ServerID -> IO ServerState
handleRequestVote srv msg sender = do
  let MsgRequestVote { term, lastLogIndex, lastLogTerm } = msg
      ownLast = lastLogEntry (logEntries srv)
      votedElsewhere = votedFor srv /= Nothing && votedFor srv /= Just sender
      -- `or` is lazy and evaluates left to right, like a chain of `||`.
      shouldReject =
        or
          [ term < currentTerm srv
          , votedElsewhere
          , lastLogIndex < eIdx ownLast
          , lastLogTerm < eTerm ownLast
          ]
      granted = not shouldReject
  sendNoRecv srv sender MsgRequestVoteRes
    { term = currentTerm srv
    , voteGranted = granted
    }
  return (if granted then srv { votedFor = Just sender } else srv)
-- | Enter the candidate role for a new term.
--
-- The term is incremented, then the server listens for a random election
-- wait in case another server asserts itself first. If it is still a
-- candidate afterwards it nominates itself; otherwise the state produced
-- while waiting is returned.
runCandidate :: ServerState -> IO ServerState
runCandidate oldCfg = do
  wait <- elxnWait
  traceIO (show candidate ++ ": now a candidate (timeout: " ++ show wait ++ ")")
  waited <- _candidateLeaderWait candidate wait
  if serverRole waited == Candidate
    then _candidateElectSelf waited
    else return waited
  where
    candidate =
      oldCfg
        { serverRole = Candidate
        , currentTerm = currentTerm oldCfg + 1
        }
-- | Ask every peer for its vote and act on the outcome.
--
-- * Still a candidate after waiting for votes: the election failed, so start
--   over with 'runCandidate'.
-- * Leader: the election was won; return the new state.
-- * Follower: '_waitVotes' saw a response with a newer term and stepped
--   down. That state is handed back to the caller so the server carries on
--   as a follower (this case used to hit the error below and kill the
--   server thread).
-- * Anything else is a genuine invariant violation and still raises.
_candidateElectSelf :: ServerState -> IO ServerState
_candidateElectSelf srv = do
  traceIO (show srv ++ ": nominating self as leader")
  let lastEntry = lastLogEntry (logEntries srv)
  broadcast srv MsgRequestVote
    { term = currentTerm srv
    , candidateID = serverID srv
    , lastLogIndex = eIdx lastEntry
    , lastLogTerm = eTerm lastEntry
    }
  voted <- _candidateWaitVotes srv
  case serverRole voted of
    Candidate ->
      trace (show voted ++ ": election failed, trying again")
        runCandidate voted
    Leader -> return voted
    Follower -> return voted
    _ -> error ("uh oh invalid server state: " ++ show voted)
-- | Keep handling incoming messages until the wait budget (in microseconds)
-- is used up or the server stops being a candidate.
_candidateLeaderWait :: ServerState -> Int -> IO ServerState
_candidateLeaderWait srv remaining
  | remaining <= 0 || serverRole srv /= Candidate = return srv
  | otherwise = do
      -- Each read is charged against the budget by 'timedTimeout'.
      (budgetLeft, updated) <- timedTimeout remaining srv _candidateLeaderWaitRead
      _candidateLeaderWait updated budgetLeft
-- | Read one message while waiting as a candidate and dispatch it.
--
-- Vote requests and append-entries requests are handled; any other message
-- raises an error, as does needing a sender that was not supplied.
_candidateLeaderWaitRead :: ServerState -> IO ServerState
_candidateLeaderWaitRead srv = do
  incoming <- C.readChan (sChan srv)
  let msg = chanMsg incoming
      -- Lazy: only raises if a handler actually needs the sender.
      actualSender =
        maybe (error "uh oh: message didn't come with a sender") id (sender incoming)
  case msg of
    MsgRequestVote {} -> handleRequestVote srv msg actualSender
    MsgAppendEntries {} -> handleAppendEntries srv msg actualSender
    _ -> error ("uh oh bad message waiting for a leader: " ++ show msg)
-- | Collect votes for at most 'waitVotesTimeout', starting from a count of 1.
-- If the deadline passes first, the state is returned unchanged.
_candidateWaitVotes :: ServerState -> IO ServerState
_candidateWaitVotes srv =
  fmap (maybe srv id) (timeout waitVotesTimeout collect)
  where
    collect = _waitVotes srv (wChan (serverID srv)) 1
-- | Read vote responses from @chan@ until a majority is reached or a
-- response with a newer term arrives.
--
-- A majority is more than half of the cluster (this server plus its peers)
-- and turns the server into the leader. Responses from older terms are
-- skipped, responses from the current term are counted when granted, and a
-- response from a newer term makes the server a follower with no known
-- leader. Any other kind of message raises an error.
_waitVotes :: ServerState -> C.Chan ChanMsg -> Int -> IO ServerState
_waitVotes srv chan voteCount
  | voteCount >= majority =
      trace (show srv ++ ": now the leader with " ++ show voteCount ++ " votes!") $
        return srv { serverRole = Leader, votedFor = Nothing }
  | otherwise = do
      cMsg <- C.readChan chan
      case chanMsg cMsg of
        MsgRequestVoteRes { term = resTerm, voteGranted = granted }
          | resTerm < ownTerm ->
              trace ("old term:" ++ show resTerm) $
                _waitVotes srv chan voteCount
          | resTerm == ownTerm ->
              _waitVotes srv chan (incrIf voteCount granted)
          | otherwise ->
              trace ("new term:" ++ show resTerm) $
                return srv { serverRole = Follower, serverLeader = Nothing }
        other -> error ("uh oh bad message waiting for votes: " ++ show other)
  where
    ownTerm = currentTerm srv
    clusterSize = 1 + length (serverPeers srv)
    majority = clusterSize `div` 2 + 1
-- | Follower loop: keep handling RPCs for as long as one arrives within
-- 'waitVotesTimeout'; when none does, become a candidate.
runFollower :: ServerState -> IO ServerState
runFollower srv =
  timeout waitVotesTimeout (followerWaitRPC srv)
    >>= maybe (runCandidate srv) runFollower
-- | Read one message as a follower and dispatch it.
--
-- * Append-entries requests go to 'handleAppendEntries'.
-- * Vote requests go to 'handleRequestVote'. They used to be unhandled, so a
--   follower receiving one died with a pattern-match failure and could
--   never grant a vote.
-- * Client reads and writes are answered with a redirect to the known leader.
-- * Anything else raises an explicit error, in the same style as the
--   candidate handler.
--
-- The sender and the known leader are looked up lazily: a missing one only
-- raises if the chosen branch actually needs it.
followerWaitRPC :: ServerState -> IO ServerState
followerWaitRPC srv = do
  ChanMsg { chanMsg = msg, sender = maybeSender } <- C.readChan (sChan srv)
  let actualSender = case maybeSender of
        Just x -> x
        _ -> error "uh oh: message didn't come with a sender"
      actualLeader = case serverLeader srv of
        Just x -> x
        _ -> error "uh oh: needed a server leader but we don't have one yet"
      -- Shared reply for client requests a follower cannot serve itself.
      redirectToLeader = do
        sendNoRecv srv actualSender MsgSeeLeader { leader = actualLeader }
        return srv
  case msg of
    MsgAppendEntries {} -> handleAppendEntries srv msg actualSender
    MsgRequestVote {} -> handleRequestVote srv msg actualSender
    MsgRead {} -> redirectToLeader
    MsgWrite {} -> redirectToLeader
    _ -> error ("uh oh bad message waiting as a follower: " ++ show msg)
-- | Leader loop: broadcast an empty append-entries message as a heartbeat,
-- then serve incoming messages.
--
-- Whenever no message arrives within half of 'waitVotesTimeout' the
-- heartbeat is sent again. If handling a message leaves the server in any
-- role other than 'Leader' (for example 'handleAppendEntries' turning it
-- into a follower), the new state is returned so the caller can switch
-- roles. Previously the loop ignored the role and never returned, so a
-- demoted server kept heartbeating as leader.
runLeader :: ServerState -> IO ServerState
runLeader srv = do
  let lastEntry = lastLogEntry (logEntries srv)
  broadcast srv MsgAppendEntries
    { term = currentTerm srv
    , leaderID = serverID srv
    , prevLogIndex = eIdx lastEntry
    , prevLogTerm = eTerm lastEntry
    , newEntries = []
    , leaderCommitIndex = commitIndex srv
    }
  let loop current = do
        res <- timeout (waitVotesTimeout `div` 2) $ leaderWaitRPC current
        case res of
          Just next
            | serverRole next == Leader -> loop next
            | otherwise -> return next
          -- Quiet period: time for the next heartbeat.
          Nothing -> runLeader current
  loop srv
-- | Read one message as the leader and dispatch it.
--
-- * Append-entries requests go to 'handleAppendEntries'.
-- * Append-entries responses are currently ignored.
-- * Vote requests go to 'handleRequestVote', as the candidate handler does.
-- * Vote responses are ignored: they are leftovers from the election this
--   server has already won.
-- * Reads are answered with a fixed placeholder value, writes with success.
-- * Anything else raises an explicit error, in the same style as the
--   candidate handler.
--
-- Vote requests and vote responses used to be unhandled, so either one
-- killed the leader thread with a pattern-match failure.
--
-- NOTE(review): 'handleRequestVote' neither adopts a newer term nor steps
-- down, so a leader may grant a vote yet stay leader -- revisit together
-- with term handling.
leaderWaitRPC :: ServerState -> IO ServerState
leaderWaitRPC srv = do
  ChanMsg { chanMsg = msg, sender = maybeSender } <- C.readChan (sChan srv)
  -- Lazy: only raises if the chosen branch actually needs the sender.
  let actualSender = case maybeSender of
        Just x -> x
        _ -> error "uh oh: message didn't come with a sender"
  case msg of
    MsgAppendEntries {} -> handleAppendEntries srv msg actualSender
    MsgAppendEntriesRes {} -> return srv
    MsgRequestVote {} -> handleRequestVote srv msg actualSender
    MsgRequestVoteRes {} -> return srv
    MsgRead { rKey } -> do
      sendNoRecv srv actualSender MsgReadResSuccess
        { rKey = rKey
        , rVal = Just "42"
        }
      return srv
    MsgWrite {} -> do
      sendNoRecv srv actualSender MsgWriteRes { success = True }
      return srv
    _ -> error ("uh oh bad message waiting as the leader: " ++ show msg)
-- | Entry point for a server thread: announce the start, then hand over to
-- 'serverLoop'.
runServer srv = traceIO ("Starting " ++ show srv) >> serverLoop srv
-- | Run the handler for the server's current role, then repeat with whatever
-- state that handler returns. The 'Shutdown' role ends the loop by raising
-- an error.
serverLoop srv = step >>= serverLoop
  where
    step = case serverRole srv of
      Candidate -> runCandidate srv
      Leader -> runLeader srv
      Follower -> runFollower srv
      Shutdown -> error "Done"
-- | Create a server identity with the given name and a fresh channel.
newServerID :: String -> IO ServerID
newServerID name = fmap (ServerID name) C.newChan
-- | Initial state for a server: a candidate in term 0 with no known leader,
-- no vote cast, an empty log and both indices at 0.
newServerState :: ServerID -> [ServerID] -> ServerState
newServerState self peers =
  ServerState
    { serverID = self
    , serverPeers = peers
    , serverRole = Candidate
    , serverLeader = Nothing
    , currentTerm = 0
    , votedFor = Nothing
    , logEntries = []
    , commitIndex = 0
    , lastAppliedIndex = 0
    }
-- | Pick one element of a non-empty list at random.
--
-- 'R.randomRIO' draws from a closed range, so the upper bound must be the
-- last valid index. The previous bound of @length l@ could select an index
-- one past the end and crash in '!!'.
--
-- An empty list has nothing to choose from and raises an error.
randomChoice :: [a] -> IO a
randomChoice [] = error "randomChoice: cannot choose from an empty list"
randomChoice l = do
  idx <- R.randomRIO (0, length l - 1)
  return $ l !! idx
-- | Create @n@ servers named @Server1@ .. @Server<n>@, each starting out with
-- every other server as its peer.
makeServers :: (Show a, Num a, Enum a) => a -> IO [ServerState]
makeServers n = do
  ids <- mapM (\k -> newServerID ("Server" ++ show k)) [1 .. n]
  let stateFor self = newServerState self (filter (/= self) ids)
  return (map stateFor ids)
-- | Client simulator: issue @n@ random queries, then tell every server to
-- shut down.
--
-- Each round waits half a second, picks a random key, value, message (a read
-- twice as likely as a write) and target server, and sends the query. When
-- the reply is a redirect the query is re-sent once to the server named in
-- it; a second redirect is only traced.
runQueries :: [ServerID] -> Int -> IO ()
runQueries serverIDs n
  | n == 0 =
      mapM_ (\s -> C.writeChan (wChan s) shutdownMsg) serverIDs
  | otherwise = do
      traceIO ("Querier starting with n = " ++ show n)
      pause <- threadDelay (seconds 0.5)
      traceIO ("Querier 2! " ++ show pause)
      key <- randomChoice ["a", "b", "c", "d", "e"]
      traceIO ("Querier 3! " ++ show key)
      val <- fmap show (R.randomRIO (1, 100 :: Int))
      traceIO ("Querier 4! " ++ show val)
      msg <-
        randomChoice
          [ MsgRead { rKey = key }
          , MsgRead { rKey = key }
          , MsgWrite { wKey = key, wVal = val }
          ]
      traceIO ("Querier 5! " ++ show msg)
      target <- randomChoice serverIDs
      traceIO ("Querier sending A -> " ++ show target)
      firstReply <- _sendQuery msg target
      case firstReply of
        Nothing -> nextRound
        Just redirect -> do
          traceIO "Querier sending B..."
          secondReply <- _sendQuery msg redirect
          maybe
            nextRound
            (const (trace "wtf? got another redirect?" nextRound))
            secondReply
  where
    shutdownMsg = ChanMsg { chanMsg = MsgShutdown, sender = Nothing }
    nextRound = runQueries serverIDs (n - 1)
-- | Send one client query to @target@ and wait up to half a second for the
-- reply on a throwaway channel.
--
-- Returns @Just leader@ when the reply is a redirect, and 'Nothing' on a
-- timeout or on a read/write result.
_sendQuery :: Message -> ServerID -> IO (Maybe ServerID)
_sendQuery msg target = do
  traceIO "HERE1"
  chan <- C.newChan
  traceIO "HERE2"
  -- Temporary identity whose only purpose is to carry the reply channel.
  let replyTo = ServerID { serverName = "dummy", wChan = chan }
  traceIO ("sending: " ++ show target ++ " <- " ++ show msg)
  sendrecv replyTo target msg
  reply <- timeout (seconds 0.5) (C.readChan chan)
  case fmap chanMsg reply of
    Nothing -> trace "timeout sending!" return Nothing
    Just MsgSeeLeader { leader } -> return (Just leader)
    Just MsgReadResSuccess {} -> return Nothing
    Just MsgWriteRes {} -> return Nothing
-- | Run an action on a new thread and return an action that blocks until
-- that thread has finished, whether it ended normally or with an exception.
spawn :: IO () -> IO (IO ())
spawn io = do
  done <- newEmptyMVar
  _ <- forkFinally io (const (putMVar done ()))
  return (takeMVar done)
-- | Start two servers and one querier issuing ten queries, wait for the
-- querier to finish, then wait for every server thread.
main :: IO ()
main = do
  servers <- makeServers 2
  serverWaits <- mapM (spawn . runServer) servers
  querierWait <- spawn (runQueries (map serverID servers) 10)
  querierWait
  sequence_ serverWaits
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment