Skip to content

Instantly share code, notes, and snippets.

@neon-sunset
Last active March 8, 2024 17:48
Show Gist options
  • Select an option

  • Save neon-sunset/386ec6d9dd062ebe0e8f60e141f5be85 to your computer and use it in GitHub Desktop.

Select an option

Save neon-sunset/386ec6d9dd062ebe0e8f60e141f5be85 to your computer and use it in GitHub Desktop.
public static class AsyncEnumerableExtensions
{
/// <summary>
/// Will skip faulted tasks by default
/// </summary>
public static async IAsyncEnumerable<TResult?> ParallelSelectAsync<T, TResult>(
this IEnumerable<T> items,
Func<T, Task<TResult>> operation,
int parallelism = -1,
bool throwOnError = false)
{
if (parallelism <= -1)
{
parallelism = Environment.ProcessorCount;
}
using var enumerator = items.GetEnumerator();
var tasks = new HashSet<Task<TResult>>(parallelism);
for (var i = 0; i < parallelism; i++)
{
if (enumerator.MoveNext())
{
tasks.Add(operation(enumerator.Current));
}
}
while (tasks.Count > 0)
{
var completed = await Task.WhenAny(tasks);
tasks.Remove(completed);
if (completed.IsCompletedSuccessfully)
{
if (enumerator.MoveNext())
{
tasks.Add(operation(enumerator.Current));
}
yield return completed.Result;
}
else if (throwOnError && completed.IsFaulted)
{
throw new Exception("Cannot enumerate!", completed.Exception);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment