Created
May 4, 2020 08:19
-
-
Save renestein/e85bde8c461b839b51f195892aae92b9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Builds a fork/join dataflow, feeds the integers [0, inputItemsCount) into it and
// reports how many items arrived at the final block.
//
// Topology:
//   source (int -> int) --+--> evenBranch (int -> string, accepts even inputs) --+--> finalAction
//                         +--> oddBranch  (int -> string, accepts odd inputs)  --+
//
// inputItemsCount: number of consecutive integers, starting at 0, added to the first block.
//                  A value <= 0 adds nothing.
// Result of the returned task: the number of strings collected by the final action block.
Tasks::Task<size_t> WhenAsyncForkJoinDataflowThenAllInputsProcessedImpl(int inputItemsCount)
{
  //Create TransformBlock. As the name of the block suggests, TransformBlock transforms input to output.
  //This first block is int -> int: it logs the item and passes it on unchanged.
  auto source = DataFlowAsyncFactory::CreateTransformBlock<int, int>([](const int& item)-> Tasks::Task<int>
  {
    //Simulate work
    co_await Tasks::GetCompletedTask();
    const string line = "int: " + to_string(item) + "\n";
    cout << line;
    co_return item;
  });

  //Fork dataflow (even numbers are processed in one TransformBlock, odd numbers in another TransformBlock).
  auto evenBranch = DataFlowAsyncFactory::CreateTransformBlock<int, string>([](const int& item)->Tasks::Task<string>
  {
    //Simulate work
    co_await Tasks::GetCompletedTask();
    const string line = "Even number: " + to_string(item) + "\n";
    cout << line;
    co_return to_string(item);
  },
  //Accept only even numbers.
  //Condition is evaluated for every input.
  //If the condition evaluates to true, input is accepted; otherwise input is ignored.
  [](const int& item)
  {
    return item % 2 == 0;
  });

  auto oddBranch = DataFlowAsyncFactory::CreateTransformBlock<int, string>([](const int& item)->Tasks::Task<string>
  {
    //Simulate work
    co_await Tasks::GetCompletedTask();
    const string line = "Odd number: " + to_string(item) + "\n";
    cout << line;
    co_return to_string(item);
  },
  //Accept only odd numbers.
  //Condition is evaluated for every input.
  //If the condition evaluates to true, input is accepted; otherwise input is ignored.
  [](const int& item)
  {
    return item % 2 != 0;
  });
  //End fork.

  //Collects everything that reaches the end of the dataflow.
  vector<string> processedItems{};
  if (inputItemsCount > 0)
  {
    //Every input yields at most one output (it is either even or odd), so this is an upper bound.
    processedItems.reserve(static_cast<size_t>(inputItemsCount));
  }

  //The vector is captured by reference; this function awaits finalAction's completion below
  //before it reads the vector and returns.
  //NOTE(review): both branches feed this block - assumes the callback is invoked for one item at a time; confirm.
  auto finalAction = DataFlowSyncFactory::CreateActionBlock<string>([&processedItems](const string& item)
  {
    const string line = "Final action: " + item + "\n";
    cout << line;
    processedItems.push_back(item);
  });

  //Fork
  source->ConnectTo(evenBranch);
  source->ConnectTo(oddBranch);
  //end fork

  //Join
  oddBranch->ConnectTo(finalAction);
  evenBranch->ConnectTo(finalAction);
  //End join

  //Start dataflow
  source->Start();

  //Add input data to the first block.
  for (int i = 0; i < inputItemsCount; ++i)
  {
    co_await source->AcceptInputAsync(i);
  }

  //All input data are in the dataflow. Send notification that no more data will be added.
  //This does not mean that all data in the dataflow are processed!
  source->Complete();

  //Wait for last block.
  co_await finalAction->Completion();

  co_return processedItems.size();
}
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment