16
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【強化学習】OpenAI Gym×Keras-rlで強化学習アルゴリズムを実装していくぞ(DQN編)

Last updated at Posted at 2019-05-05

今回はDQN(Deep Q Network)の実装です。
これはすでに公式で実装されているのですが、理解のために自分なりに実装してみました。

本シリーズ

追記:改めて記事にしています。

概要

  • keras-rlにて、DQN用のAgentを実装
  • Pendiumゲームで画像なしによる学習を実施
  • Pendiumゲームで画像版による学習を実施

コード全体

本記事で作成したコードは以下です。
※1ファイル完結です。
※GoogleColaboratoryは実行結果付き

DQN(Deep Q Network)について

DQNはQ学習にて状態が非線形だと取りうる値が多すぎて学習が間に合わないというアイデアが基本となっている気がします(勝手な想像)
QテーブルをNNで近似するというのが基本アイデアですが、それ以外にもかなりたくさんの手法を取り入れていることが特徴です。

DQNを実装しつつ、各手法を解説していきたいと思います。

DQNAgent(keras-rlのAgent)の実装概要

まずは全体像です。
実装する keras-rl の Agent を元に概要を書きます。

DQNAgent.py
import rl.core
class DQNAgent(rl.core.Agent):
    def __init__(self, **kwargs):
        super(DQNAgent, self).__init__(**kwargs)
        self.compiled = False

        (各ハイパーパラメータの初期化)
        NNモデル(model)の作成
        NNモデル(target_model)の作成

    def reset_states(self):
        (各種初期化)

    def compile(self, optimizer=None, metrics=[]):
        self.compiled = True
        
        NNモデル(model)のコンパイル
        NNモデル(target_model)のコンパイル

    def load_weights(self, filepath):
        (modelのload)

    def save_weights(self, filepath, overwrite=False):
        (modelのsave)

    def forward(self, observation):

        複数フレームの入力処理

        ミニバッチ学習

        フレームスキップ条件
        if self.training:
            ϵを減少させるϵ-greedy法でアクションを決定
        else:
            Q値が最大のアクション
        
        return action

    def backward(self, reward, terminal):
        一定間隔でtarget_modelを更新
        return []

    @property
    def layers(self):
        return []

DQNの各種手法の実装

Qネットワークの定義

以下となります。NNモデルは参考文献をもとにしています。
実装が終わったら改良していきたいですが、とりあえずそのまま実装します。

Layer Filter NumFilters Stride Activation
Conv2D 8×8 32 4 ReLU
Conv2D 4×4 64 2 ReLU
Conv2D 3×3 64 1 ReLU
fc 512 ReLU
Output nb_actions Liner
DQNAgent.py
from keras.layers import *  #*はめんどくさいだけです
from keras.models import Model

def build_network(self):

    # 入力層(window_length, width, height)
    c = input_ = Input(shape=(self.window_length,) + self.input_shape)
    
    if self.enable_image_layer:
        c = Permute((2, 3, 1))(c)  # (window,w,h) -> (w,h,window)

        c = Conv2D(32, (8, 8), strides=(4, 4), padding="same")(c)
        c = Activation("relu")(c)
        c = Conv2D(64, (4, 4), strides=(2, 2), padding="same")(c)
        c = Activation("relu")(c)
        c = Conv2D(64, (3, 3), strides=(1, 1), padding="same")(c)
        c = Activation("relu")(c)
    c = Flatten()(c)

    c = Dense(512, activation="relu")(c)

    c = Dense(self.nb_actions, activation="linear")(c)  # 出力層
    return Model(input_, c)

window_length は後述します。

enable_image_layer を True にすると input_shape が画像になります。
input_shape(width, height) のグレー画像の入力を想定しておりサイズはコンストラクタで指定予定です。
((width,height,channel) 形式には対応していません)

enable_image_layer が False の場合は input_shape はデータになります。
入力後すぐに Flatten(平滑化) されるので特に形式の指定はありません(多分。。。)

DQNAgent.py
def __init__(self, input_shape, enable_image_layer):
    self.input_shape = input_shape 
    self.enable_image_layer = enable_image_layer

Target Network

Target Network という更新用のQネットワークを用意して一定間隔で今のQネットワークをコピーします。
Target Network は一定間隔更新が行われないので古いパラメータを使って更新が行われることとなります。
これによってQ値の更新に時差が生まれて学習がよくなるらしいです。

