Skip to content

Instantly share code, notes, and snippets.

@Manuel-S
Created December 1, 2014 13:05
Show Gist options
  • Save Manuel-S/1fad0455d849e1e2df6c to your computer and use it in GitHub Desktop.
Save Manuel-S/1fad0455d849e1e2df6c to your computer and use it in GitHub Desktop.
BufferWhile Rx Extension
/// <summary>
/// Groups consecutive elements of <paramref name="source"/> into lists, closing the current
/// list as soon as <paramref name="condition"/> returns <c>false</c> for the element that was
/// just appended.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">Sequence whose elements are buffered.</param>
/// <param name="condition">
/// Evaluated against the most recently appended element; the buffer stays open while it
/// returns <c>true</c>.
/// </param>
/// <returns>
/// A sequence of lists. Each emitted list ends with the element that made
/// <paramref name="condition"/> return <c>false</c> (that element is included in the list).
/// </returns>
/// <remarks>
/// Buffering is delegated to <c>AggregateUntil</c>, which only emits when its predicate fires;
/// elements gathered after the last emitted list are therefore not emitted when the source
/// completes.
/// </remarks>
public static IObservable<IList<TSource>> BufferWhile<TSource>(
    this IObservable<TSource> source,
    Func<TSource, bool> condition)
{
    // Every buffer starts out as a brand new, empty list.
    Func<List<TSource>> createBuffer = () => new List<TSource>();

    // Appending mutates the current list and hands the same instance back.
    Func<List<TSource>, TSource, List<TSource>> append = (buffer, item) =>
    {
        buffer.Add(item);
        return buffer;
    };

    // The buffer is flushed once the newest element no longer satisfies the condition.
    Func<List<TSource>, bool> shouldFlush = buffer =>
    {
        var newest = buffer[buffer.Count - 1];
        return !condition(newest);
    };

    return source.AggregateUntil(createBuffer, append, shouldFlush);
}
/// <summary>
/// Folds the elements of <paramref name="source"/> into an accumulator and emits that
/// accumulator every time <paramref name="predicate"/> returns <c>true</c> for it, after which
/// accumulation restarts from a fresh <paramref name="seed"/> value.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <typeparam name="TAccumulate">Type of the accumulated value.</typeparam>
/// <param name="source">Sequence whose elements are accumulated.</param>
/// <param name="seed">Factory invoked once per subscription and again after every emission.</param>
/// <param name="accumulator">Combines the current accumulator with the next source element.</param>
/// <param name="predicate">Returns <c>true</c> when the current accumulator should be emitted.</param>
/// <returns>A sequence of the accumulators for which <paramref name="predicate"/> returned <c>true</c>.</returns>
/// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
/// <remarks>
/// An accumulation that is still in progress when the source completes is not emitted.
/// Exceptions thrown by <paramref name="accumulator"/>, <paramref name="predicate"/> or the
/// re-seeding call while handling an element are forwarded to the observer through
/// <c>OnError</c>, and no further elements are processed for that subscription.
/// </remarks>
public static IObservable<TAccumulate> AggregateUntil<TSource, TAccumulate>(
    this IObservable<TSource> source,
    Func<TAccumulate> seed,
    Func<TAccumulate, TSource, TAccumulate> accumulator,
    Func<TAccumulate, bool> predicate)
{
    // Fail fast at call time instead of with a NullReferenceException during subscription.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (seed == null)
    {
        throw new ArgumentNullException("seed");
    }
    if (accumulator == null)
    {
        throw new ArgumentNullException("accumulator");
    }
    if (predicate == null)
    {
        throw new ArgumentNullException("predicate");
    }

    // NOTE(review): CreateWithDisposable is kept as in the original; it appears to come from
    // early Rx builds, where later releases expose this overload as Observable.Create -
    // confirm against the Rx version this file is compiled with.
    return Observable.CreateWithDisposable<TAccumulate>(observer =>
    {
        var accumulate = seed();

        // Set once a user callback has thrown; keeps a source that continues pushing from
        // driving the callbacks (or the observer) again after OnError was delivered.
        var faulted = false;

        return source.Subscribe(
            value =>
            {
                if (faulted)
                {
                    return;
                }

                bool shouldEmit;
                try
                {
                    accumulate = accumulator(accumulate, value);
                    shouldEmit = predicate(accumulate);
                }
                catch (Exception ex)
                {
                    faulted = true;
                    observer.OnError(ex);
                    return;
                }

                if (!shouldEmit)
                {
                    return;
                }

                // Deliberately outside any try block: exceptions raised by the downstream
                // observer must not be caught and fed back to it as OnError.
                observer.OnNext(accumulate);

                try
                {
                    accumulate = seed();
                }
                catch (Exception ex)
                {
                    faulted = true;
                    observer.OnError(ex);
                }
            },
            error =>
            {
                if (!faulted)
                {
                    observer.OnError(error);
                }
            },
            () =>
            {
                if (!faulted)
                {
                    observer.OnCompleted();
                }
            });
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment