LoginSignup
17
11

More than 1 year has passed since last update.

ROS講座08 pub & sub 遅延測定

Last updated at Posted at 2018-05-13

環境

この記事は以下の環境で動いています。

項目
CPU Core i5-8250U
Ubuntu 20.04
ROS Noetic

インストールについてはROS講座02 インストールを参照してください。
またこの記事のプログラムはgithubにアップロードされています。ROS講座11 gitリポジトリを参照してください。

概要

PublishとSubscribeはROSの中心的な概念で、最重要の仕組みです。プログラムをノード単位に分けてその間はpub、sub通信にする「疎結合」の概念は有用ですが、反面ノード間の通信には遅延が発生します。そこでその遅延を測定しました。今回start、bridge、endの3つのノードを作成します。startは1hzでrosトピックをパブリッシュするノード、btridgeはsubしたデータをそのままpubするノード、endはトピックをsubするだけのノードです。そのうえでstart->bridge1->bridge2->bridge3->bridge4->endのように5回の通信が発生するようなlaunchをします。
今回は各ノードのloop_rateを1,10,100に変えるのに加えて、ros::spin()で待つ方法の4つを比べてみます。

ソースコード

publishノード

basic_lecture/src/basic_timing_talker.cpp
#include <ros/ros.h>
#include <std_msgs/String.h>

float HZ = 10;
int main(int argc, char** argv)
{
  ros::init(argc, argv, "basic_timing_talker");
  ros::NodeHandle n;
  ros::NodeHandle pn("~");
  pn.getParam("HZ", HZ);
  ros::Publisher chatter_pub = n.advertise<std_msgs::String>("chatter", 10);

  ros::Rate loop_rate(HZ);
  int count = 0;
  while (ros::ok())
  {
    char word[10];
    sprintf(word, "%03i", count);
    std_msgs::String msg;
    msg.data = word;
    ROS_INFO("PUB:%03i:%f", count, ros::Time::now().toSec());
    chatter_pub.publish(msg);
    ros::spinOnce();
    loop_rate.sleep();
    ++count;
  }
  return 0;
}

中継ノード

basic_lecture/src/basic_timing_bridge.cpp
#include <ros/ros.h>
#include <std_msgs/String.h>

ros::Publisher chatter_pub;
int number = 0;
float HZ = 10;

void chatterCallback(const std_msgs::String::ConstPtr& msg)
{
  ROS_INFO("N%02i:%s:%f", number, msg->data.c_str(), ros::Time::now().toSec());
  chatter_pub.publish(msg);
}

int main(int argc, char** argv)
{
  ros::init(argc, argv, "basic_timing_bridge");
  ros::NodeHandle n;
  ros::NodeHandle pn("~");
  pn.getParam("number", number);
  pn.getParam("HZ", HZ);
  chatter_pub = n.advertise<std_msgs::String>("chatter_out", 1000);
  ros::Subscriber chatter_sub = n.subscribe("chatter_in", 1000, chatterCallback);

  if (HZ > 0)
  {
    ros::Rate loop_rate(HZ);
    while (ros::ok())
    {
      ros::spinOnce();
      loop_rate.sleep();
    }
  }
  else
    ros::spin();
  return 0;
}

subscribeノード

basic_lecture/src/basic_timing_listener.cpp
#include <ros/ros.h>
#include <std_msgs/String.h>

float HZ = 10;

void chatterCallback(const std_msgs::String::ConstPtr& msg)
{
  ROS_INFO("SUB: %s:%f", msg->data.c_str(), ros::Time::now().toSec());
}

int main(int argc, char** argv)
{
  ros::init(argc, argv, "basic_timing_listener");
  ros::NodeHandle n;
  ros::NodeHandle pn("~");
  pn.getParam("HZ", HZ);
  ros::Subscriber sub = n.subscribe("chatter", 1000, chatterCallback);

  if (HZ > 0)
  {
    ros::Rate loop_rate(HZ);
    while (ros::ok())
    {
      ros::spinOnce();
      loop_rate.sleep();
    }
  }
  else
    ros::spin();
  return 0;
}

各ROSノードでは無限ループの中の動作周期がHZという変数で制御できるようになっていてlaunchで変更できるようになっています。またHZを0にするとbridgeとlistenerはros::spin()で待つようになります。

CmakeList

CMakeList.txtに以下の行を追加します。

basic_lecture/CMakeLists.txtに追加
add_executable(basic_timing_talker   src/basic_timing_talker.cpp)
add_executable(basic_timing_listener src/basic_timing_listener.cpp)
add_executable(basic_timing_bridge   src/basic_timing_bridge.cpp)

target_link_libraries(basic_timing_talker
  ${catkin_LIBRARIES}
)
target_link_libraries(basic_timing_bridge
  ${catkin_LIBRARIES}
)
target_link_libraries(basic_timing_listener
  ${catkin_LIBRARIES}
)

launch

以下のようなlaunchを組みました。

basic_lecture/launch/timing.launch
<launch>
  <arg name="HZ_ALL" default="1"/>
  <node name="basic_timing_talker1" pkg="basic_lecture" type="basic_timing_talker"   output="screen">
    <remap from="chatter" to="chatter1"/>
    <param name="HZ" value="1"/>
  </node>
  <node name="basic_timing_bridge1" pkg="basic_lecture" type="basic_timing_bridge"   output="screen">
    <param name="number" value="1"/>
    <remap from="chatter_in"  to="chatter1"/>
    <remap from="chatter_out" to="chatter2"/>
    <param name="HZ" value="$(arg HZ_ALL)"/>
  </node>
  <node name="basic_timing_bridge2" pkg="basic_lecture" type="basic_timing_bridge"   output="screen">
    <param name="number" value="2"/>
    <remap from="chatter_in"  to="chatter2"/>
    <remap from="chatter_out" to="chatter3"/>
    <param name="HZ" value="$(arg HZ_ALL)"/>
  </node>
  <node name="basic_timing_bridge3" pkg="basic_lecture" type="basic_timing_bridge"   output="screen">
    <param name="number" value="3"/>
    <remap from="chatter_in"  to="chatter3"/>
    <remap from="chatter_out" to="chatter4"/>
    <param name="HZ" value="$(arg HZ_ALL)"/>
  </node>
  <node name="basic_timing_bridge4" pkg="basic_lecture" type="basic_timing_bridge"   output="screen">
    <param name="number" value="4"/>
    <remap from="chatter_in"  to="chatter4"/>
    <remap from="chatter_out" to="chatter5"/>
    <param name="HZ" value="$(arg HZ_ALL)"/>
  </node>
  <node name="basic_timing_listener1" pkg="basic_lecture" type="basic_timing_listener" output="screen">
    <remap from="chatter" to="chatter5"/>
    <param name="HZ" value="$(arg HZ_ALL)"/>
  </node>
</launch>

このlaunchを実行すると以下のようにtalkerから出たトピックをbridgeがバケツリレーをしてlistenerに伝達されます。
2行目のHZ_ALLが各ROSノードでの無限ループの周波数を決める部分です。後の実験ではここを1,10,100に変更して実験しています。

simple_timing_graph.png

ビルド

cd catkin_ws
catkin build

実行

実行すると以下のように各ノードでのタイムスタンプが表示されます。今回はstartノード(PUB)からendノード(SUB)に到達するまでの時間を図ります。
各ターミナルごとに実行前にsource ~/catkin_ws/devel/setup.bashを実行する必要があります。

1Hzで処理
 roslaunch basic_lecture timing.launch HZ_ALL:=1
10Hzで処理
 roslaunch basic_lecture timing.launch HZ_ALL:=10
100Hzで処理
 roslaunch basic_lecture timing.launch HZ_ALL:=100
spin()で処理
 roslaunch basic_lecture timing.launch HZ_ALL:=0

上記のlaunchを実行すると以下のように各ノードでの実行時間が表示されます。この表示からROSトピックが伝達されるのにどれぐらいの時間がかかるのかを測定します。

実行例(一部)
[ INFO] [1526205649.053395715]: PUB:001:1526205649.053309
[ INFO] [1526205649.059550683]: N01:001:1526205649.059352
[ INFO] [1526205649.060525375]: N02:001:1526205649.060395
[ INFO] [1526205649.070203199]: N03:001:1526205649.070085
[ INFO] [1526205649.075595323]: N04:001:1526205649.075392
[ INFO] [1526205649.084927591]: SUB:001:1526205649.084850
[ INFO] [1526205650.053993449]: PUB:002:1526205650.053881
[ INFO] [1526205650.060140235]: N01:002:1526205650.060027
[ INFO] [1526205650.070058918]: N02:002:1526205650.069946
[ INFO] [1526205650.080793566]: N03:002:1526205650.080732
[ INFO] [1526205650.087050641]: N04:002:1526205650.087002
[ INFO] [1526205650.094194838]: SUB:002:1526205650.094159

結果

HZ_ALL 時間(start->end) 時間(1 HOP)
1 1123ms 224.6ms
10 116ms 22.6ms
100 18ms 3.6ms
spin 7.5ms 1.5ms

spin()を使った時が圧倒的な速さです。loopの時は、HZの値は大きいほうが遅延は小さいみたいです。ただHZの値が大きいとCPU負荷が高くなるのでそこはバランスを見る必要がありそうです。

コメント

ROSのsubscriberのcallbackの処理はsubscribeをした瞬間に実行されると誤解されがちですが、実際はsubscribeした時にそれはstackに積まれます。メインループの処理が終わってros::spinOnce()をした時にstackに積まれている処理を一気に行います。そのためにloop_rateの値に処理のタイミングが左右されます。
ros::spin()で処理をするとメインループで定期的な処理が実行できなくなる半面、subscribeした瞬間に処理が行われるので遅延が少なくなります。
実はspinを使ってもtimerを使うと定時処理ができます。

目次ページへのリンク

ROS講座の目次へのリンク

17
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
11