はじめに
今回は以下の記事の続きです。
本プログラムは飛行機をDQNにより自動で操縦することを目指したプログラムです。
飛行のモデリングは『航空機力学入門 (加藤寛一郎・大屋昭男・柄沢研治 著 東京大学出版会)』の3章「微小擾乱の運動方程式」をベースに作成しています。
そしてDQNの実装は東京大学松尾研究室の「サマースクール2024 深層強化学習」をベースにしております。
水平旋回を目指して学習させた結果が以下です。
プログラムはgooglecolabで実行できるように公開しています。
各関数(変更なし)
import numpy as np
from scipy.integrate import odeint, solve_ivp
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import time
import mpl_toolkits.mplot3d.art3d as art3d
from IPython.display import HTML
import matplotlib.animation as animation
from tqdm.notebook import tqdm
# --- 回転行列の定義 ---
def rotation_matrix(psi, theta, phi):
# z軸周り回転
R_z = np.array([[np.cos(psi), -np.sin(psi), 0],
[np.sin(psi), np.cos(psi), 0],
[0, 0, 1]])
# y軸周り回転
R_y = np.array([[np.cos(theta), 0, np.sin(theta)],
[0, 1, 0],
[-np.sin(theta), 0, np.cos(theta)]])
# x軸周り回転
R_x = np.array([[1, 0, 0],
[0, np.cos(phi), -np.sin(phi)],
[0, np.sin(phi), np.cos(phi)]])
# 全体の回転行列
R = R_z @ R_y @ R_x
return R
#全体座標系における速度ベクトルの取得
def get_global_velosity(U0, u, alpha, beta, psi, theta, phi):
# 機体座標系での速度ベクトル
u_b = U0 + u
v_b = u_b * np.sin(beta)
w_b = u_b * np.tan(alpha)
vel_b = np.array([u_b, v_b, w_b])
# 全体座標系での速度ベクトル
vel_e = rotation_matrix(psi, theta, phi) @ np.atleast_2d(vel_b).T
vel_e = vel_e.flatten()
return vel_e
#紙飛行機の描写関数
def plot_paper_airplane(x, y, z, phi, theta, psi, scale, ax):
"""
3次元座標と角度が与えられたとき、その状態の紙飛行機のような図形をプロットする
Args:
x: x座標
y: y座標
z: z座標
psi: ロール角 (ラジアン)
theta : ピッチ角 (ラジアン)
phi: ヨー角 (ラジアン)
機体の大きさをいじりたければscaleを変える
"""
#三角形を描く
poly_left_wing = scale * np.array([[2, 0.0, 0],
[-1, 1, 0],
[-1, 0.1, 0]])
poly_right_wing = poly_left_wing.copy()
poly_right_wing[:,1] = -1 * poly_right_wing[:,1]
poly_left_body = scale * np.array([[2, 0.0, 0.0],
[-1, 0.0, +0.1],
[-1, 0.1, 0.0]])
poly_right_body = poly_left_body.copy()
poly_right_body[:,1] = -1 * poly_right_body[:,1]
R = rotation_matrix(psi, theta, phi) # yaw, pitch, roll
for points in [poly_left_wing, poly_left_body, ]:
# 紙飛行機の点を回転
translated_rotated_points = (R @ points.T).T + np.array([x, y, z])
#描写
ax.add_collection3d(art3d.Poly3DCollection([translated_rotated_points],facecolors='orangered', linewidths=1, edgecolors='k', alpha=0.6))
for points in [poly_right_wing, poly_right_body]:
# 紙飛行機の点を回転
translated_rotated_points = (R @ points.T).T + np.array([x, y, z])
#描写
ax.add_collection3d(art3d.Poly3DCollection([translated_rotated_points],facecolors='lime', linewidths=1, edgecolors='k', alpha=0.6))
OpenAIのgym形式での飛行モデルの作成(ほぼ変更なし)
強化学習ではOpenAIが作成したライブラリであるgymと互換性があるように環境をモデル化することで、過去に作られた豊富な強化学習プログラムがそのまま使えるようになります。
そのため、今回は今まで作成したシミュレーターをgym形式に変更するところから始めていきます。
参考:
import gym
class FlightSimulatorEnv(gym.Env):
metadata = {'render.modes': ['rgb_array']}
# --- 機体パラメータ ---
g = 9.81
# 初期条件
U0 = 293.8
W0 = 0
theta0 = 0.0
# 縦の有次元安定微係数: ロッキードP2V-7
Xu = -0.0215
Zu = -0.227
Mu = 0
Xa = 14.7
Za = -236
Ma = -3.78
Madot = -0.28
Xq = 0
Zq = -5.76
Mq = -0.992
X_deltat = 0.01#0
Z_deltae = -12.9
Z_deltat = 0
M_deltae = -2.48
M_deltat = 0
# 横・方向の有次元安定微係数: ロッキードP2V-7
Yb=-45.4
Lb=-1.71
Nb=0.986
Yp=0.716
Lp=-0.962
Np=-0.0632
Yr=2.66
Lr=0.271
Nr=-0.215
#Y_deltaa=0
L_deltaa=1.72
N_deltaa=-0.0436
Y_deltar=9.17
L_deltar=0.244
N_deltar=-0.666
# --- 縦の運動方程式 ---
# 行列A, Bの定義
A_long = np.array([[Xu, Xa, -W0, -g * np.cos(theta0)],
[Zu / U0, Za / U0, 1 + Zq / U0, (g / U0) * np.sin(theta0)],
[Mu + Madot * (Zu / U0), Ma + Madot * (Za / U0), Mq + Madot * (1 + Zq / U0),
(Madot * g / U0) * np.sin(theta0)],
[0, 0, 1, 0]])
B_long = np.array([[0, X_deltat],
[Z_deltae / U0, Z_deltat / U0],
[M_deltae + Madot * (Z_deltae / U0), M_deltat + Madot * (Z_deltat / U0)],
[0, 0]])
# --- 横・方向の運動方程式 ---
# 行列A, Bの定義
A_lat = np.array([[Yb / U0, (W0 + Yp) / U0, (Yr / U0 - 1), g * np.cos(theta0) / U0, 0],
[Lb, Lp, Lr, 0, 0],
[Nb, Np, Nr, 0, 0],
[0, 1, np.tan(theta0), 0, 0],
[0, 0, 1 / np.cos(theta0), 0, 0]])
B_lat = np.array([[0, Y_deltar / U0],
[L_deltaa, L_deltar],
[N_deltaa, N_deltar],
[0, 0],
[0, 0]])
# --- 統合モデル ---
def aircraft_dynamics(self, t, x, U0, A_long, B_long, A_lat, B_lat, u_inp):
u_long = u_inp[:2]
u_lat = u_inp[2:]
# 縦の運動方程式
dxdt_long = A_long @ np.atleast_2d(x[:4]).T + B_long @ np.atleast_2d(u_long).T
dxdt_long = dxdt_long.flatten()
# 横・方向の運動方程式
dxdt_lat = A_lat @ np.atleast_2d(x[4:9]).T + B_lat @ np.atleast_2d(u_lat).T
dxdt_lat = dxdt_lat.flatten()
# 機体座標系から全体座標系での速度ベクトルへの変換
vel_e = get_global_velosity(U0=U0, u=x[0], alpha=x[1], beta=x[4], psi=x[8], theta=x[3], phi=x[7])
# 縦と横・方向の状態量の変化と位置の変化を結合
dxdt = np.concatenate((dxdt_long, dxdt_lat, vel_e))
return dxdt
def __init__(self):
self.x0 = np.zeros(12)
self.dt = 0.4
self.max_episode_steps = int(100/1)
self.action_space = gym.spaces.Discrete(8)
self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(15,), dtype=np.float32)
self.reward_range = (-np.inf, np.inf)
self.reset()
def reset(self):
self.x = self.x0
self.t = 0
self.observation = np.concatenate((self.x, np.array([self.U0, 0, 0])))#速度の変数を追加
self.done = False
self.all_soly = self.x0.reshape(len(self.x0),1)
return self.observation
def step(self, action):
start_time = time.time()
u_action = np.zeros(self.action_space.n)
u_action[action] = 1
#u_actionの前半をプラス方向の入力、後半をマイナスの入力として実装
self.u_inp = u_action[:self.action_space.n//2] - u_action[self.action_space.n//2:]
# t~t+dtまで微分方程式を解く (RK45メソッドを使用)
sol = solve_ivp(self.aircraft_dynamics, [self.t, self.t+self.dt], self.x,
args=(self.U0, self.A_long, self.B_long, self.A_lat, self.B_lat, self.u_inp),
method='RK45')
#t~t+dtまでの結果を追加
self.all_soly = np.append(self.all_soly, sol.y, axis=1)
#次のt+dtにおける初期条件
self.x = sol.y[:, -1] #現在の最終状態
self.t += self.dt
vel_e = get_global_velosity(U0=self.U0, u=self.x[0], alpha=self.x[1], beta=self.x[4], psi=self.x[8], theta=self.x[3], phi=self.x[7])
#状態量
self.observation = np.concatenate((self.x, vel_e)) * 10
# --- 報酬の設定 ---
# --- 時間に応じた報酬の設定 ---
#短時間で達成できるようにしたい
self.reward = -1 #時間に応じて減額
#高度を変えないでほしい 1000ftで0.1の減少
self.reward -= abs(self.x[11])/10**4
#180°旋回することを目的とする
target_vecor = np.array([-1, 0, 0])
#速度ベクトルと目標ベクトルのコサイン類似度で報酬
vel_e = get_global_velosity(U0=self.U0, u=self.x[0], alpha=self.x[1], beta=self.x[4], psi=self.x[8], theta=self.x[3], phi=self.x[7])
cosine_similarity = np.dot(target_vecor, vel_e)/(np.sqrt(np.dot(target_vecor, target_vecor))*np.sqrt(np.dot(vel_e, vel_e)))
self.reward += cosine_similarity
#時間幅に依存しないようにしたい
self.reward *= self.dt
#ほぼ目標を向いたら終了
if cosine_similarity > 0.95:
self.done = True
self.reward += 10
#終了判定
if self.t >= 100: #100s経過した場合
self.done = True
#self.reward -= 100
#info
self.info = elapsed_time = time.time() - start_time
#print(self.t, self.done, self.reward)
#print(self.info)
return self.observation, self.reward, self.done, self.info
def render(self, mode='rgb_array'):
if mode == 'rgb_array':
plt.ion()
fig = plt.figure(dpi=100)
ax = Axes3D(fig)
fig.add_axes(ax)
ax.invert_zaxis()
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
ax.set_xlim(0, 500)
ax.set_ylim(-250, 250)
ax.set_zlim(100, -100)
# 軌跡
ax.plot(self.all_soly[9], self.all_soly[10], self.all_soly[11])
ax.set_xlim(min(min(self.all_soly[9]),0), max(max(self.all_soly[9]), 500),)
ax.set_ylim(min(min(self.all_soly[10]),-250), max(max(self.all_soly[10]), 250),)
ax.set_zlim(max(max(self.all_soly[11]), 100),min(min(self.all_soly[11]),-100))
#紙飛行機
plot_paper_airplane(x=self.x[9], y=self.x[10], z=self.x[11], phi=self.x[7], theta=self.x[3], psi=self.x[8], scale=100, ax=ax)
# 描画をキャンバスに保存
fig.canvas.draw()
# キャンバスからRGBデータを取得
image = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
image = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
#メモリ開放
plt.clf()
plt.close()
return image
def close(self):
pass
def seed(self, seed=None):
pass
dtを1から0.4に変更似ただけです。
DQN(Deep Q-Network)
DQNでは、行動価値関数Qをニューラルネットワークで表現します。
連続値の状態の集合を入力すると、各行動に対応したニューロンにその行動をとった際の価値を出力してくれるというものです。
そのため、行動自体はまだ離散的な選択です。
DQNについては東京大学松尾研究室「サマースクール2024 深層強化学習」をベースとしてpytorchのプログラムを使用しています。
特徴としては以下があげられます。
- 行動価値関数Qをニューラルネットワークで表現
- 経験リプレイ(experience replay)の利用のためリプレイバッファの利用
- ターゲットネットワーク(target Q-network)の利用(正解ラベル用のQ関数を一定期間固定)
DQNの詳しい説明については以下のサイトなどが参考になると思います。
DQNの定義
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical, Normal
from collections import deque
# Q関数の定義
class QNetwork(nn.Module):
def __init__(self, dim_state, num_action, hidden_size=16):
super(QNetwork, self).__init__()
self.fc1 = nn.Linear(dim_state, hidden_size)
self.fc2 = nn.Linear(hidden_size, hidden_size)
self.fc3 = nn.Linear(hidden_size, hidden_size)
self.fc4 = nn.Linear(hidden_size, num_action)
def forward(self, x):
h = F.elu(self.fc1(x))
h = F.elu(self.fc2(h))
h = F.elu(self.fc3(h))
y = F.elu(self.fc4(h))
return y
# リプレイバッファの定義
class ReplayBuffer:
def __init__(self, memory_size):
self.memory_size = memory_size
self.memory = deque([], maxlen = memory_size)
def append(self, transition):
self.memory.append(transition)
def sample(self, batch_size):
batch_indexes = np.random.randint(0, len(self.memory), size=batch_size)
states = np.array([self.memory[index]['state'] for index in batch_indexes])
next_states = np.array([self.memory[index]['next_state'] for index in batch_indexes])
rewards = np.array([self.memory[index]['reward'] for index in batch_indexes])
actions = np.array([self.memory[index]['action'] for index in batch_indexes])
dones = np.array([self.memory[index]['done'] for index in batch_indexes])
return {'states': states, 'next_states': next_states, 'rewards': rewards, 'actions': actions, 'dones': dones}
class Dqn:
def __init__(self, dim_state, num_action, gamma=0.99, lr=0.001, batch_size=32, memory_size=50000, target_update_freq=30):
self.dim_state = dim_state
self.num_action = num_action
self.gamma = gamma # 割引率
self.batch_size = batch_size # Q関数の更新に用いる遷移の数
self.target_update_freq = target_update_freq # ターゲットネットワークを更新する頻度
self.qnet = QNetwork(dim_state, num_action)
self.target_qnet = QNetwork(dim_state, num_action) # ターゲットネットワーク
self.optimizer = optim.Adam(self.qnet.parameters(), lr=lr)
self.replay_buffer = ReplayBuffer(memory_size)
self._target_qnet_update_count = 0
# Q関数を更新
def update_q(self):
batch = self.replay_buffer.sample(self.batch_size)
q = self.qnet(torch.tensor(batch["states"], dtype=torch.float)).gather(1, torch.tensor(batch["actions"]).reshape(-1, 1)) # batch内の行動に対応するQ値だけ取り出します
with torch.no_grad():
# maxQの計算
maxq = torch.max(self.target_qnet(torch.tensor(batch["next_states"],dtype=torch.float)), dim=1).values
target = torch.tensor(batch["rewards"], dtype=torch.float) + self.gamma * maxq * torch.tensor(~batch["dones"], dtype=torch.bool)
self.optimizer.zero_grad() # 勾配の初期化
# lossとしてMSEを利用
loss = nn.MSELoss()(q, target)
loss.backward() # 誤差逆伝播
self.optimizer.step() # 学習
self._target_qnet_update_count += 1
if self._target_qnet_update_count % self.target_update_freq == 0: # 一定数更新したらターゲットネットワークを更新
self.target_qnet.load_state_dict(self.qnet.state_dict())
# Q値が最大の行動を選択
def get_greedy_action(self, state):
state_tensor = torch.tensor(state, dtype=torch.float).view(-1, self.dim_state)
action = torch.argmax(self.qnet(state_tensor).data).item()
return action
# ε-greedyに行動を選択
def get_action(self, state, episode):
epsilon = 0.7 * (1/(episode+1)) # ここでは0.5から減衰していくようなεを設定
if epsilon <= np.random.uniform(0,1):
action = self.get_greedy_action(state)
else:
action = np.random.choice(self.num_action)
return action
最初のリプレイバッファでは学習をしない(Q関数を更新しない)上、Q関数に基づく行動の選択もしていない(こちらは限定的だと思われる)のでとても高速
一気にリプレイバッファ分ミニバッチを使って学習せずに、学習しながら同時に行動している理由は、ランダムな移動だけでは本当に欲しい動きをしないため、Q関数を適切なものに学習できないため
学習
# 各種設定
num_episode = 500 # 学習エピソード数
memory_size = 50000 # replay bufferの大きさ
initial_memory_size = 2000 # 最初に貯めるランダムな遷移の数
target_update_freq = 10 # ターゲットネットワークを更新する頻度
# ログ
episode_rewards = []
num_average_epidodes = 10
env = FlightSimulatorEnv()
max_steps = env.max_episode_steps #env.spec.max_episode_steps # エピソードの最大ステップ数
agent = Dqn(env.observation_space.shape[0], env.action_space.n, memory_size=memory_size, target_update_freq=target_update_freq)
# 最初にreplay bufferにランダムな行動をしたときのデータを入れる
state = env.reset()
for step in tqdm(range(initial_memory_size)):
action = env.action_space.sample() # ランダムに行動を選択
next_state, reward, done, _ = env.step(action)
transition = {
'state': state,
'next_state': next_state,
'reward': reward,
'action': action,
'done': int(done)
}
agent.replay_buffer.append(transition)
state = env.reset() if done else next_state
for episode in tqdm(range(num_episode)):
state = env.reset() # envからは15次元の連続値の観測が返ってくる
episode_reward = 0
for t in range(max_steps):
action = agent.get_action(state, episode) # 行動を選択
next_state, reward, done, _ = env.step(action)
episode_reward += reward
transition = {
'state': state,
'next_state': next_state,
'reward': reward,
'action': action,
'done': int(done)
}
agent.replay_buffer.append(transition)
agent.update_q() # Q関数を更新
state = next_state
if done:
break
episode_rewards.append(episode_reward)
if episode % 50 == 0:
tqdm.write("Episode %d finished | Episode reward %f" % (episode, episode_reward))
# 累積報酬の移動平均を表示
moving_average = np.convolve(episode_rewards, np.ones(num_average_epidodes)/num_average_epidodes, mode='valid')
plt.plot(np.arange(len(moving_average)),moving_average)
plt.title('DQN: average rewards in %d episodes' % num_average_epidodes)
plt.xlabel('episode')
plt.ylabel('rewards')
plt.show()
env.close()
可視化
# 最終的に得られた方策のテスト(可視化)
env = FlightSimulatorEnv()
frames = []
for episode in range(1):
state = env.reset()
frames.append(env.render(mode='rgb_array'))
done = False
while not done:
action = agent.get_greedy_action(state)
state, reward, done, _ = env.step(action)
frames.append(env.render(mode='rgb_array'))
env.close()
plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')
def animate(i):
patch.set_data(frames[i])
anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=2000/len(frames))
anim.save("paper_plane.gif", writer="imagemagick")
HTML(anim.to_jshtml())
可視化結果を見ると、ずっと一方向にロールしながら高度を上げ下げしながら180°旋回するような結果となりました。
これは、
- 右旋回するために右にロールする
- ロールしたせいで高度が下がる
- ロールしつつも高度を上げる
- 高度が上がったので引き起こして90°旋回する
- 以上を繰り返す
という流れになっていました。
原因としては、バンク角について報酬を上げていなかった点、時間経過で報酬を減らしていた点により、このような旋回になったと分析しています。
まとめ
今回は、DQNにより水平旋回をさせるプログラムを作りました。
シミュレーション環境をgym形式にしていたため、Q学習からDQNへ簡単に対応することができました。
そして、やはりQ学習よりもDQNでは滑らかな動きをするようになり、そして法主の与え方によって大きく結果が変わるということもわかりました。
今回は「その4」で一番基本的なDQNでしたが、DDQNであったりPrioritized Experience Replayであったり様々なテクニックがあるようです。
ただ、その5ではそちらではなく、行動空間を連続的にしたSACを実装したいと考えています。