環境
この記事は以下の環境で動いています。
項目 | 値 |
---|---|
CPU | Core i5-8250U |
Ubuntu | 16.04 |
ROS | Kinetic |
インストールについてはROS講座02 インストールを参照してください。
またこの記事のプログラムはgithubにアップロードされています。ROS講座11 gitリポジトリを参照してください。
概要
smachは状態遷移をベースにハイレベルなタスクの実行を行うためのpythonライブラリです。状態遷移をGUIで表示するなど便利な機能もあります。
このライブラリの使い方を解説していきます。
インストール
sudo apt install ros-noetic-smach-ros ros-noetic-smach-viewer
例1 基本の使い方
基本的な使い方を説明します。
ソースコード
smachの基本はStateです。各Stateは処理(excute()
)を行い1つの結果(outcomes
)を出します。
StateMachineはそれ自体も1つの結果(outcomes
)を出します。しかしStateMachineは内部にStateを持ち、そのStateの結果でほかのStateに遷移するか結果を出力をするかの接続をします。StateMachineのexecute()
を実行すると内部の登録したStateのexecute()
が順番に実行されます。
#!/usr/bin/env python3
import rospy
import smach
class State1(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done','exit'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state STATE1')
rospy.sleep(2.0)
if self.counter < 3:
self.counter += 1
return 'done'
else:
return 'exit'
class State2(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done'])
def execute(self, userdata):
rospy.loginfo('Executing state STATE2')
rospy.sleep(2.0)
return 'done'
def main():
rospy.init_node('smach_somple1')
sm_top = smach.StateMachine(outcomes=['succeeded'])
with sm_top:
smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'succeeded'})
smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})
outcome = sm_top.execute()
if __name__ == '__main__':
main()
- 2つのStateを定義しています。
- State1 結果はdoneとexitがある。
- State2 結果はdoneがある。
- どちらも処理の中身はwaitのみ
- sm_topというStateMachineを作成、結果はsucceededのみ
- State1とState2があり
- State1のdone -> State2に移行
- Staet1のexit -> succeededに移行
- State2のdone -> State1に移行
- State1とState2があり
実行
roscore
rosrun py_lecture smach_simple1.py
実行すると以下のように順に状態が遷移していくのが分かります。
[INFO] [1585988828.285383]: Executing state STATE1
[ INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988830.287671]: Executing state STATE2
[ INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988832.292678]: Executing state STATE1
[ INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988834.297569]: Executing state STATE2
[ INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988836.301891]: Executing state STATE1
[ INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988838.308503]: Executing state STATE2
[ INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988840.314152]: Executing state STATE1
[ INFO ] : State machine terminating 'STATE1':'exit':'succeeded'
例2 StateMachineの階層とIntrospection
StateMachineは階層構造を持つことができます。またIntrospectionという機能を使うとStateMachineの状況をGUIで確認することができます。
ソースコード
先ほどの例ではStateMAchineはStateのみをaddしていましたが、StateMachineをaddすることもできます。
#!/usr/bin/env python
import rospy
import smach
import smach_ros
class State1(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done','exit'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state STATE1')
rospy.sleep(2.0)
if self.counter < 3:
self.counter += 1
return 'done'
else:
return 'exit'
class State2(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done'])
def execute(self, userdata):
rospy.loginfo('Executing state STATE2')
rospy.sleep(2.0)
return 'done'
class State3(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done'])
def execute(self, userdata):
rospy.loginfo('Executing state STATE2')
rospy.sleep(2.0)
return 'done'
# main
def main():
rospy.init_node('smach_somple2')
sm_top = smach.StateMachine(outcomes=['succeeded'])
with sm_top:
smach.StateMachine.add('STATE3', State3(), transitions={'done':'SUB'})
sm_sub = smach.StateMachine(outcomes=['done'])
with sm_sub:
smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'done'})
smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})
smach.StateMachine.add('SUB', sm_sub, transitions={'done':'succeeded'})
sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
sis.start()
outcome = sm_top.execute()
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()
- 上位のStateMachine(sm_top)にはState3と下位のステートマシン("SUB": sm_sub)があります。
- 下位のStateMachine(sm_sub)にはState1とState2があります。
- 上位のStateMAchineの遷移で下位のStateMachineに遷移することはできません。例えばState3からSUBに遷移することはできますが、State2に遷移するなどということはできません。
- 'smach_ros.IntrospectionServer()'を宣言すると、smach_viewerで状態を確認できます。
実行
roscore
rosrun py_lecture smach_simple1.py
rosrun smach_viewer smach_viewer.py
以下のような画面が表示されます。
例3 複雑なタスクの実行
現実的なタスクを実行するにはsmachのROSノードからほかのROSノードに命令をすることが必要です。ここではactionlibを使ってmove_baseに指令を送ります。
また実際のタスクを実行すると途中で外部からの指示で中止を送れることが必要になります。
ソースコード
#!/usr/bin/env python
import rospy
import smach
import smach_ros
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from std_msgs.msg import Empty
# define state Foo
class MoveBaseState(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1','outcome2'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state FOO')
if self.counter < 3:
self.counter += 1
return 'outcome1'
else:
return 'outcome2'
# define state Bar
class Bar(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1'])
def execute(self, userdata):
rospy.loginfo('Executing state BAR')
return 'outcome1'
def child_term_cb(outcome_map):
print("child_term_cb")
if outcome_map['MOVE2'] == 'succeeded':
return True
if outcome_map['MONITOR2']:
return True
return False
def out_cb(outcome_map):
print("out_cb")
if outcome_map['MOVE2'] == 'succeeded':
return 'done'
else:
return 'exit'
def monitor_cb(ud, msg):
print("monitor_cb")
return False
def main():
rospy.init_node('smach_example_state_machine')
# Create the top level SMACH state machine
sm_top = smach.StateMachine(outcomes=['done', 'exit'])
# Open the container
with sm_top:
goal1=MoveBaseGoal()
goal1.target_pose.header.frame_id = "dtw_robot1/map"
goal1.target_pose.pose.position.x = 0.5
goal1.target_pose.pose.orientation.w = 1.0
smach.StateMachine.add('MOVE1', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal1), transitions={'succeeded':'TASK2', 'preempted':'done', 'aborted':'done'})
task2_concurrence = smach.Concurrence(outcomes=['done', 'exit'], default_outcome='done', child_termination_cb = child_term_cb, outcome_cb = out_cb)
with task2_concurrence:
goal2=MoveBaseGoal()
goal2.target_pose.header.frame_id = "dtw_robot1/map"
goal2.target_pose.pose.position.x = -0.5
goal2.target_pose.pose.orientation.w = 1.0
smach.Concurrence.add('MOVE2', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal2))
smach.Concurrence.add('MONITOR2', smach_ros.MonitorState("/sm_stop", Empty, monitor_cb))
smach.StateMachine.add('TASK2', task2_concurrence, transitions={'done':'done', 'exit':'exit'})
# Execute SMACH plan
sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
sis.start()
outcome = sm_top.execute()
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()
実行
roslaunch nav_lecture move_base.launch
rosrun smach_viewer smach_viewer.py
rosrun py_lecture smach_task.py
上記を普通に実行しますと以下のようにTASK1とTASK2の2つのタスクを順次実行します。
TASK2の実行途中で以下のコマンドを打つとTASK2を中止します。
rostopic pub -1 /sm_stop std_msgs/Empty "{}"