Skip to content

Instantly share code, notes, and snippets.

@ianmcook
Created August 17, 2023 21:19
Show Gist options
  • Select an option

  • Save ianmcook/2aa9aa82e61c3ea4405450b93cf80fbc to your computer and use it in GitHub Desktop.

Select an option

Save ianmcook/2aa9aa82e61c3ea4405450b93cf80fbc to your computer and use it in GitHub Desktop.
Sort an Arrow Table with Acero
#include <iostream>
#include <arrow/api.h>
#include <arrow/result.h>
#include <arrow/compute/api.h>
#include <arrow/compute/exec/exec_plan.h>
/// Runs an already-assembled ExecPlan to completion and prints its output.
///
/// \param plan      the plan to validate, start and stop
/// \param schema    schema used to interpret the batches coming out of the sink
/// \param sink_gen  async generator that the plan's sink node was bound to
/// \return the first error encountered (validation, start-up, table
///         collection), otherwise the status of the plan's `finished()` future
arrow::Status ExecutePlanAndCollectAsTable(
    std::shared_ptr<arrow::compute::ExecPlan> plan,
    std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      arrow::compute::MakeGeneratorReader(schema, std::move(sink_gen),
                                          arrow::default_memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;

  // start the ExecPlan; a start-up failure must not be ignored, otherwise we
  // would go on to read from a plan that never started.
  // NOTE(review): assumes StartProducing() returns arrow::Status, as in the
  // Arrow releases that ship arrow/compute/exec/exec_plan.h -- confirm against
  // the Arrow version this is built with.
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;
  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));
  std::cout << "Results : " << std::endl << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();

  // report the status carried by the plan's completion future
  auto future = plan->finished();
  return future.status();
}
/// Builds a small two-column table ("int": int32 with one null, "str": utf8),
/// wires it into an ExecPlan as `table_source -> order_by_sink` sorting on
/// "int" in descending order, then runs the plan and prints the result.
///
/// \return the first error raised while building the data or the plan, or
///         otherwise the status returned by ExecutePlanAndCollectAsTable
arrow::Status Execute() {
  // std::cout << arrow::GetBuildInfo().version_string << std::endl;

  // "int" column: 1, 1, null, 2, 3, 4
  arrow::Int32Builder int_builder;
  ARROW_RETURN_NOT_OK(int_builder.Append(1));
  ARROW_RETURN_NOT_OK(int_builder.Append(1));
  ARROW_RETURN_NOT_OK(int_builder.AppendNull());
  ARROW_RETURN_NOT_OK(int_builder.Append(2));
  ARROW_RETURN_NOT_OK(int_builder.Append(3));
  ARROW_RETURN_NOT_OK(int_builder.Append(4));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> int_array, int_builder.Finish());

  // "str" column: a, a, a, a, b, b
  arrow::StringBuilder str_builder;
  ARROW_RETURN_NOT_OK(str_builder.Append("a"));
  ARROW_RETURN_NOT_OK(str_builder.Append("a"));
  ARROW_RETURN_NOT_OK(str_builder.Append("a"));
  ARROW_RETURN_NOT_OK(str_builder.Append("a"));
  ARROW_RETURN_NOT_OK(str_builder.Append("b"));
  ARROW_RETURN_NOT_OK(str_builder.Append("b"));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> str_array, str_builder.Finish());

  std::vector<std::shared_ptr<arrow::Field>> schema_vector = {
      arrow::field("int", arrow::int32()),
      arrow::field("str", arrow::utf8())
  };
  auto schema = std::make_shared<arrow::Schema>(schema_vector);
  std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, {int_array, str_array});
  // std::cout << "Data : " << std::endl << table->ToString() << std::endl;

  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::compute::ExecPlan> plan,
      arrow::compute::ExecPlan::Make(*arrow::compute::threaded_exec_context())
  );

  // Generator that the sink node fills in; it is read back when the plan runs.
  arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen;

  int max_batch_size = 100;
  auto table_source_options = arrow::compute::TableSourceNodeOptions{table, max_batch_size};

  // source node
  ARROW_ASSIGN_OR_RAISE(
      arrow::compute::ExecNode * source_node,
      arrow::compute::MakeExecNode("table_source", plan.get(), {}, table_source_options)
  );

  // order_by_sink node: sorts on "int" descending and binds its output to sink_gen
  ARROW_RETURN_NOT_OK(
      arrow::compute::MakeExecNode("order_by_sink", plan.get(), {source_node},
          arrow::compute::OrderBySinkNodeOptions{
              arrow::compute::SortOptions{
                  {arrow::compute::SortKey{"int", arrow::compute::SortOrder::Descending}}
              },
              &sink_gen
          }
      )
  );

  // Propagate the execution status so that failures reach main() instead of
  // being silently replaced by Status::OK().
  return ExecutePlanAndCollectAsTable(plan, source_node->output_schema(), sink_gen);
}
int main(int argc, char** argv) {
auto status = Execute();
if (!status.ok()) {
std::cerr << "Error occurred : " << status.message() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment