Skip to content

Instantly share code, notes, and snippets.

@mika76
Last active September 11, 2016 07:29
Show Gist options
  • Save mika76/c8118a7e5f6c53287bf6165fc04cc22a to your computer and use it in GitHub Desktop.
Save mika76/c8118a7e5f6c53287bf6165fc04cc22a to your computer and use it in GitHub Desktop.
rx spy
//from http://stackoverflow.com/questions/20220755/how-can-i-see-what-my-reactive-extensions-query-is-doing/20220756#20220756
/// <summary>
/// Wraps <paramref name="source"/> in a diagnostic observable that writes a line to the
/// console at each stage of its lifecycle: when the wrapper is obtained, when it is
/// subscribed to, for every OnNext / OnError / OnCompleted notification, when the
/// subscribe call to <paramref name="source"/> returns, and when the subscription is
/// disposed. All lines except "Subscription completed." include the managed thread id.
/// </summary>
/// <typeparam name="T">Element type of the observed sequence.</typeparam>
/// <param name="source">The sequence to spy on.</param>
/// <param name="opName">
/// Label prefixed to every log line; "IObservable" is used when this is null.
/// </param>
/// <returns>
/// An observable that forwards every notification from <paramref name="source"/>
/// to its observer after logging it.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
{
    // Fail fast: without this check a null source is only detected later, at
    // subscription time, after the "Observable obtained" line has been logged.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    opName = opName ?? "IObservable";

    Console.WriteLine("{0}: Observable obtained on Thread: {1}",
        opName,
        Thread.CurrentThread.ManagedThreadId);

    return Observable.Create<T>(obs =>
    {
        Console.WriteLine("{0}: Subscribed to on Thread: {1}",
            opName,
            Thread.CurrentThread.ManagedThreadId);

        try
        {
            // Log each notification, then pass it on to the downstream observer.
            var subscription = source
                .Do(x => Console.WriteLine("{0}: OnNext({1}) on Thread: {2}",
                        opName,
                        x,
                        Thread.CurrentThread.ManagedThreadId),
                    ex => Console.WriteLine("{0}: OnError({1}) on Thread: {2}",
                        opName,
                        ex,
                        Thread.CurrentThread.ManagedThreadId),
                    () => Console.WriteLine("{0}: OnCompleted() on Thread: {1}",
                        opName,
                        Thread.CurrentThread.ManagedThreadId))
                .Subscribe(obs);

            // The logging disposable is listed after the real subscription so the
            // "Cleaned up" line is written once the source subscription is released.
            return new CompositeDisposable(
                subscription,
                Disposable.Create(() => Console.WriteLine(
                    "{0}: Cleaned up on Thread: {1}",
                    opName,
                    Thread.CurrentThread.ManagedThreadId)));
        }
        finally
        {
            // Runs when the subscribe call above returns (or throws); comparing its
            // position with the OnNext lines shows whether the source emitted
            // before or after Subscribe returned.
            Console.WriteLine("{0}: Subscription completed.", opName);
        }
    });
}
//use:
// Example 1 - Range without an explicit scheduler. In the recorded trace below,
// "Subscription completed." appears before the value is delivered.
// (Thread ids in all traces are from one sample run and will differ elsewhere.)
Observable.Range(0, 1).Spy("Range").Subscribe();
//Range: Observable obtained on Thread: 7
//Range: Subscribed to on Thread: 7
//Range: Subscription completed.
//Range: OnNext(0) on Thread: 7
//Range: OnCompleted() on Thread: 7
//Range: Cleaned up on Thread: 7
// Example 2 - same query on Scheduler.Immediate. In the recorded trace below, the
// value and completion are delivered before "Subscription completed." is logged.
Observable.Range(0, 1, Scheduler.Immediate).Spy("Range").Subscribe();
//Range: Observable obtained on Thread: 7
//Range: Subscribed to on Thread: 7
//Range: OnNext(0) on Thread: 7
//Range: OnCompleted() on Thread: 7
//Range: Subscription completed.
//Range: Cleaned up on Thread: 7
// Example 3 - two spies in one chain. The trace shows the downstream "Scan" spy
// being subscribed to first, followed by the upstream "Range" spy.
Observable.Range(0,3).Spy("Range").Scan((acc, i) => acc + i).Spy("Scan").Subscribe();
//Range: Observable obtained on Thread: 7
//Scan: Observable obtained on Thread: 7
//Scan: Subscribed to on Thread: 7
//Range: Subscribed to on Thread: 7
//Range: Subscription completed.
//Scan: Subscription completed.
// NOTE(review): the OnNext(0) pair below was missing from the original listing;
// Range(0,3) starts at 0 and a seedless Scan should pass the first element
// through unchanged - confirm by running.
//Range: OnNext(0) on Thread: 7
//Scan: OnNext(0) on Thread: 7
//Range: OnNext(1) on Thread: 7
//Scan: OnNext(1) on Thread: 7
//Range: OnNext(2) on Thread: 7
//Scan: OnNext(3) on Thread: 7
//Range: OnCompleted() on Thread: 7
//Scan: OnCompleted() on Thread: 7
//Range: Cleaned up on Thread: 7
//Scan: Cleaned up on Thread: 7
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment