Skip to content

Instantly share code, notes, and snippets.

@Dorus
Last active October 15, 2016 21:27
Show Gist options
  • Save Dorus/b86f3135f5dafc78c130 to your computer and use it in GitHub Desktop.
Save Dorus/b86f3135f5dafc78c130 to your computer and use it in GitHub Desktop.
RxExtensions.cs
public static class Extended {
public static IObservable<TSource> Amb<TSource>(this IObservable<IObservable<TSource>> source) {
return Observable.Create<TSource>(o => {
int first = -1;
return source.TakeWhile(_ => first == -1)
.Select((el, c) => el
.DoFirst(1, _ => Interlocked.CompareExchange(ref first, c, -1))
.TakeWhile(_ => first == c))
.Merge().Subscribe(o);
}
);
}
/*
* Like Do, but only for the first n elements.
*/
public static IObservable<TSource> DoFirst<TSource>(this IObservable<TSource> source, int n, Action<TSource> action) {
return Observable.Create<TSource>(o => {
var pub = source.Publish();
return new CompositeDisposable(
pub.Take(n).Subscribe(action),
pub.Subscribe(o),
pub.Connect());
}
);
}
// This is called `withLatestFrom` in RxJava.
public static IObservable<TResult> CombineLatestLeft<TSource1, TSouces2, TResult>(this IObservable<TSource1> source1, IObservable<TSouces2> source2, Func<TSource1, TSouces2, TResult> resultSelector) {
return source1.Select(e => new { isLeft = true, left = e, right = default(TSouces2) }).Merge(
source2.Select(e => new { isLeft = false, left = default(TSource1), right = e }))
.Scan(new { isLeft = false, left = default(TSource1), right = default(TSouces2) }, (o, n) => n.isLeft ? new { isLeft = true, left = n.left, right = o.right } : new { isLeft = false, left = default(TSource1), right = n.right })
.Where(e => e.isLeft)
.Select(e => resultSelector(e.left, e.right));
}
/*
* Switch, but stay connected to observablse till they complete before switching again. Ignores all
* intermediate arrived observables.
*/
public static IObservable<TSource> SwitchFirst<TSource>(this IObservable<IObservable<TSource>> source) {
return Observable.Create<TSource>(o => {
int free = 1;
return source.Where(_ =>
Interlocked.CompareExchange(ref free, 0, 1) == 1
).Select(el => el.Finally(() => free = 1)).Switch().Subscribe(o);
}
);
}
public static IObservable<IObservable<TSource>> Window<TSource>(this IObservable<TSource> source, Func<TSource, bool> split) {
return Observable.Create<IObservable<TSource>>(o =>
{
var pub = source.Publish();
pub.Window(pub.Where(split)).Subscribe(o);
return pub.Connect();
});
}
/*
* Limit the flow to 1 element per timespan.
* Note: Will still enqueue 1 element per timespan even if downsteam observers are processing elements slower.
* Once downsteam observers release, all enqued elements can arrive in burst.
*/
public static IObservable<T> RateLimit<T>(this IObservable<T> source, TimeSpan minDelay)
{
return source.RateLimit(minDelay, Scheduler.CurrentThread);
}
/*
* Limit the flow to 1 element per timespan.
* Note: Will still enqueue 1 element per timespan even if downsteam observers are processing elements slower.
* Once downsteam observers release, all enqued elements can arrive in burst.
*/
public static IObservable<T> RateLimit<T>(this IObservable<T> source, TimeSpan minDelay, IScheduler scheduler)
{
return source.Scan(new { scheduledTime = scheduler.Now, item = default(T) }
, (tupel, item) => new { scheduledTime = scheduler.Now > tupel.scheduledTime + minDelay ? scheduler.Now : tupel.scheduledTime + minDelay, item = item })
.Delay(tup => Observable.Timer(tup.scheduledTime))
.Select(tup => tup.item);
}
//
// Summary:
// Add hearthBeat events if the the source emits no items during the timeout. Will repeat
// the hearthBeat events every inverval until the source emit gain.
//
// Parameters:
// source:
// Source sequence for HearthBeat.
//
// Type parameters:
// TSource:
// The type of the elements in the source sequence.
//
// Returns:
// The sequence with the hearthBeat added.
//
// Exceptions:
// T:System.ArgumentNullException:
// source is null.
//
// T:System.ArgumentOutOfRangeException:
// timeout or interval is less than TimeSpan.Zero.
//
public static IObservable<TSource> HearthBeat<TSource>(this IObservable<TSource> source, TimeSpan timeout, TimeSpan interval, Func<TSource> hearthBeatSelector)
{
var hearthBeat = Observable.Interval(interval).Select(e => hearthBeatSelector());
return source.Publish(_source => _source.Merge(_source
.StartWith(default(TSource))
.Select(e => Observable.Timer(timeout)
.SelectMany(hearthBeat)
.TakeUntil(_source.Concat(Observable.Return(default(TSource))))
).Switch()
));
}
// Like debounceTime from RxJs. ( http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-debounceTime )
public static IObservable<TSource> Debounce<TSource>(this IObservable<TSource> source, TimeSpan timeout)
{
return source
.Select(e => Observable.Return(e).Delay(timeout))
.Switch();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment