Last active
December 26, 2023 12:21
-
-
Save Horusiath/86f60b5a247a6f5be5242950dd3733b5 to your computer and use it in GitHub Desktop.
A simple Reliable Causal Broadcast implementation using F# and Akka.NET
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Program =

    /// In-memory `Db` implementation used by the demo. The latest snapshot and all
    /// events are kept in process memory only, so nothing survives a process restart.
    /// The `replica` argument is accepted for symmetry with real stores but is not used.
    type InMemoryDb(replica: ReplicaId) =
        // Latest saved snapshot (boxed); stays `null` until `SaveSnapshot` is called.
        let mutable snapshot : obj = null
        // Stored events keyed by the sequence number assigned by the local replica.
        let mutable events : Map<uint64,obj> = Map.empty
        interface Db with
            member _.SaveSnapshot state = async { snapshot <- box state }
            member _.LoadSnapshot<'s>() = async {
                // A type test against `null` fails, so an empty store yields `None`.
                match snapshot with
                | :? 's as state -> return Some state
                | _ -> return None
            }
            member _.LoadEvents<'e>(from) = asyncSeq {
                for (seqNr, e) in Map.toSeq events do
                    if seqNr >= from then
                        let casted : Event<'e> = downcast e
                        yield casted
            }
            member _.SaveEvents evts = async {
                for event in evts do
                    // `Event` has no `SeqNr` field; events are read back by local
                    // sequence number, so that is the key they must be stored under.
                    events <- Map.add event.LocalSeqNr (box event) events
            }

    /// Demo scenario: two ORSet replicas A and B are connected to each other,
    /// updated independently and finally compared for equality.
    let main () =
        let sys = System.create "sys" <| Configuration.parse "akka.loglevel = DEBUG"
        let a = spawn sys "A" <| props (ORSet.props (InMemoryDb "A") "A")
        let b = spawn sys "B" <| props (ORSet.props (InMemoryDb "B") "B")
        async {
            a <! Connect("B", b)
            b <! Connect("A", a)
            let! state = ORSet.add 1L a
            printfn "State on node A (first): %A" state
            do! Async.Sleep 1000
            let! state = ORSet.add 2L a
            printfn "State on node A (second): %A" state
            // give the replicas time to exchange events
            do! Async.Sleep 5000
            let! state = ORSet.query b
            printfn "State on node B (after sync): %A" state
            let! state = ORSet.remove 2L b
            printfn "State on node B (after update): %A" state
            do! Async.Sleep 5000
            let! state1 = ORSet.query a
            let! state2 = ORSet.query b
            assert (state1 = state2)
            printfn "SUCCESS"
        } |> Async.RunSynchronously
        0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Reliable causal broadcast implementation | |
module DemoFs.RCB | |
open Akka.Actor | |
open Akkling | |
open System | |
open FSharp.Control | |
/// Identifier of a replica (node).
type ReplicaId = String

/// Outcome of comparing two vector clocks.
type Ord =
    /// Left side is lower than the right side.
    | Lt = -1
    /// Both sides are equal.
    | Eq = 0
    /// Left side is greater than the right side.
    | Gt = 1
    /// Neither side dominates the other (concurrent).
    | Cc = 2
[<RequireQualifiedAccess>]
module Helpers =
    /// Insert-or-update for `Map`: when `k` is absent it is bound to `v`,
    /// otherwise the existing value is replaced by `fn existing`.
    let upsert k v fn map =
        let updated =
            match Map.tryFind k map with
            | Some existing -> fn existing
            | None -> v
        Map.add k updated map
/// Vector clock: a counter per replica. Replicas missing from the map count as 0.
type VTime = Map<ReplicaId, int64>

[<RequireQualifiedAccess>]
module Version =
    /// Empty vector clock.
    let zero: VTime = Map.empty

    /// Increments the counter of replica `r` (starting from 1 if it was absent).
    let inc r (vv: VTime): VTime = Helpers.upsert r 1L (fun ts -> 1L + ts) vv

    /// Overrides the counter of replica `r` with `ts`.
    let set r ts (vv: VTime): VTime = vv |> Map.add r ts

    /// Point-wise maximum of two vector clocks.
    let merge (vv1: VTime) (vv2: VTime) =
        (vv1, vv2) ||> Map.fold (fun acc k v2 -> Helpers.upsert k v2 (max v2) acc)

    /// Compares two vector clocks entry by entry over the union of their keys.
    /// Returns `Ord.Cc` as soon as one entry is greater and another one is lower.
    let compare (a: VTime) (b: VTime): Ord =
        let clockOf k (vt: VTime) = Map.tryFind k vt |> Option.defaultValue 0L
        let keys =
            Seq.append (Map.toSeq a) (Map.toSeq b)
            |> Seq.map fst
            |> Set.ofSeq
        let step prev k =
            let va = clockOf k a
            let vb = clockOf k b
            if va = vb then prev
            elif va > vb then
                match prev with
                | Ord.Eq -> Ord.Gt
                | Ord.Lt -> Ord.Cc
                | _ -> prev
            else
                match prev with
                | Ord.Eq -> Ord.Lt
                | Ord.Gt -> Ord.Cc
                | _ -> prev
        keys |> Seq.fold step Ord.Eq
/// A container for user-defined events of type 'e. It carries the metadata needed
/// to partially order events and distribute them in a peer-to-peer fashion.
type Event<'e> =
    {
        /// The replica on which this event was originally created.
        Origin: ReplicaId
        /// The sequence number given by the origin replica at the moment of event creation.
        /// It lets us track replication progress with remote replicas even when their
        /// events were not received directly, but via an intermediate replica.
        OriginSeqNr: uint64
        /// The sequence number given by the local replica. For events created by the current
        /// replica it is the same as `OriginSeqNr`. For replicated events it is usually higher.
        LocalSeqNr: uint64
        /// Vector clock describing happened-before relationships between events from
        /// different replicas, which establishes a partial order among them.
        Version: VTime
        /// User-defined event data.
        Data: 'e
    }
    override e.ToString() =
        sprintf "(%s, %i, %i, %A, %O)" e.Origin e.OriginSeqNr e.LocalSeqNr e.Version e.Data
/// Typed actor reference of a replicator speaking `Protocol`.
type Endpoint<'s,'c,'e> = IActorRef<Protocol<'s,'c,'e>>

/// Messages understood by a replicator actor.
and Protocol<'s,'c,'e> =
    /// Attaches another replica to continuously synchronize with the current one.
    | Connect of replicaId:ReplicaId * Endpoint<'s,'c,'e>
    /// Request to read up to `maxCount` events from a given replica starting from `seqNr`.
    /// The `filter` lets the sender side skip events the requester has already seen
    /// (the requester filters once more on its own side).
    /// This message is expected to be answered with `Replicated`.
    | Replicate of seqNr:uint64 * maxCount:int * filter:VTime * replyTo:Endpoint<'s,'c,'e>
    /// Fired when no `Replicated` arrived in time from the given replica; triggers a retry.
    | ReplicateTimeout of ReplicaId
    /// Response to `Replicate` - must always be sent. Empty content signals the end of the
    /// event stream. `toSeqNr` tells up to which sequence number this message advanced.
    | Replicated of from:ReplicaId * toSeqNr:uint64 * events:Event<'e>[]
    /// Request for the state. Answered with `Crdt.Query` applied to `ReplicationState.Crdt`.
    | Query
    /// Persists an event into the current replica. Answered with the updated, materialized state.
    | Command of 'c
    /// Sent at the end of the recovery phase with the state rebuilt from the latest
    /// persisted snapshot (if there was any) and the events stored after it.
    | Loaded of ReplicationState<'s>
    /// Periodic trigger to persist the current state snapshot (only performed when the
    /// state has changed since the last snapshot, as tracked by the `IsDirty` flag).
    | Snapshot

/// State kept by a replicator between messages.
and ReplicationState<'s> =
    {
        /// Unique identifier of a given replica/node.
        Id: ReplicaId
        /// Tells if the replication state has been modified after being persisted.
        IsDirty: bool
        /// Counter used to assign unique sequence numbers to events stored locally.
        SeqNr: uint64
        /// Version vector describing the last observed event.
        Version: VTime
        /// Sequence numbers of remote replicas. When synchronizing (via `Replicate` message)
        /// with remote replicas, we start from the last known sequence numbers we received.
        Observed: Map<ReplicaId, uint64>
        /// CRDT object that is replicated.
        Crdt: 's
    }
[<RequireQualifiedAccess>]
module ReplicationState =
    /// Creates a clean replication state for replica `id` wrapping the given CRDT value.
    let inline create (id: ReplicaId) state =
        { Id = id
          IsDirty = false
          SeqNr = 0UL
          Version = Map.empty
          Observed = Map.empty
          Crdt = state }

    /// Checks if the event has NOT been observed yet by the replica described by `state`.
    /// An event is unseen when its `OriginSeqNr` is above the sequence number recorded
    /// for `nodeId` (or nothing was recorded) AND its version vector is greater than
    /// or concurrent to the version of the current state.
    let unseen nodeId (state: ReplicationState<'s>) (e: Event<'e>) =
        let coveredBySeqNr =
            match Map.tryFind nodeId state.Observed with
            | Some ver -> e.OriginSeqNr <= ver
            | None -> false
        if coveredBySeqNr then false
        else Version.compare e.Version state.Version > Ord.Eq
/// Persistence abstraction used by the replicator.
[<Interface>]
type Db =
    /// Stores the given state as the latest snapshot.
    abstract SaveSnapshot: 's -> Async<unit>
    /// Loads the latest snapshot, if one is available as the requested type.
    abstract LoadSnapshot: unit -> Async<'s option>
    /// Streams stored events starting from `startSeqNr`.
    abstract LoadEvents: startSeqNr:uint64 -> AsyncSeq<Event<'e>>
    /// Stores the given events.
    abstract SaveEvents: events:Event<'e> seq -> Async<unit>
/// Reads the `events` stream until `count` matching events have been collected (or the
/// stream ends) and sends them to the `target` as a single `Replicated` message from `nodeId`.
/// Only events whose version is greater than or concurrent to `filter` are included,
/// which skips events already seen by the `target`. The reported sequence number is the
/// highest `LocalSeqNr` among all events read, including the skipped ones.
let replay (nodeId: ReplicaId) (filter: VTime) (target: Endpoint<'s,'c,'e>) (events: AsyncSeq<Event<'e>>) (count:int) = async {
    let batch = ResizeArray()
    let mutable highestSeqNr = 0UL
    let mutable keepReading = count > 0
    use cursor = events.GetEnumerator()
    while keepReading do
        match! cursor.MoveNext() with
        | None -> keepReading <- false
        | Some e ->
            let notSeenByTarget = Version.compare e.Version filter > Ord.Eq
            if notSeenByTarget then batch.Add(e)
            highestSeqNr <- max highestSeqNr e.LocalSeqNr
            // stop once the batch is full
            keepReading <- batch.Count < count
    target <! Replicated(nodeId, highestSeqNr, batch.ToArray())
}
/// Delay after which a `ReplicateTimeout` is delivered for a pending replication request.
let recoverTimeout = TimeSpan.FromSeconds 5.0

/// Bookkeeping kept for every connected remote replica.
type ReplicationStatus<'s,'c,'e> =
    {
        /// Access point for the remote replica.
        Endpoint: Endpoint<'s,'c,'e>
        /// Cancellation handle for the pending `ReplicateTimeout`.
        Timeout: ICancelable
    }
/// Operation-based CRDT definition plugged into the replicator.
[<Interface>]
type Crdt<'crdt,'state,'cmd,'event> =
    /// Default (zero) value of the CRDT.
    abstract Default: 'crdt
    /// Given a CRDT state, returns the value the user is interested in. E.g. an ORSet still
    /// carries metadata timestamps, while its materialized value is just an ordinary Set<'a>.
    abstract Query: 'crdt -> 'state
    /// Equivalent of a command handler in the event sourcing analogy.
    abstract Prepare: state:'crdt * command:'cmd -> 'event
    /// Equivalent of an event handler in the event sourcing analogy.
    abstract Effect: state:'crdt * event:Event<'event> -> 'crdt
