Skip to content

Instantly share code, notes, and snippets.

@Porges
Last active July 17, 2017 01:11
Show Gist options
  • Save Porges/4b4d698c73fb44f9594cea52c6072c9b to your computer and use it in GitHub Desktop.
Save Porges/4b4d698c73fb44f9594cea52c6072c9b to your computer and use it in GitHub Desktop.
Support for LINQing over IAsyncEnumerable and or/async results
// Implementations for:
// Select
// - { Enumerable, AsyncEnumerable } * { Action, Func, Func<_,Task>, Func<_,Task<T>> }
// = 8
// - Enumerable/Action and Enumerable/Func (bad? and already defined)
// = 6
// + 2 CancellationToken versions
// = 8
public static class LinqExt
{
// "Void" methods
public static async Task Select<T>(this IAsyncEnumerable<T> source, Action<T> action)
{
using (var enumer = source.GetEnumerator())
{
while (await enumer.MoveNext())
{
action(enumer.Current);
}
}
}
public static async Task Select<T>(this IEnumerable<T> source, Func<T, Task> action)
{
foreach (var x in source)
{
await action(x);
}
}
public static async Task Select<T>(this IAsyncEnumerable<T> source, Func<T, Task> action)
{
using (var enumer = source.GetEnumerator())
{
while (await enumer.MoveNext())
{
await action(enumer.Current);
}
}
}
// Non-"void" methods
public static IAsyncEnumerable<TProj> Select<T, TProj>(this IAsyncEnumerable<T> source, Func<T, TProj> projection)
=> new Impl_AS<T, TProj>(source, projection);
private sealed class Impl_AS<T, TProj> : IAsyncEnumerable<TProj>
{
IAsyncEnumerable<T> _source;
Func<T, TProj> _projection;
public Impl_AS(IAsyncEnumerable<T> source, Func<T, TProj> projection)
{
_source = source;
_projection = projection;
}
public IAsyncEnumerator<TProj> GetEnumerator() => new E(_source, _projection);
private sealed class E : IAsyncEnumerator<TProj>
{
IAsyncEnumerator<T> _sourceEnum;
Func<T, TProj> _projection;
public E(IAsyncEnumerable<T> source, Func<T, TProj> projection)
{
_sourceEnum = source.GetEnumerator();
_projection = projection;
}
public Task<bool> MoveNext(CancellationToken ct) => _sourceEnum.MoveNext(ct);
public TProj Current => _projection(_sourceEnum.Current);
public void Dispose() => _sourceEnum.Dispose();
}
}
public static IAsyncEnumerable<TProj> Select<T, TProj>(this IEnumerable<T> source, Func<T, Task<TProj>> projection)
=> new Impl_SA<T, TProj>(source, projection);
private sealed class Impl_SA<T, TProj> : IAsyncEnumerable<TProj>
{
IEnumerable<T> _source;
Func<T, Task<TProj>> _projection;
public Impl_SA(IEnumerable<T> source, Func<T, Task<TProj>> projection)
{
_source = source;
_projection = projection;
}
public IAsyncEnumerator<TProj> GetEnumerator() => new E(_source, _projection);
private sealed class E : IAsyncEnumerator<TProj>
{
IEnumerator<T> _sourceEnum;
Func<T, Task<TProj>> _projection;
public E(IEnumerable<T> source, Func<T, Task<TProj>> projection)
{
_sourceEnum = source.GetEnumerator();
_projection = projection;
}
public async Task<bool> MoveNext(CancellationToken ct)
{
if (_sourceEnum.MoveNext())
{
Current = await _projection(_sourceEnum.Current);
return true;
}
return false;
}
public TProj Current { get; private set; }
public void Dispose() => _sourceEnum.Dispose();
}
}
public static IAsyncEnumerable<TProj> Select<T, TProj>(this IEnumerable<T> source, Func<T, CancellationToken, Task<TProj>> projection)
=> new Impl_SA_C<T, TProj>(source, projection);
private sealed class Impl_SA_C<T, TProj> : IAsyncEnumerable<TProj>
{
IEnumerable<T> _source;
Func<T, CancellationToken, Task<TProj>> _projection;
public Impl_SA_C(IEnumerable<T> source, Func<T, CancellationToken, Task<TProj>> projection)
{
_source = source;
_projection = projection;
}
public IAsyncEnumerator<TProj> GetEnumerator() => new E(_source, _projection);
private sealed class E : IAsyncEnumerator<TProj>
{
IEnumerator<T> _sourceEnum;
Func<T, CancellationToken, Task<TProj>> _projection;
public E(IEnumerable<T> source, Func<T, CancellationToken, Task<TProj>> projection)
{
_sourceEnum = source.GetEnumerator();
_projection = projection;
}
public async Task<bool> MoveNext(CancellationToken ct)
{
if (_sourceEnum.MoveNext())
{
Current = await _projection(_sourceEnum.Current, ct);
return true;
}
return false;
}
public TProj Current { get; private set; }
public void Dispose() => _sourceEnum.Dispose();
}
}
public static IAsyncEnumerable<TProj> Select<T, TProj>(this IAsyncEnumerable<T> source, Func<T, Task<TProj>> projection)
=> new Impl_AA<T, TProj>(source, projection);
private sealed class Impl_AA<T, TProj> : IAsyncEnumerable<TProj>
{
IAsyncEnumerable<T> _source;
Func<T, Task<TProj>> _projection;
public Impl_AA(IAsyncEnumerable<T> source, Func<T, Task<TProj>> projection)
{
_source = source;
_projection = projection;
}
public IAsyncEnumerator<TProj> GetEnumerator() => new E(_source, _projection);
private sealed class E : IAsyncEnumerator<TProj>
{
IAsyncEnumerator<T> _sourceEnum;
Func<T, Task<TProj>> _projection;
public E(IAsyncEnumerable<T> source, Func<T, Task<TProj>> projection)
{
_sourceEnum = source.GetEnumerator();
_projection = projection;
}
public async Task<bool> MoveNext(CancellationToken ct)
{
if (await _sourceEnum.MoveNext(ct))
{
Current = await _projection(_sourceEnum.Current);
return true;
}
return false;
}
public TProj Current { get; private set; }
public void Dispose() => _sourceEnum.Dispose();
}
}
public static IAsyncEnumerable<TProj> Select<T, TProj>(this IAsyncEnumerable<T> source, Func<T, CancellationToken, Task<TProj>> projection)
=> new Impl_AA_C<T, TProj>(source, projection);
private sealed class Impl_AA_C<T, TProj> : IAsyncEnumerable<TProj>
{
IAsyncEnumerable<T> _source;
Func<T, CancellationToken, Task<TProj>> _projection;
public Impl_AA_C(IAsyncEnumerable<T> source, Func<T, CancellationToken, Task<TProj>> projection)
{
_source = source;
_projection = projection;
}
public IAsyncEnumerator<TProj> GetEnumerator() => new E(_source, _projection);
private sealed class E : IAsyncEnumerator<TProj>
{
IAsyncEnumerator<T> _sourceEnum;
Func<T, CancellationToken, Task<TProj>> _projection;
public E(IAsyncEnumerable<T> source, Func<T, CancellationToken, Task<TProj>> projection)
{
_sourceEnum = source.GetEnumerator();
_projection = projection;
}
public async Task<bool> MoveNext(CancellationToken ct)
{
if (await _sourceEnum.MoveNext(ct))
{
Current = await _projection(_sourceEnum.Current, ct);
return true;
}
return false;
}
public TProj Current { get; private set; }
public void Dispose() => _sourceEnum.Dispose();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment