Skip to content

Instantly share code, notes, and snippets.

@StephenCleary
Created March 25, 2023 21:36
Show Gist options
  • Save StephenCleary/d58a9c2cbb61a483facd4242b5c85466 to your computer and use it in GitHub Desktop.
Save StephenCleary/d58a9c2cbb61a483facd4242b5c85466 to your computer and use it in GitHub Desktop.
Dynamic Task.WhenAll
// Developed as part of https://github.com/StephenCleary/StructuredConcurrency but wasn't necessary for that project.
/// <summary>
/// Similar to <see cref="Task.WhenAll{TResult}(Task{TResult}[])"/>, but allowing any number of tasks to be added, even after waiting has begun.
/// At least one task must be added, or else the <see cref="Task"/> will never complete.
/// </summary>
/// <typeparam name="TResult">The type of the result of the tasks.</typeparam>
public sealed class DynamicTaskWhenAll<TResult>
{
private readonly TaskCompletionSource<IReadOnlyList<TResult>> _taskCompletionSource = new();
private State _state = new(ImmutableQueue<Exception>.Empty, ImmutableList<TResult>.Empty, false, 0);
/// <summary>
/// Adds a task to this dynamic waiter.
/// Throws an exception if the wait has already completed.
/// </summary>
/// <param name="task">The task to add.</param>
/// <exception cref="InvalidOperationException">The dynamic waiter has already completed.</exception>
public void Add(Task<TResult> task)
{
_ = task ?? throw new ArgumentNullException(nameof(task));
var localState = InterlockedEx.Apply(ref _state, x => x switch
{
{ Done: true } => x,
_ => x with { Count = x.Count + 1, Results = x.Results.Add(default!) },
});
if (localState.Done)
throw new InvalidOperationException($"{nameof(DynamicTaskWhenAll<TResult>)} has already completed.");
var index = localState.Results.Count - 1;
Handle(task);
async void Handle(Task<TResult> task)
{
#pragma warning disable CA1031 // Do not catch general exception types
try
{
var result = await task.ConfigureAwait(false);
var localState = InterlockedEx.Apply(ref _state, x => x switch
{
{ Done: true } => x,
{ Count: 1 } => x with { Done = true, Count = 0, Results = x.Results.SetItem(index, result) },
_ => x with { Count = x.Count - 1, Results = x.Results.SetItem(index, result) },
});
Complete(localState);
}
catch (Exception ex)
{
var localState = InterlockedEx.Apply(ref _state, x => x switch
{
{ Done: true } => x,
{ Count: 1 } => x with { Done = true, Count = 0, Results = ImmutableList<TResult>.Empty, Exceptions = x.Exceptions.Enqueue(ex) },
_ => x with { Count = x.Count - 1, Results = ImmutableList<TResult>.Empty, Exceptions = x.Exceptions.Enqueue(ex) },
});
Complete(localState);
}
#pragma warning restore CA1031 // Do not catch general exception types
void Complete(State localState)
{
if (!localState.Done)
return;
if (localState.Exceptions.IsEmpty)
_taskCompletionSource.TrySetResult(localState.Results!);
else
_taskCompletionSource.TrySetException(localState.Exceptions);
}
}
}
/// <summary>
/// Gets a task which is completed when all tasks added to this dynamic awaiter have completed.
/// </summary>
public Task<IReadOnlyList<TResult>> Task => _taskCompletionSource.Task;
private record class State(ImmutableQueue<Exception> Exceptions, ImmutableList<TResult> Results, bool Done, uint Count);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment