Created
March 21, 2023 12:19
-
-
Save amosbird/6fa681a5ce6c04d93cb0456cced9f886 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
static void initRowsBeforeLimit(IOutputFormat * output_format) | |
{ | |
RowsBeforeLimitCounterPtr rows_before_limit_at_least; | |
/// TODO: add setRowsBeforeLimitCounter as virtual method to IProcessor. | |
std::vector<LimitTransform *> limits; | |
std::vector<PartialSortingTransform *> partial_sortings; | |
std::vector<RemoteSource *> remote_sources; | |
std::unordered_set<IProcessor *> visited; | |
struct QueuedEntry | |
{ | |
IProcessor * processor; | |
}; | |
std::queue<QueuedEntry> queue; | |
queue.push({ output_format }); | |
visited.emplace(output_format); | |
while (!queue.empty()) | |
{ | |
auto * processor = queue.front().processor; | |
queue.pop(); | |
if (auto * limit = typeid_cast<LimitTransform *>(processor)) | |
{ | |
limits.emplace_back(limit); | |
} | |
else if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor)) | |
{ | |
partial_sortings.emplace_back(sorting); | |
/// Don't go to children. Take rows_before_limit from last PartialSortingTransform. | |
continue; | |
} | |
else if (const auto * transforming = dynamic_cast<const ITransformingStep *>(processor)) | |
{ | |
/// Don't go to children if current transform changes the number of rows. | |
if (!transforming->getTransformTraits().preserves_number_of_rows) | |
continue; | |
} | |
else if (auto * source = typeid_cast<RemoteSource *>(processor)) | |
{ | |
remote_sources.emplace_back(source); | |
} | |
/// Skip totals and extremes port for output format. | |
if (auto * format = dynamic_cast<IOutputFormat *>(processor)) | |
{ | |
auto * child_processor = &format->getPort(IOutputFormat::PortKind::Main).getOutputPort().getProcessor(); | |
if (visited.emplace(child_processor).second) | |
queue.push({ child_processor }); | |
continue; | |
} | |
for (auto & child_port : processor->getInputs()) | |
{ | |
auto * child_processor = &child_port.getOutputPort().getProcessor(); | |
if (visited.emplace(child_processor).second) | |
queue.push({ child_processor }); | |
} | |
} | |
auto assign_limit_counter = [&](auto & processors) | |
{ | |
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>(); | |
for (auto & processor : processors) | |
processor->setRowsBeforeLimitCounter(rows_before_limit_at_least); | |
}; | |
if (!partial_sortings.empty()) | |
assign_limit_counter(partial_sortings); | |
else if (!remote_sources.empty()) | |
assign_limit_counter(remote_sources); | |
else if (!limits.empty()) | |
assign_limit_counter(limits); | |
/// If there is a limit, then enable rows_before_limit_at_least | |
/// It is needed when zero rows is read, but we still want rows_before_limit_at_least in result. | |
if (!limits.empty()) | |
rows_before_limit_at_least->add(0); | |
if (rows_before_limit_at_least) | |
output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment