LoginSignup
0

More than 1 year has passed since last update.

numpy.ndarrayとstd_msgs/MultiArrayの相互変換

Last updated at Posted at 2021-10-13

はじめに

ROSでpythonを使う際,numpy arrayをpublish/subscribeしたいときに変換する関数をまとめておきました.

numpy ⇔ MultiArray変換

ポイントとしては,std_msgs/MultiArrayLayoutを変換したnumpy.ndarrayにあわせて変更する必要がある点です.

converter.py
#!/usr/bin/env python

from functools import partial

from std_msgs.msg import MultiArrayDimension, Float32MultiArray

def _numpy2multiarray(multiarray_type, np_array):
    """Convert numpy.ndarray to multiarray"""
    multiarray = multiarray_type()
    multiarray.layout.dim = [MultiArrayDimension("dim%d" % i, np_array.shape[i], np_array.shape[i] * np_array.dtype.itemsize) for i in range(np_array.ndim)]
    multiarray.data = np_array.reshape(1, -1)[0].tolist()
    return multiarray

def _multiarray2numpy(pytype, dtype, multiarray):
    """Convert multiarray to numpy.ndarray"""
    dims = map(lambda x: x.size, multiarray.layout.dim)
    return np.array(multiarray.data, dtype=pytype).reshape(dims).astype(dtype)

numpy2f32multi = partial(_numpy2multiarray, Float32MultiArray)
f32multi2numpy = partial(_multiarray2numpy, float, np.float32)

partial()の部分をInt8MultiArray等で定義することで他のデータ型にも対応させることができます.

Publish/Subscribe

talker.py
#!/usr/bin/env python

import numpy as np
import rospy
from std_msgs.msg import Float32MultiArray

from converter import numpy2f32multi


def talker():
    pub = rospy.Publisher('chatter', Float32MultiArray, queue_size=10)
    rospy.init_node('talker', anonymous=True)
    r = rospy.Rate(10) # 10hz
    while not rospy.is_shutdown():
        arr = np.random.rand(2, 2, 2).astype(np.float32)
        arr_msg = numpy2f32multi(arr)
        pub.publish(arr_msg)
        r.sleep()

if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException: pass
listener.py
#!/usr/bin/env python

import numpy as np
import rospy
from std_msgs.msg import Float32MultiArray

from converter import f32multi2numpy


def callback(msg):
    arr = f32multi2numpy(msg)
    rospy.loginfo(arr)
    rospy.loginfo(type(arr))
    rospy.loginfo(arr.shape)


def listener():
    rospy.init_node('listener', anonymous=True)
    rospy.Subscriber('chatter', Float32MultiArray, callback)
    rospy.spin()

if __name__ == '__main__':
    listener()

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0