環境
この記事は以下の環境で動いています。
項目 | 値 |
---|---|
CPU | Core i5-8250U |
Ubuntu | 20.04 |
ROS | Noetic |
インストールについてはROS講座02 インストールを参照してください。
またこの記事のプログラムはgithubにアップロードされています。ROS講座11 gitリポジトリを参照してください。
概要
PublishとSubscribeはROSの中心的な概念で、最重要の仕組みです。プログラムをノード単位に分けてその間はpub、sub通信にする「疎結合」の概念は有用ですが、反面ノード間の通信には遅延が発生します。そこでその遅延を測定しました。今回start、bridge、endの3つのノードを作成します。startは1hzでrosトピックをパブリッシュするノード、btridgeはsubしたデータをそのままpubするノード、endはトピックをsubするだけのノードです。そのうえでstart->bridge1->bridge2->bridge3->bridge4->endのように5回の通信が発生するようなlaunchをします。
今回は各ノードのloop_rateを1,10,100に変えるのに加えて、ros::spin()で待つ方法の4つを比べてみます。
ソースコード
publishノード
#include <ros/ros.h>
#include <std_msgs/String.h>
float HZ = 10;
int main(int argc, char** argv)
{
ros::init(argc, argv, "basic_timing_talker");
ros::NodeHandle n;
ros::NodeHandle pn("~");
pn.getParam("HZ", HZ);
ros::Publisher chatter_pub = n.advertise<std_msgs::String>("chatter", 10);
ros::Rate loop_rate(HZ);
int count = 0;
while (ros::ok())
{
char word[10];
sprintf(word, "%03i", count);
std_msgs::String msg;
msg.data = word;
ROS_INFO("PUB:%03i:%f", count, ros::Time::now().toSec());
chatter_pub.publish(msg);
ros::spinOnce();
loop_rate.sleep();
++count;
}
return 0;
}
中継ノード
#include <ros/ros.h>
#include <std_msgs/String.h>
ros::Publisher chatter_pub;
int number = 0;
float HZ = 10;
void chatterCallback(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO("N%02i:%s:%f", number, msg->data.c_str(), ros::Time::now().toSec());
chatter_pub.publish(msg);
}
int main(int argc, char** argv)
{
ros::init(argc, argv, "basic_timing_bridge");
ros::NodeHandle n;
ros::NodeHandle pn("~");
pn.getParam("number", number);
pn.getParam("HZ", HZ);
chatter_pub = n.advertise<std_msgs::String>("chatter_out", 1000);
ros::Subscriber chatter_sub = n.subscribe("chatter_in", 1000, chatterCallback);
if (HZ > 0)
{
ros::Rate loop_rate(HZ);
while (ros::ok())
{
ros::spinOnce();
loop_rate.sleep();
}
}
else
ros::spin();
return 0;
}
subscribeノード
#include <ros/ros.h>
#include <std_msgs/String.h>
float HZ = 10;
void chatterCallback(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO("SUB: %s:%f", msg->data.c_str(), ros::Time::now().toSec());
}
int main(int argc, char** argv)
{
ros::init(argc, argv, "basic_timing_listener");
ros::NodeHandle n;
ros::NodeHandle pn("~");
pn.getParam("HZ", HZ);
ros::Subscriber sub = n.subscribe("chatter", 1000, chatterCallback);
if (HZ > 0)
{
ros::Rate loop_rate(HZ);
while (ros::ok())
{
ros::spinOnce();
loop_rate.sleep();
}
}
else
ros::spin();
return 0;
}
各ROSノードでは無限ループの中の動作周期がHZという変数で制御できるようになっていてlaunchで変更できるようになっています。またHZを0にするとbridgeとlistenerはros::spin()
で待つようになります。
CmakeList
CMakeList.txtに以下の行を追加します。
add_executable(basic_timing_talker src/basic_timing_talker.cpp)
add_executable(basic_timing_listener src/basic_timing_listener.cpp)
add_executable(basic_timing_bridge src/basic_timing_bridge.cpp)
target_link_libraries(basic_timing_talker
${catkin_LIBRARIES}
)
target_link_libraries(basic_timing_bridge
${catkin_LIBRARIES}
)
target_link_libraries(basic_timing_listener
${catkin_LIBRARIES}
)
launch
以下のようなlaunchを組みました。
<launch>
<arg name="HZ_ALL" default="1"/>
<node name="basic_timing_talker1" pkg="basic_lecture" type="basic_timing_talker" output="screen">
<remap from="chatter" to="chatter1"/>
<param name="HZ" value="1"/>
</node>
<node name="basic_timing_bridge1" pkg="basic_lecture" type="basic_timing_bridge" output="screen">
<param name="number" value="1"/>
<remap from="chatter_in" to="chatter1"/>
<remap from="chatter_out" to="chatter2"/>
<param name="HZ" value="$(arg HZ_ALL)"/>
</node>
<node name="basic_timing_bridge2" pkg="basic_lecture" type="basic_timing_bridge" output="screen">
<param name="number" value="2"/>
<remap from="chatter_in" to="chatter2"/>
<remap from="chatter_out" to="chatter3"/>
<param name="HZ" value="$(arg HZ_ALL)"/>
</node>
<node name="basic_timing_bridge3" pkg="basic_lecture" type="basic_timing_bridge" output="screen">
<param name="number" value="3"/>
<remap from="chatter_in" to="chatter3"/>
<remap from="chatter_out" to="chatter4"/>
<param name="HZ" value="$(arg HZ_ALL)"/>
</node>
<node name="basic_timing_bridge4" pkg="basic_lecture" type="basic_timing_bridge" output="screen">
<param name="number" value="4"/>
<remap from="chatter_in" to="chatter4"/>
<remap from="chatter_out" to="chatter5"/>
<param name="HZ" value="$(arg HZ_ALL)"/>
</node>
<node name="basic_timing_listener1" pkg="basic_lecture" type="basic_timing_listener" output="screen">
<remap from="chatter" to="chatter5"/>
<param name="HZ" value="$(arg HZ_ALL)"/>
</node>
</launch>
このlaunchを実行すると以下のようにtalkerから出たトピックをbridgeがバケツリレーをしてlistenerに伝達されます。
2行目のHZ_ALLが各ROSノードでの無限ループの周波数を決める部分です。後の実験ではここを1,10,100に変更して実験しています。
ビルド
cd catkin_ws
catkin build
実行
実行すると以下のように各ノードでのタイムスタンプが表示されます。今回はstartノード(PUB)からendノード(SUB)に到達するまでの時間を図ります。
各ターミナルごとに実行前にsource ~/catkin_ws/devel/setup.bash
を実行する必要があります。
roslaunch basic_lecture timing.launch HZ_ALL:=1
roslaunch basic_lecture timing.launch HZ_ALL:=10
roslaunch basic_lecture timing.launch HZ_ALL:=100
roslaunch basic_lecture timing.launch HZ_ALL:=0
上記のlaunchを実行すると以下のように各ノードでの実行時間が表示されます。この表示からROSトピックが伝達されるのにどれぐらいの時間がかかるのかを測定します。
[ INFO] [1526205649.053395715]: PUB:001:1526205649.053309
[ INFO] [1526205649.059550683]: N01:001:1526205649.059352
[ INFO] [1526205649.060525375]: N02:001:1526205649.060395
[ INFO] [1526205649.070203199]: N03:001:1526205649.070085
[ INFO] [1526205649.075595323]: N04:001:1526205649.075392
[ INFO] [1526205649.084927591]: SUB:001:1526205649.084850
[ INFO] [1526205650.053993449]: PUB:002:1526205650.053881
[ INFO] [1526205650.060140235]: N01:002:1526205650.060027
[ INFO] [1526205650.070058918]: N02:002:1526205650.069946
[ INFO] [1526205650.080793566]: N03:002:1526205650.080732
[ INFO] [1526205650.087050641]: N04:002:1526205650.087002
[ INFO] [1526205650.094194838]: SUB:002:1526205650.094159
結果
HZ_ALL | 時間(start->end) | 時間(1 HOP) |
---|---|---|
1 | 1123ms | 224.6ms |
10 | 116ms | 22.6ms |
100 | 18ms | 3.6ms |
spin | 7.5ms | 1.5ms |
spin()を使った時が圧倒的な速さです。loopの時は、HZの値は大きいほうが遅延は小さいみたいです。ただHZの値が大きいとCPU負荷が高くなるのでそこはバランスを見る必要がありそうです。
コメント
ROSのsubscriberのcallbackの処理はsubscribeをした瞬間に実行されると誤解されがちですが、実際はsubscribeした時にそれはstackに積まれます。メインループの処理が終わってros::spinOnce()をした時にstackに積まれている処理を一気に行います。そのためにloop_rateの値に処理のタイミングが左右されます。
ros::spin()で処理をするとメインループで定期的な処理が実行できなくなる半面、subscribeした瞬間に処理が行われるので遅延が少なくなります。
実はspinを使ってもtimerを使うと定時処理ができます。