Q networkと Target Networkの作成

先ほど作成した build_network 関数を使って同じネットワークをそれぞれ作成します。

DQNAgent.py
def __init__(self):
    self.model = self.build_network()         # Q network
    self.target_model = self.build_network()  # target network

コンパイル

optimaizer はユーザが指定する形なので省略します。
論文では独自の RMSProp を使っていますが… まあ RMSProp を改良した Adam が keras で実装されているのでそちらを使います。
また、Target Network のほうは更新しないので oprimizer と損失関数は何でも大丈夫です。

DQNAgent.py
def compile(self, optimizer, metrics=[]):

    # target networkは更新がないので optimizerとlossは何でもいい
    self.target_model.compile(optimizer='sgd', loss='mse')

    def clipped_error_loss(y_true, y_pred):
        損失関数の定義
    
    self.model.compile(loss=clipped_error_loss, optimizer=optimizer, metrics=metrics)

    self.compiled = True

損失関数は後述します。

エラークリップ(損失関数)

Huber損失関数を導入しているとの事。
ここあまりよく分かっていませんが、実装は以下のようです。

DQNAgent.py
import tensorflow as tf
from keras import backend as K

def clipped_error_loss(y_true, y_pred):
    err = y_true - y_pred   # エラー
    L2 = 0.5 * K.square(err)
    L1 = K.abs(err) - 0.5
    
    # エラーが[-1,1]区間ならL2、それ以外ならL1を選択する。
    loss = tf.where((K.abs(err) < 1.0), L2, L1)  # Keras does not cover where function in tensorflow :-(

    return K.mean(loss)

参考:https://github.com/jaromiru/AI-blog/blob/master/CartPole-DQN.py

TargetNetworkの更新

更新するタイミングに違いはない気がするので backward 内で更新しています。
更新は簡単で、keras の model に set_weights/get_weights 関数が定義されています。

DQNAgent.py
def backward(self, reward, terminal):

    # 一定間隔でtarget modelに重さをコピー
    if self.step % self.target_model_update == 0:
        self.target_model.set_weights(self.model.get_weights())

    return []

step は keras-rl 側で定義されており、1step で1づつ増えます( episode を跨いでも増えていきます)
target_mode_update はハイパーパラメータで TargetNetwork の更新間隔です。

Experience Replay

過去の遷移を覚えておき、学習はその中からランダムで選んだ遷移で学習を行います。

Experience Replay用のメモリを作成

deque がランダムアクセスに対して遅いらしいので自作しました。(今後のこともあるので…)
参考:https://github.com/y-kamiya/machine-learning-samples/blob/7b6792ce37cc69051e9053afeddc6d485ad34e79/python3/reinforcement/dqn/agent.py

DQNAgent.py
import random
class ReplayMemory():
    def __init__(self, capacity):
        self.capacity= capacity
        self.index = 0
        self.memory = []
        
    def add(self, experience):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.index] = experience
        self.index = (self.index + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

リングバッファ構造です。
capacity を超えるデータが来た場合、古いものから消えていきます。
データの取得は random.sample 関数でランダムに重複なしで取得します。

Experience Replayの初期化

コンストラクタで初期化します。
保存サイズはハイパーパラメータです。

DQNAgent.py
def __init__(self, memory_capacity=1_000_000):
    self.memory = ReplayMemory(capacity=memory_capacity)

連続フレームの保存

直近の数フレームを環境(状態)とするために保存します。
ここはReplayメモリと被るのでうまく実装すればメモリが節約できそうですがとりあえず愚直に実装します。

直近のフレーム数は window_length としてハイパーパラメータ化とし、window_length + 1 のサイズの変数を作成します。
+1は1つ先の状態の保存用です。

DQNAgent.py
def __init__(self, window_length=4):
    self.window_length = window_length

def reset_states(self):
    self.recent_observations = [np.zeros(self.input_shape) for _ in range(self.window_length+1)]

更新は forward 関数の先頭で行います。

DQNAgent.py
def forward(self, observation):
    self.recent_observations.append(observation)  # 最後に追加
    self.recent_observations.pop(0)  # 先頭を削除

取得する場合は以下の通り、
0~window_length までが現在の状態、1~window_length+1 までが次の状態です。

# 現在の状態
self.recent_observations[:self.window_length]
# 次の状態
self.recent_observations[1:]

actionとrewardの保存

actionとrewardはQ学習と同様です。

DQNAgent.py
# 初期化
def reset_states(self):
    self.recent_action = 0
    self.recent_reward = 0

# 1つ前を保持
def forward(observation):

    (省略)

    self.recent_action = action
    return action

def backward(self, reward, terminal):
    self.recent_reward = reward

ミニバッチ学習

学習箇所をまとめた関数を作成します。
forward内に直接書いてもいいのですが、長くなるので分けています。
まずは概要です。

DQNAgent.py
def forward_train(self):
    if not self.training:
        return

    ReplayMemoryに追加

    # ReplayMemory確保のため一定期間学習しない。
    if self.step <= self.nb_steps_warmup:
        return
    
    # 学習の更新間隔
    if self.step % self.train_interval != 0:
        return
    
    ReplayMemory からバッチデータを取得
    
    バッチデータと target_model を元に model を更新(学習)

nb_steps_warmup と train_interval はハイパーパラメータです。

DQNAgent.py
def __init__(self, nb_steps_warmup=50000, train_interval=4):
    self.nb_steps_warmup = nb_steps_warmup
    self.train_interval = train_interval

ReplayMemoryに追加

(現在の状態, アクション, 報酬, 次の状態) の形で追加します。

DQNAgent.py
self.memory.add((
   self.recent_observations[:self.window_length], 
   self.recent_action, 
   self.recent_reward, 
   self.recent_observations[1:]
   ))

ReplayMemoryからバッチデータを取得

ReplayMemoryから取り出した後、この後使いやすいように成形します。

DQNAgent.py
batchs = self.memory.sample(self.batch_size)
state0_batch = []
action_batch = []
reward_batch = []
state1_batch = []
for batch in batchs:
   state0_batch.append(batch[0])
   action_batch.append(batch[1])
   reward_batch.append(batch[2])
   state1_batch.append(batch[3])

batch_size はハイパーパラメータです。

DQNAgent.py
def __init__(self, batch_size=32):
    self.batch_size = batch_size

バッチデータを用いてQネットワークを更新

まず更新用に現在のQ値を取得します。

DQNAgent.py
outputs = self.model.predict(np.asarray(state0_batch), self.batch_size)

predict 関数で NN モデルの結果を取得できます。
引数には numpy 型をとる必要があります。
shapeとしては、(batch_size, window_length, input_shape) で入力され、(batch_size, nb_actions) で返ってきます。

同様に更新用のQ値を取得します。
こちらは TargetNetwork から取得します。

DQNAgent.py
target_qvals = self.target_model.predict(np.asarray(state1_batch), self.batch_size)

Q学習と同じ計算式で各バッチデータに対して更新をかけます。

DQNAgent.py
for i in range(self.batch_size):
    maxq = np.max(target_qvals[i])
    td_error = reward_batch[i] + self.gamma * maxq
    outputs[i][action_batch[i]] = td_error

Q学習のα相当はがないですが、optimizer側で処理するのでなくなっていると思います。(多分…)

最後に学習させます。

DQNAgent.py
self.model.train_on_batch(np.asarray(state0_batch), np.asarray(outputs))

こういうバッチ処理で学習する場合は fit より train_on_batch の方がいいらしいです。
追記:いろいろ見ていたら fit でも問題なさそうです。

フレームスキップ

毎フレーム学習するのはコストが高くなるので、一定間隔毎に学習させる方法です。

実装ではフレームスキップ中は同じactionを実行し続ける形となります。
まずはフレームスキップ用の変数を初期化し、スキップ間隔をハイパーパラメータとします。

DQNAgent.py
def __init__(self, action_interval=4):
    self.action_interval = action_interval

def reset_states(self):
    self.repeated_action = 0

forward内のactionを決定する箇所の実行を一定フレーム間隔にします。

DQNAgent.py
def forward(self, observation):

    (省略)

    action = self.repeated_action
    if self.step % self.action_interval == 0:

        アクションの決定

        self.repeated_action = action

    self.recent_action = action
    return action

アクションの決定

Q学習とほとんど同じです。(epsilon の算出は後述)

DQNAgent.py
if self.training:

    epsilon を算出
    
    # ϵ-greedy法
    if epsilon > np.random.uniform(0, 1):
        # ランダム
        action = np.random.randint(0, self.nb_actions)
    else:
        # 現在の状態を取得し、最大Q値から行動を取得。
        state0 = self.recent_observations[1:]
        q_values = self.model.predict(np.asarray([state0]), batch_size=1)[0]
        action = np.argmax(q_values)
else:
    # 現在の状態を取得し、最大Q値から行動を取得。
    state0 = self.recent_observations[1:]
    q_values = self.model.predict(np.asarray([state0]), batch_size=1)[0]
    action = np.argmax(q_values)

ϵ-greedy法の改善

ϵ-greedy法にて ϵ を最初は高い数字(ランダムで移動)にし、後半になるほど低い値(Q値に従う)にします。
ハイパーパラメータとして初期ε値の initial_epsilon、最後のε値の final_epsilon、何フレームかけて減らすかの exploration_steps を追加します。

DQNAgent.py
def __init__(self, initial_epsilon=1.0, final_epsilon=0.1, exploration_steps=1000000):
    self.initial_epsilon = initial_epsilon  
    self.epsilon_step = (initial_epsilon - final_epsilon) / exploration_steps
    self.final_epsilon = final_epsilon

算出は以下。

DQNAgent.py
epsilon = self.initial_epsilon - self.step*self.epsilon_step
if epsilon < self.final_epsilon:
    epsilon = self.final_epsilon

save/load

model を保存するだけです。

DQNAgent.py
def load_weights(self, filepath):
    self.model.load_weights(filepath)
def save_weights(self, filepath, overwrite=False):
    self.model.save_weights(filepath, overwrite=overwrite)

Processor側の実装

報酬の固定

報酬をマイナスなら-1、プラスなら+1にする手法です。
報酬の設定は学習するゲームにかなり依存するのでこの手法は何とも言えませんが…
実装する場合はAgentではなくProcessorで定義する方がいいでしょう。

PendulumProcessorForDQN.py
import rl.core
class PendulumProcessorForDQN(rl.core.Processor):
    def process_reward(self, reward):
        return np.clip(reward, -1., 1.)

画像の前処理

3点あるらしいです。

  • 210×160 → 84×84 にリサイズ
  • 2フレームから最大値をとる。(atariは奇数フレームと偶数フレームで片方しか表示されなかったりするらしいです)
  • グレー化

今回は1つ目と3つめのみ実装しています。
2つ目はあまりにも atari に依存しすぎている気はするので…
こちらも実装はProcessorですね。

PendulumProcessorForDQN.py
import rl.core
from PIL import Image
class PendulumProcessorForDQN(rl.core.Processor):
    def __init__(self,  enable_image=False, reshape_size=(84, 84)):
        self.shape = reshape_size
        self.enable_image = enable_image

    def process_observation(self, observation):
        if not self.enable_image:
            return observation
        img = Image.fromarray(observation)
        img = img.resize(self.shape).convert('L')  # resize and convert to grayscale
        return np.array(img) / 255  # DQNに渡しやすいように正規化

コンストラクタで入力が画像かどうかを指定できるようにしています。

アクション

線形モデルを出力する形ではないのでQ学習と変わらず離散化します。

PendulumProcessorForDQN.py
def process_action(self, action):
    ACT_ID_TO_VALUE = {
        0: [-2.0], 
        1: [-1.0], 
        2: [0.0], 
        3: [+1.0],
        4: [+2.0],
    }
    return ACT_ID_TO_VALUE[action]

Pendiumゲームで学習

##画像なしによる学習

Q学習とほぼ一緒です。
まずは画像なしバージョンです。

DQNAgent.py
import gym
from keras.optimizers import Adam

# 別ファイルにあると仮定しています。コード全体では1つのファイルにまとめています。
from PendulumProcessorForDQN  import PendulumProcessorForDQN
from DQNAgent import DQNAgent

env = gym.make("Pendulum-v0")
nb_actions = 5  # PendulumProcessorで5個と定義しているので5

processor = PendulumProcessorForDQN(enable_image=False)

# 引数が多いので辞書で定義して渡しています。
args={
    "input_shape": env.observation_space.shape, 
    "enable_image_layer": False, 
    "nb_actions": nb_actions, 
    "window_length": 1,         # 入力フレーム数
    "memory_max_size": 10_000,  # 確保するメモリーサイズ
    "nb_steps_warmup": 200,     # 初期のメモリー確保用step数(学習しない)
    "target_model_update": 100, # target networkのupdate間隔
    "action_interval": 1,  # アクションを実行する間隔
    "train_interval": 1,   # 学習する間隔
    "batch_size": 64,   # batch_size
    "gamma": 0.99,     # Q学習の割引率
    "initial_epsilon": 1.0,  # ϵ-greedy法の初期値
    "final_epsilon": 0.1,    # ϵ-greedy法の最終値
    "exploration_steps": 5000,  # ϵ-greedy法の減少step数
    "processor": processor,
}
agent = DQNAgent(**args)
agent.compile(optimizer=Adam())  # optimizerはAdamを指定

# 訓練
print("--- start ---")
print("'Ctrl + C' is stop.")
history = agent.fit(env, nb_steps=50_000, visualize=False, verbose=1)

# 結果を表示
plt.subplot(2,1,1)
plt.plot(history.history["nb_episode_steps"])
plt.ylabel("step")

plt.subplot(2,1,2)
plt.plot(history.history["episode_reward"])
plt.xlabel("episode")
plt.ylabel("reward")

plt.show()

# 訓練結果を見る
agent.test(env, nb_episodes=5, visualize=True)

結果

qiita_03_dqn.PNG

qiita_03_dqn.gif

ちゃんと学習できていますね。

画像ありによる学習

画像への変換は以下を参考にしました。
Keras-RLを用いた深層強化学習コト始め

Processorの変更

PendulumProcessorForDQN.py
from PIL import Image, ImageDraw
def process_observation(self, observation):
    if not self.enable_image:
        return observation
    img = self._get_rgb_state(observation)
    img = Image.fromarray(img)
    img = img.resize(self.shape).convert('L')  # resize and convert to grayscale
    return np.array(img) / 255  # DQNに渡しやすいように正規化

# 状態(x,y座標)から対応画像を描画する関数
def _get_rgb_state(self, state):
    img_size = 84

    h_size = img_size/2.0

    img = Image.new("RGB", (img_size, img_size), (255, 255, 255))
    dr = ImageDraw.Draw(img)

    # 棒の長さ
    l = img_size/4.0 * 3.0/ 2.0
    # 棒のラインの描写
    dr.line(((h_size - l * state[1], h_size - l * state[0]), (h_size, h_size)), (0, 0, 0), 1)

    # 棒の中心の円を描写(それっぽくしてみた)
    buff = img_size/32.0
    dr.ellipse(((h_size - buff, h_size - buff), (h_size + buff, h_size + buff)), outline=(0, 0, 0), fill=(255, 0, 0))

    # 画像の一次元化(GrayScale化)とarrayへの変換
    pilImg = img.convert("L")
    img_arr = np.asarray(pilImg)

    # 画像の規格化
    img_arr = img_arr/255.0

    return img_arr

こちらは変更部分のみ

DQNAgent.py
省略
processor = PendulumProcessorForDQN(enable_image=True, reshape_size=(84, 84))
省略
# 引数が多いので辞書で定義して渡しています。
args={
    "input_shape": (84, 84), 
    "enable_image_layer": True, 
    省略
    "window_length": 3,         # 入力フレーム数
    省略
    "batch_size": 8,   # batch_size
    "exploration_steps": 50000,  # ϵ-greedy法の減少step数
}
省略
history = agent.fit(env, nb_steps=100_000, visualize=False, verbose=1)
省略

window_length は角速度に相当する情報の必要を考慮して3フレームにしています。
また batch_size を小さくしたのは時間の関係です。
batch_size や nb_steps、exploration_steps は時間との兼ね合いで微調整しており、GoogleColaboratory で値が違います。
(やはりGPUは早いですね)

結果

qiita_03_dqn_image.PNG

qiita_03_dqn_image.gif

学習できていないですね…、試行回数が少ないという事にしましょう。(コードに間違いがあったらすいません)

まとめ

想像以上に使われてる手法が多くて…
説明が漏れている所もあるかも知れません。
ただ、DQN系列の強化学習はこれがベースとなるのでいい勉強になりました。
次はDoubleDQN、DurlingNetwork、(できればPrioritizedExperienceReplay)を実装したいと思います。

参考

Optimizer : 深層学習における勾配法について
https://github.com/keras-rl/keras-rl/blob/master/rl/agents/dqn.py
DQNをKerasとTensorFlowとOpenAI Gymで実装する
DQNからRainbowまで 〜深層強化学習の最新動向〜

16
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?