おそらくn番煎じだと思うのだが,個人的なメモも兼ねてROSの基本的な書き方を紹介する。
Publishのみのコード
基本的なもの
#!/usr/bin/env python
import rospy
from std_msgs.msg import Float64MultiArray #適当なメッセージ型をインポートする。
def example_func():
list0 = Float64MultiArray() #適当にインスタンスを生成
#適当な処理を書く
data_pub.publish(list0)
if __name__ == "__main__":
rospy.init_node('example_node')
data_pub = rospy.Publisher('example_data', Float64MultiArray, queue_size=10)
try:
while not rospy.is_shutdown():
#データをpublishする関数を50Hzで回す。
example_func()
rospy.Rate(50).sleep()
except rospy.ROSInterruptException:
pass
queue_sizeは10にしているが,特に意味はない。実際に使う場合はループ内にいろんな関数を書き加えていく感じになるのだろうか。
クラスでまとめて見栄えをよくしたもの
#!/usr/bin/env python
import rospy
from std_msgs.msg import Float64MultiArray
from "プロジェクト名".msg import "メッセージ型" #自作のメッセージも同様にインポートする
class Example():
def __init__(self):
self.data_pub = rospy.Publisher('example_data', Float64MultiArray, queue_size=10)
self.list0 = Float64MultiArray()
def example_func0(self):
#適当な処理を書く
self.data_pub.publish(self.list0)
def example_func1(self):
#適当な処理を書く
if __name__ == "__main__":
rospy.init_node('example_node')
example1 = Example()
try:
while not rospy.is_shutdown():
#データをpublishする関数を50Hzで回す。
example1.example_func0()
rospy.Rate(50).sleep()
except rospy.ROSInterruptException:
pass
関数が増えてくるとごちゃごちゃして分かりにくくなるのでクラスでまとめると良い。
Subscribeのみのコード
#!/usr/bin/env python
import rospy
from std_msgs.msg import Float64MultiArray
def callback(data):
#適当な処理を書く
if __name__ == "__main__":
rospy.init_node('example_node')
sub = rospy.Subscriber("example_data", Float64MultiArray, callback, queue_size=10)
rospy.spin()
rospy.spin()で待機して,トピックを受け取ったらcallback関数を実行する。
PublishとSubscribeを使うコード
基本的なもの
#!/usr/bin/env python
import rospy
from std_msgs.msg import Float64MultiArray
def callback(data):
list0 = Float64MultiArray()
#適当な処理を書く
pub.publish(list0)
if __name__ == "__main__":
rospy.init_node('example_node')
sub = rospy.Subscriber('example_data0', Float64MultiArray, callback, queue_size=10)
pub = rospy.Publisher('example_data1', Float64MultiArray, queue_size=10)
rospy.spin()
トピックを受け取り次第,処理を行いPublishする。
PubとSubを独立させる
#!/usr/bin/env python
import rospy
from std_msgs.msg import Float64MultiArray
list0 = [0.0, 0.0]
list1 = [0.0, 0.0]
def callback0(data):
global list0
list0 = data.data
def callback1(data):
global list1
list1 = data.data
def example_func():
global list0
global list1
pub_data = Float64MultiArray(data = list0 + list1)
pub.publish(pub_data)
if __name__ == "__main__":
rospy.init_node('example_node')
sub0 = rospy.Subscriber('example_data0', Float64MultiArray, callback0, queue_size=10)
sub1 = rospy.Subscriber('example_data1', Float64MultiArray, callback1, queue_size=10)
pub = rospy.Publisher('example_data2', Float64MultiArray, queue_size=10)
try:
while not rospy.is_shutdown():
example_func()
rospy.Rate(50).sleep()
except rospy.ROSInterruptException:
pass
Subscriberのcallback関数内でPublishすると都合が悪い時(例のように複数のSubscriberのデータを利用したい時や,Subscribeの周期とPublishの周期をずらしたい時など)は上のようにglobal変数を利用してみると良いだろう。ちなみにPythonだとwhileループとcallback処理は別のスレッドで実行されるらしい??
Actionlibを使うコード
Cliant(送って受け取る方)
#!/usr/bin/env python
import rospy
import actionlib
from example.msg import ExampleAction #自作のactionメッセージ型。作り方は各自調べて。
from example.msg import ExampleGoal
def example_func():
goal = ExampleGoal()
action_client.wait_for_server()
#goalに何らかの値を代入
action_client.send_goal(goal)
action_client.wait_for_result()
result = action_client.get_result() #resultにExampleResult型のデータが格納される
if __name__ == "__main__":
rospy.init_node('example_node')
action_client = actionlib.SimpleActionClient('example_data', ExampleAction)
try:
while not rospy.is_shutdown():
example_func()
rospy.Rate(1).sleep()
except rospy.ROSInterruptException:
pass
適当なタイミングでexample_funcを実行すればサーバーとデータのやりとりができる。上のコードでは1Hzでサーバーにデータを送っている。
Server(受け取って返す方)
#!/usr/bin/env python
import rospy
from actionlib import SimpleActionServer
from example.msg import ExampleAction
from example.msg import ExampleResult
def example_cb(data):
example_result = ExampleResult()
#適当な処理を書く
action_srv.set_succeeded(result = example_result)
if __name__ == "__main__":
rospy.init_node('example_node')
action_srv = SimpleActionServer('example_data', ExampleAction, execute_cb=example_cb)
rospy.spin()
callback関数内で必ずresultを返さないといけないので,なんらかの結果を待ってからresultを返したい場合は以下のコードも参考。
#!/usr/bin/env python
import rospy
from actionlib import SimpleActionServer
from example.msg import ExampleAction
from example.msg import ExampleResult
example_result = ExampleResult()
example_flag = False
def example_cb(data):
global example_result
global example_flag
example_flag = True
while(example_flag):
rospy.Rate(50).sleep()
a_srv.set_succeeded(result = example_result)
if __name__ == "__main__":
rospy.init_node('example_node')
a_srv = SimpleActionServer('example_data', ExampleAction, execute_cb=example_cb)
try:
while not rospy.is_shutdown():
if example_flag and ("何らかの条件"):
example_flag = False
rospy.Rate(50).sleep()
except rospy.ROSInterruptException:
pass
何らかの条件を達成するとflagがFalseに変化し,resultが返される。流石にもう少しマシな書き方があると思うので,あくまでも参考程度にして欲しい。
終わり
説明したもの以外にservice通信などもあるが,これはaction通信で代用できるのであまり使う機会はないと思う。紹介したコードはあくまでも一例なのでもっと良い書き方を探してみてほしい。