Created
November 27, 2019 11:03
-
-
Save vshapenko/9d2d6a912e8d44b792f10852af493b9b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type EventStream<'T> ()= | |
let evt=Channel.CreateUnbounded<'T>() | |
let writer = evt.Writer | |
let reader = evt.Reader | |
let mutable key = 0 | |
let mutable subscriptions = Map.empty : Map<int, IObserver<'T>> | |
let thisLock = new obj() | |
let obs = | |
{ new IObservable<'T> with | |
member this.Subscribe(obs) = | |
let key1 = | |
lock thisLock (fun () -> | |
let key1 = key | |
key <- key + 1 | |
subscriptions <- subscriptions.Add(key1, obs) | |
key1) | |
{ new IDisposable with | |
member this.Dispose() = | |
lock thisLock (fun () -> | |
subscriptions <- subscriptions.Remove(key1)) } } | |
let rec loop = startOnPool <| async{ | |
do! Async.SwitchToThreadPool() | |
let! flag=task {return! reader.WaitToReadAsync()}|> Async.AwaitTask |> startOnPool | |
if(flag) then | |
match reader.TryRead() with | |
| (true, item) -> subscriptions |> Map.iter (fun _ v -> v.OnNext item) | |
| (false,_) -> () | |
return! loop | |
else return () | |
} | |
do loop |> Async.Start | |
member x.Pub v=task{ | |
do! writer.WriteAsync v | |
} |> Async.AwaitTask |> Async.Start | |
member x.Sub (f:'T->unit)= obs.Subscribe f | |
member x.Stream = obs | |
member x.Pull() : Async<'T > = startOnPool ^ (reader.ReadAsync().AsTask()|>Async.AwaitTask) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment