Skip to content

Instantly share code, notes, and snippets.

@Micrified
Created March 4, 2020 09:06
Show Gist options
  • Save Micrified/5fa1c9b98d8ce8da613972f8914d5f5c to your computer and use it in GitHub Desktop.
Save Micrified/5fa1c9b98d8ce8da613972f8914d5f5c to your computer and use it in GitHub Desktop.
ROS2 Executor callchain
bool Executor::get_next_ready_executable(AnyExecutable & any_executable)
{
bool success = false;
// Check the timers to see if there are any that are ready
memory_strategy_->get_next_timer(any_executable, weak_nodes_);
if (any_executable.timer) {
success = true;
}
if (!success) {
// Check the subscriptions to see if there are any that are ready
memory_strategy_->get_next_subscription(any_executable, weak_nodes_);
if (any_executable.subscription) {
success = true;
}
}
if (!success) {
// Check the services to see if there are any that are ready
memory_strategy_->get_next_service(any_executable, weak_nodes_);
if (any_executable.service) {
success = true;
}
}
if (!success) {
// Check the clients to see if there are any that are ready
memory_strategy_->get_next_client(any_executable, weak_nodes_);
if (any_executable.client) {
success = true;
}
}
if (!success) {
// Check the waitables to see if there are any that are ready
memory_strategy_->get_next_waitable(any_executable, weak_nodes_);
if (any_executable.waitable) {
success = true;
}
}
// At this point any_exec should be valid with either a valid subscription
// or a valid timer, or it should be a null shared_ptr
if (success) {
// If it is valid, check to see if the group is mutually exclusive or
// not, then mark it accordingly
using callback_group::CallbackGroupType;
if (
any_executable.callback_group &&
any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive)
{
// It should not have been taken otherwise
assert(any_executable.callback_group->can_be_taken_from().load());
// Set to false to indicate something is being run from this group
// This is reset to true either when the any_exec is executed or when the
// any_exec is destructued
any_executable.callback_group->can_be_taken_from().store(false);
}
}
// If there is no ready executable, return a null ptr
return success;
}
void Executor::execute_any_executable(AnyExecutable & any_exec)
{
if (!spinning.load()) {
return;
}
if (any_exec.timer) {
execute_timer(any_exec.timer);
}
if (any_exec.subscription) {
execute_subscription(any_exec.subscription);
}
if (any_exec.service) {
execute_service(any_exec.service);
}
if (any_exec.client) {
execute_client(any_exec.client);
}
if (any_exec.waitable) {
any_exec.waitable->execute();
}
// Reset the callback_group, regardless of type
any_exec.callback_group->can_be_taken_from().store(true);
// Wake the wait, because it may need to be recalculated or work that
// was previously blocked is now available.
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(rcl_get_error_string().str);
}
}
// https://github.com/ros2/rclcpp/blob/master/rclcpp/src/rclcpp/executor.cpp
void Executor::execute_client(rclcpp::ClientBase::SharedPtr client)
{
// Automatics
auto request_header = client->create_request_header();
std::shared_ptr<void> response = client->create_response();
// Obtain pointers
rcl_ret_t status = rcl_take_response(client->get_client_handle().get(),request_header.get(),response.get());
// Execute if okay, otherwise log error
if (status == RCL_RET_OK) {
client->handle_response(request_header, response);
} else if (status != RCL_RET_CLIENT_TAKE_FAILED) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"take response failed for client of service '%s': %s",
client->get_service_name(), rcl_get_error_string().str);
rcl_reset_error();
}
}
// https://github.com/ros2/rclcpp/blob/master/rclcpp/include/rclcpp/client.hpp
// std::map<int64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>> pending_requests_;
// Pending requests maps an integer to a tuple
void handle_response(std::shared_ptr<rmw_request_id_t> request_header, std::shared_ptr<void> response) override {
// Automatics
std::unique_lock<std::mutex> lock(pending_requests_mutex_);
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(response);
int64_t sequence_number = request_header->sequence_number;
// TODO(esteve) this should throw instead since it is not expected to happen in the first place
if (this->pending_requests_.count(sequence_number) == 0) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Received invalid sequence number. Ignoring...");
return;
}
// Index a tuple
auto tuple = this->pending_requests_[sequence_number];
// Obtain first element of the tuple
auto call_promise = std::get<0>(tuple);
// Obtain second element of the tuple
auto callback = std::get<1>(tuple);
// Obtain third element of the tuple
auto future = std::get<2>(tuple);
// Remove sequence number from pending requests
this->pending_requests_.erase(sequence_number);
// Unlock here to allow the service to be called recursively from one of its callbacks.
lock.unlock();
// Passes a pointer to the promise object
call_promise->set_value(typed_response);
// Invokes the callback, passing the future (third tuple element)
callback(future);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment