前回の内容
今回の内容
今回はActionについてみていきましょう。
参考:http://wiki.ros.org/ja/actionlib_tutorials/Tutorials
Action
actionlib
ここまでTopicやServiceを見てきましたが、ロボットの自律移動の指令など、「長時間かかるのでTopicのように非同期通信にしたいけど、Serviceのように成否は知りたい」というときもあると思います。そんなときactionlibというライブラリを用いれば、これが可能となります。仕組みとしては、指令を出すTopicと結果を返すTopicを扱いやすくまとめてくれています。
Action Messageの作成
ActionをするにはAction Messageというものを定義する必要があります。
まず、ros_beginnerの下にactionというディレクトリを作成し、その中に以下のようなファイルを作成しましょう。
geometry_msgs/Twist target_vel
int32 timeout_sec
---
bool bumper_hit
---
geometry_msgs/Twist current_vel
一番上がGoalと呼ばれる引数のようなもので、目的の値を表します。
真ん中がResultと呼ばれ、その成否を主に返します。
一番下がFeedbackと呼ばれ、途中経過として返したい情報を発行します。
次に、CMakeLists.txtを変更しましょう。
find_packageの中に、geometry_msgsとactionlib_msgsを足します。
60行目あたりのadd_action_filesをコメントアウトし、actionファイルはGoUntilBumper.actionに変更します。
その下のgenerate_messagesの中に、geometry_msgsとactionlib_msgsを追加します。
最後に~/catkin_wsで$ catkin_make
をしてビルドしましょう。
$ ls devel/share/ros_beginner/msg/
とすると、msgファイルがたくさんできているはずです。
これで準備完了です。
ActionServerの作成
#!/usr/bin/env python
import rospy
from geometry_msgs.msg import Twist
from kobuki_msgs.msg import BumperEvent
import actionlib
from ros_beginner.msg import GoUntilBumperAction
from ros_beginner.msg import GoUntilBumperResult
from ros_beginner.msg import GoUntilBumperFeedback
class BumperAction(object):
def __init__(self):
self._pub = rospy.Publisher('/mobile_base/commands/velocity', Twist,
queue_size=10)
self._sub = rospy.Subscriber('/mobile_base/events/bumper',
BumperEvent, self.bumper_callback, queue_size=1)
# Parameterから最大速度を取得
self._max_vel = rospy.get_param('~max_vel', 0.5)
# bumper_actionという名前でGoUntilBumperAction型のActionを作成(self.go_until_bumperが実際の動作)
self._action_server = actionlib.SimpleActionServer(
'bumper_action', GoUntilBumperAction,
execute_cb=self.go_until_bumper, auto_start=False)
# バンパーが当たったかどうか
self._hit_bumper = False
# ActionServerをスタートさせる
self._action_server.start()
# バンパーに当たった時の処理、Subscriberに呼ばれる
def bumper_callback(self, bumper):
self._hit_bumper = True
# ActionServerの実体
def go_until_bumper(self, goal):
print(goal.target_vel)
r = rospy.Rate(10.0)
zero_vel = Twist()
for i in range(10 * goal.timeout_sec):
# 外部から停止指令が来ているかどうか
if self._action_server.is_preempt_requested():
self._action_server.set_preempted()
break
# バンパーにぶつかっているかどうか
if self._hit_bumper:
self._pub.publish(zero_vel)
break
else:
# 目標の速度でロボットを動かす
if goal.target_vel.linear.x > self._max_vel:
goal.target_vel.linear.x = self._max_vel
self._pub.publish(goal.target_vel)
# 現在の速度を途中経過としてFeedbackで返す
feedback = GoUntilBumperFeedback(current_vel=goal.target_vel)
self._action_server.publish_feedback(feedback)
r.sleep()
# 実行結果を返す
result = GoUntilBumperResult(bumper_hit=self._hit_bumper)
self._action_server.set_succeeded(result)
if __name__ == '__main__':
rospy.init_node('bumper_action')
bumper_action = BumperAction()
# 無限ループでActionServerが呼ばれるのを待つ
rospy.spin()
ActionClientの作成
#!/usr/bin/env python
import rospy
import actionlib
from ros_beginner.msg import GoUntilBumperAction
from ros_beginner.msg import GoUntilBumperGoal
def go_until_bumper():
# bumper_actionという名前でGoUntilBumperAction型のActionClientを作成
action_client = actionlib.SimpleActionClient('bumper_action', GoUntilBumperAction)
# ActionServerの準備を待つ
action_client.wait_for_server()
# 目標の値をセットする
goal = GoUntilBumperGoal()
goal.target_vel.linear.x = 0.8
goal.timeout_sec = 10
# ゴールを送る
action_client.send_goal(goal)
# 結果を待つ
action_client.wait_for_result()
# 結果を取得
result = action_client.get_result()
if result.bumper_hit:
rospy.loginfo('bumper hit!!')
else:
rospy.loginfo('failed')
if __name__ == '__main__':
try:
rospy.init_node('bumper_client')
go_until_bumper()
except rospy.ROSInterruptException:
pass
実行
$ roslaunch kobuki_gazebo kobuki_playground.launch
でシミュレーターを立ち上げ、
いつものようにchmodで実行可能にした後、
$ rosrun ros_beginner bumper_action.py
$ rosrun ros_beginner bumper_client.py
を実行しましょう。
ロボットの移動が完了すると、client側に結果が表示されるはずです。
Pythonライブラリにする
今の機能をライブラリにしてみましょう。
そのためにはまず、以下の手順で、~/catkin_ws/src/ros_beginner/src/ros_beginner/を作る必要があります。
$ cd ~/catkin_ws/src/ros_beginner
$ mkdir -p src/ros_beginner
スクリプトをディレクトリ内にコピー
$ cp scripts/bumper_action.py src/ros_beginner/
$ cp scripts/bumper_client.py src/ros_beginner/
空のファイル__init.pyを作成(ライブラリとして認識されるために必要)
$ touch src/ros_beginner/__init__.py
次に、以下の~/catkin_ws/src/ros_beginner/setup.pyを作成します。
from distutils.core import setup
from catkin_pkg.python_setup import generate_distutils_setup
setup_args = generate_distutils_setup(
packages=['ros_beginner'],
package_dir={'': 'src'},
)
setup(**setup_args)
次にCMakeList.txtの23行目あたりのcatkin_python_setup()
をコメントアウトしてください。
最後に~/catkin_wsでcatkin_make
をすれば準備完了です。
ライブラリを使ってActionServerプログラムを書くと、
#!/usr/bin/env python
import rospy
from ros_beginner.bumper_action import BumperAction
if __name__ == '__main__':
rospy.init_node('bumper_action_use_lib')
bumper_action = BumperAction()
rospy.spin()
ActionClientプログラムを書くと、
#!/usr/bin/env python
import rospy
from ros_beginner.bumper_client import go_until_bumper
rospy.init_node('bumper_client_use_lib')
go_until_bumper()
こんなに分かりやすくなりましたね。
まとめ
今回はActionをみていきました。
次回はROSのGUIについてみていきましょう。