Skip to content

Instantly share code, notes, and snippets.

@Szer
Created July 3, 2021 09:59
Show Gist options
  • Select an option

  • Save Szer/d8d6e6c88bb36551c7a446374ce9e6dc to your computer and use it in GitHub Desktop.

Select an option

Save Szer/d8d6e6c88bb36551c7a446374ce9e6dc to your computer and use it in GitHub Desktop.
Tasks pipelining
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
/// <summary>
/// Helpers that turn a sequence of lazily started tasks into an asynchronous stream of mapped results.
/// </summary>
public static class TaskExt
{
    /// <summary>
    /// Starts the task factories one at a time, in order, and yields the mapped result of each.
    /// </summary>
    /// <param name="tasks">Factories that start the asynchronous work when invoked.</param>
    /// <param name="map">Projection applied to every task result.</param>
    /// <returns>Mapped results in the same order as <paramref name="tasks"/>.</returns>
    public static async IAsyncEnumerable<O> Map<I, O>(this IEnumerable<Func<Task<I>>> tasks, Func<I, O> map)
    {
        foreach (var task in tasks)
        {
            var i = await task();
            yield return map(i);
        }
    }

    /// <summary>
    /// Runs up to <paramref name="degree"/> task factories concurrently and yields mapped results
    /// in completion order (not in source order).
    /// </summary>
    /// <param name="tasks">Factories that start the asynchronous work when invoked.</param>
    /// <param name="degree">Maximum number of tasks in flight at once; must be at least 1.</param>
    /// <param name="map">Projection applied to every task result.</param>
    /// <returns>
    /// A stream that ends once every task has finished and its result has been written, or that
    /// fails with the first error raised by a task, by <paramref name="map"/>, or by enumerating
    /// <paramref name="tasks"/>.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="tasks"/> or <paramref name="map"/> is null.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="degree"/> is less than 1.</exception>
    /// <remarks>
    /// No cancellation is wired in: if the consumer stops reading before the stream ends, workers
    /// waiting to write into the full bounded channel stay pending.
    /// </remarks>
    public static IAsyncEnumerable<O> MapUnordered<I, O>(this IEnumerable<Func<Task<I>>> tasks, int degree,
        Func<I, O> map)
    {
        if (tasks == null)
        {
            throw new ArgumentNullException(nameof(tasks));
        }

        if (map == null)
        {
            throw new ArgumentNullException(nameof(map));
        }

        if (degree < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(degree), degree,
                "Degree of parallelism must be at least 1.");
        }

        // A single slot is just the sequential pipeline.
        if (degree == 1)
        {
            return tasks.Map(map);
        }

        var channel = Channel.CreateBounded<O>(degree);
        var writer = channel.Writer;

        _ = Task.Run(async () =>
        {
            var limiter = new SemaphoreSlim(degree, degree);
            var failed = 0; // set to 1 once the stream has been completed with an error

            void Fail(Exception error)
            {
                Volatile.Write(ref failed, 1);
                writer.TryComplete(error);
            }

            // Awaits one started task, publishes its mapped result and always frees its slot.
            async Task Forward(Task<I> pending)
            {
                try
                {
                    // WhenAny waits for completion without rethrowing, so a faulted task's
                    // AggregateException can be forwarded to the reader untouched.
                    await Task.WhenAny(pending).ConfigureAwait(false);
                    if (pending.IsCompletedSuccessfully)
                    {
                        await writer.WriteAsync(map(pending.Result)).ConfigureAwait(false);
                    }
                    else
                    {
                        // A canceled task carries no Exception; surface it instead of
                        // silently ending the stream early.
                        Fail((Exception)pending.Exception ?? new TaskCanceledException(pending));
                    }
                }
                catch (ChannelClosedException)
                {
                    // Another worker already failed the stream; this result is dropped.
                }
                catch (Exception error)
                {
                    // map (or a null task) threw: fail the stream instead of hanging the reader.
                    Fail(error);
                }
                finally
                {
                    limiter.Release();
                }
            }

            foreach (var task in tasks)
            {
                await limiter.WaitAsync().ConfigureAwait(false);
                if (Volatile.Read(ref failed) != 0)
                {
                    // Do not start more work once the stream has failed.
                    limiter.Release();
                    break;
                }

                _ = Forward(task());
            }

            // Every worker releases its slot when done, so reacquiring all slots
            // means nothing is in flight any more.
            for (var slot = 0; slot < degree; slot++)
            {
                await limiter.WaitAsync().ConfigureAwait(false);
            }
        }).ContinueWith(
            // Single completion point: a null Exception ends the stream normally, a producer
            // failure is forwarded, and the call is a no-op if a worker already failed the stream.
            t => writer.TryComplete(t.Exception),
            TaskScheduler.Default);

        return channel.Reader.ReadAllAsync();
    }
}
/// <summary>
/// Console demo: fires 50 simulated IO requests, at most 10 at a time, and prints each
/// result as soon as it becomes available.
/// </summary>
public class Foo
{
    /// <summary>Entry point; prints producing/consuming timestamps and finally "done".</summary>
    public static async Task Main()
    {
        var random = new Random();
        var stopwatch = Stopwatch.StartNew();

        // Builds the lazily started request for a given index.
        Func<Task<int>> MakeRequest(int index) => async () =>
        {
            var pause = random.Next(500, 5000);
            if (index % 10 == 0)
            {
                // every tenth request imitates a rare slow one
                pause *= 5;
            }

            Console.WriteLine($"{stopwatch.ElapsedMilliseconds,8} - {index,3} producing");
            await Task.Delay(pause); // stands in for IO work
            return index;
        };

        IEnumerable<Func<Task<int>>> requests =
            Enumerable.Range(0, 50).Select(index => MakeRequest(index));

        var stream = requests.MapUnordered(
            10,
            i => $"{stopwatch.ElapsedMilliseconds,8} - {i,3} consuming");

        await foreach (var line in stream)
        {
            Console.WriteLine(line);
        }

        Console.WriteLine("done");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment