Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save theodorzoulias/06d39354ab211d27ecdc65f91ec943b7 to your computer and use it in GitHub Desktop.
Save theodorzoulias/06d39354ab211d27ecdc65f91ec943b7 to your computer and use it in GitHub Desktop.
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
/// <summary>
/// A subject that replays all previously received notifications to its first
/// <c>replayMaxSubscriptions</c> subscribers, and behaves like a plain
/// <see cref="Subject{T}"/> for every subscriber after that.
/// </summary>
/// <remarks>
/// While replay is still available, incoming notifications are pushed into a
/// <see cref="ReplaySubject{T}"/> (created without a buffer limit) which forwards them
/// to an inner <see cref="Subject{T}"/>. When the last replay slot is used, the
/// <see cref="ReplaySubject{T}"/> is disposed so the buffered notifications can be
/// reclaimed, and notifications go to the inner subject directly.
/// All members take the same private lock.
/// </remarks>
/// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
public class ReplayLimitedSubscriptionsSubject<T> : ISubject<T>
{
    // Dedicated lock object; the subjects themselves are never used as monitors.
    private readonly object _gate = new object();
    private readonly ISubject<T> _subject = new Subject<T>();
    private readonly int _replayMaxSubscriptions;

    // Non-null only while at least one replay slot is still unused.
    private ReplaySubject<T> _replaySubject;

    // Number of subscriptions that have consumed a replay slot so far.
    private int _subscriptionsCount;

    // Set once OnError/OnCompleted has been forwarded.
    private bool _isTerminated;

    /// <summary>
    /// Creates the subject.
    /// </summary>
    /// <param name="replayMaxSubscriptions">
    /// How many subscriptions receive the buffered notifications. Zero disables replay.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="replayMaxSubscriptions"/> is negative.
    /// </exception>
    public ReplayLimitedSubscriptionsSubject(int replayMaxSubscriptions)
    {
        if (replayMaxSubscriptions < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(replayMaxSubscriptions));
        }

        _replayMaxSubscriptions = replayMaxSubscriptions;

        if (replayMaxSubscriptions > 0)
        {
            _replaySubject = new ReplaySubject<T>();

            // Everything pushed into the replay buffer is also forwarded to _subject,
            // which is where the long-lived subscriptions are held.
            _replaySubject.Subscribe(_subject);
        }
    }

    // The observer that currently receives incoming notifications.
    private ISubject<T> Target
    {
        get { return _replaySubject ?? _subject; }
    }

    /// <summary>Forwards an element to the current subscribers (and the replay buffer, if active).</summary>
    /// <param name="value">The element to publish.</param>
    public void OnNext(T value)
    {
        lock (_gate)
        {
            Target.OnNext(value);
        }
    }

    /// <summary>Terminates the sequence with an error.</summary>
    /// <param name="error">The error to publish.</param>
    /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
    public void OnError(Exception error)
    {
        // Validate up front so the terminated flag is never set for a rejected call.
        if (error == null)
        {
            throw new ArgumentNullException(nameof(error));
        }

        lock (_gate)
        {
            _isTerminated = true;
            Target.OnError(error);
        }
    }

    /// <summary>Terminates the sequence successfully.</summary>
    public void OnCompleted()
    {
        lock (_gate)
        {
            _isTerminated = true;
            Target.OnCompleted();
        }
    }

    /// <summary>
    /// Subscribes an observer. While replay slots remain, the observer first receives
    /// the buffered notifications; it is then subscribed for future notifications.
    /// </summary>
    /// <param name="observer">The observer to subscribe.</param>
    /// <returns>A disposable that removes the observer from future notifications.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Reject a null observer before touching any state, so that an invalid call
        // does not use up one of the limited replay slots.
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        lock (_gate)
        {
            ReplaySubject<T> replay = _replaySubject;
            if (replay != null)
            {
                // Captured before replaying: if the sequence had already terminated,
                // the replay below delivers the terminal notification itself.
                bool terminatedBeforeReplay = _isTerminated;

                // Counting only while replay is active keeps the counter bounded
                // by _replayMaxSubscriptions.
                _subscriptionsCount++;

                // Deliver the buffered notifications, then detach immediately; future
                // notifications reach the observer through _subject instead.
                replay.Subscribe(observer).Dispose();

                if (_subscriptionsCount >= _replayMaxSubscriptions)
                {
                    // Last replay slot used: drop the buffer. Disposing the replay
                    // subject also detaches _subject from it.
                    replay.Dispose();
                    _replaySubject = null;
                }

                if (terminatedBeforeReplay)
                {
                    // Subscribing to the terminated _subject would send OnError/OnCompleted
                    // to the observer a second time; there is nothing left to subscribe to.
                    return System.Reactive.Disposables.Disposable.Empty;
                }
            }

            // Subscribe for future notifications.
            return _subject.Subscribe(observer);
        }
    }
}
/// <summary>
/// Operators that multicast a sequence through a
/// <see cref="ReplayLimitedSubscriptionsSubject{T}"/>.
/// </summary>
public static class ReplayLimitedSubscriptionsSubjectExtensions
{
    /// <summary>
    /// Returns a connectable sequence that shares a single subscription to
    /// <paramref name="source"/> through a new
    /// <see cref="ReplayLimitedSubscriptionsSubject{T}"/>.
    /// </summary>
    /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
    /// <param name="source">The sequence to multicast.</param>
    /// <param name="replayMaxSubscriptions">
    /// How many subscriptions receive the buffered notifications.
    /// </param>
    /// <returns>The connectable sequence produced by <c>Multicast</c>.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="replayMaxSubscriptions"/> is negative.
    /// </exception>
    public static IConnectableObservable<T> ReplayLimitedSubscriptions<T>(
        this IObservable<T> source, int replayMaxSubscriptions)
    {
        // The subject's constructor validates replayMaxSubscriptions right here,
        // at call time.
        var subject = new ReplayLimitedSubscriptionsSubject<T>(replayMaxSubscriptions);
        return source.Multicast(subject);
    }

    /// <summary>
    /// Returns the sequence produced by applying <paramref name="selector"/> to a
    /// multicast of <paramref name="source"/> that goes through a
    /// <see cref="ReplayLimitedSubscriptionsSubject{T}"/> created by a factory.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
    /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
    /// <param name="source">The sequence to multicast.</param>
    /// <param name="replayMaxSubscriptions">
    /// How many subscriptions receive the buffered notifications.
    /// </param>
    /// <param name="selector">
    /// Function that builds the result sequence from the multicast sequence.
    /// </param>
    /// <returns>The sequence produced by <c>Multicast</c> with the given selector.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="replayMaxSubscriptions"/> is negative.
    /// </exception>
    public static IObservable<TResult> ReplayLimitedSubscriptions<TSource, TResult>(
        this IObservable<TSource> source, int replayMaxSubscriptions,
        Func<IObservable<TSource>, IObservable<TResult>> selector)
    {
        // The subject is only constructed when the factory below is invoked, so check
        // the argument here to fail at the call site, like the overload without a selector.
        if (replayMaxSubscriptions < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(replayMaxSubscriptions));
        }

        return source.Multicast(
            () => new ReplayLimitedSubscriptionsSubject<TSource>(replayMaxSubscriptions),
            selector);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment