3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【ROS2】時間がかかる処理中にSubscribeできない問題(ブロッキング)の解決策

Last updated at Posted at 2025-04-27

概要

ROS/ROS2で多くの人がつまづくポイントとして、

  • TFの座標変換がムズい...クオータ二オンってなんやねん
  • Gazeboのros_controlの設定項目が多すぎてURDFモデルが思うように表示されない
  • 自己位置推定の理論も実装も難解すぎる

等がよく挙げられていますが、個人的に隠れたハマりポイントだと思っているのが、

ノード内の処理のブロッキング

です。

ROSにおけるブロッキングの概要と原因

 単に「ブロッキング」というと、ある時間のかかる処理を待つために、他の処理の動作が阻害されること全般を指し、広い意味を持ちます。
 一方でROS(特にPython)においてユーザーを苦しめるブロッキングの大半は、

時間が掛かる処理でTopicのSubscribe(受信)が阻害される

 ことでしょう。

このような現象が発生する根本原因として、ROS/ROS2がTopicをSubscribeするタイミングが挙げられます。
TopicのようなPub/Sub通信では、受信側のノードがTopicが来ていないかどうかを確認する、ポーリング(polling)と呼ばれる処理が必要となります。ROS1のノードではrospy.spin()、ROS2のノードではrclpy.spin()という関数が実装されていることが多いですが、実はこの関数がポーリングの役割を果たします。もっと具体的に言うと、

全てのSubscriberに関してポーリングを実行して、Topicが来ていればコールバック関数を実行する

という処理をループ実行します。ループ実行ということは、このコールバック関数が全て完了するまで、次のポーリングを実行できないこととなります。
これがROSのSubscriberにおけるブロッキングの正体です。

ブロッキング発生の典型例

このようなブロッキングが発生する典型例として、以下のような機能を持つノードが挙げられるでしょう。

  • 2つのSubscriber(Subscriber AとSubscriber B)を持つ
  • Subscriber Aは時間が掛かる処理をコールバック関数に持つ
  • Subscriber Bはノードのメンバ変数を更新する役割を持つ
  • Subscriber Bで更新されたメンバ変数を元にSubscriber Aのコールバック関数の処理を条件分岐する(下の例では処理を中断している)

ROS2 Humbleでの実装例を以下に示します。

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import threading
import time

class MyNode(Node):
    def __init__(self):
        super().__init__('my_node')
        self.logger = self.get_logger()

        # Subscriber A(時間のかかる処理をコールバック関数に持つ)
        self.subscription_a = self.create_subscription(
            String, 'topic_a', self.listener_callback_a, 10)

        # Subscriber Bで更新されるメンバ変数
        self.member_variable = None
        # Subscriber B(メンバ変数を更新する)
        self.subscription_b = self.create_subscription(
            String, 'topic_b', self.listener_callback_b, 10)

    def func_time_consuming(self, data):
        """時間のかかるメソッド"""
        self.logger.info(f'Starting long processing for {data}')
        self.member_variable = None
        # 時間のかかる処理をループ実行
        for i in range(10):
            # Subscriber Bで更新されたメンバ変数に応じて処理を変える
            if self.member_variable is not None:
                break
            time.sleep(5)  # 時間のかかる処理を模擬
        self.logger.info(f'Finished processing for {data}')

    def listener_callback_a(self, msg):
        """Subscriber Aのコールバック関数(時間のかかるメソッドを含む)"""
        self.get_logger().info(f'Received on topic_a: {msg.data}')
        # 時間のかかるメソッドを実行
        self.func_time_consuming(msg.data)

    def listener_callback_b(self, msg):
        """Subscriber Bのコールバック関数(メンバ変数を更新)"""
        self.get_logger().info(f'Received on topic_b: {msg.data}')
        self.member_variable = msg.data
        self.get_logger().info(f'Member variable updated: {self.member_variable}')

def main(args=None):
    rclpy.init(args=args)
    node = MyNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

本来であればSubscriber Bにトピックが届いたらすぐにメンバ変数self.member_variableを更新し、Subscriber Aのコールバック関数func_time_consumingの処理条件を変えたいのですが、上のプログラムではfunc_time_consuming自身の実行が終わるまでポーリングが行われないため、self.member_variableが更新されません。よって条件分岐が想定通り機能せず、ブロッキングにより処理が阻害されることが見て取れるでしょう。

解決策

時間が掛かる処理をthreading.Thread()クラスで別スレッドに分けることで、この処理が完了しなくとも次のポーリングが行われるようになります。

・実装例

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import threading
import time

class MyNode(Node):
    def __init__(self):
        super().__init__('my_node')
        self.logger = self.get_logger()

        # Subscriber A(時間のかかる処理をコールバック関数に持つ)
        self.subscription_a = self.create_subscription(
            String, 'topic_a', self.listener_callback_a, 10)

        # Subscriber Bで更新されるメンバ変数
        self.member_variable = None
        # Subscriber B(メンバ変数を更新する)
        self.subscription_b = self.create_subscription(
            String, 'topic_b', self.listener_callback_b, 10)

    def func_time_consuming(self, data):
        """時間のかかるメソッド"""
        self.logger.info(f'Starting long processing for {data}')
        self.member_variable = None
        # 時間のかかる処理をループ実行
        for i in range(10):
            # Subscriber Bで更新されたメンバ変数に応じて処理を変える
            if self.member_variable is not None:
                break
            time.sleep(5)  # 時間のかかる処理を模擬
        self.logger.info(f'Finished processing for {data}')

    def listener_callback_a(self, msg):
        """Subscriber Aのコールバック関数(時間のかかるメソッドを含む)"""
        self.get_logger().info(f'Received on topic_a: {msg.data}')
        # 時間のかかるメソッドを別スレッドで実行
        thread = threading.Thread(target=self.func_time_consuming, args=(msg.data,))
        thread.start()

    def listener_callback_b(self, msg):
        """Subscriber Bのコールバック関数(メンバ変数を更新)"""
        self.get_logger().info(f'Received on topic_b: {msg.data}')
        self.member_variable = msg.data
        self.get_logger().info(f'Member variable updated: {self.member_variable}')

def main(args=None):
    rclpy.init(args=args)
    node = MyNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

変更した部分は、

self.func_time_consuming(msg.data)

thread = threading.Thread(target=self.func_time_consuming, args=(msg.data,))
thread.start()

と変えただけであり、シンプルな記述でブロッキングを解決できたことが分かります。threading.Thread()を用いることで時間のかかる処理が別スレッドで実行されるため、メインスレッドはコールバック関数の終端まで短時間で到達し、次のポーリングが実行されることとなります。

 なお、上記のユースケースではメンバ変数の代わりにROSパラメータを使用することで、ブロッキングを解決しなくとも他のノードから条件分岐を制御することが可能です。あくまでブロッキングを解除するという観点で、スレッドを分けるという処理が有効であるだけで、同様の機能を実現する方法は他にもあることにご注意ください。

 TopicだけでなくServiceやActionではさらに複雑なブロッキングが発生することがあるため、こちらについても後日別記事で触れようと思います。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?