Skip to content

Instantly share code, notes, and snippets.

@Ilchert
Last active July 3, 2025 01:08
Show Gist options
  • Save Ilchert/2cc3ad7811859feee03932e7da62af8a to your computer and use it in GitHub Desktop.
Save Ilchert/2cc3ad7811859feee03932e7da62af8a to your computer and use it in GitHub Desktop.
SelectAsync
using System.Collections.Concurrent;
var data = SelectAsync(Enumerable.Range(0, 10), 4, async p =>
{
await Task.Delay(Random.Shared.Next(250)); // Simulate some async work
return p.ToString();
});
await foreach (var item in data)
{
Console.WriteLine(item);
}
Console.WriteLine("done");
static async IAsyncEnumerable<TResult> SelectAsync<T, TResult>(IEnumerable<T> source, int dop, Func<T, Task<TResult>> selector)
{
var currentIndex = 0;
var cache = new ConcurrentDictionary<int, object>();
var work = Parallel.ForEachAsync(source.Select((p, i) => (i, p)), new ParallelOptions { MaxDegreeOfParallelism = dop }, async (item, _) =>
{
var t = selector(item.p);
await t;
cache.AddOrUpdate(item.i, static (key, newValue) => newValue, static (key, oldValue, newValue) =>
{
var tcs = (TaskCompletionSource)oldValue;
tcs.TrySetResult();
return newValue;
}, t);
});
while (!(work.IsCompleted && cache.IsEmpty))
{
if (cache.TryRemove(currentIndex, out var result))
{
yield return ((Task<TResult>)result).Result;
currentIndex++;
}
else
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
if (cache.TryAdd(currentIndex, tcs))
{
Console.WriteLine("waiting for " + currentIndex);
var completed = await Task.WhenAny(work, tcs.Task);
if (completed == work)
break;
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment