Last active
January 22, 2024 23:31
-
-
Save ianmcook/c96b80f943406dc791f0e9455e11ac11 to your computer and use it in GitHub Desktop.
Acero ExecPlan for TPC-H Query 06
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <arrow/api.h> | |
#include <arrow/type.h> | |
#include <arrow/result.h> | |
#include <arrow/io/api.h> | |
#include <arrow/compute/api.h> | |
#include <arrow/acero/exec_plan.h> | |
#include <arrow/acero/options.h> | |
#include <parquet/arrow/reader.h> | |
/// Runs an already-built Acero ExecPlan, collects its sink output into a Table
/// and prints the plan and the results to stdout.
///
/// @param plan     The plan to execute; its "sink" node must have been created
///                 with SinkNodeOptions pointing at `sink_gen`.
/// @param schema   Schema of the batches emitted by the sink; used to build the
///                 synchronous RecordBatchReader.
/// @param sink_gen Async generator populated by the plan's sink node.
/// @return OK if the plan validated, ran to completion and its output was
///         collected; otherwise the first error encountered.
arrow::Status ExecutePlanAndCollectAsTable(
    std::shared_ptr<arrow::acero::ExecPlan> plan,
    std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen) {
  // Translate the asynchronous sink generator into a synchronous reader.
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      arrow::acero::MakeGeneratorReader(std::move(schema), std::move(sink_gen),
                                        arrow::default_memory_pool());

  // Validate before starting so a malformed plan never begins producing.
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;

  plan->StartProducing();

  // Drain the reader into a Table.
  arrow::Result<std::shared_ptr<arrow::Table>> collected =
      arrow::Table::FromRecordBatchReader(sink_reader.get());
  if (!collected.ok()) {
    // Collection failed while the plan may still be running: ask it to stop
    // and wait for it to wind down before reporting the error.
    plan->StopProducing();
    plan->finished().Wait();
    return collected.status();
  }
  std::shared_ptr<arrow::Table> response_table = std::move(collected).ValueUnsafe();
  std::cout << "Results : " << std::endl << response_table->ToString() << std::endl;

  // The reader has been fully drained, so the plan finishes on its own; no
  // stop request is needed. Wait for completion and report the final status
  // (this also surfaces errors raised after the last batch was delivered).
  return plan->finished().status();
}
/// Builds and runs an Acero plan for TPC-H Query 6 over "lineitem.parquet".
///
/// Plan shape: table_source -> filter -> project -> aggregate -> sink.
/// The whole Parquet file is read into an in-memory Table before planning.
///
/// @return OK on success; otherwise the first error from opening/reading the
///         file, building the plan, or executing it.
arrow::Status Execute() {
  // Read the Parquet file (path relative to the working directory) into a Table.
  std::shared_ptr<arrow::io::RandomAccessFile> input;
  ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open("lineitem.parquet"));
  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
  ARROW_RETURN_NOT_OK(
      parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader));
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));

  // Create the ExecPlan on the threaded execution context.
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::acero::ExecPlan> plan,
      arrow::acero::ExecPlan::Make(*arrow::compute::threaded_exec_context()));

  // Source node: emits the in-memory table in batches of at most 1000 rows.
  int max_batch_size = 1000;
  auto table_source_options = arrow::acero::TableSourceNodeOptions{table, max_batch_size};
  ARROW_ASSIGN_OR_RAISE(
      arrow::acero::ExecNode * source_node,
      arrow::acero::MakeExecNode("table_source", plan.get(), {}, table_source_options));

  // Filter node, equivalent to:
  //   l_shipdate >= 1994-01-01 AND l_shipdate < 1995-01-01
  //   AND l_discount >= 0.05 AND l_discount <= 0.07
  //   AND l_quantity < 24
  // Date32 literals count days since 1970-01-01:
  //   1994-01-01 -> 8766, 1995-01-01 -> 9131.
  arrow::compute::Expression filter_expr = arrow::compute::call("and", {
      arrow::compute::call("and", {
          arrow::compute::call("and", {
              arrow::compute::greater_equal(arrow::compute::field_ref("l_shipdate"),
                                            arrow::compute::literal(arrow::Date32Scalar(8766))),
              arrow::compute::less(arrow::compute::field_ref("l_shipdate"),
                                   arrow::compute::literal(arrow::Date32Scalar(9131)))
          }),
          arrow::compute::call("and", {
              arrow::compute::greater_equal(arrow::compute::field_ref("l_discount"),
                                            arrow::compute::literal(0.05)),
              arrow::compute::less_equal(arrow::compute::field_ref("l_discount"),
                                         arrow::compute::literal(0.07))
          }),
      }),
      arrow::compute::less(arrow::compute::field_ref("l_quantity"),
                           arrow::compute::literal(24.0))
  });
  ARROW_ASSIGN_OR_RAISE(
      arrow::acero::ExecNode * filter_node,
      arrow::acero::MakeExecNode("filter", plan.get(), {source_node},
                                 arrow::acero::FilterNodeOptions{filter_expr}));

  // Project node: a single column "product" = l_extendedprice * l_discount.
  ARROW_ASSIGN_OR_RAISE(
      arrow::acero::ExecNode * project_node,
      arrow::acero::MakeExecNode(
          "project", plan.get(), {filter_node},
          arrow::acero::ProjectNodeOptions{
              {arrow::compute::call("multiply",
                                    {arrow::compute::field_ref("l_extendedprice"),
                                     arrow::compute::field_ref("l_discount")})},
              {"product"}}));

  // Aggregate node: ungrouped sum of "product" into "revenue" (nulls skipped,
  // at least one non-null value required for a non-null result).
  auto options = std::make_shared<arrow::compute::ScalarAggregateOptions>(
      /*skip_nulls=*/true, /*min_count=*/1);
  auto aggregate_options = arrow::acero::AggregateNodeOptions{
      /*aggregates=*/{{"sum", options, "product", "revenue"}}, /*keys=*/{}};
  ARROW_ASSIGN_OR_RAISE(
      arrow::acero::ExecNode * aggregate_node,
      arrow::acero::MakeExecNode("aggregate", plan.get(), {project_node}, aggregate_options));

  // Sink node: exposes the aggregate output through sink_gen.
  arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(arrow::acero::MakeExecNode("sink", plan.get(), {aggregate_node},
                                                 arrow::acero::SinkNodeOptions{&sink_gen}));

  // Capture the output schema before handing the plan off, then run the plan
  // and propagate its status so callers see execution failures.
  std::shared_ptr<arrow::Schema> output_schema = aggregate_node->output_schema();
  return ExecutePlanAndCollectAsTable(std::move(plan), std::move(output_schema),
                                      std::move(sink_gen));
}
int main(int argc, char** argv) { | |
auto status = Execute(); | |
if (!status.ok()) { | |
std::cerr << "Error occurred : " << status.message() << std::endl; | |
return EXIT_FAILURE; | |
} | |
return EXIT_SUCCESS; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Variations of this using Acero's newer declarative API: