3行で
・message_filters.ApproximateTimeSynchronizer
を使え
・メッセージを自分で定義するならHeaderをつけろ
・slopをfpsの逆数を使って定義するときはfpsをfloat型にしないと1/fpsは0になるぞ
モチベーション
ロボットを扱う際、センサーが1つしかついていないということは極稀。
複数のセンサーデータを組み合わせた制御をするのに、複数の測定を同時に行ったとみなしたいことがある。
ROSでは複数のトピックから同期的にメッセージを受け取る機能があるので使ってみる。
概要
(前置き)
・メッセージの定義
・メッセージの記述とpublish
データの入力
タイムスタンプの追加
(本題)
・subscriberでのmessage_filters
の使い方
message_filters.Subscriber
オブジェクトの生成
message_filters.ApproximateTimeSynchronizer
オブジェクトの生成
環境
使用環境 | |
---|---|
OS | ubuntu 16.04 LTS |
ROSバージョン | Kinetic |
コード
以下に本記事で使ったコードがあります。
https://github.com/nagiton/sync_sub
内容
###メッセージの定義
http://wiki.ros.org/ja/ROS/Tutorials/CreatingMsgAndSrv
チュートリアルに詳しいので、手順は省略
動作していてHeaderを含むmsgの例(some_position.msg
)を示すのみにします
Header header
float32[3] position
このメッセージはHeaderとfloat32型の3つの要素をもつ配列からなります。
###メッセージの記述とpublish
http://wiki.ros.org/ja/ROS/Tutorials/WritingPublisherSubscriber%28python%29
チュートリアルに詳しいですが、自作メッセージとHeaderを使うための補足をします。
publisherノードでは送信すべきメッセージの内容を記述してpublishします。
まずは、自作したメッセージの型に関するモジュールをimportします。
msgファイルの作成とCMakeLists.txtとpackage.xmlの編集をしてcatkin_makeすると
自動で自作メッセージ型を扱うモジュールが生成されます。
(/catkin_ws/devel/lib/python2.7/dist-packages/<your package>
を参照)
自作メッセージ型を使う場合、このモジュールをimportします。
この例ではsync_sub
パッケージでsome_position
型を定義したので
Header header
from sync_sub.msg import some_position
と書きます。
メッセージの内容を記述するには、まず自作メッセージ型のコンストラクタでオブジェクトを作ります。
msg = some_position()
そのあとで、メンバ変数に所望の値を入力していきます。
あとで同期に使うのでHeaderにかならずタイムスタンプを入れましょう。
これはrospy.Time.now()
を使って実行できます
#rospyでheaderにタイムスタンプを入れる
msg.header.stamp = rospy.Time.now()
#適当なデータ
msg.position = [1,2,3]
こうしておけばpub.publish(msg)
でpublishできます。
###subscriberでのmessage_filters
の使い方
subscriber側ですが、rospy.init_node
したあと、message_filters.Subscriber
オブジェクトをsubscribeしたいトピックの数だけ作ります。
sub1 = message_filters.Subscriber('chatter1', some_position)
sub2 = message_filters.Subscriber('chatter2', some_position)
あとでApproximateTimeSynchronizer
で使うために、同期のズレを何秒まで許すかのパラメータを設定しておきます。
fps = 100. #fpsが整数だと、1/fpsをpython2が評価すると0になってしまう(整数同士の除算は切り捨て除算)
delay = 1/fps*0.5
ROSはpython2で、python2では整数同士の除算は切り捨て除算なのでfpsの逆数を使ってこのパラメータを決める場合、必ずfpsは明示的にfloat型にしましょう。
次にmessage_filters.ApproximateTimeSynchronizer
型のオブジェクトを作り、メンバ関数のregisterCallback
を呼びます。
ts = message_filters.ApproximateTimeSynchronizer([sub1,sub2], 10, delay)
ts.registerCallback(callback)
message_filters.ApproximateTimeSynchronizer
コンストラクタには同期をとるSubscriber
のリストと、queue_sizeと同期のズレとして許容する時間(単位:秒)を渡します。
message_filters.ApproximateTimeSynchronizer
に4つ目の変数としてallow_headerless
を渡せます。
allow_headerless=True
にするとHeaderがなくても同期を取ろうとしてくれるようですが、
"You should avoid this as much as you can, since the delays are unpredictable."
ということで非推奨です。(試してない)
こうすると、sub1とsub2にdelay秒以内の差でメッセージがpublishされたときのみcallback関数を呼び出す処理が行われます。
当然callback関数ではsub1とsub2にpublishされている2つのメッセージを受け取ります。
例えば
def callback(msg1, msg2):
# do something
position1 = msg1.position
position2 = msg2.position
out = [position1, position2]
print(out)
とかやれば、2つのメッセージのpositionに格納されてるリストのリストが出力されます。
ここで複数のセンサーデータを使った処理を書いていけばいいでしょう。