概要
普段から ros::AsyncSpinner を使えばいいんではなかろうか.
基本的には公式Wikiのこの話を読んでください.
追記
単純に一定時間のループを実装したい場合には,下記の例ではなくて,ROSのTimerを使ったほうが素直で簡単に実装できることがわかりました.
便利なのでそちらも試してみるのをおすすめします.というか,これこそ標準の手法としてチュートリアルで書いてあるべき方法なのでは…
ros::spin と spinOnce
よくある例
チュートリアルとかでよく見かけるROSのコードってこんな感じです.
初期化とか
ros::Rate rate(50);
while(ros::ok()){
制御とかの処理
ros::spinOnce();
rate.sleep();
}
Topicのコールバック関数でイベントドリブンに動くプログラムの場合は,while文とspinOnceではなく,ros::spin()
関数だけでブロッキングしてます.
このspinなんちゃらという関数を実行すると,コールバックとかが処理されます.
spinを使ってると,ほぼ即時にコールバックが呼ばれるのですが,spinOnceでは,関数実行時のキューを読み,キューがあればコールバックが呼ばれ,キューがなければコールバックが呼ばれません.
なので,spinOnceの実行周期がそのままコールバック関数の呼ばれる周期になってしまいます.
キューを大きくしておけば,Topicの周期よりspinOnceの周期が遅くてもまとめて処理することができます.
ただ,spinOnceしてから実際にコールバックが呼ばれるまでには(短い)遅延があるため,spinOnceしてから制御とかの処理をするようにしてしまうと,場合によっては制御の途中でコールバックが呼ばれ,参照している値が変わってしまう場合があり,それを気にしているためか標準では上記のようにspinOnceをしてからsleepするような構成が勧められているようです.
この例の問題
が,sleepが大きく,かつ購読しているTopicの更新周期と近い周期だったりすると,プログラムの実行開始タイミングによっては周期と同じだけの遅延が残り続ける場合が発生します.
10Hzで更新されているTopicを10HzでspinOnceすると,最大で0.1秒の遅延が残り続けることになってしまいます.
これが一段なら良いですが,ROSの場合は多段にTopicを読んでは投げてをしていく構成をとることが多く,どんどんと遅延がかさんでしまうことがありうるということです.
GPSみたいに1Hzでしかデータは来ないけど,即時処理してほしい,みたいなものだと特に困っちゃいます.
かと言って,無駄にsleepを小さくしてspinOnceを呼ぶ回数を増やすのもCPUリソース的にちょっとなぁと思ってました.
んで,見つけたのが冒頭のAsyncSpinnerです.
ros::AsyncSpinner を使ってみる
AsyncSpinnerは,指定した数のスレッドを立てて,そこでspinしてくれる道具です.
マルチスレッドプログラムになってしまうため,排他制御が必要となる点が最大の難点です.
ですが,従来のサンプルにあるような簡単な例であれば,排他制御も簡単にできるので,難しいことを考えずに今までと同じようなノリでプログラムがかけると思います.
例えば,繰り返しヘッダにタイムスタンプを与えて,publishし,自分自身で読み取るというプログラムですが,排他制御は各コールバック,ループの先頭で lock_guard を作っているだけです.
# include <ros/ros.h>
# include <std_msgs/Header.h>
# include <iostream>
# include <mutex>
std::mutex m;
void head_cb1(const std_msgs::HeaderConstPtr& msg){
std::lock_guard<std::mutex> lock(m); //ロックする
std::printf("Received1 : %d %f %f\r\n", msg->seq, msg->stamp.toSec(), ros::Time::now().toSec() - msg->stamp.toSec());
ROS_INFO("Received1 : %d %f %f", msg->seq, msg->stamp.toSec(), ros::Time::now().toSec() - msg->stamp.toSec());
}
void head_cb2(const std_msgs::HeaderConstPtr& msg){
std::lock_guard<std::mutex> lock(m); //ロックする
std::printf("Received2 : %d %f %f\r\n", msg->seq, msg->stamp.toSec(), ros::Time::now().toSec() - msg->stamp.toSec());
ROS_INFO("Received2 : %d %f %f", msg->seq, msg->stamp.toSec(), ros::Time::now().toSec() - msg->stamp.toSec());
}
int main (int argc, char** argv){
ros::init(argc, argv, "spin_study");
ros::NodeHandle nh("~");
ros::Publisher pub;
ros::Subscriber sub1;
ros::Subscriber sub2;
pub = nh.advertise<std_msgs::Header>("/header", 10);
sub1 = nh.subscribe("/header",10,head_cb1);
sub2 = nh.subscribe("/header",10,head_cb2);
ros::AsyncSpinner spinner(1); //spinを処理するスレッド数を引数に渡す
spinner.start();
ros::Rate rate(1);
while(ros::ok()){
{
// 排他制御のためにスコープを追加
std::lock_guard<std::mutex> lock(m);
std_msgs::Header header;
header.seq = 0;
header.stamp = ros::Time::now();
ROS_INFO("publish() : %d %f", header.seq, header.stamp.toSec());
pub.publish(header);
header.seq = 1;
header.stamp = ros::Time::now();
ROS_INFO("publish() : %d %f", header.seq, header.stamp.toSec());
pub.publish(header);
}
//AsyncSpinnerを使わない場合は↓が必要
//ros::spinOnce();
ROS_INFO("sleep()");
rate.sleep();
}
spinner.stop();
return 0;
}
std::lock_guard は, mutex の unlock し忘れを避けることができるので,より気楽に使えます.
普段マルチスレッドプログラムを書かない人でも,手っ取り早く使えるんでいいんじゃないでしょうか.
lock_guardは,コンストラクタが呼ばれた段階で引数で渡されたオブジェクトを利用して排他制御のロックをしてくれます.
デストラクタが呼ばれると,ロックを解除してくれます.
なので,上記の例では,コールバック関数内はlock_guardが有効で在り続けるため,他のコールバックは実行されません.
関数実行が終わると,スコープ外になりデストラクタがよばれ,ロックが解除されるため,他の関数が実行されます.
このようにすることで,ひとつのオブジェクトを複数で書きかえることがないようにしています.
メイン関数内のwhile分も,制御する部分については一貫して同じ入力値を保っていたほうが良い場合が多いので,制御部分だけスコープを追加し,そのスコープ内でlock_guardを作っています.
こうすれば,rate.sleep()している間やwhileの条件判断をしている間などはコールバックが呼ばれた時に処理され,制御実行中はコールバックが呼ばれても処理が進まないようになります.
このサンプルも,spinOnceのコメントを外して,spinnerのstartとstopをコメントアウトすれば通常の例と同じ動作になるので,表示される時間を眺めてみて違いを追いかけてみるとわかりやすいかと思います.
スレッド数を増やして,コールバックのlock_guardをコメントアウトしてみると,排他制御がなくなり,片方のコールバック関数内のprintfとROS_INFOの間でもう片方のコールバックが動いたりするのがわかると思います.
まとめ
はやくAsyncSpinnerの存在を知りたかった…
これだけえらそうなこと書いてますが,自分はあんまりマルチスレッドプログラムを書かないので,もっといい方法とかあったら教えてください.