/// Actor behavior of a single replica. It first recovers its state from `db`
/// (latest snapshot plus the events stored after it), stashing every other message,
/// and then switches to the `active` behavior which handles commands, queries and
/// the replication protocol with connected replicas.
let replicator (crdt: Crdt<'crdt,'state,'cmd,'event>) (db: Db) (id: ReplicaId) (ctx: Actor<Protocol<_,_,_>>) =
    /// Cancel the last pending `ReplicateTimeout` task, and schedule it again.
    let refreshTimeouts nodeId progresses (ctx: Actor<_>) =
        let p = Map.find nodeId progresses
        p.Timeout.Cancel()
        let timeout = ctx.Schedule recoverTimeout ctx.Self (ReplicateTimeout nodeId)
        Map.add nodeId { p with Timeout = timeout } progresses

    let rec active (db: Db) (state: ReplicationState<'crdt>) (replicatingNodes: Map<ReplicaId, ReplicationStatus<'crdt,'cmd,'event>>) (ctx: Actor<_>) = actor {
        match! ctx.Receive() with
        | Query ->
            ctx.Sender() <! crdt.Query state.Crdt
            return! active db state replicatingNodes ctx
        | Replicate(from, count, filter, sender) ->
            logDebugf ctx "received recover request from %s: seqNr=%i, vt=%O" sender.Path.Name from filter
            let cursor = db.LoadEvents(from)
            replay state.Id filter sender cursor count |> Async.Start
            return! active db state replicatingNodes ctx
        | Replicated(nodeId, lastSeqNr, [||]) ->
            // if we received an empty event list, this node is up to date with `nodeId`;
            // just schedule the timeout, so when it fires we send `Replicate` again
            logDebugf ctx "%s reached end of updates" nodeId
            let prog = refreshTimeouts nodeId replicatingNodes ctx
            let observedSeqNr = Map.tryFind nodeId state.Observed |> Option.defaultValue 0UL
            if lastSeqNr > observedSeqNr then
                let nstate = { state with Observed = Map.add nodeId lastSeqNr state.Observed }
                // Save the bare state, exactly like the `Snapshot` branch does. Saving a
                // (id, state) tuple would produce a snapshot that recovery cannot load back.
                do! db.SaveSnapshot nstate
                return! active db nstate prog ctx
            else
                return! active db state prog ctx
        | Replicated(nodeId, lastSeqNr, events) ->
            let mutable nstate = state
            let mutable remoteSeqNr = Map.tryFind nodeId nstate.Observed |> Option.defaultValue 0UL
            let toSave = ResizeArray()
            // for all events not seen by the current node, rewrite them to use a local sequence nr,
            // update the state and save them in the database
            for e in events |> Array.filter (ReplicationState.unseen nodeId state) do
                logDebugf ctx "replicating event %O from replica %s" e nodeId
                let seqNr = nstate.SeqNr + 1UL
                let version = Version.merge nstate.Version e.Version // update current node version vector
                remoteSeqNr <- Math.Max(remoteSeqNr, e.LocalSeqNr) // increment observed remote sequence nr
                let nevent = { e with LocalSeqNr = seqNr }
                nstate <- { nstate with
                              Crdt = crdt.Effect(nstate.Crdt, nevent)
                              SeqNr = seqNr
                              Version = version
                              Observed = Map.add nodeId remoteSeqNr nstate.Observed }
                toSave.Add nevent
            do! db.SaveEvents toSave // save all unseen events together with updated state
            //do! db.SaveSnapshot nstate // in practice snapshot should be applied on condition (ideally in the same transaction)
            let target = Map.find nodeId replicatingNodes
            target.Endpoint <! Replicate(lastSeqNr+1UL, 100, nstate.Version, ctx.Self) // continue syncing
            let prog = refreshTimeouts nodeId replicatingNodes ctx
            return! active db { nstate with IsDirty = true } prog ctx
        | ReplicateTimeout nodeId ->
            // if we didn't receive `Replicated` in time or the last one was empty, upon timeout just retry the request
            logDebugf ctx "%s didn't replied to read request in time" nodeId
            let seqNr = Map.tryFind nodeId state.Observed |> Option.defaultValue 0UL
            let p = Map.find nodeId replicatingNodes
            p.Endpoint <! Replicate(seqNr+1UL, 100, state.Version, ctx.Self)
            let timeout = ctx.Schedule recoverTimeout ctx.Self (ReplicateTimeout nodeId)
            let prog = Map.add nodeId { p with Timeout = timeout } replicatingNodes
            return! active db state prog ctx
        | Command(cmd) ->
            let sender = ctx.Sender()
            let seqNr = state.SeqNr + 1UL
            let version = Version.inc state.Id state.Version
            let data = crdt.Prepare(state.Crdt, cmd) // handle the command, produce event
            let event = { Origin = state.Id; OriginSeqNr = seqNr; LocalSeqNr = seqNr; Version = version; Data = data }
            let ncrdt = crdt.Effect(state.Crdt, event) // update the state with produced event
            let nstate = { state with Version = version; SeqNr = seqNr; Crdt = ncrdt }
            // store the new event
            do! db.SaveEvents [event]
            logDebugf ctx "stored event %O in a database" event
            sender <! crdt.Query ncrdt // send updated materialized CRDT state back to the sender
            return! active db { nstate with IsDirty = true } replicatingNodes ctx
        | Connect(nodeId, endpoint) ->
            // connect with the remote replica, and start synchronizing with it
            let seqNr = Map.tryFind nodeId state.Observed |> Option.defaultValue 0UL
            endpoint <! Replicate(seqNr+1UL, 100, state.Version, ctx.Self)
            logDebugf ctx "connected with replica %s. Sending read request starting from %i" nodeId (seqNr+1UL)
            let timeout = ctx.Schedule recoverTimeout ctx.Self (ReplicateTimeout nodeId)
            return! active db state (Map.add nodeId { Endpoint = endpoint; Timeout = timeout } replicatingNodes) ctx
        | Snapshot when state.IsDirty ->
            logDebugf ctx "Snapshot triggered"
            let nstate = { state with IsDirty = false }
            do! db.SaveSnapshot nstate
            return! active db nstate replicatingNodes ctx
        | _ -> return Unhandled
    }

    let rec recovering (db: Db) (ctx: Actor<_>) = actor {
        match! ctx.Receive() with
        | Loaded state ->
            logDebugf ctx "Recovery phase done with state: %O" state
            ctx.UnstashAll()
            let interval = TimeSpan.FromSeconds 5.
            ctx.ScheduleRepeatedly interval interval ctx.Self Snapshot |> ignore
            return! active db state Map.empty ctx
        | _ ->
            // stash all other operations until recovery is complete
            ctx.Stash()
            return! recovering db ctx
    }

    async {
        // load state from DB snapshot or create a new empty one
        let! snapshot = db.LoadSnapshot()
        let mutable state = snapshot |> Option.defaultValue (ReplicationState.create id crdt.Default)
        // apply all events that happened since the snapshot has been made
        for event in db.LoadEvents (state.SeqNr + 1UL) do
            state <- { state with
                         Crdt = crdt.Effect(state.Crdt, event)
                         SeqNr = event.LocalSeqNr
                         Version = Version.merge event.Version state.Version
                         Observed = Map.add event.Origin event.OriginSeqNr state.Observed }
        ctx.Self <! Loaded state
    } |> Async.Start
    recovering db ctx
[<RequireQualifiedAccess>]
module Counter =
    /// Operation-based counter: commands and events are deltas, the state is their sum.
    let private crdt =
        { new Crdt<int64,int64,int64,int64> with
            member _.Default = 0L
            member _.Query total = total
            member _.Prepare(_, delta) = delta
            member _.Effect(total, e) = total + e.Data }

    /// Used to create a replication endpoint handling the operation-based Counter protocol.
    let props db replica ctx = replicator crdt db replica ctx

    /// Increments the counter maintained by the given `ref` endpoint by a given delta (can be negative).
    let inc (by: int64) (ref: Endpoint<int64,int64,int64>) : Async<int64> =
        ref <? Command by

    /// Retrieves the current state of the counter maintained by the given `ref` endpoint.
    let query (ref: Endpoint<int64,int64,int64>) : Async<int64> =
        ref <? Query
[<RequireQualifiedAccess>]
module ORSet =
    /// Observed-remove set: every element is paired with the version of the event that added it.
    type ORSet<'a> when 'a: comparison = Set<'a * VTime>

    type Command<'a> =
        | Add of 'a
        | Remove of 'a

    type Operation<'a> =
        | Added of 'a
        /// Carries the versions of all entries of the removed item known to the removing replica.
        | Removed of Set<VTime>

    type Endpoint<'a> when 'a: comparison = Endpoint<ORSet<'a>, Command<'a>, Operation<'a>>

    let private crdt : Crdt<ORSet<'a>, Set<'a>, Command<'a>, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default = Set.empty
            member _.Query(orset) = Set.map fst orset
            member _.Prepare(orset, cmd) =
                match cmd with
                | Add item -> Added(item)
                | Remove item ->
                    // collect versions of every entry holding the removed item
                    let isRemovedItem (candidate, _) = candidate = item
                    orset |> Set.filter isRemovedItem |> Set.map snd |> Removed
            member _.Effect(orset, e) =
                match e.Data with
                | Added(item) -> orset |> Set.add (item, e.Version)
                | Removed(versions) ->
                    // keep only entries whose version is not listed in the removal
                    let survives (_, ts) = not (Set.contains ts versions)
                    Set.filter survives orset }

    /// Used to create a replication endpoint handling the operation-based ORSet protocol.
    let props db replicaId ctx = replicator crdt db replicaId ctx

    /// Adds a new `item` into the ORSet maintained by the given `ref` endpoint. In case of add/remove conflicts add wins.
    let add (item: 'a) (ref: Endpoint<'a>) : Async<Set<'a>> =
        ref <? Command (Add item)

    /// Removes an `item` from the ORSet maintained by the given `ref` endpoint. In case of add/remove conflicts add wins.
    let remove (item: 'a) (ref: Endpoint<'a>) : Async<Set<'a>> =
        ref <? Command (Remove item)

    /// Retrieves the current state of the ORSet maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<Set<'a>> =
        ref <? Query
[<RequireQualifiedAccess>]
module LWWRegister =
    /// Last-write-wins register: the value with the highest (timestamp, replica) pair wins.
    [<Struct>]
    type LWWRegister<'a> =
        { Timestamp: struct(DateTime * ReplicaId)
          Value: 'a voption }

    /// Event payload: wall-clock time taken when the command was prepared, plus the new value.
    type Operation<'a> = DateTime * 'a voption

    type Endpoint<'a> = Endpoint<LWWRegister<'a>, 'a voption, Operation<'a>>

    let private crdt : Crdt<LWWRegister<'a>, 'a voption, 'a voption, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default = { Timestamp = struct(DateTime.MinValue, ""); Value = ValueNone }
            member _.Query crdt = crdt.Value
            member _.Prepare(_, value) = (DateTime.UtcNow, value)
            member _.Effect(existing, e) =
                let (at, value) = e.Data
                // the origin replica id breaks ties between equal timestamps
                let timestamp = struct(at, e.Origin)
                if existing.Timestamp < timestamp then
                    { existing with Timestamp = timestamp; Value = value }
                else existing }

    /// Used to create a replication endpoint handling the operation-based LWW register protocol.
    let props db replica ctx = replicator crdt db replica ctx

    /// Sets the register maintained by the given `ref` endpoint to `value`.
    /// Replied with the register value after the update was applied.
    let update (value: 'a voption) (ref: Endpoint<'a>) : Async<'a voption> = ref <? Command value

    /// Misspelled alias of `update`, kept so that existing callers keep compiling.
    let updte (value: 'a voption) (ref: Endpoint<'a>) : Async<'a voption> = update value ref

    /// Retrieves the current value of the register maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a voption> = ref <? Query
[<RequireQualifiedAccess>]
module MVRegister =
    /// Multi-value register: keeps every value written concurrently, each with its version.
    type MVRegister<'a> = (VTime * 'a voption) list

    type Endpoint<'a> = Endpoint<MVRegister<'a>, 'a voption, 'a voption>

    let private crdt : Crdt<MVRegister<'a>, 'a list, 'a voption, 'a voption> =
        { new Crdt<_,_,_,_> with
            member _.Default = []
            member _.Query crdt =
                // cleared entries (ValueNone) are not visible to the user
                crdt
                |> List.choose (function (_, ValueSome v) -> Some v | _ -> None)
            member _.Prepare(_, value) = value
            member _.Effect(existing, e) =
                // only entries concurrent to the incoming event survive next to it
                let concurrent =
                    existing
                    |> List.filter (fun (vt, _) -> Version.compare vt e.Version = Ord.Cc)
                (e.Version, e.Data)::concurrent }

    /// Used to create a replication endpoint handling the operation-based MV register protocol.
    let props db replica ctx = replicator crdt db replica ctx

    /// Writes `value` into the register maintained by the given `ref` endpoint.
    /// The replicator replies with `crdt.Query`, i.e. the list of all values currently
    /// held by the register, hence the `'a list` result (not `'a voption`).
    let update (value: 'a voption) (ref: Endpoint<'a>) : Async<'a list> = ref <? Command value

    /// Misspelled alias of `update`, kept so that existing callers keep resolving the name.
    let updte (value: 'a voption) (ref: Endpoint<'a>) : Async<'a list> = update value ref

    /// Retrieves all values currently held by the register maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a list> = ref <? Query
[<RequireQualifiedAccess>]
module Rga =
    /// Virtual index - while the physical index of an element in RGA changes as new elements are
    /// inserted or removed, a virtual index always stays the same. It allows tracking the item position over time.
    type VPtr = (int * ReplicaId)

    /// A single RGA element; `None` content marks a tombstone.
    type Vertex<'a> = (VPtr * 'a option)

    type Rga<'a> =
        { Sequencer: VPtr
          Vertices: Vertex<'a>[] }

    type Command<'a> =
        | Insert of index:int * value:'a
        | RemoveAt of index:int

    type Operation<'a> =
        | Inserted of after:VPtr * at:VPtr * value:'a
        | Removed of at:VPtr

    /// Checks if given vertex has been tombstoned.
    let inline isTombstone (_, data) = Option.isNone data

    /// Maps user-given index (which ignores tombstones) into physical index inside of `vertices` array.
    /// The result is the position right after `index` live elements were passed, so it may
    /// point at a tombstone (or one past the end of the array).
    let private indexWithTombstones index vertices =
        let rec loop offset remaining (vertices: Vertex<'a>[]) =
            if remaining = 0 then offset
            elif isTombstone vertices.[offset] then loop (offset+1) remaining vertices // skip over tombstones
            else loop (offset+1) (remaining-1) vertices
        loop 1 index vertices // skip head as it's always tombstoned (it serves as reference point)

    /// Maps given VPtr into physical index inside of `vertices` array.
    let private indexOfVPtr ptr vertices =
        let rec loop offset ptr (vertices: Vertex<'a>[]) =
            if ptr = fst vertices.[offset] then offset
            else loop (offset+1) ptr vertices
        loop 0 ptr vertices

    /// Recursively checks if the next vertex on the right of a given `offset`
    /// has position higher than `ptr` and if so, shifts the offset to the right.
    ///
    /// By design, when doing concurrent inserts, we skip over elements on the right
    /// if their position is higher than the position of the inserted element.
    let rec private shift offset ptr (vertices: Vertex<'a>[]) =
        if offset >= vertices.Length then offset // append at the end
        else
            let (next, _) = vertices.[offset]
            if next < ptr then offset
            else shift (offset+1) ptr vertices // move insertion point to the right

    /// Increments given sequence number.
    let inline private nextSeqNr ((i, id): VPtr) : VPtr = (i+1, id)

    /// Moves `offset` to the right until it points at a live (non-tombstoned) vertex
    /// or one past the end of the array.
    let rec private skipTombstones offset (vertices: Vertex<'a>[]) =
        if offset < vertices.Length && isTombstone vertices.[offset] then skipTombstones (offset+1) vertices
        else offset

    let private createInserted i value rga =
        let index = indexWithTombstones i rga.Vertices // start from 1 to skip header vertex
        let prev = fst rga.Vertices.[index-1] // get VPtr of previous element or RGA's head
        let at = nextSeqNr rga.Sequencer
        Inserted(prev, at, value)

    let private createRemoved i rga =
        // `indexWithTombstones` may stop on a tombstone placed right before the i-th live
        // element; removing that tombstone again would be a no-op, so move on to the live vertex.
        let index = skipTombstones (indexWithTombstones i rga.Vertices) rga.Vertices
        let at = fst rga.Vertices.[index] // get VPtr of the element to remove
        Removed at

    let private applyInserted (predecessor: VPtr) (ptr: VPtr) value rga =
        // find index where predecessor vertex can be found
        let predecessorIdx = indexOfVPtr predecessor rga.Vertices
        // adjust index where new vertex is to be inserted
        let insertIdx = shift (predecessorIdx+1) ptr rga.Vertices
        // update RGA to store the highest observed sequence number
        let (seqNr, replicaId) = rga.Sequencer
        let nextSeqNr = (max (fst ptr) seqNr, replicaId)
        let newVertices = Array.insert insertIdx (ptr, Some value) rga.Vertices
        { Sequencer = nextSeqNr; Vertices = newVertices }

    let private applyRemoved ptr rga =
        // find index where removed vertex can be found and clear its content to tombstone it
        let index = indexOfVPtr ptr rga.Vertices
        let (at, _) = rga.Vertices.[index]
        { rga with Vertices = Array.replace index (at, None) rga.Vertices }

    let private crdt (replicaId: ReplicaId) : Crdt<Rga<'a>, 'a[], Command<'a>, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default = { Sequencer = (0,replicaId); Vertices = [| ((0,""), None) |] }
            member _.Query rga = rga.Vertices |> Array.choose snd
            member _.Prepare(rga, cmd) =
                match cmd with
                | Insert(i, value) -> createInserted i value rga
                | RemoveAt(i) -> createRemoved i rga
            member _.Effect(rga, e) =
                match e.Data with
                | Inserted(predecessor, ptr, value) -> applyInserted predecessor ptr value rga
                | Removed(ptr) -> applyRemoved ptr rga
        }

    type Endpoint<'a> = Endpoint<Rga<'a>, Command<'a>, Operation<'a>>

    /// Used to create replication endpoint handling operation-based RGA protocol.
    let props db replicaId ctx = replicator (crdt replicaId) db replicaId ctx

    /// Inserts an `item` at given index. To insert at head use 0 index,
    /// to push back to a tail of sequence insert at array length.
    let insert (index: int) (item: 'a) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (Insert(index, item))

    /// Removes item stored at a provided `index`.
    let removeAt (index: int) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (RemoveAt index)

    /// Retrieve an array of elements maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a[]> = ref <? Query
/// Block-wise RGA. It exposes operations for adding/removing multiple elements at once. | |
[<RequireQualifiedAccess>] | |
module BWRga = | |
/// Identifier given to an inserted block: a sequence number paired with a replica id.
type Position = (int * ReplicaId)

/// Points at an element inside a block: the block's `Position` plus an offset.
[<Struct>]
type PositionOffset =
    { Position: Position; Offset: int }
    override this.ToString () =
        let (seqNr, replica) = this.Position
        sprintf "(%i%s:%i)" seqNr replica this.Offset
/// Payload of a block: either live user data or a tombstone remembering how many
/// elements it stands for.
[<Struct>]
type Content<'a> =
    | Content of content:'a[]
    | Tombstone of skipped:int
    /// Splits the content into two parts at `offset`: the first `offset` elements
    /// and the remainder.
    member this.Slice(offset: int) =
        match this with
        | Tombstone length ->
            (Tombstone offset, Tombstone (length - offset))
        | Content data ->
            let head = data |> Array.take offset
            let rest = data |> Array.skip offset
            (Content head, Content rest)
/// A run of consecutive elements sharing one `Position`, starting at `Ptr.Offset`.
type Block<'a> =
    { Ptr: PositionOffset
      //TODO: in this approach Block contains both user data and CRDT metadata. It's possible
      // however to split these apart, so that all slicing manipulations are performed on blocks
      // alone. In that case the Query method could return user data right away with no extra
      // modifications, while the user content could be stored in an optimized structure such as
      // a Rope, instead of the deeply cloned arrays used here.
      Data: Content<'a> }
    /// Number of elements covered by this block (for tombstones: the number of removed ones).
    member this.Length =
        match this.Data with
        | Tombstone skipped -> skipped
        | Content data -> data.Length
    override this.ToString() = sprintf "%O -> %A" this.Ptr this.Data
[<RequireQualifiedAccess>]
module Block =
    /// Replaces the block's content with a tombstone of the same length.
    let tombstone (block: Block<'a>) = { block with Data = Tombstone block.Length }

    /// Checks if the block has been tombstoned.
    let isTombstone (block: Block<'a>) =
        match block.Data with
        | Tombstone _ -> true
        | Content _ -> false

    /// Splits the block at `offset`. When `offset` equals the block length the block is
    /// returned as-is with no right part; otherwise the right part starts `offset`
    /// elements further within the same position.
    let split (offset) (block: Block<'a>) =
        if offset = block.Length then (block, None)
        else
            let (headPart, tailPart) = block.Data.Slice offset
            let rightPtr = { block.Ptr with Offset = block.Ptr.Offset + offset }
            ({ block with Data = headPart }, Some { Ptr = rightPtr; Data = tailPart })
/// Block-wise RGA state: the highest observed sequence number and the ordered blocks.
type Rga<'a> =
    { Sequencer: Position
      Blocks: Block<'a>[] }

/// User requests, expressed in user-visible indexes.
type Command<'a> =
    | Insert of index:int * 'a[]
    | RemoveAt of index:int * count:int

/// Replicated operations, expressed in stable positions.
type Operation<'a> =
    | Inserted of after:PositionOffset * at:Position * value:'a[]
    | Removed of slices:(PositionOffset*int) list
/// Given a user-aware index (one that ignores tombstones), returns the index of a block
/// and the position inside of that block which matches the provided index.
let private findByIndex idx blocks =
    let rec loop blockIdx consumed (idx: int) (blocks: Block<'a>[]) =
        if idx = consumed then (blockIdx, 0)
        else
            let block = blocks.[blockIdx]
            let remaining = idx - consumed
            if Block.isTombstone block then
                // tombstones are invisible to the user, they don't consume the index
                loop (blockIdx+1) consumed idx blocks
            elif remaining <= block.Length then
                // the position lies somewhere in this block
                (blockIdx, remaining)
            else
                // move to the next block, accounting for the length of the current one
                loop (blockIdx+1) (consumed + block.Length) idx blocks
    loop 0 0 idx blocks
/// Finds the first block with the same `Position` as `ptr` whose range reaches `ptr.Offset`.
/// Returns the block index and the offset of `ptr` relative to the block start.
let private findByPositionOffset ptr blocks =
    let rec loop idx ptr (blocks: Block<'a>[]) =
        let block = blocks.[idx]
        let samePosition = block.Ptr.Position = ptr.Position
        if samePosition && block.Ptr.Offset + block.Length >= ptr.Offset then
            (idx, ptr.Offset - block.Ptr.Offset)
        else
            loop (idx+1) ptr blocks
    loop 0 ptr blocks
/// Recursively checks if the block at `offset` has a position that is not lower than
/// `pos` and if so, shifts the offset to the right. Stops at the first block with a
/// lower position or at the end of the array.
let rec private shift offset pos (blocks: Block<'a>[]) =
    let atTail = offset >= blocks.Length
    if atTail || blocks.[offset].Ptr.Position < pos then offset
    else shift (offset+1) pos blocks // move insertion point to the right
/// Returns the position with its sequence number increased by one; the replica id is kept.
let inline private nextSeqNr (pos: Position) : Position =
    let (seqNr, replica) = pos
    (seqNr + 1, replica)
/// Translates a user-level removal (`count` elements starting at user index `start`)
/// into a list of `(position, length)` slices over live blocks, in document order.
///
/// Tombstoned and exhausted blocks are skipped because they hold no user-visible
/// elements. The walk stops as soon as `count` elements were collected or the blocks
/// run out, so removing up to the very end of the sequence no longer reads past the
/// array and no zero-length slices are produced.
let private sliceBlocks start count blocks =
    let rec loop acc idx offset remaining (blocks: Block<'a>[]) =
        if remaining <= 0 || idx >= blocks.Length then acc
        else
            let block = blocks.[idx]
            let len = block.Length - offset
            if len <= 0 || Block.isTombstone block then
                loop acc (idx+1) 0 remaining blocks // nothing removable in this block
            else
                let ptr = { block.Ptr with Offset = block.Ptr.Offset + offset }
                if len > remaining then (ptr, remaining)::acc // removal ends inside this block
                else loop ((ptr, len)::acc) (idx+1) 0 (remaining-len) blocks
    let (first, offset) = findByIndex start blocks
    loop [] first offset count blocks |> List.rev
/// Applies removal `slices` (pairs of start position and length, in document order)
/// to `blocks`, returning a new array in which the covered ranges are tombstoned.
/// Blocks are split whenever a slice starts or ends in the middle of one.
///
/// The walk starts at index 0 so that the head block is copied into the result;
/// starting at 1 with an empty accumulator dropped it after every removal.
let private filterBlocks slices blocks =
    let rec loop (acc: ResizeArray<Block<'a>>) idx slices (blocks: Block<'a>[]) =
        match slices with
        | [] ->
            for i=idx to blocks.Length-1 do
                acc.Add blocks.[i] // copy over remaining blocks
            acc.ToArray()
        | (ptr, length)::tail ->
            let block = blocks.[idx]
            if block.Ptr.Position = ptr.Position then // we found valid block
                let currLen = block.Length
                if block.Ptr.Offset = ptr.Offset then // the beginning of deleted block was found
                    if currLen = length then // deleted block exactly matches bounds
                        acc.Add (Block.tombstone block)
                        loop acc (idx+1) tail blocks
                    elif currLen < length then // deleted block is longer, delete current one and keep remainder
                        acc.Add (Block.tombstone block)
                        let ptr = { ptr with Offset = ptr.Offset + currLen }
                        loop acc (idx+1) ((ptr, length-currLen)::tail) blocks
                    else // deleted block is shorter, we need to split current block and tombstone left side
                        let (left, Some right) = Block.split length block
                        acc.Add (Block.tombstone left)
                        acc.Add right
                        loop acc (idx+1) tail blocks
                elif block.Ptr.Offset < ptr.Offset && block.Ptr.Offset + currLen > ptr.Offset then // the deleted block starts inside of a current one
                    let splitPoint = ptr.Offset - block.Ptr.Offset
                    let (left, Some right) = Block.split splitPoint block
                    acc.Add left
                    if length > right.Length then // remainder is longer than right, we need to subtract it and keep around
                        let remainer = length - right.Length
                        acc.Add (Block.tombstone right)
                        let pos = { ptr with Offset = right.Ptr.Offset + right.Length }
                        loop acc (idx+1) ((pos, remainer)::tail) blocks
                    else
                        let (del, right) = Block.split length right
                        acc.Add (Block.tombstone del)
                        right |> Option.iter acc.Add
                        loop acc (idx+1) tail blocks
                else // position ID is correct but offset doesn't fit, we need to move on
                    acc.Add block
                    loop acc (idx+1) slices blocks
            else
                acc.Add block
                loop acc (idx+1) slices blocks
    // begin at the head block: its position never matches a slice, so it is simply copied over
    loop (ResizeArray()) 0 slices blocks
/// Builds the `Crdt` definition of the RGA for the replica identified by `replicaId`.
let private crdt (replicaId: ReplicaId) : Crdt<Rga<'a>, 'a[], Command<'a>, Operation<'a>> =
    { new Crdt<_,_,_,_> with
        // Initial state: a single zero-length tombstone block at position (0,"").
        member _.Default =
            { Sequencer = (0,replicaId)
              Blocks = Array.singleton { Ptr = { Position = (0,""); Offset = 0 }; Data = Tombstone 0 } }

        // The visible content is the concatenation of all `Content` blocks.
        member _.Query rga =
            rga.Blocks
            |> Array.choose (fun block ->
                match block.Data with
                | Content data -> Some data
                | _ -> None)
            |> Array.concat

        // Translates an index-based command into a pointer-based operation.
        member _.Prepare(rga, cmd) =
            match cmd with
            | RemoveAt(idx, count) ->
                Removed (sliceBlocks idx count rga.Blocks)
            | Insert(idx, slice) ->
                let (index, offset) = findByIndex idx rga.Blocks
                let anchor = rga.Blocks.[index].Ptr
                let at = nextSeqNr rga.Sequencer
                Inserted({ anchor with Offset = anchor.Offset + offset }, at, slice)

        // Applies an operation to the state.
        member _.Effect(rga, e) =
            match e.Data with
            | Removed(slices) ->
                { rga with Blocks = filterBlocks slices rga.Blocks }
            | Inserted(after, at, slice) ->
                let (index, split) = findByPositionOffset after rga.Blocks
                let insertAt = shift (index+1) at rga.Blocks
                let inserted = { Ptr = { Position = at; Offset = 0 }; Data = Content slice }
                // split the block the insert refers to; the new block goes
                // at `insertAt`, followed by the right half (if there is one)
                let (left, right) = Block.split split rga.Blocks.[index]
                let withInserted =
                    rga.Blocks
                    |> Array.replace index left
                    |> Array.insert insertAt inserted
                let blocks =
                    match right with
                    | Some rightHalf -> Array.insert (insertAt+1) rightHalf withInserted
                    | None -> withInserted
                // keep the local sequence number at least as high as the one observed
                let (seqNr, owner) = rga.Sequencer
                { Sequencer = (max (fst at) seqNr, owner); Blocks = blocks }
    }
/// Shorthand for a replication endpoint specialised to the RGA state,
/// command and operation types.
type Endpoint<'a> = Endpoint<Rga<'a>, Command<'a>, Operation<'a>>

/// Used to create replication endpoint handling operation-based RGA protocol.
let props db replicaId ctx =
    let definition = crdt replicaId
    replicator definition db replicaId ctx

/// Asks the `ref` endpoint to insert `slice` at the given `index`.
let insertRange (index: int) (slice: 'a[]) (ref: Endpoint<'a>) : Async<'a[]> =
    let command = Insert(index, slice)
    ref <? Command command

/// Asks the `ref` endpoint to remove `count` elements starting at `index`.
let removeRange (index: int) (count: int) (ref: Endpoint<'a>) : Async<'a[]> =
    let command = RemoveAt(index, count)
    ref <? Command command

/// Retrieve the current state of the RGA maintained by the given `ref` endpoint.
let query (ref: Endpoint<'a>) : Async<'a[]> =
    ref <? Query
[<RequireQualifiedAccess>]
module LSeq =

    /// Identifier of a single element: a byte sequence plus the id of the replica
    /// that generated it. Ordered lexically by `Sequence`, then by sequence
    /// length, then by `Id`. Equality is defined as "compares equal".
    [<Struct;CustomComparison;CustomEquality>]
    type VPtr =
        { Sequence: byte[]; Id: ReplicaId }
        override this.ToString() =
            String.Join('.', this.Sequence) + ":" + string this.Id
        /// Returns a negative number, zero or a positive number when `this` is
        /// respectively lower than, equal to or greater than `other`.
        member this.CompareTo(other: VPtr) =
            // apply lexical comparison of sequence elements
            let len = min this.Sequence.Length other.Sequence.Length
            let mutable i = 0
            let mutable cmp = 0
            while cmp = 0 && i < len do
                cmp <- this.Sequence.[i].CompareTo other.Sequence.[i]
                i <- i + 1
            if cmp = 0 then
                // one of the sequences is a prefix of the other one: compare their
                // lengths (they may be the same), then compare replica ids
                cmp <- this.Sequence.Length - other.Sequence.Length
                // NOTE(review): if ReplicaId is a string, this CompareTo is
                // culture-sensitive - confirm all replicas order ids identically.
                if cmp = 0 then this.Id.CompareTo other.Id else cmp
            else cmp
        // CustomEquality requires Equals/GetHashCode to be supplied explicitly;
        // without them equality would not agree with CompareTo.
        override this.Equals(other: obj) =
            match other with
            | :? VPtr as vptr -> this.CompareTo vptr = 0
            | _ -> false
        override this.GetHashCode() =
            // Only the bytes are hashed: pointers that compare equal always have
            // identical sequences, so equal values always produce equal hashes.
            let mutable h = 17
            for b in this.Sequence do
                h <- h * 31 + int b
            h
        interface IComparable<VPtr> with
            member this.CompareTo other = this.CompareTo other
        interface IComparable with
            member this.CompareTo other =
                match other with
                | :? VPtr as vptr -> this.CompareTo vptr
                | null -> 1 // .NET convention: any value is greater than null
                | _ -> invalidArg "other" "VPtr can only be compared with another VPtr."
        interface IEquatable<VPtr> with
            member this.Equals other = this.CompareTo other = 0

    /// A stored element together with the pointer that orders it.
    type Vertex<'a> = (VPtr * 'a)

    /// LSeq state: vertices kept sorted by their pointers.
    type LSeq<'a> = Vertex<'a>[]

    type Command<'a> =
        | Insert of index:int * value:'a
        | RemoveAt of index:int

    type Operation<'a> =
        | Inserted of at:VPtr * value:'a
        | Removed of at:VPtr

    /// Binary search for index of `vptr` in an ordered sequence, looking for a place to insert
    /// an element. If `vptr` is the lowest element, 0 will be returned. If it's the highest
    /// one: lseq.Length will be returned. When an element equal to `vptr` is already present,
    /// the returned index points just AFTER it.
    let private binarySearch (vptr: VPtr) (lseq: LSeq<_>) =
        let mutable i = 0
        let mutable j = lseq.Length
        while i < j do
            let half = (i + j) / 2
            // typed CompareTo call avoids boxing the struct on every step
            if vptr.CompareTo(fst lseq.[half]) >= 0 then i <- half + 1
            else j <- half
        i

    /// Returns the index of the vertex identified by `vptr`, or None if the
    /// sequence does not contain it.
    let private tryIndexOf (vptr: VPtr) (lseq: LSeq<_>) =
        // binarySearch yields the slot right after an equal element (if any)
        let candidate = binarySearch vptr lseq - 1
        if candidate >= 0 && (fst lseq.[candidate]).CompareTo vptr = 0
        then Some candidate
        else None

    /// Generates a byte sequence that - ordered lexically - would fit between `lo` and `hi`.
    /// Positions past the end of `lo` are treated as 0, positions past the end of `hi` as 255.
    let private generateSeq (lo: byte[]) (hi: byte[]) =
        let rec loop (acc: ResizeArray<byte>) i =
            // widen to int: in byte arithmetic 255uy + 1uy wraps around to 0uy,
            // which would yield a sequence lower than `lo`
            let lower = if i >= lo.Length then 0 else int lo.[i]
            let upper = if i >= hi.Length then 255 else int hi.[i]
            if lower + 1 < upper then
                // there is a free value between both bounds at this depth
                acc.Add (byte (lower + 1))
                acc.ToArray()
            else
                // no room at this depth: copy the lower bound and go deeper
                acc.Add (byte lower)
                loop acc (i + 1)
        loop (ResizeArray (min lo.Length hi.Length)) 0

    /// Builds the `Crdt` definition of the LSeq for the replica identified by `replicaId`.
    let private crdt (replicaId: ReplicaId) : Crdt<LSeq<'a>, 'a[], Command<'a>, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default = [||]
            member _.Query lseq = lseq |> Array.map snd
            member _.Prepare(lseq, cmd) =
                match cmd with
                | Insert(i, value) ->
                    // bounds are the sequences of the neighbours of the inserted slot;
                    // an empty bound stands for "no neighbour on that side"
                    let left = if Array.isEmpty lseq || i = 0 then [||] else (fst lseq.[i-1]).Sequence
                    let right = if i = lseq.Length then [||] else (fst lseq.[i]).Sequence
                    let ptr = { Sequence = generateSeq left right; Id = replicaId }
                    Inserted(ptr, value)
                | RemoveAt(i) -> Removed(fst lseq.[i])
            member _.Effect(lseq, e) =
                match e.Data with
                | Inserted(ptr, value) ->
                    let idx = binarySearch ptr lseq
                    Array.insert idx (ptr, value) lseq
                | Removed(ptr) ->
                    // Remove the vertex that actually carries `ptr`. binarySearch
                    // alone returns the slot after it, which would delete the
                    // successor. A pointer that is no longer present (e.g. already
                    // removed) leaves the state unchanged.
                    match tryIndexOf ptr lseq with
                    | Some idx -> Array.removeAt idx lseq
                    | None -> lseq
        }

    /// Shorthand for a replication endpoint specialised to the LSeq state,
    /// command and operation types.
    type Endpoint<'a> = Endpoint<LSeq<'a>, Command<'a>, Operation<'a>>

    /// Used to create replication endpoint handling operation-based LSeq protocol.
    let props db replicaId ctx = replicator (crdt replicaId) db replicaId ctx

    /// Inserts an `item` at given index. To insert at head use 0 index,
    /// to push back to a tail of sequence insert at array length.
    let insert (index: int) (item: 'a) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (Insert(index, item))

    /// Removes item stored at a provided `index`.
    let removeAt (index: int) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (RemoveAt index)

    /// Retrieve an array of elements maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a[]> = ref <? Query
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I came from your great blog post. Thanks for sharing the code too.
I have a question about the `unseen` function.
Shouldn't `e.OriginSeqNr` be `e.LocalSeqNr`, because `Observed` keeps the maximum of locally generated sequence numbers for each replica?
I am afraid the code leads to unseen events being treated as seen when `e.OriginSeqNr <= ver <= e.LocalSeqNr`.
I am new to F# and CRDTs as well, so there might be some misunderstanding...