はじめに
ROSでpythonを使う際,numpy arrayをpublish/subscribeしたいときに変換する関数をまとめておきました.
numpy ⇔ MultiArray変換
ポイントとしては,std_msgs/MultiArrayLayoutを変換したnumpy.ndarrayにあわせて変更する必要がある点です.
converter.py
#!/usr/bin/env python
from functools import partial
from std_msgs.msg import MultiArrayDimension, Float32MultiArray
def _numpy2multiarray(multiarray_type, np_array):
"""Convert numpy.ndarray to multiarray"""
multiarray = multiarray_type()
multiarray.layout.dim = [MultiArrayDimension("dim%d" % i, np_array.shape[i], np_array.shape[i] * np_array.dtype.itemsize) for i in range(np_array.ndim)]
multiarray.data = np_array.reshape(1, -1)[0].tolist()
return multiarray
def _multiarray2numpy(pytype, dtype, multiarray):
"""Convert multiarray to numpy.ndarray"""
dims = map(lambda x: x.size, multiarray.layout.dim)
return np.array(multiarray.data, dtype=pytype).reshape(dims).astype(dtype)
numpy2f32multi = partial(_numpy2multiarray, Float32MultiArray)
f32multi2numpy = partial(_multiarray2numpy, float, np.float32)
partial()
の部分をInt8MultiArray
等で定義することで他のデータ型にも対応させることができます.
Publish/Subscribe
talker.py
#!/usr/bin/env python
import numpy as np
import rospy
from std_msgs.msg import Float32MultiArray
from converter import numpy2f32multi
def talker():
pub = rospy.Publisher('chatter', Float32MultiArray, queue_size=10)
rospy.init_node('talker', anonymous=True)
r = rospy.Rate(10) # 10hz
while not rospy.is_shutdown():
arr = np.random.rand(2, 2, 2).astype(np.float32)
arr_msg = numpy2f32multi(arr)
pub.publish(arr_msg)
r.sleep()
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException: pass
listener.py
#!/usr/bin/env python
import numpy as np
import rospy
from std_msgs.msg import Float32MultiArray
from converter import f32multi2numpy
def callback(msg):
arr = f32multi2numpy(msg)
rospy.loginfo(arr)
rospy.loginfo(type(arr))
rospy.loginfo(arr.shape)
def listener():
rospy.init_node('listener', anonymous=True)
rospy.Subscriber('chatter', Float32MultiArray, callback)
rospy.spin()
if __name__ == '__main__':
listener()