Last active
September 11, 2016 07:29
-
-
Save mika76/c8118a7e5f6c53287bf6165fc04cc22a to your computer and use it in GitHub Desktop.
rx spy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//from http://stackoverflow.com/questions/20220755/how-can-i-see-what-my-reactive-extensions-query-is-doing/20220756#20220756
/// <summary>
/// Wraps <paramref name="source"/> in a pass-through observable that writes a trace line to the
/// console for each stage of its lifetime: when the wrapper is obtained, when it is subscribed to,
/// on every OnNext / OnError / OnCompleted notification, when the inner Subscribe call returns,
/// and when the subscription is disposed. Every line except "Subscription completed." includes
/// the managed thread id it was written on.
/// </summary>
/// <typeparam name="T">Element type of the observed sequence.</typeparam>
/// <param name="source">The sequence to trace. Must not be null.</param>
/// <param name="opName">
/// Label prefixed to every trace line; defaults to "IObservable" when null.
/// </param>
/// <returns>An observable that forwards all notifications from <paramref name="source"/> unchanged.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
{
    // Fail at the call site rather than later, at subscription time, inside the operator chain.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    opName = opName ?? "IObservable";

    // Logged once per Spy call, when the query is being composed (not per subscription).
    Console.WriteLine("{0}: Observable obtained on Thread: {1}",
        opName,
        Thread.CurrentThread.ManagedThreadId);

    return Observable.Create<T>(obs =>
    {
        // Logged once per subscriber, on the thread that calls Subscribe.
        Console.WriteLine("{0}: Subscribed to on Thread: {1}",
            opName,
            Thread.CurrentThread.ManagedThreadId);
        try
        {
            // Do() taps each notification for logging; Subscribe(obs) forwards it downstream.
            var subscription = source
                .Do(
                    x => Console.WriteLine("{0}: OnNext({1}) on Thread: {2}",
                        opName,
                        x,
                        Thread.CurrentThread.ManagedThreadId),
                    ex => Console.WriteLine("{0}: OnError({1}) on Thread: {2}",
                        opName,
                        ex,
                        Thread.CurrentThread.ManagedThreadId),
                    () => Console.WriteLine("{0}: OnCompleted() on Thread: {1}",
                        opName,
                        Thread.CurrentThread.ManagedThreadId))
                .Subscribe(obs);

            // The upstream subscription is listed first, the "Cleaned up" trace second,
            // so the trace is written as part of disposing this spy's subscription.
            return new CompositeDisposable(
                subscription,
                Disposable.Create(() => Console.WriteLine(
                    "{0}: Cleaned up on Thread: {1}",
                    opName,
                    Thread.CurrentThread.ManagedThreadId)));
        }
        finally
        {
            // Runs when the Subscribe call above returns or throws. Whether this line appears
            // before or after the notifications depends on how the source schedules its work.
            Console.WriteLine("{0}: Subscription completed.", opName);
        }
    });
}
//use:
// Each statement is followed by the console output it produces (the thread id is a sample value).

// Default scheduler: the Subscribe call returns before any value is delivered,
// so "Subscription completed." is printed ahead of OnNext.
Observable.Range(0, 1).Spy("Range").Subscribe();
//Range: Observable obtained on Thread: 7
//Range: Subscribed to on Thread: 7
//Range: Subscription completed.
//Range: OnNext(0) on Thread: 7
//Range: OnCompleted() on Thread: 7
//Range: Cleaned up on Thread: 7

// Scheduler.Immediate: values are delivered inside the Subscribe call,
// so "Subscription completed." is printed after OnCompleted.
Observable.Range(0, 1, Scheduler.Immediate).Spy("Range").Subscribe();
//Range: Observable obtained on Thread: 7
//Range: Subscribed to on Thread: 7
//Range: OnNext(0) on Thread: 7
//Range: OnCompleted() on Thread: 7
//Range: Subscription completed.
//Range: Cleaned up on Thread: 7

// Two spies in one chain: Range emits 0, 1, 2 and the seedless Scan forwards the
// first element as-is, then the running sums (0, 1, 3).
Observable.Range(0,3).Spy("Range").Scan((acc, i) => acc + i).Spy("Scan").Subscribe();
//Range: Observable obtained on Thread: 7
//Scan: Observable obtained on Thread: 7
//Scan: Subscribed to on Thread: 7
//Range: Subscribed to on Thread: 7
//Range: Subscription completed.
//Scan: Subscription completed.
//Range: OnNext(0) on Thread: 7
//Scan: OnNext(0) on Thread: 7
//Range: OnNext(1) on Thread: 7
//Scan: OnNext(1) on Thread: 7
//Range: OnNext(2) on Thread: 7
//Scan: OnNext(3) on Thread: 7
//Range: OnCompleted() on Thread: 7
//Scan: OnCompleted() on Thread: 7
//Range: Cleaned up on Thread: 7
//Scan: Cleaned up on Thread: 7
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment