Skip to content

Instantly share code, notes, and snippets.

@VincentH-Net
Last active April 18, 2025 08:08
Show Gist options
  • Save VincentH-Net/3d40e7ef6dd7ff9b25313421dad5ed08 to your computer and use it in GitHub Desktop.
Save VincentH-Net/3d40e7ef6dd7ff9b25313421dad5ed08 to your computer and use it in GitHub Desktop.
BackgroundForEach helper - schedule background parallel processing of collection from a Microsoft Orleans grain
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
namespace InnoWvate.Net;
public static partial class OrleansParallel
{
    /// <summary>
    /// We use a baseline of 100 tasks per CPU based on how many grain calls you can have outstanding before you start saturating network, activation churn, or memory.
    /// This factor ensures that we utilize CPU well even when most grain calls are I/O bound (e.g. network calls) - so we favor throughput over latency.
    /// </summary>
    /// <remarks>Note that 0.5 vCPU in Azure Container Apps is rounded up to 1 reported processor, so both 0.5 vCPU and 1 vCPU have the same default max degree of parallelism.</remarks>
    public static int DefaultMaxDegreeOfParallelism => Math.Max(1, Environment.ProcessorCount) * 100;

    /// <summary>Calculates the maximum degree of parallelism based on the default max degree of parallelism and a fraction.</summary>
    /// <param name="fraction">A fraction between 0 and 1 (inclusive)</param>
    /// <returns>A maximum degree of parallelism >= 1</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="fraction"/> is less than 0, greater than 1, or not a number (NaN)</exception>
    public static int FractionOfDefaultMaxDegreeOfParallelism(float fraction)
        // A positive range test is used on purpose: NaN fails every comparison, so a negated
        // "< 0 or > 1" test would let NaN through and silently yield a meaningless result.
        => fraction is >= 0 and <= 1
            ? Math.Max(1, (int)Math.Round(DefaultMaxDegreeOfParallelism * fraction))
            : throw new ArgumentOutOfRangeException(nameof(fraction), "Must be between 0 and 1 inclusive");

    /// <summary>
    /// Schedule background processing of items in an <see cref="IEnumerable{T}" /> with controlled parallelism. Allows per-item exception handling and loop completion handling.<br />
    /// <br />
    /// Safe to use in Orleans grain methods IF the callbacks do not access the grain state (its fields/properties).<br />
    /// It is safe to call grain methods from within <paramref name="body"/> by using method-local variables/parameters for grain references or for a grain factory (of the grain or injected); these variables will be captured in <paramref name="body"/>.<br />
    /// See <a href="https://learn.microsoft.com/en-us/dotnet/orleans/grains/external-tasks-and-grains#example-make-a-grain-call-from-code-that-runs-on-a-thread-pool">Orleans guidance on external tasks and grains</a>
    /// </summary>
    /// <remarks>
    /// TIP: Wrap calls to this method in a local static method within a grain method, to prevent accessing grain state in the callbacks.<br />
    /// <paramref name="items"/> is copied into a snapshot before this method returns, so later changes to the source collection do not affect processing.<br />
    /// Exceptions thrown by <paramref name="error"/> or <paramref name="done"/> themselves are swallowed, so that a failing callback cannot abort the processing or reporting of other items.
    /// </remarks>
    /// <typeparam name="T">The item type</typeparam>
    /// <param name="items">The items to process</param>
    /// <param name="body">Will be called for each item to process it</param>
    /// <param name="error">
    /// If specified, will be called for each item that raises an exception from within <paramref name="body"/>.<br />
    /// - Processing of other items will continue, unless the exception is an <see cref="OperationCanceledException"/>.<br />
    /// - If the operation is canceled, <paramref name="error"/> will be called with an <see cref="OperationCanceledException"/> for any items that were canceled while processing,
    /// and for any items that did not yet start processing (this includes all items when <paramref name="cancellationToken"/> is already canceled on entry).<br />
    /// - If the operation is aborted due to an exception raised outside of <paramref name="body"/>,
    /// <paramref name="error"/> will be called with that exception for any items that did not yet start or complete processing.
    /// </param>
    /// <param name="done">If specified, will be called when all items are processed; will not be called if the operation was canceled or was aborted due to an exception raised outside of <paramref name="body"/></param>
    /// <param name="maxDegreeOfParallelism">
    /// The maximum number of items processed at the same time for this invocation of
    /// <see cref="BackgroundForEach{T}(IEnumerable{T}, Func{T, CancellationToken, ValueTask}, Action{T, Exception}?, Action?, int?, CancellationToken)">BackgroundForEach</see>;
    /// must be greater than 0. Optional; default is <see cref="DefaultMaxDegreeOfParallelism">MaxDegreeOfParallelism</see>
    /// </param>
    /// <param name="cancellationToken">Optional cancellation token; will be passed in to <paramref name="body"/></param>
    /// <exception cref="ArgumentNullException"><paramref name="items"/> or <paramref name="body"/> is null</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxDegreeOfParallelism"/> is less than 1</exception>
    public static void BackgroundForEach<T>(
        IEnumerable<T> items,
        Func<T, CancellationToken, ValueTask> body,
        Action<T, Exception>? error = null,
        Action? done = null,
        int? maxDegreeOfParallelism = null,
        CancellationToken cancellationToken = default
    )
    {
        // Fail fast on the calling thread; otherwise a null body would only surface as a
        // NullReferenceException per item inside the background task (or not at all when error is null).
        ArgumentNullException.ThrowIfNull(items);
        ArgumentNullException.ThrowIfNull(body);

        int degreeOfParallelism = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        if (degreeOfParallelism < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism), "must be greater than 0");
        }

        // Snapshot the items synchronously so the background work never enumerates the caller's collection.
        var itemsSnapshot = items.ToImmutableArray();

        // One flag per item: set when body completed or when its failure was already reported via error.
        bool[] itemsProcessed = new bool[itemsSnapshot.Length];

        // The cancellation token is deliberately NOT passed to Task.Run: a token that is already canceled
        // would prevent the delegate from ever running, so error would never be invoked for any item.
        // Cancellation is observed inside the delegate instead, which routes it through the reporting below.
        _ = Task.Run(async () =>
        {
            try
            {
                using var semaphore = new SemaphoreSlim(degreeOfParallelism);

                var tasks = itemsSnapshot.Select(async (item, index) =>
                {
                    // Wait outside the try block: when the wait is canceled no slot was acquired,
                    // so the finally below must not release one.
                    await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
                    try
                    {
                        cancellationToken.ThrowIfCancellationRequested();
                        await body(item, cancellationToken).ConfigureAwait(false);
                        itemsProcessed[index] = true;
                    }
                    catch (OperationCanceledException)
                    {
                        // Leave the item unmarked; it is reported by the outer catch once all tasks have completed.
                        throw;
                    }
                    catch (Exception ex)
                    {
                        itemsProcessed[index] = true;
                        // Best effort: a throwing error callback must not stop other items.
                        try { error?.Invoke(item, ex); } catch { }
                    }
                    finally
                    {
                        _ = semaphore.Release();
                    }
                });

                await Task.WhenAll(tasks).ConfigureAwait(false);

                // Best effort: an exception from the completion callback has nowhere to go.
                try { done?.Invoke(); } catch { }
            }
            catch (Exception ex)
            {
                if (error is null)
                {
                    return;
                }

                // Note that we cannot check the tasks status here because evaluating the tasks enumerator
                // could start tasks in the enumerator that were not evaluated yet by Task.WhenAll; hence the use of itemsProcessed
                for (int i = 0; i < itemsProcessed.Length; i++)
                {
                    if (!itemsProcessed[i])
                    {
                        itemsProcessed[i] = true;
                        // Best effort: keep reporting the remaining items even if one callback throws.
                        try { error(itemsSnapshot[i], ex); } catch { }
                    }
                }
            }
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment