Skip to content

Instantly share code, notes, and snippets.

@amosbird
Created March 21, 2023 12:19
Show Gist options
  • Save amosbird/6fa681a5ce6c04d93cb0456cced9f886 to your computer and use it in GitHub Desktop.
Save amosbird/6fa681a5ce6c04d93cb0456cced9f886 to your computer and use it in GitHub Desktop.
/// Choose which processors of the pipeline feeding `output_format` count
/// "rows before limit", give them one shared RowsBeforeLimitCounter, and hand
/// that counter to `output_format`.
///
/// The pipeline is walked breadth-first from `output_format` towards its
/// sources (following input ports back to the processors that produce them).
/// Candidates are gathered into three groups:
///   - LimitTransform: every limit reached;
///   - PartialSortingTransform: only when a LimitTransform lies above it; the
///     walk does not descend below such a sorting transform;
///   - RemoteSource: every remote source reached.
/// Exactly one group receives the counter, in priority order
/// partial sortings > remote sources > limits.
///
/// @param output_format  Root of the pipeline. Only its Main port is followed;
///                       totals and extremes ports are skipped.
static void initRowsBeforeLimit(IOutputFormat * output_format)
{
    RowsBeforeLimitCounterPtr rows_before_limit_at_least;

    /// TODO: add setRowsBeforeLimitCounter as virtual method to IProcessor.
    std::vector<LimitTransform *> limits;
    std::vector<PartialSortingTransform *> partial_sortings;
    std::vector<RemoteSource *> remote_sources;

    std::unordered_set<IProcessor *> visited;

    struct QueuedEntry
    {
        IProcessor * processor;
        /// True if some LimitTransform lies on the path from output_format to this processor.
        bool below_limit;
    };

    std::queue<QueuedEntry> queue;
    queue.push({ output_format, false });
    visited.emplace(output_format);

    while (!queue.empty())
    {
        auto * processor = queue.front().processor;
        bool below_limit = queue.front().below_limit;
        queue.pop();

        if (auto * limit = typeid_cast<LimitTransform *>(processor))
        {
            limits.emplace_back(limit);
            /// Everything upstream of this limit is "before limit".
            below_limit = true;
        }
        else if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor))
        {
            /// A sorting transform only measures rows "before limit" when a limit is applied
            /// above it. Without one it is an ordinary transform and the walk goes through it.
            if (below_limit)
            {
                partial_sortings.emplace_back(sorting);
                /// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
                continue;
            }
        }
        else if (auto * source = typeid_cast<RemoteSource *>(processor))
        {
            remote_sources.emplace_back(source);
        }

        /// TODO: stop descending at processors that change the number of rows (e.g. filters).
        /// NOTE(review): transform traits such as `preserves_number_of_rows` presumably belong to
        /// query-plan steps (ITransformingStep), not to processors, so they cannot be checked here.

        /// Skip totals and extremes port for output format.
        if (auto * format = dynamic_cast<IOutputFormat *>(processor))
        {
            auto * child_processor = &format->getPort(IOutputFormat::PortKind::Main).getOutputPort().getProcessor();
            if (visited.emplace(child_processor).second)
                queue.push({ child_processor, below_limit });
            continue;
        }

        for (auto & child_port : processor->getInputs())
        {
            auto * child_processor = &child_port.getOutputPort().getProcessor();
            if (visited.emplace(child_processor).second)
                queue.push({ child_processor, below_limit });
        }
    }

    /// Create one counter and share it between every processor of the chosen group.
    auto assign_limit_counter = [&](auto & processors)
    {
        rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
        for (auto & processor : processors)
            processor->setRowsBeforeLimitCounter(rows_before_limit_at_least);
    };

    if (!partial_sortings.empty())
        assign_limit_counter(partial_sortings);
    else if (!remote_sources.empty())
        assign_limit_counter(remote_sources);
    else if (!limits.empty())
        assign_limit_counter(limits);

    /// If there is a limit, then enable rows_before_limit_at_least
    /// It is needed when zero rows is read, but we still want rows_before_limit_at_least in result.
    /// The counter is non-null here: a non-empty `limits` guarantees one of the branches above ran.
    if (!limits.empty())
        rows_before_limit_at_least->add(0);

    if (rows_before_limit_at_least)
        output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment