Plumtree + Hyparview implementation in F#
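
HyParView maintains the cluster membership (the partial active/passive views and the connection lifecycle), while Plumtree builds a broadcast tree on top of the resulting active view. A minimal usage sketch, assuming a no-op `Reactor` (a real one would own the network I/O, like the in-memory reactor in the tests below); the endpoint strings are made up:

open System
open Protocols

// A no-op Reactor sketch: a real implementation owns the connections and performs the I/O.
let reactor =
    { new Reactor with
        member _.Send(target, envelope) = async { printfn "-> %s: %A" target envelope.Message }
        member _.Disconnect(target) = async { printfn "disconnect %s" target }
        member _.Notify(event) = printfn "event: %A" event }

let demo () =
    use peer = new Peer(reactor, Config.Create "localhost:5000")
    async {
        do! peer.Join "localhost:5001"                           // HyParView join handshake
        do! peer.Broadcast (Text.Encoding.UTF8.GetBytes "hello") // Plumtree broadcast
    } |> Async.RunSynchronously
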
namespace Protocols

open System
open System.Runtime.ExceptionServices

type Endpoint = string
type TTL = int
type Binary = byte[]
type MessageId = Guid
type Round = uint64

type Config =
    { LocalEndpoint: Endpoint       // identifier of the current node
      ActiveRandomWalkLength: int   // initial TTL for `ForwardJoin` messages
      PassiveRandomWalkLength: int  // TTL value at which a `ForwardJoin` peer is also added to the passive view
      ActiveViewCapacity: int       // max allowed size of the active view
      PassiveViewCapacity: int      // max allowed size of the passive view
      ShuffleTTL: int               // initial TTL for `Shuffle` messages
      ShuffleActiveViewCount: int   // no. of active peers included in a `Shuffle` message
      ShufflePassiveViewCount: int  // no. of passive peers included in a `Shuffle` message
      ShuffleInterval: TimeSpan }   // time interval at which the shuffle procedure is started
    static member Create(endpoint) = { Config.Default with LocalEndpoint = endpoint }
    static member Default =
        { LocalEndpoint = "localhost:5000"
          ActiveRandomWalkLength = 5
          PassiveRandomWalkLength = 2
          ActiveViewCapacity = 4
          PassiveViewCapacity = 24
          ShuffleTTL = 2
          ShuffleActiveViewCount = 2
          ShufflePassiveViewCount = 2
          ShuffleInterval = TimeSpan.FromSeconds 60. }
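
// Sizing sketch (hypothetical endpoint): HyParView sizes its views relative to the cluster
// size n. The tests below use ActiveViewCapacity = log2 n + 1 and a passive view six times
// larger, which for n = 100 gives:
//   { Config.Create "node1:5000" with ActiveViewCapacity = 7; PassiveViewCapacity = 42 }
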
type Shuffle =
    { Origin: Endpoint      // the original initiator of the shuffle request
      Nodes: Set<Endpoint>  // shuffled nodes that are being exchanged
      Ttl: TTL }            // remaining time to live

[<Struct>]
type Gossip =
    { Id: MessageId
      Round: Round
      Data: Binary }

type Message =
    /// Initial request to join a given cluster.
    | Join
    /// Upon Join, ForwardJoin is gossiped over the cluster to introduce `peer` to other cluster members.
    | ForwardJoin of peer:Endpoint * ttl:TTL
    /// Message that triggers the shuffle procedure.
    | DoShuffle
    /// Shuffle request, used to refresh and exchange the passive view with other nodes.
    | Shuffle of Shuffle
    /// Shuffle reply, which contains the exchanged nodes.
    | ShuffleReply of Set<Endpoint>
    /// Request to add the sender to the recipient's active view. If `highPriority` is set, it cannot be denied.
    | Neighbor of highPriority:bool
    /// Disconnect request. If `alive` is set, the sender can safely be added to the passive set for future reconnections.
    /// If `respond` is set, the recipient should answer with its own `Disconnect` (with respond=false) as well.
    | Disconnect of alive:bool * respond:bool
    /// Message with metadata to be broadcast over the cluster. The metadata is necessary to identify which nodes
    /// may have missed it and to optimize overly long broadcast tree branches.
    | Gossip of Gossip
    /// Prune is used to sever a tree link between two peers, moving them from eager to lazy sets.
    | Prune
    /// A batched set of recently broadcast messages. It's periodically sent to lazy peers to detect whether their
    /// connections have been severed. If that happens, they'll try to reestablish the connection.
    | IHave of struct(MessageId * Round)[]
    /// Sent to trigger grafts for missing messages.
    | Timer of MessageId
    /// Sent when a missing broadcast has been detected.
    | Graft of MessageId * Round
    /// Created by the client; it's like `Gossip` but without the metadata, which is internal knowledge of the current peer.
    | Broadcast of Binary
    /// Sent to self to trigger a reaction to a new active peer showing up.
    | Up
    /// Sent to self to trigger a reaction to an active peer shutting down.
    | Down
    /// Sent to self periodically, to trigger draining of the lazy queue: announcements scheduled there are sent to the corresponding lazy peers.
    | Dispatch
    member this.CanInit =
        match this with
        | Disconnect _ | DoShuffle -> false
        | _ -> true

[<Struct>]
type Envelope = { Peer: Endpoint; Message: Message }

type PeerEvent =
    | NeighborUp of Endpoint    // Notifies that a new peer has been added to the active set.
    | NeighborDown of Endpoint  // Notifies that an existing peer has stepped down from the active set.
    | Received of Binary

[<Interface>]
type Reactor =
    /// Sends a `message` to a `target` node. If the current and `target` nodes were not connected so far,
    /// it will establish the connection first.
    abstract Send: target:Endpoint * envelope:Envelope -> Async<unit>
    /// Disconnects from a previously connected node (connecting happens on the initial `this.Send` request).
    abstract Disconnect: Endpoint -> Async<unit>
    /// Sends a peer event notification.
    abstract Notify: PeerEvent -> unit

[<Struct>]
type Announcement = { Sender: Endpoint; MessageId: MessageId; Round: Round }

/// Reference: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
module Hyparview =

    type State =
        { ActiveView: Set<Endpoint>
          PassiveView: Set<Endpoint>
          Config: Config
          Random: Random
          Output: Reactor }
        static member Create (config: Config) (out: Reactor) (random: Random) =
            { ActiveView = Set.empty
              PassiveView = Set.empty
              Random = random
              Config = config
              Output = out }
        member this.Self = this.Config.LocalEndpoint
        member this.Send(target, msg: Message) = this.Output.Send(target, { Peer = this.Self; Message = msg })

    type Random with
        member this.Pick(set: Set<'a>): 'a option =
            if Set.isEmpty set then None
            else
                let i = this.Next(set.Count)
                set |> Seq.skip i |> Seq.tryHead
        member this.Shuffle(set: Set<'a>): 'a seq =
            set |> Seq.sortBy (fun _ -> this.Next())

    let private isPassiveViewFull (state: State) =
        state.PassiveView.Count >= state.Config.PassiveViewCapacity

    let private addPassive (peer: Endpoint) (state: State) =
        if Set.contains peer state.ActiveView || Set.contains peer state.PassiveView || peer = state.Self
        then state
        else
            let passive =
                if isPassiveViewFull state then
                    state.Random.Pick state.PassiveView
                    |> Option.map (fun drop -> Set.remove drop state.PassiveView)
                    |> Option.defaultValue state.PassiveView
                else state.PassiveView
            { state with PassiveView = Set.add peer passive }

    let private isActiveViewFull (state: State) =
        state.ActiveView.Count >= state.Config.ActiveViewCapacity

    /// Removes an active peer: closes its connection, sends the necessary protocol messages
    /// and triggers peer events.
    let private removeActive (peer: Endpoint) (state: State) respond = async {
        // NOTE: Set.remove always allocates a fresh Set instance, so a reference-equality
        // check against the old view would never fire; test membership explicitly instead.
        if not (Set.contains peer state.ActiveView) then return state
        else
            let active = Set.remove peer state.ActiveView
            if respond then
                do! state.Send(peer, Disconnect(alive=true, respond=false))
            do! state.Output.Disconnect(peer)
            state.Output.Notify(NeighborDown peer)
            do! state.Output.Send(state.Self, { Peer = peer; Message = Down })
            return addPassive peer { state with ActiveView = active }
    }

    /// Checks if the active view is full and, if so, removes one of its nodes at random.
    let private removeActiveIfFull state = async {
        if isActiveViewFull state then
            match state.Random.Pick state.ActiveView with
            | Some drop -> return! removeActive drop state true
            | None -> return state
        else return state
    }

    /// Forcefully puts a peer into the active view, sending the necessary messages,
    /// establishing connections and triggering peer events in the process.
    let private addActive (peer: Endpoint) (highPriority: bool) (state: State) = async {
        if Set.contains peer state.ActiveView || peer = state.Self then return state
        else
            let! state = removeActiveIfFull state
            let passive = Set.remove peer state.PassiveView
            let active = Set.add peer state.ActiveView
            do! state.Send(peer, Neighbor(highPriority))
            state.Output.Notify(NeighborUp peer)
            do! state.Output.Send(state.Self, { Peer = peer; Message = Up })
            return { state with ActiveView = active; PassiveView = passive }
    }

    let private onDisconnect (old: State) (peer: Endpoint) alive respond = async {
        let! state = removeActive peer old respond
        if not (obj.ReferenceEquals(old, state)) then
            let passive = Set.remove peer state.PassiveView
            if not (isActiveViewFull state) then
                // if the active view has a free slot, pick one passive peer at random and promote it into the active view
                match state.Random.Pick passive with
                | Some node ->
                    let highPriority = Set.isEmpty state.ActiveView
                    do! state.Send(node, Neighbor(highPriority))
                    return if alive then addPassive peer state else state
                | None -> return if alive then addPassive peer state else state
            else
                return if alive then addPassive peer state else state
        else return state
    }

    /// Takes samples of the configured size from the active and passive views and sends them to a random active peer.
    let private doShuffle (state: State) = async {
        match state.Random.Pick state.ActiveView with
        | None -> return state
        | Some node ->
            let active =
                state.Random.Shuffle (Set.remove node state.ActiveView)
                |> Seq.truncate state.Config.ShuffleActiveViewCount // truncate (not take): the view may hold fewer peers than requested
                |> Set.ofSeq
            let passive =
                state.Random.Shuffle state.PassiveView
                |> Seq.truncate state.Config.ShufflePassiveViewCount
                |> Set.ofSeq
            let msg =
                { Origin = state.Self
                  Nodes = active + passive
                  Ttl = state.Config.ShuffleTTL }
            do! state.Send(node, Shuffle msg)
            return state
    }

    let private onNeighbor (state: State) (peer: Endpoint) highPriority = async {
        if highPriority || not (isActiveViewFull state) then
            return! addActive peer highPriority state
        else return state
    }

    let private onJoin (state: State) (peer: Endpoint) = async {
        let! state = addActive peer true state
        let ttl = state.Config.ActiveRandomWalkLength
        let fwd = ForwardJoin(peer, ttl)
        for node in Set.remove peer state.ActiveView do
            do! state.Send(node, fwd) // announce the newly joining peer to the other active peers
        return state
    }

    let private onForwardJoin (state: State) (peer: Endpoint) (sender: Endpoint) ttl = async {
        if ttl = 0 || Set.isEmpty state.ActiveView then return! addActive peer true state
        else
            let state = if ttl = state.Config.PassiveRandomWalkLength then addPassive peer state else state
            match state.Random.Pick(Set.remove sender state.ActiveView) with
            | None -> return! addActive peer true state
            | Some next ->
                do! state.Send(next, ForwardJoin(peer, ttl-1))
                return state
    }

    let private onShuffle (state: State) (shuffle: Shuffle) sender = async {
        if shuffle.Ttl = 0 then
            let nodes =
                state.Random.Shuffle(state.PassiveView)
                |> Seq.truncate shuffle.Nodes.Count // truncate (not take): the passive view may be smaller than the request
                |> Set.ofSeq
            do! state.Send(shuffle.Origin, ShuffleReply(nodes))
            return shuffle.Nodes |> Set.fold (fun acc node -> addPassive node acc) state
        else
            match state.Random.Pick(state.ActiveView - Set.ofList [shuffle.Origin; sender]) with
            | Some node ->
                do! state.Send(node, Shuffle { shuffle with Ttl = shuffle.Ttl - 1 })
                return state
            | None -> return state
    }

    let private onShuffleReply (state: State) (nodes: Set<Endpoint>) = async {
        return nodes |> Set.fold (fun acc peer -> addPassive peer acc) state
    }

    /// Disconnects a peer if it's not in the active set.
    let private disconnectNonActivePeer (state: State) (peer: Endpoint) = async {
        if peer <> state.Self && not (Set.contains peer state.ActiveView) then
            do! state.Send(peer, Disconnect(alive=true, respond=false))
            do! state.Output.Disconnect(peer)
    }

    let handle (state: State) (e: Envelope) : Async<State> = async {
        let! state =
            match e.Message with
            | Join -> onJoin state e.Peer
            | ForwardJoin(peer, ttl) -> onForwardJoin state peer e.Peer ttl
            | Shuffle shuffle -> onShuffle state shuffle e.Peer
            | ShuffleReply nodes -> onShuffleReply state nodes
            | Neighbor highPriority -> onNeighbor state e.Peer highPriority
            | Disconnect(alive, respond) -> onDisconnect state e.Peer alive respond
            | DoShuffle -> doShuffle state
            | _ -> async.Return state
        // make sure to dispose the connection from the sender if it's not part of the active view
        if e.Message.CanInit then
            do! disconnectNonActivePeer state e.Peer
        return state
    }

/// Reference: https://core.ac.uk/download/pdf/32330596.pdf
/// NOTES: Plumtree proposes to send IHave and Graft messages after some timeout, but this is not implemented yet.
module Plumtree =

    type State =
        { Self: Endpoint                 // ID/address of the current node
          EagerPushPeers: Set<Endpoint>  // direct branches of peers to broadcast messages to
          LazyPushPeers: Set<Endpoint>   // peers that periodically receive IHave messages to detect undelivered messages
          LazyQueue: struct (MessageId * Round * Endpoint) list // lazy peers don't need to be informed right away; IHave can work in a batched manner
          Output: Reactor
          Config: Config
          ReceivedMessages: Map<MessageId, Gossip> // received messages are stashed for the purpose of redelivery; this may need to be pruned from time to time
          Missing: Set<Announcement>     // announcements of messages that are known not to have been received
          Timers: Set<MessageId> }       // active timers
        static member Create (config: Config) (out: Reactor) =
            { Self = config.LocalEndpoint
              EagerPushPeers = Set.empty
              LazyPushPeers = Set.empty
              LazyQueue = []
              Output = out
              Config = config
              ReceivedMessages = Map.empty
              Missing = Set.empty
              Timers = Set.empty }
        member this.Send(target: Endpoint, msg: Message) = this.Output.Send(target, { Peer = this.Self; Message = msg })

    /// Moves a peer into the eager set.
    let private addEager peer state =
        { state with
            EagerPushPeers = Set.add peer state.EagerPushPeers
            LazyPushPeers = Set.remove peer state.LazyPushPeers }

    /// Moves a peer into the lazy set.
    let private addLazy peer state =
        { state with
            EagerPushPeers = Set.remove peer state.EagerPushPeers
            LazyPushPeers = Set.add peer state.LazyPushPeers }

    /// Immediately sends a message to the eager peers.
    let private eagerPush (state: State) gossip sender = async {
        do! state.EagerPushPeers - Set.ofArray [| state.Self; sender |]
            |> Seq.map (fun peer ->
                //printfn "%s eager push to %s" state.Self peer
                state.Send(peer, Gossip gossip))
            |> Async.Parallel
            |> Async.Ignore
    }

    /// Puts lazy message announcements on top of the queue, which will be consumed into a batched IHave message
    /// once the dispatch trigger activates (it's a cyclic operation).
    let private lazyPush (state: State) (gossip: Gossip) sender = async {
        let lazyQueue =
            Set.remove sender state.LazyPushPeers
            |> Set.fold (fun acc peer -> struct(gossip.Id, gossip.Round, peer)::acc) state.LazyQueue
        return { state with LazyQueue = lazyQueue }
    }

    /// Dispatches messages from the lazy queue over to the lazy peers.
    let private onDispatch (state: State) = async {
        let gossips =
            state.LazyQueue
            |> List.fold (fun acc struct(id, round, peer) ->
                let msg = struct(id, round)
                match Map.tryFind peer acc with
                | None -> Map.add peer [msg] acc
                | Some existing -> Map.add peer (msg::existing) acc) Map.empty
        do! gossips
            |> Seq.map (fun (KeyValue(peer, grafts)) ->
                //printfn "%s lazy push to %s" state.Self peer
                state.Send(peer, IHave(List.toArray grafts)))
            |> Async.Parallel
            |> Async.Ignore
        return { state with LazyQueue = [] }
    }

    /// Builds a gossip message and sends it. Lazy peers receive only the gossip metadata, which is used to
    /// detect disconnections (in which case gossips were not received) and trigger the self-heal process.
    let private onBroadcast (state: State) (data: Binary) = async {
        let msgId = Guid.NewGuid()
        let gossip = { Id = msgId; Round = 0UL; Data = data }
        do! eagerPush state gossip state.Self
        let! state = lazyPush state gossip state.Self
        return { state with ReceivedMessages = Map.add msgId gossip state.ReceivedMessages }
    }

    //let private optimize (state: State) data messageId round sender = async {
    //    for m in state.Missing |> Set.filter (fun m -> m.MessageId = messageId) do
    //        if m.Round < round && round - m.Round >= state.Config.OptimizationThreshold then
    //            do! state.Send(m.Sender, Graft(0UL, m.Round))
    //            do! state.Send(sender, Prune)
    //}

    let private onGossip (state: State) (gossip: Gossip) sender = async {
        if Map.containsKey gossip.Id state.ReceivedMessages then
            let state = addLazy sender state
            do! state.Send(sender, Prune)
            return state
        else
            let received = Map.add gossip.Id gossip state.ReceivedMessages
            let message = { gossip with Round = gossip.Round + 1UL }
            do! eagerPush state message sender
            let! state = lazyPush state message sender
            let state = { addEager sender state with ReceivedMessages = received }
            state.Output.Notify (Received gossip.Data)
            //TODO: schedule a timer that will prune the gossip after some period (~2x the expected time to broadcast a gossip to all nodes)
            return state
    }

    /// Prunes the sender, moving it into the lazy peers.
    let private onPrune (state: State) sender = async {
        return addLazy sender state
    }

    let private iHave (state: State) (messageId: MessageId) (round: Round) sender =
        // an announcement is only interesting when the message has NOT been received yet
        if not (Map.containsKey messageId state.ReceivedMessages) then
            let timers = Set.add messageId state.Timers
            let missing = { MessageId = messageId; Sender = sender; Round = round }
            { state with Timers = timers; Missing = Set.add missing state.Missing }
        else state

    let private onIHave state notes sender = async {
        let nstate = notes |> Array.fold (fun state struct(msgId, round) -> iHave state msgId round sender) state
        for messageId in nstate.Timers - state.Timers do
            //TODO: schedule a timer to send this message after config.IHaveTimeout
            do! state.Output.Send(state.Self, { Message = Timer messageId; Peer = state.Self })
        return nstate
    }

    // Throws if no matching announcement is left; the exception is caught by the Peer actor loop.
    let private removeFirstAnnouncement (missing: Set<Announcement>) msgId =
        let found = Seq.find (fun m -> m.MessageId = msgId) missing
        (Set.remove found missing), found

    let private onTimer (state: State) (messageId: MessageId) = async {
        let! timers = async {
            if Set.contains messageId state.Timers then return state.Timers
            else
                //TODO: schedule a timer to send this message after config.GraftTimeout
                do! state.Output.Send(state.Self, { Message = Timer messageId; Peer = state.Self })
                return Set.add messageId state.Timers }
        let missing, announcement = removeFirstAnnouncement state.Missing messageId
        let state = { addEager announcement.Sender { state with Missing = missing } with Timers = timers }
        do! state.Send(announcement.Sender, Graft(messageId, announcement.Round))
        return state
    }

    /// Handles a Graft message. Receiving it usually means that the sender's connection has been severed;
    /// it needs to be restored and the corresponding gossip message needs to be resent.
    let private onGraft (state: State) (messageId: MessageId) (round: Round) sender = async {
        let state = addEager sender state
        match Map.tryFind messageId state.ReceivedMessages with
        | None -> return state
        | Some gossip ->
            do! state.Send(sender, Gossip gossip)
            return state
    }

    let private onNeighborDown (state: State) peer = async {
        return { state with
                   // drop announcements from the downed peer: it can no longer answer grafts
                   Missing = state.Missing |> Set.filter (fun m -> m.Sender <> peer)
                   EagerPushPeers = Set.remove peer state.EagerPushPeers
                   LazyPushPeers = Set.remove peer state.LazyPushPeers }
    }

    let private onNeighborUp (state: State) peer = async {
        return { state with EagerPushPeers = Set.add peer state.EagerPushPeers }
    }

    let handle (state: State) (e: Envelope) = async {
        match e.Message with
        | Up -> return! onNeighborUp state e.Peer
        | Down -> return! onNeighborDown state e.Peer
        | Gossip gossip -> return! onGossip state gossip e.Peer
        | Prune -> return! onPrune state e.Peer
        | IHave notes -> return! onIHave state notes e.Peer
        | Timer msgId -> return! onTimer state msgId
        | Graft(msgId, round) -> return! onGraft state msgId round e.Peer
        | Broadcast data -> return! onBroadcast state data
        | Dispatch -> return! onDispatch state
        | _ -> return state
    }

[<Sealed>]
type Peer(out: Reactor, ?config: Config, ?random: Random) =
    let config = config |> Option.defaultValue Config.Default
    let random = random |> Option.defaultWith Random
    let membership = Hyparview.State.Create config out random
    let broadcast = Plumtree.State.Create config out
    let actor = MailboxProcessor.Start(fun ctx ->
        let rec loop (membership: Hyparview.State) (broadcast: Plumtree.State) = async {
            let! (envelope, reply: AsyncReplyChannel<_> option) = ctx.Receive()
            try
                let! broadcast = Plumtree.handle broadcast envelope
                let! membership = Hyparview.handle membership envelope
                reply |> Option.iter (fun r -> r.Reply None)
                return! loop membership broadcast
            with err ->
                reply |> Option.iter (fun r -> r.Reply (Some err))
                return! loop membership broadcast
        }
        loop membership broadcast)
    let send envelope = async {
        match! actor.PostAndAsyncReply(fun r -> (envelope, Some r)) with
        | None -> ()
        | Some e -> ExceptionDispatchInfo.Capture(e).Throw()
    }
    member this.Endpoint = config.LocalEndpoint
    member this.Post message = actor.Post (message, None)
    member this.Join endpoint = send { Message = Join; Peer = endpoint }
    member this.Disconnect endpoint = send { Message = Disconnect(true, true); Peer = endpoint }
    member this.Broadcast data = send { Message = Broadcast data; Peer = config.LocalEndpoint }
    member this.Dispose() = (actor :> IDisposable).Dispose()
    interface IDisposable with member this.Dispose() = this.Dispose()
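
Note that nothing above actually schedules the periodic protocol messages: per their doc comments, `DoShuffle` and `Dispatch` are meant to be sent to self periodically, and `Config.ShuffleInterval` is otherwise never consumed. A minimal driver sketch, assuming one runs per `Peer`; `dispatchInterval` is a hypothetical knob, not part of the gist:

open System
open Protocols

/// Sketch: drives the periodic protocol messages for a single peer. Only ShuffleInterval
/// exists in Config; `dispatchInterval` is an assumption made for this example.
let startTicker (peer: Peer) (config: Config) (dispatchInterval: TimeSpan) =
    let tick (interval: TimeSpan) message = async {
        while true do
            do! Async.Sleep (int interval.TotalMilliseconds)
            // both messages are addressed to self, as their handlers expect
            peer.Post { Peer = config.LocalEndpoint; Message = message } }
    Async.Start (tick config.ShuffleInterval DoShuffle)
    Async.Start (tick dispatchInterval Dispatch)
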
module Tests

open System
open System.Threading
open System.Threading.Channels
open System.Threading.Tasks
open Protocols
open FSharp.Control.Tasks
open Expecto

[<Sealed>]
type TestEnv() =
    let sync = obj()
    let mutable peers = Map.empty
    let mutable connections = Set.empty
    let peerEvents = Channel.CreateUnbounded<_>()
    let connect endpoint target =
        lock sync (fun () ->
            //printfn "Connecting %s to %s" endpoint target
            connections <- Set.add (endpoint, target) connections
            connections <- Set.add (target, endpoint) connections
        )
    member this.Register(endpoint, peer) = peers <- Map.add endpoint peer peers
    member this.AreConnected(a, b) =
        lock sync (fun () -> Set.contains (a, b) connections || Set.contains (b, a) connections)
    member this.Events = peerEvents.Reader
    member this.Reactor (endpoint: Endpoint) =
        { new Reactor with
            member this.Send(target, msg) = async {
                match Map.tryFind target peers with
                | Some (m: Peer) ->
                    if not (Set.contains (endpoint, target) connections) then
                        connect endpoint target
                    m.Post msg
                | None -> failwithf "Couldn't find target node: %O" target
            }
            member this.Disconnect(target) = async {
                lock sync (fun () ->
                    //printfn "Disconnecting %s from %s" endpoint target
                    connections <- Set.remove (endpoint, target) connections
                    connections <- Set.remove (target, endpoint) connections
                )
            }
            member this.Notify event =
                //lock sync (fun () -> printfn "%s received: %O" endpoint event)
                peerEvents.Writer.WriteAsync((endpoint, event)).GetAwaiter().GetResult() }

let peer (env: TestEnv) (config: Config) (random: Random) =
    let reactor = env.Reactor config.LocalEndpoint
    let m = new Peer(reactor, config, random)
    env.Register(config.LocalEndpoint, m)
    m

let poll count (env: TestEnv) (ct: CancellationToken) = task {
    let messages = Array.zeroCreate count
    for i=0 to count-1 do
        let! msg = env.Events.ReadAsync ct
        messages.[i] <- msg
    return messages
}

[<RequireQualifiedAccess>]
module Expect =
    let connected (env: TestEnv) connections =
        for (x, y) in connections do
            Expect.isTrue (env.AreConnected(x, y)) <| sprintf "%s-%s should be connected" x y
    let disconnected (env: TestEnv) connections =
        for (x, y) in connections do
            Expect.isFalse (env.AreConnected(x, y)) <| sprintf "%s-%s should be disconnected" x y

[<Tests>]
let hyparviewTests = testList "Hyparview" [
    testTask "join & leave" {
        do! task {
            let env = TestEnv()
            use a = peer env (Config.Create "A") (Random())
            use b = peer env (Config.Create "B") (Random())
            use cancel = new CancellationTokenSource(1000)
            do! a.Join "B"
            let! actual = poll 2 env cancel.Token
            let expected = Set.ofList [("A", NeighborUp "B"); ("B", NeighborUp "A")]
            Expect.equal (Set.ofArray actual) expected "neighbor up should be notified by both joining nodes"
            Expect.isTrue (env.AreConnected("A", "B")) "both parties should be connected"
            do! b.Disconnect "A"
            let! actual = poll 2 env cancel.Token
            let expected = Set.ofList [("A", NeighborDown "B"); ("B", NeighborDown "A")]
            Expect.equal (Set.ofArray actual) expected "neighbor down should be notified by both disconnecting nodes"
            Expect.disconnected env [("A", "B")]
        }
    }
    testTask "limit active view size" {
        do! task {
            let env = TestEnv()
            use a = peer env ({ Config.Create "A" with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }) (Random(1234))
            use b = peer env ({ Config.Create "B" with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }) (Random(1234))
            use c = peer env ({ Config.Create "C" with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }) (Random(1234))
            use d = peer env ({ Config.Create "D" with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }) (Random(1234))
            use cancel = new CancellationTokenSource(1000)
            do! a.Join "B"
            do! a.Join "C"
            do! b.Join "C"
            // A, B and C are interconnected with each other (active view cap = 2)
            let expected = Set.ofList [
                ("A", NeighborUp "B")
                ("A", NeighborUp "C")
                ("B", NeighborUp "C")
                ("B", NeighborUp "A")
                ("C", NeighborUp "B")
                ("C", NeighborUp "A")
            ]
            let! actual = poll 6 env cancel.Token
            Expect.equal (Set.ofArray actual) expected "neighbor up should be notified by all joining nodes"
            Expect.connected env [("A", "B"); ("A", "C"); ("B", "C")]
            do! d.Join "A" // since D joins A and has no other neighbors, it has higher priority; A must disconnect from either B or C
            let expected = Set.ofList [
                ("D", NeighborUp "A")
                ("A", NeighborUp "D")
                ("A", NeighborDown "B")
                ("B", NeighborDown "A")
            ]
            let! actual = poll 4 env cancel.Token
            Expect.equal (Set.ofArray actual) expected "D should join with A, which should disconnect from B or C"
            Expect.connected env [("A", "D"); ("A", "C"); ("B", "C")]
            Expect.disconnected env [("A", "B"); ("D", "B"); ("D", "C")]
        }
    }
]

let private encode (x: string) : Binary = System.Text.Encoding.UTF8.GetBytes x
let private decode (x: Binary) : string = System.Text.Encoding.UTF8.GetString x
let private addr i = "P" + string i

let private receiveAllOf (env: TestEnv) (expected: Set<_>) (cancel) = task {
    let mutable remaining = expected
    try
        while not (Set.isEmpty remaining) do
            let! received = env.Events.ReadAsync(cancel)
            remaining <- Set.remove received remaining
    with :? OperationCanceledException ->
        failwithf "didn't receive %i messages: %A" (Set.count remaining) remaining
}

let private setup env (n: int) = task {
    let makePeer i =
        let config = { Config.Create (addr i) with ActiveViewCapacity = int (Math.Log2(float n)) + 1; PassiveViewCapacity = 6 * (int (Math.Log2(float n)) + 1) }
        peer env config (Random(1234))
    let peers = Array.init n makePeer
    for i=0 to n-1 do
        do! peers.[i].Join (addr ((i+1) % n))
    do! Task.Delay 1000 // wait for peers to stabilize their connections
    let! _ = poll n env CancellationToken.None
    return peers
}

[<Tests>]
let plumtreeTests = testList "Plumtree" [
    testTask "test broadcast message in big cluster" {
        do! task {
            let n = 1000
            let env = TestEnv()
            let! peers = setup env n // simulate broadcasting a message in a cluster of a thousand nodes
            let msg = encode "hello"
            do! (Array.head peers).Broadcast msg
            let expected = Array.init (n-1) (fun i -> (addr (i+1), Received msg)) |> Set.ofArray
            use cancel = new CancellationTokenSource 10_000
            do! receiveAllOf env expected cancel.Token
        }
    }
    testTask "broadcast works even when nodes have disconnected" {
        do! task {
            let n = 100
            let env = TestEnv()
            let! peers = setup env n // simulate broadcasting a message in a cluster of a hundred nodes
            let msg = encode "hello"
            do! (Array.head peers).Broadcast msg
            // establish the initial broadcast tree
            let expected = Array.init (n-1) (fun i -> (addr (i+1), Received msg)) |> Set.ofArray
            use cancel = new CancellationTokenSource 10_000
            do! receiveAllOf env expected cancel.Token
            let dc = peers.[n/2]
            do! dc.Disconnect "P15"
            do! dc.Disconnect "P21"
            do! dc.Disconnect "P47"
            do! dc.Disconnect "P53"
            do! dc.Disconnect "P7"
            do! dc.Disconnect "P71"
            do! dc.Broadcast msg
            let expected =
                Array.init n (fun i -> (addr i, Received msg))
                |> Set.ofArray
                |> Set.filter (fun (addr, _) -> addr <> dc.Endpoint)
            use cancel = new CancellationTokenSource 10_000
            do! receiveAllOf env expected cancel.Token
        }
    }
]
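
Both suites are picked up through the `[<Tests>]` attribute; a standard Expecto console entry point (a sketch, assuming the tests live in their own executable project) looks like:

module Program

open Expecto

[<EntryPoint>]
let main argv = runTestsInAssembly defaultConfig argv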