Skip to content

Instantly share code, notes, and snippets.

@devhawk
Last active December 12, 2015 03:18
Show Gist options
  • Select an option

  • Save devhawk/4705696 to your computer and use it in GitHub Desktop.

Select an option

Save devhawk/4705696 to your computer and use it in GitHub Desktop.
static class ObservableExtensions
{
public static IObservable<TSource> CompleteAfter<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
{
return new AnonymousObservable<TSource>(observer =>
{
var complete = false;
return source.Subscribe(
x =>
{
if (!complete)
{
try
{
complete = predicate(x);
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}
observer.OnNext(x);
if (complete)
observer.OnCompleted();
}
},
observer.OnError,
observer.OnCompleted);
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment