LoginSignup
12
8

More than 1 year has passed since last update.

ROS講座113 smachでタスクを実行する

Last updated at Posted at 2020-04-05

環境

この記事は以下の環境で動いています。

項目
CPU Core i5-8250U
Ubuntu 16.04
ROS Kinetic

インストールについてはROS講座02 インストールを参照してください。
またこの記事のプログラムはgithubにアップロードされています。ROS講座11 gitリポジトリを参照してください。

概要

smachは状態遷移をベースにハイレベルなタスクの実行を行うためのpythonライブラリです。状態遷移をGUIで表示するなど便利な機能もあります。
このライブラリの使い方を解説していきます。

インストール

smachのインストール
sudo apt install ros-noetic-smach-ros ros-noetic-smach-viewer 

例1 基本の使い方

基本的な使い方を説明します。

ソースコード

smachの基本はStateです。各Stateは処理(excute())を行い1つの結果(outcomes)を出します。
StateMachineはそれ自体も1つの結果(outcomes)を出します。しかしStateMachineは内部にStateを持ち、そのStateの結果でほかのStateに遷移するか結果を出力をするかの接続をします。StateMachineのexecute()を実行すると内部の登録したStateのexecute()が順番に実行されます。

py_lecture/scripts/smach_simple1.py
#!/usr/bin/env python3

import rospy
import smach

class State1(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done','exit'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE1')
        rospy.sleep(2.0)
        if self.counter < 3:
            self.counter += 1
            return 'done'
        else:
            return 'exit'

class State2(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done'])

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE2')
        rospy.sleep(2.0)
        return 'done'

def main():
    rospy.init_node('smach_somple1')

    sm_top = smach.StateMachine(outcomes=['succeeded'])
    with sm_top:
        smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'succeeded'})
        smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})

    outcome = sm_top.execute()

if __name__ == '__main__':
    main()
  • 2つのStateを定義しています。
    • State1 結果はdoneとexitがある。
    • State2 結果はdoneがある。
    • どちらも処理の中身はwaitのみ
  • sm_topというStateMachineを作成、結果はsucceededのみ
    • State1とState2があり
      • State1のdone -> State2に移行
      • Staet1のexit -> succeededに移行
      • State2のdone -> State1に移行

実行

1つ目のターミナル
roscore
2つ目のターミナル
rosrun py_lecture smach_simple1.py

実行すると以下のように順に状態が遷移していくのが分かります。

出力
[INFO] [1585988828.285383]: Executing state STATE1
[  INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988830.287671]: Executing state STATE2
[  INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988832.292678]: Executing state STATE1
[  INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988834.297569]: Executing state STATE2
[  INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988836.301891]: Executing state STATE1
[  INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988838.308503]: Executing state STATE2
[  INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988840.314152]: Executing state STATE1
[  INFO ] : State machine terminating 'STATE1':'exit':'succeeded'

例2 StateMachineの階層とIntrospection

StateMachineは階層構造を持つことができます。またIntrospectionという機能を使うとStateMachineの状況をGUIで確認することができます。

ソースコード

先ほどの例ではStateMAchineはStateのみをaddしていましたが、StateMachineをaddすることもできます。

py_lecture/scripts/smach_simple2.py
#!/usr/bin/env python

import rospy
import smach
import smach_ros

class State1(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done','exit'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE1')
        rospy.sleep(2.0)
        if self.counter < 3:
            self.counter += 1
            return 'done'
        else:
            return 'exit'

class State2(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done'])

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE2')
        rospy.sleep(2.0)
        return 'done'

class State3(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done'])

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE2')
        rospy.sleep(2.0)
        return 'done'

# main
def main():
    rospy.init_node('smach_somple2')

    sm_top = smach.StateMachine(outcomes=['succeeded'])
    with sm_top:
        smach.StateMachine.add('STATE3', State3(), transitions={'done':'SUB'})

        sm_sub = smach.StateMachine(outcomes=['done'])
        with sm_sub:
            smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'done'})
            smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})

        smach.StateMachine.add('SUB', sm_sub, transitions={'done':'succeeded'})

    sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
    sis.start()
    outcome = sm_top.execute()
    rospy.spin()
    sis.stop()

if __name__ == '__main__':
    main()
  • 上位のStateMachine(sm_top)にはState3と下位のステートマシン("SUB": sm_sub)があります。
  • 下位のStateMachine(sm_sub)にはState1とState2があります。
  • 上位のStateMAchineの遷移で下位のStateMachineに遷移することはできません。例えばState3からSUBに遷移することはできますが、State2に遷移するなどということはできません。
  • 'smach_ros.IntrospectionServer()'を宣言すると、smach_viewerで状態を確認できます。

実行

1つ目のターミナル
roscore
2つ目のターミナル
rosrun py_lecture smach_simple1.py
3つ目のターミナル
rosrun smach_viewer smach_viewer.py 

以下のような画面が表示されます。

smach2.gif

例3 複雑なタスクの実行

現実的なタスクを実行するにはsmachのROSノードからほかのROSノードに命令をすることが必要です。ここではactionlibを使ってmove_baseに指令を送ります。
また実際のタスクを実行すると途中で外部からの指示で中止を送れることが必要になります。

ソースコード

py_lecture/scripts/smach_task.py
#!/usr/bin/env python

import rospy
import smach
import smach_ros
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from std_msgs.msg import Empty

# define state Foo
class MoveBaseState(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1','outcome2'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if self.counter < 3:
            self.counter += 1
            return 'outcome1'
        else:
            return 'outcome2'


# define state Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        return 'outcome1'

def child_term_cb(outcome_map):
    print("child_term_cb")
    if outcome_map['MOVE2'] == 'succeeded':
        return True
    if outcome_map['MONITOR2']:
        return True
    return False

def out_cb(outcome_map):
    print("out_cb")
    if outcome_map['MOVE2'] == 'succeeded':
        return 'done'
    else:
        return 'exit'

def monitor_cb(ud, msg):
    print("monitor_cb")
    return False

def main():
    rospy.init_node('smach_example_state_machine')

    # Create the top level SMACH state machine
    sm_top = smach.StateMachine(outcomes=['done', 'exit'])

    # Open the container
    with sm_top:
        goal1=MoveBaseGoal()
        goal1.target_pose.header.frame_id = "dtw_robot1/map"
        goal1.target_pose.pose.position.x = 0.5
        goal1.target_pose.pose.orientation.w = 1.0
        smach.StateMachine.add('MOVE1', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal1), transitions={'succeeded':'TASK2', 'preempted':'done', 'aborted':'done'})

        task2_concurrence = smach.Concurrence(outcomes=['done', 'exit'], default_outcome='done', child_termination_cb = child_term_cb, outcome_cb = out_cb)
        with task2_concurrence:
            goal2=MoveBaseGoal()
            goal2.target_pose.header.frame_id = "dtw_robot1/map"
            goal2.target_pose.pose.position.x = -0.5
            goal2.target_pose.pose.orientation.w = 1.0
            smach.Concurrence.add('MOVE2', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal2))
            smach.Concurrence.add('MONITOR2', smach_ros.MonitorState("/sm_stop", Empty, monitor_cb))
        smach.StateMachine.add('TASK2', task2_concurrence, transitions={'done':'done', 'exit':'exit'}) 



    # Execute SMACH plan
    sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
    sis.start()
    outcome = sm_top.execute()
    rospy.spin()
    sis.stop()


if __name__ == '__main__':
    main()

実行

1つ目のターミナル
roslaunch nav_lecture move_base.launch 
2つ目のターミナル
rosrun smach_viewer smach_viewer.py 
3つ目のターミナル
rosrun py_lecture smach_task.py

上記を普通に実行しますと以下のようにTASK1とTASK2の2つのタスクを順次実行します。

smach3_1.gif

TASK2の実行途中で以下のコマンドを打つとTASK2を中止します。

4つ目のターミナル
rostopic pub -1 /sm_stop std_msgs/Empty "{}" 

smach3_2.gif

参考

smach tutorial

目次ページへのリンク

ROS講座の目次へのリンク

12
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
8