Last active
July 3, 2025 01:08
-
-
Save Ilchert/2cc3ad7811859feee03932e7da62af8a to your computer and use it in GitHub Desktop.
SelectAsync
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Concurrent; | |
var data = SelectAsync(Enumerable.Range(0, 10), 4, async p => | |
{ | |
await Task.Delay(Random.Shared.Next(250)); // Simulate some async work | |
return p.ToString(); | |
}); | |
await foreach (var item in data) | |
{ | |
Console.WriteLine(item); | |
} | |
Console.WriteLine("done"); | |
static async IAsyncEnumerable<TResult> SelectAsync<T, TResult>(IEnumerable<T> source, int dop, Func<T, Task<TResult>> selector) | |
{ | |
var currentIndex = 0; | |
var cache = new ConcurrentDictionary<int, object>(); | |
var work = Parallel.ForEachAsync(source.Select((p, i) => (i, p)), new ParallelOptions { MaxDegreeOfParallelism = dop }, async (item, _) => | |
{ | |
var t = selector(item.p); | |
await t; | |
cache.AddOrUpdate(item.i, static (key, newValue) => newValue, static (key, oldValue, newValue) => | |
{ | |
var tcs = (TaskCompletionSource)oldValue; | |
tcs.TrySetResult(); | |
return newValue; | |
}, t); | |
}); | |
while (!(work.IsCompleted && cache.IsEmpty)) | |
{ | |
if (cache.TryRemove(currentIndex, out var result)) | |
{ | |
yield return ((Task<TResult>)result).Result; | |
currentIndex++; | |
} | |
else | |
{ | |
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); | |
if (cache.TryAdd(currentIndex, tcs)) | |
{ | |
Console.WriteLine("waiting for " + currentIndex); | |
var completed = await Task.WhenAny(work, tcs.Task); | |
if (completed == work) | |
break; | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment