0
1

【Python】飛行機の運動をシミュレートするプログラムをつくる その4 「DQN」

Posted at

はじめに

今回は以下の記事の続きです。

本プログラムは飛行機をDQNにより自動で操縦することを目指したプログラムです。

飛行のモデリングは『航空機力学入門 (加藤寛一郎・大屋昭男・柄沢研治 著 東京大学出版会)』の3章「微小擾乱の運動方程式」をベースに作成しています。

そしてDQNの実装は東京大学松尾研究室の「サマースクール2024 深層強化学習」をベースにしております。

水平旋回を目指して学習させた結果が以下です。

paper_plane (2).gif

プログラムはgooglecolabで実行できるように公開しています。

各関数(変更なし)

import numpy as np
from scipy.integrate import odeint, solve_ivp
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import time
import mpl_toolkits.mplot3d.art3d as art3d
from IPython.display import HTML
import matplotlib.animation as animation
from tqdm.notebook import tqdm

# --- 回転行列の定義 ---
def rotation_matrix(psi, theta, phi):
  # z軸周り回転
  R_z = np.array([[np.cos(psi), -np.sin(psi), 0],
                   [np.sin(psi), np.cos(psi), 0],
                   [0, 0, 1]])
  # y軸周り回転
  R_y = np.array([[np.cos(theta), 0, np.sin(theta)],
                   [0, 1, 0],
                   [-np.sin(theta), 0, np.cos(theta)]])
  # x軸周り回転
  R_x = np.array([[1, 0, 0],
                   [0, np.cos(phi), -np.sin(phi)],
                   [0, np.sin(phi), np.cos(phi)]])
  # 全体の回転行列
  R = R_z @ R_y @ R_x
  return R



#全体座標系における速度ベクトルの取得
def get_global_velosity(U0, u, alpha, beta, psi, theta, phi):
    # 機体座標系での速度ベクトル
    u_b = U0 + u
    v_b = u_b * np.sin(beta)
    w_b = u_b * np.tan(alpha)
    vel_b = np.array([u_b, v_b, w_b])

    # 全体座標系での速度ベクトル
    vel_e = rotation_matrix(psi, theta, phi) @ np.atleast_2d(vel_b).T
    vel_e = vel_e.flatten()
    return vel_e




#紙飛行機の描写関数
def plot_paper_airplane(x, y, z, phi, theta, psi, scale, ax):
    """
    3次元座標と角度が与えられたとき、その状態の紙飛行機のような図形をプロットする

    Args:
      x: x座標
      y: y座標
      z: z座標
      psi: ロール角 (ラジアン)
      theta : ピッチ角 (ラジアン)
      phi: ヨー角 (ラジアン)
      機体の大きさをいじりたければscaleを変える

    """

    #三角形を描く
    poly_left_wing = scale * np.array([[2, 0.0, 0],
                                      [-1, 1, 0],
                                      [-1, 0.1, 0]])
    poly_right_wing = poly_left_wing.copy()
    poly_right_wing[:,1] = -1 * poly_right_wing[:,1]

    poly_left_body = scale * np.array([[2, 0.0, 0.0],
                                      [-1, 0.0, +0.1],
                                      [-1, 0.1, 0.0]])
    poly_right_body = poly_left_body.copy()
    poly_right_body[:,1] = -1 * poly_right_body[:,1]



    R = rotation_matrix(psi, theta, phi) # yaw, pitch, roll


    for points in [poly_left_wing, poly_left_body, ]:
        # 紙飛行機の点を回転
        translated_rotated_points = (R @ points.T).T + np.array([x, y, z])
        #描写
        ax.add_collection3d(art3d.Poly3DCollection([translated_rotated_points],facecolors='orangered', linewidths=1, edgecolors='k', alpha=0.6))

    for points in [poly_right_wing, poly_right_body]:
        # 紙飛行機の点を回転
        translated_rotated_points = (R @ points.T).T + np.array([x, y, z])
        #描写
        ax.add_collection3d(art3d.Poly3DCollection([translated_rotated_points],facecolors='lime', linewidths=1, edgecolors='k', alpha=0.6))

OpenAIのgym形式での飛行モデルの作成(ほぼ変更なし)

強化学習ではOpenAIが作成したライブラリであるgymと互換性があるように環境をモデル化することで、過去に作られた豊富な強化学習プログラムがそのまま使えるようになります。

そのため、今回は今まで作成したシミュレーターをgym形式に変更するところから始めていきます。

参考:

import gym
class FlightSimulatorEnv(gym.Env):
    metadata = {'render.modes': ['rgb_array']}

    # --- 機体パラメータ ---
    g = 9.81
    # 初期条件
    U0 = 293.8
    W0 = 0
    theta0 = 0.0

    # 縦の有次元安定微係数: ロッキードP2V-7
    Xu = -0.0215
    Zu = -0.227
    Mu = 0

    Xa = 14.7
    Za = -236
    Ma = -3.78
    Madot = -0.28

    Xq = 0
    Zq = -5.76
    Mq = -0.992

    X_deltat = 0.01#0
    Z_deltae = -12.9
    Z_deltat = 0
    M_deltae = -2.48
    M_deltat = 0


    # 横・方向の有次元安定微係数: ロッキードP2V-7
    Yb=-45.4
    Lb=-1.71
    Nb=0.986

    Yp=0.716
    Lp=-0.962
    Np=-0.0632

    Yr=2.66
    Lr=0.271
    Nr=-0.215

    #Y_deltaa=0
    L_deltaa=1.72
    N_deltaa=-0.0436

    Y_deltar=9.17
    L_deltar=0.244
    N_deltar=-0.666




    # --- 縦の運動方程式 ---
    # 行列A, Bの定義
    A_long = np.array([[Xu, Xa, -W0, -g * np.cos(theta0)],
                      [Zu / U0, Za / U0, 1 + Zq / U0, (g / U0) * np.sin(theta0)],
                      [Mu + Madot * (Zu / U0), Ma + Madot * (Za / U0), Mq + Madot * (1 + Zq / U0),
                        (Madot * g / U0) * np.sin(theta0)],
                      [0, 0, 1, 0]])

    B_long = np.array([[0, X_deltat],
                      [Z_deltae / U0, Z_deltat / U0],
                      [M_deltae + Madot * (Z_deltae / U0), M_deltat + Madot * (Z_deltat / U0)],
                      [0, 0]])


    # --- 横・方向の運動方程式 ---
    # 行列A, Bの定義
    A_lat = np.array([[Yb / U0, (W0 + Yp) / U0, (Yr / U0 - 1), g * np.cos(theta0) / U0, 0],
                      [Lb, Lp, Lr, 0, 0],
                      [Nb, Np, Nr, 0, 0],
                      [0, 1, np.tan(theta0), 0, 0],
                      [0, 0, 1 / np.cos(theta0), 0, 0]])

    B_lat = np.array([[0, Y_deltar / U0],
                      [L_deltaa, L_deltar],
                      [N_deltaa, N_deltar],
                      [0, 0],
                      [0, 0]])


    # --- 統合モデル ---
    def aircraft_dynamics(self, t, x, U0, A_long, B_long, A_lat, B_lat, u_inp):

      u_long = u_inp[:2]
      u_lat = u_inp[2:]

      # 縦の運動方程式
      dxdt_long = A_long @ np.atleast_2d(x[:4]).T + B_long @ np.atleast_2d(u_long).T
      dxdt_long = dxdt_long.flatten()

      # 横・方向の運動方程式
      dxdt_lat = A_lat @ np.atleast_2d(x[4:9]).T + B_lat @ np.atleast_2d(u_lat).T
      dxdt_lat = dxdt_lat.flatten()

      # 機体座標系から全体座標系での速度ベクトルへの変換
      vel_e = get_global_velosity(U0=U0, u=x[0], alpha=x[1], beta=x[4], psi=x[8], theta=x[3], phi=x[7])

      # 縦と横・方向の状態量の変化と位置の変化を結合
      dxdt = np.concatenate((dxdt_long, dxdt_lat, vel_e))
      return dxdt







    def __init__(self):
        self.x0 = np.zeros(12)
        self.dt = 0.4
        self.max_episode_steps = int(100/1)

        self.action_space = gym.spaces.Discrete(8)
        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(15,), dtype=np.float32)
        self.reward_range = (-np.inf, np.inf)
        self.reset()

    def reset(self):
        self.x = self.x0
        self.t = 0
        self.observation = np.concatenate((self.x, np.array([self.U0, 0, 0])))#速度の変数を追加

        self.done = False
        self.all_soly = self.x0.reshape(len(self.x0),1)

        return self.observation

    def step(self, action):
        start_time = time.time()

        u_action = np.zeros(self.action_space.n)
        u_action[action] = 1

        #u_actionの前半をプラス方向の入力、後半をマイナスの入力として実装
        self.u_inp = u_action[:self.action_space.n//2] - u_action[self.action_space.n//2:]

        # t~t+dtまで微分方程式を解く (RK45メソッドを使用)
        sol = solve_ivp(self.aircraft_dynamics, [self.t, self.t+self.dt], self.x,
            args=(self.U0, self.A_long, self.B_long, self.A_lat, self.B_lat, self.u_inp),
            method='RK45')

        #t~t+dtまでの結果を追加
        self.all_soly = np.append(self.all_soly, sol.y, axis=1)

        #次のt+dtにおける初期条件
        self.x = sol.y[:, -1] #現在の最終状態
        self.t += self.dt

        vel_e = get_global_velosity(U0=self.U0, u=self.x[0], alpha=self.x[1], beta=self.x[4], psi=self.x[8], theta=self.x[3], phi=self.x[7])
        #状態量
        self.observation = np.concatenate((self.x, vel_e)) * 10





        # --- 報酬の設定 ---

        # --- 時間に応じた報酬の設定 ---
        #短時間で達成できるようにしたい
        self.reward = -1 #時間に応じて減額

        #高度を変えないでほしい 1000ftで0.1の減少
        self.reward -= abs(self.x[11])/10**4

        #180°旋回することを目的とする
        target_vecor = np.array([-1, 0, 0])
        #速度ベクトルと目標ベクトルのコサイン類似度で報酬
        vel_e = get_global_velosity(U0=self.U0, u=self.x[0], alpha=self.x[1], beta=self.x[4], psi=self.x[8], theta=self.x[3], phi=self.x[7])
        cosine_similarity = np.dot(target_vecor, vel_e)/(np.sqrt(np.dot(target_vecor, target_vecor))*np.sqrt(np.dot(vel_e, vel_e)))
        self.reward +=  cosine_similarity

        #時間幅に依存しないようにしたい
        self.reward *= self.dt

        #ほぼ目標を向いたら終了
        if cosine_similarity > 0.95:
            self.done = True
            self.reward += 10

        #終了判定
        if self.t >= 100: #100s経過した場合
            self.done = True
            #self.reward -= 100


        #info
        self.info = elapsed_time = time.time() - start_time

        #print(self.t, self.done, self.reward)
        #print(self.info)


        return self.observation, self.reward, self.done, self.info




    def render(self, mode='rgb_array'):

        if mode == 'rgb_array':
            plt.ion()
            fig = plt.figure(dpi=100)
            ax = Axes3D(fig)
            fig.add_axes(ax)
            ax.invert_zaxis()
            ax.set_xlabel('x')
            ax.set_ylabel('y')
            ax.set_zlabel('z')
            ax.set_xlim(0, 500)
            ax.set_ylim(-250, 250)
            ax.set_zlim(100, -100)

            # 軌跡
            ax.plot(self.all_soly[9], self.all_soly[10], self.all_soly[11])
            ax.set_xlim(min(min(self.all_soly[9]),0), max(max(self.all_soly[9]), 500),)
            ax.set_ylim(min(min(self.all_soly[10]),-250), max(max(self.all_soly[10]), 250),)
            ax.set_zlim(max(max(self.all_soly[11]), 100),min(min(self.all_soly[11]),-100))

            #紙飛行機
            plot_paper_airplane(x=self.x[9],  y=self.x[10], z=self.x[11], phi=self.x[7], theta=self.x[3], psi=self.x[8], scale=100, ax=ax)

            # 描画をキャンバスに保存
            fig.canvas.draw()

            # キャンバスからRGBデータを取得
            image = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
            image = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))

            #メモリ開放
            plt.clf()
            plt.close()

        return image


        def close(self):
            pass

        def seed(self, seed=None):
            pass

dtを1から0.4に変更似ただけです。

DQN(Deep Q-Network)

DQNでは、行動価値関数Qをニューラルネットワークで表現します。
連続値の状態の集合を入力すると、各行動に対応したニューロンにその行動をとった際の価値を出力してくれるというものです。
そのため、行動自体はまだ離散的な選択です。

DQNについては東京大学松尾研究室「サマースクール2024 深層強化学習」をベースとしてpytorchのプログラムを使用しています。

特徴としては以下があげられます。

  • 行動価値関数Qをニューラルネットワークで表現
  • 経験リプレイ(experience replay)の利用のためリプレイバッファの利用
  • ターゲットネットワーク(target Q-network)の利用(正解ラベル用のQ関数を一定期間固定)

DQNの詳しい説明については以下のサイトなどが参考になると思います。

DQNの定義

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical, Normal
from collections import deque

# Q関数の定義
class QNetwork(nn.Module):
    def __init__(self, dim_state, num_action, hidden_size=16):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(dim_state, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, hidden_size)
        self.fc4 = nn.Linear(hidden_size, num_action)

    def forward(self, x):
        h = F.elu(self.fc1(x))
        h = F.elu(self.fc2(h))
        h = F.elu(self.fc3(h))
        y = F.elu(self.fc4(h))
        return y

# リプレイバッファの定義
class ReplayBuffer:
    def __init__(self, memory_size):
        self.memory_size = memory_size
        self.memory = deque([], maxlen = memory_size)

    def append(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        batch_indexes = np.random.randint(0, len(self.memory), size=batch_size)
        states      = np.array([self.memory[index]['state'] for index in batch_indexes])
        next_states = np.array([self.memory[index]['next_state'] for index in batch_indexes])
        rewards     = np.array([self.memory[index]['reward'] for index in batch_indexes])
        actions     = np.array([self.memory[index]['action'] for index in batch_indexes])
        dones   = np.array([self.memory[index]['done'] for index in batch_indexes])
        return {'states': states, 'next_states': next_states, 'rewards': rewards, 'actions': actions, 'dones': dones}
class Dqn:
    def __init__(self, dim_state, num_action, gamma=0.99, lr=0.001, batch_size=32, memory_size=50000, target_update_freq=30):
        self.dim_state = dim_state
        self.num_action = num_action
        self.gamma = gamma  # 割引率
        self.batch_size = batch_size  # Q関数の更新に用いる遷移の数
        self.target_update_freq = target_update_freq  # ターゲットネットワークを更新する頻度
        self.qnet = QNetwork(dim_state, num_action)
        self.target_qnet = QNetwork(dim_state, num_action)  # ターゲットネットワーク
        self.optimizer = optim.Adam(self.qnet.parameters(), lr=lr)
        self.replay_buffer = ReplayBuffer(memory_size)
        self._target_qnet_update_count = 0

    # Q関数を更新
    def update_q(self):
        batch = self.replay_buffer.sample(self.batch_size)
        q = self.qnet(torch.tensor(batch["states"], dtype=torch.float)).gather(1, torch.tensor(batch["actions"]).reshape(-1, 1))  # batch内の行動に対応するQ値だけ取り出します

        with torch.no_grad():
          # maxQの計算
          maxq = torch.max(self.target_qnet(torch.tensor(batch["next_states"],dtype=torch.float)), dim=1).values
          target = torch.tensor(batch["rewards"], dtype=torch.float) + self.gamma * maxq * torch.tensor(~batch["dones"], dtype=torch.bool)

        self.optimizer.zero_grad() # 勾配の初期化
        # lossとしてMSEを利用
        loss = nn.MSELoss()(q, target)
        loss.backward() # 誤差逆伝播
        self.optimizer.step() # 学習

        self._target_qnet_update_count += 1
        if self._target_qnet_update_count % self.target_update_freq == 0:  # 一定数更新したらターゲットネットワークを更新
          self.target_qnet.load_state_dict(self.qnet.state_dict())

    # Q値が最大の行動を選択
    def get_greedy_action(self, state):
        state_tensor = torch.tensor(state, dtype=torch.float).view(-1, self.dim_state)
        action = torch.argmax(self.qnet(state_tensor).data).item()
        return action

    # ε-greedyに行動を選択
    def get_action(self, state, episode):
        epsilon = 0.7 * (1/(episode+1))  # ここでは0.5から減衰していくようなεを設定
        if epsilon <= np.random.uniform(0,1):
            action = self.get_greedy_action(state)
        else:
            action = np.random.choice(self.num_action)
        return action

最初のリプレイバッファでは学習をしない(Q関数を更新しない)上、Q関数に基づく行動の選択もしていない(こちらは限定的だと思われる)のでとても高速

一気にリプレイバッファ分ミニバッチを使って学習せずに、学習しながら同時に行動している理由は、ランダムな移動だけでは本当に欲しい動きをしないため、Q関数を適切なものに学習できないため

学習

# 各種設定
num_episode = 500  # 学習エピソード数
memory_size = 50000  # replay bufferの大きさ
initial_memory_size = 2000  # 最初に貯めるランダムな遷移の数
target_update_freq = 10  # ターゲットネットワークを更新する頻度

# ログ
episode_rewards = []
num_average_epidodes = 10

env = FlightSimulatorEnv()
max_steps =  env.max_episode_steps #env.spec.max_episode_steps  # エピソードの最大ステップ数

agent = Dqn(env.observation_space.shape[0], env.action_space.n, memory_size=memory_size, target_update_freq=target_update_freq)

# 最初にreplay bufferにランダムな行動をしたときのデータを入れる
state = env.reset()
for step in tqdm(range(initial_memory_size)):
    action = env.action_space.sample() # ランダムに行動を選択
    next_state, reward, done, _ = env.step(action)
    transition = {
        'state': state,
        'next_state': next_state,
        'reward': reward,
        'action': action,
        'done': int(done)
    }
    agent.replay_buffer.append(transition)
    state = env.reset() if done else next_state

for episode in tqdm(range(num_episode)):
    state = env.reset()  # envからは15次元の連続値の観測が返ってくる
    episode_reward = 0
    for t in range(max_steps):
        action = agent.get_action(state, episode)  # 行動を選択
        next_state, reward, done, _ = env.step(action)
        episode_reward += reward
        transition = {
            'state': state,
            'next_state': next_state,
            'reward': reward,
            'action': action,
            'done': int(done)
        }
        agent.replay_buffer.append(transition)
        agent.update_q()  # Q関数を更新
        state = next_state
        if done:
            break
    episode_rewards.append(episode_reward)
    if episode % 50 == 0:
        tqdm.write("Episode %d finished | Episode reward %f" % (episode, episode_reward))

# 累積報酬の移動平均を表示
moving_average = np.convolve(episode_rewards, np.ones(num_average_epidodes)/num_average_epidodes, mode='valid')
plt.plot(np.arange(len(moving_average)),moving_average)
plt.title('DQN: average rewards in %d episodes' % num_average_epidodes)
plt.xlabel('episode')
plt.ylabel('rewards')
plt.show()

env.close()

image.png

可視化

# 最終的に得られた方策のテスト(可視化)
env = FlightSimulatorEnv()
frames = []
for episode in range(1):
    state = env.reset()
    frames.append(env.render(mode='rgb_array'))
    done = False
    while not done:
        action = agent.get_greedy_action(state)
        state, reward, done, _ = env.step(action)
        frames.append(env.render(mode='rgb_array'))
env.close()

plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
    patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=2000/len(frames))
anim.save("paper_plane.gif", writer="imagemagick")
HTML(anim.to_jshtml())

paper_plane (2).gif

可視化結果を見ると、ずっと一方向にロールしながら高度を上げ下げしながら180°旋回するような結果となりました。

これは、

  1. 右旋回するために右にロールする
  2. ロールしたせいで高度が下がる
  3. ロールしつつも高度を上げる
  4. 高度が上がったので引き起こして90°旋回する
  5. 以上を繰り返す

という流れになっていました。
原因としては、バンク角について報酬を上げていなかった点、時間経過で報酬を減らしていた点により、このような旋回になったと分析しています。

まとめ

今回は、DQNにより水平旋回をさせるプログラムを作りました。
シミュレーション環境をgym形式にしていたため、Q学習からDQNへ簡単に対応することができました。
そして、やはりQ学習よりもDQNでは滑らかな動きをするようになり、そして法主の与え方によって大きく結果が変わるということもわかりました。

今回は「その4」で一番基本的なDQNでしたが、DDQNであったりPrioritized Experience Replayであったり様々なテクニックがあるようです。
ただ、その5ではそちらではなく、行動空間を連続的にしたSACを実装したいと考えています。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1