#はじめに
twitter:@yukky_saito
ROS C++のチュートリアルが終わって自分でノードが作成できる人(中級者)向けの記事になります。
ROSを触っている人で[チュートリアル]
(http://wiki.ros.org/ja/ROS/Tutorials/WritingPublisherSubscriber%28c%2B%2B%29)の内部がどうのように動いているかを理解している人は少ないように思います。また、内部動作を知らなければadvertise・subscribeメソッド時に指定するバッファサイズがどのような意味を持っているのかわかりません。
ros::Publisher chatter_pub = n.advertise<std_msgs::String>("chatter", 1000);
ros::Subscriber sub = n.subscribe("chatter", 1000, chatterCallback);
PublisherとSubscriberの内部動作を説明し、どのようなバッファサイズがどのように影響するのかなどを踏まえ説明していきます。
#ROSの内部動作
ROSは大きくMiddleware層とApplication層に分けられます。ROSを利用する場合、Middleware層を理解しなくても十分なため、Middleware層の内部資料はほぼないと思います。※この記事もソースコードからロジックを読んでいるので多少間違っているところがあるかもしれません。
ROSノードを立ち上げると、mainスレッドを含め通常は計5つのスレッドが立ち上がります。その中でトピック(Pub/Sub)通信に関連するのはmainスレッドとpoll managerスレッドです。
mainスレッドはROSノードのアプリケーション処理(普段みんなが書いているようなコード)実行を担当し、poll managerスレッドがMiddleware層の通信まわりの処理をバックグラウンドで担当してくれます。
通信の概念としてはトピック通信をしていますが、ノード≒プロセスなのでプロセス間通信をする必要があり、実際はsocket通信で実装され逐次的に各ノードと通信しています。トピック通信の概念では恰も同時に通信しているように見えますが、実際は逐次的にsocket通信を行っています。
#Publisherの内部動作
Publisherが[チュートリアル]
(http://wiki.ros.org/ja/ROS/Tutorials/WritingPublisherSubscriber%28c%2B%2B%29)のtalkerの用にpublishメソッドをコールすると
chatter_pub.publish(msg);
内部でデータをシリアライズし、下記の図のようにpublish queueというリングバッファにシリアライズされたデータが格納されます。
poll managerは常にポーリングをしており、mainスレッドがpublish queueにデータを格納した後(コード)、poll managerにシグナルを発行し(コード)、poll managerスレッドが送信処理をはじめます(コード)。
poll managerスレッドはpublish queueないの最も古いデータを参照し、自身につながっているsubscriberの数だけ用意されたwrite bufferへキューイングします。
その後、各write bufferの中身をエンキューしてsocket通信のsendシステムコールでsubscriberへ送信しています。
この時のpublish queueサイズ(=write bufferサイズ?)が送信バッファになります。基本的に最新のデータを必要とすることが多いのでバッファサイズは1で良いと思います。
データのロストが致命的になる場合やkalman filter, particle filterなど時系列処理がある場合は送信バッファサイズを上げてももいいかもしれません。
#Subscriberの内部動作
Subscriberの内部動作は結構ややこしいです・・・。
Subscribe時は、poll managerがPublisherと繋がっているソケットをポーリングし続け、データが来たらsubscription queueというリングバッファにデータを格納し、internal callback queueにsubscription queueのアドレスを格納します。
重要:この時、subscription queue(リングバッファ)の要素を上書きした場合はinternal callback queueにsubscription queueのアドレスを格納しません。
その後、mainスレッドはros::spin()によってinternal callback queueを監視しておりinternal callback queueの要素をデキューし対応するsubscription queueから最も古いデータを参照してcallback関数を実行します。
subscription queueのサイズが受信バッファになっています。
複数のトピックを購読した場合、一つ問題が発生します。callback関数の処理が間に合わずsubscription queueのサイズをオーバーする(上書き)時、internal callback queueの順序は固定されているにも関わらず、subscription queueの中身は自由に変わってしまうということです。
つまり、上記の図で言う上のsubscription queueに最新のデータがたくさん入っているにも関わらず、下のsubscription queueのデータばかり参照されcallback関数が呼ばれ、スタベーション状態になる可能性があります。
これらを防ぐには受信バッファのサイズをそれぞれ1にすることでsubscription queueのサイズをオーバーする(上書き)場合でも交互に呼ばれるようになり、スタベーションを防げます。
もしくは、internal callback queueという一つのキューでまとめるのではなく別にcallback queueを作成し、callback queueごとに優先度をつけることでも解決できます。こちらのほうが、ディベロッパーとしての自由度は高いです。複数スレッドでそれぞれのcallback queueの監視とかも出来ます。
参考 http://wiki.ros.org/roscpp/Overview/Callbacks%20and%20Spinning