Skip to content

Instantly share code, notes, and snippets.

@hodzanassredin
Last active August 29, 2015 14:22
Show Gist options
  • Save hodzanassredin/80267672c3065ecb471a to your computer and use it in GitHub Desktop.
Save hodzanassredin/80267672c3065ecb471a to your computer and use it in GitHub Desktop.
transalt on crdts
module Transaction =
open System
type CommitReciever = bool -> Async<unit>
let emptyReciever : CommitReciever= fun _ -> async.Return ()
let mergeRecievers (xs: CommitReciever seq) =
fun res -> async{
for x in xs do
do! x(res)
}
type IsReader = bool
type Transaction<'s,'r> = Tran of ('s -> Async<Result<'s,'r>>)
and Result<'s,'r> = | Finished of 's * 'r * CommitReciever
| Sync of 's * Transaction<'s,'r> * CommitReciever * IsReader
let fromAsync wrkfl =
Tran(fun state -> async{
let! res = wrkfl
return Finished(state, res, emptyReciever)
})
let always x = async.Return x |> fromAsync
let withRcvr x rcvr =
Tran(fun state -> async{
return Finished(state, x, rcvr)
})
let rec never<'a> () = Tran(fun state ->
async.Return (Sync(state, never<'a> (), emptyReciever, true)))
let rec run state (Tran(f)) =
async{
let! res = f(state)
match res with
| Finished(s,r,c) -> return Some(s,r,c)
| Sync(s,t,c,isReader) -> if isReader then do! c(false)
return None
else return! run s t
}
let step (state:'s) (tr: Transaction<'s,'r>) =
match tr with
| Tran(f) -> f(state)
let getState res =
match res with
| Finished (s,_,_) -> s
| Sync (s,_,_,_) -> s
let choose xs =
let rnd = new Random()
Tran(fun state -> async{
let run = run state
let! res = xs |> Seq.map run |> Async.Parallel
let res = Array.choose (fun x->x) res
if res.Length = 0 then return Sync(state, never(), emptyReciever, true)
else let winner = rnd.Next(res.Length - 1)
let s, r, c = res.[winner]
for i in 0.. res.Length do
if i <> winner then
let _,_,c = res.[i]
do! c(false)
return Finished(s,r,c)
})
let rec bind (t:Transaction<'s,'r>) (f:'r->Transaction<'s,'r2>) =
Tran(fun (state:'s) -> async{
let! res = step state t
match res with
| Finished (s,r,c) -> let next = f(r)
let! res = step s next
match res with
| Finished (s2,r2,c2) -> return Finished(s2,r2, mergeRecievers [c;c2])
| Sync (s2,t2,c2, isReader) -> return Sync(s2,t2, mergeRecievers [c;c2],isReader)
| Sync (s,t,c,isReader) -> return Sync(s,bind t f,c,isReader)
})
let rec inline merge (xs:Transaction<'s,'r> seq) (finished: 'r[]) (finishedRcvr: CommitReciever): Transaction<'s,'r[]> =
if Seq.length xs = 0 then withRcvr finished finishedRcvr
else Tran(fun (state:'s) -> async{
let step x = step state x
let! res = xs |> Seq.map step |> Async.Parallel
let states = res |> Array.map getState
let inline folder s s2 = s +++ s2
let state = Array.fold folder state states
let finished2, finishedRcvrs2 = res |> Array.choose (fun x -> match x with
| Finished (_,r,c) ->Some(r,c)
| _ -> None)
|> Array.unzip
let synced, syncRcvrs, readers= res |> Array.choose (fun x -> match x with
| Sync (_,t,c, isReader) ->Some(t,c,isReader)
| _ -> None)
|> Array.unzip3
let finishedRcvr = Array.collect id [|[|finishedRcvr|]; finishedRcvrs2|] |> mergeRecievers
let allRcvrs = Array.collect id [|[|finishedRcvr|]; syncRcvrs|] |> mergeRecievers
let tran = merge synced (Array.append finished finished2) finishedRcvr
let IsReader = Array.TrueForAll(readers, fun x-> x)
return Sync(state, tran, allRcvrs, IsReader)
})
let wrap tran c =
Tran(fun state -> async{
let! res = step state tran
match res with
| Finished (s2,r2,c2) -> return Finished(s2,r2, mergeRecievers [c;c2])
| Sync (s2,t2,c2, isReader) -> return Sync(s2,t2, mergeRecievers [c;c2],isReader)
})
let withAck builder = async{
let! tran, c = builder()
return wrap tran c
}
module Crdt =
//https://github.com/aphyr/meangirls#lww-element-set
type GSet<'a when 'a : comparison>() =
let set = Set.empty
with member x.add v = set.Add v
member x.toSet () = set
member x.difference (set2:GSet<'a>) = Set.difference set (set2.toSet())
member x.intersect (set2:GSet<'a>) = Set.intersect set (set2.toSet())
member x.union (set2:GSet<'a>) = Set.union set (set2.toSet())
member x.isEmpty with get() = Set.isEmpty
static member Empty with get() = new GSet<_>()
static member (+++) (set1:GSet<'a>,set2:GSet<'a>) = set1.union(set2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment