Skip to content

Instantly share code, notes, and snippets.

@Luiz-Monad
Created February 14, 2017 16:10
Show Gist options
  • Save Luiz-Monad/ada7404a8fcce77a7889dc7b2597137d to your computer and use it in GitHub Desktop.
Save Luiz-Monad/ada7404a8fcce77a7889dc7b2597137d to your computer and use it in GitHub Desktop.
Parallel Task Constraining
/// <summary>
/// Helpers for running Begin/End style (APM) asynchronous operations over a
/// sequence while limiting how many operations are in flight at the same time.
/// </summary>
public static class ParallelExtensions
{
    /// <summary>
    /// Enumerates through each item in a list in parallel, starting an asynchronous
    /// operation per item and blocking the caller until every item has been processed.
    /// </summary>
    /// <typeparam name="T">Type of the items being processed.</typeparam>
    /// <param name="list">
    /// Items to process. A null list is treated as empty. The sequence is copied to an
    /// array before processing starts.
    /// </param>
    /// <param name="beginAction">
    /// Starts the operation for one item. Receives the item, the index of the worker
    /// slot handling it, and the callback to invoke when the operation completes.
    /// </param>
    /// <param name="endAction">
    /// Invoked from the completion callback with the item, the worker slot index, the
    /// <see cref="IAsyncResult.AsyncState"/> and the <see cref="IAsyncResult"/> itself.
    /// </param>
    /// <param name="parallelization">
    /// Number of worker slots, i.e. the maximum number of operations in flight.
    /// Zero or a negative value selects <c>Environment.ProcessorCount * 2</c>.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="beginAction"/> or <paramref name="endAction"/> is null.
    /// </exception>
    /// <exception cref="AggregateException">
    /// One or more calls to <paramref name="beginAction"/> or
    /// <paramref name="endAction"/> threw; all collected exceptions are included.
    /// </exception>
    public static void ForEachParallel<T>(this IEnumerable<T> list, Func<T, int, AsyncCallback, IAsyncResult> beginAction, Action<T, int, object, IAsyncResult> endAction, int parallelization = 0)
    {
        if (list == null) {
            return;
        }
        if (beginAction == null) {
            throw new ArgumentNullException("beginAction");
        }
        if (endAction == null) {
            throw new ArgumentNullException("endAction");
        }

        var numberOfParallelTasks = parallelization <= 0 ? Environment.ProcessorCount * 2 : parallelization;
        var syncRoot = new object();
        var resultList = new List<IAsyncResult>();
        var exceptions = new List<Exception>();

        Action<Exception> handler = ex =>
        {
            lock (exceptions) {
                exceptions.Add(ex);
            }
        };

        // Wrap the delegates exactly once; wrapping on every item would nest one
        // more try/catch layer per processed item.
        var safeBegin = ExceptionWrap(beginAction, handler);
        var safeEnd = ExceptionWrap(endAction, handler);

        using (var enumerator = list.ToArray().AsEnumerable().GetEnumerator())
        using (var counter = new CountdownEvent(numberOfParallelTasks)) {
            for (var i = 0; i < numberOfParallelTasks; i++) {
                var slot = i; // don't capture the loop control variable
                ThreadPool.QueueUserWorkItem(o => InvokeAction(resultList, enumerator, safeBegin, safeEnd, syncRoot, slot, counter));
            }
            // Every slot signals exactly once, when it finds the enumerator exhausted.
            counter.Wait();
        }

        IAsyncResult[] started;
        lock (resultList) {
            started = resultList.ToArray();
        }
        foreach (var iAsyncResult in started) {
            var handle = iAsyncResult.AsyncWaitHandle;
            if (handle != null) {
                handle.Close();
            }
        }

        Exception[] failures;
        lock (exceptions) {
            failures = exceptions.ToArray();
        }
        if (failures.Length > 0) {
            throw new AggregateException(failures);
        }
    }

    /// <summary>
    /// Processes the next available item on worker slot <paramref name="i"/>, then
    /// re-queues itself for the following item. Signals <paramref name="counter"/>
    /// once when no items remain.
    /// </summary>
    /// <remarks>
    /// <paramref name="beginAction"/> and <paramref name="endAction"/> are expected to
    /// be the exception-wrapped delegates, so they do not throw.
    /// </remarks>
    private static void InvokeAction<T>(
        List<IAsyncResult> resultList,
        IEnumerator<T> enumerator,
        Func<T, int, AsyncCallback, IAsyncResult> beginAction,
        Action<T, int, object, IAsyncResult> endAction,
        object syncRoot,
        int i,
        CountdownEvent counter)
    {
        T item = default(T);
        bool moveNext;
        lock (syncRoot) {
            moveNext = enumerator.MoveNext();
            // Only read Current after a successful MoveNext; some enumerators throw
            // when Current is read past the end.
            if (moveNext) {
                item = enumerator.Current;
            }
        }
        if (!moveNext) {
            counter.Signal();
            return;
        }

        // The slot may advance only after BOTH the completion callback has run and
        // beginAction has returned (so its result is already recorded). The callback
        // can fire before beginAction returns, hence the two-phase countdown.
        var pending = 2;
        var continued = 0;
        Action continueOnce = () =>
        {
            // Guard so a slot can never fork into two chains, which would make the
            // CountdownEvent be signalled more often than its initial count.
            if (Interlocked.Exchange(ref continued, 1) == 0) {
                ThreadPool.QueueUserWorkItem(o => InvokeAction(resultList, enumerator, beginAction, endAction, syncRoot, i, counter));
            }
        };
        Action phaseDone = () =>
        {
            if (Interlocked.Decrement(ref pending) == 0) {
                continueOnce();
            }
        };

        var iAsyncResult = beginAction(item, i, ar =>
        {
            endAction(item, i, ar == null ? null : ar.AsyncState, ar);
            phaseDone();
        });

        if (iAsyncResult == null) {
            // beginAction threw (already recorded by the wrapper) or produced no
            // result; do not wait for a callback that may never come.
            continueOnce();
            return;
        }

        lock (resultList) {
            resultList.Add(iAsyncResult);
        }
        phaseDone();
    }

    /// <summary>
    /// Wraps <paramref name="F"/> so that any exception it throws is passed to
    /// <paramref name="E"/> and <c>default(D)</c> is returned instead.
    /// </summary>
    private static Func<A, B, C, D> ExceptionWrap<A, B, C, D>(Func<A, B, C, D> F, Action<Exception> E)
    {
        return ((A AA, B BB, C CC) =>
        {
            try {
                return F(AA, BB, CC);
            } catch (Exception ex) {
                E(ex);
                return default(D);
            }
        });
    }

    /// <summary>
    /// Wraps <paramref name="F"/> so that any exception it throws is passed to
    /// <paramref name="E"/> instead of propagating.
    /// </summary>
    private static Action<A, B, C, D> ExceptionWrap<A, B, C, D>(Action<A, B, C, D> F, Action<Exception> E)
    {
        return ((A AA, B BB, C CC, D DD) =>
        {
            try {
                F(AA, BB, CC, DD);
            } catch (Exception ex) {
                E(ex);
            }
        });
    }
}
/// <summary>
/// An <see cref="IAsyncResult"/> for an operation that has already completed
/// synchronously by the time the object is constructed.
/// </summary>
/// <typeparam name="T">Type of the value produced by the operation.</typeparam>
public class CompletedAsyncResult<T> : IAsyncResult
{
    private readonly T returnValue;
    private readonly object handleLock = new object();
    private ManualResetEvent waitHandle;

    /// <summary>
    /// Stores the result and immediately invokes <paramref name="callback"/> (if any)
    /// on the calling thread, passing this instance.
    /// </summary>
    /// <param name="returnValue">Value exposed through <see cref="Value"/>.</param>
    /// <param name="callback">Completion callback; may be null.</param>
    public CompletedAsyncResult(T returnValue, AsyncCallback callback)
    {
        this.returnValue = returnValue;
        if (callback != null) {
            callback(this);
        }
    }

    /// <summary>The value supplied at construction.</summary>
    public T Value {
        get { return returnValue; }
    }

    /// <summary>Always null; no user state is carried.</summary>
    object IAsyncResult.AsyncState {
        get { return null; }
    }

    /// <summary>
    /// A wait handle that is already signalled, because the operation is complete.
    /// Created on first use and reused afterwards; if a consumer has closed it,
    /// a new signalled handle is created on the next access.
    /// </summary>
    WaitHandle IAsyncResult.AsyncWaitHandle {
        get {
            lock (handleLock) {
                if (waitHandle == null || waitHandle.SafeWaitHandle.IsClosed) {
                    // Signalled from the start: waiting on a completed operation
                    // must never block.
                    waitHandle = new ManualResetEvent(true);
                }
                return waitHandle;
            }
        }
    }

    /// <summary>Always true.</summary>
    bool IAsyncResult.CompletedSynchronously {
        get { return true; }
    }

    /// <summary>Always true.</summary>
    bool IAsyncResult.IsCompleted {
        get { return true; }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment