Created
April 15, 2012 02:05
-
-
Save ajaykalra/2389331 to your computer and use it in GitHub Desktop.
Reactive Observable Extensions for subscribe/Dispose
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Reactive.Disposables; | |
namespace Reactive.Samples | |
{ | |
// For details, see the following post:
// http://ajkalra.blogspot.com/2012/04/subscription-snooping-in-rx.html
//
/// <summary>
/// Extension methods that let a caller observe the subscription lifecycle of an
/// <see cref="IObservable{T}"/>: hooks can be run immediately before/after each
/// subscription is made and immediately before/after the returned subscription
/// handle is disposed.
/// </summary>
/// <remarks>
/// The hooks run on every call to <c>Subscribe</c> on the returned observable.
/// The disposal hooks are triggered only by disposing the handle returned from
/// <c>Subscribe</c>; the observer is handed to the source unwrapped, so this
/// class does not react to the source's own notifications.
/// </remarks>
public static class ObservableExtensions
{
    // Shared do-nothing hook for the lifecycle points a caller did not ask to observe.
    private static readonly Action NoOp = () => { };

    /// <summary>
    /// Returns an observable that calls the action BEFORE each subscription to the source takes place.
    /// </summary>
    /// <typeparam name="TSource">The type of the source.</typeparam>
    /// <param name="source">Observable being extended.</param>
    /// <param name="onSubscribing">Action to be called when about to subscribe.</param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="onSubscribing"/> is null.
    /// </exception>
    public static IObservable<TSource> OnSubscribing<TSource>(this IObservable<TSource> source,
        Action onSubscribing)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (onSubscribing == null)
        {
            throw new ArgumentNullException("onSubscribing");
        }

        return new StateObservable<TSource>(source, onSubscribing, NoOp, NoOp, NoOp);
    }

    /// <summary>
    /// Returns an observable that calls the action AFTER each subscription to the source has taken place.
    /// </summary>
    /// <typeparam name="TSource">The type of the source.</typeparam>
    /// <param name="source">Observable being extended.</param>
    /// <param name="onSubscribed">Action to be executed after the source observable has been subscribed.</param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="onSubscribed"/> is null.
    /// </exception>
    public static IObservable<TSource> OnSubscribed<TSource>(this IObservable<TSource> source,
        Action onSubscribed)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (onSubscribed == null)
        {
            throw new ArgumentNullException("onSubscribed");
        }

        return new StateObservable<TSource>(source, NoOp, onSubscribed, NoOp, NoOp);
    }

    /// <summary>
    /// Returns an observable that calls the action BEFORE a subscription is disposed.
    /// </summary>
    /// <typeparam name="TSource">The type of the source.</typeparam>
    /// <param name="source">Observable being extended.</param>
    /// <param name="onDisposing">Action to be executed when the subscription is about to be disposed.</param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="onDisposing"/> is null.
    /// </exception>
    public static IObservable<TSource> OnDisposing<TSource>(this IObservable<TSource> source,
        Action onDisposing)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (onDisposing == null)
        {
            throw new ArgumentNullException("onDisposing");
        }

        return new StateObservable<TSource>(source, NoOp, NoOp, onDisposing, NoOp);
    }

    /// <summary>
    /// Returns an observable that calls the action AFTER a subscription has been disposed.
    /// </summary>
    /// <typeparam name="TSource">The type of the source.</typeparam>
    /// <param name="source">Observable being extended.</param>
    /// <param name="onDisposed">Action to be executed after the subscription has been disposed.</param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="onDisposed"/> is null.
    /// </exception>
    public static IObservable<TSource> OnDisposed<TSource>(this IObservable<TSource> source,
        Action onDisposed)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (onDisposed == null)
        {
            throw new ArgumentNullException("onDisposed");
        }

        return new StateObservable<TSource>(source, NoOp, NoOp, NoOp, onDisposed);
    }

    /// <summary>
    /// Returns an observable that calls one action BEFORE each subscription is made
    /// and another AFTER that subscription has been disposed.
    /// </summary>
    /// <typeparam name="TSource">The type of the source.</typeparam>
    /// <param name="source">Observable being extended.</param>
    /// <param name="onSubscribing">Action to be called when about to subscribe.</param>
    /// <param name="onDisposed">Action to be executed after the subscription has been disposed.</param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public static IObservable<TSource> OnSubscribeDispose<TSource>(this IObservable<TSource> source,
        Action onSubscribing,
        Action onDisposed)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (onSubscribing == null)
        {
            throw new ArgumentNullException("onSubscribing");
        }
        if (onDisposed == null)
        {
            throw new ArgumentNullException("onDisposed");
        }

        return new StateObservable<TSource>(source, onSubscribing, NoOp, NoOp, onDisposed);
    }

    /// <summary>
    /// Returns an observable that calls the given actions before/after each
    /// subscription is created and before/after it is disposed.
    /// </summary>
    /// <typeparam name="TSource">The type of the source.</typeparam>
    /// <param name="source">Observable being extended.</param>
    /// <param name="onSubscribing">Action to be called when about to subscribe.</param>
    /// <param name="onSubscribed">Action to be executed after the source observable has been subscribed.</param>
    /// <param name="onDisposing">Action to be executed when the subscription is about to be disposed.</param>
    /// <param name="onDisposed">Action to be executed after the subscription has been disposed.</param>
    /// <returns>The wrapping observable.</returns>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public static IObservable<TSource> WhenSubscribedOrDisposed<TSource>(this IObservable<TSource> source,
        Action onSubscribing,
        Action onSubscribed,
        Action onDisposing,
        Action onDisposed)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (onSubscribing == null)
        {
            throw new ArgumentNullException("onSubscribing");
        }
        if (onSubscribed == null)
        {
            throw new ArgumentNullException("onSubscribed");
        }
        if (onDisposing == null)
        {
            throw new ArgumentNullException("onDisposing");
        }
        if (onDisposed == null)
        {
            throw new ArgumentNullException("onDisposed");
        }

        return new StateObservable<TSource>(source, onSubscribing, onSubscribed, onDisposing, onDisposed);
    }

    /// <summary>
    /// Observable decorator that forwards subscriptions to a source observable
    /// and runs the configured hooks around subscribing and around disposal.
    /// </summary>
    private sealed class StateObservable<T> : IObservable<T>
    {
        private readonly IObservable<T> _source;
        private readonly Action _beforeSub;
        private readonly Action _afterSub;
        private readonly Action _beforeDispose;
        private readonly Action _afterDispose;

        public StateObservable(
            IObservable<T> source,
            Action beforeSub,
            Action afterSub,
            Action beforeDispose,
            Action afterDispose)
        {
            _source = source;
            _beforeSub = beforeSub;
            _afterSub = afterSub;
            _beforeDispose = beforeDispose;
            _afterDispose = afterDispose;
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the source, running the
        /// "before"/"after" subscribe hooks around that call.
        /// </summary>
        /// <param name="observer">Observer passed through, unwrapped, to the source.</param>
        /// <returns>
        /// A handle whose disposal runs the "before dispose" hook, disposes the
        /// source subscription, then runs the "after dispose" hook.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            // Reject a bad argument before any hook has produced side effects.
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            _beforeSub();
            var subscription = _source.Subscribe(observer);

            try
            {
                _afterSub();
            }
            catch
            {
                // The caller never receives a handle when this hook throws, so
                // release the source subscription here instead of leaking it.
                subscription.Dispose();
                throw;
            }

            return Disposable.Create(
                () =>
                {
                    try
                    {
                        _beforeDispose();
                    }
                    finally
                    {
                        // Must run even when the "before" hook throws: per the Rx
                        // documentation Disposable.Create invokes its action only on
                        // the first Dispose call, so there is no second chance to
                        // release the source subscription.
                        subscription.Dispose();
                        _afterDispose();
                    }
                });
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment