Skip to content

Instantly share code, notes, and snippets.

@Tirael
Forked from 0xced/ForEachAsync.cs
Created June 5, 2020 11:41
Show Gist options
  • Select an option

  • Save Tirael/863d9e5a1ec6b463fe66609410a8d799 to your computer and use it in GitHub Desktop.

Select an option

Save Tirael/863d9e5a1ec6b463fe66609410a8d799 to your computer and use it in GitHub Desktop.
Parallel foreach async enumeration with maximum degree of parallelism
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Parallel
{
/// <summary>
/// Extension methods for running asynchronous work over a sequence with a bounded number of concurrent workers.
/// </summary>
public static class EnumerableExtensions
{
    /// <summary>
    /// Invokes <paramref name="body"/> once for each element of <paramref name="source"/>, spreading the
    /// elements over <paramref name="degreeOfParallelism"/> workers that each await one invocation at a time.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to enumerate; it is enumerated once, shared by all workers.</param>
    /// <param name="degreeOfParallelism">Number of workers (partitions) to create; must be positive.</param>
    /// <param name="body">Asynchronous action to run for each element.</param>
    /// <param name="progress">Optional sink that receives each element after its <paramref name="body"/> call has completed.</param>
    /// <returns>A task that completes when every worker has drained its partition.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="body"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="degreeOfParallelism"/> is zero or negative.</exception>
    // Adapted from https://blogs.msdn.microsoft.com/pfxteam/2012/03/05/implementing-a-simple-foreachasync-part-2/
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body, IProgress<T> progress = null)
    {
        // Fail fast with a precise exception instead of a NullReferenceException buried in a worker task.
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (body == null)
        {
            throw new ArgumentNullException(nameof(body));
        }

        if (degreeOfParallelism <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(degreeOfParallelism), degreeOfParallelism, "The degree of parallelism must be positive.");
        }

        // NoBuffering hands out elements one at a time. The default partitioner pre-fetches chunks,
        // so a worker awaiting a slow body could sit on buffered elements while other workers are idle.
        var partitions = Partitioner
            .Create(source, EnumerablePartitionerOptions.NoBuffering)
            .GetPartitions(degreeOfParallelism);

        return Task.WhenAll(partitions.Select(partition => Task.Run(() => DrainPartitionAsync(partition, body, progress))));
    }

    // Processes every element of one partition sequentially, then disposes the partition enumerator.
    private static async Task DrainPartitionAsync<T>(IEnumerator<T> partition, Func<T, Task> body, IProgress<T> progress)
    {
        using (partition)
        {
            while (partition.MoveNext())
            {
                // Read Current once so body and progress are guaranteed to see the same element.
                var item = partition.Current;
                await body(item).ConfigureAwait(false);
                progress?.Report(item);
            }
        }
    }
}
}
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Parallel;
/// <summary>
/// Console demo that runs the same list of random delays twice, first one after another and then
/// through <c>ForEachAsync</c> with ten workers, printing the elapsed time of each run.
/// </summary>
class Program
{
    static async Task Main()
    {
        // Generate the 42 delays (milliseconds) once so both runs share an identical workload.
        var rng = new Random();
        var waits = Enumerable.Range(0, 42).Select(unused => rng.Next(0, 100)).ToList();

        // Each reported delay is echoed to the console followed by a space.
        // NOTE(review): Progress<T> invokes its handler asynchronously, so the echoed values may
        // appear out of order or after the summary line — confirm if ordering matters.
        IProgress<int> reporter = new Progress<int>(value => Console.Write($"{value} "));
        var timer = Stopwatch.StartNew();

        // Baseline: wait out every delay sequentially.
        for (var index = 0; index < waits.Count; index++)
        {
            var current = waits[index];
            await Task.Delay(current);
            reporter.Report(current);
        }

        Console.WriteLine($"{Environment.NewLine}Sequential execution: {timer.Elapsed.TotalSeconds} seconds");

        // Same workload again, this time spread over 10 concurrent workers.
        timer.Restart();
        await waits.ForEachAsync(10, async ms => await Task.Delay(ms), reporter);

        Console.WriteLine($"{Environment.NewLine}Parallel execution: {timer.Elapsed.TotalSeconds} seconds");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment