Last active
November 30, 2018 13:38
-
-
Save hakonrossebo/d64e54a6ed7d4def0989716c18995ad8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copy of the source file from the book
// https://github.com/rikace/fConcBook/blob/master/Chapter.06/TwitterEmotionAnalysis.cs/RxPubSub.cs
// Concurrency in .NET
// https://www.manning.com/books/concurrency-in-dotnet
using System; | |
using System.Collections.Generic; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
namespace RxPublisherSubscriber | |
{ | |
//Listing 6.6 Reactive Publisher Subscriber in C# | |
/// <summary>
/// Publisher/subscriber hub built on a reactive subject: publishers are
/// observables whose notifications are pushed into the subject, and
/// subscribers are observers attached to that same subject.
/// </summary>
/// <typeparam name="T">Type of the items flowing through the hub.</typeparam>
public class RxPubSub<T> : IDisposable
{
    private readonly ISubject<T> subject; //#A
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>(); //#B
    private readonly List<IDisposable> observables = new List<IDisposable>(); //#C
    private bool disposed;

    /// <summary>Creates a hub that relays notifications through <paramref name="subject"/>.</summary>
    /// <param name="subject">Subject used as the relay between publishers and subscribers.</param>
    /// <exception cref="ArgumentNullException"><paramref name="subject"/> is null.</exception>
    public RxPubSub(ISubject<T> subject)
    {
        if (subject == null)
        {
            throw new ArgumentNullException(nameof(subject));
        }

        this.subject = subject; //#D
    }

    /// <summary>Creates a hub backed by a new <see cref="Subject{T}"/>.</summary>
    public RxPubSub() : this(new Subject<T>()) { } //#D

    /// <summary>
    /// Attaches <paramref name="observer"/> to the hub.
    /// </summary>
    /// <param name="observer">Observer that should receive the hub's notifications.</param>
    /// <returns>
    /// A handle that, when disposed, detaches the observer from the subject,
    /// drops it from the hub's bookkeeping and signals completion to it.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        // Subscribe first so that a failing Subscribe leaves no stale entry in the lists.
        IDisposable subscription = subject.Subscribe(observer);
        if (disposed)
        {
            // The hub has been torn down; there is nothing left to track.
            return subscription;
        }

        observers.Add(observer);
        observables.Add(subscription);
        var handler = new ObserverHandler<T>(observer, observers);

        // Fully qualified so that no additional using directive is required.
        // Disposable.Create runs the callback at most once.
        return System.Reactive.Disposables.Disposable.Create(() =>
        {
            if (disposed)
            {
                // Dispose() already released the subscription and cleared the lists.
                return;
            }

            // Detach from the subject BEFORE completing the observer, otherwise the
            // observer would keep receiving items after it has been unsubscribed.
            subscription.Dispose();
            observables.Remove(subscription);
            handler.Dispose();
        }); //#E
    }

    /// <summary>
    /// Registers <paramref name="observable"/> as a publisher: its notifications are
    /// forwarded to the subject, with the subscription performed on the task pool scheduler.
    /// </summary>
    /// <param name="observable">Source of notifications to publish through the hub.</param>
    /// <returns>A handle that stops forwarding from this publisher when disposed.</returns>
    public IDisposable AddPublisher(IObservable<T> observable) =>
        observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(subject); //#F

    /// <summary>Exposes the hub as an observable sequence that hides the subject's identity.</summary>
    public IObservable<T> AsObservable() => subject.AsObservable(); //#G

    /// <summary>
    /// Completes the subject and releases every tracked subscription.
    /// Calling it more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        if (disposed)
        {
            return;
        }

        // Set the flag first: an observer may dispose its own handle from inside OnCompleted.
        disposed = true;

        // Completing the subject is what notifies the attached observers; they are
        // deliberately not completed a second time by iterating the observer list.
        subject.OnCompleted();

        foreach (IDisposable subscription in observables)
        {
            subscription.Dispose();
        }

        observables.Clear();
        observers.Clear(); //#H
    }
}
/// <summary>
/// Unsubscription handle: disposing it removes an observer from a shared
/// observer list and signals completion to that observer.
/// </summary>
/// <typeparam name="T">Type of the items the observer receives.</typeparam>
internal class ObserverHandler<T> : IDisposable //#I
{
    private readonly IObserver<T> observer;
    private readonly List<IObserver<T>> observers;
    private bool disposed;

    /// <summary>Creates a handle for <paramref name="observer"/> registered in <paramref name="observers"/>.</summary>
    /// <param name="observer">Observer to remove and complete on disposal.</param>
    /// <param name="observers">List the observer is registered in.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public ObserverHandler(IObserver<T> observer, List<IObserver<T>> observers)
    {
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        if (observers == null)
        {
            throw new ArgumentNullException(nameof(observers));
        }

        this.observer = observer;
        this.observers = observers;
    }

    /// <summary>
    /// Removes the observer from the list and, if it was still registered,
    /// signals completion to it. Subsequent calls do nothing.
    /// </summary>
    public void Dispose() //#I
    {
        if (disposed)
        {
            return;
        }

        disposed = true;

        // Only complete the observer when its registration is still present; if the
        // list owner already removed it, a further OnCompleted would be a duplicate.
        if (observers.Remove(observer))
        {
            observer.OnCompleted();
        }
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Initial F# implementation of the C# file above
namespace Fs.Rx | |
open System | |
open System.Collections.Generic | |
open System.Reactive.Concurrency | |
open System.Reactive.Subjects | |
open System.Reactive.Linq | |
/// Unsubscription handle: disposing it releases the given subscription, removes the
/// observer from the shared observer list and, if the observer was still registered,
/// signals completion to it. Disposing more than once has no further effect.
type ObserverHandler<'T>(observer: IObserver<'T>, observers: List<IObserver<'T>>, subscription: IDisposable) =
    let mutable disposed = false

    /// Backward-compatible constructor for callers that have no subscription to release.
    new (observer: IObserver<'T>, observers: List<IObserver<'T>>) =
        new ObserverHandler<'T>(observer, observers, System.Reactive.Disposables.Disposable.Empty)

    interface IDisposable with
        member this.Dispose() =
            if not disposed then
                disposed <- true
                // Detach from the source first so no further items reach the observer.
                subscription.Dispose()
                // Skip OnCompleted when the registration was already removed by the list owner.
                if observers.Remove(observer) then
                    observer.OnCompleted()

/// Publisher/subscriber hub built on a reactive subject: publishers are observables
/// whose notifications are pushed into the subject, subscribers are observers attached to it.
type RxPubSub<'T>(subject: ISubject<'T>) =
    // The lists are mutated in place and never rebound, so they need not be 'mutable'.
    let observers = new List<IObserver<'T>>()
    let observables = new List<IDisposable>()
    let mutable disposed = false

    /// Creates a hub backed by a new Subject. The explicit type argument and 'new'
    /// keywords are required here: both types are IDisposable, and an additional
    /// constructor must name the generic type in full.
    new () = new RxPubSub<'T>(new Subject<'T>())

    interface IDisposable with
        /// Completes the subject and releases every tracked subscription; idempotent.
        member this.Dispose() =
            if not disposed then
                disposed <- true
                // Completing the subject is what notifies the attached observers; they are
                // deliberately not completed a second time by iterating the observer list.
                subject.OnCompleted()
                for subscription in observables do
                    subscription.Dispose()
                observables.Clear()
                observers.Clear()

    /// Attaches the observer to the hub and returns a handle that, when disposed,
    /// detaches it from the subject, removes it from the bookkeeping and completes it.
    member this.Subscribe(observer: IObserver<'T>) =
        // Subscribe first so that a failing Subscribe leaves no stale entry in the lists.
        let subscription = subject.Subscribe(observer)
        observers.Add(observer)
        observables.Add(subscription)
        // Releases the subject subscription and forgets it in the tracking list.
        let release =
            { new IDisposable with
                member d.Dispose() =
                    subscription.Dispose()
                    observables.Remove(subscription) |> ignore }
        new ObserverHandler<'T>(observer, observers, release)

    /// Registers the observable as a publisher: its notifications are forwarded to the
    /// subject, with the subscription performed on the task pool scheduler.
    member this.AddPublisher(observable: IObservable<'T>) =
        observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(subject)

    /// Exposes the hub as an observable sequence that hides the subject's identity.
    member this.AsObservable() = subject.AsObservable()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
When I use
new () = RxPubSub(Subject<'T>())
I get this warning:
It is recommended that objects supporting the IDisposable interface are created using the syntax 'new Type(args)', rather than 'Type(args)' or 'Type' as a function value representing the constructor, to indicate that resources may be owned by the generated value
When I try
new () = new RxPubSub(Subject<'T>())
I get this error:
This is not a valid object construction expression. Explicit object constructors must either call an alternate constructor or initialize all fields of the object and specify a call to a super class constructor. The type 'Fs.Rx.RxPubSub<_>' expects 1 type argument(s) but is given 0