Skip to content

Instantly share code, notes, and snippets.

@hakonrossebo
Last active November 30, 2018 13:38
Show Gist options
  • Save hakonrossebo/d64e54a6ed7d4def0989716c18995ad8 to your computer and use it in GitHub Desktop.
Save hakonrossebo/d64e54a6ed7d4def0989716c18995ad8 to your computer and use it in GitHub Desktop.
// Copy of sourcefile from book
// https://github.com/rikace/fConcBook/blob/master/Chapter.06/TwitterEmotionAnalysis.cs/RxPubSub.cs
// Concurrency in .NET
// https://www.manning.com/books/concurrency-in-dotnet
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace RxPublisherSubscriber
{
//Listing 6.6 Reactive Publisher Subscriber in C#
public class RxPubSub<T> : IDisposable
{
private ISubject<T> subject; //#A
private readonly List<IObserver<T>> observers = new List<IObserver<T>>(); //#B
private readonly List<IDisposable> observables = new List<IDisposable>(); //#C
public RxPubSub(ISubject<T> subject)
{
this.subject = subject; //#D
}
public RxPubSub() : this(new Subject<T>()) { } //#D
public IDisposable Subscribe(IObserver<T> observer)
{
observers.Add(observer);
observables.Add(this.subject.Subscribe(observer));
return new ObserverHandler<T>(observer, observers); //#E
}
public IDisposable AddPublisher(IObservable<T> observable) =>
observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(subject); //#F
public IObservable<T> AsObservable() => subject.AsObservable(); //#G
public void Dispose()
{
subject.OnCompleted();
observers.ForEach(x => x.OnCompleted());
observers.Clear(); //#H
}
}
class ObserverHandler<T> : IDisposable //#I
{
private readonly IObserver<T> observer;
private readonly List<IObserver<T>> observers;
public ObserverHandler(IObserver<T> observer, List<IObserver<T>> observers)
{
this.observer = observer;
this.observers = observers;
}
public void Dispose() //#I
{
observer.OnCompleted();
observers.Remove(observer);
}
}
}
// Initial F# implementation of C# file above
namespace Fs.Rx
open System
open System.Collections.Generic
open System.Reactive.Concurrency
open System.Reactive.Subjects
open System.Reactive.Linq
type ObserverHandler<'T>(observer: IObserver<'T>, observers: List<IObserver<'T>>) =
interface IDisposable with
member x.Dispose() =
observer.OnCompleted()
observers.Remove(observer) |> ignore
type RxPubSub<'T>(subject: ISubject<'T>) =
let mutable observers: List<IObserver<'T>> = new List<IObserver<'T>>()
let mutable observables: List<IDisposable> = new List<IDisposable>()
new () = RxPubSub(Subject<'T>())
interface IDisposable with
member x.Dispose() =
subject.OnCompleted()
observers.ForEach(fun x -> x.OnCompleted())
observers.Clear()
member x.Subscribe(observer: IObserver<'T>) =
observers.Add(observer)
observables.Add(subject.Subscribe(observer))
new ObserverHandler<'T>(observer, observers)
member x.AddPublisher(observable: IObservable<'T>) =
observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(subject);
member x.AsObservable() = subject.AsObservable();
@hakonrossebo
Copy link
Author

When I use
new () = RxPubSub(Subject<'T>())

I get this warning:
It is recommended that objects supporting the IDisposable interface are created using the syntax 'new Type(args)', rather than 'Type(args)' or 'Type' as a function value representing the constructor, to indicate that resources may be owned by the generated value

When I try
new () = new RxPubSub(Subject<'T>())

I get this error:
This is not a valid object construction expression. Explicit object constructors must either call an alternate constructor or initialize all fields of the object and specify a call to a super class constructor. The type 'Fs.Rx.RxPubSub<_>' expects 1 type argument(s) but is given 0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment