Skip to content

Instantly share code, notes, and snippets.

@vshapenko
Created November 27, 2019 11:03
Show Gist options
  • Save vshapenko/9d2d6a912e8d44b792f10852af493b9b to your computer and use it in GitHub Desktop.
Save vshapenko/9d2d6a912e8d44b792f10852af493b9b to your computer and use it in GitHub Desktop.
type EventStream<'T> ()=
let evt=Channel.CreateUnbounded<'T>()
let writer = evt.Writer
let reader = evt.Reader
let mutable key = 0
let mutable subscriptions = Map.empty : Map<int, IObserver<'T>>
let thisLock = new obj()
let obs =
{ new IObservable<'T> with
member this.Subscribe(obs) =
let key1 =
lock thisLock (fun () ->
let key1 = key
key <- key + 1
subscriptions <- subscriptions.Add(key1, obs)
key1)
{ new IDisposable with
member this.Dispose() =
lock thisLock (fun () ->
subscriptions <- subscriptions.Remove(key1)) } }
let rec loop = startOnPool <| async{
do! Async.SwitchToThreadPool()
let! flag=task {return! reader.WaitToReadAsync()}|> Async.AwaitTask |> startOnPool
if(flag) then
match reader.TryRead() with
| (true, item) -> subscriptions |> Map.iter (fun _ v -> v.OnNext item)
| (false,_) -> ()
return! loop
else return ()
}
do loop |> Async.Start
member x.Pub v=task{
do! writer.WriteAsync v
} |> Async.AwaitTask |> Async.Start
member x.Sub (f:'T->unit)= obs.Subscribe f
member x.Stream = obs
member x.Pull() : Async<'T > = startOnPool ^ (reader.ReadAsync().AsTask()|>Async.AwaitTask)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment