Skip to content

Instantly share code, notes, and snippets.

@wi7a1ian
Created October 7, 2019 10:09
Show Gist options
  • Save wi7a1ian/979821eccdb4ca69ac8091e55865b1e7 to your computer and use it in GitHub Desktop.
Save wi7a1ian/979821eccdb4ca69ac8091e55865b1e7 to your computer and use it in GitHub Desktop.
Usage of three basic DataFlow types in C# #csharp
static void Main(string[] args)
{
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(10));
TestProducerConsumerViaBufferBlock(cts.Token);
TestProducerConsumerViaActionBlock(cts.Token);
TestParallelTransformViaTransformBlock(cts.Token);
}
static void TestProducerConsumerViaBufferBlock(CancellationToken ctoken)
{
var opt = new DataflowBlockOptions {
BoundedCapacity = 3, // max 3 messages can be send
MaxMessagesPerTask = 1,
CancellationToken = ctoken
};
var bufferBlock = new BufferBlock<int>(opt);
var post01 = Task.Run(() => {
bufferBlock.Post(0);
bufferBlock.Post(1);
}, ctoken);
var receive = Task.Run(async () => {
while (await bufferBlock.OutputAvailableAsync()) { // or !bufferBlock.Completion.IsCompleted
Console.WriteLine($"BufferBlock {Thread.CurrentThread.ManagedThreadId} #{bufferBlock.Receive()}");
// or while (source.TryReceive(out data))
}
}, ctoken);
var post2 = Task.Run(() => {
bufferBlock.Post(2);
}, ctoken);
Task.WhenAll(post01, post2).ContinueWith( _ => bufferBlock.Complete(), ctoken);
Task.WaitAll(bufferBlock.Completion, receive);
}
static void TestProducerConsumerViaActionBlock(CancellationToken ctoken)
{
var opt = new ExecutionDataflowBlockOptions {
CancellationToken = ctoken,
MaxDegreeOfParallelism = 4
};
var actionBlock = new ActionBlock<int>( x => {
Console.WriteLine($"ActionBlock {Thread.CurrentThread.ManagedThreadId} #{x}");
}, opt);
var post = Task.Run(() => {
actionBlock.Post(3);
actionBlock.Post(4);
actionBlock.Post(5);
}, ctoken).ContinueWith(_ => actionBlock.Complete(), ctoken); // fire and forget
actionBlock.Completion.Wait();
}
private static void TestParallelTransformViaTransformBlock(CancellationToken ctoken)
{
var opt = new ExecutionDataflowBlockOptions
{
CancellationToken = ctoken,
MaxDegreeOfParallelism = 4,
EnsureOrdered = true // ensure FIFO transformation
};
var transformBlock = new TransformBlock<int, string>(x =>
$"TransformBlock {Thread.CurrentThread.ManagedThreadId} #{x}"
, opt);
var printBlock = new ActionBlock<string>(x => {
Console.WriteLine($"ActionBlock {Thread.CurrentThread.ManagedThreadId} {x}");
}, opt);
transformBlock.LinkTo(printBlock);
transformBlock.Completion.ContinueWith( _ => printBlock.Complete());
var post = Task.Run(() => {
transformBlock.Post(6);
transformBlock.Post(7);
transformBlock.Post(8);
}, ctoken).ContinueWith(_ => transformBlock.Complete(), ctoken); // fire and forget
Task.WaitAll(transformBlock.Completion, printBlock.Completion);
}
@metem
Copy link

metem commented Sep 3, 2020

you should use SendAsync instead of Post because data send by Post will be ignored (Post doesnt wait) when you reach limit of buffer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment