Last active
April 18, 2025 08:08
-
-
Save VincentH-Net/3d40e7ef6dd7ff9b25313421dad5ed08 to your computer and use it in GitHub Desktop.
BackgroundForEach helper - schedule background parallel processing of collection from a Microsoft Orleans grain
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Immutable; | |
using System.Diagnostics.CodeAnalysis; | |
namespace InnoWvate.Net; | |
public static partial class OrleansParallel
{
    /// <summary>
    /// We use a baseline of 100 tasks per CPU based on how many grain calls you can have outstanding before you start saturating network, activation churn, or memory.
    /// This factor ensures that we utilize CPU well even when most grain calls are I/O bound (e.g. network calls) - so we favor throughput over latency.
    /// </summary>
    /// <remarks>Note that 0.5 vCPU in Azure Container Apps is rounded up to 1 reported processor, so both 0.5 vCPU and 1 vCPU have the same default max degree of parallelism.</remarks>
    public static int DefaultMaxDegreeOfParallelism => Math.Max(1, Environment.ProcessorCount) * 100;

    /// <summary>Calculates the maximum degree of parallelism based on the default max degree of parallelism and a fraction.</summary>
    /// <param name="fraction">A fraction between 0 and 1 (inclusive)</param>
    /// <returns>A maximum degree of parallelism of at least 1</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="fraction"/> is less than 0, greater than 1, or not a number (NaN)</exception>
    public static int FractionOfDefaultMaxDegreeOfParallelism(float fraction)
    {
        // Written as a negated range check on purpose: every comparison with float.NaN is false,
        // so "fraction < 0 || fraction > 1" would let NaN slip through, while this form rejects it.
        if (!(fraction >= 0f && fraction <= 1f))
            throw new ArgumentOutOfRangeException(nameof(fraction), "Must be between 0 and 1 inclusive");

        return Math.Max(1, (int)Math.Round(DefaultMaxDegreeOfParallelism * fraction));
    }

    /// <summary>
    /// Schedule background processing of items in an <see cref="IEnumerable{T}" /> with controlled parallelism. Allows per-item exception handling and loop completion handling.<br />
    /// <br />
    /// Safe to use in Orleans grain methods IF the callbacks do not access the grain state (its fields/properties).<br />
    /// It is safe to call grain methods from within <paramref name="body"/> by using method-local variables/parameters for grain references or for a grain factory (of the grain or injected); these variables will be captured in <paramref name="body"/>.<br />
    /// See <a href="https://learn.microsoft.com/en-us/dotnet/orleans/grains/external-tasks-and-grains#example-make-a-grain-call-from-code-that-runs-on-a-thread-pool">Orleans guidance on external tasks and grains</a>
    /// </summary>
    /// <remarks>
    /// TIP: Wrap calls to this method in a local static method within a grain method, to prevent accessing grain state in the callbacks.<br />
    /// The <paramref name="items"/> are copied before this method returns, so later changes to the source collection are not observed.<br />
    /// All callbacks are invoked from the background work started with <see cref="Task.Run(Func{Task})"/>; <paramref name="body"/> and <paramref name="error"/> may be invoked concurrently for different items.
    /// Exceptions thrown by <paramref name="error"/> or <paramref name="done"/> are ignored.
    /// </remarks>
    /// <typeparam name="T">The item type</typeparam>
    /// <param name="items">The items to process</param>
    /// <param name="body">Will be called for each item to process it</param>
    /// <param name="error">
    /// If specified, will be called for each item that raises an exception from within <paramref name="body"/>.<br />
    /// - Processing of other items will continue, unless the exception is an <see cref="OperationCanceledException"/>.<br />
    /// - If the operation is canceled, <paramref name="error"/> will be called with an <see cref="OperationCanceledException"/> for any items that were canceled while processing,
    /// and for any items that did not yet start processing. This includes the case where <paramref name="cancellationToken"/> was already canceled when this method was called.<br />
    /// - If the operation is aborted due to an exception raised outside of <paramref name="body"/>,
    /// <paramref name="error"/> will be called with that exception for any items that did not yet start or complete processing.
    /// </param>
    /// <param name="done">If specified, will be called when all items are processed; will not be called if the operation was canceled or was aborted due to an exception raised outside of <paramref name="body"/></param>
    /// <param name="maxDegreeOfParallelism">
    /// The maximum number of items processed at the same time for this invocation of <c>BackgroundForEach</c>;
    /// must be greater than 0. Optional; default is <see cref="DefaultMaxDegreeOfParallelism"/>
    /// </param>
    /// <param name="cancellationToken">Optional cancellation token; will be passed in to <paramref name="body"/></param>
    /// <exception cref="ArgumentNullException"><paramref name="items"/> or <paramref name="body"/> is null</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxDegreeOfParallelism"/> is less than 1</exception>
    public static void BackgroundForEach<T>(
        IEnumerable<T> items,
        Func<T, CancellationToken, ValueTask> body,
        Action<T, Exception>? error = null,
        Action? done = null,
        int? maxDegreeOfParallelism = null,
        CancellationToken cancellationToken = default
    )
    {
        // Fail fast on the caller's thread instead of surfacing a NullReferenceException per item in the background.
        ArgumentNullException.ThrowIfNull(items);
        ArgumentNullException.ThrowIfNull(body);

        int degreeOfParallelism = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        if (degreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism), "must be greater than 0");

        // Snapshot the items on the calling thread so that the background work never touches the caller's collection.
        var itemsSnapshot = items.ToImmutableArray();

        // Tracks per item whether it was fully handled (processed, or reported through the error callback).
        bool[] itemsProcessed = new bool[itemsSnapshot.Length];

        // The cancellation token is deliberately NOT passed to Task.Run: with an already canceled token Task.Run
        // would never invoke the delegate, and the error callback would then not be called for any item.
        // Cancellation is observed inside the delegate instead.
        _ = Task.Run(async () =>
        {
            try
            {
                // When already canceled, skip straight to the error reporting below;
                // this also prevents calling done for an empty collection on a canceled operation.
                cancellationToken.ThrowIfCancellationRequested();

                using var semaphore = new SemaphoreSlim(degreeOfParallelism);

                var tasks = itemsSnapshot.Select(async (item, index) =>
                {
                    // Waiting is done outside of the try block: when the wait is canceled the semaphore was not acquired,
                    // so it must not be released.
                    await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
                    try
                    {
                        cancellationToken.ThrowIfCancellationRequested();
                        await body(item, cancellationToken).ConfigureAwait(false);
                        itemsProcessed[index] = true;
                    }
                    catch (OperationCanceledException)
                    {
                        // Leave the item marked as unprocessed; it is reported in the outer catch block.
                        throw;
                    }
                    catch (Exception ex)
                    {
                        itemsProcessed[index] = true;
                        try { error?.Invoke(item, ex); } catch { /* best effort: a failing callback must not stop other items */ }
                    }
                    finally
                    {
                        _ = semaphore.Release();
                    }
                });

                // Task.WhenAll completes only after every item task has completed, so no item task is still running
                // when the semaphore is disposed or when the outer catch block runs.
                await Task.WhenAll(tasks).ConfigureAwait(false);

                try { done?.Invoke(); } catch { /* best effort: there is nobody to report a failing callback to */ }
            }
            catch (Exception ex)
            {
                if (error is null) return;

                // Note that we cannot check the tasks status here because evaluating the tasks enumerator
                // could start tasks in the enumerator that were not evaluated yet by Task.WhenAll; hence the use of itemsProcessed
                for (int i = 0; i < itemsProcessed.Length; i++)
                {
                    if (itemsProcessed[i]) continue;

                    itemsProcessed[i] = true;
                    try { error(itemsSnapshot[i], ex); } catch { /* best effort: keep reporting the remaining items */ }
                }
            }
        });
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment