Last active
August 29, 2015 14:24
-
-
Save dcastro/b3553e29a59dfd4fe790 to your computer and use it in GitHub Desktop.
TakeWhileInclusive Async - using IAsyncEnumerable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Using Interactive Extensions Async / Ix-Async
 * https://www.nuget.org/packages/Ix-Async/
 * https://github.com/Reactive-Extensions/Rx.NET
 */
// Usage example: build a lazy sequence of ten tasks, each of which logs and
// returns its own index. Select is deferred, so a task is only created when
// the sequence is actually enumerated.
Func<int, Task<int>> evaluate = index => Task.Run(() =>
{
    Debug.WriteLine("evaluating " + index);
    return index;
});
var tasks = Enumerable.Range(0, 10).Select(evaluate);

// Take results while they are below 5, *including* the first result that
// fails the test, then materialize what was taken into a list.
var asyncResults = tasks.ToAsyncEnumerable();
var lessThan5Inclusive = await asyncResults
    .TakeWhileInclusive(value => value < 5)
    .ToList();
/// <summary>
/// Extension methods for the Ix-Async <see cref="IAsyncEnumerable{T}"/> abstraction.
/// </summary>
public static class AsyncEnumerableExt
{
    /// <summary>
    /// Returns elements from <paramref name="source"/> for as long as <paramref name="pred"/>
    /// holds, and additionally returns the first element for which it does not hold.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">The asynchronous sequence to take elements from.</param>
    /// <param name="pred">The test applied to each element.</param>
    /// <returns>
    /// A sequence that ends after the first element failing <paramref name="pred"/>
    /// (that element included), or when <paramref name="source"/> ends.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="pred"/> is <c>null</c>.
    /// </exception>
    public static IAsyncEnumerable<T> TakeWhileInclusive<T>(this IAsyncEnumerable<T> source, Predicate<T> pred)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (pred == null)
        {
            throw new ArgumentNullException(nameof(pred));
        }

        // A fresh source enumerator is requested for every enumeration of the result.
        return new AnonymousEnumerable<T>(() =>
            new TakeWhileInclusiveEnumerator<T>(source.GetEnumerator(), pred));
    }

    /// <summary>
    /// Converts a sequence of tasks into an asynchronous sequence of the tasks' results.
    /// </summary>
    /// <typeparam name="T">Result type of the tasks.</typeparam>
    /// <param name="tasks">The tasks whose results should be exposed.</param>
    /// <returns>An asynchronous sequence made of each task's result.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is <c>null</c>.</exception>
    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<Task<T>> tasks)
    {
        if (tasks == null)
        {
            throw new ArgumentNullException(nameof(tasks));
        }

        // First lift the tasks themselves into an async sequence, then flatten each
        // task into its (single-element) async sequence.
        return AsyncEnumerable
            .ToAsyncEnumerable(tasks)
            .SelectMany(t => t.ToAsyncEnumerable());
    }

    /// <summary>
    /// An <see cref="IAsyncEnumerable{T}"/> whose enumerators are produced by a delegate.
    /// </summary>
    internal class AnonymousEnumerable<T> : IAsyncEnumerable<T>
    {
        private readonly Func<IAsyncEnumerator<T>> _enumerator;

        /// <param name="enumerator">Factory invoked on every <see cref="GetEnumerator"/> call.</param>
        /// <exception cref="ArgumentNullException"><paramref name="enumerator"/> is <c>null</c>.</exception>
        public AnonymousEnumerable(Func<IAsyncEnumerator<T>> enumerator)
        {
            if (enumerator == null)
            {
                throw new ArgumentNullException(nameof(enumerator));
            }

            _enumerator = enumerator;
        }

        /// <summary>Invokes the factory and returns the enumerator it produces.</summary>
        public IAsyncEnumerator<T> GetEnumerator()
        {
            return _enumerator();
        }
    }

    /// <summary>
    /// Enumerator that forwards elements of an inner enumerator up to and including
    /// the first element that fails a predicate.
    /// </summary>
    internal class TakeWhileInclusiveEnumerator<T> : IAsyncEnumerator<T>
    {
        private readonly IAsyncEnumerator<T> _source;
        private readonly Predicate<T> _pred;

        // Set once the sequence is finished: either the last (failing) element has
        // been handed out, or the inner enumerator reported its end.
        private bool _done;

        /// <param name="source">The inner enumerator; disposed together with this instance.</param>
        /// <param name="pred">The test applied to each element.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="pred"/> is <c>null</c>.
        /// </exception>
        public TakeWhileInclusiveEnumerator(IAsyncEnumerator<T> source, Predicate<T> pred)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (pred == null)
            {
                throw new ArgumentNullException(nameof(pred));
            }

            _source = source;
            _pred = pred;
        }

        /// <summary>The element most recently produced by <see cref="MoveNext"/>.</summary>
        public T Current { get; private set; }

        /// <summary>Disposes the inner enumerator.</summary>
        public void Dispose()
        {
            _source.Dispose();
        }

        /// <summary>
        /// Advances to the next element.
        /// </summary>
        /// <param name="token">Cancellation token forwarded to the inner enumerator.</param>
        /// <returns>
        /// <c>true</c> if <see cref="Current"/> holds a new element; <c>false</c> once the
        /// sequence has ended.
        /// </returns>
        public async Task<bool> MoveNext(CancellationToken token)
        {
            if (_done)
            {
                return false;
            }

            // Library code: no need to resume on the caller's synchronization context.
            if (!await _source.MoveNext(token).ConfigureAwait(false))
            {
                // Latch completion so later calls do not query the exhausted source again.
                _done = true;
                return false;
            }

            Current = _source.Current;

            // If the item does not match the predicate, then this is the last item we return.
            if (!_pred(Current))
            {
                _done = true;
            }

            return true;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment