Created
August 29, 2023 09:39
-
-
Save voidlizard/0cf88d9db6681d1e407ad3c5dd90797c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# Language TemplateHaskell #-} | |
{-# Language AllowAmbiguousTypes #-} | |
{-# Language UndecidableInstances #-} | |
{-# Language MultiWayIf #-} | |
module QBLF.Proto where | |
import HBS2.Prelude.Plated | |
import HBS2.System.Logger.Simple | |
import HBS2.Clock | |
import HBS2.Hash | |
import Control.Applicative | |
import Control.Concurrent.STM (flushTQueue) | |
import Control.Monad | |
import Control.Monad.Trans.Maybe | |
import Data.Either | |
import Data.Function | |
import Data.HashMap.Strict (HashMap) | |
import Data.HashMap.Strict qualified as HashMap | |
import Data.HashSet (HashSet) | |
import Data.HashSet qualified as HashSet | |
import Data.Kind | |
import Data.List qualified as List | |
import Data.Map qualified as Map | |
import Data.Maybe | |
import Data.Ord | |
import Data.Tuple (swap) | |
import Lens.Micro.Platform | |
import System.Random (randomRIO) | |
import UnliftIO | |
{- HLINT ignore "Use newtype instead of data" -} | |
data QBLFMessage w = | |
QBLFMsgAnn (QBLFActor w) (QBLFAnnounce w) | |
| QBLFMsgMerge (QBLFActor w ) (QBLFMerge w) | |
| QBLFMsgCommit (QBLFActor w) (QBLFCommit w) | |
| QBLFMsgHeartBeat (QBLFActor w) QBLFStateN (QBLFState w) | |
deriving stock Generic | |
data QBLFAnnounce w = | |
QBLFAnnounce (QBLFState w) (QBLFState w) | |
deriving stock (Generic) | |
data QBLFMerge w = | |
QBLFMerge (QBLFState w) (QBLFState w) | |
deriving stock Generic | |
data QBLFCommit w = | |
QBLFCommit (QBLFState w) (QBLFState w) | |
deriving stock Generic | |
data QBLFStateN = | |
QWait | |
| QAnnounce | |
| QMerge | |
| QCommit | |
deriving stock (Eq,Ord,Enum,Show) | |
type ForQBLF w = ( Hashed HbSync (QBLFState w) | |
, Hashable (QBLFState w) | |
, Hashable (QBLFTransaction w) | |
, Hashable (QBLFActor w) | |
, Hashable (QBLFAnnounce w) | |
, Hashable (QBLFMerge w) | |
, Pretty (QBLFActor w) | |
, Pretty (QBLFState w) | |
, Eq (QBLFState w) | |
) | |
deriving instance ForQBLF w => Eq (QBLFAnnounce w) | |
deriving instance ForQBLF w => Eq (QBLFMerge w) | |
instance ForQBLF w => Hashable (QBLFAnnounce w) | |
instance ForQBLF w => Hashable (QBLFMerge w) | |
class (Monad m, ForQBLF w) => IsQBLF w m where | |
type QBLFActor w :: Type | |
type QBLFTransaction w :: Type | |
type QBLFState w :: Type | |
qblfNewState :: [QBLFTransaction w] -> m (QBLFState w) | |
qblfBroadCast :: QBLFMessage w -> m () | |
qblfMerge :: QBLFState w -> QBLFState w -> QBLFState w -> m (QBLFState w) | |
qblfCommit :: QBLFState w -> QBLFState w -> m () | |
qblfMoveForward :: QBLFState w -> QBLFState w -> m Bool | |
qblfMoveForward _ _ = pure True | |
data QBLF w = | |
QBLF | |
{ _qblfSelf :: QBLFActor w | |
, _qblfAllActors :: HashSet (QBLFActor w) | |
, _qblfState :: QBLFStateN | |
, _qblfCurrent :: QBLFState w | |
, _qblfWaitAnnounce :: Timeout 'Seconds | |
, _qblfCommitsFrom :: HashSet (QBLFActor w) | |
, _qblfTranQ :: TVar (HashSet (QBLFTransaction w)) | |
, _qblfAlive :: TVar (HashMap (QBLFActor w) (QBLFStateN, QBLFState w, TimeSpec)) | |
, _qblfStateTime :: TVar TimeSpec | |
, _qblfLastHeartBeat :: TVar TimeSpec | |
, _qblfAnnounces :: TVar (HashMap (QBLFActor w, QBLFAnnounce w) TimeSpec) | |
, _qblfMerges :: TVar (HashMap (QBLFActor w, QBLFMerge w) TimeSpec) | |
} | |
makeLenses ''QBLF | |
qblfGetActor :: (ForQBLF w) => QBLFMessage w -> QBLFActor w | |
qblfGetActor = \case | |
QBLFMsgAnn a _ -> a | |
QBLFMsgMerge a _ -> a | |
QBLFMsgCommit a _ -> a | |
QBLFMsgHeartBeat a _ _ -> a | |
qblfEnqueue :: (ForQBLF w, MonadIO m) => QBLF w -> QBLFTransaction w -> m () | |
qblfEnqueue me tran = do | |
-- synced <- qblfIsSynced me | |
-- when synced do | |
atomically $ modifyTVar (view qblfTranQ me) (HashSet.insert tran) | |
qblfAcceptMessage :: (ForQBLF w, MonadIO m) => QBLF w -> QBLFMessage w -> m () | |
qblfAcceptMessage me msg = do | |
-- FIXME: drop-premature-announces | |
let actor = qblfGetActor msg | |
when (actor `HashSet.member` view qblfAllActors me) do | |
now <- getTimeCoarse | |
case msg of | |
QBLFMsgAnn a ann -> do | |
atomically $ modifyTVar (view qblfAnnounces me) (HashMap.insert (a, ann) now) | |
QBLFMsgMerge a m@(QBLFMerge _ _) -> do | |
atomically $ modifyTVar (view qblfMerges me) (HashMap.insert (a, m) now) | |
QBLFMsgCommit a (QBLFCommit _ s) -> do | |
atomically $ modifyTVar (view qblfAlive me) (HashMap.insert a (QWait,s,now)) | |
QBLFMsgHeartBeat a t s -> do | |
-- debug $ "heartbeat" <+> pretty (view qblfSelf me) <+> pretty (a, s) | |
atomically $ modifyTVar (view qblfAlive me) (HashMap.insert a (t,s,now)) | |
qblfQuorum :: forall w a m . (ForQBLF w, IsQBLF w m, MonadUnliftIO m, Integral a) | |
=> QBLF w | |
-> m a | |
qblfQuorum me = do | |
-- n <- qblfLastAlive me | |
-- pure $ fromIntegral $ 1 + (n `div` 2) | |
let aliveSz = view qblfAllActors me & List.length | |
pure $ max 1 $ round $ realToFrac (aliveSz + 1) / 2 | |
qblfLastAlive :: (ForQBLF w, IsQBLF w m, MonadUnliftIO m) => QBLF w -> m Int | |
qblfLastAlive me = pure 0 | |
-- q <- qblfQuorum me | |
-- n <- atomically $ readTVar (view qblfAlive me) <&> HashMap.toList <&> length | |
-- if n > 0 then | |
-- pure n | |
-- else | |
-- pure q | |
qblfInit :: forall w m . (ForQBLF w, IsQBLF w m, MonadUnliftIO m) | |
=> QBLFActor w -- ^ self | |
-> [QBLFActor w] -- ^ all actors | |
-> QBLFState w -- ^ initial state | |
-> Timeout 'Seconds -- ^ timeout | |
-> m (QBLF w) | |
qblfInit self actors s0 w = | |
QBLF self | |
(HashSet.fromList actors) | |
QWait | |
s0 | |
w | |
mempty | |
<$> newTVarIO mempty | |
<*> newTVarIO mempty | |
<*> (newTVarIO =<< now) | |
<*> newTVarIO 0 | |
<*> newTVarIO mempty | |
<*> newTVarIO mempty | |
where | |
now = getTimeCoarse | |
qblfNextCommitTime :: (MonadIO f, Real a) => a -> f TimeSpec | |
qblfNextCommitTime ww = do | |
let wt = realToFrac ww | |
t0 <- getTimeCoarse | |
dt <- liftIO $ randomRIO (wt/2, wt) | |
pure $ fromNanoSecs $ toNanoSecs t0 + round (realToFrac dt * 1e9) | |
-- qblfGetState :: (ForQBLF w, MonadUnliftIO m) => QBLF w -> m QBLFStateN | |
-- qblfGetState q = readTVarIO (view qblfState q) | |
qblfRun :: forall w m . ( Pretty (QBLFActor w) | |
, Pretty (QBLFState w) | |
, ForQBLF w | |
, IsQBLF w m | |
, MonadUnliftIO m | |
) => QBLF w -> m () | |
qblfRun me = do | |
forever $ do | |
void $ qblfTo me QWait | |
warn "QUIT!!!" | |
-- mapM_ wait [a1,hb] | |
-- mapM_ wait [a1] | |
-- waitAnyCatchCancel [] | |
where | |
tAlive = 5 | |
minHeartBeat = round 5e9 | |
sendHeartBeat :: IsQBLF w m => QBLF w -> m () | |
sendHeartBeat s = do | |
now <- getTimeCoarse | |
sent <- readTVarIO (_qblfLastHeartBeat s) | |
when (toNanoSecs (now - sent) > minHeartBeat) do | |
qblfBroadCast @w $ QBLFMsgHeartBeat (view qblfSelf s) (view qblfState s) (view qblfCurrent s) | |
atomically $ writeTVar (_qblfLastHeartBeat s) now | |
sendAnnounce :: IsQBLF w m => QBLF w -> QBLFState w -> m () | |
sendAnnounce s sx = do | |
qblfBroadCast @w (QBLFMsgAnn self (QBLFAnnounce current sx)) | |
where | |
self = view qblfSelf s | |
current = view qblfCurrent s | |
sendMerge :: IsQBLF w m => QBLF w -> QBLFState w -> m () | |
sendMerge s sx = do | |
qblfBroadCast @w (QBLFMsgMerge self (QBLFMerge current sx)) | |
where | |
self = view qblfSelf s | |
current = view qblfCurrent s | |
sendCommit :: IsQBLF w m => QBLF w -> QBLFState w -> m () | |
sendCommit s sx = do | |
qblfBroadCast @w (QBLFMsgCommit self (QBLFCommit current sx)) | |
where | |
self = view qblfSelf s | |
current = view qblfCurrent s | |
nap = pause @'Seconds 0.25 | |
getAlive :: IsQBLF w m => QBLF w -> m [(QBLFActor w, QBLFStateN, QBLFState w) ] | |
getAlive s = do | |
now <- getTimeCoarse | |
states <- readTVarIO (view qblfAlive s) <&> HashMap.toList | |
pure [ (a,n,sx) | (a, (n,sx,t)) <- states, toNanoSecs (now - t) < round (2 * tAlive * 1e9) ] | |
qblfTo s n = do | |
now <- getTimeCoarse | |
let ns = s { _qblfState = n } | |
atomically $ writeTVar (_qblfStateTime ns) now | |
case n of | |
QWait -> qblfToWait ns | |
QAnnounce -> qblfToAnnounce ns | |
QMerge -> qblfToMerge ns | |
QCommit -> qblfToCommit ns | |
qblfToWait s = do | |
let w = view qblfWaitAnnounce s | |
q <- qblfQuorum s | |
fix \next -> do | |
sendHeartBeat s | |
now <- getTimeCoarse | |
alive <- getAlive s | |
let wn = sum [ 1 | (_, QWait, _) <- alive ] | |
-- debug $ logActor s <+> "wait" <+> pretty wn | |
t0 <- readTVarIO (view qblfStateTime s) | |
let elapsed = toNanoSeconds $ TimeoutTS (now - t0) | |
their <- selectState s (fmap (view _3) alive) | |
let mine = view qblfCurrent s | |
let g = if | their /= Just mine -> do | |
case their of | |
Nothing -> pure $ nap >> next | |
Just th -> do | |
-- FIXME: what-if-failed | |
forwarded <- qblfMoveForward @w mine th | |
if forwarded then do | |
debug $ logActor s <+> "DO FAST-FORWARD" <+> pretty th | |
pure $ qblfTo (set qblfCurrent th s) QAnnounce | |
else do | |
pure $ nap >> next | |
| wn >= q && elapsed > toNanoSeconds w && Just mine == their -> do | |
debug $ logActor s <+> "ready" <+> pretty their | |
pure $ qblfTo s QAnnounce | |
| otherwise -> pure $ nap >> next | |
join g | |
qblfToAnnounce s = do | |
let mine = view qblfCurrent s | |
q <- qblfQuorum s | |
let wa = 2 * view qblfWaitAnnounce s | |
-- TODO: extract-method | |
txs <- atomically do | |
tx <- readTVar (view qblfTranQ s) <&> HashSet.toList | |
writeTVar (view qblfTranQ s) mempty | |
pure tx | |
hx <- qblfNewState @w txs | |
sendAnnounce s hx | |
g <- race ( pause wa ) do | |
fix \next -> do | |
ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys | |
let ann = [ s1 | (_, QBLFAnnounce s0 s1) <- ann0, s0 == mine ] | |
if length ann >= q then do | |
debug $ logActor s <+> "announce/ready-to-merge" <+> pretty (view qblfCurrent s) <+> pretty (length ann) | |
pure $ qblfTo s QMerge | |
else do | |
nap >> next | |
case g of | |
Left{} -> qblfTo s QWait | |
Right n -> n | |
qblfToMerge s = do | |
let mine = view qblfCurrent s | |
q <- qblfQuorum s | |
pause @'Seconds 2 | |
debug $ logActor s <+> "merge" | |
ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys | |
let aann = [ (a, s1) | (a, QBLFAnnounce s0 s1) <- ann0, s0 == mine ] | |
let ann = fmap snd aann | |
let actors = fmap fst aann & HashSet.fromList | |
let s0 = headMay ann | |
let sx = tail ann | |
let g = case s0 of | |
Nothing -> pure $ qblfTo s QWait | |
Just ss0 -> do | |
new <- foldM (qblfMerge @w mine) ss0 sx | |
sendMerge s new | |
pure $ qblfTo (set qblfCommitsFrom actors s) QCommit | |
join g | |
qblfToCommit s = do | |
let mine = view qblfCurrent s | |
let authors = view qblfCommitsFrom s | |
-- pause @'Seconds 2 | |
-- debug $ logActor s <+> "commit" | |
-- FIXME: timeout-and-rollback-to-wait | |
let wa = 2 * view qblfWaitAnnounce s | |
r <- race ( pause wa ) do | |
fix \next -> do | |
merges0 <- atomically $ readTVar (view qblfMerges s) <&> HashMap.keys | |
let merges = [ s1 | (a, QBLFMerge s0 s1) <- merges0, s0 == mine, a `HashSet.member` authors ] | |
mbNew <- selectState s merges | |
case mbNew of | |
Just new -> do | |
debug $ logActor s <+> "commit: " <+> pretty new | |
sendCommit s new | |
qblfCommit @w mine new | |
pure $ qblfTo ( set qblfCurrent new s) QWait | |
Nothing -> do | |
-- debug $ logActor s <+> "commit: " <+> "fail" | |
nap >> next | |
case r of | |
Left{} -> qblfTo s QWait | |
Right n -> n | |
selectState :: IsQBLF w m => QBLF w -> [QBLFState w] -> m (Maybe (QBLFState w)) | |
selectState s sx = do | |
q <- qblfQuorum s | |
let mbs = fmap (,1) sx & HashMap.fromListWith (+) | |
& HashMap.toList | |
& fmap (over _2 List.singleton . swap) | |
& Map.fromListWith (<>) | |
& Map.toDescList | |
& headMay | |
runMaybeT do | |
ss <- MaybeT $ pure mbs | |
let sss = over _2 (List.sortOn (Down . hashObject @HbSync)) ss :: (Integer, [QBLFState w]) | |
if fst sss >= q then do | |
MaybeT $ pure $ headMay (snd sss) | |
else | |
mzero | |
logActor s = "ACTOR" <> parens (pretty (view qblfSelf s)) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment