Skip to content

Instantly share code, notes, and snippets.

@RupertAvery
Last active July 11, 2025 16:19
Show Gist options
  • Save RupertAvery/adff0e177fdbb096670a2022ec12d957 to your computer and use it in GitHub Desktop.
Save RupertAvery/adff0e177fdbb096670a2022ec12d957 to your computer and use it in GitHub Desktop.
A wrapper class using Channels to asynchonously process a queue that can be added to at any time
public class Job
{
private static int lastId;
public int Id { get; private set; }
public int Duration { get; private set; }
public Job(int value)
{
Id = lastId + 1;
Duration = value * 2000;
lastId = Id;
}
}
using System.Threading.Channels;
public class Processor<T> where T : class
{
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
private int _degreeOfParallelism;
public Processor(int degreeOfParallelism)
{
_degreeOfParallelism = degreeOfParallelism;
}
public async Task Add(T job)
{
await _channel.Writer.WriteAsync(job);
}
public void Complete()
{
// marks the channel as complete
_channel.Writer.Complete();
}
public Task Start(Func<T, CancellationToken, Task> action, CancellationToken token)
{
var consumers = new List<Task>();
for (var i = 0; i < _degreeOfParallelism; i++)
{
consumers.Add(ProcessTask(token, action));
}
return Task.WhenAll(consumers);
}
private async Task ProcessTask(CancellationToken token, Func<T, CancellationToken, Task> action)
{
while (await _channel.Reader.WaitToReadAsync(token))
{
var job = await _channel.Reader.ReadAsync(token);
try {
await action(job, token);
}
catch {
// handle exceptions
}
}
}
}
var cts = new CancellationTokenSource();
var p = new Processor<Job>(4);
bool running = true;
Console.WriteLine("Enter 1-9 to add jobs of different durations (x 5 seconds), or X to quit");
// Start processing in a new thread, so we can interact with the user below
var task = Task.Run(async () =>{
var token = cts.Token;
Console.WriteLine("Started processing");
await p.Start(async (job, cancellationToken) => {
Console.WriteLine($"Started #{job.Id}");
await Task.Delay(job.Duration);
Console.WriteLine($"Finished #{job.Id}");
}, token);
Console.WriteLine("Stopped processing");
});
while (running)
{
var input = Console.ReadLine();
if (int.TryParse(input, out var number))
{
await p.Add(new Job(number));
}
if (input == "x")
{
p.Complete();
running = false;
}
}
Console.WriteLine("Waiting for tasks to complete");
// wait for all tasks to complete
await task;
Console.WriteLine("Done");
@CharlieDigital
Copy link

limited to X number of threads

This may be misleading here since it's using Task which does not correspond to a discrete thread. The degreeOfParallelism may or may not cause the same corresponding number of threads to be provisioned by the thread pool.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment