はじめに
自動運転AIチャレンジで一番大きなハードルとなる軌跡データの編集。こちらをカートを走らせながら自動でやっちゃおうというのがこちらの開発の趣旨。
前提
カートの走行した軌跡(X座標、Y座標、Z座標とカートのヨー角)をOdometry(nav_msgs/msg/Odometry)型の推定自己位置というデータの形でトピック /localization/kinematic_state から受信しCSVファイルに出力するパッケージの開発を目指した。
AWSIMでは自動運転モードの他に手動運転モードもあり、手動で走らせた走行データを軌跡データに落とし込み、最適なTrajectoryファイル作成に活用することも可能。
さらにこちらは以下のAI開発のためのデータ収集にも活用された。
方針
出力側(publish側)
発行トピック / 型
/localization/kinematic_state に nav_msgs/msg/Odometry を publish。
含める内容
header.stamp(正確な時刻。シミュならシミュ時刻、実機ならuse_sim_timeに応じて)
header.frame_id = "map"(慣例)
child_frame_id = "base_link"(慣例)
pose.pose.position.{x,y,z}:地図座標系での位置
pose.pose.orientation.{x,y,z,w}:車体姿勢(クォータニオン)
入力側(subscribe側)
購読
/localization/kinematic_state を Odometry で subscribe
ログの出力
create_timer(1.0, tick) で 1秒ごとに1行だけ CSV へ書き込み。
出力列は 7列固定:x, y, z(Odometryの位置)+ x_quat, y_quat, z_quat, w_quat。
raceline 参照
起動時に /mnt/data/raceline_cctb_30km_wide_from_garage.csv を読み込み(パラメータ raceline_csv で差し替え可)。
停止条件とリソース管理
データ行数が 122行 に達したらログ停止・シャットダウン(保険)。
実装
上記の方針をコードに落とし込むと以下のようになる
#!/usr/bin/env python3
import rclpy, atexit, signal, os, csv, math
from rclpy.node import Node
from nav_msgs.msg import Odometry
from datetime import datetime
OUT_DIR = "/output"
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
OUT = os.path.join(OUT_DIR, f"xy_log_{timestamp}.csv")
# 添付のラインCSV(必要に応じてパラメータで差し替え可)
DEFAULT_RACELINE_CSV = "/mnt/data/raceline_cctb_30km_wide_from_garage.csv"
MAX_DATA_ROWS = 122 # データ行上限(ヘッダ除く)
class XYLogger(Node):
def __init__(self):
super().__init__('xy_logger')
self.last_msg = None
self.rows_written = 0
self.closed = False
self.sub = self.create_subscription(
Odometry, '/localization/kinematic_state', self.cb, 20
)
# raceline CSV をロード
self.raceline_csv = self.declare_parameter('raceline_csv', DEFAULT_RACELINE_CSV).value
self.raceline = self._load_raceline(self.raceline_csv) # list[dict] or None
os.makedirs(os.path.dirname(OUT), exist_ok=True)
self.f = open(OUT, 'a', newline='', buffering=1)
self.w = csv.writer(self.f)
# ヘッダは7列のみ
if self.f.tell() == 0:
self.w.writerow(['x','y','z','x_quat','y_quat','z_quat','w_quat'])
# 1秒ごとに書き込み
self.timer = self.create_timer(1.0, self.tick)
# 終了時クリーンアップ
atexit.register(self._close)
rclpy.get_default_context().on_shutdown(self._close)
signal.signal(signal.SIGTERM, lambda *_: rclpy.shutdown())
def cb(self, msg: Odometry):
self.last_msg = msg
def tick(self):
if self.rows_written >= MAX_DATA_ROWS:
self.get_logger().info(f"Reached {MAX_DATA_ROWS} rows. Stopping.")
self._close()
try:
rclpy.shutdown()
except Exception:
pass
return
if self.last_msg is None:
return
msg = self.last_msg
p = msg.pose.pose.position
# 7列のベース: x, y, z は Odometry の位置
row = [f'{p.x:.6f}', f'{p.y:.6f}', f'{p.z:.6f}']
# quat は raceline があれば最近傍の行から、無ければ Odometry の姿勢を使用
if self.raceline is not None:
_, rl = self._nearest_on_raceline(p.x, p.y)
row += [
f'{rl["x_quat"]:.6f}', f'{rl["y_quat"]:.6f}',
f'{rl["z_quat"]:.6f}', f'{rl["w_quat"]:.6f}'
]
else:
q = msg.pose.pose.orientation
row += [f'{q.x:.6f}', f'{q.y:.6f}', f'{q.z:.6f}', f'{q.w:.6f}']
try:
self.w.writerow(row)
self.f.flush()
self.rows_written += 1
except Exception as e:
self.get_logger().error(f"Write failed: {e}")
self._close()
try:
rclpy.shutdown()
except Exception:
pass
def _load_raceline(self, path):
try:
if not os.path.exists(path):
self.get_logger().warn(f"raceline CSV not found: {path}")
return None
data = []
with open(path, newline='') as f:
r = csv.DictReader(f)
for i, row in enumerate(r):
try:
data.append({
"x": float(row.get("x", 0.0)),
"y": float(row.get("y", 0.0)),
"z": float(row.get("z", 0.0)),
"x_quat": float(row.get("x_quat", 0.0)),
"y_quat": float(row.get("y_quat", 0.0)),
"z_quat": float(row.get("z_quat", 0.0)),
"w_quat": float(row.get("w_quat", 1.0)),
})
except Exception as ex:
self.get_logger().warn(f"Skip bad row {i}: {ex}")
if not data:
self.get_logger().warn(f"raceline CSV empty: {path}")
return None
self.get_logger().info(f"Loaded raceline: {len(data)} points from {path}")
return data
except Exception as e:
self.get_logger().error(f"Failed to load raceline CSV: {e}")
return None
def _nearest_on_raceline(self, x, y):
best_i = 0
best_d2 = float('inf')
rl_best = None
for i, rl in enumerate(self.raceline):
dx = rl["x"] - x
dy = rl["y"] - y
d2 = dx*dx + dy*dy
if d2 < best_d2:
best_d2 = d2
best_i = i
rl_best = rl
return best_i, rl_best
def _close(self):
if self.closed:
return
self.closed = True
try:
if hasattr(self, 'timer') and self.timer is not None:
self.timer.cancel()
except Exception:
pass
try:
if hasattr(self, 'f') and not self.f.closed:
self.f.flush()
self.f.close()
except Exception:
pass
def main():
rclpy.init()
n = XYLogger()
try:
rclpy.spin(n)
finally:
n._close()
try:
rclpy.shutdown()
except Exception:
pass
if __name__ == '__main__':
main()
その他setup.pyやpackage.xmlの作成も必要になるが、そちらに関しては次項目のrepository参照
使い方
上記のRepositoryをAIチャレンジフォルダの以下のディレクトリにgit cloneする。
aichallenge
└-workspace
└-src
└-xy_logger←ここ
その後dockerを起動し、docker内で./build_autoware.bashコマンドをterminalに打ち込み、ビルドを行った上でAWSIMを起動させるとXY_Loggerが起動、走行終了後aichallgneフォルダ内のoutputフォルダ内にてCSVファイルが出力されることが確認できる