Help us understand the problem. What is going on with this article?

ROSのトピック通信におけるPublisherとSubscriberの内部動作(概要)

More than 3 years have passed since last update.

はじめに

twitter:@yukky_saito

ROS C++のチュートリアルが終わって自分でノードが作成できる人(中級者)向けの記事になります。

ROSを触っている人でチュートリアルの内部がどうのように動いているかを理解している人は少ないように思います。また、内部動作を知らなければadvertise・subscribeメソッド時に指定するバッファサイズがどのような意味を持っているのかわかりません。

ros::Publisher chatter_pub = n.advertise<std_msgs::String>("chatter", 1000);
ros::Subscriber sub = n.subscribe("chatter", 1000, chatterCallback);

PublisherとSubscriberの内部動作を説明し、どのようなバッファサイズがどのように影響するのかなどを踏まえ説明していきます。

ROSの内部動作

ROSは大きくMiddleware層とApplication層に分けられます。ROSを利用する場合、Middleware層を理解しなくても十分なため、Middleware層の内部資料はほぼないと思います。※この記事もソースコードからロジックを読んでいるので多少間違っているところがあるかもしれません。
image.png

ROSノードを立ち上げると、mainスレッドを含め通常は計5つのスレッドが立ち上がります。その中でトピック(Pub/Sub)通信に関連するのはmainスレッドとpoll managerスレッドです。
mainスレッドはROSノードのアプリケーション処理(普段みんなが書いているようなコード)実行を担当し、poll managerスレッドがMiddleware層の通信まわりの処理をバックグラウンドで担当してくれます。
通信の概念としてはトピック通信をしていますが、ノード≒プロセスなのでプロセス間通信をする必要があり、実際はsocket通信で実装され逐次的に各ノードと通信しています。トピック通信の概念では恰も同時に通信しているように見えますが、実際は逐次的にsocket通信を行っています。

Publisherの内部動作

Publisherがチュートリアルのtalkerの用にpublishメソッドをコールすると

chatter_pub.publish(msg);

内部でデータをシリアライズし、下記の図のようにpublish queueというリングバッファにシリアライズされたデータが格納されます。
poll managerは常にポーリングをしており、mainスレッドがpublish queueにデータを格納した後(コード)、poll managerにシグナルを発行し(コード)、poll managerスレッドが送信処理をはじめます(コード)
poll managerスレッドはpublish queueないの最も古いデータを参照し、自身につながっているsubscriberの数だけ用意されたwrite bufferへキューイングします。
その後、各write bufferの中身をエンキューしてsocket通信のsendシステムコールでsubscriberへ送信しています。

image.png
※図は結構簡略化しています

この時のpublish queueサイズ(=write bufferサイズ?)が送信バッファになります。基本的に最新のデータを必要とすることが多いのでバッファサイズは1で良いと思います。
データのロストが致命的になる場合やkalman filter, particle filterなど時系列処理がある場合は送信バッファサイズを上げてももいいかもしれません。

Subscriberの内部動作

Subscriberの内部動作は結構ややこしいです・・・。

Subscribe時は、poll managerがPublisherと繋がっているソケットをポーリングし続け、データが来たらsubscription queueというリングバッファにデータを格納し、internal callback queueにsubscription queueのアドレスを格納します。

重要:この時、subscription queue(リングバッファ)の要素を上書きした場合はinternal callback queueにsubscription queueのアドレスを格納しません。

その後、mainスレッドはros::spin()によってinternal callback queueを監視しておりinternal callback queueの要素をデキューし対応するsubscription queueから最も古いデータを参照してcallback関数を実行します。

image.png

subscription queueのサイズが受信バッファになっています。
複数のトピックを購読した場合、一つ問題が発生します。callback関数の処理が間に合わずsubscription queueのサイズをオーバーする(上書き)時、internal callback queueの順序は固定されているにも関わらず、subscription queueの中身は自由に変わってしまうということです。
つまり、上記の図で言う上のsubscription queueに最新のデータがたくさん入っているにも関わらず、下のsubscription queueのデータばかり参照されcallback関数が呼ばれ、スタベーション状態になる可能性があります。

これらを防ぐには受信バッファのサイズをそれぞれ1にすることでsubscription queueのサイズをオーバーする(上書き)場合でも交互に呼ばれるようになり、スタベーションを防げます。
もしくは、internal callback queueという一つのキューでまとめるのではなく別にcallback queueを作成し、callback queueごとに優先度をつけることでも解決できます。こちらのほうが、ディベロッパーとしての自由度は高いです。複数スレッドでそれぞれのcallback queueの監視とかも出来ます。
参考 http://wiki.ros.org/roscpp/Overview/Callbacks%20and%20Spinning

yukkysaito
twitter:@yukky_saito github:@yukkysaito
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした