0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ROS2でのゼロコピーやShared Memoryによる通信速度の向上

Posted at

はじめに

この記事はAizu Advent Calendar 2024の12日目の記事です。
いつも通り誕生日です。

前置き

現在、修士2年生である私はROS2を使用して研究手法の実現をしています。
具体的には、複数のRGB-Dカメラから色付き点群を取得し、その点群に対して前処理を行い、最終的に3D SLAMを行い処理速度の向上を図っています。
各処理はROS2のノード単位に分割しており、下図のようになっています。

このシステムでは、4つのRGB-Dカメラから色付き点群を取得するのですが、データサイズが巨大です。
色付き点群ですが、一つの点のデータサイズは色情報と位置情報から15B程度になります。
1つのRGB-Dカメラからは1280x720で92万点程度取れるでしょう。
なので、4つのRGB-Dカメラからは368万点の55MB近いデータが取れます。
(参考: IntelRealsense D435・PCLのPointXYZRGB型)

そのデータサイズから各ノードの処理速度が遅くなるのは当然ですが、
処理をノードごとに分割しているため同一マシン内でのノード間通信もシステム全体の速度に影響を与えることが懸念点となりました。

また、通信のために点群の型変換も問題でした。
点群を扱うライブラリとしてPCLライブラリを良く使用しているのですが、通信のためには以下のようにROS2の点群型への変換が必須です。

どうにかならないかと調べてみたところ、ROS2には同一マシン内でのノード間通信を高速にする仕組みがありました。
本記事では同一マシン内でのノード間通信の手法を比較することが目的です。

ROS2での通信方法

詳しいことは省きますが、ROS2には以下の通信の種類があります。

同一処理内での通信(Intra-process Communications)

ROS2では一つの実行単位(target)に対して複数のノードが実行可能であり、同一実行単位内でのノード間通信ではゼロコピー通信が可能です。

また、オリジナルの型からROS2の型への変換をTypeAdaptarクラスに定義し、送受信の型として指定すると、変換を行わない通信も行えます。

後述しますが、ノードをComponent化することで別パッケージの複数のノードを単一プロセスとして実行ができます。

同一ホスト内での通信(Shared Memory Transport)

同一ホスト内での通信では、Shared Memoryを通じての通信が行えます。
ROS2では特に難しい設定をせずとも、この通信がデフォルトで有効となっています。
実行を行うと/dev/shm内に新しくファイルが生成されているのを確認できます。

別ホスト間の通信

この通信では、UDPやTCPを使用した通信になります。
デフォルトでは、UDPを使用した通信になるようです。
UDPやTCPを使用して通信をしているのかはtcpdumpコマンドで確認ができます。

使用プログラム

共有ライブラリ化することで、コンポーネントとしてノードをビルドしています。
それらのノードをlaunchファイルから個別に実行するか同一プロセス内で実行するかを定義します。

プログラムの内容としてはvector<uint64>とROS2のUInt64MultiArrayのコピーによる相互変換を1億個のデータに対して行い通信をします。

Publisherノード

#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include <climits>
#include <iomanip>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/u_int64_multi_array.hpp>

rclcpp::Clock system_clockr(RCL_SYSTEM_TIME);

using namespace std::chrono_literals;

template <>
struct rclcpp::TypeAdapter<std::vector<unsigned long long>, std_msgs::msg::UInt64MultiArray>
{
  using is_specialized = std::true_type;
  using custom_type = std::vector<unsigned long long>;
  using ros_message_type = std_msgs::msg::UInt64MultiArray;

  static void convert_to_ros_message(const custom_type &source, ros_message_type &destination)
  {
    for (auto s : source)
    {
      destination.data.push_back(s);
    }
    std::cout << &(destination.data) << " , " << &(destination.data[0]) << ":" << std::fixed << std::setprecision(9) << system_clockr.now().seconds() << std::endl;
  }

  static void convert_to_custom(const ros_message_type &source, custom_type &destination)
  {
    std::cout << &(source.data) << " , " << &(source.data[0]) << ":" << std::fixed << std::setprecision(9) << system_clockr.now().seconds() << std::endl;
    for (auto s : source.data)
    {
      destination.push_back(s);
    }
  }
};

namespace Sample
{
  class MinimalPublisher : public rclcpp::Node
  {
  public:
    MinimalPublisher(const rclcpp::NodeOptions &options)
        : Node("minimal_publisher", options), count_(0)
    {
      publisher_ = this->create_publisher<MyAdaptedType>("topic", 10);
      timer_ = this->create_wall_timer(
          20s, std::bind(&MinimalPublisher::timer_callback, this));

      tmp.clear();

      for (unsigned long long i = 1; i < 100 * 1000 * 1000; i++)
      {
        tmp.push_back(i % 255);
      }
      RCLCPP_INFO_STREAM(this->get_logger(), "start");
    }

  private:
    using MyAdaptedType = rclcpp::TypeAdapter<std::vector<unsigned long long>, std_msgs::msg::UInt64MultiArray>;
    void timer_callback()
    {
      std::unique_ptr<MyAdaptedType::custom_type> data = std::make_unique<MyAdaptedType::custom_type>();
      *data = tmp;
      data->front() = count;
      count++;

      RCLCPP_INFO_STREAM(this->get_logger(), data->front() << " : " << data.get() << " , " << &(*data)[0]);
      publisher_->publish(std::move(data));
    }
    int count = 0;
    MyAdaptedType::custom_type tmp;
    rclcpp::TimerBase::SharedPtr timer_;
    rclcpp::Publisher<MyAdaptedType>::SharedPtr publisher_;
    size_t count_;
  };
}

#include <rclcpp_components/register_node_macro.hpp>
RCLCPP_COMPONENTS_REGISTER_NODE(Sample::MinimalPublisher)

Subscriberノード

#include <memory>
#include <vector>
#include <iostream>
#include <iomanip>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/u_int64_multi_array.hpp>

rclcpp::Clock system_clockr(RCL_SYSTEM_TIME);

using std::placeholders::_1;

template <>
struct rclcpp::TypeAdapter<std::vector<unsigned long long>, std_msgs::msg::UInt64MultiArray>
{
  using is_specialized = std::true_type;
  using custom_type = std::vector<unsigned long long>;
  using ros_message_type = std_msgs::msg::UInt64MultiArray;

  static void convert_to_ros_message(const custom_type &source, ros_message_type &destination)
  {
    for (auto s : source)
    {
      destination.data.push_back(s);
    }
    std::cout << &(destination.data) << " , " << &(destination.data[0]) << ":" << std::fixed << std::setprecision(9) << system_clockr.now().seconds() << std::endl;
  }

  static void convert_to_custom(const ros_message_type &source, custom_type &destination)
  {
    std::cout << &(source.data) << " , " << &(source.data[0]) << ":" << std::fixed << std::setprecision(9) << system_clockr.now().seconds() << std::endl;
    for (auto s : source.data)
    {
      destination.push_back(s);
    }
  }
};

namespace Sample
{
  class MinimalSubscriber : public rclcpp::Node
  {
  public:
    MinimalSubscriber(const rclcpp::NodeOptions &options)
        : Node("minimal_subscriber", options)
    {
      subscription_ = this->create_subscription<MyAdaptedType>(
          "topic", 10, std::bind(&MinimalSubscriber::topic_callback, this, _1));
    }

  private:
    using MyAdaptedType = rclcpp::TypeAdapter<std::vector<unsigned long long>, std_msgs::msg::UInt64MultiArray>;
    void topic_callback(const std::unique_ptr<MyAdaptedType::custom_type> &msg) const
    {
      RCLCPP_INFO_STREAM(this->get_logger(), msg->front() << " : " << msg.get() << " , " << &(*msg)[0]);
    }
    rclcpp::Subscription<MyAdaptedType>::SharedPtr subscription_;
  };
}

#include <rclcpp_components/register_node_macro.hpp>
RCLCPP_COMPONENTS_REGISTER_NODE(Sample::MinimalSubscriber)

比較

Docker上で様々な通信による実行を行い比較します。
比較するのは以下の4パターンです。

  • 単一処理での通信(Intra-process有効)
  • 単一処理での通信(Intra-process無効)
  • 個別処理での通信
  • 別ホスト間での通信(別Dockerコンテナを作成)

以下の4つ出力します。

  • publish関数を呼び出す直前の時間
  • Publishする際の型変換後の時間
  • Subscribeした際の型変換前の時間
  • Subscribeのコールバック関数が呼ばれた直後の時間

単一処理での通信(Intra-process有効)

経過時間(ms)
Pub前の変換 変換しない
通信時間 0.0009227449244
Sub前の変換 変換しない
合計 0.0009227449244

さすがゼロコピーですね。
爆速です。

単一処理での通信(Intra-process無効)

経過時間(ms)
Pub前の変換 5.777174997
通信時間 0.3939839602
Sub前の変換 6.321440983
合計 12.49259994

ゼロコピーほどではないですが、通信が速いですね。
これがShared Memoryによる通信だと思います。
しかし、単一処理による影響なのか、後述の通信よりも変換の速度が遅いです。

個別処理での通信

経過時間(ms)
Pub前の変換 5.302677035
通信時間 2.281115031
Sub前の変換 5.324389982
合計 12.90818205

同一ホスト内での通信のはずですが遅いですね。
tcpdumpには通信が見られなかったのですが、Shared Memoryを使っていないかもです...

別ホスト間での通信

経過時間(ms)
Pub前の変換 5.535251999
通信時間 2.356626034
Sub前の変換 5.343773961
合計 13.23565199

さすがに一番遅いですね。
通信時間は「個別処理での通信」よりも遅い気がしますが、単に別ホスト間の影響で0.2秒ほど遅れているだけだと考えられます。

感想

比較結果からゼロコピーやShared Memory Transportの早さがわかりました。点群データは今回のデータよりも数が多く、更に変換作業もほぼ必須なので研究でも取り入れていきたいです。
ただ、同一処理内でShared Memory Transportが動いていないことが気になったので、今後はそこを検証したいと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?