-
-
Save medigor/7aa6e7c62bfeed9b85ad9851a74e69b3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type EventStream<'T> ()= | |
let evt=Channel.CreateUnbounded<'T>() | |
let writer = evt.Writer | |
let reader = evt.Reader | |
let mutable key = 0 | |
let mutable subscriptions = Map.empty : Map<int, IObserver<'T>> | |
let thisLock = new obj() | |
let obs = | |
{ new IObservable<'T> with | |
member this.Subscribe(obs) = | |
let key1 = | |
lock thisLock (fun () -> | |
let key1 = key | |
key <- key + 1 | |
subscriptions <- subscriptions.Add(key1, obs) | |
key1) | |
{ new IDisposable with | |
member this.Dispose() = | |
lock thisLock (fun () -> | |
subscriptions <- subscriptions.Remove(key1)) } } | |
let rec loop = startOnPool <| async{ | |
do! Async.SwitchToThreadPool() | |
let! flag=task {return! reader.WaitToReadAsync()}|> Async.AwaitTask |> startOnPool | |
if(flag) then | |
match reader.TryRead() with | |
| (true, item) -> subscriptions |> Map.iter (fun _ v -> v.OnNext item) | |
| (false,_) -> () | |
return! loop | |
else return () | |
} | |
do loop |> Async.Start | |
member x.Pub v=task{ | |
do! writer.WriteAsync v | |
} |> Async.AwaitTask |> Async.Start | |
member x.Sub (f:'T->unit)= obs.Subscribe f | |
member x.Stream = obs | |
member x.Pull() : Async<'T > = startOnPool ^ (reader.ReadAsync().AsTask()|>Async.AwaitTask) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment