Last active
January 30, 2023 21:55
-
-
Save ianmcook/d2b197fd52e0ff304624a99ed1c2b149 to your computer and use it in GitHub Desktop.
Create and execute an Acero ExecPlan
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <arrow/api.h> | |
#include <arrow/result.h> | |
#include <arrow/compute/api.h> | |
#include <arrow/compute/exec/exec_plan.h> | |
/// Validate and run an ExecPlan, gather everything its sink emits into a
/// Table, print that Table, and report the plan's final status.
///
/// \param plan      the fully wired ExecPlan to run
/// \param schema    schema of the batches produced by `sink_gen`
/// \param sink_gen  asynchronous generator attached to the plan's sink node
/// \return the first error encountered while validating, starting, or
///         collecting; otherwise the status of the plan's `finished()` future
arrow::Status ExecutePlanAndCollectAsTable(
    std::shared_ptr<arrow::compute::ExecPlan> plan,
    std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      arrow::compute::MakeGeneratorReader(schema, std::move(sink_gen),
                                          arrow::default_memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;

  // start the ExecPlan; surface a start-up failure instead of discarding it
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;
  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));
  std::cout << "Results : " << std::endl << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();

  // plan mark finished
  auto future = plan->finished();
  return future.status();
}
/// Build a small two-column in-memory Table ("int", "str"), wire up an
/// ExecPlan of the form
///   table_source -> filter -> project -> aggregate -> sink
/// and run it through ExecutePlanAndCollectAsTable.
///
/// \return the first error raised while building the data or the plan, or
///         otherwise the status returned by ExecutePlanAndCollectAsTable
arrow::Status Execute() {
  // std::cout << arrow::GetBuildInfo().version_string << std::endl;

  // "int" column: 1, 1, null, 2, 3, 4
  arrow::Int32Builder int_builder;
  ARROW_RETURN_NOT_OK(int_builder.Append(1));
  ARROW_RETURN_NOT_OK(int_builder.Append(1));
  ARROW_RETURN_NOT_OK(int_builder.AppendNull());
  ARROW_RETURN_NOT_OK(int_builder.Append(2));
  ARROW_RETURN_NOT_OK(int_builder.Append(3));
  ARROW_RETURN_NOT_OK(int_builder.Append(4));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> int_array, int_builder.Finish());

  // "str" column: a, a, a, a, b, b
  arrow::StringBuilder str_builder;
  ARROW_RETURN_NOT_OK(str_builder.Append("a"));
  ARROW_RETURN_NOT_OK(str_builder.Append("a"));
  ARROW_RETURN_NOT_OK(str_builder.Append("a"));
  ARROW_RETURN_NOT_OK(str_builder.Append("a"));
  ARROW_RETURN_NOT_OK(str_builder.Append("b"));
  ARROW_RETURN_NOT_OK(str_builder.Append("b"));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> str_array, str_builder.Finish());

  std::vector<std::shared_ptr<arrow::Field>> schema_vector = {
      arrow::field("int", arrow::int32()),
      arrow::field("str", arrow::utf8())
  };
  auto schema = std::make_shared<arrow::Schema>(schema_vector);
  std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, {int_array, str_array});
  // std::cout << "Data : " << std::endl << table->ToString() << std::endl;

  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::compute::ExecPlan> plan,
      arrow::compute::ExecPlan::Make(*arrow::compute::threaded_exec_context()));

  // Filled in by the sink node below; drained by ExecutePlanAndCollectAsTable.
  arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen;

  const int max_batch_size = 100;
  auto table_source_options =
      arrow::compute::TableSourceNodeOptions{table, max_batch_size};

  // source node
  ARROW_ASSIGN_OR_RAISE(
      arrow::compute::ExecNode * source_node,
      arrow::compute::MakeExecNode("table_source", plan.get(), {}, table_source_options));

  // filter node: int < 4
  ARROW_ASSIGN_OR_RAISE(
      arrow::compute::ExecNode * filter_node,
      arrow::compute::MakeExecNode(
          "filter", plan.get(), {source_node},
          arrow::compute::FilterNodeOptions{arrow::compute::less(
              arrow::compute::field_ref("int"), arrow::compute::literal(4))}));

  // project node: pass "int" and "str" through and add
  // "dbl" = round(add_checked(int, 1.563)) with ndigits = 1, HALF_TO_EVEN
  ARROW_ASSIGN_OR_RAISE(
      arrow::compute::ExecNode * project_node,
      arrow::compute::MakeExecNode(
          "project", plan.get(), {filter_node},
          arrow::compute::ProjectNodeOptions{
              {
                  arrow::compute::field_ref("int"),
                  arrow::compute::field_ref("str"),
                  arrow::compute::call(
                      "round",
                      {arrow::compute::call(
                          "add_checked",
                          {arrow::compute::field_ref("int"),
                           arrow::compute::literal(1.563)})},
                      arrow::compute::RoundOptions(
                          1, arrow::compute::RoundMode::HALF_TO_EVEN))
              },
              {"int", "str", "dbl"}
          }));
  // std::cout << "Schema after projection : \n" << project_node->output_schema()->ToString() << std::endl;

  // aggregate node: group by "str"; foo = max(int), bar = min(dbl).
  // hash_min / hash_max are documented to take ScalarAggregateOptions
  // (CountOptions belongs to the counting kernels), so pass the matching
  // options class with its documented defaults spelled out.
  auto options = std::make_shared<arrow::compute::ScalarAggregateOptions>(
      /*skip_nulls=*/true, /*min_count=*/1);
  auto aggregate_options = arrow::compute::AggregateNodeOptions{
      /*aggregates=*/{{"hash_max", options, "int", "foo"},
                      {"hash_min", options, "dbl", "bar"}},
      /*keys=*/{"str"}};
  ARROW_ASSIGN_OR_RAISE(
      arrow::compute::ExecNode * aggregate_node,
      arrow::compute::MakeExecNode("aggregate", plan.get(), {project_node},
                                   aggregate_options));

  // sink node
  ARROW_RETURN_NOT_OK(arrow::compute::MakeExecNode(
      "sink", plan.get(), {aggregate_node},
      arrow::compute::SinkNodeOptions{&sink_gen}));

  // Propagate the execution status so that main() can report failures;
  // previously it was stored in an unused local and OK was always returned.
  return ExecutePlanAndCollectAsTable(plan, aggregate_node->output_schema(), sink_gen);
}
int main(int argc, char** argv) { | |
auto status = Execute(); | |
if (!status.ok()) { | |
std::cerr << "Error occurred : " << status.message() << std::endl; | |
return EXIT_FAILURE; | |
} | |
return EXIT_SUCCESS; | |
} |
You might need to do this before compiling:
export CPLUS_INCLUDE_PATH=/usr/local/opt/llvm/include/c++/v1:/Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Compile with
Run with