Last active
April 19, 2016 14:01
-
-
Save smithkl42/ca5b98e909952eddef1e3a076c0dd91d to your computer and use it in GitHub Desktop.
Parallel helpers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
using MoreLinq; | |
namespace Payboard.Common | |
{ | |
public static class ParallelHelpers | |
{ | |
/// <summary> | |
/// Iterate asynchronously in parallel over a data source, but only with a given degree of parallelization, so that we | |
/// don't DOS ourselves. | |
/// </summary> | |
public static async Task ForEachParallel<T>(this IEnumerable<T> list, Func<T, Task> action, | |
int maxParallelization = 100) | |
{ | |
// Create the execution block | |
var options = new ExecutionDataflowBlockOptions | |
{ | |
MaxDegreeOfParallelism = maxParallelization | |
}; | |
var sendMessageBlock = new ActionBlock<T>(async item => { await action(item); }, options); | |
// Send everything to the execution block and wait for it to finish | |
list.ForEach(item => sendMessageBlock.Post(item)); | |
sendMessageBlock.Complete(); | |
await sendMessageBlock.Completion; | |
} | |
/// <summary> | |
/// Select asynchronously parallel from a data source, but only with a given degree of parallelization, so that we | |
/// don't DOS ourselves. | |
/// </summary> | |
public static async Task<List<TResult>> SelectParallel<TSource, TResult>(this IEnumerable<TSource> list, | |
Func<TSource, Task<TResult>> mapFunc, | |
int maxParallelization = 100) | |
{ | |
// Create the execution block | |
var options = new ExecutionDataflowBlockOptions | |
{ | |
MaxDegreeOfParallelism = maxParallelization | |
}; | |
var results = new List<TResult>(); | |
var sendMessageBlock = new ActionBlock<TSource>(async item => | |
{ | |
var result = await mapFunc(item); | |
lock (results) | |
{ | |
results.Add(result); | |
} | |
}, options); | |
// Send everything to the execution block and wait for it to finish | |
list.ForEach(cu => sendMessageBlock.Post(cu)); | |
sendMessageBlock.Complete(); | |
await sendMessageBlock.Completion; | |
return results; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment