概要
最近 ROS を触れてないのですが、2016年に作ったmqtt_bridge のissueやPRを放置してしまっていたので、12月に入ってから改めてテコ入れしはじめました。python3 対応したいんですが、動作確認環境もままならない環境で手を入れるのもあれなんで、取り急ぎはということで rostest 対応して circleci で動かしました。実装内容はこちら。
工夫したこと
publisher を安定化
ROS の publisher には、インスタンスを生成した直後に publish しても subscriber にデータが届かない問題(仕様)があります。これだとテストを書くのに困ります。以下のように、publisher 生成後に rosmaster の subscriber 数と、publisher に紐づいた subscriber 数が一致するまで待つロジックを入れることで、解決しました。昔、社内向けに工夫していた実装ですが、bitbucketを掘り起こして見つけました。昔の自分、ありがとう!
def get_publisher(self, topic_path, msg_type, **kwargs):
pub = rospy.Publisher(topic_path, msg_type, **kwargs)
num_subs = len(_get_subscribers(topic_path))
for i in range(10):
num_cons = pub.get_num_connections()
if num_cons == num_subs:
return pub
time.sleep(0.1)
raise RuntimeError("failed to get publisher")
def _get_subscribers(self, topic_path):
ros_master = rosgraph.Master('/rostopic')
topic_path = rosgraph.names.script_resolve_name('rostopic', topic_path)
state = ros_master.getSystemState()
subs = []
for sub in state[1]:
if sub[0] == topic_path:
subs.extend(sub[1])
return subs
MagicMock で pubsub をテスト
unittest.mock が python2.7 向けに backport された mock を使って pubsub をテストしました。
ros_callback_ping = MagicMock()
rospy.Subscriber("/ping", Bool, ros_callback_ping)
ros_callback_ping.assert_called_once_with(Bool(True))
みたいなコードを書くことで、 subscriber が Bool(True)
の値で1回だけ呼び出されることをテストできます。
CircleCI で rostest
現職では CI に CircleCI を使っているのですが、ローカル作業用のコマンド circleci が便利です。
ros の docker ファイルを使ったテストを .circleci/config.yml
を書いて、
circleci local execute --job rostest
のように実行すると、ローカルで簡単に rostest を実行できます。
まとめ
久々にROS触ると楽しいですね。次は python3 + noetic 対応です。