Created
November 3, 2023 09:51
-
-
Save pitermarx/00602b2e7189e1410ee75c2d6471952f to your computer and use it in GitHub Desktop.
An example on how to use channels
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading; | |
using System.Threading.Channels; | |
using System.Threading.Tasks; | |
class Processor<T> where T : class | |
{ | |
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>(); | |
private readonly Task processors; | |
public Processor(Func<T, Task> action, int numberOfConsumers = 4) | |
=> this.processors = Task.WhenAll(Enumerable.Range(0, numberOfConsumers).Select(_ => CreateProcessor(action))); | |
public ValueTask Add(T job) => _channel.Writer.WriteAsync(job); | |
public Task Complete() { | |
_channel.Writer.Complete(); | |
return processors; | |
} | |
private async Task CreateProcessor(Func<T, Task> action) { | |
while (await _channel.Reader.WaitToReadAsync()) { | |
var job = await _channel.Reader.ReadAsync(); | |
try { | |
await action(job); | |
} | |
catch { | |
// handle exceptions | |
} | |
} | |
} | |
} | |
var p = new Processor<string>(async id => { | |
Console.WriteLine($"Started #{id}"); | |
await Task.Delay(5000); | |
Console.WriteLine($"Finished #{id}"); | |
}); | |
while (Console.ReadLine() is string input && input != "x") await p.Add(input); | |
await p.Complete(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment