Last active
May 17, 2024 15:12
-
-
Save Horusiath/1091a4c3629acf995d50e506db35d5f2 to your computer and use it in GitHub Desktop.
Yata move algorithm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Demos | |
open System | |
/// Identifier of a replica, represented as a plain string.
type ReplicaId = string
[<RequireQualifiedAccess>] | |
module Array = | |
/// Renders array `a` as text: every element is converted with `string`,
/// the results are joined with ", " and wrapped in square brackets,
/// e.g. `[1, 2, 3]`. An empty array renders as `[]`.
let string (a: 't[]) =
    // `string` below is still the core conversion function: this binding is not recursive.
    let items = a |> Array.map string |> String.concat ", "
    "[" + items + "]"
/// Returns a new array, one element longer than `array`, with `item` placed
/// at position `idx`. The input array is left untouched.
let insert idx item array =
    let size = Array.length array
    let result = Array.zeroCreate (size + 1)
    // elements in front of `idx` keep their position
    Array.blit array 0 result 0 idx
    // elements from `idx` onwards move one slot to the right
    Array.blit array idx result (idx + 1) (size - idx)
    result.[idx] <- item
    result
/// Returns a new array, one element shorter than `array`, without the
/// element at position `idx`. The input array is left untouched.
let removeAt idx array =
    let remaining = Array.length array - 1
    let result = Array.zeroCreate remaining
    // elements in front of `idx` keep their position
    Array.blit array 0 result 0 idx
    // elements behind `idx` move one slot to the left
    Array.blit array (idx + 1) result idx (remaining - idx)
    result
/// Returns a copy of `array` in which the element at position `idx`
/// has been swapped for `item`. The input array is left untouched.
let replace idx item array =
    let updated = Array.copy array
    Array.set updated idx item
    updated
/// Binary search for an insertion index in an ordered array.
///
/// `array` must be partitioned by `predicate`: every element for which the
/// predicate is false has to come before every element for which it is true.
/// The function returns the index of the first element satisfying `predicate`.
///
/// For an array sorted in ascending order, use eg. `fun a -> a >= toInsert`.
/// If 'toInsert' is the lowest element (predicate holds everywhere), 0 is returned.
/// If it's the highest one (predicate holds nowhere), array.Length is returned.
let binarySearch (predicate: 'a -> bool) (array: 'a[]) =
    let mutable lo = 0
    let mutable hi = array.Length
    while lo < hi do
        // written as lo + (hi - lo) / 2 so the midpoint cannot overflow for very large arrays
        let mid = lo + (hi - lo) / 2
        if predicate array.[mid] then hi <- mid
        else lo <- mid + 1
    lo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Demos.Yata | |
open System.Collections.Generic | |
open Demos | |
/// Block identifier: a replica id paired with a sequence number.
type ID = ReplicaId * uint64

/// Payload carried by a block.
type Content<'t> =
    /// A live element holding a user value.
    | Value of value: 't
    /// Marker left in place of a deleted element.
    | Tombstone
    /// A move marker: `from` is the id of the block being moved,
    /// `priority` is used to pick a winner among competing moves.
    | Moved of from: ID * priority: int

/// A single entry of the YATA array.
type Block<'t> =
    { /// Unique block identifier.
      Id: ID
      /// Id of the left neighbour recorded when the block was created, if any.
      OriginLeft: ID option
      /// Id of the right neighbour recorded when the block was created, if any.
      OriginRight: ID option
      /// Id of the move block that currently relocates this block, if any.
      MovedTo: ID option
      /// Payload of the block.
      Value: Content<'t> }
module Block =

    /// True when the block's payload is a `Tombstone`.
    let isDeleted b =
        match b.Value with
        | Tombstone -> true
        | Value _
        | Moved _ -> false

    /// The user value carried by the block, or `None` for tombstones and move markers.
    let value b =
        match b.Value with
        | Value v -> Some v
        | Tombstone
        | Moved _ -> None
/// A minimal, unoptimized implementation of the YATA CRDT, written mostly
/// for educational purposes. The whole state is a flat array of blocks.
///
/// For paper, see: https://www.researchgate.net/publication/310212186_Near_Real-Time_Peer-to-Peer_Shared_Editing_on_Extensible_Data_Types
type Yata<'t> = Block<'t> array
[<RequireQualifiedAccess>] | |
module Yata = | |
/// The zero/default instance of a YATA array: an array without any blocks.
let zero: Yata<'t> = Array.empty
/// Looks up the position of the block whose id equals `id` inside `array`.
/// Returns `None` when no block carries that id.
let private indexOf (array: Yata<'t>) (id: ID) =
    Array.tryFindIndex (fun (candidate: Block<'t>) -> candidate.Id = id) array
/// Lazily enumerates the visible value blocks of `array` in an order that respects moves.
/// Every item is a pair of (index of the block inside `array`, the block itself).
///
/// - A move marker yields the block it points at (together with that block's own index),
///   but only while that block still holds a value and names this marker in `MovedTo`.
/// - A value block that has not been moved yields itself.
/// - Moved-away value blocks and tombstones yield nothing at their own position.
///
/// `Option.get` fails if a move marker refers to an id that is not present in `array`.
let private blocks (array: Yata<'t>) = seq {
    for i in 0 .. array.Length - 1 do
        let block = array.[i]
        match block.Value, block.MovedTo with
        | Moved(from, _), _ ->
            let j = indexOf array from |> Option.get
            let target = array.[j]
            // a marker only counts while it is the one currently owning the target
            if target.MovedTo = Some block.Id then
                match target.Value with
                | Value _ -> yield (j, target)
                | Tombstone
                | Moved _ -> ()
        | Value _, None -> yield (i, block)
        | Value _, Some _ -> ()
        | Tombstone, _ -> ()
}
/// Materializes the user-visible content of a YATA `array`: the values of all
/// blocks produced by `blocks`, in that order, with every piece of metadata stripped.
let value (array: Yata<'t>) : 't[] =
    [| for (_, block) in blocks array do
         match Block.value block with
         | Some v -> yield v
         | None -> () |]
/// Renders a YATA array of characters as a string made of its visible values.
let str (array: Yata<char>) : string =
    System.String(value array)
/// Maps a user-defined `index` (counted over the visible elements produced by `blocks`)
/// onto `Some (index inside the array, block)`. Returns `None` when the visible
/// sequence is exhausted exactly at `index`.
///
/// Note: `Seq.skip` raises when the sequence holds fewer than `index` elements,
/// so an `index` larger than the number of visible elements fails instead of returning `None`.
let private findPosition (index: int) (array: Yata<'t>) =
    Seq.tryHead (Seq.skip index (blocks array))
/// Highest sequence number among the blocks created by `replicaId`,
/// or 0 if `array` holds no block of that replica.
let private lastSeqNr replicaId (array: Yata<'t>) =
    array
    |> Array.fold (fun highest block ->
        let (owner, seqNr) = block.Id
        if owner = replicaId then max highest seqNr else highest) 0UL
/// Bounds-checked indexer: the block at position `i`, or `None`
/// when `i` is negative or not smaller than the length of `array`.
let private getBlock (i: int) (array: Yata<'t>) : Option<Block<'t>> =
    Array.tryItem i array
/// Determines the index at which `block` has to be inserted into `array`, taking
/// into account blocks that other replicas may have inserted concurrently into
/// the same gap.
///
/// `left` / `right` are the array indices of the new block's origins, `i` is the
/// position currently inspected and `dst` the best insert position found so far.
/// While `scanning` is false, `dst` simply follows `i`; once it is true, `dst` stays frozen.
///
/// The walk stops (returning `dst`) when it reaches `right` or the end of the array,
/// when the inspected block's left origin lies before `left`, or when the inspected
/// block has the very same origins and the new block's replica id is not greater than
/// the inspected block's replica id.
let private findInsertIndex (array: Yata<'t>) block scanning left right dst i =
    let author = fst block.Id
    // index of an origin inside `array`, or `fallback` when the origin is absent/unknown
    let originIndex fallback origin =
        origin |> Option.bind (indexOf array) |> Option.defaultValue fallback
    let rec scan scanning dst i =
        let dst = if scanning then dst else i
        if i = right || i = Array.length array then dst
        else
            let other = array.[i]
            let otherLeft = originIndex (-1) other.OriginLeft
            let otherRight = originIndex array.Length other.OriginRight
            let yields = author <= fst other.Id
            if otherLeft < left || (otherLeft = left && otherRight = right && yields) then dst
            else
                // blocks sharing our left origin decide whether `dst` keeps following the cursor
                let scanning = if otherLeft = left then yields else scanning
                scan scanning dst (i + 1)
    scan scanning dst i
/// Applies the effect of a move marker: if `block` is a `Moved` block, the block it
/// targets gets its `MovedTo` pointed at `block`, unless a competing move wins.
/// Blocks that are not move markers leave the array unchanged.
///
/// A competing move (the one currently referenced by the target's `MovedTo`) is
/// overridden only when `block` has a higher priority, or an equal priority and a greater id.
///
/// NOTE: `array` is updated in place and returned. `Option.get` fails when the
/// target or the competing move block cannot be found in `array`.
let private integrateMoved block (array: Yata<'t>) =
    match block.Value with
    | Moved(targetId, priority) ->
        let at = indexOf array targetId |> Option.get
        let target = array.[at]
        let wins =
            match target.MovedTo with
            | None -> true // nobody moved the target yet
            | Some rivalId ->
                let rival = array.[indexOf array rivalId |> Option.get]
                match rival.Value with
                | Moved(_, rivalPriority) ->
                    priority > rivalPriority || (priority = rivalPriority && block.Id > rival.Id)
                | Value _
                | Tombstone -> false // this shouldn't happen
        if wins then
            array.[at] <- { target with MovedTo = Some block.Id }
    | Value _
    | Tombstone -> ()
    array
/// Places `block` into a copy of the YATA `array`, at the position derived from its
/// left and right origins. Shared by the local operations and by `merge`.
///
/// Fails with "operation out of order" unless the block's sequence number directly
/// follows the last sequence number known for its replica: origins are only
/// meaningful when no operation of that replica has been skipped.
///
/// An origin that is `None` or not found in `array` is treated as the position just
/// before the first element (left) or just after the last element (right).
let private integrate (block: Block<'t>) (array: Yata<'t>) : Yata<'t> =
    let (replica, seqNr) = block.Id
    let last = lastSeqNr replica array
    if last = seqNr - 1UL then
        let position fallback origin =
            origin |> Option.bind (indexOf array) |> Option.defaultValue fallback
        let left = position (-1) block.OriginLeft
        let right = position (Array.length array) block.OriginRight
        let at = findInsertIndex array block false left right (left + 1) (left + 1)
        // `Array.insert` returns a fresh array, so callers are free to mutate the result
        Array.insert at block array
    else
        failwithf "operation out of order: tried to insert after (%s,%i): %O" replica last block
/// Inserts `value` at the user-visible `index` of the YATA `array`, acting as `replicaId`.
///
/// The new block gets the next sequence number of `replicaId`. Its right origin is the
/// block currently found at `index` (none when inserting at the end), its left origin
/// is the block stored directly in front of that position in the array.
let insert (replicaId: ReplicaId) (index: int) (value: 't) (array: Yata<'t>) : Yata<'t> =
    let (i, right) =
        match findPosition index array with
        | Some (at, successor) -> (at, Some successor.Id)
        | None -> (array.Length, None)
    let left = getBlock (i - 1) array |> Option.map (fun b -> b.Id)
    let block =
        { Id = (replicaId, 1UL + lastSeqNr replicaId array)
          OriginLeft = left
          OriginRight = right
          MovedTo = None
          Value = Value value }
    integrate block array
/// Deletes the element at the user-visible `index`. The block is not removed:
/// its content is replaced with a tombstone, so that it can still serve as a
/// reference point (origin) for potential concurrent operations.
///
/// `Option.get` fails when no visible element exists at `index`.
let delete (index: int) (blocks: Yata<'t>) : Yata<'t> =
    let (at, victim) = Option.get (findPosition index blocks)
    blocks |> Array.replace at { victim with Value = Tombstone }
/// Merges two YATA arrays together: the result is `a` extended with everything from `b`.
///
/// 1. Blocks of `a` that are tombstoned in `b` become tombstones.
/// 2. Blocks of `b` unknown to `a` are integrated one by one. A block is integrated only
///    once both of its origins are present AND it is the next block in the sequence of
///    its replica, because `integrate` rejects out-of-order sequence numbers. The array
///    order of `b` does not have to match that order, hence the repeated passes.
/// 3. Move markers among the newly integrated blocks are applied.
///
/// Fails when a full pass over the pending blocks integrates nothing, i.e. when `b`
/// refers to origins or preceding operations that neither array contains. Without that
/// check the loop would never terminate.
let merge (a: Yata<'t>) (b: Yata<'t>) : Yata<'t> =
    // IDs of the blocks that have been tombstoned in `b`; a set keeps the lookup cheap
    let tombstones =
        b
        |> Array.choose (fun block -> if Block.isDeleted block then Some block.Id else None)
        |> Set.ofArray
    let mutable result =
        a // tombstone existing elements
        |> Array.map (fun block ->
            if not (Block.isDeleted block) && Set.contains block.Id tombstones
            then { block with Value = Tombstone } // mark block as deleted
            else block)
    // IDs of blocks already existing in the result
    let mutable seen = result |> Array.map (fun block -> block.Id) |> Set.ofArray
    let pending =
        b
        // deduplicate blocks already existing in `a` (and repeated ids inside `b`)
        |> Array.filter (fun block -> not (Set.contains block.Id seen))
        |> Array.distinctBy (fun block -> block.Id)
    // an absent origin (None) counts as present
    let isPresent seen origin =
        match origin with
        | Some id -> Set.contains id seen
        | None -> true
    let mutable remaining = pending.Length
    while remaining > 0 do
        let before = remaining
        for block in pending do
            let (replicaId, seqNr) = block.Id
            // the block must not be inserted yet, its dependencies must be present
            // and it has to directly follow the last known block of its replica
            let canInsert =
                not (Set.contains block.Id seen)
                && isPresent seen block.OriginLeft
                && isPresent seen block.OriginRight
                && lastSeqNr replicaId result + 1UL = seqNr
            if canInsert then
                result <- integrate block result
                seen <- Set.add block.Id seen
                remaining <- remaining - 1
        if remaining = before then
            // no progress in a whole pass: the remaining blocks can never be integrated
            failwithf "cannot merge: %i block(s) depend on origins or preceding operations that are missing" remaining
    for block in pending do
        result <- integrateMoved block result
    result
/// A delta: a YATA array holding blocks to integrate, paired with
/// the ids of deleted blocks (the delete set).
type Delta<'t> = Yata<'t> * ID array
/// Computes the version of a Yata array: starting from `Version.zero`, every replica
/// that owns at least one block is mapped to the highest sequence number among its blocks.
let version (a: Yata<'t>) : VTime =
    let advance vtime (block: Block<'t>) =
        let (replicaId, seqNr) = block.Id
        let latest =
            match Map.tryFind replicaId vtime with
            | Some known -> max known seqNr
            | None -> seqNr
        Map.add replicaId latest vtime
    Array.fold advance Version.zero a
/// Computes a delta of state `a` relative to `version` (a vector produced by
/// `Yata.version` on another peer). The delta consists of:
///
/// - the blocks of `a` whose sequence number is greater than the one recorded in
///   `version` for their replica (or whose replica is unknown to `version`),
/// - the delete set: ids of ALL tombstoned blocks of `a`, not only the new ones.
///
/// Note: in practical implementation delete set can be compressed.
let delta (version: VTime) (a: Yata<'t>) : Delta<'t> =
    let isNewer (block: Block<'t>) =
        let (replicaId, seqNr) = block.Id
        match Map.tryFind replicaId version with
        | Some known -> seqNr > known
        | None -> true
    let deleteSet =
        [| for block in a do
             if Block.isDeleted block then yield block.Id |]
    (Array.filter isNewer a, deleteSet)
/// Merges a remote `delta` into the local Yata array `a`: the delta's blocks are
/// merged first, then every block listed in the delta's delete set is tombstoned.
let mergeDelta (delta: Delta<'t>) (a: Yata<'t>) : Yata<'t> =
    let (b, deleteSet) = delta
    let tombstoneIfDeleted (block: Block<'t>) =
        if Block.isDeleted block || not (Array.contains block.Id deleteSet) then block
        else { block with Value = Tombstone } // tombstone block
    merge a b |> Array.map tombstoneIfDeleted
/// Moves the element found at the user-visible index `src` to the user-visible
/// index `dst`, acting as `replicaId`.
///
/// The element itself stays where it is. A new `Moved` marker block referring to it
/// is integrated at the destination (its origins are computed the same way as in
/// `insert`) and then applied with `integrateMoved`.
///
/// If the element is already owned by another move marker, the new marker gets that
/// marker's priority + 1; otherwise its priority is 0.
///
/// `Option.get` fails when no visible element exists at `src`.
let move (replicaId: ReplicaId) (src: int) (dst: int) (array: Yata<'t>) : Yata<'t> =
    let (_, moved) = findPosition src array |> Option.get
    let (at, right) =
        match findPosition dst array with
        | Some (i, successor) -> (i, Some successor.Id)
        | None -> (array.Length, None)
    let left = getBlock (at - 1) array |> Option.map (fun b -> b.Id)
    // the move marker currently owning the source block, if it can be found
    let owner =
        moved.MovedTo
        |> Option.bind (indexOf array)
        |> Option.bind (fun idx -> getBlock idx array)
    let prio =
        match owner with
        | Some { Value = Moved(_, previous) } -> previous + 1
        | Some _
        | None -> 0
    let block =
        { Id = (replicaId, 1UL + lastSeqNr replicaId array)
          OriginLeft = left
          OriginRight = right
          MovedTo = None
          Value = Moved(moved.Id, prio) }
    integrate block array |> integrateMoved block
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Demos.Yata.Tests | |
open Expecto | |
open Demos.Yata | |
// Replica identifiers used by the scenarios below.
let [<Literal>] A = "A"
let [<Literal>] B = "B"

/// Convergence scenarios: two replicas diverge from a common state and are then
/// merged in both directions; both merge results have to be equal.
[<Tests>]
let tests = testList "Yata" [
    test "merge insert" {
        let a = Yata.zero
                |> Yata.insert A 0 'a'
                |> Yata.insert A 1 'd'
        let b = a |> Yata.insert B 1 'c'
        let a = a |> Yata.insert A 1 'b'
        Expect.equal (Yata.str a) "abd" "pre-merge (A)"
        Expect.equal (Yata.str b) "acd" "pre-merge (B)"
        let a1 = Yata.merge a b
        let a2 = Yata.merge b a
        Expect.equal (Yata.str a1) "abcd" "post-merge"
        Expect.equal a1 a2 "post-merge commutativity"
    }
    test "merge insert/delete" {
        let a = Yata.zero
                |> Yata.insert A 0 'a'
                |> Yata.insert A 1 'd'
        let b = a |> Yata.insert B 1 'c'
        let a = a |> Yata.delete 1
        Expect.equal (Yata.str a) "a" "pre-merge (A)"
        Expect.equal (Yata.str b) "acd" "pre-merge (B)"
        let a1 = Yata.merge a b
        let a2 = Yata.merge b a
        Expect.equal (Yata.str a1) "ac" "post-merge"
        Expect.equal a1 a2 "post-merge commutativity"
    }
    // plain `test` instead of `ftest`: a focused test makes Expecto skip every other test in the run
    test "merge moved" {
        let a = Yata.zero
                |> Yata.insert A 0 'a'
                |> Yata.insert A 1 'b'
                |> Yata.insert A 2 'c'
        let b = a |> Yata.move B 1 3
        let a = a |> Yata.move A 1 0
        Expect.equal (Yata.str a) "bac" "pre-merge (A)"
        Expect.equal (Yata.str b) "acb" "pre-merge (B)"
        let a1 = Yata.merge a b
        let a2 = Yata.merge b a
        Expect.equal (Yata.str a1) "acb" "post-merge"
        Expect.equal a1 a2 "post-merge commutativity"
    }
]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment