ROS2のexecutorについて
ROS2では、ノード内で発生する各種コールバック(Subscription, Timer, Service, Actionなど)を executor(エグゼキューター) が管理・実行します。
Node
├── Subscription
├── Timer
├── Service
└── Action
↓
Executor がスケジューリング
本記事では以下の内容を説明します。
- executorの種類(singleThreadedExecutor と multithreadedExecutor)
- コールバックグループの使い方
- マルチスレッド使用時の注意点
1. エグキューターの種類
主に使用されるエグゼキューターは以下の2種類になります。
- singleThreadedExecutor
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
executor.spin();
特徴:
デフォルトで使用されるエグゼキューター。
全てのコールバックが1つのスレッド内で順番に実行されるので、あるコールバックの処理中に他のコールバックは処理されない。
例:トピックのsubscription内で重い画像処理をしている最中は、Timerが発火しない。
処理時間を要するコールバック処理がないノードや、リアルタイム性の求められないノードに使用する。
- multithreadedExecutor
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
executor.spin();
特徴:
コールバックを複数のスレッドで実行するため、並列処理が可能。
共有リソースの競合が発生しうるので設計に注意が必要。
multithreadedExecutorを使用すれば必ず並列処理が実装されるわけではないことに注意が必要です。
multithreadedExecutor使用時の実装では、コールバックグループが重要になります。
コールバックグループとは
コールバックグループは
「どのコールバック同士を並列実行してよいか」を定義する仕組み
になります。
グループには2つの種類があります。
| グループ | 特徴 |
|---|---|
| MutuallyExclusive | グループ内のコールバックは並列処理されない |
| Reentrant | グループ内のコールバックが並列処理される |
- MutuallyExclusive(排他型)
同一のMutuallyExclusiveに所属するコールバック群は、同時には実行されません。
ハードウェア制御などで、共有リソースの競合を防ぐ用途・排他制御の実装に活用されます。
auto group = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);
rclcpp::SubscriptionOptions options;
options.callback_group = group;
subscription_ = this->create_subscription<std_msgs::msg::String>(
"topic",
10,
std::bind(&MyNode::callback, this, std::placeholders::_1),
options);
multithreadedExecutor使用時、コールバックはデフォルトでこのグループに所属するので並列実行されないことに注意。
#include <chrono>
#include <memory>
#include <thread>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
using namespace std::chrono_literals;
class SampleNode : public rclcpp::Node
{
public:
SampleNode()
: Node("sample")
{
// タイマー(1秒周期)
timer_ = this->create_wall_timer(
1s,
std::bind(&SampleNode::timer_callback, this)
);
// サブスクライバ
sub_ = this->create_subscription<std_msgs::msg::String>(
"topic",
10,
std::bind(&SampleNode::sub_callback, this, std::placeholders::_1)
);
}
private:
void timer_callback()
{
std::this_thread::sleep_for(std::chrono::seconds(3));
RCLCPP_INFO(this->get_logger(), "Timer done");
}
void sub_callback(const std_msgs::msg::String::SharedPtr msg)
{
(void)msg; // 未使用警告防止
RCLCPP_INFO(this->get_logger(), "Sub received");
}
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<SampleNode>());
rclcpp::shutdown();
return 0;
}
並列処理を実現するには、以下のReentrantを使用します。
- Reentrant(リエントラント型)
同一のReentrantに所属するコールバック群も同時に実行されます。同じコールバック関数が同時に走ることもあります。計算処理や独立したタスク群を並列処理する場合に使用されますが、マルチスレッド処理になるのでスレッドセーフに実装されるように注意が必要。
auto group = this->create_callback_group(
rclcpp::CallbackGroupType::Reentrant);
rclcpp::SubscriptionOptions options;
options.callback_group = group;
subscription_ = this->create_subscription<std_msgs::msg::String>(
"topic",
10,
std::bind(&MyNode::callback, this, std::placeholders::_1),
options);
以下のように記述すれば、コールバックが並列実行されます。
#include <chrono>
#include <memory>
#include <thread>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
using namespace std::chrono_literals;
class SampleNode : public rclcpp::Node
{
public:
SampleNode()
: Node("sample")
{
// Reentrant Callback Group 作成
callback_group_ =
this->create_callback_group(
rclcpp::CallbackGroupType::Reentrant
);
// タイマー用オプション設定
rclcpp::TimerBase::Options timer_options;
timer_options.callback_group = callback_group_;
timer_ = this->create_wall_timer(
1s,
std::bind(&SampleNode::timer_callback, this),
timer_options
);
// サブスクライバ用オプション設定
rclcpp::SubscriptionOptions sub_options;
sub_options.callback_group = callback_group_;
sub_ = this->create_subscription<std_msgs::msg::String>(
"topic",
10,
std::bind(&SampleNode::sub_callback, this, std::placeholders::_1),
sub_options
);
}
private:
void timer_callback()
{
std::this_thread::sleep_for(std::chrono::seconds(3));
RCLCPP_INFO(this->get_logger(), "Timer done");
}
void sub_callback(const std_msgs::msg::String::SharedPtr msg)
{
(void)msg;
RCLCPP_INFO(this->get_logger(), "Sub received");
}
rclcpp::CallbackGroup::SharedPtr callback_group_;
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
auto node = std::make_shared<SampleNode>();
// Reentrant を活かすため MultiThreadedExecutor 推奨
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
executor.spin();
rclcpp::shutdown();
return 0;
}
- 特徴の比較表
| MutuallyExclusive | Reentrant | |
|---|---|---|
| 同時実行 | しない | する |
| スレッドセーフ | 容易 | 競合に注意が必要 |
| 処理能力 | やや低い | 高い |
| 主な用途 | ハードウェア制御・状態管理 | 並列計算 |
使い分けの目安
- 重い処理はReentrantグループに入れる。
- 共有リソースを扱うようなものはMutuallyExclusiveグループに入れるなど、ロック管理が必要。