Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Last active December 26, 2023 12:21
Show Gist options
  • Save Horusiath/86f60b5a247a6f5be5242950dd3733b5 to your computer and use it in GitHub Desktop.
Save Horusiath/86f60b5a247a6f5be5242950dd3733b5 to your computer and use it in GitHub Desktop.
A simple Reliable Causal Broadcast implementation using F# and Akka.NET
module Program =

    /// In-memory `Db` implementation used by the demo below. Nothing is persisted: the latest
    /// snapshot and all saved events are kept in mutable fields of this object.
    type InMemoryDb(replica: ReplicaId) =
        /// Most recently saved snapshot (boxed), or `null` when nothing was saved yet.
        let mutable snapshot : obj = null
        /// Boxed events keyed by the sequence number assigned by the replica owning this store.
        let mutable events : Map<uint64,obj> = Map.empty
        interface Db with
            member _.SaveSnapshot state = async { snapshot <- box state }
            member _.LoadSnapshot<'s>() = async {
                // a `null` snapshot or a snapshot of a different type both yield `None`
                match snapshot with
                | :? 's as state -> return Some state
                | _ -> return None
            }
            member _.LoadEvents<'e>(from) = asyncSeq {
                for (seqNr, e) in Map.toSeq events do
                    if seqNr >= from then
                        let casted : Event<'e> = downcast e
                        yield casted
            }
            member _.SaveEvents evts = async {
                for event in evts do
                    // `Event` has no `SeqNr` field; `LoadEvents` is queried with local sequence
                    // numbers, so `LocalSeqNr` is the key to store under.
                    events <- Map.add event.LocalSeqNr (box event) events
            }

    /// Demo scenario: spawns two ORSet replicas "A" and "B", connects them to each other,
    /// applies a few updates on both sides and finally compares their queried states.
    let main () =
        let sys = System.create "sys" <| Configuration.parse "akka.loglevel = DEBUG"
        let a = spawn sys "A" <| props (ORSet.props (InMemoryDb "A") "A")
        let b = spawn sys "B" <| props (ORSet.props (InMemoryDb "B") "B")
        async {
            a <! Connect("B", b)
            b <! Connect("A", a)

            let! state = ORSet.add 1L a
            printfn "State on node A (first): %A" state

            do! Async.Sleep 1000

            let! state = ORSet.add 2L a
            printfn "State on node A (second): %A" state

            // give the replicas time to exchange events before reading from B
            do! Async.Sleep 5000

            let! state = ORSet.query b
            printfn "State on node B (after sync): %A" state

            let! state = ORSet.remove 2L b
            printfn "State on node B (after update): %A" state

            do! Async.Sleep 5000

            let! state1 = ORSet.query a
            let! state2 = ORSet.query b
            assert (state1 = state2)
            printfn "SUCCESS"
        } |> Async.RunSynchronously
        0
/// Reliable causal broadcast implementation
module DemoFs.RCB
open Akka.Actor
open Akkling
open System
open FSharp.Control
/// Identifier of a replica; a plain string alias.
type ReplicaId = String
/// Outcome of comparing two version vectors (produced by `Version.compare`).
type Ord =
    | Lt = -1 // lower
    | Eq = 0 // equal
    | Gt = 1 // greater
    | Cc = 2 // concurrent
[<RequireQualifiedAccess>]
module Helpers =

    /// Insert-or-update for `Map`: when `k` is not bound yet it gets bound to `v`, otherwise
    /// the value currently stored under `k` is replaced with the result of applying `fn` to it.
    let upsert k v fn map =
        let next =
            map
            |> Map.tryFind k
            |> Option.fold (fun _ existing -> fn existing) v
        Map.add k next map
/// Version vector: a counter per replica. Replicas missing from the map are read as 0 by `Version.compare`.
type VTime = Map<ReplicaId, int64>
[<RequireQualifiedAccess>]
module Version =

    /// Version vector with no entries.
    let zero: VTime = Map.empty

    /// Increments the counter of replica `r` by one; a missing entry starts at 1.
    let inc r (vv: VTime): VTime =
        match Map.tryFind r vv with
        | Some ticks -> Map.add r (1L + ticks) vv
        | None -> Map.add r 1L vv

    /// Overwrites the counter of replica `r` with `ts`.
    let set r ts (vv: VTime): VTime = vv |> Map.add r ts

    /// Point-wise maximum of two version vectors.
    let merge (vv1: VTime) (vv2: VTime) =
        (vv1, vv2)
        ||> Map.fold (fun acc replica theirs ->
            match Map.tryFind replica acc with
            | Some ours -> Map.add replica (max theirs ours) acc
            | None -> Map.add replica theirs acc)

    /// Compares `a` against `b` entry by entry (a missing entry counts as 0):
    /// `Eq` when all entries match, `Gt`/`Lt` when `a` is ahead/behind on at least one entry
    /// and never the other way round, `Cc` (concurrent) when each side is ahead somewhere.
    let compare (a: VTime) (b: VTime): Ord =
        let tick replica (vv: VTime) = Map.tryFind replica vv |> Option.defaultValue 0L
        let keysOf (vv: VTime) = vv |> Map.fold (fun keys k _ -> Set.add k keys) Set.empty
        let step verdict replica =
            let va = tick replica a
            let vb = tick replica b
            if va = vb then verdict
            elif va > vb then
                match verdict with
                | Ord.Eq -> Ord.Gt
                | Ord.Lt -> Ord.Cc
                | _ -> verdict
            else
                match verdict with
                | Ord.Eq -> Ord.Lt
                | Ord.Gt -> Ord.Cc
                | _ -> verdict
        Set.union (keysOf a) (keysOf b) |> Seq.fold step Ord.Eq
/// A container for user-defined events of type 'e. It contains metadata necessary
/// to partially order and distribute events in peer-to-peer fashion.
type Event<'e> =
    { /// The replica on which this event was originally created.
      Origin: ReplicaId
      /// The sequence number given by the origin replica at the moment of event creation.
      /// This allows us to keep track of replication progress with remote replicas even
      /// when we didn't receive their events directly, but via an intermediate replica.
      OriginSeqNr: uint64
      /// The sequence number given by the local replica. For events created by current replica
      /// it's the same as `OriginSeqNr`. For replicated events it's usually higher.
      LocalSeqNr: uint64
      /// Vector clock which describes happened-before relationships between events from
      /// different replicas, making it possible to establish a partial order among them.
      Version: VTime
      /// User-defined event data.
      Data: 'e }
    /// Renders the event as `(origin, originSeqNr, localSeqNr, version, data)`.
    override this.ToString() = sprintf "(%s, %i, %i, %A, %O)" this.Origin this.OriginSeqNr this.LocalSeqNr this.Version this.Data
/// Typed actor reference of a replica, accepting `Protocol` messages.
type Endpoint<'s,'c,'e> = IActorRef<Protocol<'s,'c,'e>>

/// Messages understood by the replicator actor.
and Protocol<'s,'c,'e> =
    /// Attaches another replica to continuously synchronize with current one.
    | Connect of replicaId:ReplicaId * Endpoint<'s,'c,'e>
    /// Request to read up to `maxCount` events from a given replica starting from `seqNr`. Additionally a `filter`
    /// is provided to deduplicate possible events on the sender side (it will be then used second time on receiver side).
    /// This message is expected to be replied with `Replicated`, which contains all events satisfying seqNr/filter criteria.
    | Replicate of seqNr:uint64 * maxCount:int * filter:VTime * replyTo:Endpoint<'s,'c,'e>
    /// Message a replicator schedules to itself for a given remote replica; when it arrives,
    /// the `Replicate` request to that replica is sent again.
    | ReplicateTimeout of ReplicaId
    /// Response to `Replicate` - must always be sent. Empty content notifies about end of event stream. `toSeqNr` informs
    /// up to which sequence number this message advanced.
    | Replicated of from:ReplicaId * toSeqNr:uint64 * events:Event<'e>[]
    /// Request for a state. It should be replied with state being application of `Crdt.Query` over `ReplicationState.Crdt`.
    | Query
    /// Persists an event into current replica. Replied with updated, materialized state after success.
    | Command of 'c
    /// Message a replicator sends to itself once recovery is done. It carries the state rebuilt from the latest
    /// persisted snapshot (if there was any) and the events stored after it.
    | Loaded of ReplicationState<'s>
    /// Periodic trigger to persist current state snapshot (only performed if state has changed since last snapshot, tracked by IsDirty flag).
    | Snapshot

/// State kept by a replicator actor for a single replica.
and ReplicationState<'s> =
    { /// Unique identifier of a given replica/node.
      Id: ReplicaId
      /// Checks if replication state has been modified after being persisted.
      IsDirty: bool
      /// Counter used to assign unique sequence number for the events to be stored locally.
      SeqNr: uint64
      /// Version vector describing the last observed event.
      Version: VTime
      /// Sequence numbers of remote replicas. When synchronizing (via `Replicate` message) with remote replicas,
      /// we start doing so from the last known sequence numbers we received.
      Observed: Map<ReplicaId, uint64>
      /// CRDT object that is replicated.
      Crdt: 's }
[<RequireQualifiedAccess>]
module ReplicationState =

    /// Creates a clean (not dirty) replication state for replica `id`, with zeroed sequence number,
    /// empty version vector, no observed remote sequence numbers and `state` as the CRDT value.
    let inline create (id: ReplicaId) state = { Id = id; IsDirty = false; SeqNr = 0UL; Version = Map.empty; Observed = Map.empty; Crdt = state }

    /// Checks if event `e`, received from replica `nodeId`, has NOT been observed by the replica described
    /// by `state`. Unseen events are those whose position in the sender's log is beyond the highest position
    /// already observed from that sender AND whose version vector was not observed yet (meaning it is either
    /// greater than or concurrent to the current node version).
    ///
    /// `state.Observed` stores the sender-local sequence numbers (`LocalSeqNr` as assigned by `nodeId`), so
    /// that is the field compared here. Comparing `OriginSeqNr` instead would wrongly discard events which
    /// `nodeId` relayed from a third replica: their origin sequence number may be lower than the position
    /// already observed in the log of `nodeId`.
    let unseen nodeId (state: ReplicationState<'s>) (e: Event<'e>) =
        match Map.tryFind nodeId state.Observed with
        | Some ver when e.LocalSeqNr <= ver -> false
        | _ -> (Version.compare e.Version state.Version) > Ord.Eq
/// Storage abstraction used by the replicator for snapshots and events.
[<Interface>]
type Db =
    /// Stores a snapshot of the given state.
    abstract SaveSnapshot: 's -> Async<unit>
    /// Loads the stored snapshot as `'s`; `None` when there is nothing to return.
    abstract LoadSnapshot: unit -> Async<'s option>
    /// Streams stored events beginning at `startSeqNr`.
    abstract LoadEvents: startSeqNr:uint64 -> AsyncSeq<Event<'e>>
    /// Stores the given events.
    abstract SaveEvents: events:Event<'e> seq -> Async<unit>
/// Reads `events` until either the stream ends or `count` matching events were collected, then sends
/// the collected batch to `target` as a single `Replicated` message issued in the name of `nodeId`.
/// An event is collected only when its version is greater than or concurrent to `filter`, which lets
/// the sender skip events the `target` has already seen. The `toSeqNr` of the reply is the highest
/// `LocalSeqNr` among all events read (collected or skipped), or 0 when none were read.
let replay (nodeId: ReplicaId) (filter: VTime) (target: Endpoint<'s,'c,'e>) (events: AsyncSeq<Event<'e>>) (count:int) = async {
    let batch = ResizeArray<Event<'e>>()
    use cursor = events.GetEnumerator()
    // pulls events one by one, threading the highest sequence number read so far
    let rec pump (highest: uint64) = async {
        if batch.Count >= count then return highest
        else
            match! cursor.MoveNext() with
            | Some e ->
                if Version.compare e.Version filter > Ord.Eq then batch.Add e
                return! pump (Math.Max(highest, e.LocalSeqNr))
            | None -> return highest
    }
    let! lastSeqNr = pump 0UL
    target <! Replicated(nodeId, lastSeqNr, batch.ToArray())
}
/// Delay after which a scheduled `ReplicateTimeout` message is delivered.
let recoverTimeout = TimeSpan.FromSeconds 5.
/// What a replicator keeps about each remote replica it synchronizes with.
type ReplicationStatus<'s,'c,'e> =
    { /// Access point for the remote replica.
      Endpoint: Endpoint<'s,'c,'e>
      /// Cancellation token for pending `ReplicateTimeout`.
      Timeout: ICancelable }
/// Operations a data type has to provide in order to be replicated by `replicator`.
[<Interface>]
type Crdt<'crdt,'state,'cmd,'event> =
    /// Get a default (zero) value of the CRDT.
    abstract Default: 'crdt
    /// Given a CRDT state return an actual value that user has interest in. Eg. ORSet still has to carry
    /// metadata timestamps, however from user perspective materialized value of ORSet is just ordinary Set<'a>.
    abstract Query: 'crdt -> 'state
    /// Equivalent of command handler in eventsourcing analogy.
    abstract Prepare: state:'crdt * command:'cmd -> 'event
    /// Equivalent of event handler in eventsourcing analogy.
    abstract Effect: state:'crdt * event:Event<'event> -> 'crdt
/// Actor behavior of a single replica of `crdt`, identified by `id` and persisting through `db`.
///
/// The actor starts in the `recovering` behavior: a background task loads the latest snapshot, re-applies the
/// events stored after it and sends the result back as `Loaded`; every other message is stashed meanwhile.
/// Afterwards it switches to `active`, where it serves `Query`/`Command`, answers `Replicate` requests of
/// other replicas and keeps pulling events from every replica attached with `Connect`.
let replicator (crdt: Crdt<'crdt,'state,'cmd,'event>) (db: Db) (id: ReplicaId) (ctx: Actor<Protocol<_,_,_>>) =
    /// Cancel last pending `ReplicateTimeout` task, and schedule it again.
    /// Throws when `nodeId` is not present in `progresses`.
    let refreshTimeouts nodeId progresses (ctx: Actor<_>) =
        let p = Map.find nodeId progresses
        p.Timeout.Cancel()
        let timeout = ctx.Schedule recoverTimeout ctx.Self (ReplicateTimeout nodeId)
        Map.add nodeId { p with Timeout = timeout } progresses

    let rec active (db: Db) (state: ReplicationState<'crdt>) (replicatingNodes: Map<ReplicaId, ReplicationStatus<'crdt,'cmd,'event>>) (ctx: Actor<_>) = actor {
        match! ctx.Receive() with
        | Query ->
            ctx.Sender() <! crdt.Query state.Crdt
            return! active db state replicatingNodes ctx

        | Replicate(from, count, filter, sender) ->
            logDebugf ctx "received recover request from %s: seqNr=%i, vt=%O" sender.Path.Name from filter
            let cursor = db.LoadEvents(from)
            // reading and replying happens in the background, outside of the actor's message loop
            replay state.Id filter sender cursor count |> Async.Start
            return! active db state replicatingNodes ctx

        | Replicated(nodeId, lastSeqNr, [||]) ->
            // if we received empty event list, this node is up to date with `nodeId`
            // just schedule timeout, so when it happens we ask to replicate again
            logDebugf ctx "%s reached end of updates" nodeId
            let prog = refreshTimeouts nodeId replicatingNodes ctx
            let observedSeqNr = Map.tryFind nodeId state.Observed |> Option.defaultValue 0UL
            if lastSeqNr > observedSeqNr then
                let nstate = { state with Observed = Map.add nodeId lastSeqNr state.Observed }
                // persist the state itself (not an `(id, state)` tuple), so that the recovery below
                // can load it back as `ReplicationState` - same shape as used by the `Snapshot` handler
                do! db.SaveSnapshot nstate
                return! active db nstate prog ctx
            else
                return! active db state prog ctx

        | Replicated(nodeId, lastSeqNr, events) ->
            let mutable nstate = state
            let mutable remoteSeqNr = Map.tryFind nodeId nstate.Observed |> Option.defaultValue 0UL
            let toSave = ResizeArray()
            // for all events not seen by the current node, rewrite them to use local sequence nr, update the state
            // and save them in the database
            for e in events |> Array.filter (ReplicationState.unseen nodeId state) do
                logDebugf ctx "replicating event %O from replica %s" e nodeId
                let seqNr = nstate.SeqNr + 1UL
                let version = Version.merge nstate.Version e.Version // update current node version vector
                remoteSeqNr <- Math.Max(remoteSeqNr, e.LocalSeqNr) // increment observed remote sequence nr
                let nevent = { e with LocalSeqNr = seqNr }
                nstate <- { nstate with
                              Crdt = crdt.Effect(nstate.Crdt, nevent)
                              SeqNr = seqNr
                              Version = version
                              Observed = Map.add nodeId remoteSeqNr nstate.Observed }
                toSave.Add nevent
            do! db.SaveEvents toSave // save all unseen events together with updated state
            //do! db.SaveSnapshot nstate // in practice snapshot should be applied on condition (ideally in the same transaction)
            let target = Map.find nodeId replicatingNodes
            target.Endpoint <! Replicate(lastSeqNr+1UL, 100, nstate.Version, ctx.Self) // continue syncing
            let prog = refreshTimeouts nodeId replicatingNodes ctx
            return! active db { nstate with IsDirty = true } prog ctx

        | ReplicateTimeout nodeId ->
            // if we didn't receive `Replicated` in time or the last one was empty, upon timeout just retry the request
            logDebugf ctx "%s didn't replied to read request in time" nodeId
            let seqNr = Map.tryFind nodeId state.Observed |> Option.defaultValue 0UL
            let p = Map.find nodeId replicatingNodes
            p.Endpoint <! Replicate(seqNr+1UL, 100, state.Version, ctx.Self)
            // go through `refreshTimeouts` so the currently tracked timer is cancelled before a new one is
            // scheduled: the tracked timer is not necessarily the one that produced this message (it may have
            // been refreshed while this message was waiting in the mailbox) and would otherwise keep firing
            let prog = refreshTimeouts nodeId replicatingNodes ctx
            return! active db state prog ctx

        | Command(cmd) ->
            let sender = ctx.Sender()
            let seqNr = state.SeqNr + 1UL
            let version = Version.inc state.Id state.Version
            let data = crdt.Prepare(state.Crdt, cmd) // handle the command, produce event
            let event = { Origin = state.Id; OriginSeqNr = seqNr; LocalSeqNr = seqNr; Version = version; Data = data }
            let ncrdt = crdt.Effect(state.Crdt, event) // update the state with produced event
            let nstate = { state with Version = version; SeqNr = seqNr; Crdt = ncrdt }
            // store the new event; the updated state is only marked dirty and persisted by a later snapshot
            do! db.SaveEvents [event]
            logDebugf ctx "stored event %O in a database" event
            sender <! crdt.Query ncrdt // send updated materialized CRDT state back to the sender
            return! active db { nstate with IsDirty = true } replicatingNodes ctx

        | Connect(nodeId, endpoint) ->
            // connecting a replica that is already attached replaces its entry below - cancel the timer of
            // the old entry first, otherwise it would stay scheduled without anything tracking it
            Map.tryFind nodeId replicatingNodes |> Option.iter (fun known -> known.Timeout.Cancel())
            // connect with the remote replica, and start synchronizing with it
            let seqNr = Map.tryFind nodeId state.Observed |> Option.defaultValue 0UL
            endpoint <! Replicate(seqNr+1UL, 100, state.Version, ctx.Self)
            logDebugf ctx "connected with replica %s. Sending read request starting from %i" nodeId (seqNr+1UL)
            let timeout = ctx.Schedule recoverTimeout ctx.Self (ReplicateTimeout nodeId)
            return! active db state (Map.add nodeId { Endpoint = endpoint; Timeout = timeout } replicatingNodes) ctx

        | Snapshot when state.IsDirty ->
            logDebugf ctx "Snapshot triggered"
            let nstate = { state with IsDirty = false }
            do! db.SaveSnapshot nstate
            return! active db nstate replicatingNodes ctx

        // `Snapshot` on a clean state and `Loaded` outside of recovery end up here
        | _ -> return Unhandled
    }

    let rec recovering (db: Db) (ctx: Actor<_>) = actor {
        match! ctx.Receive() with
        | Loaded state ->
            logDebugf ctx "Recovery phase done with state: %O" state
            ctx.UnstashAll()
            // from now on periodically trigger snapshotting
            let interval = TimeSpan.FromSeconds 5.
            ctx.ScheduleRepeatedly interval interval ctx.Self Snapshot |> ignore
            return! active db state Map.empty ctx
        | _ ->
            // stash all other operations until recovery is complete
            ctx.Stash()
            return! recovering db ctx
    }

    async {
        // load state from DB snapshot or create a new empty one
        let! snapshot = db.LoadSnapshot()
        let mutable state = snapshot |> Option.defaultValue (ReplicationState.create id crdt.Default)
        // apply all events that happened since snapshot has been made
        for event in db.LoadEvents (state.SeqNr + 1UL) do
            state <- { state with
                         Crdt = crdt.Effect(state.Crdt, event)
                         SeqNr = event.LocalSeqNr
                         Version = Version.merge event.Version state.Version
                         Observed = Map.add event.Origin event.OriginSeqNr state.Observed }
        ctx.Self <! Loaded state
    } |> Async.Start
    recovering db ctx
[<RequireQualifiedAccess>]
module Counter =

    /// Operation-based counter: the state is the running total, commands and events are deltas.
    let private crdt =
        { new Crdt<int64,int64,int64,int64> with
            member _.Default = 0L
            member _.Query total = total
            member _.Prepare(_, delta) = delta
            member _.Effect(total, e) = total + e.Data }

    /// Used to create replication endpoint handling operation-based Counter protocol.
    let props db replica ctx = (db, replica, ctx) |||> replicator crdt

    /// Changes the counter maintained by the given `ref` endpoint by the delta `by` (can be negative)
    /// and asynchronously returns the reply of that endpoint.
    let inc (by: int64) (ref: Endpoint<int64,int64,int64>) : Async<int64> =
        ref <? Command by

    /// Retrieve the current state of the counter maintained by the given `ref` endpoint.
    let query (ref: Endpoint<int64,int64,int64>) : Async<int64> =
        ref <? Query
[<RequireQualifiedAccess>]
module ORSet =

    /// Internal representation: every element is paired with the version of the event that added it.
    type ORSet<'a> when 'a: comparison = Set<'a * VTime>

    /// Requests accepted from users.
    type Command<'a> =
        | Add of 'a
        | Remove of 'a

    /// Events that get persisted and replicated.
    type Operation<'a> =
        | Added of 'a
        | Removed of Set<VTime>

    type Endpoint<'a> when 'a: comparison = Endpoint<ORSet<'a>, Command<'a>, Operation<'a>>

    let private crdt : Crdt<ORSet<'a>, Set<'a>, Command<'a>, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default = Set.empty
            // users only see the elements, without their versions
            member _.Query(orset) = Set.map fst orset
            member _.Prepare(orset, cmd) =
                match cmd with
                | Add item -> Added item
                | Remove item ->
                    // a removal carries the versions of all entries of `item` known at this moment
                    (Set.empty, orset)
                    ||> Set.fold (fun stamps (candidate, ts) ->
                        if candidate = item then Set.add ts stamps else stamps)
                    |> Removed
            member _.Effect(orset, e) =
                match e.Data with
                | Added item -> orset |> Set.add (item, e.Version)
                | Removed versions ->
                    // drop exactly the entries whose version was listed by the removal
                    orset |> Set.filter (fun (_, ts) -> versions |> Set.contains ts |> not) }

    /// Used to create replication endpoint handling operation-based ORSet protocol.
    let props db replicaId ctx = (db, replicaId, ctx) |||> replicator crdt

    /// Add new `item` into an ORSet maintained by the given `ref` endpoint. In case of add/remove conflicts add wins.
    let add (item: 'a) (ref: Endpoint<'a>) : Async<Set<'a>> =
        ref <? Command (Add item)

    /// Remove an `item` from the ORSet maintained by the given `ref` endpoint. In case of add/remove conflicts add wins.
    let remove (item: 'a) (ref: Endpoint<'a>) : Async<Set<'a>> =
        ref <? Command (Remove item)

    /// Retrieve the current state of the ORSet maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<Set<'a>> =
        ref <? Query
[<RequireQualifiedAccess>]
module LWWRegister =

    /// Last-write-wins register: a value together with the timestamp of the write that produced it.
    [<Struct>]
    type LWWRegister<'a> =
        { /// Wall-clock time of the winning write, paired with the replica that issued it.
          Timestamp: struct(DateTime * ReplicaId)
          /// Current value; `ValueNone` when nothing is stored.
          Value: 'a voption }

    /// Persisted event: the time of the write taken while preparing it and the written value.
    type Operation<'a> = DateTime * 'a voption

    type Endpoint<'a> = Endpoint<LWWRegister<'a>, 'a voption, Operation<'a>>

    let private crdt : Crdt<LWWRegister<'a>, 'a voption, 'a voption, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default = { Timestamp = struct(DateTime.MinValue, ""); Value = ValueNone }
            member _.Query crdt = crdt.Value
            member _.Prepare(_, value) = (DateTime.UtcNow, value)
            member _.Effect(existing, e) =
                let (at, value) = e.Data
                // the origin replica id is part of the timestamp, so writes with equal time still compare
                let timestamp = struct(at, e.Origin)
                if existing.Timestamp < timestamp then
                    { existing with Timestamp = timestamp; Value = value }
                else existing }

    /// Used to create replication endpoint handling operation-based LWWRegister protocol.
    let props db replica ctx = replicator crdt db replica ctx

    /// Writes `value` into the register maintained by the given `ref` endpoint and asynchronously
    /// returns the reply of that endpoint.
    let update (value: 'a voption) (ref: Endpoint<'a>) : Async<'a voption> = ref <? Command value

    /// Misspelled original name of `update`, kept so existing callers keep compiling.
    let updte (value: 'a voption) (ref: Endpoint<'a>) : Async<'a voption> = update value ref

    /// Retrieve the current value of the register maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a voption> = ref <? Query
[<RequireQualifiedAccess>]
module MVRegister =

    /// Multi-value register: every kept value is paired with the version of the event that wrote it.
    type MVRegister<'a> = (VTime * 'a voption) list

    type Endpoint<'a> = Endpoint<MVRegister<'a>, 'a voption, 'a voption>

    let private crdt : Crdt<MVRegister<'a>, 'a list, 'a voption, 'a voption> =
        { new Crdt<_,_,_,_> with
            member _.Default = []
            // users see only the values which are actually set
            member _.Query crdt =
                crdt
                |> List.choose (function (_, ValueSome v) -> Some v | _ -> None)
            member _.Prepare(_, value) = value
            member _.Effect(existing, e) =
                // keep only the entries concurrent to the incoming write and put that write in front of them
                let concurrent =
                    existing
                    |> List.filter (fun (vt, _) -> Version.compare vt e.Version = Ord.Cc)
                (e.Version, e.Data)::concurrent }

    /// Used to create replication endpoint handling operation-based MVRegister protocol.
    let props db replica ctx = replicator crdt db replica ctx

    /// Writes `value` into the register maintained by the given `ref` endpoint. The replicator replies with
    /// the queried state, which for this CRDT is an `'a list` of all currently kept values - hence the return
    /// type (the former `'a voption` annotation did not match the type of the reply).
    let update (value: 'a voption) (ref: Endpoint<'a>) : Async<'a list> = ref <? Command value

    /// Misspelled original name of `update`, kept so existing callers keep compiling.
    let updte (value: 'a voption) (ref: Endpoint<'a>) : Async<'a list> = update value ref

    /// Retrieve all values currently kept by the register maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a list> = ref <? Query
[<RequireQualifiedAccess>]
module Rga =

    /// Virtual index - while physical index of an element in RGA changes as new elements are appended or removed,
    /// a virtual index always stays the same. It allows tracking the item position over time.
    type VPtr = (int * ReplicaId)

    /// Element of the sequence: its virtual index and content (`None` once removed).
    type Vertex<'a> = (VPtr * 'a option)

    type Rga<'a> =
        { /// Highest sequence number observed so far, paired with the id of the local replica.
          Sequencer: VPtr
          /// All vertices, tombstones included; the first one is the always-tombstoned head.
          Vertices: Vertex<'a>[] }

    type Command<'a> =
        | Insert of index:int * value:'a
        | RemoveAt of index:int

    type Operation<'a> =
        | Inserted of after:VPtr * at:VPtr * value:'a
        | Removed of at:VPtr

    /// Checks if given vertex has been tombstoned.
    let inline isTombstone (_, data) = Option.isNone data

    /// Maps user-given index (which ignores tombstones) into physical index inside of `vertices` array.
    /// The result is the position right after `index` live vertices were passed - the vertex found there
    /// may itself be a tombstone.
    let private indexWithTombstones index vertices =
        let rec loop offset remaining (vertices: Vertex<'a>[]) =
            if remaining = 0 then offset
            elif isTombstone vertices.[offset] then loop (offset+1) remaining vertices // skip over tombstones
            else loop (offset+1) (remaining-1) vertices
        loop 1 index vertices // skip head as it's always tombstoned (it serves as reference point)

    /// Maps user-given VPtr into physical index inside of `vertices` array.
    let private indexOfVPtr ptr vertices =
        let rec loop offset ptr (vertices: Vertex<'a>[]) =
            if ptr = fst vertices.[offset] then offset
            else loop (offset+1) ptr vertices
        loop 0 ptr vertices

    /// Recursively checks if the next vertex on the right of a given `offset`
    /// has position higher than `ptr` and if so, shifts offset to the right.
    ///
    /// By design, when doing concurrent inserts, we skip over elements on the right
    /// if their position is higher than position of inserted element.
    let rec private shift offset ptr (vertices: Vertex<'a>[]) =
        if offset >= vertices.Length then offset // append at the end
        else
            let (next, _) = vertices.[offset]
            if next < ptr then offset
            else shift (offset+1) ptr vertices // move insertion point to the right

    /// Increments given sequence number.
    let inline private nextSeqNr ((i, id): VPtr) : VPtr = (i+1, id)

    /// Builds the `Inserted` event for putting `value` at user index `i`.
    let private createInserted i value rga =
        let index = indexWithTombstones i rga.Vertices // start from 1 to skip header vertex
        let prev = fst rga.Vertices.[index-1] // get VPtr of previous element or RGA's head
        let at = nextSeqNr rga.Sequencer
        Inserted(prev, at, value)

    /// Builds the `Removed` event for the live element at user index `i`.
    let private createRemoved i rga =
        let vertices : Vertex<_>[] = rga.Vertices
        // `indexWithTombstones` stops right after `i` live vertices were passed, which can be on a
        // tombstone (eg. index 0 when the first element was already removed). Move on to the first live
        // vertex, otherwise the removal would target an already removed element and change nothing.
        let rec skipTombstones offset =
            if isTombstone vertices.[offset] then skipTombstones (offset+1) else offset
        let index = indexWithTombstones i vertices |> skipTombstones
        let at = fst vertices.[index] // get VPtr of the element to remove
        Removed at

    let private applyInserted (predecessor: VPtr) (ptr: VPtr) value rga =
        // find index where predecessor vertex can be found
        let predecessorIdx = indexOfVPtr predecessor rga.Vertices
        // adjust index where new vertex is to be inserted
        let insertIdx = shift (predecessorIdx+1) ptr rga.Vertices
        // update RGA to store the highest observed sequence number
        let (seqNr, replicaId) = rga.Sequencer
        let nextSeqNr = (max (fst ptr) seqNr, replicaId)
        let newVertices = Array.insert insertIdx (ptr, Some value) rga.Vertices
        { Sequencer = nextSeqNr; Vertices = newVertices }

    let private applyRemoved ptr rga =
        // find index where removed vertex can be found and clear its content to tombstone it
        let index = indexOfVPtr ptr rga.Vertices
        let (at, _) = rga.Vertices.[index]
        { rga with Vertices = Array.replace index (at, None) rga.Vertices }

    let private crdt (replicaId: ReplicaId) : Crdt<Rga<'a>, 'a[], Command<'a>, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default = { Sequencer = (0,replicaId); Vertices = [| ((0,""), None) |] }
            // users see the contents of live vertices only
            member _.Query rga = rga.Vertices |> Array.choose snd
            member _.Prepare(rga, cmd) =
                match cmd with
                | Insert(i, value) -> createInserted i value rga
                | RemoveAt(i) -> createRemoved i rga
            member _.Effect(rga, e) =
                match e.Data with
                | Inserted(predecessor, ptr, value) -> applyInserted predecessor ptr value rga
                | Removed(ptr) -> applyRemoved ptr rga
        }

    type Endpoint<'a> = Endpoint<Rga<'a>, Command<'a>, Operation<'a>>

    /// Used to create replication endpoint handling operation-based RGA protocol.
    let props db replicaId ctx = replicator (crdt replicaId) db replicaId ctx

    /// Inserts an `item` at given index. To insert at head use 0 index,
    /// to push back to a tail of sequence insert at array length.
    let insert (index: int) (item: 'a) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (Insert(index, item))

    /// Removes item stored at a provided `index`.
    let removeAt (index: int) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (RemoveAt index)

    /// Retrieve an array of elements maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a[]> = ref <? Query
/// Block-wise RGA. It exposes operations for adding/removing multiple elements at once.
[<RequireQualifiedAccess>]
module BWRga =

    /// Identifier of an inserted range: a sequence number paired with a replica id.
    type Position = (int * ReplicaId)

    /// Points at a single element: the `Position` of the range it was inserted with
    /// and its offset within that range.
    [<Struct>]
    type PositionOffset =
        { Position: Position; Offset: int }
        override this.ToString () = sprintf "(%i%s:%i)" (fst this.Position) (snd this.Position) this.Offset

    /// Payload of a block: either live elements or a tombstone remembering how many elements it stood for.
    [<Struct>]
    type Content<'a> =
        | Content of content:'a[]
        | Tombstone of skipped:int
        /// Cuts the payload in two at `offset`: the left part gets `offset` elements, the right part the rest.
        member this.Slice(offset: int) =
            match this with
            | Content data ->
                let left = Array.take offset data
                let right = Array.skip offset data
                (Content left, Content right)
            | Tombstone length ->
                (Tombstone offset, Tombstone (length - offset))

    /// A run of elements sharing the same `Position`, starting at `Ptr.Offset`.
    type Block<'a> =
        { Ptr: PositionOffset
          //TODO: in this approach Block contains both user data and CRDT metadata, it's possible
          // however to split these apart and all slicing manipulations can be performed on blocks
          // alone. In this case Query method could return an user data right away with no extra
          // modifications, while the user-content could be stored in optimized structure such as Rope,
          // instead of deeply cloned arrays used here.
          Data: Content<'a> }
        /// Number of elements covered by this block (for a tombstone: number of removed elements).
        member this.Length =
            match this.Data with
            | Content data -> data.Length
            | Tombstone skipped -> skipped
        override this.ToString() =
            sprintf "%O -> %A" this.Ptr this.Data

    [<RequireQualifiedAccess>]
    module Block =

        /// Replaces block's payload with a tombstone of the same length.
        let tombstone (block: Block<'a>) = { block with Data = Tombstone block.Length }

        let isTombstone (block: Block<'a>) = match block.Data with Tombstone _ -> true | _ -> false

        /// Splits `block` at `offset`. When `offset` equals the block's length, the block is returned
        /// untouched and there is no right part. Otherwise the right part starts at `Ptr.Offset + offset`.
        let split (offset) (block: Block<'a>) =
            if offset = block.Length then (block, None)
            else
                let ptr = block.Ptr
                let (a, b) = block.Data.Slice offset
                let left = { block with Data = a }
                let right = { Ptr = { ptr with Offset = ptr.Offset + offset }; Data = b }
                (left, Some right)

    type Rga<'a> =
        { /// Highest sequence number observed so far, paired with the id of the local replica.
          Sequencer: Position
          /// All blocks, tombstones included; the first one is the head block created by `Default`.
          Blocks: Block<'a>[] }

    type Command<'a> =
        | Insert of index:int * 'a[]
        | RemoveAt of index:int * count:int

    type Operation<'a> =
        | Inserted of after:PositionOffset * at:Position * value:'a[]
        | Removed of slices:(PositionOffset*int) list

    /// Given user-aware index, return an index of a block and position inside of that block,
    /// which matches provided index.
    let private findByIndex idx blocks =
        // `consumed` counts the live elements of the blocks passed so far
        let rec loop currentIndex consumed (idx: int) (blocks: Block<'a>[]) =
            if idx = consumed then (currentIndex, 0)
            else
                let block = blocks.[currentIndex]
                if Block.isTombstone block then
                    // tombstones don't count towards user-visible indexes
                    loop (currentIndex+1) consumed idx blocks
                else
                    let remaining = idx - consumed
                    if remaining <= block.Length then
                        // we found the position somewhere in the block
                        (currentIndex, remaining)
                    else
                        // move to the next block with i shortened by current block length
                        loop (currentIndex + 1) (consumed + block.Length) idx blocks
        loop 0 0 idx blocks

    /// Finds the block containing the element pointed by `ptr`; returns the index of that block
    /// and the position of the element relative to the beginning of the block.
    /// NOTE(review): there is no bounds check, a `ptr` not present in `blocks` runs past the end of the array.
    let private findByPositionOffset ptr blocks =
        let rec loop idx ptr (blocks: Block<'a>[]) =
            let block = blocks.[idx]
            if block.Ptr.Position = ptr.Position then
                if block.Ptr.Offset + block.Length >= ptr.Offset then (idx, ptr.Offset-block.Ptr.Offset)
                else loop (idx+1) ptr blocks
            else loop (idx+1) ptr blocks
        loop 0 ptr blocks

    /// Recursively check if the next block on the right of a given `offset`
    /// has position higher than `pos` and if so, shift offset to the right.
    let rec private shift offset pos (blocks: Block<'a>[]) =
        if offset >= blocks.Length then offset // append at the tail
        else
            let next = blocks.[offset].Ptr.Position
            if next < pos then offset
            else shift (offset+1) pos blocks // move insertion point to the right

    /// Increments given sequence number.
    let inline private nextSeqNr ((i, id): Position) : Position = (i+1, id)

    /// Translates a user-level range (`count` elements beginning at index `start`) into a list of
    /// (pointer, length) slices, one per touched block, in block order.
    /// NOTE(review): unlike `findByIndex`, the loop below does not skip tombstoned blocks, so their
    /// length is counted against `remaining` - confirm this is intended.
    let private sliceBlocks start count blocks =
        let rec loop acc idx offset remaining (blocks: Block<'a>[]) =
            let block = blocks.[idx]
            let ptr = block.Ptr
            let ptr = { ptr with Offset = ptr.Offset + offset }
            let len = block.Length - offset
            if len > remaining then (ptr, remaining)::acc
            elif len = 0 then loop acc (idx+1) 0 remaining blocks // skip over empty blocks
            else loop ((ptr, len)::acc) (idx+1) 0 (remaining-len) blocks
        let (first, offset) = findByIndex start blocks
        loop [] first offset count blocks |> List.rev

    /// Applies removed `slices` onto `blocks`: walks both in order, tombstones the covered parts
    /// (splitting blocks where a slice begins or ends inside of one) and copies everything else over.
    /// NOTE(review): the walk starts at block 1 while the result array is built from scratch, so the
    /// block at index 0 (the head) does not seem to be copied into the result - verify against callers.
    let private filterBlocks slices blocks =
        let rec loop (acc: ResizeArray<Block<'a>>) idx slices (blocks: Block<'a>[]) =
            match slices with
            | [] ->
                for i=idx to blocks.Length-1 do
                    acc.Add blocks.[i] // copy over remaining blocks
                acc.ToArray()
            | (ptr, length)::tail ->
                let block = blocks.[idx]
                if block.Ptr.Position = ptr.Position then // we found valid block
                    let currLen = block.Length
                    if block.Ptr.Offset = ptr.Offset then // the beginning of deleted block was found
                        if currLen = length then // deleted block exactly matches bounds
                            acc.Add (Block.tombstone block)
                            loop acc (idx+1) tail blocks
                        elif currLen < length then // deleted block is longer, delete current one and keep remainder
                            acc.Add (Block.tombstone block)
                            let ptr = { ptr with Offset = ptr.Offset + currLen }
                            loop acc (idx+1) ((ptr, length-currLen)::tail) blocks
                        else // deleted block is shorter, we need to split current block and tombstone left side
                            let (left, Some right) = Block.split length block
                            acc.Add (Block.tombstone left)
                            acc.Add right
                            loop acc (idx+1) tail blocks
                    elif block.Ptr.Offset < ptr.Offset && block.Ptr.Offset + currLen > ptr.Offset then // the deleted block starts inside of a current one
                        let splitPoint = ptr.Offset - block.Ptr.Offset
                        let (left, Some right) = Block.split splitPoint block
                        acc.Add left
                        if length > right.Length then // remainder is longer than right, we need to subtract it and keep around
                            let remainer = length - right.Length
                            acc.Add (Block.tombstone right)
                            let pos = { ptr with Offset = right.Ptr.Offset + right.Length }
                            loop acc (idx+1) ((pos, remainer)::tail) blocks
                        else
                            // the slice ends inside of (or exactly at the end of) the right part
                            let (del, right) = Block.split length right
                            acc.Add (Block.tombstone del)
                            right |> Option.iter acc.Add
                            loop acc (idx+1) tail blocks
                    else // position ID is correct but offset doesn't fit, we need to move on
                        acc.Add block
                        loop acc (idx+1) slices blocks
                else
                    acc.Add block
                    loop acc (idx+1) slices blocks
        loop (ResizeArray()) 1 slices blocks

    let private crdt (replicaId: ReplicaId) : Crdt<Rga<'a>, 'a[], Command<'a>, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default =
                // the head is an empty tombstone, serving as the reference point for inserts at index 0
                let head = { Ptr = { Position = (0,""); Offset = 0 }; Data = Tombstone 0 }
                { Sequencer = (0,replicaId); Blocks = [| head |] }
            // users see concatenated contents of live blocks only
            member _.Query rga = rga.Blocks |> Array.collect (fun block -> match block.Data with Content data -> data | _ -> [||])
            member _.Prepare(rga, cmd) =
                match cmd with
                | Insert(idx, slice) ->
                    let (index, offset) = findByIndex idx rga.Blocks
                    let ptr = rga.Blocks.[index].Ptr
                    let at = nextSeqNr rga.Sequencer
                    Inserted({ ptr with Offset = ptr.Offset+offset }, at, slice)
                | RemoveAt(idx, count) ->
                    let slices = sliceBlocks idx count rga.Blocks
                    Removed slices
            member _.Effect(rga, e) =
                match e.Data with
                | Inserted(after, at, slice) ->
                    let (index, split) = findByPositionOffset after rga.Blocks
                    let indexAdjusted = shift (index+1) at rga.Blocks
                    let block = rga.Blocks.[index]
                    let newBlock = { Ptr = { Position = at; Offset = 0}; Data = Content slice }
                    // the predecessor block is cut at the insertion point; `right` is `None` when that point is its end
                    let (left, right) = Block.split split block
                    // update RGA to store the highest observed sequence number
                    let (seqNr, replicaId) = rga.Sequencer
                    let nextSeqNr = (max (fst at) seqNr, replicaId)
                    let blocks =
                        rga.Blocks
                        |> Array.replace index left
                        |> Array.insert indexAdjusted newBlock
                    match right with
                    | Some right ->
                        let blocks = blocks |> Array.insert (indexAdjusted+1) right
                        { Sequencer = nextSeqNr; Blocks = blocks }
                    | None ->
                        { Sequencer = nextSeqNr; Blocks = blocks }
                | Removed(slices) ->
                    let blocks = filterBlocks slices rga.Blocks
                    { rga with Blocks = blocks }
        }

    type Endpoint<'a> = Endpoint<Rga<'a>, Command<'a>, Operation<'a>>

    /// Used to create replication endpoint handling operation-based RGA protocol.
    let props db replicaId ctx = replicator (crdt replicaId) db replicaId ctx

    /// Inserts all elements of `slice` beginning at a given `index`.
    let insertRange (index: int) (slice: 'a[]) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (Insert(index, slice))

    /// Removes `count` elements beginning at a given `index`.
    let removeRange (index: int) (count: int) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (RemoveAt(index, count))

    /// Retrieve the current state of the RGA maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a[]> = ref <? Query
[<RequireQualifiedAccess>]
module LSeq =

    /// Virtual position of an element: a byte sequence ordered lexically, with the id of the replica
    /// which generated it used as a tie breaker.
    [<Struct;CustomComparison;CustomEquality>]
    type VPtr =
        { Sequence: byte[]; Id: ReplicaId }
        override this.ToString() =
            String.Join('.', this.Sequence) + ":" + string this.Id
        member this.CompareTo(other: VPtr) =
            // apply lexical comparison of sequence elements
            let len = min this.Sequence.Length other.Sequence.Length
            let mutable i = 0
            let mutable cmp = 0
            while cmp = 0 && i < len do
                cmp <- this.Sequence.[i].CompareTo other.Sequence.[i]
                i <- i + 1
            if cmp = 0 then
                // one of the sequences is subsequence of another one,
                // compare their lengths (cause maybe they're the same)
                // then compare replica ids
                cmp <- this.Sequence.Length - other.Sequence.Length
                if cmp = 0 then this.Id.CompareTo other.Id else cmp
            else cmp
        // Keep `Object.Equals`/`GetHashCode` in line with `CompareTo`: without these overrides the untyped
        // equality of this struct would fall back to the runtime default instead of comparing the bytes.
        override this.Equals(other: obj) =
            match other with
            | :? VPtr as vptr -> this.CompareTo vptr = 0
            | _ -> false
        override this.GetHashCode() =
            // only the bytes take part in the hash; pointers equal by `CompareTo` always share them
            let mutable hash = 17
            for b in this.Sequence do
                hash <- (hash * 31) + int b
            hash
        interface IComparable<VPtr> with member this.CompareTo other = this.CompareTo other
        interface IComparable with
            member this.CompareTo other =
                match other with
                | :? VPtr as vptr -> this.CompareTo(vptr)
                | null -> 1 // by .NET convention every instance is greater than null
                | _ -> invalidArg "other" "VPtr can only be compared to another VPtr"
        interface IEquatable<VPtr> with member this.Equals other = this.CompareTo other = 0

    type Vertex<'a> = (VPtr * 'a)

    /// Elements kept ordered by their `VPtr`.
    type LSeq<'a> = Vertex<'a>[]

    type Command<'a> =
        | Insert of index:int * value:'a
        | RemoveAt of index:int

    type Operation<'a> =
        | Inserted of at:VPtr * value:'a
        | Removed of at:VPtr

    /// Binary search for index of `vptr` in an ordered sequence, looking for a place to insert
    /// an element. If `vptr` is the lowest element, 0 will be returned. If it's the highest
    /// one: lseq.Length will be returned. When an element equal to `vptr` is already present,
    /// the returned index points right AFTER it.
    let private binarySearch vptr (lseq: LSeq<_>) =
        let mutable i = 0
        let mutable j = lseq.Length
        while i < j do
            let half = (i + j) / 2
            if vptr >= fst lseq.[half] then i <- half + 1
            else j <- half
        i

    /// Generates a byte sequence that - ordered lexically - would fit between `lo` and `hi`.
    /// Bytes missing in `lo` are read as 0, bytes missing in `hi` as 255.
    let private generateSeq (lo: byte[]) (hi: byte[]) =
        let rec loop (acc: ResizeArray<byte>) i (lo: byte[]) (hi: byte[]) =
            let lower = if i >= lo.Length then 0uy else lo.[i]
            let upper = if i >= hi.Length then 255uy else hi.[i]
            // compare as ints: `lower + 1uy` would wrap around to 0 for lower = 255
            if int lower + 1 < int upper then
                acc.Add (lower + 1uy)
                acc.ToArray()
            else
                // no free byte at this depth, copy the lower bound and look one level deeper
                acc.Add lower
                loop acc (i+1) lo hi
        loop (ResizeArray (min lo.Length hi.Length)) 0 lo hi

    let private crdt (replicaId: ReplicaId) : Crdt<LSeq<'a>, 'a[], Command<'a>, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default = [||]
            member _.Query lseq = lseq |> Array.map snd
            member _.Prepare(lseq, cmd) =
                match cmd with
                | Insert(i, value) ->
                    // neighbours of the insertion point; an empty sequence stands for "no neighbour"
                    let left = if Array.isEmpty lseq || i = 0 then [||] else (fst lseq.[i-1]).Sequence
                    let right = if i = lseq.Length then [||] else (fst lseq.[i]).Sequence
                    let ptr = { Sequence = generateSeq left right; Id = replicaId }
                    Inserted(ptr, value)
                | RemoveAt(i) -> Removed(fst lseq.[i])
            member _.Effect(lseq, e) =
                match e.Data with
                | Inserted(ptr, value) ->
                    let idx = binarySearch ptr lseq
                    Array.insert idx (ptr, value) lseq
                | Removed(ptr) ->
                    // `binarySearch` returns the index right after an element equal to `ptr`,
                    // so the element to remove (if still present) sits one position earlier
                    let idx = binarySearch ptr lseq - 1
                    if idx >= 0 && (fst lseq.[idx]).CompareTo ptr = 0 then Array.removeAt idx lseq
                    else lseq // nothing stored under `ptr` (eg. removed already): leave the sequence as is
        }

    type Endpoint<'a> = Endpoint<LSeq<'a>, Command<'a>, Operation<'a>>

    /// Used to create replication endpoint handling operation-based LSeq protocol.
    let props db replicaId ctx = replicator (crdt replicaId) db replicaId ctx

    /// Inserts an `item` at given index. To insert at head use 0 index,
    /// to push back to a tail of sequence insert at array length.
    let insert (index: int) (item: 'a) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (Insert(index, item))

    /// Removes item stored at a provided `index`.
    let removeAt (index: int) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (RemoveAt index)

    /// Retrieve an array of elements maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a[]> = ref <? Query
@hitochan777
Copy link

I came from your great blog post. Thanks for sharing the code too.
I have a question about the `unseen` function.
Shouldn't `e.OriginSeqNr` be `e.LocalSeqNr`, since `Observed` keeps the maximum of the locally generated sequence numbers for each replica?
I am afraid the code leads to unseen events being treated as seen when e.OriginSeqNr <= ver <= e.LocalSeqNr.

I am new to F# and CRDT as well so there might be some misunderstanding...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment