1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

autoware foundationのtopicを可視化する

Last updated at Posted at 2023-03-21

rosbagsでautoware foundationのtopicを可視化する方法
rosbagsではROSのパッケージをインストールしなくてもtopicを可視化できるので
rosbagを取得してjupyter環境(MAC)に転送して調査できる。

1.rosbagを取得してjupyter環境(MAC)に転送する。

autowareを起動してシュミレーションを起動する。
起動中にautowareの以下のtopicを取得してjupyter環境(MAC)に転送する。

  • /tf
  • /sensing/lidar/concatenated/pointcloud
  • /perception/object_recognition/detection/objects

1.1.autoware起動

source ~/autoware/install/setup.bash
ros2 launch autoware_launch logging_simulator.launch.xml map_path:=$HOME/Downloads/sample-map-rosbag vehicle_model:=sample_vehicle sensor_model:=sample_sensor_kit

1.2.rosbagを取得

source ~/autoware/install/setup.bash
ros2 bag record /tf /sensing/lidar/concatenated/pointcloud /perception/object_recognition/detection/objects

1.3.シュミレーションを起動

source ~/autoware/install/setup.bash
ros2 bag record /tf /sensing/lidar/concatenated/pointcloud /perception/object_recognition/detection/objects

1.4.rosbagをjupyter環境のマシン(MAC)に転送

2.rosbagsの使い方

2.1.rosbagsインストール

pip install rosbags

2.2.jupyterのコード

from rosbags.rosbag2 import Reader
from rosbags.serde import deserialize_cdr
from rosbags.typesys import get_types_from_idl, register_types

import numpy as np
import pandas as pd
import open3d as o3d
from pathlib import Path

rosbag_dir = 'rosbag2/rosbag2_2023_02_21-08_59_39'

def get_connectin(connections, topic):
    connections = [x for x in connections if x.topic == topic]
    return connections

2.2.1. /tfをdump

tf_msgs = []
with Reader(rosbag_dir) as reader:
    connections = get_connectin(reader.connections, '/tf')
    for connection, timestamp, rawdata in reader.messages(connections=connections):
        msg = deserialize_cdr(rawdata, connection.msgtype)
        tf_msgs.append(msg)
tf_msg = tf_msgs[0]
tf_msg

tf.jpg

2.2.2. /sensing/lidar/concatenated/pointcloudをdump

pointcloud2_msgs = []
with Reader(rosbag_dir) as reader:
    connections = get_connectin(reader.connections, '/sensing/lidar/concatenated/pointcloud')
    for connection, timestamp, rawdata in reader.messages(connections=connections):
        msg = deserialize_cdr(rawdata, connection.msgtype)
        pointcloud2_msgs.append(msg)
pointcloud2_msg = pointcloud2_msgs[0]
pointcloud2_msg

pointcloud2.jpg

Pointcloud2のmsgでは点群の座標はdataの中に入っているがunit8で入っているためfloat32に変換する必要がある。Pointcloud2のdataをnumpyに変換する必要がある。
(参考:https://github.com/ros2/common_interfaces/blob/60df44894caafa67767da4c71d91f8c5eb7abf38/sensor_msgs_py/sensor_msgs_py/point_cloud2.py)

Pointcloud2のmsgをnumpyに変換する関数
_DATATYPES = {}
_DATATYPES[1] = np.dtype(np.int8)
_DATATYPES[2] = np.dtype(np.uint8)
_DATATYPES[3] = np.dtype(np.int16)
_DATATYPES[4] = np.dtype(np.uint16)
_DATATYPES[5] = np.dtype(np.int32)
_DATATYPES[6] = np.dtype(np.uint32)
_DATATYPES[7] = np.dtype(np.float32)
_DATATYPES[8] = np.dtype(np.float64)

def dtype_from_fields(fields):
    """
    Convert a Iterable of sensor_msgs.msg.PointField messages to a np.dtype.
    :param fields: The point cloud fields.
                   (Type: iterable of sensor_msgs.msg.PointField)
    :returns: NumPy datatype
    """
    # Create a lists containing the names, offsets and datatypes of all fields
    field_names = []
    field_offsets = []
    field_datatypes = []
    for i, field in enumerate(fields):
        # Datatype as numpy datatype
        datatype = _DATATYPES[field.datatype]
        # Name field
        if field.name == '':
            name = f'{DUMMY_FIELD_PREFIX}_{i}'
        else:
            name = field.name
        # Handle fields with count > 1 by creating subfields with a suffix consiting
        # of "_" followed by the subfield counter [0 -> (count - 1)]
        assert field.count > 0, "Can't process fields with count = 0."
        for a in range(field.count):
            # Add suffix if we have multiple subfields
            if field.count > 1:
                subfield_name = f'{name}_{a}'
            else:
                subfield_name = name
            assert subfield_name not in field_names, 'Duplicate field names are not allowed!'
            field_names.append(subfield_name)
            # Create new offset that includes subfields
            field_offsets.append(field.offset + a * datatype.itemsize)
            field_datatypes.append(datatype.str)
            

    # Create a tuple for each field containing name and data type
    return np.dtype({
        'names': field_names,
        'formats': field_datatypes,
        'offsets': field_offsets,
    })

def getpoints(msg):
    return np.ndarray(
        shape=(msg.width * msg.height, ),
        dtype=dtype_from_fields(msg.fields),
        buffer=msg.data)
Pointcloud2のmsgをPCDに変換
points = getpoints(pointcloud2_msg)
df_points = pd.DataFrame(data=points)
xyz= df_points[['x','y','z']].values

pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(xyz)
o3d.io.write_point_cloud("test_pointcloud0.pcd", pcd)
o3d.visualization.draw_geometries([pcd])
PCDを可視化

pcd.png

2.2.3. /perception/object_recognition/detection/objectsをdump

DetectedObjectsのmsgはautoware foundation独自のmsgのためrosbagsに定義がない。
以下の方法でcustom message typeを追加する必要がある。

定義をgit clone
# autoware_auto_msgsの定義(IDL)をgithubからclone
$ git clone https://github.com/tier4/autoware_auto_msgs.git
rosbagsにcusmtom message typeを追加
add_types = {}

for pathstr in [
    'autoware_auto_msgs/autoware_auto_perception_msgs/msg/DetectedObjects.idl',
    'autoware_auto_msgs/autoware_auto_perception_msgs/msg/DetectedObject.idl',
    'autoware_auto_msgs/autoware_auto_perception_msgs/msg/ObjectClassification.idl',
    'autoware_auto_msgs/autoware_auto_perception_msgs/msg/DetectedObjectKinematics.idl',
    'autoware_auto_msgs/autoware_auto_perception_msgs/msg/Shape.idl',
]:
    msgpath = Path(pathstr)
    msgdef = msgpath.read_text(encoding='utf-8')
    add_types.update(get_types_from_idl(msgdef))

register_types(add_types)
detectedobjectsを抽出
detectedbjects_msgs = []
with Reader(rosbag_dir) as reader:
    connections = get_connectin(reader.connections, '/perception/object_recognition/detection/objects')
    for connection, timestamp, rawdata in reader.messages(connections=connections):
        msg = deserialize_cdr(rawdata, connection.msgtype)
        detectedbjects_msgs.append(msg)
objectのlabelとpositionをprint
detectedbjects_msg = detectedbjects_msgs[0]
detected_objects = []
for object_ in detectedbjects_msg.objects:
    x = object_.kinematics.pose_with_covariance.pose.position.x
    y = object_.kinematics.pose_with_covariance.pose.position.y
    z = object_.kinematics.pose_with_covariance.pose.position.z
    position = [x,y,z]
    for classification in object_.classification:
        label = classification.label
    detected_object = {'label': label, 'position': position}
    detected_objects.append(detected_object)
detected_objects

detected_objects.png

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?