rosbagsでautoware foundationのtopicを可視化する方法
rosbagsではROSのパッケージをインストールしなくてもtopicを可視化できるので
rosbagを取得してjupyter環境(MAC)に転送して調査できる。
1.rosbagを取得してjupyter環境(MAC)に転送する。
autowareを起動してシュミレーションを起動する。
起動中にautowareの以下のtopicを取得してjupyter環境(MAC)に転送する。
- /tf
- /sensing/lidar/concatenated/pointcloud
- /perception/object_recognition/detection/objects
1.1.autoware起動
source ~/autoware/install/setup.bash
ros2 launch autoware_launch logging_simulator.launch.xml map_path:=$HOME/Downloads/sample-map-rosbag vehicle_model:=sample_vehicle sensor_model:=sample_sensor_kit
1.2.rosbagを取得
source ~/autoware/install/setup.bash
ros2 bag record /tf /sensing/lidar/concatenated/pointcloud /perception/object_recognition/detection/objects
1.3.シュミレーションを起動
source ~/autoware/install/setup.bash
ros2 bag record /tf /sensing/lidar/concatenated/pointcloud /perception/object_recognition/detection/objects
1.4.rosbagをjupyter環境のマシン(MAC)に転送
2.rosbagsの使い方
2.1.rosbagsインストール
pip install rosbags
2.2.jupyterのコード
from rosbags.rosbag2 import Reader
from rosbags.serde import deserialize_cdr
from rosbags.typesys import get_types_from_idl, register_types
import numpy as np
import pandas as pd
import open3d as o3d
from pathlib import Path
rosbag_dir = 'rosbag2/rosbag2_2023_02_21-08_59_39'
def get_connectin(connections, topic):
connections = [x for x in connections if x.topic == topic]
return connections
2.2.1. /tfをdump
tf_msgs = []
with Reader(rosbag_dir) as reader:
connections = get_connectin(reader.connections, '/tf')
for connection, timestamp, rawdata in reader.messages(connections=connections):
msg = deserialize_cdr(rawdata, connection.msgtype)
tf_msgs.append(msg)
tf_msg = tf_msgs[0]
tf_msg
2.2.2. /sensing/lidar/concatenated/pointcloudをdump
pointcloud2_msgs = []
with Reader(rosbag_dir) as reader:
connections = get_connectin(reader.connections, '/sensing/lidar/concatenated/pointcloud')
for connection, timestamp, rawdata in reader.messages(connections=connections):
msg = deserialize_cdr(rawdata, connection.msgtype)
pointcloud2_msgs.append(msg)
pointcloud2_msg = pointcloud2_msgs[0]
pointcloud2_msg
Pointcloud2のmsgでは点群の座標はdataの中に入っているがunit8で入っているためfloat32に変換する必要がある。Pointcloud2のdataをnumpyに変換する必要がある。
(参考:https://github.com/ros2/common_interfaces/blob/60df44894caafa67767da4c71d91f8c5eb7abf38/sensor_msgs_py/sensor_msgs_py/point_cloud2.py)
Pointcloud2のmsgをnumpyに変換する関数
_DATATYPES = {}
_DATATYPES[1] = np.dtype(np.int8)
_DATATYPES[2] = np.dtype(np.uint8)
_DATATYPES[3] = np.dtype(np.int16)
_DATATYPES[4] = np.dtype(np.uint16)
_DATATYPES[5] = np.dtype(np.int32)
_DATATYPES[6] = np.dtype(np.uint32)
_DATATYPES[7] = np.dtype(np.float32)
_DATATYPES[8] = np.dtype(np.float64)
def dtype_from_fields(fields):
"""
Convert a Iterable of sensor_msgs.msg.PointField messages to a np.dtype.
:param fields: The point cloud fields.
(Type: iterable of sensor_msgs.msg.PointField)
:returns: NumPy datatype
"""
# Create a lists containing the names, offsets and datatypes of all fields
field_names = []
field_offsets = []
field_datatypes = []
for i, field in enumerate(fields):
# Datatype as numpy datatype
datatype = _DATATYPES[field.datatype]
# Name field
if field.name == '':
name = f'{DUMMY_FIELD_PREFIX}_{i}'
else:
name = field.name
# Handle fields with count > 1 by creating subfields with a suffix consiting
# of "_" followed by the subfield counter [0 -> (count - 1)]
assert field.count > 0, "Can't process fields with count = 0."
for a in range(field.count):
# Add suffix if we have multiple subfields
if field.count > 1:
subfield_name = f'{name}_{a}'
else:
subfield_name = name
assert subfield_name not in field_names, 'Duplicate field names are not allowed!'
field_names.append(subfield_name)
# Create new offset that includes subfields
field_offsets.append(field.offset + a * datatype.itemsize)
field_datatypes.append(datatype.str)
# Create a tuple for each field containing name and data type
return np.dtype({
'names': field_names,
'formats': field_datatypes,
'offsets': field_offsets,
})
def getpoints(msg):
return np.ndarray(
shape=(msg.width * msg.height, ),
dtype=dtype_from_fields(msg.fields),
buffer=msg.data)
Pointcloud2のmsgをPCDに変換
points = getpoints(pointcloud2_msg)
df_points = pd.DataFrame(data=points)
xyz= df_points[['x','y','z']].values
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(xyz)
o3d.io.write_point_cloud("test_pointcloud0.pcd", pcd)
o3d.visualization.draw_geometries([pcd])
PCDを可視化
2.2.3. /perception/object_recognition/detection/objectsをdump
DetectedObjectsのmsgはautoware foundation独自のmsgのためrosbagsに定義がない。
以下の方法でcustom message typeを追加する必要がある。
定義をgit clone
# autoware_auto_msgsの定義(IDL)をgithubからclone
$ git clone https://github.com/tier4/autoware_auto_msgs.git
rosbagsにcusmtom message typeを追加
add_types = {}
for pathstr in [
'autoware_auto_msgs/autoware_auto_perception_msgs/msg/DetectedObjects.idl',
'autoware_auto_msgs/autoware_auto_perception_msgs/msg/DetectedObject.idl',
'autoware_auto_msgs/autoware_auto_perception_msgs/msg/ObjectClassification.idl',
'autoware_auto_msgs/autoware_auto_perception_msgs/msg/DetectedObjectKinematics.idl',
'autoware_auto_msgs/autoware_auto_perception_msgs/msg/Shape.idl',
]:
msgpath = Path(pathstr)
msgdef = msgpath.read_text(encoding='utf-8')
add_types.update(get_types_from_idl(msgdef))
register_types(add_types)
detectedobjectsを抽出
detectedbjects_msgs = []
with Reader(rosbag_dir) as reader:
connections = get_connectin(reader.connections, '/perception/object_recognition/detection/objects')
for connection, timestamp, rawdata in reader.messages(connections=connections):
msg = deserialize_cdr(rawdata, connection.msgtype)
detectedbjects_msgs.append(msg)
objectのlabelとpositionをprint
detectedbjects_msg = detectedbjects_msgs[0]
detected_objects = []
for object_ in detectedbjects_msg.objects:
x = object_.kinematics.pose_with_covariance.pose.position.x
y = object_.kinematics.pose_with_covariance.pose.position.y
z = object_.kinematics.pose_with_covariance.pose.position.z
position = [x,y,z]
for classification in object_.classification:
label = classification.label
detected_object = {'label': label, 'position': position}
detected_objects.append(detected_object)
detected_objects