概要
ROS/ROS2で多くの人がつまづくポイントとして、
- TFの座標変換がムズい...クオータ二オンってなんやねん
- Gazeboのros_controlの設定項目が多すぎてURDFモデルが思うように表示されない
- 自己位置推定の理論も実装も難解すぎる
等がよく挙げられていますが、個人的に隠れたハマりポイントだと思っているのが、
ノード内の処理のブロッキング
です。
ROSにおけるブロッキングの概要と原因
単に「ブロッキング」というと、ある時間のかかる処理を待つために、他の処理の動作が阻害されること全般を指し、広い意味を持ちます。
一方でROS(特にPython)においてユーザーを苦しめるブロッキングの大半は、
時間が掛かる処理でTopicのSubscribe(受信)が阻害される
ことでしょう。
このような現象が発生する根本原因として、ROS/ROS2がTopicをSubscribeするタイミングが挙げられます。
TopicのようなPub/Sub通信では、受信側のノードがTopicが来ていないかどうかを確認する、ポーリング(polling)と呼ばれる処理が必要となります。ROS1のノードではrospy.spin()
、ROS2のノードではrclpy.spin()
という関数が実装されていることが多いですが、実はこの関数がポーリングの役割を果たします。もっと具体的に言うと、
全てのSubscriberに関してポーリングを実行して、Topicが来ていればコールバック関数を実行する
という処理をループ実行します。ループ実行ということは、このコールバック関数が全て完了するまで、次のポーリングを実行できないこととなります。
これがROSのSubscriberにおけるブロッキングの正体です。
ブロッキング発生の典型例
このようなブロッキングが発生する典型例として、以下のような機能を持つノードが挙げられるでしょう。
- 2つのSubscriber(Subscriber AとSubscriber B)を持つ
- Subscriber Aは時間が掛かる処理をコールバック関数に持つ
- Subscriber Bはノードのメンバ変数を更新する役割を持つ
- Subscriber Bで更新されたメンバ変数を元にSubscriber Aのコールバック関数の処理を条件分岐する(下の例では処理を中断している)
ROS2 Humbleでの実装例を以下に示します。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import threading
import time
class MyNode(Node):
def __init__(self):
super().__init__('my_node')
self.logger = self.get_logger()
# Subscriber A(時間のかかる処理をコールバック関数に持つ)
self.subscription_a = self.create_subscription(
String, 'topic_a', self.listener_callback_a, 10)
# Subscriber Bで更新されるメンバ変数
self.member_variable = None
# Subscriber B(メンバ変数を更新する)
self.subscription_b = self.create_subscription(
String, 'topic_b', self.listener_callback_b, 10)
def func_time_consuming(self, data):
"""時間のかかるメソッド"""
self.logger.info(f'Starting long processing for {data}')
self.member_variable = None
# 時間のかかる処理をループ実行
for i in range(10):
# Subscriber Bで更新されたメンバ変数に応じて処理を変える
if self.member_variable is not None:
break
time.sleep(5) # 時間のかかる処理を模擬
self.logger.info(f'Finished processing for {data}')
def listener_callback_a(self, msg):
"""Subscriber Aのコールバック関数(時間のかかるメソッドを含む)"""
self.get_logger().info(f'Received on topic_a: {msg.data}')
# 時間のかかるメソッドを実行
self.func_time_consuming(msg.data)
def listener_callback_b(self, msg):
"""Subscriber Bのコールバック関数(メンバ変数を更新)"""
self.get_logger().info(f'Received on topic_b: {msg.data}')
self.member_variable = msg.data
self.get_logger().info(f'Member variable updated: {self.member_variable}')
def main(args=None):
rclpy.init(args=args)
node = MyNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
本来であればSubscriber Bにトピックが届いたらすぐにメンバ変数self.member_variable
を更新し、Subscriber Aのコールバック関数func_time_consuming
の処理条件を変えたいのですが、上のプログラムではfunc_time_consuming
自身の実行が終わるまでポーリングが行われないため、self.member_variable
が更新されません。よって条件分岐が想定通り機能せず、ブロッキングにより処理が阻害されることが見て取れるでしょう。
解決策
時間が掛かる処理をthreading.Thread()
クラスで別スレッドに分けることで、この処理が完了しなくとも次のポーリングが行われるようになります。
・実装例
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import threading
import time
class MyNode(Node):
def __init__(self):
super().__init__('my_node')
self.logger = self.get_logger()
# Subscriber A(時間のかかる処理をコールバック関数に持つ)
self.subscription_a = self.create_subscription(
String, 'topic_a', self.listener_callback_a, 10)
# Subscriber Bで更新されるメンバ変数
self.member_variable = None
# Subscriber B(メンバ変数を更新する)
self.subscription_b = self.create_subscription(
String, 'topic_b', self.listener_callback_b, 10)
def func_time_consuming(self, data):
"""時間のかかるメソッド"""
self.logger.info(f'Starting long processing for {data}')
self.member_variable = None
# 時間のかかる処理をループ実行
for i in range(10):
# Subscriber Bで更新されたメンバ変数に応じて処理を変える
if self.member_variable is not None:
break
time.sleep(5) # 時間のかかる処理を模擬
self.logger.info(f'Finished processing for {data}')
def listener_callback_a(self, msg):
"""Subscriber Aのコールバック関数(時間のかかるメソッドを含む)"""
self.get_logger().info(f'Received on topic_a: {msg.data}')
# 時間のかかるメソッドを別スレッドで実行
thread = threading.Thread(target=self.func_time_consuming, args=(msg.data,))
thread.start()
def listener_callback_b(self, msg):
"""Subscriber Bのコールバック関数(メンバ変数を更新)"""
self.get_logger().info(f'Received on topic_b: {msg.data}')
self.member_variable = msg.data
self.get_logger().info(f'Member variable updated: {self.member_variable}')
def main(args=None):
rclpy.init(args=args)
node = MyNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
変更した部分は、
self.func_time_consuming(msg.data)
を
thread = threading.Thread(target=self.func_time_consuming, args=(msg.data,))
thread.start()
と変えただけであり、シンプルな記述でブロッキングを解決できたことが分かります。threading.Thread()
を用いることで時間のかかる処理が別スレッドで実行されるため、メインスレッドはコールバック関数の終端まで短時間で到達し、次のポーリングが実行されることとなります。
なお、上記のユースケースではメンバ変数の代わりにROSパラメータを使用することで、ブロッキングを解決しなくとも他のノードから条件分岐を制御することが可能です。あくまでブロッキングを解除するという観点で、スレッドを分けるという処理が有効であるだけで、同様の機能を実現する方法は他にもあることにご注意ください。
TopicだけでなくServiceやActionではさらに複雑なブロッキングが発生することがあるため、こちらについても後日別記事で触れようと思います。