Last active
          August 29, 2015 14:22 
        
      - 
      
- 
        Save hodzanassredin/80267672c3065ecb471a to your computer and use it in GitHub Desktop. 
    transalt on crdts
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | module Transaction = | |
| open System | |
| type CommitReciever = bool -> Async<unit> | |
| let emptyReciever : CommitReciever= fun _ -> async.Return () | |
| let mergeRecievers (xs: CommitReciever seq) = | |
| fun res -> async{ | |
| for x in xs do | |
| do! x(res) | |
| } | |
| type IsReader = bool | |
| type Transaction<'s,'r> = Tran of ('s -> Async<Result<'s,'r>>) | |
| and Result<'s,'r> = | Finished of 's * 'r * CommitReciever | |
| | Sync of 's * Transaction<'s,'r> * CommitReciever * IsReader | |
| let fromAsync wrkfl = | |
| Tran(fun state -> async{ | |
| let! res = wrkfl | |
| return Finished(state, res, emptyReciever) | |
| }) | |
| let always x = async.Return x |> fromAsync | |
| let withRcvr x rcvr = | |
| Tran(fun state -> async{ | |
| return Finished(state, x, rcvr) | |
| }) | |
| let rec never<'a> () = Tran(fun state -> | |
| async.Return (Sync(state, never<'a> (), emptyReciever, true))) | |
| let rec run state (Tran(f)) = | |
| async{ | |
| let! res = f(state) | |
| match res with | |
| | Finished(s,r,c) -> return Some(s,r,c) | |
| | Sync(s,t,c,isReader) -> if isReader then do! c(false) | |
| return None | |
| else return! run s t | |
| } | |
| let step (state:'s) (tr: Transaction<'s,'r>) = | |
| match tr with | |
| | Tran(f) -> f(state) | |
| let getState res = | |
| match res with | |
| | Finished (s,_,_) -> s | |
| | Sync (s,_,_,_) -> s | |
| let choose xs = | |
| let rnd = new Random() | |
| Tran(fun state -> async{ | |
| let run = run state | |
| let! res = xs |> Seq.map run |> Async.Parallel | |
| let res = Array.choose (fun x->x) res | |
| if res.Length = 0 then return Sync(state, never(), emptyReciever, true) | |
| else let winner = rnd.Next(res.Length - 1) | |
| let s, r, c = res.[winner] | |
| for i in 0.. res.Length do | |
| if i <> winner then | |
| let _,_,c = res.[i] | |
| do! c(false) | |
| return Finished(s,r,c) | |
| }) | |
| let rec bind (t:Transaction<'s,'r>) (f:'r->Transaction<'s,'r2>) = | |
| Tran(fun (state:'s) -> async{ | |
| let! res = step state t | |
| match res with | |
| | Finished (s,r,c) -> let next = f(r) | |
| let! res = step s next | |
| match res with | |
| | Finished (s2,r2,c2) -> return Finished(s2,r2, mergeRecievers [c;c2]) | |
| | Sync (s2,t2,c2, isReader) -> return Sync(s2,t2, mergeRecievers [c;c2],isReader) | |
| | Sync (s,t,c,isReader) -> return Sync(s,bind t f,c,isReader) | |
| }) | |
| let rec inline merge (xs:Transaction<'s,'r> seq) (finished: 'r[]) (finishedRcvr: CommitReciever): Transaction<'s,'r[]> = | |
| if Seq.length xs = 0 then withRcvr finished finishedRcvr | |
| else Tran(fun (state:'s) -> async{ | |
| let step x = step state x | |
| let! res = xs |> Seq.map step |> Async.Parallel | |
| let states = res |> Array.map getState | |
| let inline folder s s2 = s +++ s2 | |
| let state = Array.fold folder state states | |
| let finished2, finishedRcvrs2 = res |> Array.choose (fun x -> match x with | |
| | Finished (_,r,c) ->Some(r,c) | |
| | _ -> None) | |
| |> Array.unzip | |
| let synced, syncRcvrs, readers= res |> Array.choose (fun x -> match x with | |
| | Sync (_,t,c, isReader) ->Some(t,c,isReader) | |
| | _ -> None) | |
| |> Array.unzip3 | |
| let finishedRcvr = Array.collect id [|[|finishedRcvr|]; finishedRcvrs2|] |> mergeRecievers | |
| let allRcvrs = Array.collect id [|[|finishedRcvr|]; syncRcvrs|] |> mergeRecievers | |
| let tran = merge synced (Array.append finished finished2) finishedRcvr | |
| let IsReader = Array.TrueForAll(readers, fun x-> x) | |
| return Sync(state, tran, allRcvrs, IsReader) | |
| }) | |
| let wrap tran c = | |
| Tran(fun state -> async{ | |
| let! res = step state tran | |
| match res with | |
| | Finished (s2,r2,c2) -> return Finished(s2,r2, mergeRecievers [c;c2]) | |
| | Sync (s2,t2,c2, isReader) -> return Sync(s2,t2, mergeRecievers [c;c2],isReader) | |
| }) | |
| let withAck builder = async{ | |
| let! tran, c = builder() | |
| return wrap tran c | |
| } | |
/// Conflict-free replicated data types usable as transaction state.
module Crdt =
    // Reference: https://github.com/aphyr/meangirls#lww-element-set
    // (the type below is the simpler grow-only set).

    /// Immutable grow-only set. Two instances are merged with `+++`, which
    /// takes the union of their elements.
    type GSet<'a when 'a : comparison> private (items: Set<'a>) =

        /// Creates an empty set.
        new() = GSet<'a>(Set.empty)

        /// Returns the elements of this set plus `v` as a plain `Set`; this
        /// instance is not modified.
        /// NOTE(review): callers probably expect a GSet back — confirm before
        /// changing the return type.
        member x.add (v: 'a) : Set<'a> = items.Add v

        /// The elements as an ordinary F# set.
        member x.toSet () : Set<'a> = items

        /// Elements of this set that are not in `set2`.
        member x.difference (set2: GSet<'a>) : Set<'a> =
            Set.difference items (set2.toSet ())

        /// Elements present in both this set and `set2`.
        member x.intersect (set2: GSet<'a>) : Set<'a> =
            Set.intersect items (set2.toSet ())

        /// Elements present in this set or in `set2`.
        member x.union (set2: GSet<'a>) : Set<'a> =
            Set.union items (set2.toSet ())

        /// True when the set has no elements.
        member x.isEmpty with get () : bool = Set.isEmpty items

        /// Builds a set holding the distinct elements of `xs`.
        static member ofSeq (xs: seq<'a>) : GSet<'a> = GSet<'a>(Set.ofSeq xs)

        /// The empty set.
        static member Empty with get () : GSet<'a> = GSet<'a>()

        /// Merge operator: the union of both operands, returned as a GSet so
        /// that merging two states yields a state of the same type.
        static member (+++) (set1: GSet<'a>, set2: GSet<'a>) : GSet<'a> =
            GSet<'a>(set1.union set2)
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment