はじめに
本記事は【Python】飛行機の運動をシミュレートするプログラムをつくる その5 「SAC」を詳しくまとめ直した記事になります。
【Python】飛行機の運動をシミュレートするプログラムをつくる という記事を連載していたのですが、最初は丁寧に説明していたものの、後半に行くに従いだんだん重複する部分の説明がなくなり、わかりづらくなっていたため改めて1から書き直しました。
連続値の状態空間を扱え、連続値で制御できるSACを使用したため、一通りのベースは完成したかなと考えています。
これまでの記事は以下になります。
概要
本プログラムは飛行機をSACにより連続値制御で操縦することを目指したプログラムです。
飛行のモデリングは『航空機力学入門 (加藤寛一郎・大屋昭男・柄沢研治 著 東京大学出版会)』の3章「微小擾乱の運動方程式」をベースに作成しています。
そしてSACの実装は東京大学松尾研究室の「サマースクール2024 深層強化学習」をベースにしております。
コードはgoogle colabで実行可能な形でアップロードされています。
準備
#Jupyter notebook上では以下を実行
#!pip install gym==0.25.2
#!pip install --upgrade ipywidgets
#!pip install widgetsnbextension
#!jupyter nbextension enable --py widgetsnbextension --sys-prefix
#!pip install imageio-ffmpeg
# 必要なライブラリのインポート.
import os
import glob
from collections import deque
from datetime import timedelta
import pickle
from base64 import b64encode
import math
import numpy as np
import torch
from torch import nn
from torch.distributions import Normal
import torch.nn.functional as F
import gym
import matplotlib.pyplot as plt
from IPython.display import HTML
# Gymの警告を一部無視する.
gym.logger.set_level(40)
# matplotlibをColab上で描画するためのコマンド.
# %matplotlib inline
飛行モデル
参考資料
飛行のモデリングは『航空機力学入門 (加藤寛一郎・大屋昭男・柄沢研治 著 東京大学出版会)』の3章「微小擾乱の運動方程式」をベースに作成しています。
参考サイト
変数の説明
記述のルール
大文字:機体固定座標系における値
小文字:変化量 ex) $U = U_0 + u$
下付き文字0:つり合い状態での値
空気力
$X, Y, Z$:機体の重心にかかる$x,y,z$方向の外力
$L, M, N$:機体の重心周りの$x,y,z$方向の外力によるモーメント
安定微係数
空気力の変数に下付き文字が付いたもの:その変数を下付き文字で偏微分した値を、全機質量や慣性能率で除した値
ex)
$X_u = \frac{1}{m} \frac{\partial X}{\partial u}$
$L_p = \frac{1}{I_{xx}} \frac{\partial L}{\partial p}$
状態量
$U, V, W$:$x,y,z$方向の速度
$P, Q, R$:$x,y,z$軸周りの角速度
$\Phi, \Theta, \Psi$:$x,y,z$軸周りの角度(ロール角, ピッチ角, ヨー角)
制御量
$\delta_a, \delta_e, \delta_r$:エルロン, エレベータ, ラダーの舵角
$\delta_t$:スロットル位置
その他
$g$:重力加速度
$\alpha, \beta$:迎え角, 横滑り角
微小擾乱の運動方程式(『航空機力学入門』3.4節)
飛行機の運動は縦の運動方程式と横・方向の運動方程式の二つに分けることができる。
そして、線形化された微小擾乱の運動方程式は、
状態量$\mathbf{x}(t)$
と
制御量$\mathbf{u}(t)$を用いて
微分方程式
$\dot{\mathbf{x}}=\mathbf{A}\mathbf{x}+\mathbf{B}\mathbf{u}$
の形で表すことができる
縦の運動方程式
\begin{bmatrix} \dot{u} \\ \dot{\alpha} \\ \dot{q} \\ \dot{\theta} \end{bmatrix}
=
\begin{bmatrix}X_{u} & X_{\alpha} & -W_0 & -g\rm{cos}\theta_0 \\
Z_{u}/U_{0} & Z_{\alpha}/U_{0} & 1+Z_{q}/U_{0} & (g/U_0)\rm{sin}\theta_0 \\
M_{u}+M_{\dot{\alpha}}(Z_{u}/U_{0}) & M_{\alpha}+ M_{\dot{\alpha}}(Z_{\alpha}/U_{0}) & M_{q}+M_{\dot{\alpha}}(1+Z_{q}/U_{0}) & (M_{\dot{\alpha}}g/U_0)\rm{sin}\theta_0 \\
0 & 0 & 1 & 0 \end{bmatrix}
\begin{bmatrix} u \\ \alpha \\ q \\ \theta \\ \end{bmatrix}
+
\begin{bmatrix}
0 & X_{\delta_{t}} \\
Z_{\delta_{t}}/U_{0} & Z_{\delta_{t}}/U_{0} \\
M_{\delta_{e}}+M_{\dot{\alpha}} (Z_{\delta_{e}}/U_{0}) & M_{\delta_{t}}+M_{\dot{\alpha}} (Z_{\delta_{t}}/U_{0}) \\
0 & 0 \end{bmatrix}
\begin{bmatrix}\delta_e \\ \delta_t \end{bmatrix}
横・方向の運動方程式
\begin{bmatrix} \dot{\beta} \\ \dot{p} \\ \dot{r} \\ \dot{\phi} \\ \dot{\psi} \end{bmatrix}
=
\begin{bmatrix}
Y_{\beta}/U_{0} &(W_0+Y_{p})/U_{0} & Y_{r}/U_{0}-1 & g\rm{cos}\theta_0/U_{0} & 0 \\
L'_{\beta} & L'_{p} & L'_{r} & 0 & 0 \\
N'_{\beta} & N'_{p} & N'_{r} & 0 & 0 \\
0 & 1 & \rm{tan}\theta_0 & 0 & 0 \\
0 & 0 & 1/\rm{cos}\theta_0 & 0 & 0
\end{bmatrix}
\begin{bmatrix}\beta \\ p \\ r \\ \phi \\ \psi \end{bmatrix}
+
\begin{bmatrix}
0 & Y_{\delta_{r}}/U_{0} \\
L'_{\delta_{a}} & L'_{\delta_{r}}\\
N'_{\delta_{a}} & N'_{\delta_{r}}\\
0 & 0 \\
0 & 0
\end{bmatrix}
\begin{bmatrix}\delta_{a} \\ \delta_{r} \end{bmatrix}
微分方程式の数値解法
この微分方程式をルンゲ=クッタ法を用いて数値計算を行う。
ルンゲ=クッタ法については詳しく解説しない
scipy.integrate.solve_ivp
関数を使用し、RK45
、つまり4次精度を保証して5次の項を見て自動で時間幅を調整してくれる関数を使用する。(実質matlab等のode45と思われる。)
この精度が高くなるほど計算時間が長くなるため、精度と速度のトレードオフを意識する必要がある。
機体固定座標系から地表座標系への座標変換
これまではすべて機体固定座標系において運動方程式を解いてきた。
しかし、描写する際には地面に固定された地表座標系の方がわかりやすいため、変換をする。
座標変換
飛行機の運動に追従して時々刻々と変化する機体固定座標系と、
地面に張り付いた時間変化しない地表座標系の2種類の座標系がある。
『航空機力学入門』1.5節では、地表座標系から、
Z0軸周りに$\Psi$、
Y1軸周りに$\Theta$、
X2軸周りに$\Phi$だけ回転させて機体座標系に変換している。
わかりやすく説明すると、まずヨー方向に回転させ、次にピッチ、最後にロールをしている。
画像出典:
https://www.sky-engin.jp/blog/eulerian-angles/
Z0軸周りに$\Psi$ | Y1軸周りに$\Theta$ | X2軸周りに$\Phi$ |
---|---|---|
各軸の回転行列を考えていく。
ここで、回転行列を新たな座標軸にかけると元の座標軸に戻るものとする。
\begin{align}
\mathbf{R}
&=
\underbrace{\begin{bmatrix} \cos{\psi} & -\sin{\psi} & 0 \\ \sin{\psi} & \cos{\psi} & 0 \\ 0 & 0 & 1 \end{bmatrix}}_{z軸周り回転}
\underbrace{\begin{bmatrix} \cos{\theta} & 0 & \sin{\theta} \\ 0 & 1 & 0 \\ -\sin{\theta} & 0 & \cos{\theta} \end{bmatrix}}_{y軸周り回転}
\underbrace{\begin{bmatrix} 1 & 0 & 0 \\ 0 & \cos{\phi} & -\sin{\phi} \\ 0 & \sin{\phi} & \cos{\phi} \end{bmatrix}}_{x軸周り回転} \\
&=
\begin{bmatrix}
c_\psi c_\theta & c_\psi s_\theta s_\phi – c_\phi s_\psi & s_\psi s_\phi + c_\psi c_\phi s_\theta \\
c_\theta s_\psi & c_\psi c_\phi + s_\psi s_\theta s_\phi & c_\phi s_\psi s_\theta – c_\psi s_\phi \\
-s_\theta & c_\theta s_\phi & c_\theta c_\phi
\end{bmatrix}
\end{align}
とすると、機体座標系での位置(添え字b)と全体座標系での位置(添え字e)とすると、以下の関係があり、その時間微分である速度についても同様の関係が成り立つ
\begin{bmatrix} x_e \\ y_e \\ z_e \end{bmatrix}
=
\begin{bmatrix}
c_\psi c_\theta & c_\psi s_\theta s_\phi – c_\phi s_\psi & s_\psi s_\phi + c_\psi c_\phi s_\theta \\
c_\theta s_\psi & c_\psi c_\phi + s_\psi s_\theta s_\phi & c_\phi s_\psi s_\theta – c_\psi s_\phi \\
-s_\theta & c_\theta s_\phi & c_\theta c_\phi
\end{bmatrix}
\begin{bmatrix} x_b \\ y_b \\ z_b \end{bmatrix}
機体座標系
今まで使用してきた運動方程式は任意の機体座標系で使用できるものであった。
『航空機力学入門』1.4節において、機体座標系の取り方は3つ紹介されている。
- 慣性主軸:X軸を機体の慣性主軸に一致することで角運動量の表現が簡単になる
- 安定軸:X軸を定常つり合い飛行時の機体速度ベクトルと一致させる、つまりうだけが速度を持ち、v、wが0
- 機体軸:機体の基準線に一致させる
今回は、飛行経路をプロットする際にコーディングしやすいように機体軸を採用する
この場合、各速度の成分は、迎角αと横滑り角βの定義から以下のように計算できる
$
U=U_{0}+u \
$
$
V=V_{c}\sin{\beta}\approx U\sin{\beta} \
$
$
W = U\tan{\alpha}
$
飛行機の描写
飛行機の翼端灯に習い、右を緑、左を赤色に塗った紙飛行機を追加する。
翼2枚、胴体2枚の合計4つの三角形のサーフェスを作り、座標変換をして回転し、その後重心位置を平行移動させることで3次元空間上にプロットしている。
プログラム
以上の内容を微分方程式を数値的に解くところ以外をPythonでコードにしていく。
今回の機体については、『航空機力学入門』4.4節を参考に、ロッキードP2V-7の値を使用こととする。
引用:https://ja.wikipedia.org/wiki/P-2_(%E8%88%AA%E7%A9%BA%E6%A9%9F)
import numpy as np
from scipy.integrate import odeint, solve_ivp
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import time
import mpl_toolkits.mplot3d.art3d as art3d
from IPython.display import HTML
import matplotlib.animation as animation
from tqdm.notebook import tqdm
# --- 回転行列の定義 ---
def rotation_matrix(psi, theta, phi):
# z軸周り回転
R_z = np.array([[np.cos(psi), -np.sin(psi), 0],
[np.sin(psi), np.cos(psi), 0],
[0, 0, 1]])
# y軸周り回転
R_y = np.array([[np.cos(theta), 0, np.sin(theta)],
[0, 1, 0],
[-np.sin(theta), 0, np.cos(theta)]])
# x軸周り回転
R_x = np.array([[1, 0, 0],
[0, np.cos(phi), -np.sin(phi)],
[0, np.sin(phi), np.cos(phi)]])
# 全体の回転行列
R = R_z @ R_y @ R_x
return R
#全体座標系における速度ベクトルの取得
def get_global_velosity(U0, u, alpha, beta, psi, theta, phi):
# 機体座標系での速度ベクトル
u_b = U0 + u
v_b = u_b * np.sin(beta)
w_b = u_b * np.tan(alpha)
vel_b = np.array([u_b, v_b, w_b])
# 全体座標系での速度ベクトル
vel_e = rotation_matrix(psi, theta, phi) @ np.atleast_2d(vel_b).T
vel_e = vel_e.flatten()
return vel_e
#紙飛行機の描写関数
def plot_paper_airplane(x, y, z, phi, theta, psi, scale, ax):
"""
3次元座標と角度が与えられたとき、その状態の紙飛行機のような図形をプロットする
Args:
x: x座標
y: y座標
z: z座標
psi: ロール角 (ラジアン)
theta : ピッチ角 (ラジアン)
phi: ヨー角 (ラジアン)
機体の大きさをいじりたければscaleを変える
"""
#三角形を描く
poly_left_wing = scale * np.array([[2, 0.0, 0],
[-1, 1, 0],
[-1, 0.1, 0]])
poly_right_wing = poly_left_wing.copy()
poly_right_wing[:,1] = -1 * poly_right_wing[:,1]
poly_left_body = scale * np.array([[2, 0.0, 0.0],
[-1, 0.0, +0.1],
[-1, 0.1, 0.0]])
poly_right_body = poly_left_body.copy()
poly_right_body[:,1] = -1 * poly_right_body[:,1]
R = rotation_matrix(psi, theta, phi) # yaw, pitch, roll
for points in [poly_left_wing, poly_left_body, ]:
# 紙飛行機の点を回転
translated_rotated_points = (R @ points.T).T + np.array([x, y, z])
#描写
ax.add_collection3d(art3d.Poly3DCollection([translated_rotated_points],facecolors='orangered', linewidths=1, edgecolors='k', alpha=0.6))
for points in [poly_right_wing, poly_right_body]:
# 紙飛行機の点を回転
translated_rotated_points = (R @ points.T).T + np.array([x, y, z])
#描写
ax.add_collection3d(art3d.Poly3DCollection([translated_rotated_points],facecolors='lime', linewidths=1, edgecolors='k', alpha=0.6))
# --- 機体パラメータ ---
g = 9.81
# 初期条件
U0 = 293.8
W0 = 0
theta0 = 0.0
# 縦の有次元安定微係数: ロッキードP2V-7
Xu = -0.0215
Zu = -0.227
Mu = 0
Xa = 14.7
Za = -236
Ma = -3.78
Madot = -0.28
Xq = 0
Zq = -5.76
Mq = -0.992
X_deltat = 1.0 #10sで10m/s加速か #0.01 #0
Z_deltae = -12.9
Z_deltat = 0
M_deltae = -2.48
M_deltat = 0
# 横・方向の有次元安定微係数: ロッキードP2V-7
Yb=-45.4
Lb=-1.71
Nb=0.986
Yp=0.716
Lp=-0.962
Np=-0.0632
Yr=2.66
Lr=0.271
Nr=-0.215
#Y_deltaa=0
L_deltaa=1.72
N_deltaa=-0.0436
Y_deltar=9.17
L_deltar=0.244
N_deltar=-0.666
# --- 縦の運動方程式 ---
# 行列A, Bの定義
A_long = np.array([[Xu, Xa, -W0, -g * np.cos(theta0)],
[Zu / U0, Za / U0, 1 + Zq / U0, (g / U0) * np.sin(theta0)],
[Mu + Madot * (Zu / U0), Ma + Madot * (Za / U0), Mq + Madot * (1 + Zq / U0),
(Madot * g / U0) * np.sin(theta0)],
[0, 0, 1, 0]])
B_long = np.array([[0, X_deltat],
[Z_deltae / U0, Z_deltat / U0],
[M_deltae + Madot * (Z_deltae / U0), M_deltat + Madot * (Z_deltat / U0)],
[0, 0]])
# --- 横・方向の運動方程式 ---
# 行列A, Bの定義
A_lat = np.array([[Yb / U0, (W0 + Yp) / U0, (Yr / U0 - 1), g * np.cos(theta0) / U0, 0],
[Lb, Lp, Lr, 0, 0],
[Nb, Np, Nr, 0, 0],
[0, 1, np.tan(theta0), 0, 0],
[0, 0, 1 / np.cos(theta0), 0, 0]])
B_lat = np.array([[0, Y_deltar / U0],
[L_deltaa, L_deltar],
[N_deltaa, N_deltar],
[0, 0],
[0, 0]])
# --- 統合モデル ---
def aircraft_dynamics(t, x, u_inp):
u_long = u_inp[:2]
u_lat = u_inp[2:]
# 縦の運動方程式
dxdt_long = A_long @ np.atleast_2d(x[:4]).T + B_long @ np.atleast_2d(u_long).T
dxdt_long = dxdt_long.flatten()
# 横・方向の運動方程式
dxdt_lat = A_lat @ np.atleast_2d(x[4:9]).T + B_lat @ np.atleast_2d(u_lat).T
dxdt_lat = dxdt_lat.flatten()
# 機体座標系から全体座標系での速度ベクトルへの変換(速度を微分して位置にするため)
vel_e = get_global_velosity(U0=U0, u=x[0], alpha=x[1], beta=x[4], psi=x[8], theta=x[3], phi=x[7])
# 縦と横・方向の状態量の変化と位置の変化を結合
dxdt = np.concatenate((dxdt_long, dxdt_lat, vel_e))
return dxdt
OpenAIのgym形式での飛行モデルの作成
強化学習ではOpenAIが作成したライブラリであるgymと互換性があるように環境をモデル化することで、過去に作られた豊富な強化学習プログラムがそのまま使えるようになります。
そのため、今回は今まで作成したシミュレーターをgym形式に変更するところから始めていきます。
そして、最後に自前環境の登録することで呼び出せるようにしています。
参考サイト:
init
時間
時間幅dtを決めています。
dtが短ければ細かい入力ができますが、1エピソードの実行時間が長くなります。
逆に大きくすると実行時間は短くなり学習は早くなりますが、ギザギザした入力になってしまいます。
また、シミュレーション時間は一律100sにするために、max_episode_steps
を計算しています。これはもともと互換性を持たせるためにenv.spec.max_episode_steps
とアクセスできるようにしたかったのですが、できなかったためenv.max_episode_steps
というようにアクセスするようにしています。
行動
self.action_space
では、行動の入力としてエルロン, エレベータ, ラダーの舵角(-1 rad ~ +1 rad)、スロットル位置(-1~1)の4つとなるように設定しています。
状態
self.observation_space
では、状態の出力として、
回転角:$\phi, \theta, \psi$
角速度:$p, q, r$
地表座標系の位置:$x_{ground}, y_{ground}, z_{ground}$
x方向速度:$u$
迎え角, 横滑り角:$\alpha, \beta$
及び、上記の時間微分の計24この変数です。
x方向速度と迎え角, 横滑り角からy,z方向速度が計算できるため、この3つの変数は実質的に速度$u,v,w$です。
以上より、実質的に状態の変数は、位置、速度、加速度、姿勢角度、姿勢角速度、姿勢角加速度を与えています。
回転角の時間微分が角速度なので重複した変数が存在しますが、実装の容易性より無視しています。
変数の範囲はそれぞれ違うので、(-inf~+inf)としています。
reset
変数を初期化をしています。
時間は0 sに、
各状態量はすべて0となるように設定しています。
step
ここでは、行動であるaction
を受け取って1ステップだけ微分方程式を解いています。
今回のゴールとして、できるだけ高度を変えずに180°旋回することを目指しています。
飛行機の速度ベクトルと目標となる真後ろのベクトルである[-1,0,0]とのコサイン類似度を使用してどれくらい旋回したかを判定しています。
コサイン類似度とは、ベクトル間の角度が$\theta$の時、$cos\theta$の値であり、[-1, 1]の範囲であり、1に近づくほど同じ方向を向いていると言えます。
さらに、高度をあまり変えてほしくないため、高度変化に応じて報酬を減らすようにもしています。
本当はバンク角が大きくなりすぎないようにも設定したのですが、あまりうまく学習できなかったので今回は入れていません。
終了判定
終了判定として
- コサイン類似度が0.95以上
- 高度変化が100 ft以下
としています。
また、ゲームオーバーの判定として、
- 飛行時間が
max_episode_time
を超える
としています。
info
デバック用にこのステップを実行するのにかかった時間、報酬の内訳を出力しています。
render
各フレームの紙飛行機を描写し、斜めからと3方向からの合計4方向からの描写をnumpyのarray形式で出力しています。
飛行機の飛行が1方向からの描写だとわかりづらく、4方向からにしました。
renderを使うことで、後から動画を作ることができます。
報酬の計算
今回報酬には以下の3種類の報酬を与えています。
- 毎ステップ与える即時報酬
- 旋回に応じた一度だけの段階的報酬
- 終了時の報酬
即時報酬
即時報酬では、
- 高度変化によるペナルティ
- バンク角によるペナルティ(今回は複雑なので使用せず)
- 進行方向と目標とのコサイン類似度に応じた報酬
としています。
上記はステップごとに行われるため、時間幅dtに依存すること(例えばdt=0.1だと1秒間に10回報酬がもらえるが、dt=1だと1回しかもらえない)を避けるためにdtを掛け算して同じ時間当たりの報酬の変化を一定にしています。
また、コサイン類似度による報酬は、もともと[-1, 1]の範囲で与えていました。しかし、角度が90°以下になると報酬が0以上になるため、ゴールせずにずっと旋回が90°以上180°未満を維持した方が報酬をもらい続けられ、学習がうまくいかなかったため[-2, 0]の範囲に変更しています。
さらに、あとの最初にコサイン類似度が低くても問題ないですが、時間がたってもまだコサイン類似度が低い場合によりペナルティを与えるために、時間に応じてスケールが大きくなるようにしています。
段階的報酬
段階的報酬では、旋回が45°、90°、135°、180°できた際にかかった時間が短いほど大きな報酬を与えるようにしています。
これを設定することで、うまく旋回するようになりました。
終了時の報酬
目標を達成したら時間が短いほど大きな報酬を与えるようにしています。
他の報酬よりも大きな報酬を与えることで、よりゴールに向かって学習が進むようにしています。
seed
下記サイトを参考にシード値を固定してみるが、サンプリングが再現できないので原因を探し中
プログラム
import gym
class FlightSimulatorEnv(gym.Env):
metadata = {'render.modes': ['rgb_array']}
def __init__(self):
self.dt = 0.2 #[s]
self.max_episode_time = 50 #[s]
self.max_episode_steps = int(self.max_episode_time/self.dt)
self.action_space = gym.spaces.Box(low=-1, high=1, shape=(4,), dtype=np.float32)
self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(24,), dtype=np.float32) #self.observation_space.shape で使うので注意
self.reward_range = (-np.inf, np.inf)
self.reset()
def reset(self):
self.x = np.zeros(self.observation_space.shape[0]//2)
self.t = 0
self.observation = np.zeros(self.observation_space.shape)
self.once_reward_flags = [False, False, False, False]
self.done = False
self.game_clear = False
self.game_over = False
self.all_soly = self.x.reshape(len(self.x),1)
return self.observation
def step(self, action):
start_time = time.time()
#tanhでactionが出てくるので(-1, 1)
self.u_inp = np.array(action)
# t~t+dtまで微分方程式を解く (RK45メソッドを使用)
sol = solve_ivp(aircraft_dynamics, [self.t, self.t+self.dt], self.x,
args=(self.u_inp, ),
method='RK45')
#t~t+dtまでの結果を追加
self.all_soly = np.append(self.all_soly, sol.y, axis=1)
#次のt+dtにおける初期条件
self.x = sol.y[:, -1] #現在の最終状態
self.t += self.dt
#状態量の微分値
self.dxdt = aircraft_dynamics(self.t, self.x, self.u_inp)
#状態量(積分値12 微分値12)
self.observation = np.concatenate((self.x, self.dxdt))
# --- 終了判定 ---
#180°旋回することを目的とする
target_vecor = np.array([-1, 0, 0])
#速度ベクトルと目標ベクトルのコサイン類似度で報酬
vel_e = self.observation[21:24]
cosine_similarity = np.dot(target_vecor, vel_e)/(np.sqrt(np.dot(target_vecor, target_vecor))*np.sqrt(np.dot(vel_e, vel_e)))
#ほぼ目標を向いたら終了
#if cosine_similarity > 0.95:
#ほぼ目標を向いて高度が±1000になったら終了
if cosine_similarity > 0.95 and abs(self.x[11]) < 1000:
self.done = True
self.game_clear = True
# --- ゲームオーバー判定 ---
if self.t >= self.max_episode_time:
self.done = True
self.game_over = True
# --- 報酬の計算 ---
reward_sum, rewards_dict = self.reward_function()
self.reward = reward_sum
# --- info ---
elapsed_time = time.time() - start_time
self.info = {'elapsed_time': elapsed_time,
"rewards_dict": rewards_dict}
#print(self.t, self.done, self.reward)
#print(self.info)
return self.observation, self.reward, self.done, self.info
def reward_function(self):
# --- 報酬の設定 ---
rewards_dict = {}
# --- 時間幅の影響を無くした即時報酬の設定 ---
#時間幅に依存しないようにしたいのでdtをかける
#短時間で達成できるようにしたい
rewards_dict["time_penalty"] = (0) * self.dt #時間に応じて減額
#高度を変えないでほしい 1000ftで-1
rewards_dict["altitude_penalty"] = (-1 * abs(self.x[11])/10**3) * self.dt
#バンク角が大きすぎないでほしい(今回は使用せず)
if abs(self.x[7])%(2*np.pi) > np.pi /4:
rewards_dict["bank_angle_penalty"] = (0) * self.dt #-1 * (abs(self.x[7])%(2*np.pi) - np.pi/4))
else:
rewards_dict["bank_angle_penalty"] = (0) * self.dt
# --- 時間に応じた即時報酬の設定 ---
#180°旋回することを目的とする
target_vecor = np.array([-1, 0, 0])
#速度ベクトルと目標ベクトルのコサイン類似度で報酬
vel_e = self.observation[21:24]
cosine_similarity = np.dot(target_vecor, vel_e)/(np.sqrt(np.dot(target_vecor, target_vecor))*np.sqrt(np.dot(vel_e, vel_e)))
#報酬の範囲が[-1,1]のままだとずっと後ろに進んでいれば報酬が増え続けるので[-2,0]にする。
#rewards_dict["nose_direction"] = (cosine_similarity-1) * self.dt
#さらに、いつまでも反対を向いていて欲しくないので、時間に比例して大きくする
rewards_dict["nose_direction_penalty"] = (cosine_similarity-1) * self.dt * (self.t/self.max_episode_time)
# --- 一度だけの報酬の設定 ---
#途中経過
#45°旋回完了
if cosine_similarity > -0.7 and self.once_reward_flags[0]==False:
rewards_dict["once_reward"] = 25/self.t
self.once_reward_flags[0] = True
#90°旋回完了
elif cosine_similarity > 0.0 and self.once_reward_flags[1]==False:
rewards_dict["once_reward"] = 50/self.t
self.once_reward_flags[1] = True
#135°旋回完了
elif cosine_similarity > 0.7 and self.once_reward_flags[2]==False:
rewards_dict["once_reward"] = 100/self.t
self.once_reward_flags[2] = True
#180°旋回完了
elif cosine_similarity > 0.95 and self.once_reward_flags[3]==False:
rewards_dict["once_reward"] = 200/self.t
self.once_reward_flags[3] = True
else:
rewards_dict["once_reward"] = 0
# --- 終了による報酬 ---
if self.game_clear == True:
rewards_dict["done_reward"] = 500 /self.t
else:
rewards_dict["done_reward"] = 0
# --- ゲームオーバーによる報酬 ---
if self.game_over == True:
rewards_dict["gameover_penalty"] = -0.2 * self.max_episode_time #-10
else:
rewards_dict["gameover_penalty"] = 0
# --- 報酬の合計 ---
reward_sum = 0
for reward in rewards_dict.values():
reward_sum += reward
return reward_sum, rewards_dict
def render(self, mode='rgb_array'):
if mode == 'rgb_array':
plt.ion()
fig, axes = plt.subplots(2, 2, figsize=(2*6.4, 2*4.8), dpi=72, tight_layout=True, subplot_kw=dict(projection='3d'))
for i in range(2):
for j in range(2):
ax = axes[i,j]
#fig.add_axes(ax)
ax.invert_zaxis()
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
ax.set_xlim(0, 500)
ax.set_ylim(-250, 250)
ax.set_zlim(100, -100)
# 軌跡
ax.plot(self.all_soly[9], self.all_soly[10], self.all_soly[11])
ax.set_xlim(min(min(self.all_soly[9]),0), max(max(self.all_soly[9]), 500),)
ax.set_ylim(min(min(self.all_soly[10]),-250), max(max(self.all_soly[10]), 250),)
ax.set_zlim(max(max(self.all_soly[11]), 100),min(min(self.all_soly[11]),-100))
#紙飛行機
plot_paper_airplane(x=self.x[9], y=self.x[10], z=self.x[11], phi=self.x[7], theta=self.x[3], psi=self.x[8], scale=100, ax=ax)
axes[0,0].view_init(elev=90, azim=-90)
axes[1,0].view_init(elev=0, azim=-90)
axes[1,1].view_init(elev=0, azim=0)
# 描画をキャンバスに保存
fig.canvas.draw()
# キャンバスからRGBデータを取得
image = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
image = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
#メモリ開放
plt.clf()
plt.close()
return image
#4つの角度からではなく斜めの1つの角度のみ
def render_old(self, mode='rgb_array'):
if mode == 'rgb_array':
plt.ion()
fig = plt.figure(dpi=100)
ax = Axes3D(fig)
fig.add_axes(ax)
ax.invert_zaxis()
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
ax.set_xlim(0, 500)
ax.set_ylim(-250, 250)
ax.set_zlim(100, -100)
# 軌跡
ax.plot(self.all_soly[9], self.all_soly[10], self.all_soly[11])
ax.set_xlim(min(min(self.all_soly[9]),0), max(max(self.all_soly[9]), 500),)
ax.set_ylim(min(min(self.all_soly[10]),-250), max(max(self.all_soly[10]), 250),)
ax.set_zlim(max(max(self.all_soly[11]), 100),min(min(self.all_soly[11]),-100))
#紙飛行機
plot_paper_airplane(x=self.x[9], y=self.x[10], z=self.x[11], phi=self.x[7], theta=self.x[3], psi=self.x[8], scale=100, ax=ax)
# 描画をキャンバスに保存
fig.canvas.draw()
# キャンバスからRGBデータを取得
image = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
image = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
#メモリ開放
plt.clf()
plt.close()
return image
def close(self):
pass
def seed(self, seed=None):
self.action_space.seed(seed)
self.observation_space.seed(seed)
print(f"{seed=}")
pass
#自前環境の登録
from gym.envs.registration import register
register(
id="my-v0",
entry_point=FlightSimulatorEnv,
)
Soft Actor-Critic(SAC)の実装
方策モデルをNNにしたことで、連続値の状態から連続値のアクションを出力できるようになりました。さらに、DQNでは貪欲法でアクションを一つだけ選ぶ方法でしたが、NNになったため、複数のアクションの値を同時に出力できるようになりました。
また、アクションには乱数zが乗っているため、毎回同じ行動をとるのではなく、ランダム性が生まれるようになりました。
https://qiita.com/pocokhc/items/354a2ddf4cbd742d3191
から画像を引用
方策モデル
状態を受け取って行動を決定するニューラルネットワーク
決定的に行動を決める時はfoward
関数を使用。
探索時はある程度ランダム性を持たせるために、乱数を使用した標準正規分布を足して行動を決定するsample
関数を使用
#状態を受け取り,ガウス分布の平均と標準偏差の対数を計算し、actionを出力する方策モデル
class SACActor(nn.Module):
def __init__(self, state_shape, action_shape):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_shape[0], 256),
nn.ReLU(inplace=True),
nn.Linear(256, 256),
nn.ReLU(inplace=True),
nn.Linear(256, 2 * action_shape[0]),
)
#tanhでmean(actionに相当)を[-1, 1]にする(こちらではノイズ載せた後にtanhをかけないため)
def forward(self, states):
return torch.tanh(self.net(states).chunk(2, dim=-1)[0])
def sample(self, states):
means, log_stds = self.net(states).chunk(2, dim=-1)
return reparameterize(means, log_stds.clamp(-20, 2))
def reparameterize(means, log_stds):
""" Reparameterization Trickを用いて,確率論的な行動とその確率密度を返す. """
# 標準偏差.logから戻す
stds = log_stds.exp()
# 標準ガウス分布から,ノイズをサンプリングする.
#引数のmeansはサイズの指定のみ
noises = torch.randn_like(means)
# Reparameterization Trickを用いて,N(means, stds)からのサンプルを計算する.
us = means + noises * stds
# tanh を適用し,確率論的な行動を計算する.
actions = torch.tanh(us)
# 確率論的な行動の確率密度の対数を計算する.
# ガウス分布 `N(0, stds * I)` における `noises * stds` の確率密度の対数(= \log \pi(u|a))を計算する.
log_pis = -0.5 * math.log(2 * math.pi) * log_stds.size(-1) - log_stds.sum(dim=-1, keepdim=True) - (0.5 * noises.pow(2)).sum(dim=-1, keepdim=True)
log_pis -= torch.log(1 - actions.pow(2) + 1e-6).sum(dim=-1, keepdim=True)
return actions, log_pis
方策評価モデル
状態と行動から価値Qを評価するニューラルネットワークを2つ備えたモデル
#Clipped Double Q
#stateとactionを受け取って価値Qを2つ(Double)出力する方策評価モデル
class SACCritic(nn.Module):
def __init__(self, state_shape, action_shape):
super().__init__()
self.net1 = nn.Sequential(
nn.Linear(state_shape[0] + action_shape[0], 256),
nn.ReLU(inplace=True),
nn.Linear(256, 256),
nn.ReLU(inplace=True),
nn.Linear(256, 1),
)
self.net2 = nn.Sequential(
nn.Linear(state_shape[0] + action_shape[0], 256),
nn.ReLU(inplace=True),
nn.Linear(256, 256),
nn.ReLU(inplace=True),
nn.Linear(256, 1),
)
def forward(self, states, actions):
x = torch.cat([states, actions], dim=-1)
return self.net1(x), self.net2(x)
リプレイバッファ
環境に対して行動した経験をためておくバッファ。
学習時はここからサンプリングして勾配を計算していく
class ReplayBuffer:
def __init__(self, buffer_size, state_shape, action_shape, device):
# 次にデータを挿入するインデックス.
self._p = 0
# データ数.
self._n = 0
# リプレイバッファのサイズ.
self.buffer_size = buffer_size
# GPU上に保存するデータ.
self.states = torch.empty((buffer_size, *state_shape), dtype=torch.float, device=device)
self.actions = torch.empty((buffer_size, *action_shape), dtype=torch.float, device=device)
self.rewards = torch.empty((buffer_size, 1), dtype=torch.float, device=device)
self.dones = torch.empty((buffer_size, 1), dtype=torch.float, device=device)
self.next_states = torch.empty((buffer_size, *state_shape), dtype=torch.float, device=device)
def append(self, state, action, reward, done, next_state):
self.states[self._p].copy_(torch.from_numpy(state))
self.actions[self._p].copy_(torch.from_numpy(action))
self.rewards[self._p] = float(reward)
self.dones[self._p] = float(done)
self.next_states[self._p].copy_(torch.from_numpy(next_state))
self._p = (self._p + 1) % self.buffer_size
self._n = min(self._n + 1, self.buffer_size)
def sample(self, batch_size):
idxes = np.random.randint(low=0, high=self._n, size=batch_size)
return (
self.states[idxes],
self.actions[idxes],
self.rewards[idxes],
self.dones[idxes],
self.next_states[idxes]
)
SAC全体
class SAC():
def __init__(self, state_shape, action_shape, device=torch.device('cuda'), seed=0,
batch_size=256, gamma=0.99, lr_actor=3e-4, lr_critic=3e-4,
replay_size=10**6, start_steps=10**4, tau=5e-3, alpha=0.2, reward_scale=1.0):
# シードを設定する.
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
# リプレイバッファ.
self.buffer = ReplayBuffer(
buffer_size=replay_size,
state_shape=state_shape,
action_shape=action_shape,
device=device,
)
# Actor-Criticのネットワークを構築する.
self.actor = SACActor(
state_shape=state_shape,
action_shape=action_shape
).to(device)
self.critic = SACCritic(
state_shape=state_shape,
action_shape=action_shape
).to(device)
self.critic_target = SACCritic(
state_shape=state_shape,
action_shape=action_shape
).to(device).eval()
# ターゲットネットワークの重みを初期化し,勾配計算を無効にする.
self.critic_target.load_state_dict(self.critic.state_dict())
for param in self.critic_target.parameters():
param.requires_grad = False
# オプティマイザ.
self.optim_actor = torch.optim.Adam(self.actor.parameters(), lr=lr_actor)
self.optim_critic = torch.optim.Adam(self.critic.parameters(), lr=lr_critic)
# その他パラメータ.
self.learning_steps = 0
self.batch_size = batch_size
self.device = device
self.gamma = gamma
self.start_steps = start_steps
self.tau = tau
self.alpha = alpha
self.reward_scale = reward_scale
def explore(self, state):
""" 確率論的な行動と,その行動の確率密度の対数 log(pi(a|s)) を返す. """
state = torch.tensor(state, dtype=torch.float, device=self.device).unsqueeze_(0)
with torch.no_grad():
action, log_pi = self.actor.sample(state)
return action.cpu().numpy()[0], log_pi.item()
def exploit(self, state):
""" 決定論的な行動を返す. """
state = torch.tensor(state, dtype=torch.float, device=self.device).unsqueeze_(0)
with torch.no_grad():
action = self.actor(state)
return action.cpu().numpy()[0]
def is_update(self, steps):
# 学習初期の一定期間(start_steps)は学習しない.
return steps >= max(self.start_steps, self.batch_size)
def step(self, env, state, t, steps):
t += 1
# 学習初期の一定期間(start_steps)は,ランダムに行動して多様なデータの収集を促進する.
if steps <= self.start_steps:
action = env.action_space.sample()
else:
action, _ = self.explore(state)
next_state, reward, done, _ = env.step(action)
done_masked = done
# リプレイバッファにデータを追加する.
self.buffer.append(state, action, reward, done_masked, next_state)
# エピソードが終了した場合には,環境をリセットする.
if done:
t = 0
next_state = env.reset()
return next_state, t
def update(self):
self.learning_steps += 1
states, actions, rewards, dones, next_states = self.buffer.sample(self.batch_size)
self.update_critic(states, actions, rewards, dones, next_states)
self.update_actor(states)
self.update_target()
def update_critic(self, states, actions, rewards, dones, next_states):
curr_qs1, curr_qs2 = self.critic(states, actions)
with torch.no_grad():
next_actions, log_pis = self.actor.sample(next_states)
next_qs1, next_qs2 = self.critic_target(next_states, next_actions)
next_qs = torch.min(next_qs1, next_qs2) - self.alpha * log_pis
target_qs = rewards * self.reward_scale + (1.0 - dones) * self.gamma * next_qs
loss_critic1 = (curr_qs1 - target_qs).pow_(2).mean()
loss_critic2 = (curr_qs2 - target_qs).pow_(2).mean()
self.optim_critic.zero_grad()
(loss_critic1 + loss_critic2).backward(retain_graph=False)
self.optim_critic.step()
def update_actor(self, states):
actions, log_pis = self.actor.sample(states)
qs1, qs2 = self.critic(states, actions)
loss_actor = (self.alpha * log_pis - torch.min(qs1, qs2)).mean()
self.optim_actor.zero_grad()
loss_actor.backward(retain_graph=False)
self.optim_actor.step()
def update_target(self):
for t, s in zip(self.critic_target.parameters(), self.critic.parameters()):
t.data.mul_(1.0 - self.tau)
t.data.add_(self.tau * s.data)
学習用クラス
機能として
- 学習
- 報酬の評価
- 最終的な飛行の可視化
- 最終的な飛行の入力と報酬の時間変化の可視化
- 任意ステップ数ごとの報酬の変化のグラフ作成
が実装されています。
Gymの環境の場合、render
関数を使用して自動的に動画を作成してくれるラッパーを使用
def wrap_monitor(env):
""" Gymの環境をmp4に保存するために,環境をラップする関数. """
return gym.wrappers.RecordVideo(env, '/tmp/monitor', episode_trigger=lambda x: True)
def play_mp4():
""" 保存したmp4をHTMLに埋め込み再生する関数. """
path = glob.glob(os.path.join('/tmp/monitor', '*.mp4'))[0]
mp4 = open(path, 'rb').read()
url = "data:video/mp4;base64," + b64encode(mp4).decode()
return HTML("""<video width=400 controls><source src="%s" type="video/mp4"></video>""" % url)
# Commented out IPython magic to ensure Python compatibility.
from torch.utils.tensorboard import SummaryWriter
# %load_ext tensorboard
# TensorBoardをColab内に起動. リアルタイムに学習経過が更新されます
writer = SummaryWriter('./logs')
class Trainer:
def __init__(self, env, env_test, algo, seed=0, num_steps=10**6, eval_interval=10**4, num_eval_episodes=3):
self.env = env
self.env_test = env_test
self.algo = algo
# 環境の乱数シードを設定する.
self.env.seed(seed)
self.env_test.seed(2**31-seed)
# 平均収益を保存するための辞書.
self.returns = {'step': [], 'return': []}
# データ収集を行うステップ数.
self.num_steps = num_steps
# 評価の間のステップ数(インターバル).
self.eval_interval = eval_interval
# 評価を行うエピソード数.
self.num_eval_episodes = num_eval_episodes
def train(self):
""" num_stepsステップの間,データ収集・学習・評価を繰り返す. """
# 学習開始の時間
self.start_time = time.time()
# エピソードのステップ数.
t = 0
# 環境を初期化する.
state = self.env.reset()
#for steps in range(1, self.num_steps + 1):
for steps in tqdm(range(1, self.num_steps + 1)):
# 環境(self.env),現在の状態(state),現在のエピソードのステップ数(t),今までのトータルのステップ数(steps)を
# アルゴリズムに渡し,状態・エピソードのステップ数を更新する.
state, t = self.algo.step(self.env, state, t, steps)
# アルゴリズムが準備できていれば,1回学習を行う.
if self.algo.is_update(steps):
self.algo.update()
# 一定のインターバルで評価する.
if steps % self.eval_interval == 0:
self.evaluate(steps)
def evaluate(self, steps):
""" 複数エピソード環境を動かし,平均収益を記録する. """
returns = []
for _ in range(self.num_eval_episodes):
state = self.env_test.reset()
done = False
episode_return = 0.0
while (not done):
action = self.algo.exploit(state)
state, reward, done, _ = self.env_test.step(action)
episode_return += reward
returns.append(episode_return)
mean_return = np.mean(returns)
self.returns['step'].append(steps)
self.returns['return'].append(mean_return)
writer.add_scalar('Reward', mean_return, steps)
#print(f'Num steps: {steps:<6} '
# f'Return: {mean_return:<5.1f} '
# f'Time: {str(timedelta(seconds=int(time.time() - self.start_time)))}')
#tqdm.write(f'Num steps: {steps:<6} '
# f'Return: {mean_return:<5.1f} '
# f'Time: {str(timedelta(seconds=int(time.time() - self.start_time)))}')
def visualize(self):
""" 1エピソード環境を動かし,mp4を再生する. """
env = wrap_monitor(gym.make(self.env.unwrapped.spec.id))
state = env.reset()
done = False
while (not done):
action = self.algo.exploit(state)
state, _, done, _ = env.step(action)
del env
return play_mp4()
def visualize_graph(self):
""" 1エピソード環境を動かし,入力と報酬の内訳をグラフ化する """
# 最終的に得られた方策のテスト(可視化)
#each_reward の初期化
env = gym.make(self.env.unwrapped.spec.id)
state = env.reset()
action = self.algo.exploit(state)
_, _, _, info = env.step(action)
each_reward = {key: [] for key in info["rewards_dict"].keys()}
frames = []
sum_reward = []
#1エピソード動かす
for episode in range(1):
state = env.reset()
done = False
while not done:
action = self.algo.exploit(state)
state, reward, done, info = env.step(action)
frames.append(action)
sum_reward.append(reward)
for key, value in info["rewards_dict"].items():
each_reward[key].append(value)
# 入力の時間変化
test_time = np.linspace(0, (len(frames)-1)*env.dt, len(frames))
elevator_values = [item[0] for item in frames]
throttle_values = [item[1] for item in frames]
aileron_values = [item[2] for item in frames]
rudder_values = [item[3] for item in frames]
plt.figure()
plt.plot(test_time, elevator_values, label="elevator")
plt.plot(test_time, throttle_values, label="throttle")
plt.plot(test_time, aileron_values, label="aileron")
plt.plot(test_time, rudder_values, label="rudder")
plt.xlabel("time [s]")
plt.ylabel("value")
plt.xlim(0, (len(frames)-1)*env.dt)
plt.ylim(-1, 1)
plt.title('Time series plot of input')
plt.legend()
plt.tight_layout()
plt.show()
# 報酬の遷移
test_time = np.linspace(0, (len(each_reward[next(iter(each_reward))])-1)*env.dt, len(each_reward[next(iter(each_reward))]))
plt.figure()
for key, value in each_reward.items():
plt.plot(test_time, value, label=key)
plt.plot(test_time, sum_reward, label="sum_reward")
plt.xlabel("time [s]")
plt.ylabel("value")
plt.xlim(0, (len(frames)-1)*env.dt)
#plt.ylim(-1, 1)
plt.title('Time series plot of reward')
plt.legend()
plt.tight_layout()
plt.show()
# 報酬の合計の遷移
plt.figure()
for key, value in each_reward.items():
value = np.array(value)
plt.plot(test_time, np.cumsum(value), label=key)
plt.plot(test_time, np.cumsum(sum_reward), label="sum_reward")
plt.xlabel("time [s]")
plt.ylabel("value")
plt.xlim(0, (len(frames)-1)*env.dt)
#plt.ylim(-1, 1)
plt.title('Time series plot of cumulative reward')
plt.legend()
plt.tight_layout()
plt.show()
env.close()
del env
def plot(self):
""" 平均収益のグラフを描画する. """
fig = plt.figure(figsize=(8, 6))
plt.plot(self.returns['step'], self.returns['return'])
plt.xlabel('Steps', fontsize=24)
plt.ylabel('Return', fontsize=24)
plt.tick_params(labelsize=18)
plt.tight_layout()
実行
学習
ENV_ID = 'my-v0'
SEED = 0
REWARD_SCALE = 1.0
start_steps=50 * 10 ** 3 #学習せずに集めるだけの回数
NUM_STEPS = 150 * 10 ** 3 #学習ステップ
EVAL_INTERVAL = 1 * 10 ** 3 #評価間隔
replay_size=10**6 #バッファの数(全ステップよりも大きくする必要性)
#env = FlightSimulatorEnv()
#env_test = FlightSimulatorEnv()
env = gym.make(ENV_ID)
env_test = gym.make(ENV_ID)
#追加
if torch.cuda.is_available():
device = torch.device("cuda")
else:
device = torch.device("cpu")
print("!!!GPUではなくCPUで実行されているため低速です!!!")
#SAC default param
#state_shape,
#action_shape,
#device=torch.device('cuda'),
#seed=0,
#batch_size=256,
#gamma=0.99,
#lr_actor=3e-4,
#lr_critic=3e-4,
#replay_size=10**6,
#start_steps=10**4,
#tau=5e-3,
#alpha=0.2,
#reward_scale=1.0
algo = SAC(
state_shape=env.observation_space.shape,
action_shape=env.action_space.shape,
seed=SEED,
reward_scale=REWARD_SCALE,
device = device,
start_steps = start_steps,
replay_size=replay_size
)
trainer = Trainer(
env=env,
env_test=env_test,
algo=algo,
seed=SEED,
num_steps=NUM_STEPS,
eval_interval=EVAL_INTERVAL,
)
学習中の報酬変化
TensorBoardの右上の設定ボタンから、Reload Data
にチェックを入れると表示されるようになる
%tensorboard --logdir="logs"
trainer.train()
学習結果の可視化
trainer.plot()
最終的な飛行の可視化
4方向から見た際の飛行を動画で可視化しています
trainer.visualize()
さらに、各入力の時間変化、報酬の内訳の時間変化、累積の報酬の時間変化を表示します。
trainer.visualize_graph()