0
0

ROS2講座22 rostopicのログをrosbagに保存する

Posted at

環境

この記事は以下の環境で動いています。

項目
CPU Core i5-8250U
Ubuntu 22.04
ROS2 Humble

概要

simやロボット実機を動かしているときに、rostopicの動きを確認したくなることは多々あります。
ros2の標準ツールでもログはとれるのですが、「取得間隔の間引き」が出来ないためにpythonノードを書いてみます。

rosbag

rosbagはROSで使われるrostopicのログの形式です。

ソースコード

pythonノード

logger.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import sys
import uuid
import datetime
import os

# ROS2
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from rclpy.serialization import serialize_message
from ros2topic.api import get_msg_class
import rosbag2_py


class SimpleBagRecorder(Node):
    def __init__(self):
        super().__init__('simple_bag_recorder')
        self.writer = rosbag2_py.SequentialWriter()

        # create directory
        iso_data_time = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
        target_dir_path = "rosbag/" + iso_data_time
        os.makedirs(target_dir_path, exist_ok=True)

        storage_options = rosbag2_py._storage.StorageOptions(
            uri=target_dir_path + "/log",
            storage_id='sqlite3')
        converter_options = rosbag2_py._storage.ConverterOptions('', '')
        self.writer.open(storage_options, converter_options)

        # parameter
        def declare_topic_parameter(stream_name):
            self.declare_parameter(stream_name + '.topic_list', ["dummy"])
            self.declare_parameter(stream_name + '.interval', 0.0)

        declare_topic_parameter('full')
        declare_topic_parameter('high')
        declare_topic_parameter('middle')
        declare_topic_parameter('low')

        self.interval_timer = self.create_timer(2.0, self.timerCallback)
        self.subscriber_dict = {}
        self.interval_dict = {}
        self.written_stamp_dict = {}

    def timerCallback(self):
        def get_topic_interval_dict(stream_name):
            interval = self.get_parameter(stream_name + '.interval').get_parameter_value().double_value
            return {topic: interval for topic in self.get_parameter(stream_name + '.topic_list').get_parameter_value().string_array_value}

        full_topic_interval_dict = get_topic_interval_dict('full')
        high_topic_interval_dict = get_topic_interval_dict('high')
        middle_topic_interval_dict = get_topic_interval_dict('middle')
        low_topic_interval_dict = get_topic_interval_dict('low')

        self.interval_dict = full_topic_interval_dict | high_topic_interval_dict | middle_topic_interval_dict | low_topic_interval_dict

        all_topic_set = set(self.interval_dict.keys())
        subscribed_topic_set = set(self.subscriber_dict.keys())
        existed_topic_list = set([x[0] for x in self.get_topic_names_and_types()])

        # get new subscribed topics
        add_topic_set = (all_topic_set - subscribed_topic_set) & existed_topic_list
        for topic in add_topic_set:
            if self.add_topic(topic):
                print(f"rosbag new topic: {topic}")

    def add_topic(self, topic):
        message_type_class = get_msg_class(self, topic, include_hidden_topics=True)
        if message_type_class is None:
            print(f'{topic} is not published')
            return False

        filtered_list = list(filter(lambda x: x[0] == topic, self.get_topic_names_and_types()))
        if len(filtered_list) == 0:
            print(f'{topic} is not published')
            return False
        message_type_str = filtered_list[0][1][0]

        topic_info = rosbag2_py._storage.TopicMetadata(
            name=topic,
            type=message_type_str,
            serialization_format='cdr')
        self.writer.create_topic(topic_info)

        if topic in self.subscriber_dict:
            print(f'{topic} is already subscribed')
            return False
        self.subscriber_dict[topic] = self.create_subscription(
            message_type_class,
            topic,
            lambda msg : self.topic_callback(topic, msg),
            10)
        return True

    def topic_callback(self, topic, msg):
        ros_now = self.get_clock().now()
        if topic in self.interval_dict and topic in self.written_stamp_dict:
            elapsed_duration = float((ros_now - self.written_stamp_dict[topic]).nanoseconds) / (10 ** 9)
            if elapsed_duration < self.interval_dict[topic]:
                return

        self.written_stamp_dict[topic] = ros_now
        self.writer.write(
            topic,
            serialize_message(msg),
            ros_now.nanoseconds)

####### main ######
def main(argv=sys.argv):
    try:
        rclpy.init()
        sbr = SimpleBagRecorder()
        rclpy.spin(sbr)
        rclpy.shutdown()
    except Exception as e:
        print(e)
    finally:
        print("del SimpleBagRecorder")
        del sbr

if __name__ == "__main__":
    main(sys.argv)
パラメーター例
/**:
  ros__parameters:
    full:
      interval: 0.0 # full rate
      topic_list:
        - "/topic_a"
    high:
      interval: 0.2 # 5Hz
      topic_list:
        - "topic_b"
        - "topic_c"

ビルド&実行

ビルド
source /opt/ros/humble/setup.bash
cd ros2_ws
colcon build
実行
ros2 run xxx xxx

rosbagの閲覧方法

plotjuggerがおすすめです。

目次ページへのリンク

ROS2講座の目次へのリンク

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0