環境
この記事は以下の環境で動いています。
項目 | 値 |
---|---|
CPU | Core i5-8250U |
Ubuntu | 22.04 |
ROS2 | Humble |
概要
simやロボット実機を動かしているときに、rostopicの動きを確認したくなることは多々あります。
ros2の標準ツールでもログはとれるのですが、「取得間隔の間引き」が出来ないためにpythonノードを書いてみます。
rosbag
rosbagはROSで使われるrostopicのログの形式です。
ソースコード
pythonノード
logger.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import uuid
import datetime
import os
# ROS2
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from rclpy.serialization import serialize_message
from ros2topic.api import get_msg_class
import rosbag2_py
class SimpleBagRecorder(Node):
def __init__(self):
super().__init__('simple_bag_recorder')
self.writer = rosbag2_py.SequentialWriter()
# create directory
iso_data_time = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
target_dir_path = "rosbag/" + iso_data_time
os.makedirs(target_dir_path, exist_ok=True)
storage_options = rosbag2_py._storage.StorageOptions(
uri=target_dir_path + "/log",
storage_id='sqlite3')
converter_options = rosbag2_py._storage.ConverterOptions('', '')
self.writer.open(storage_options, converter_options)
# parameter
def declare_topic_parameter(stream_name):
self.declare_parameter(stream_name + '.topic_list', ["dummy"])
self.declare_parameter(stream_name + '.interval', 0.0)
declare_topic_parameter('full')
declare_topic_parameter('high')
declare_topic_parameter('middle')
declare_topic_parameter('low')
self.interval_timer = self.create_timer(2.0, self.timerCallback)
self.subscriber_dict = {}
self.interval_dict = {}
self.written_stamp_dict = {}
def timerCallback(self):
def get_topic_interval_dict(stream_name):
interval = self.get_parameter(stream_name + '.interval').get_parameter_value().double_value
return {topic: interval for topic in self.get_parameter(stream_name + '.topic_list').get_parameter_value().string_array_value}
full_topic_interval_dict = get_topic_interval_dict('full')
high_topic_interval_dict = get_topic_interval_dict('high')
middle_topic_interval_dict = get_topic_interval_dict('middle')
low_topic_interval_dict = get_topic_interval_dict('low')
self.interval_dict = full_topic_interval_dict | high_topic_interval_dict | middle_topic_interval_dict | low_topic_interval_dict
all_topic_set = set(self.interval_dict.keys())
subscribed_topic_set = set(self.subscriber_dict.keys())
existed_topic_list = set([x[0] for x in self.get_topic_names_and_types()])
# get new subscribed topics
add_topic_set = (all_topic_set - subscribed_topic_set) & existed_topic_list
for topic in add_topic_set:
if self.add_topic(topic):
print(f"rosbag new topic: {topic}")
def add_topic(self, topic):
message_type_class = get_msg_class(self, topic, include_hidden_topics=True)
if message_type_class is None:
print(f'{topic} is not published')
return False
filtered_list = list(filter(lambda x: x[0] == topic, self.get_topic_names_and_types()))
if len(filtered_list) == 0:
print(f'{topic} is not published')
return False
message_type_str = filtered_list[0][1][0]
topic_info = rosbag2_py._storage.TopicMetadata(
name=topic,
type=message_type_str,
serialization_format='cdr')
self.writer.create_topic(topic_info)
if topic in self.subscriber_dict:
print(f'{topic} is already subscribed')
return False
self.subscriber_dict[topic] = self.create_subscription(
message_type_class,
topic,
lambda msg : self.topic_callback(topic, msg),
10)
return True
def topic_callback(self, topic, msg):
ros_now = self.get_clock().now()
if topic in self.interval_dict and topic in self.written_stamp_dict:
elapsed_duration = float((ros_now - self.written_stamp_dict[topic]).nanoseconds) / (10 ** 9)
if elapsed_duration < self.interval_dict[topic]:
return
self.written_stamp_dict[topic] = ros_now
self.writer.write(
topic,
serialize_message(msg),
ros_now.nanoseconds)
####### main ######
def main(argv=sys.argv):
try:
rclpy.init()
sbr = SimpleBagRecorder()
rclpy.spin(sbr)
rclpy.shutdown()
except Exception as e:
print(e)
finally:
print("del SimpleBagRecorder")
del sbr
if __name__ == "__main__":
main(sys.argv)
パラメーター例
/**:
ros__parameters:
full:
interval: 0.0 # full rate
topic_list:
- "/topic_a"
high:
interval: 0.2 # 5Hz
topic_list:
- "topic_b"
- "topic_c"
ビルド&実行
ビルド
source /opt/ros/humble/setup.bash
cd ros2_ws
colcon build
実行
ros2 run xxx xxx
rosbagの閲覧方法
plotjuggerがおすすめです。