Created
February 14, 2017 16:10
-
-
Save Luiz-Monad/ada7404a8fcce77a7889dc7b2597137d to your computer and use it in GitHub Desktop.
Parallel Task Constraining
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class ParallelExtensions
{
    /// <summary>
    /// Runs an APM-style (Begin/End) operation for every item of <paramref name="list"/>,
    /// keeping a bounded number of operations in flight, and blocks the calling thread
    /// until every item has been processed.
    /// </summary>
    /// <typeparam name="T">Type of the items being processed.</typeparam>
    /// <param name="list">
    /// Items to process. A null sequence is treated as empty. The sequence is copied to an
    /// array before any work is started.
    /// </param>
    /// <param name="beginAction">
    /// Starts the operation for one item. Receives the item, the zero-based index of the
    /// worker slot running it, and the callback to invoke when the operation completes.
    /// A worker slot only advances to its next item when that callback is invoked or when
    /// <paramref name="beginAction"/> itself throws.
    /// </param>
    /// <param name="endAction">
    /// Finishes the operation for one item. Receives the item, the worker slot index, the
    /// <see cref="IAsyncResult.AsyncState"/> of the completed operation and the
    /// <see cref="IAsyncResult"/> itself.
    /// </param>
    /// <param name="parallelization">
    /// Maximum number of operations in flight; zero or a negative value selects
    /// <c>Environment.ProcessorCount * 2</c>.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="beginAction"/> or <paramref name="endAction"/> is null.
    /// </exception>
    /// <exception cref="AggregateException">
    /// One or more invocations of <paramref name="beginAction"/> or
    /// <paramref name="endAction"/> threw; thrown after all items have been processed.
    /// </exception>
    public static void ForEachParallel<T>(this IEnumerable<T> list, Func<T, int, AsyncCallback, IAsyncResult> beginAction, Action<T, int, object, IAsyncResult> endAction, int parallelization = 0)
    {
        if (list == null) {
            return;
        }
        if (beginAction == null) {
            throw new ArgumentNullException("beginAction");
        }
        if (endAction == null) {
            throw new ArgumentNullException("endAction");
        }

        // Snapshot the input so the shared enumerator below walks a fixed array.
        var items = list.ToArray();
        var requested = parallelization <= 0 ? Environment.ProcessorCount * 2 : parallelization;
        // There is no point in starting more worker slots than there are items.
        var workerCount = Math.Min(requested, items.Length);
        var syncRoot = new object();
        var resultList = new List<IAsyncResult>(items.Length);
        var exceptions = new List<Exception>();

        using (var enumerator = ((IEnumerable<T>)items).GetEnumerator())
        using (var counter = new CountdownEvent(workerCount)) {
            for (var i = 0; i < workerCount; i++) {
                var worker = i; //dont capture the control variable
                ThreadPool.QueueUserWorkItem(o => InvokeAction(resultList, enumerator, beginAction, endAction, syncRoot, worker, counter, exceptions));
            }
            // Each worker slot signals exactly once, when it finds the enumerator exhausted.
            counter.Wait();
        }

        foreach (var asyncResult in resultList) {
            if (asyncResult == null) {
                continue;
            }
            var handle = asyncResult.AsyncWaitHandle;
            if (handle != null) {
                handle.Close();
            }
        }

        if (exceptions.Count > 0) {
            throw new AggregateException(exceptions);
        }
    }

    /// <summary>
    /// Takes the next item from the shared enumerator and starts its operation. When the
    /// operation completes (or fails to start) the same worker slot is re-queued on the
    /// thread pool; when no items are left the slot signals <paramref name="counter"/>.
    /// </summary>
    private static void InvokeAction<T>(
        List<IAsyncResult> resultList,
        IEnumerator<T> enumerator,
        Func<T, int, AsyncCallback, IAsyncResult> beginAction,
        Action<T, int, object, IAsyncResult> endAction,
        object syncRoot,
        int i,
        CountdownEvent counter,
        List<Exception> exceptions)
    {
        T item;
        bool moveNext;
        lock (syncRoot) {
            moveNext = enumerator.MoveNext();
            // Current is only read after a successful MoveNext; reading it past the end
            // throws for array enumerators.
            item = moveNext ? enumerator.Current : default(T);
        }
        if (!moveNext) {
            counter.Signal();
            return;
        }

        WaitCallback next = o => InvokeAction(resultList, enumerator, beginAction, endAction, syncRoot, i, counter, exceptions);
        Action<Exception> record = ex =>
        {
            lock (exceptions) {
                exceptions.Add(ex);
            }
        };

        // Guards the hand-over to the next item so that it happens exactly once, whether
        // the callback fires, beginAction throws, or both.
        var completed = 0;

        AsyncCallback callback = ar =>
        {
            if (Interlocked.Exchange(ref completed, 1) != 0) {
                return;
            }
            try {
                // Register the result before moving on, so every registration has
                // happened by the time the last worker slot signals.
                lock (resultList) {
                    resultList.Add(ar);
                }
                endAction(item, i, ar.AsyncState, ar);
            } catch (Exception ex) {
                record(ex);
            }
            ThreadPool.QueueUserWorkItem(next);
        };

        try {
            beginAction(item, i, callback);
        } catch (Exception ex) {
            record(ex);
            // The operation never started, so its callback will not advance this slot.
            if (Interlocked.Exchange(ref completed, 1) == 0) {
                ThreadPool.QueueUserWorkItem(next);
            }
        }
    }
}
/// <summary>
/// An <see cref="IAsyncResult"/> for an operation whose result is already known when the
/// object is created. The completion callback, if any, is invoked from the constructor.
/// </summary>
/// <typeparam name="T">Type of the operation's result.</typeparam>
public class CompletedAsyncResult<T> : IAsyncResult
{
    private readonly T returnValue;

    // Created on first access only; most consumers of a completed result never wait on it.
    private ManualResetEvent waitHandle;

    /// <summary>
    /// Stores the result and immediately notifies <paramref name="callback"/>.
    /// </summary>
    /// <param name="returnValue">Result of the operation.</param>
    /// <param name="callback">Completion callback; may be null, in which case nothing is invoked.</param>
    public CompletedAsyncResult(T returnValue, AsyncCallback callback)
    {
        this.returnValue = returnValue;
        if (callback != null) {
            callback(this);
        }
    }

    /// <summary>Result of the operation.</summary>
    public T Value {
        get { return returnValue; }
    }

    /// <summary>Always null: this type carries no user state.</summary>
    object IAsyncResult.AsyncState {
        get { return null; }
    }

    /// <summary>
    /// A wait handle that is already signalled, because the operation has completed.
    /// The same handle instance is returned on every access.
    /// </summary>
    WaitHandle IAsyncResult.AsyncWaitHandle {
        get {
            if (waitHandle == null) {
                var created = new ManualResetEvent(true);
                // If another thread published a handle first, keep theirs and release ours.
                if (Interlocked.CompareExchange(ref waitHandle, created, null) != null) {
                    created.Close();
                }
            }
            return waitHandle;
        }
    }

    /// <summary>Always true: completion happens during construction.</summary>
    bool IAsyncResult.CompletedSynchronously {
        get { return true; }
    }

    /// <summary>Always true.</summary>
    bool IAsyncResult.IsCompleted {
        get { return true; }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment