Created
November 18, 2020 10:29
-
-
Save jstclair/6170bbeb96d6446e83592e44147690ab to your computer and use it in GitHub Desktop.
Commented demo for using System.Threading.Channels
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Diagnostics; | |
using System.Threading.Tasks; | |
using System.Threading.Channels; | |
using System.Linq; | |
using System.Linq.Expressions; | |
using System.Threading; | |
namespace ChannelsDemo | |
{ | |
/// <summary> | |
/// Simple "getting-started" demo of System.Threading.Channels | |
/// </summary> | |
/// <remarks> | |
/// There is no error handling demonstrated here. Typically, you would surround `WaitTo(Read|Write)Async` with `try {} catch (ChannelClosedException) {}` | |
/// but that would obscure some of the code here. | |
/// </remarks> | |
class Program | |
{ | |
// NOTE: You can adjust these to see how various combinations work | |
// NOTE: You might want to put some artificial delays in OutputGreeting (via `await Task.Delay(TimeSpan.FromSeconds(2))` | |
private const int MaxItemsToProduce = 5_000; | |
private static readonly int MaxReaders = Environment.ProcessorCount; // or, hard-coded value, like 1, 2, or 10 | |
private static readonly int MaxBoundedSize = 2 * MaxReaders; // or, hard-coded value, like 1, 2, or 400 | |
// NOTE: in all these examples, the time is dominated by the slowness of the Console. Try disabling this and comparing elapsed times. | |
private const bool WriteToConsole = true; | |
// NOTE: in scenarios where you want to simulate CPU-bound work, you can disable writing to console and set this instead. Calls Thread.SpinWait(n) | |
private const int CpuSpinTime = 0; | |
// NOTE: when experimenting with different strategies, this can help you visualize where tasks are in the channel | |
private const bool UseTracing = false; | |
/* Additional Note: | |
* Performance is constrained when running a Debug build and/or in VS. Try building a Release build and running in a separate console. | |
* | |
* Examples from my machine using MaxItemsToProduce = 5000 and the MultipleCompetingTransformsAndConsumers example | |
* | |
* [WriteToConsole = true] | |
* Debug/F5: 1_183 items/sec | |
* Debug/Cmd: 1_350 items/sec | |
* Release/F5: 1_204 items/sec | |
* Release/Cmd: 1_371 items/sec | |
* | |
* [WriteToConsole = false] | |
* Debug/F5: 49_341 items/sec | |
* Debug/Cmd: 204_788 items/sec | |
* Release/F5: 44_856 items/sec | |
* Release/Cmd: 204_611 items/sec | |
* | |
* NOTE: There is also overhead for such a small number of items. Increasing MaxItemsToProduce to 500_000: | |
* | |
* [WriteToConsole = false] | |
* Release/F5: 845_265 items/sec (MultipleCompetingTransformsAndConsumers) | |
* Release/F5: 1_638_367 items/sec (Simple) | |
*/ | |
static async Task Main(string[] args) | |
{ | |
Console.WriteLine("Welcome to the Channels Demo app"); | |
// NOTE: 2 types of channels - unbounded and bounded. Bounded channels have a limit - writes will block until reads have drained the channel. | |
// Unbounded channels will never block reads or writes, but assume you have capacity to store all items in memory. | |
// Our first channel is just a list of ids that we want channel consumers to read, so we can easily hold hundreds of thousands in memory. | |
// But for observing the flow of processing (i.e., with UseTracing = true), you might want to look at a Bounded channel | |
var idChannel = Channel.CreateUnbounded<string>(); | |
//var idChannel = Channel.CreateBounded<string>(MaxBoundedSize); | |
// Our second channel is doing work, and we want to constrain the amount of simultaneous work (both for memory and resource consumption). | |
// But you can test various scenarios where you use an unbounded channel here as well. | |
var greetChannel = Channel.CreateBounded<string>(MaxBoundedSize); | |
//var greetChannel = Channel.CreateUnbounded<string>(); | |
var sw = Stopwatch.StartNew(); | |
// NOTE: because the examples close the channel, you can only run a single example at a time | |
await Simple(idChannel, greetChannel); | |
//await MultipleConsumers(idChannel, greetChannel); | |
//await MultipleTransformsAndConsumers(idChannel, greetChannel); | |
//var otherChannel = Channel.CreateBounded<string>(MaxBoundedSize); | |
//await MultipleCompetingTransformsAndConsumers(idChannel, greetChannel, otherChannel); | |
sw.Stop(); | |
Console.WriteLine($"{MaxItemsToProduce:N0} consumed in {sw.Elapsed.TotalSeconds:F1} seconds [{(MaxItemsToProduce/sw.Elapsed.TotalSeconds):N0} items/second]"); | |
} | |
// Example task of producing data and writing it to a ChannelWriter | |
private static async Task ProduceIds(ChannelWriter<string> writer, int max) | |
{ | |
var ids = Enumerable.Range(1, max); | |
foreach (var id in ids) | |
{ | |
Trace($"{nameof(ProduceIds)}: writing id {id}"); | |
// NOTE: this works just fine... | |
await writer.WriteAsync($"{id}"); | |
// NOTE: This is an optimization for avoiding the await when the channel isn't blocking, or has a large capacity | |
// In the current case, since we have an unbounded writer, we would never need to await. | |
//if (writer.TryWrite($"{id}") == false) | |
//{ | |
// await writer.WriteAsync($"{id}"); | |
//} | |
// NOTE: you can signal an error by calling `Complete(Exception)` - this will be an orderly shutdown. | |
//if (id == 1337) | |
//{ | |
// Trace("Closing the channel for 1337"); | |
// var e = new Exception($"You should never get a 1337"); | |
// writer.Complete(e); | |
// // In the simple `writer.WriteAsync` above, if you `Complete` the channel, you will attempt to write an additional item to it. | |
//} | |
} | |
Trace($"{nameof(ProduceIds)}: completed writing"); | |
// when we are done, signal the channel that no more data is coming | |
writer.Complete(); | |
} | |
// Example task of reading data from one channel (ChannelReader<T>) and writing it to another channel (ChannelWriter<T>) | |
private static async Task TransformIdAndPublishGreeting(ChannelReader<string> reader, | |
ChannelWriter<string> writer, string greeting, bool autoClose = false) | |
{ | |
while (await reader.WaitToReadAsync()) | |
{ | |
Trace($"{nameof(TransformIdAndPublishGreeting)}: entered WaitToReadAsync"); | |
// NOTE: this will also work just fine.. | |
while (reader.TryRead(out var id)) | |
{ | |
await writer.WriteAsync(FormatGreeting(greeting, id)); | |
} | |
// NOTE: but again, like above, we can avoid the unnecessary writing await as long as possible | |
//while (reader.TryRead(out var id)) | |
//{ | |
// var msg = FormatGreeting(greeting, id); | |
// if (writer.TryWrite(msg) == false) | |
// { | |
// Trace($"{nameof(TransformIdAndPublishGreeting)}: TryWrite: had to take slow path"); | |
// await writer.WriteAsync(msg); | |
// } | |
//} | |
Trace($"{nameof(TransformIdAndPublishGreeting)}: exited WaitToReadAsync"); | |
} | |
// NOTE: if we have a single instance of this task, we can close the writer when we're done; | |
// but if we have multiple instances of this task, that would fail (because `Complete()` can only | |
// be called once on a channel. | |
if (autoClose) | |
{ | |
Trace($"{nameof(TransformIdAndPublishGreeting)}: completing writer"); | |
writer.Complete(); | |
} | |
} | |
private static string FormatGreeting(string greeting, string id) => $"{greeting} {id}"; | |
// Example task of reading from a channel | |
private static async Task OutputGreeting(ChannelReader<string> reader, string optionalPrefix = "") | |
{ | |
while (await reader.WaitToReadAsync()) | |
{ | |
Trace($"{nameof(OutputGreeting)}{optionalPrefix}: entered WaitToReadAsync"); | |
while (reader.TryRead(out var greeting)) | |
{ | |
if (WriteToConsole) | |
Console.WriteLine($"{optionalPrefix}{greeting}"); | |
// NOTE: this is here to heat up your CPUs | |
else if (CpuSpinTime >= 0) | |
Thread.SpinWait(CpuSpinTime); | |
} | |
Trace($"{nameof(OutputGreeting)}{optionalPrefix}: exited WaitToReadAsync"); | |
} | |
} | |
/* NOTE: Simple example | |
* In this example, we are using a single task for each process - this is closest to a traditional pipe architecture, with the advantage that | |
* we natively handle async actions. | |
*/ | |
private static async Task Simple(Channel<string> idChannel, Channel<string> greetChannel) | |
{ | |
var produceTask = Task.Run(() => ProduceIds(idChannel.Writer, MaxItemsToProduce)); | |
var transformTask = Task.Run(() => TransformIdAndPublishGreeting(idChannel.Reader, greetChannel.Writer, "Hello", autoClose: true)); | |
var greetTask = Task.Run(() => OutputGreeting(greetChannel.Reader)); | |
// NOTE: we await each task - in addition, we want to await any reader channels. Only readers have a `Task Completion`; writers have a `.Complete()` method. | |
await Task.WhenAll(idChannel.Reader.Completion, produceTask, transformTask, greetChannel.Reader.Completion, greetTask); | |
} | |
/* NOTE: Multiple Consumers example | |
* In this example, we are using a single task for producing and transforming, but multiple greeting processes. Since the greeting only reads, | |
* there's minimal changes required. | |
*/ | |
private static async Task MultipleConsumers(Channel<string> idChannel, Channel<string> greetChannel) | |
{ | |
var produceTask = Task.Run(() => ProduceIds(idChannel.Writer, MaxItemsToProduce)); | |
var transformTask = Task.Run(() => TransformIdAndPublishGreeting(idChannel.Reader, greetChannel.Writer, "Hello", autoClose: true)); | |
var greetTasks = Enumerable.Range(0, MaxReaders) | |
.Select(i => Task.Run(() => OutputGreeting(greetChannel.Reader, $"{i}"))) | |
.ToArray(); // NOTE: *Very* important that we materialize the running tasks, since `.Select` is lazy | |
await Task.WhenAll(new[] { idChannel.Reader.Completion, produceTask, transformTask, greetChannel.Reader.Completion }.Concat(greetTasks)); | |
} | |
/* NOTE: Multiple Transforms And Consumers example | |
* In this example, we are using a single task for producing, but multiple transform and greeting processes. Since the transform task was | |
* responsible for signalling that it was done writing in the single-task scenario, we now need to handle this explicitly. | |
*/ | |
private static async Task MultipleTransformsAndConsumers(Channel<string> idChannel, Channel<string> greetChannel) | |
{ | |
var produceTask = Task.Run(() => ProduceIds(idChannel.Writer, MaxItemsToProduce)); | |
var transformTasks = Enumerable.Range(0, MaxReaders * 2) | |
.Select(i => Task.Run(() => TransformIdAndPublishGreeting(idChannel.Reader, greetChannel.Writer, $"'{i}' Hello", autoClose: false))) | |
.ToArray(); | |
var greetTasks = Enumerable.Range(0, MaxReaders * 2) | |
.Select(i => Task.Run(() => OutputGreeting(greetChannel.Reader, $"{i}"))) | |
.ToArray(); | |
// NOTE: once the transform tasks have completed writing to their channel, we can signal the greet tasks that no more data is coming. | |
await Task.WhenAll(new [] {idChannel.Reader.Completion, produceTask }.Concat(transformTasks)); | |
greetChannel.Writer.Complete(); | |
await Task.WhenAll(new[] { greetChannel.Reader.Completion }.Concat(greetTasks)); | |
} | |
/* NOTE: Multiple Competing Transforms And Consumers example | |
* In this example, we are using a single task for producing, but multiple transform and greeting processes. In addition, the transform task | |
* is now writing to multiple channels. In real code, this might be producing a read model and updating a search index. Like the MultipleTransformsAndConsumers | |
* example, we are responsible for explicitly signalling that the transform tasks are completed writing. | |
*/ | |
private static async Task MultipleCompetingTransformsAndConsumers(Channel<string> idChannel, Channel<string> greetChannel, Channel<string> otherChannel) | |
{ | |
// NOTE: in this example, tweaking the running tasks is important for throughput. In my case, I have 12 processors. | |
// If I leave these all set to `MaxReaders`, the best result is: | |
// 205_000 items/sec | |
// If I adjust them to MaxReaders/3 (so that consumers roughly equal CPUs), the best result is: | |
// 218_702 items/sec | |
// NOTE: this will also depend on the work each of your tasks is doing. If some are slow, you may wish to create more of them... | |
// NOTE: the produceTask will also burn a CPU while it's working (for large # of items), so you may wish to calculate based on MaxReaders - 1 | |
var produceTask = Task.Run(() => ProduceIds(idChannel.Writer, MaxItemsToProduce)); | |
var readersPerTask = MaxReaders; // MaxReaders / 3; | |
var transformTasks = Enumerable.Range(0, readersPerTask) | |
.Select(i => Task.Run(() => TransformIdAndPublishGreeting(idChannel.Reader, greetChannel.Writer, otherChannel.Writer, $"'{i}' Hello"))) | |
.ToArray(); | |
// NOTE: We want to have multiple consumers, but we can use the same underlying function; we will use the optional prefix to help | |
// distinguish between the two tasks in the console. Each set of greet tasks reads from it's own channel (so slowing down one will only | |
// affect those tasks. If you want to see this, you can add a `if (optionalPrefix.StartsWith("OTHER") Thread.SpinWait(CpuSpinTime);` in | |
// the `OutputGreeting` method. | |
var greetTasks = Enumerable.Range(0, readersPerTask) | |
.Select(i => Task.Run(() => OutputGreeting(greetChannel.Reader, $"GREET [{i:00}] "))) | |
.ToArray(); | |
var otherTasks = Enumerable.Range(0, readersPerTask) | |
.Select(i => Task.Run(() => OutputGreeting(otherChannel.Reader, $"OTHER: [{i:00}] "))) | |
.ToArray(); | |
// NOTE: like in MultipleTransformsAndConsumers, we signal consumers that there's no more data to come. However, in this case, we need to | |
// signal to both channels. | |
await Task.WhenAll(new[] { idChannel.Reader.Completion, produceTask }.Concat(transformTasks)); | |
greetChannel.Writer.Complete(); | |
otherChannel.Writer.Complete(); | |
await Task.WhenAll(new[] { greetChannel.Reader.Completion, otherChannel.Reader.Completion }.Concat(greetTasks).Concat(otherTasks)); | |
} | |
// NOTE: Overload of TransformIdAndPublishGreeting that takes a multiple channel writers (for the MultipleCompetingTransformsAndConsumers example) | |
private static async Task TransformIdAndPublishGreeting(ChannelReader<string> reader, | |
ChannelWriter<string> writer, ChannelWriter<string> otherWriter, string greeting) | |
{ | |
while (await reader.WaitToReadAsync()) | |
{ | |
while (reader.TryRead(out var id)) | |
{ | |
await writer.WriteAsync($"{greeting}, {id}"); | |
await otherWriter.WriteAsync($"{greeting}, {id}"); | |
} | |
} | |
} | |
private static void Trace(string msg) | |
{ | |
if (UseTracing) Console.WriteLine($" {msg}"); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment