-
-
Save Tirael/863d9e5a1ec6b463fe66609410a8d799 to your computer and use it in GitHub Desktop.
Parallel foreach async enumeration with maximum degree of parallelism
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Concurrent; | |
| using System.Collections.Generic; | |
| using System.Linq; | |
| using System.Threading.Tasks; | |
| namespace Parallel | |
| { | |
| public static class EnumerableExtensions | |
| { | |
| // Adapted from https://blogs.msdn.microsoft.com/pfxteam/2012/03/05/implementing-a-simple-foreachasync-part-2/ | |
| public static Task ForEachAsync<T>(this IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body, IProgress<T> progress = null) | |
| { | |
| return Task.WhenAll( | |
| Partitioner.Create(source).GetPartitions(degreeOfParallelism) | |
| .Select(partition => Task.Run(async () => { | |
| using (partition) | |
| while (partition.MoveNext()) | |
| { | |
| await body(partition.Current); | |
| progress?.Report(partition.Current); | |
| } | |
| })) | |
| ); | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Diagnostics; | |
| using System.Linq; | |
| using System.Threading.Tasks; | |
| using Parallel; | |
| class Program | |
| { | |
| static async Task Main() | |
| { | |
| var random = new Random(); | |
| var delays = Enumerable.Range(0, 42).Select(_ => random.Next(0, 100)).ToList(); | |
| IProgress<int> progress = new Progress<int>(i => Console.Write($"{i} ")); | |
| var stopwatch = Stopwatch.StartNew(); | |
| foreach (var i in delays) | |
| { | |
| await Task.Delay(i); | |
| progress.Report(i); | |
| } | |
| Console.WriteLine($"{Environment.NewLine}Sequential execution: {stopwatch.Elapsed.TotalSeconds} seconds"); | |
| stopwatch.Restart(); | |
| await delays.ForEachAsync(10, async i => await Task.Delay(i), progress); | |
| Console.WriteLine($"{Environment.NewLine}Parallel execution: {stopwatch.Elapsed.TotalSeconds} seconds"); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment