はじめに
7/2 ~ 11/9の間、自動運転AIチャレンジ2024に参加させていただいておりました。
本チャレンジの予選において、 Frenet座標系(SL座標系) を用いた障害物回避プランナーを実装したので、まとめておきます。
なおこのプランナーは、実車での動作未確認ですのでご注意ください。
背景
既存プランナーでは障害物をうまく回避できなかった ため、自作実装しました。
当初は、実証実験等で揉まれて完成度が高まっている既存プランナーbehavior_path_planner
を利用する予定でした。自作実装を避けたかった理由は、ゼロから設計・実装をやり切る工数が不足していること、実車での運用を考慮すると些細なバグや処理速度が後々の問題となる可能性が高いことなどがありました。
しかし、予選を進める中で、既存プランナーでは一部の障害物を回避できないことが明らかになりました。以降、少し具体的に説明します。
まずbehavior_path_planner
は、以下のように目標軌跡から見て障害物と 反対側 に回避軌跡を生成します(間違っていたらごめんなさい!)。これは、道路中央を走行するようなユースケース(例:公道走行)では問題なく機能しますが、レースのようにコース端を攻める走行では壁とのマージンが足りず、正しい回避軌跡が生成されないことがあります。その結果、障害物に衝突するケースがありました。
該当するソースコードは以下の部分あたりと思われます。
以下を参考にbehavior_path_planner
のソースコードをカスタマイズする方法もありましたが、予選時点では気づけませんでした。
以上より、やむを得ず自作の回避プランナーの実装を行いました。
Frenet座標系とは
回避プランナー実装は様々ありますが、今回は Frenet座標系 で実装を行いました。
Frenet座標系の詳細については、以下の記事が詳しいので省略します。
直交座標系(いわゆるXY座標系)との比較したときに、 計算量は増えるものの、道路形状に応じた滑らかな軌跡が生成できる と考えたため、今回はFrenet座標系を採用しました。
他比較は以下です。一部、筆者の主観が入っています。
項目 | Frenet座標系 | 直交座標系 |
---|---|---|
基本構造 | 道路中心線に沿った進行方向(S軸)と横方向(L軸)で構成 | X軸とY軸の2次元直交座標系 |
障害物回避 | 道路中心線に沿った進行方向と横方向を基に簡単に調整できる | 障害物の位置調整が複雑化しやすい |
計算の複雑さ | Frenet座標系への変換と逆変換が必要で、計算がやや複雑 | 変換が不要で、計算が比較的簡単 |
直感的な理解 | 道路に沿った進行方向の表現が直感的だが、座標変換が必要 | 直感的で広く一般的に使われる表現 |
ソースコード
csv_to_trajectory_node
が出力する/planning/scenario_planning/trajectory_csv
を上書き修正する形で回避プランナーを実現しました。
良い設計とは言えません(と言うかまともに設計されていません)ので、もしこのコードを活用する場合は、必要に応じて改良を加えてください。また、Pythonで効率性を考慮せずに書いているため、実車での処理速度を懸念される場合は、C++で再実装するなどの対策を取ることをおすすめします。
処理は大まかに言うと3ステップで、①XY座標系→SL座標系の変換、②SL座標系で回避軌跡の生成、③SL座標系→XY座標系の逆変換を行っています。
import rclpy
from rclpy.node import Node
from autoware_auto_perception_msgs.msg import PredictedObjects
from tier4_planning_msgs.msg import AvoidanceDebugMsgArray
from autoware_auto_planning_msgs.msg import Trajectory, TrajectoryPoint
from geometry_msgs.msg import PoseWithCovarianceStamped
from autoware_auto_planning_msgs.msg import PathWithLaneId
from std_msgs.msg import Int32
import math
import numpy as np
from scipy.interpolate import CubicSpline
class AvoidancePlanner(Node):
def __init__(self):
super().__init__('avoidance_planner_node')
self.objects = None
self.avoidance_info = None
self.trajectory_csv = None
self.trajectory_autoware = None
self.current_pose = None
self.previous_pose = None
self.path = None
self.goal_status = None
self.subscription_objects = self.create_subscription(
PredictedObjects, '/perception/object_recognition/objects',
self.listener_callback_objects, 1)
self.subscription_avoidance = self.create_subscription(
AvoidanceDebugMsgArray, '/planning/scenario_planning/lane_driving/behavior_planning/behavior_path_planner/debug/avoidance_debug_message_array',
self.listener_callback_avoidance, 1)
self.subscription_trajectory_csv = self.create_subscription(
Trajectory, '/planning/scenario_planning/trajectory_csv',
self.listener_callback_trajectory_csv, 1)
self.subscription_pose = self.create_subscription(
PoseWithCovarianceStamped, '/sensing/gnss/pose_with_covariance',
self.listener_callback_pose, 1)
self.subscription_path = self.create_subscription(
PathWithLaneId, '/planning/scenario_planning/lane_driving/behavior_planning/path_with_lane_id',
self.listener_callback_path, 1)
self.subscription_goal_status = self.create_subscription(
Int32, '/planning/mission_planning/goal_status_cc',
self.listener_callback_goal_status, 1)
self.subscription_trajectory_autoware = self.create_subscription(
Trajectory, '/planning/scenario_planning/trajectory_autoware',
self.listener_callback_trajectory_autoware, 1)
self.publisher = self.create_publisher(
Trajectory, '/planning/scenario_planning/trajectory', 1)
def listener_callback_objects(self, msg):
self.objects = msg.objects
def listener_callback_avoidance(self, msg):
self.avoidance_info = msg.avoidance_info
def listener_callback_pose(self, msg):
self.previous_pose = self.current_pose
self.current_pose = msg.pose.pose
def listener_callback_path(self, msg):
self.path = msg
def listener_callback_goal_status(self, msg):
self.goal_status = msg
def listener_callback_trajectory_autoware(self, msg):
self.trajectory_autoware = msg
def listener_callback_trajectory_csv(self, msg):
self.trajectory_csv = msg
changed_msg = msg
if self.goal_status is not None and self.goal_status.data == 1:
self.publisher.publish(self.trajectory_autoware)
return
# 前方の最近接オブジェクトを見つける
target_distance = 15.0
closest_object = None
min_distance = float('inf')
if self.objects and self.current_pose:
for obj in self.objects:
obj_pos = obj.kinematics.initial_pose_with_covariance.pose.position
distance = self.calculate_distance(self.current_pose.position, (obj_pos.x, obj_pos.y))
# 前方のオブジェクトのみを考慮
if distance <= target_distance and distance < min_distance:
# # オブジェクトが自車の前方にあるか確認
# if self.is_object_in_front(self.previous_pose, self.current_pose, (obj_pos.x, obj_pos.y)):
# min_distance = distance
# closest_object = obj
min_distance = distance
closest_object = obj
# 最近接オブジェクトが見つかった場合、トラジェクトリを修正
if closest_object:
obj_pos = closest_object.kinematics.initial_pose_with_covariance.pose.position
changed_msg = self.modify_trajectory_in_sl(changed_msg, (obj_pos.x, obj_pos.y))
# 修正したトラジェクトリをパブリッシュ
self.publisher.publish(changed_msg)
def is_object_in_front(self, previous_pose, current_pose, object_position):
""" 前方の物体を判別 """
# 前回位置との差分ベクトルを計算
direction_vector = (
current_pose.position.x - previous_pose.position.x,
current_pose.position.y - previous_pose.position.y
)
# 自車からオブジェクトへのベクトル
to_object_vector = (
object_position[0] - current_pose.position.x,
object_position[1] - current_pose.position.y
)
# 内積を計算
dot_product = direction_vector[0] * to_object_vector[0] + direction_vector[1] * to_object_vector[1]
# 内積が正かつオブジェクトが自車の前方にある場合
return dot_product > 0
def remove_points_behind_vehicle(self, points):
""" 自車の現在位置より後方にあるトラジェクトリポイントを削除 """
if self.current_pose is None:
return points # 現在の位置がまだ取得されていない場合は、全てのポイントを残す
# 自車の現在位置に最も近いトラジェクトリポイントを見つける
min_distance = float('inf')
closest_index = 0
for i, point in enumerate(points):
distance = self.calculate_distance(point.pose.position, (self.current_pose.position.x, self.current_pose.position.y))
if distance < min_distance:
min_distance = distance
closest_index = i
# 最も近いポイント以降を残す
return points[closest_index:]
def modify_trajectory_in_sl(self, trajectory, object_position):
""" SL座標系で回避軌跡を生成する """
# SL座標に変換
trajectory_sl, object_sl = self.convert_xy_to_sl(trajectory.points, object_position)
# 回避方向を物体と左右道路境界から決定
# 物体と左右道路境界の最小距離を計算
distance_left = [self.calculate_distance(point, object_position) for point in self.path.left_bound[:20]]
distance_right = [self.calculate_distance(point, object_position) for point in self.path.right_bound[:20]]
min_distance_left = min(distance_left)
min_distance_right = min(distance_right)
# 回避方向を決定
avoidance_direction = np.sign(min_distance_left - min_distance_right)
# 回避オブジェクトに合わせてL座標を修正
for i, (s, _) in enumerate(trajectory_sl):
if self.is_within_influence_radius(s, object_sl) and (abs(object_sl[1]) < 5):
# 物体の真横に向かってなめらかにL座標をシフト
avoidance_range = 5.5
# 物体から遠い方の道路境界からXm内側を最大シフト量とする
X = 1.7
max_boundary_distance = max(min_distance_left, min_distance_right) - abs(object_sl[1])
avoidance_shift = max_boundary_distance - X
distance_to_object = abs(s - object_sl[0])
max_shift = avoidance_direction * avoidance_shift
shift_factor = max(0, min(1, (avoidance_range - distance_to_object) / avoidance_range))
trajectory_sl[i] = (s, max_shift * shift_factor)
# XY座標に戻す
modified_trajectory_points = self.convert_sl_to_xy(trajectory_sl, trajectory.points)
# 移動平均でスムージング
window_size = 3
smoothed_x = np.convolve([p[0] for p in modified_trajectory_points], np.ones(window_size)/window_size, mode='valid')
smoothed_y = np.convolve([p[1] for p in modified_trajectory_points], np.ones(window_size)/window_size, mode='valid')
# 始点と終点を元のものに戻す
smoothed_x = np.concatenate(([modified_trajectory_points[0][0]], smoothed_x, [modified_trajectory_points[-1][0]]))
smoothed_y = np.concatenate(([modified_trajectory_points[0][1]], smoothed_y, [modified_trajectory_points[-1][1]]))
modified_trajectory_points = list(zip(smoothed_x, smoothed_y))
# 新しいトラジェクトリを作成
modified_trajectory = Trajectory()
modified_trajectory.header = trajectory.header
for (x, y), original_point in zip(modified_trajectory_points, trajectory.points):
new_point = TrajectoryPoint()
new_point.pose.position.x = float(x)
new_point.pose.position.y = float(y)
new_point.pose.position.z = original_point.pose.position.z
new_point.longitudinal_velocity_mps = original_point.longitudinal_velocity_mps
modified_trajectory.points.append(new_point)
return modified_trajectory
def convert_xy_to_sl(self, points, obj_pos):
""" XY座標をSL座標に変換 """
xs = [p.pose.position.x for p in points]
ys = [p.pose.position.y for p in points]
# 曲線のパラメトリック表現
s = [0] # S座標
for i in range(1, len(xs)):
dx = xs[i] - xs[i - 1]
dy = ys[i] - ys[i - 1]
ds = math.sqrt(dx ** 2 + dy ** 2)
s.append(s[-1] + ds)
# オブジェクトに最も近いトラジェクトリポイントを見つける
min_distance = float('inf')
closest_index = 0
for i, (x, y) in enumerate(zip(xs, ys)):
distance = math.sqrt((x - obj_pos[0])**2 + (y - obj_pos[1])**2)
if distance < min_distance:
min_distance = distance
closest_index = i
# 最も近い点の座標を取得
closest_x = xs[closest_index]
closest_y = ys[closest_index]
# オブジェクトのSL座標を計算
obj_s = s[closest_index]
obj_l = -min_distance if (closest_x - obj_pos[0]) * (ys[closest_index] - closest_y) - (closest_y - obj_pos[1]) * (xs[closest_index] - closest_x) > 0 else min_distance
sl_points = [(s_i, 0) for s_i in s]
return sl_points, (obj_s, obj_l)
def convert_sl_to_xy(self, sl_points, reference_points):
""" SL座標をXY座標に変換 """
xs = [p.pose.position.x for p in reference_points]
ys = [p.pose.position.y for p in reference_points]
# 曲線のパラメトリック表現を取得
s = [0] # S座標
for i in range(1, len(xs)):
dx = xs[i] - xs[i - 1]
dy = ys[i] - ys[i - 1]
ds = math.sqrt(dx ** 2 + dy ** 2)
s.append(s[-1] + ds)
# スプライン補間を使用して曲線を定義
cs_x = CubicSpline(s, xs, bc_type='clamped')
cs_y = CubicSpline(s, ys, bc_type='clamped')
xy_points = []
for s_i, l in sl_points:
# 曲線上の位置を計算
x = cs_x(s_i)
y = cs_y(s_i)
# 接線ベクトルを計算
epsilon = 1e-6
dx = cs_x(s_i + epsilon) - cs_x(s_i - epsilon)
dy = cs_y(s_i + epsilon) - cs_y(s_i - epsilon)
# 法線ベクトルを計算 (接線ベクトルを90度回転)
nx = -dy
ny = dx
# 法線ベクトルを正規化
norm = math.sqrt(nx ** 2 + ny ** 2)
if norm != 0:
nx /= norm
ny /= norm
# 法線方向にlだけ移動
x += l * nx
y += l * ny
xy_points.append((x, y))
return xy_points
def is_within_influence_radius(self, s, object_position):
"""S座標がオブジェクトの影響範囲内か確認"""
# シンプルなチェック、実際のオブジェクト位置に基づいて精査
return abs(s - object_position[0]) < 10.0
def calculate_distance(self, point, object_position):
"""ユークリッド距離を計算"""
return math.sqrt((point.x - object_position[0]) ** 2 + (point.y - object_position[1]) ** 2)
def main(args=None):
rclpy.init(args=args)
node = AvoidancePlanner()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
結果
以下のような回避軌跡が生成されます。
障害物に対して回避軌跡が生成できるようになりました。
※ 軌跡がバタついているのは回避対象の絞り込みが甘いためです、要改善ポイントです
おわりに
Autowareは主に公道での一般的な走行ユースケースを想定して設計されていると思います。そのため、今回のようなレース走行ユースケースでは想定外の挙動がどうしても発生することがあります。本取り組みが、今後のAIチャレンジ参加者の参考になれば…と思い記事として残しておきます。
なお、Autoware Practiceのプランナーを改良する手もあったなあ…と、記事をまとめていて思いました。本記事を読んだ方はぜひチャレンジしてみてください。
この記事は、AI文章校正ツール「ちゅらいと」で校正されています。