Created
March 25, 2023 21:36
-
-
Save StephenCleary/d58a9c2cbb61a483facd4242b5c85466 to your computer and use it in GitHub Desktop.
Dynamic Task.WhenAll
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Developed as part of https://github.com/StephenCleary/StructuredConcurrency but wasn't necessary for that project.
/// <summary>
/// Similar to <see cref="Task.WhenAll{TResult}(Task{TResult}[])"/>, but allowing any number of tasks to be added, even after waiting has begun.
/// At least one task must be added, or else the <see cref="Task"/> will never complete.
/// </summary>
/// <remarks>
/// The wait completes as soon as the number of outstanding tasks drops to zero; from then on <see cref="Add"/> throws.
/// If every added task succeeds, the results are reported in the order the tasks were added.
/// If any added task fails, the wait faults with the exceptions observed from the failed tasks and no results are reported.
/// Awaiting a canceled task throws, so a canceled task is recorded like any other failure.
/// </remarks>
/// <typeparam name="TResult">The type of the result of the tasks.</typeparam>
public sealed class DynamicTaskWhenAll<TResult>
{
    // Continuations are forced to run asynchronously so that code awaiting Task is never executed
    // inline on whichever thread happens to complete the last outstanding task.
    private readonly TaskCompletionSource<IReadOnlyList<TResult>> _taskCompletionSource =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // Immutable snapshot of the waiter; it is only ever replaced as a whole through InterlockedEx.Apply.
    private State _state = new(ImmutableQueue<Exception>.Empty, ImmutableList<TResult>.Empty, false, 0);

    /// <summary>
    /// Adds a task to this dynamic waiter.
    /// Throws an exception if the wait has already completed.
    /// </summary>
    /// <param name="task">The task to add.</param>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">The dynamic waiter has already completed.</exception>
    public void Add(Task<TResult> task)
    {
        _ = task ?? throw new ArgumentNullException(nameof(task));

        // Reserve a result slot and count the task in a single atomic state transition.
        // NOTE(review): this relies on InterlockedEx.Apply returning the state it installed — confirm against its definition.
        var localState = InterlockedEx.Apply(ref _state, x => x switch
        {
            { Done: true } => x,
            _ => x with { Count = x.Count + 1, Results = x.Results.Add(default!) },
        });
        if (localState.Done)
        {
            throw new InvalidOperationException($"{nameof(DynamicTaskWhenAll<TResult>)} has already completed.");
        }

        // The slot reserved above is the last element of the list that was just installed.
        var index = localState.Results.Count - 1;
        Handle(task);

        // Deliberately async void: every exception from the awaited task is caught below, and nothing
        // is returned to the caller of Add, so there is no Task for anyone to observe.
        async void Handle(Task<TResult> pending)
        {
            State updated;
            #pragma warning disable CA1031 // Do not catch general exception types
            try
            {
                var result = await pending.ConfigureAwait(false);
                updated = InterlockedEx.Apply(ref _state, x => x switch
                {
                    { Done: true } => x,

                    // Once a failure has been recorded the result list has been discarded, so the
                    // reserved index may no longer exist; writing to it would throw and be recorded
                    // as a bogus failure of this (successful) task. Only store while no task has failed.
                    _ => RetireOne(x.Exceptions.IsEmpty
                        ? (x with { Results = x.Results.SetItem(index, result) })
                        : x),
                });
            }
            catch (Exception ex)
            {
                updated = InterlockedEx.Apply(ref _state, x => x switch
                {
                    { Done: true } => x,

                    // Results are never reported once any task has failed, so drop them right away.
                    _ => RetireOne(x with { Results = ImmutableList<TResult>.Empty, Exceptions = x.Exceptions.Enqueue(ex) }),
                });
            }
            #pragma warning restore CA1031 // Do not catch general exception types

            // Publishing happens outside the try block so that a problem while completing the
            // aggregate can never be mistaken for a failure of the awaited task.
            Publish(updated);
        }
    }

    /// <summary>
    /// Gets a task which is completed when all tasks added to this dynamic awaiter have completed.
    /// </summary>
    public Task<IReadOnlyList<TResult>> Task => _taskCompletionSource.Task;

    /// <summary>
    /// Returns <paramref name="state"/> with one fewer outstanding task, marking it done when the last one retires.
    /// Pure function: safe to be invoked repeatedly by a retrying atomic update.
    /// </summary>
    private static State RetireOne(State state) =>
        state.Count == 1
            ? state with { Done = true, Count = 0 }
            : state with { Count = state.Count - 1 };

    /// <summary>
    /// Completes the aggregate task if <paramref name="state"/> is the final state; otherwise does nothing.
    /// </summary>
    private void Publish(State state)
    {
        if (!state.Done)
        {
            return;
        }

        // The Try* variants are used because several handlers may observe a Done state;
        // only the first call completes the task and the rest are ignored.
        if (state.Exceptions.IsEmpty)
        {
            _taskCompletionSource.TrySetResult(state.Results);
        }
        else
        {
            _taskCompletionSource.TrySetException(state.Exceptions);
        }
    }

    /// <summary>Immutable snapshot of the waiter's progress.</summary>
    /// <param name="Exceptions">Exceptions observed so far, in the order they were recorded.</param>
    /// <param name="Results">One slot per added task, in add order; emptied as soon as any task fails.</param>
    /// <param name="Done">Whether the wait has completed; once set, the state no longer changes.</param>
    /// <param name="Count">The number of added tasks that have not yet completed.</param>
    private record class State(ImmutableQueue<Exception> Exceptions, ImmutableList<TResult> Results, bool Done, uint Count);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment