Skip to content

Instantly share code, notes, and snippets.

@Porges
Created October 17, 2017 23:41
Show Gist options
  • Save Porges/d1160a7281f1cab49506a8171aa3bb6c to your computer and use it in GitHub Desktop.
Save Porges/d1160a7281f1cab49506a8171aa3bb6c to your computer and use it in GitHub Desktop.
Producer/consumer based on BlockingCollection with a max number of outstanding tasks
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ExampleLoop
{
class Program
{
const int maxQueuedItems = 10;
const int maxRunningTasks = 5;
public static void Main()
{
using (var bc = new BlockingCollection<int>(maxQueuedItems))
{
Task.WaitAll(Sender(bc), Receiver(bc));
}
Console.WriteLine("Done!");
}
private static async Task Sender(BlockingCollection<int> collection)
{
for (int i = 0; i < 100; ++i)
{
collection.Add(i);
// simulate delay
await Task.Delay(TimeSpan.FromSeconds(0.1));
}
collection.CompleteAdding();
}
private static async Task Receiver(BlockingCollection<int> collection)
{
using (var semaphore = new SemaphoreSlim(maxRunningTasks))
{
while (collection.TryTake(out var item, Timeout.Infinite))
{
await semaphore.WaitAsync();
// spin off new task
Task.Run(
async () =>
{
try
{
// simulate long processing
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine(item);
}
finally
{
semaphore.Release();
}
});
}
for (int i = 0; i < maxRunningTasks; ++i)
{
// wait for all outstanding tasks to finish
await semaphore.WaitAsync();
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment