-
-
Save HalidCisse/7f8e3980034a9df41437816da44f8681 to your computer and use it in GitHub Desktop.
Channels Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading.Channels; | |
using System.Threading.Tasks; | |
namespace ChannelsAreCool
{
    // Disclaimer: I didn't actually run this code, so it might not quite work.
    // Feel free to complain or ask questions and I'll fix it.

    /// <summary>
    /// Demonstrates a producer/consumer pipeline built on a bounded
    /// <see cref="Channel{T}"/>: one producer writes messages, one (or more)
    /// background consumers read and "process" them.
    /// </summary>
    public static class Example
    {
        private const int DefaultMessageCount = 1000;
        private const int DefaultMaxMessagesToBuffer = 100;

        /// <summary>
        /// Runs the example with the default workload (1000 messages through a
        /// channel that buffers at most 100 of them).
        /// </summary>
        /// <returns>
        /// A task that completes once every message has been written, read and
        /// processed by the consumer.
        /// </returns>
        public static Task RunExample()
        {
            return RunExample(DefaultMessageCount, DefaultMaxMessagesToBuffer);
        }

        /// <summary>
        /// Writes <paramref name="messageCount"/> messages into a bounded channel while a
        /// background consumer drains it, then waits for the consumer to finish.
        /// </summary>
        /// <param name="messageCount">How many messages the producer writes.</param>
        /// <param name="maxMessagesToBuffer">
        /// Capacity of the bounded channel; must be at least 1.
        /// </param>
        /// <returns>
        /// A task that completes once every message has been processed. It faults if the
        /// producer loop or the consumer throws.
        /// </returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// Surfaced through the returned task when <paramref name="maxMessagesToBuffer"/>
        /// is less than 1 (thrown by <c>Channel.CreateBounded</c>).
        /// </exception>
        public static async Task RunExample(int messageCount, int maxMessagesToBuffer)
        {
            // Bounded channels matter when the consumer is slower than the producer:
            // you don't want the app to keep buffering until it dies with an
            // OutOfMemoryException in production. Use Channel.CreateUnbounded<T>()
            // if you genuinely don't care.
            var channel = Channel.CreateBounded<string>(maxMessagesToBuffer);
            var reader = channel.Reader;
            var writer = channel.Writer;

            // No dedicated thread is needed to listen to a channel. Task.Run starts
            // the consumer on the thread pool, and because the consumer awaits instead
            // of blocking, it only occupies a thread while it actually has work to do.
            var worker1 = Task.Run(() => ListenToChannel(reader));

            // Multiple readers may safely listen to the same channel. To try it, start
            // more workers the same way and await all of them below with Task.WhenAll.

            try
            {
                // The consumer is slower than this loop (because of its Task.Delay), so
                // the channel fills up. Once it is full, WriteAsync does not complete
                // until the consumer frees a slot; the await suspends this method
                // without blocking a thread. This is called back pressure and keeps the
                // writer from getting too far ahead.
                // NOTE: writes to an unbounded channel never wait.
                for (int i = 0; i < messageCount; i++)
                {
                    await writer.WriteAsync($"Message # {i}");
                }
            }
            finally
            {
                // ALWAYS complete the writer, even if producing failed. Otherwise the
                // readers stay parked in WaitToReadAsync() forever. TryComplete is used
                // instead of Complete so this cannot throw from the finally block and
                // mask the original exception.
                writer.TryComplete();
            }

            // Wait for the consumer to finish *processing*, and observe any exception it
            // threw. This is awaited first on purpose: if the consumer died early, the
            // channel would never drain and reader.Completion would never complete.
            await worker1;

            // [Optional] reader.Completion completes once the channel is marked complete
            // and its last item has been dequeued. That only means "drained", not
            // "processed", which is why the worker task is awaited above. By this point
            // it has already completed.
            await reader.Completion;
        }

        /// <summary>
        /// Consumes messages from <paramref name="reader"/> until the channel is completed
        /// and empty, printing each message and simulating 25 ms of work per message.
        /// </summary>
        /// <param name="reader">The channel reader to drain.</param>
        /// <returns>A task that completes when the channel has been fully consumed.</returns>
        private static async Task ListenToChannel(ChannelReader<string> reader)
        {
            // Awaiting inside a loop is fine: the compiler turns this method into a state
            // machine, so no thread is held while waiting for data.
            // WaitToReadAsync returns false once the channel is completed and empty.
            while (await reader.WaitToReadAsync())
            {
                // With several readers, each message is still handed to exactly one of
                // them. A reader that loses the race simply gets false from TryRead and
                // goes back to waiting.
                while (reader.TryRead(out var messageString))
                {
                    Console.WriteLine($"The listener just read {messageString}!");
                    await Task.Delay(25); // simulates some work
                }
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment