Last active
October 15, 2016 21:27
-
-
Save Dorus/b86f3135f5dafc78c130 to your computer and use it in GitHub Desktop.
RxExtensions.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Additional operators for Rx.NET observable sequences.
/// </summary>
public static class Extended
{
    /// <summary>
    /// Amb over a sequence of sequences: forwards the elements of the inner sequence that
    /// produces an element first and drops the elements of all other inner sequences.
    /// </summary>
    /// <remarks>
    /// Once a winner is known no further inner sequences are taken from <paramref name="source"/>.
    /// A losing inner sequence is only cut off by its own <c>TakeWhile</c>, i.e. when it next
    /// produces an element. All inner sequences are combined with <c>Merge</c>, so an error from
    /// any of them that is still subscribed reaches the observer.
    /// </remarks>
    /// <typeparam name="TSource">The type of the elements in the inner sequences.</typeparam>
    /// <param name="source">Sequence of candidate sequences.</param>
    /// <returns>A sequence carrying the elements of the first inner sequence to produce an element.</returns>
    public static IObservable<TSource> Amb<TSource>(this IObservable<IObservable<TSource>> source)
    {
        return Observable.Create<TSource>(observer =>
        {
            // Index of the winning inner sequence; -1 while no inner sequence has produced yet.
            int winner = -1;

            return source
                .TakeWhile(_ => winner == -1)
                .Select((inner, index) => inner
                    // The first element of an inner sequence tries to claim the win atomically.
                    .DoFirst(1, _ => Interlocked.CompareExchange(ref winner, index, -1))
                    // Only the inner sequence that holds the claim is allowed through.
                    .TakeWhile(_ => winner == index))
                .Merge()
                .Subscribe(observer);
        });
    }

    /// <summary>
    /// Like <c>Do</c>, but invokes <paramref name="action"/> only for the first
    /// <paramref name="n"/> elements.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
    /// <param name="source">Source sequence.</param>
    /// <param name="n">Number of leading elements for which the action is invoked.</param>
    /// <param name="action">Side effect to run; it is subscribed before the observer, so it sees each element first.</param>
    /// <returns>A sequence with the same notifications as <paramref name="source"/>.</returns>
    public static IObservable<TSource> DoFirst<TSource>(this IObservable<TSource> source, int n, Action<TSource> action)
    {
        return Observable.Create<TSource>(observer =>
        {
            var shared = source.Publish();
            return new CompositeDisposable(
                // The side-effect subscription must not handle errors itself: without an
                // onError handler a source failure within the first n elements would be
                // rethrown here and never reach the real observer subscribed below.
                shared.Take(n).Subscribe(action, _ => { }),
                shared.Subscribe(observer),
                shared.Connect());
        });
    }

    /// <summary>
    /// Projects each element of <paramref name="source1"/> together with the most recent
    /// element of <paramref name="source2"/>. This is called <c>withLatestFrom</c> in RxJava.
    /// </summary>
    /// <remarks>
    /// Elements of <paramref name="source1"/> that arrive before <paramref name="source2"/> has
    /// produced its first element are dropped, because there is no latest value to pair them with.
    /// The two sources are combined with <c>Merge</c>.
    /// </remarks>
    /// <typeparam name="TSource1">The type of the elements in the driving sequence.</typeparam>
    /// <typeparam name="TSouces2">The type of the elements in the sampled sequence.</typeparam>
    /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
    /// <param name="source1">Sequence whose elements trigger output.</param>
    /// <param name="source2">Sequence whose latest element is sampled.</param>
    /// <param name="resultSelector">Combines an element of source1 with the latest element of source2.</param>
    /// <returns>One result per element of source1 that arrived after source2 produced a value.</returns>
    public static IObservable<TResult> CombineLatestLeft<TSource1, TSouces2, TResult>(this IObservable<TSource1> source1, IObservable<TSouces2> source2, Func<TSource1, TSouces2, TResult> resultSelector)
    {
        // Both sources are tagged and funnelled into one stream so Scan can carry the state.
        var lefts = source1.Select(l => new { isLeft = true, hasRight = false, left = l, right = default(TSouces2) });
        var rights = source2.Select(r => new { isLeft = false, hasRight = true, left = default(TSource1), right = r });
        var seed = new { isLeft = false, hasRight = false, left = default(TSource1), right = default(TSouces2) };

        return lefts.Merge(rights)
            .Scan(seed, (state, next) => next.isLeft
                // A left element keeps whatever right value (if any) has been seen so far.
                ? new { isLeft = true, hasRight = state.hasRight, left = next.left, right = state.right }
                // A right element only updates the remembered value.
                : new { isLeft = false, hasRight = true, left = default(TSource1), right = next.right })
            // Emit only for left elements, and only once a right value actually exists.
            .Where(state => state.isLeft && state.hasRight)
            .Select(state => resultSelector(state.left, state.right));
    }

    /// <summary>
    /// Like <c>Switch</c>, but stays connected to an inner sequence until it terminates before
    /// accepting another one. Inner sequences that arrive in the meantime are ignored.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the inner sequences.</typeparam>
    /// <param name="source">Sequence of inner sequences.</param>
    /// <returns>The elements of the accepted inner sequences.</returns>
    public static IObservable<TSource> SwitchFirst<TSource>(this IObservable<IObservable<TSource>> source)
    {
        return Observable.Create<TSource>(observer =>
        {
            // 1 while no inner sequence is active, 0 while one is being forwarded.
            int free = 1;

            return source
                // Atomically claim the slot; inner sequences that lose the claim are dropped.
                .Where(_ => Interlocked.CompareExchange(ref free, 0, 1) == 1)
                // Release the slot with the same atomic primitive family used to claim it.
                .Select(inner => inner.Finally(() => Interlocked.Exchange(ref free, 1)))
                .Switch()
                .Subscribe(observer);
        });
    }

    /// <summary>
    /// Splits <paramref name="source"/> into consecutive windows, using the elements that satisfy
    /// <paramref name="split"/> as window boundaries.
    /// </summary>
    /// <remarks>
    /// The source is published so that the windowing and the boundary detection share a single
    /// subscription. Both that subscription and the observer's subscription are released when
    /// the returned sequence is unsubscribed.
    /// </remarks>
    /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
    /// <param name="source">Source sequence.</param>
    /// <param name="split">Predicate that marks an element as a window boundary.</param>
    /// <returns>A sequence of windows.</returns>
    public static IObservable<IObservable<TSource>> Window<TSource>(this IObservable<TSource> source, Func<TSource, bool> split)
    {
        return source.Publish(shared => shared.Window(shared.Where(split)));
    }

    /// <summary>
    /// Limits the flow to one element per <paramref name="minDelay"/>, timed on the default scheduler.
    /// </summary>
    /// <remarks>
    /// One element per time span is still enqueued even if downstream observers process elements
    /// more slowly; once they release, all enqueued elements can arrive in a burst.
    /// </remarks>
    /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
    /// <param name="source">Source sequence.</param>
    /// <param name="minDelay">Minimum time between two released elements.</param>
    /// <returns>The source elements, spaced at least minDelay apart.</returns>
    public static IObservable<T> RateLimit<T>(this IObservable<T> source, TimeSpan minDelay)
    {
        return source.RateLimit(minDelay, Scheduler.Default);
    }

    /// <summary>
    /// Limits the flow to one element per <paramref name="minDelay"/>, using
    /// <paramref name="scheduler"/> both as the clock and to run the release timers.
    /// </summary>
    /// <remarks>
    /// One element per time span is still enqueued even if downstream observers process elements
    /// more slowly; once they release, all enqueued elements can arrive in a burst.
    /// The reference time is read when this method is called, so the first element is released
    /// no earlier than minDelay after that moment.
    /// </remarks>
    /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
    /// <param name="source">Source sequence.</param>
    /// <param name="minDelay">Minimum time between two released elements.</param>
    /// <param name="scheduler">Scheduler that supplies the current time and runs the timers.</param>
    /// <returns>The source elements, spaced at least minDelay apart.</returns>
    public static IObservable<T> RateLimit<T>(this IObservable<T> source, TimeSpan minDelay, IScheduler scheduler)
    {
        var seed = new { scheduledTime = scheduler.Now, item = default(T) };

        return source
            .Scan(seed, (previous, item) =>
            {
                // Release at the earliest allowed slot, or right now if that slot has passed.
                var now = scheduler.Now;
                var earliest = previous.scheduledTime + minDelay;
                return new { scheduledTime = now > earliest ? now : earliest, item = item };
            })
            // The timer must run on the supplied scheduler, otherwise the scheduler would only
            // provide the clock while the waiting happened on the default scheduler.
            .Delay(slot => Observable.Timer(slot.scheduledTime, scheduler))
            .Select(slot => slot.item);
    }

    /// <summary>
    /// Adds heartbeat elements if the source emits no items during the timeout. The heartbeat
    /// elements are repeated every interval until the source emits again.
    /// </summary>
    /// <remarks>
    /// The heartbeats come from <c>Observable.Interval(interval)</c>, which is subscribed once the
    /// timeout has elapsed, so the first heartbeat follows one interval after the timeout.
    /// </remarks>
    /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
    /// <param name="source">Source sequence for HearthBeat.</param>
    /// <param name="timeout">Period of silence after which heartbeats start.</param>
    /// <param name="interval">Period between consecutive heartbeats.</param>
    /// <param name="hearthBeatSelector">Produces the element emitted as a heartbeat.</param>
    /// <returns>The sequence with the heartbeats added.</returns>
    /// <exception cref="System.ArgumentNullException">source or hearthBeatSelector is null.</exception>
    /// <exception cref="System.ArgumentOutOfRangeException">timeout or interval is less than TimeSpan.Zero.</exception>
    public static IObservable<TSource> HearthBeat<TSource>(this IObservable<TSource> source, TimeSpan timeout, TimeSpan interval, Func<TSource> hearthBeatSelector)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (hearthBeatSelector == null)
        {
            throw new ArgumentNullException("hearthBeatSelector");
        }
        if (timeout < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException("timeout");
        }
        if (interval < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException("interval");
        }

        var hearthBeat = Observable.Interval(interval).Select(_ => hearthBeatSelector());

        return source.Publish(shared =>
        {
            var silenceWatch = shared
                // Start watching for silence immediately, before the first real element.
                .StartWith(default(TSource))
                .Select(_ => Observable.Timer(timeout)
                    .SelectMany(hearthBeat)
                    // Stop the heartbeats when the source emits again or completes.
                    .TakeUntil(shared.Concat(Observable.Return(default(TSource)))))
                // Every source element restarts the silence timer.
                .Switch();

            return shared.Merge(silenceWatch);
        });
    }

    /// <summary>
    /// Emits an element only after <paramref name="timeout"/> has passed without another element
    /// arriving. Like debounceTime from RxJs
    /// ( http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-debounceTime ).
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
    /// <param name="source">Source sequence.</param>
    /// <param name="timeout">Quiet period required before an element is released.</param>
    /// <returns>The debounced sequence.</returns>
    public static IObservable<TSource> Debounce<TSource>(this IObservable<TSource> source, TimeSpan timeout)
    {
        return source
            // Each element becomes a delayed single-element sequence; Switch drops the pending
            // one as soon as a newer element arrives.
            .Select(element => Observable.Return(element).Delay(timeout))
            .Switch();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment