Last active
July 11, 2025 16:19
-
-
Save RupertAvery/adff0e177fdbb096670a2022ec12d957 to your computer and use it in GitHub Desktop.
A wrapper class using Channels to asynchonously process a queue that can be added to at any time
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Job | |
{ | |
private static int lastId; | |
public int Id { get; private set; } | |
public int Duration { get; private set; } | |
public Job(int value) | |
{ | |
Id = lastId + 1; | |
Duration = value * 2000; | |
lastId = Id; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading.Channels; | |
public class Processor<T> where T : class | |
{ | |
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>(); | |
private int _degreeOfParallelism; | |
public Processor(int degreeOfParallelism) | |
{ | |
_degreeOfParallelism = degreeOfParallelism; | |
} | |
public async Task Add(T job) | |
{ | |
await _channel.Writer.WriteAsync(job); | |
} | |
public void Complete() | |
{ | |
// marks the channel as complete | |
_channel.Writer.Complete(); | |
} | |
public Task Start(Func<T, CancellationToken, Task> action, CancellationToken token) | |
{ | |
var consumers = new List<Task>(); | |
for (var i = 0; i < _degreeOfParallelism; i++) | |
{ | |
consumers.Add(ProcessTask(token, action)); | |
} | |
return Task.WhenAll(consumers); | |
} | |
private async Task ProcessTask(CancellationToken token, Func<T, CancellationToken, Task> action) | |
{ | |
while (await _channel.Reader.WaitToReadAsync(token)) | |
{ | |
var job = await _channel.Reader.ReadAsync(token); | |
try { | |
await action(job, token); | |
} | |
catch { | |
// handle exceptions | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var cts = new CancellationTokenSource(); | |
var p = new Processor<Job>(4); | |
bool running = true; | |
Console.WriteLine("Enter 1-9 to add jobs of different durations (x 5 seconds), or X to quit"); | |
// Start processing in a new thread, so we can interact with the user below | |
var task = Task.Run(async () =>{ | |
var token = cts.Token; | |
Console.WriteLine("Started processing"); | |
await p.Start(async (job, cancellationToken) => { | |
Console.WriteLine($"Started #{job.Id}"); | |
await Task.Delay(job.Duration); | |
Console.WriteLine($"Finished #{job.Id}"); | |
}, token); | |
Console.WriteLine("Stopped processing"); | |
}); | |
while (running) | |
{ | |
var input = Console.ReadLine(); | |
if (int.TryParse(input, out var number)) | |
{ | |
await p.Add(new Job(number)); | |
} | |
if (input == "x") | |
{ | |
p.Complete(); | |
running = false; | |
} | |
} | |
Console.WriteLine("Waiting for tasks to complete"); | |
// wait for all tasks to complete | |
await task; | |
Console.WriteLine("Done"); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This may be misleading here since it's using
Task
which does not correspond to a discrete thread. ThedegreeOfParallelism
may or may not cause the same corresponding number of threads to be provisioned by the thread pool.