Created
March 4, 2020 09:06
-
-
Save Micrified/5fa1c9b98d8ce8da613972f8914d5f5c to your computer and use it in GitHub Desktop.
ROS2 Executor callchain
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Pick the next ready executable, if there is one, and store it in `any_executable`.
///
/// The memory strategy is asked for one kind of entity at a time, in a fixed
/// priority order: timers, subscriptions, services, clients, waitables. The
/// first query that leaves its matching field of `any_executable` set wins, and
/// the lower-priority kinds are then not queried at all.
///
/// When the selected executable carries a callback group whose type is
/// MutuallyExclusive, that group's `can_be_taken_from` flag is cleared so it is
/// marked as busy.
///
/// \param[out] any_executable populated by the memory strategy.
/// \return true if a ready executable was found, false otherwise.
bool Executor::get_next_ready_executable(AnyExecutable & any_executable)
{
  // One probe per entity kind: ask the memory strategy, then report whether
  // the corresponding slot of any_executable was filled in.
  const auto timer_ready = [&]() {
      memory_strategy_->get_next_timer(any_executable, weak_nodes_);
      return static_cast<bool>(any_executable.timer);
    };
  const auto subscription_ready = [&]() {
      memory_strategy_->get_next_subscription(any_executable, weak_nodes_);
      return static_cast<bool>(any_executable.subscription);
    };
  const auto service_ready = [&]() {
      memory_strategy_->get_next_service(any_executable, weak_nodes_);
      return static_cast<bool>(any_executable.service);
    };
  const auto client_ready = [&]() {
      memory_strategy_->get_next_client(any_executable, weak_nodes_);
      return static_cast<bool>(any_executable.client);
    };
  const auto waitable_ready = [&]() {
      memory_strategy_->get_next_waitable(any_executable, weak_nodes_);
      return static_cast<bool>(any_executable.waitable);
    };

  // Short-circuit evaluation preserves the priority order and stops at the
  // first kind that has something ready.
  const bool found =
    timer_ready() ||
    subscription_ready() ||
    service_ready() ||
    client_ready() ||
    waitable_ready();

  // Nothing is ready: report failure without touching any callback group.
  if (!found) {
    return false;
  }

  const auto & group = any_executable.callback_group;
  if (group && group->type() == callback_group::CallbackGroupType::MutuallyExclusive) {
    // A mutually exclusive group that is already busy should never have been
    // selected in the first place.
    assert(group->can_be_taken_from().load());
    // Mark the group as busy. execute_any_executable() stores true again once
    // the executable has run.
    group->can_be_taken_from().store(false);
  }
  return true;
}
/// Run every entity that is set in `any_exec`, then release its callback group.
///
/// Does nothing at all when the executor is no longer spinning. Otherwise each
/// non-null slot (timer, subscription, service, client, waitable) is executed
/// in that order, the callback group (if any) is marked as available again,
/// and the interrupt guard condition is triggered.
///
/// \param[in] any_exec the executable to run.
/// \throws std::runtime_error if triggering the interrupt guard condition fails.
void Executor::execute_any_executable(AnyExecutable & any_exec)
{
  if (!spinning.load()) {
    return;
  }
  if (any_exec.timer) {
    execute_timer(any_exec.timer);
  }
  if (any_exec.subscription) {
    execute_subscription(any_exec.subscription);
  }
  if (any_exec.service) {
    execute_service(any_exec.service);
  }
  if (any_exec.client) {
    execute_client(any_exec.client);
  }
  if (any_exec.waitable) {
    any_exec.waitable->execute();
  }
  // Reset the callback_group, regardless of type. The group may be null
  // (get_next_ready_executable() guards against that as well), so check it
  // before dereferencing instead of crashing on a group-less executable.
  if (any_exec.callback_group) {
    any_exec.callback_group->can_be_taken_from().store(true);
  }
  // Wake the wait, because it may need to be recalculated or work that
  // was previously blocked is now available.
  if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
    throw std::runtime_error(rcl_get_error_string().str);
  }
}
// https://github.com/ros2/rclcpp/blob/master/rclcpp/src/rclcpp/executor.cpp | |
/// Take one pending response for `client` and hand it to the client.
///
/// A fresh request header and response object are created by the client,
/// filled in by rcl_take_response(), and passed to handle_response() when the
/// take succeeds. A result of RCL_RET_CLIENT_TAKE_FAILED is ignored silently;
/// any other failure is logged and the rcl error state is reset.
///
/// \param[in] client the client whose response should be processed.
void Executor::execute_client(rclcpp::ClientBase::SharedPtr client)
{
  // Storage that rcl fills in
  auto header = client->create_request_header();
  std::shared_ptr<void> reply = client->create_response();

  const rcl_ret_t take_result = rcl_take_response(
    client->get_client_handle().get(),
    header.get(),
    reply.get());

  // Success: dispatch the response to the client
  if (take_result == RCL_RET_OK) {
    client->handle_response(header, reply);
    return;
  }

  // This particular failure is not reported
  if (take_result == RCL_RET_CLIENT_TAKE_FAILED) {
    return;
  }

  // Every other failure is logged, then the rcl error state is cleared
  RCUTILS_LOG_ERROR_NAMED(
    "rclcpp",
    "take response failed for client of service '%s': %s",
    client->get_service_name(), rcl_get_error_string().str);
  rcl_reset_error();
}
// https://github.com/ros2/rclcpp/blob/master/rclcpp/include/rclcpp/client.hpp | |
// Declaration of the member used below:
//   std::map<int64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>> pending_requests_;
// pending_requests_ maps a request's sequence number to its (promise, callback, future) tuple.
void handle_response(std::shared_ptr<rmw_request_id_t> request_header, std::shared_ptr<void> response) override { | |
// Automatics | |
std::unique_lock<std::mutex> lock(pending_requests_mutex_); | |
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(response); | |
int64_t sequence_number = request_header->sequence_number; | |
// TODO(esteve) this should throw instead since it is not expected to happen in the first place | |
if (this->pending_requests_.count(sequence_number) == 0) { | |
RCUTILS_LOG_ERROR_NAMED( | |
"rclcpp", | |
"Received invalid sequence number. Ignoring..."); | |
return; | |
} | |
// Index a tuple | |
auto tuple = this->pending_requests_[sequence_number]; | |
// Obtain first element of the tuple | |
auto call_promise = std::get<0>(tuple); | |
// Obtain second element of the tuple | |
auto callback = std::get<1>(tuple); | |
// Obtain third element of the tuple | |
auto future = std::get<2>(tuple); | |
// Remove sequence number from pending requests | |
this->pending_requests_.erase(sequence_number); | |
// Unlock here to allow the service to be called recursively from one of its callbacks. | |
lock.unlock(); | |
// Passes a pointer to the promise object | |
call_promise->set_value(typed_response); | |
// Invokes the callback, passing the future (third tuple element) | |
callback(future); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment