Created
January 20, 2016 19:27
-
-
Save SuperJMN/8e5cdac40117a4bcec4c to your computer and use it in GitHub Desktop.
StackOverflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
void Main() | |
{ | |
var xs = Observable.Interval(TimeSpan.FromSeconds(1)); | |
var bs = new Subject<bool>(); | |
var pxs = xs.Pausable(bs); | |
pxs.Subscribe(x => { Debug.WriteLine(x); }); | |
Thread.Sleep(10000); | |
bs.OnNext(true); | |
Thread.Sleep(10000); | |
} | |
public static class Extensions | |
{ | |
public static IObservable<T> Pausable<T>( | |
this IObservable<T> source, | |
IObservable<bool> pauser) | |
{ | |
return Observable.Create<T>(o => | |
{ | |
var paused = new SerialDisposable(); | |
var subscription = Observable.Publish(source, ps => | |
{ | |
var values = new ReplaySubject<T>(); | |
Func<bool, IObservable<T>> switcher = b => | |
{ | |
if (b) | |
{ | |
values.Dispose(); | |
values = new ReplaySubject<T>(); | |
paused.Disposable = ps.Subscribe(values); | |
return Observable.Empty<T>(); | |
} | |
else | |
{ | |
return values.Concat(ps); | |
} | |
}; | |
return pauser.StartWith(true).DistinctUntilChanged() | |
.Select(p => switcher(p)) | |
.Switch(); | |
}).Subscribe(o); | |
return new CompositeDisposable(subscription, paused); | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment