Created
October 7, 2019 10:09
-
-
Save wi7a1ian/979821eccdb4ca69ac8091e55865b1e7 to your computer and use it in GitHub Desktop.
Usage of three basic DataFlow types in C# #csharp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
static void Main(string[] args) | |
{ | |
var cts = new CancellationTokenSource(); | |
cts.CancelAfter(TimeSpan.FromSeconds(10)); | |
TestProducerConsumerViaBufferBlock(cts.Token); | |
TestProducerConsumerViaActionBlock(cts.Token); | |
TestParallelTransformViaTransformBlock(cts.Token); | |
} | |
static void TestProducerConsumerViaBufferBlock(CancellationToken ctoken) | |
{ | |
var opt = new DataflowBlockOptions { | |
BoundedCapacity = 3, // max 3 messages can be send | |
MaxMessagesPerTask = 1, | |
CancellationToken = ctoken | |
}; | |
var bufferBlock = new BufferBlock<int>(opt); | |
var post01 = Task.Run(() => { | |
bufferBlock.Post(0); | |
bufferBlock.Post(1); | |
}, ctoken); | |
var receive = Task.Run(async () => { | |
while (await bufferBlock.OutputAvailableAsync()) { // or !bufferBlock.Completion.IsCompleted | |
Console.WriteLine($"BufferBlock {Thread.CurrentThread.ManagedThreadId} #{bufferBlock.Receive()}"); | |
// or while (source.TryReceive(out data)) | |
} | |
}, ctoken); | |
var post2 = Task.Run(() => { | |
bufferBlock.Post(2); | |
}, ctoken); | |
Task.WhenAll(post01, post2).ContinueWith( _ => bufferBlock.Complete(), ctoken); | |
Task.WaitAll(bufferBlock.Completion, receive); | |
} | |
static void TestProducerConsumerViaActionBlock(CancellationToken ctoken) | |
{ | |
var opt = new ExecutionDataflowBlockOptions { | |
CancellationToken = ctoken, | |
MaxDegreeOfParallelism = 4 | |
}; | |
var actionBlock = new ActionBlock<int>( x => { | |
Console.WriteLine($"ActionBlock {Thread.CurrentThread.ManagedThreadId} #{x}"); | |
}, opt); | |
var post = Task.Run(() => { | |
actionBlock.Post(3); | |
actionBlock.Post(4); | |
actionBlock.Post(5); | |
}, ctoken).ContinueWith(_ => actionBlock.Complete(), ctoken); // fire and forget | |
actionBlock.Completion.Wait(); | |
} | |
private static void TestParallelTransformViaTransformBlock(CancellationToken ctoken) | |
{ | |
var opt = new ExecutionDataflowBlockOptions | |
{ | |
CancellationToken = ctoken, | |
MaxDegreeOfParallelism = 4, | |
EnsureOrdered = true // ensure FIFO transformation | |
}; | |
var transformBlock = new TransformBlock<int, string>(x => | |
$"TransformBlock {Thread.CurrentThread.ManagedThreadId} #{x}" | |
, opt); | |
var printBlock = new ActionBlock<string>(x => { | |
Console.WriteLine($"ActionBlock {Thread.CurrentThread.ManagedThreadId} {x}"); | |
}, opt); | |
transformBlock.LinkTo(printBlock); | |
transformBlock.Completion.ContinueWith( _ => printBlock.Complete()); | |
var post = Task.Run(() => { | |
transformBlock.Post(6); | |
transformBlock.Post(7); | |
transformBlock.Post(8); | |
}, ctoken).ContinueWith(_ => transformBlock.Complete(), ctoken); // fire and forget | |
Task.WaitAll(transformBlock.Completion, printBlock.Completion); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
you should use SendAsync instead of Post because data send by Post will be ignored (Post doesnt wait) when you reach limit of buffer.