Last active
July 17, 2017 01:11
-
-
Save Porges/4b4d698c73fb44f9594cea52c6072c9b to your computer and use it in GitHub Desktop.
Support for LINQing over IAsyncEnumerable and/or async results
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Implementations for:
//   Select
//   - { Enumerable, AsyncEnumerable } * { Action, Func, Func<_,Task>, Func<_,Task<T>> }
//     = 8 combinations
//   - minus Enumerable/Action and Enumerable/Func (bad? and already defined)
//     = 6
//   + 2 CancellationToken versions
//     = 8 overloads
/// <summary>
/// <c>Select</c> overloads that mix synchronous and asynchronous sequences with
/// synchronous and asynchronous callbacks.
/// </summary>
/// <remarks>
/// Overloads returning <see cref="Task"/> walk the whole source, one element at a time,
/// and complete when the source is exhausted. Overloads returning
/// <see cref="IAsyncEnumerable{T}"/> only wrap the source; the source enumerator is
/// obtained when <c>GetEnumerator</c> is called on the result, and each element is
/// projected exactly once, inside <c>MoveNext</c>.
/// </remarks>
public static class LinqExt
{
    // "Void" methods

    /// <summary>
    /// Invokes <paramref name="action"/> on each element of an asynchronous sequence, in order.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to consume.</param>
    /// <param name="action">Callback invoked once per element.</param>
    /// <returns>A task that completes once every element has been processed.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="action"/> is <c>null</c>.
    /// </exception>
    public static Task Select<T>(this IAsyncEnumerable<T> source, Action<T> action)
        => ForEachCore(NotNull(source, nameof(source)), NotNull(action, nameof(action)));

    /// <summary>
    /// Invokes the asynchronous <paramref name="action"/> on each element of a synchronous
    /// sequence, awaiting each invocation before moving to the next element.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to consume.</param>
    /// <param name="action">Asynchronous callback invoked once per element.</param>
    /// <returns>A task that completes once every element has been processed.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="action"/> is <c>null</c>.
    /// </exception>
    public static Task Select<T>(this IEnumerable<T> source, Func<T, Task> action)
        => ForEachCore(NotNull(source, nameof(source)), NotNull(action, nameof(action)));

    /// <summary>
    /// Invokes the asynchronous <paramref name="action"/> on each element of an asynchronous
    /// sequence, awaiting each invocation before requesting the next element.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to consume.</param>
    /// <param name="action">Asynchronous callback invoked once per element.</param>
    /// <returns>A task that completes once every element has been processed.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="action"/> is <c>null</c>.
    /// </exception>
    public static Task Select<T>(this IAsyncEnumerable<T> source, Func<T, Task> action)
        => ForEachCore(NotNull(source, nameof(source)), NotNull(action, nameof(action)));

    // The async bodies live in private helpers so that argument validation in the
    // public methods throws immediately instead of being captured in the returned task.

    private static async Task ForEachCore<T>(IAsyncEnumerable<T> source, Action<T> action)
    {
        using (var cursor = source.GetEnumerator())
        {
            // These overloads take no token, so the source is driven without cancellation.
            while (await cursor.MoveNext(CancellationToken.None))
            {
                action(cursor.Current);
            }
        }
    }

    private static async Task ForEachCore<T>(IEnumerable<T> source, Func<T, Task> action)
    {
        foreach (var item in source)
        {
            await action(item);
        }
    }

    private static async Task ForEachCore<T>(IAsyncEnumerable<T> source, Func<T, Task> action)
    {
        using (var cursor = source.GetEnumerator())
        {
            // These overloads take no token, so the source is driven without cancellation.
            while (await cursor.MoveNext(CancellationToken.None))
            {
                await action(cursor.Current);
            }
        }
    }

    /// <summary>Returns <paramref name="value"/>, or throws if it is <c>null</c>.</summary>
    private static TArg NotNull<TArg>(TArg value, string paramName) where TArg : class
    {
        if (value == null)
        {
            throw new ArgumentNullException(paramName);
        }

        return value;
    }

    // Non-"void" methods

    /// <summary>
    /// Projects each element of an asynchronous sequence with a synchronous function.
    /// </summary>
    /// <typeparam name="T">Element type of the source.</typeparam>
    /// <typeparam name="TProj">Element type of the result.</typeparam>
    /// <param name="source">Sequence to project.</param>
    /// <param name="projection">Function applied once to each element.</param>
    /// <returns>An asynchronous sequence of projected elements.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="projection"/> is <c>null</c>.
    /// </exception>
    public static IAsyncEnumerable<TProj> Select<T, TProj>(this IAsyncEnumerable<T> source, Func<T, TProj> projection)
        => new Impl_AS<T, TProj>(NotNull(source, nameof(source)), NotNull(projection, nameof(projection)));

    /// <summary>Async source, synchronous projection.</summary>
    private sealed class Impl_AS<T, TProj> : IAsyncEnumerable<TProj>
    {
        private readonly IAsyncEnumerable<T> _source;
        private readonly Func<T, TProj> _projection;

        public Impl_AS(IAsyncEnumerable<T> source, Func<T, TProj> projection)
        {
            _source = source;
            _projection = projection;
        }

        public IAsyncEnumerator<TProj> GetEnumerator() => new E(_source, _projection);

        private sealed class E : IAsyncEnumerator<TProj>
        {
            private readonly IAsyncEnumerator<T> _sourceEnum;
            private readonly Func<T, TProj> _projection;

            public E(IAsyncEnumerable<T> source, Func<T, TProj> projection)
            {
                _sourceEnum = source.GetEnumerator();
                _projection = projection;
            }

            public TProj Current { get; private set; }

            public async Task<bool> MoveNext(CancellationToken ct)
            {
                if (!await _sourceEnum.MoveNext(ct))
                {
                    return false;
                }

                // Project here, once per element, and cache the result. Projecting inside
                // the Current getter would re-run the delegate on every read and could
                // run it before the first MoveNext.
                Current = _projection(_sourceEnum.Current);
                return true;
            }

            public void Dispose() => _sourceEnum.Dispose();
        }
    }

    /// <summary>
    /// Projects each element of a synchronous sequence with an asynchronous function.
    /// </summary>
    /// <typeparam name="T">Element type of the source.</typeparam>
    /// <typeparam name="TProj">Element type of the result.</typeparam>
    /// <param name="source">Sequence to project.</param>
    /// <param name="projection">Asynchronous function applied once to each element.</param>
    /// <returns>An asynchronous sequence of projected elements.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="projection"/> is <c>null</c>.
    /// </exception>
    public static IAsyncEnumerable<TProj> Select<T, TProj>(this IEnumerable<T> source, Func<T, Task<TProj>> projection)
        => new Impl_SA<T, TProj>(NotNull(source, nameof(source)), NotNull(projection, nameof(projection)));

    /// <summary>Synchronous source, asynchronous projection.</summary>
    private sealed class Impl_SA<T, TProj> : IAsyncEnumerable<TProj>
    {
        private readonly IEnumerable<T> _source;
        private readonly Func<T, Task<TProj>> _projection;

        public Impl_SA(IEnumerable<T> source, Func<T, Task<TProj>> projection)
        {
            _source = source;
            _projection = projection;
        }

        public IAsyncEnumerator<TProj> GetEnumerator() => new E(_source, _projection);

        private sealed class E : IAsyncEnumerator<TProj>
        {
            private readonly IEnumerator<T> _sourceEnum;
            private readonly Func<T, Task<TProj>> _projection;

            public E(IEnumerable<T> source, Func<T, Task<TProj>> projection)
            {
                _sourceEnum = source.GetEnumerator();
                _projection = projection;
            }

            public TProj Current { get; private set; }

            public async Task<bool> MoveNext(CancellationToken ct)
            {
                // Neither the synchronous source nor this projection can observe the
                // token, so this check is the only place cancellation is honoured.
                ct.ThrowIfCancellationRequested();

                if (!_sourceEnum.MoveNext())
                {
                    return false;
                }

                Current = await _projection(_sourceEnum.Current);
                return true;
            }

            public void Dispose() => _sourceEnum.Dispose();
        }
    }

    /// <summary>
    /// Projects each element of a synchronous sequence with a cancellable asynchronous function.
    /// </summary>
    /// <typeparam name="T">Element type of the source.</typeparam>
    /// <typeparam name="TProj">Element type of the result.</typeparam>
    /// <param name="source">Sequence to project.</param>
    /// <param name="projection">
    /// Asynchronous function applied once to each element; it receives the token passed to <c>MoveNext</c>.
    /// </param>
    /// <returns>An asynchronous sequence of projected elements.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="projection"/> is <c>null</c>.
    /// </exception>
    public static IAsyncEnumerable<TProj> Select<T, TProj>(this IEnumerable<T> source, Func<T, CancellationToken, Task<TProj>> projection)
        => new Impl_SA_C<T, TProj>(NotNull(source, nameof(source)), NotNull(projection, nameof(projection)));

    /// <summary>Synchronous source, cancellable asynchronous projection.</summary>
    private sealed class Impl_SA_C<T, TProj> : IAsyncEnumerable<TProj>
    {
        private readonly IEnumerable<T> _source;
        private readonly Func<T, CancellationToken, Task<TProj>> _projection;

        public Impl_SA_C(IEnumerable<T> source, Func<T, CancellationToken, Task<TProj>> projection)
        {
            _source = source;
            _projection = projection;
        }

        public IAsyncEnumerator<TProj> GetEnumerator() => new E(_source, _projection);

        private sealed class E : IAsyncEnumerator<TProj>
        {
            private readonly IEnumerator<T> _sourceEnum;
            private readonly Func<T, CancellationToken, Task<TProj>> _projection;

            public E(IEnumerable<T> source, Func<T, CancellationToken, Task<TProj>> projection)
            {
                _sourceEnum = source.GetEnumerator();
                _projection = projection;
            }

            public TProj Current { get; private set; }

            public async Task<bool> MoveNext(CancellationToken ct)
            {
                // The synchronous source cannot observe the token; check before advancing it.
                ct.ThrowIfCancellationRequested();

                if (!_sourceEnum.MoveNext())
                {
                    return false;
                }

                Current = await _projection(_sourceEnum.Current, ct);
                return true;
            }

            public void Dispose() => _sourceEnum.Dispose();
        }
    }

    /// <summary>
    /// Projects each element of an asynchronous sequence with an asynchronous function.
    /// </summary>
    /// <typeparam name="T">Element type of the source.</typeparam>
    /// <typeparam name="TProj">Element type of the result.</typeparam>
    /// <param name="source">Sequence to project.</param>
    /// <param name="projection">Asynchronous function applied once to each element.</param>
    /// <returns>An asynchronous sequence of projected elements.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="projection"/> is <c>null</c>.
    /// </exception>
    public static IAsyncEnumerable<TProj> Select<T, TProj>(this IAsyncEnumerable<T> source, Func<T, Task<TProj>> projection)
        => new Impl_AA<T, TProj>(NotNull(source, nameof(source)), NotNull(projection, nameof(projection)));

    /// <summary>Async source, asynchronous projection.</summary>
    private sealed class Impl_AA<T, TProj> : IAsyncEnumerable<TProj>
    {
        private readonly IAsyncEnumerable<T> _source;
        private readonly Func<T, Task<TProj>> _projection;

        public Impl_AA(IAsyncEnumerable<T> source, Func<T, Task<TProj>> projection)
        {
            _source = source;
            _projection = projection;
        }

        public IAsyncEnumerator<TProj> GetEnumerator() => new E(_source, _projection);

        private sealed class E : IAsyncEnumerator<TProj>
        {
            private readonly IAsyncEnumerator<T> _sourceEnum;
            private readonly Func<T, Task<TProj>> _projection;

            public E(IAsyncEnumerable<T> source, Func<T, Task<TProj>> projection)
            {
                _sourceEnum = source.GetEnumerator();
                _projection = projection;
            }

            public TProj Current { get; private set; }

            public async Task<bool> MoveNext(CancellationToken ct)
            {
                // The token is forwarded to the source; this projection does not take one.
                if (!await _sourceEnum.MoveNext(ct))
                {
                    return false;
                }

                Current = await _projection(_sourceEnum.Current);
                return true;
            }

            public void Dispose() => _sourceEnum.Dispose();
        }
    }

    /// <summary>
    /// Projects each element of an asynchronous sequence with a cancellable asynchronous function.
    /// </summary>
    /// <typeparam name="T">Element type of the source.</typeparam>
    /// <typeparam name="TProj">Element type of the result.</typeparam>
    /// <param name="source">Sequence to project.</param>
    /// <param name="projection">
    /// Asynchronous function applied once to each element; it receives the token passed to <c>MoveNext</c>.
    /// </param>
    /// <returns>An asynchronous sequence of projected elements.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="projection"/> is <c>null</c>.
    /// </exception>
    public static IAsyncEnumerable<TProj> Select<T, TProj>(this IAsyncEnumerable<T> source, Func<T, CancellationToken, Task<TProj>> projection)
        => new Impl_AA_C<T, TProj>(NotNull(source, nameof(source)), NotNull(projection, nameof(projection)));

    /// <summary>Async source, cancellable asynchronous projection.</summary>
    private sealed class Impl_AA_C<T, TProj> : IAsyncEnumerable<TProj>
    {
        private readonly IAsyncEnumerable<T> _source;
        private readonly Func<T, CancellationToken, Task<TProj>> _projection;

        public Impl_AA_C(IAsyncEnumerable<T> source, Func<T, CancellationToken, Task<TProj>> projection)
        {
            _source = source;
            _projection = projection;
        }

        public IAsyncEnumerator<TProj> GetEnumerator() => new E(_source, _projection);

        private sealed class E : IAsyncEnumerator<TProj>
        {
            private readonly IAsyncEnumerator<T> _sourceEnum;
            private readonly Func<T, CancellationToken, Task<TProj>> _projection;

            public E(IAsyncEnumerable<T> source, Func<T, CancellationToken, Task<TProj>> projection)
            {
                _sourceEnum = source.GetEnumerator();
                _projection = projection;
            }

            public TProj Current { get; private set; }

            public async Task<bool> MoveNext(CancellationToken ct)
            {
                // The same token drives both the source and the projection.
                if (!await _sourceEnum.MoveNext(ct))
                {
                    return false;
                }

                Current = await _projection(_sourceEnum.Current, ct);
                return true;
            }

            public void Dispose() => _sourceEnum.Dispose();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment