LoginSignup
0
1

More than 1 year has passed since last update.

ROSの基本的な書き方(python)

Posted at

 おそらくn番煎じだと思うのだが,個人的なメモも兼ねてROSの基本的な書き方を紹介する。

Publishのみのコード

基本的なもの

#!/usr/bin/env python
import rospy
from std_msgs.msg import Float64MultiArray #適当なメッセージ型をインポートする。

def example_func():
    list0 = Float64MultiArray() #適当にインスタンスを生成
    #適当な処理を書く
    data_pub.publish(list0)

if __name__ == "__main__":
    rospy.init_node('example_node')
    data_pub = rospy.Publisher('example_data', Float64MultiArray, queue_size=10)
    try:
        while not rospy.is_shutdown():
            #データをpublishする関数を50Hzで回す。
            example_func()
            rospy.Rate(50).sleep()
    except rospy.ROSInterruptException:
        pass

 queue_sizeは10にしているが,特に意味はない。実際に使う場合はループ内にいろんな関数を書き加えていく感じになるのだろうか。

クラスでまとめて見栄えをよくしたもの

#!/usr/bin/env python
import rospy
from std_msgs.msg import Float64MultiArray
from "プロジェクト名".msg import "メッセージ型" #自作のメッセージも同様にインポートする

class Example():
    def __init__(self):
        self.data_pub = rospy.Publisher('example_data', Float64MultiArray, queue_size=10)
        self.list0 = Float64MultiArray()
    def example_func0(self):
        #適当な処理を書く
        self.data_pub.publish(self.list0)
    def example_func1(self):
        #適当な処理を書く

if __name__ == "__main__":
    rospy.init_node('example_node')
    example1 = Example()
    try:
        while not rospy.is_shutdown():
            #データをpublishする関数を50Hzで回す。
            example1.example_func0()
            rospy.Rate(50).sleep()
    except rospy.ROSInterruptException:
        pass

 関数が増えてくるとごちゃごちゃして分かりにくくなるのでクラスでまとめると良い。

Subscribeのみのコード

#!/usr/bin/env python
import rospy
from std_msgs.msg import Float64MultiArray

def callback(data):
    #適当な処理を書く

if __name__ == "__main__":
    rospy.init_node('example_node')
    sub = rospy.Subscriber("example_data", Float64MultiArray, callback, queue_size=10)
    rospy.spin()

 rospy.spin()で待機して,トピックを受け取ったらcallback関数を実行する。

PublishとSubscribeを使うコード

基本的なもの

#!/usr/bin/env python
import rospy
from std_msgs.msg import Float64MultiArray

def callback(data):
    list0 = Float64MultiArray()
    #適当な処理を書く
    pub.publish(list0)

if __name__ == "__main__":
    rospy.init_node('example_node')
    sub = rospy.Subscriber('example_data0', Float64MultiArray, callback, queue_size=10)
    pub = rospy.Publisher('example_data1', Float64MultiArray, queue_size=10)
    rospy.spin()

 トピックを受け取り次第,処理を行いPublishする。

PubとSubを独立させる

#!/usr/bin/env python
import rospy
from std_msgs.msg import Float64MultiArray

list0 = [0.0, 0.0]
list1 = [0.0, 0.0]

def callback0(data):
    global list0
    list0 = data.data

def callback1(data):
    global list1
    list1 = data.data

def example_func():
    global list0
    global list1
    pub_data = Float64MultiArray(data = list0 + list1)
    pub.publish(pub_data)

if __name__ == "__main__":
    rospy.init_node('example_node')
    sub0 = rospy.Subscriber('example_data0', Float64MultiArray, callback0, queue_size=10)
    sub1 = rospy.Subscriber('example_data1', Float64MultiArray, callback1, queue_size=10)
    pub = rospy.Publisher('example_data2', Float64MultiArray, queue_size=10)
    try:
        while not rospy.is_shutdown():
            example_func()
            rospy.Rate(50).sleep()
    except rospy.ROSInterruptException:
        pass

 Subscriberのcallback関数内でPublishすると都合が悪い時(例のように複数のSubscriberのデータを利用したい時や,Subscribeの周期とPublishの周期をずらしたい時など)は上のようにglobal変数を利用してみると良いだろう。ちなみにPythonだとwhileループとcallback処理は別のスレッドで実行されるらしい??

Actionlibを使うコード

Cliant(送って受け取る方)

#!/usr/bin/env python
import rospy
import actionlib
from example.msg import ExampleAction #自作のactionメッセージ型。作り方は各自調べて。
from example.msg import ExampleGoal

def example_func():
    goal = ExampleGoal()
    action_client.wait_for_server()
    #goalに何らかの値を代入
    action_client.send_goal(goal)
    action_client.wait_for_result()
    result = action_client.get_result() #resultにExampleResult型のデータが格納される

if __name__ == "__main__":
    rospy.init_node('example_node')
    action_client = actionlib.SimpleActionClient('example_data', ExampleAction)
    try:
        while not rospy.is_shutdown():
            example_func()
            rospy.Rate(1).sleep()
    except rospy.ROSInterruptException:
        pass

 適当なタイミングでexample_funcを実行すればサーバーとデータのやりとりができる。上のコードでは1Hzでサーバーにデータを送っている。

Server(受け取って返す方)

#!/usr/bin/env python
import rospy
from actionlib import SimpleActionServer
from example.msg import ExampleAction
from example.msg import ExampleResult

def example_cb(data):
    example_result = ExampleResult()
    #適当な処理を書く
    action_srv.set_succeeded(result = example_result)

if __name__ == "__main__":
    rospy.init_node('example_node')
    action_srv = SimpleActionServer('example_data', ExampleAction, execute_cb=example_cb)
    rospy.spin()

 callback関数内で必ずresultを返さないといけないので,なんらかの結果を待ってからresultを返したい場合は以下のコードも参考。

#!/usr/bin/env python
import rospy
from actionlib import SimpleActionServer
from example.msg import ExampleAction
from example.msg import ExampleResult

example_result = ExampleResult()
example_flag = False

def example_cb(data):
    global example_result
    global example_flag
    example_flag = True
    while(example_flag):
        rospy.Rate(50).sleep()
    a_srv.set_succeeded(result = example_result)

if __name__ == "__main__":
    rospy.init_node('example_node')
    a_srv = SimpleActionServer('example_data', ExampleAction, execute_cb=example_cb)
    try:
        while not rospy.is_shutdown():
            if example_flag and ("何らかの条件"):
                example_flag = False
            rospy.Rate(50).sleep()
    except rospy.ROSInterruptException:
        pass

 何らかの条件を達成するとflagがFalseに変化し,resultが返される。流石にもう少しマシな書き方があると思うので,あくまでも参考程度にして欲しい。

終わり

 説明したもの以外にservice通信などもあるが,これはaction通信で代用できるのであまり使う機会はないと思う。紹介したコードはあくまでも一例なのでもっと良い書き方を探してみてほしい。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1