Help us understand the problem. What is going on with this article?

FlappyBird で強化学習の練習 その1: DQN

※ 2019/04/04 追記: 問題設定を少し修正し、実験をやり直しました。
※ 2019/04/19 追記: モデルを少し変更し、実験をやり直しました。

この記事は何

せっかく Pythonで学ぶ強化学習 をざっと読んだので、手を動かしてみる大作戦です。
FlappyBird という数年前に話題になったゲームがあり、それを強化学習を用いて学習していきたいと思います。
目標は満点である264点を安定して取ることです。
のんびり動かしてみつつ、色々やったことを記録していこうと考えています :muscle:

本記事はひとまずベースラインとなるシンプルな DQN で学習をさせたものを紹介します。
以降の更新で、まずは Rainbow のそれぞれのアイディアを試しつつ、ベースラインからの改良度合いを見ていけたらと考えています。
勉強しつつ書いてるので、何か誤りなどあればコメントいただけると助かります :bow:

実装は jupyter notebook 上で行っており、 今回のコードはこちらです。
リポジトリはこちら: cfiken/flappybird-try

目次

  • DQN とは
  • FlappyBird における問題設定
  • DQN による学習結果
  • 実装の紹介
  • まとめ

DQN とは

DQN は Deep Q-Network の略で、強化学習における Q-Learning を、ディープラーニングを使って実現する手法のことを言います。
ここでは詳解はしません(詳しい記事は既に死ぬほどあります :bow: )。
発表論文は nature に掲載されています。
Deep Mind による記事

Q-Learning では、ある状態と行動の価値(Q値)をTD誤差を用いて反復的に更新し、それが最大となるような行動を取りました。

\begin{align}
Q\left(s_{t}, a_{t}\right) &= Q\left(s_{t}, a_{t}\right)+\eta *\left(R_{t+1}+\gamma \max _{a} Q\left(s_{t+1}, a\right)-Q\left(s_{t}, a_{t}\right)\right) \\
\pi(a|s_t) &= \mathrm{arg}\max_a Q(s_t, a)
\end{align}

DQN では Q-Learning に対して、TD誤差を用いて反復的にQ値を推定するのではなく、状態を入力として入れるとそれぞれの行動をとった際の価値を出力するようなニューラルネットワークに置き換えて価値を推定するモデルとなっています。

\begin{align}
Q\left(s_{t}, a \right) &= f(s_t; \boldsymbol{\theta}) \,\,\,\, (f\mathrm{: neural \, network \, model})\\
\pi(a|s_t) &= \mathrm{arg}\max_a Q(s_t, a)
\end{align}

ここでこの$f(s_t; \boldsymbol{\theta})$は、先程の TD 誤差を最小化するようにパラメータを学習します。

特に DQN の発表論文である では、ゲーム画面という画像から CNN を使って直接価値を推定し、それを使って人間並みの能力を持つエージェントが作成できたという点でとても話題になった手法とのことでした。

また、DQN では、学習の難しい強化学習で学習を安定化させるためのノウハウがいくつか入っています。本記事の実装では Experience Replay と Fixed Target Q-Network, reward clipping などを取り入れています。

FlappyBird における問題設定

強化学習には、状態、行動、報酬のセットが必要です。FlappyBird というタスクを解くにあたって、どのようにそれぞれを定義するかについて説明します。

状態 (state)

単に状態といっても次の2通りがあります。

  1. gym のインターフェースから得られる生の状態
  2. 学習に使うために前処理を適用した後の状態

前者は、FlappyBird の gym 上のインスタンスを env から env.step(action)env.reset() で得られる状態です。
FlappyBird の場合は、 (height, width, channel) = (288, 512, 3) で、それぞれ0-255の値を取る画像データが得られます。

ここから、学習をしやすくするために前処理を行ったものが後者にあたります。
今回は、まずゲームに不要な範囲となっている画面下部の部分を除きます。ちょうど400あたりより下のピクセルはただの背景でした。
その後それぞれのデータをグレースケールに変換し、128x128のサイズに変換して、全体を255.0で割って 0~1 の値に変換します。
それを時系列で 4 frame 並べ、 shape を (width, height, frames) としたものを学習に使用する状態としています。
コードとしては、 Observer クラスの transform メソッドにあたります。

class Observer:
    # ...
    def transform(self, state):
        state = state[:400, :, :]
        grayed = Image.fromarray(state).convert('L')  # h, w, c -> h, w

        resized = grayed.resize((self.width, self.height))
        resized = np.array(resized).astype(np.float32)
        resized = np.transpose(resized, (1, 0)) # h, w -> w, h
        normalized = resized / 255.0
        if len(self._frames) == 0:
            for i in range(self.frame_count):
                self._frames.append(normalized)
        else:
            self._frames.append(normalized)
        feature = np.array(self._frames)
        feature = np.transpose(feature, (1, 2, 0))  # [f, w, h] -> [w, h, f]

行動 (action)

FlappyBird における行動は、何もしない (=0) か飛ぶ (=1) の二種類です。
今回の実装では、学習をしやすくするために「同じ行動を4回繰り返す」という制約を入れています。
すなわち行動は、「4step の間何もしない」か「4step の間飛び続ける」のどちらかとしました。
あくまで今回の実装ではそうしたというだけで、生の行動をそのまま扱っても問題ないかと思います。

報酬 (reward)

FlappyBird のデフォルトの実装では、

  • ドカンの間を一つ抜けるたびに +1
  • ぶつかってしまうと -5
  • それ以外は 0

という報酬が与えられます。今回は学習をしやすくするため、次のような reward shaping を加えました。

def reward_shaping(self, reward):
    if 0.1 > reward > -0.1:
        return 0.01
    elif reward <= -0.1:
        return -1.0
    else:
        return 1.0

全体の学習を安定化させるために報酬をクリップしつつ、長生きする方が良いということを学習させる意図で、前に進むだけで少しの報酬が与えられるようにしています。

ここまでの定義は、あくまで私が識者に聞いたりしつつ現在それで試しているというだけのもので、工夫の余地があると思います。

DQN による学習結果

スコアの計算は次のように行います。

  • モデルを別々に5回学習する
  • それぞれのモデルで50回ずつプレイして、スコア(超えたドカンの数)の平均(と分散)を計算する
  • モデル5個のそれぞれの結果の mean, median を比較する

5つのモデルと最終的な結果は次のようになりました。

1: mean: 31.8200, std: 29.1923
2: mean: 18.1200, std: 17.0407
3: mean: 56.9200, std: 49.4173
4: mean: 53.4400, std: 44.1883
5: mean: 68.4400, std: 67.3041
--- total ---
mean: 45.748, median: 53.440

平均スコアは 45.748, 中央値は 53.440 でした。平均約45本ちょいのドカンを超えているようです。
とはいえまだスコアはブレが大きく、個々の結果を見ると1本で死んでるプレイもあり、安定して攻略できているとは言えません。
が、人間がプレイしても30を超えるのは結構難しいので(私は結構やって人力スコア20ちょいでした...)、人間レベルに達したと言っても良さそうです。

下記は、冒頭に上げたものと同じで、89回成功したときの動画です。

また、学習中の TensorBoard での reward の様子です(5回分)。

スクリーンショット 2019-04-20 21.55.55.png

だいたい 150k step ほどで伸びが止まっています。

実装の紹介

実行時の notebook はこちらです。
主要なところを紹介していきます。

Experience クラス

まずは次のように NamedTuple を使って Experience クラスを定義します。
学習のため、エージェントに行動をさせた結果を学習データとして Experience Reply に積みますが、その1データ分を格納するクラスを作っておきます。

class Experience(NamedTuple):
    state: gym.spaces.Box
    action: int
    reward: float
    next_state: gym.spaces.Box
    done: bool

ここでの state や reward は上記で定義したもの(変換済みのもの)です。

モデル

次に、ニューラルネットによるモデルを定義します。入力を状態、出力を各行動 (0 or 1) 毎の価値を出力するようなモデルです。
今回は tf.keras.Model のサブクラスとして作成し、後にこれを用いて keras の Functional API を使えるようなモデルに変換しました。
(keras, サブクラスでも Functional API 使えるようになってほしい)

class AgentModel(tf.keras.Model):

    def __init__(self, is_training: bool, num_outputs: int):
        super(AgentModel, self).__init__()
        self.is_training = is_training
        self.num_outputs = num_outputs
        k_init = tf.keras.initializers.glorot_normal()
        relu = tf.nn.relu
        self.conv_01 = tf.keras.layers.Conv2D(32, kernel_size=8, strides=4, kernel_initializer=k_init, activation=relu) 
        self.conv_02 = tf.keras.layers.Conv2D(64, kernel_size=4, strides=2, kernel_initializer=k_init, activation=relu)
        self.conv_03 = tf.keras.layers.Conv2D(64, kernel_size=3, strides=1, kernel_initializer=k_init, activation=relu)
        self.flatten = tf.keras.layers.Flatten()
        self.dense = tf.keras.layers.Dense(256, kernel_initializer=k_init, activation=relu)
        self.output_layer = tf.keras.layers.Dense(num_outputs, kernel_initializer=k_init)

    def call(self, inputs):
        outputs = inputs
        outputs = self.conv_01(outputs)
        outputs = self.conv_02(outputs)
        outputs = self.conv_03(outputs)
        outputs = self.flatten(outputs)
        outputs = self.dense(outputs)
        outputs = self.output_layer(outputs)
        return outputs

    def compute_output_shape(self, input_shape):
        shape = tf.TensorShape(input_shape).as_list()
        return [shape[0], self.num_outputs]

モデルの構成としては、特に工夫せずに書籍などを参考にしました。
3層の convolution レイヤのあと、flat なベクトルに変形し、dense レイヤを通して2つの値を出力します。

Agent クラス

Agent クラスでは、次のことを行います。

  • ニューラルネットによるモデルを準備する
  • 状態を入力として行動を返す
  • データをもとにモデルを学習する
class Agent:

    def __init__(self, actions, epsilon, input_shape, learning_rate=0.0001):
        self.actions = actions
        self.epsilon = epsilon
        self.input_shape = input_shape
        self.learning_rate = learning_rate
        self.model = None
        self._teacher_model = None
        self.initialize()

    def initialize(self):
        self.build()
        optimizer = tf.train.RMSPropOptimizer(self.learning_rate)
        self.model.compile(optimizer, loss='mse')

    def save(self, model_path):
        self.model.save_weights(model_path, overwrite=True)

    @classmethod
    def load(cls, env, model_path, epsilon=0.0001):
        actions = list(range(env.action_space.n))
        input_shape = (env.width, env.height, env.frame_count)
        agent = cls(actions, epsilon, input_shape)
        agent.initialize()
        agent.model.load_weights(model_path)
        return agent

    def build(self):
        self.model = AgentModel(is_training=True, num_outputs=len(self.actions))

        # teacher_model を更新するため、両方のモデルで一度計算し重みを取得する
        self._teacher_model = AgentModel(is_training=True, num_outputs=len(self.actions))
        dummy = np.random.randn(1, *self.input_shape).astype(np.float32)
        dummy = tf.convert_to_tensor(dummy)
        _ = self.model.call(dummy)
        _ = self._teacher_model.call(dummy)
        self.update_teacher()

    def policy(self, state) -> int:
        '''
        epsilon greedy で受け取った state をもとに行動を決定する
        '''
        # epsilon greedy
        if np.random.random() < self.epsilon:
            return np.random.randint(len(self.actions))
        else:
            estimates = self.estimate(state)
            return np.argmax(estimates)

    def estimate(self, state):
        '''
        ある state の状態価値を推定する
        '''
        state_as_batch = np.array([state])
        return self.model.predict(state_as_batch)[0]

    def update(self, experiences, gamma):
        '''
        与えられた experiences をもとに学習
        '''
        states = np.array([e.state for e in experiences])
        next_states = np.array([e.next_state for e in experiences])

        estimated_values = self.model.predict(states)
        next_state_values = self._teacher_model.predict(next_states)

        # train
        for i, e in enumerate(experiences):
            reward = e.reward
            if not e.done:
                reward += gamma * np.max(next_state_values[i])
            estimated_values[i][e.action] = reward
        loss = self.model.train_on_batch(states, estimated_values)
        return loss

    def update_teacher(self):
        self._teacher_model.set_weights(self.model.get_weights())

    def play(self, env, episode_count: int = 2, render: bool = True):
        total_rewards = []
        for e in range(episode_count):
            state = env.reset()
            done = False
            episode_reward = 0

            while not done:
                if render:
                    env.render()
                action = self.policy(state)
                step_reward = 0
                for _ in range(4):
                    next_state, reward, done = env.step_with_raw_reward(action)
                    if done:
                        break
                    step_reward += reward
                episode_reward += reward
                state = next_state
            print('episode {}, total reward: {:.4f}'.format(e, episode_reward))
            total_rewards.append(episode_reward)

        env.reset()
        print('average reward by {}: {:.4f}'.format(episode_count, np.mean(total_rewards)))

def policy では、epsilon greedy によって、ランダムに選んだ行動か、与えられた状態をもとに最も価値が高くなる行動を返しています。
def update では、experiences (データの batch)からモデルの学習を行っています。

また、ここでは Fixed Target Q-Network というテクニックを使用しています。
次のように、 update 内で、遷移先の状態価値の計算には self.model ではなく self._teacher_model を使用し、それを正解ラベルとして学習を行っています。

estimated_values = self.model.predict(states)
next_state_values = self._teacher_model.predict(next_states)

self.model は各 minibatch 毎にパラメータが更新されているため、遷移先状態も self.model を用いて推定を行ってしまうと、TD誤差も不安定になってしまいます。
これを避けるために、 self._teacher_model を用意し、定期的に更新しつつも各 minibatch では同じモデルを使えるようにしています。

Trainer クラス

Trainer クラスは、名前の通りモデルの学習を行うクラスです。
agent を初期化し、行動させながらデータを溜め、そのデータを使って agent を学習させます。また、学習状況の可視化や、モデルの保存、前述した teacher_model の更新などの処理も含まれています。
細かい実装はソースを見ていただくとして、メインの学習部分を下記に貼ります。

class Trainer:
    # ... 略 ...
    def train_loop(self, env, agent, episode_count, initial_count):

        for episode in range(episode_count):
            state = env.reset()
            done = False
            step_count = 0
            episode_reward = 0
            while not done:
                action = agent.policy(state)
                step_reward = 0
                for _ in range(4):
                    next_state, reward, done = env.step(action)
                    if done:
                        break
                    step_reward += reward
                e = Experience(state, action, step_reward, next_state, done)
                self.experiences.append(e)
                episode_reward += step_reward
                loss = self.step(episode, agent)
                state = next_state

                if not self.training and (len(self.experiences) >= self.buffer_size or episode >= initial_count):
                    self.begin_training(agent)
                    self.training = True

            self.end_episode(episode, episode_reward, loss, agent)

エピソードカウント分、下記のループを回します。

  1. 環境を初期化する
  2. Agent モデルに状態を渡して行動を受け取る
  3. 環境に対して行動し、遷移先状態と報酬、ゲームが終わったかどうかのフラグを得る
  4. 得られた状態、行動、遷移先状態、報酬を experiences に追加する
  5. (ある程度データが溜まっていれば)モデルを1ステップ学習させる

行動選択後に少し注意点があります。

action = agent.policy(state)
step_reward = 0
for _ in range(4):
    next_state, reward, done = env.step(action)
    if done:
        break
    step_reward += reward

現在の状態から行動を受け取ったあと、4回連続でそれを実行しています。
これは、問題設定で書いた「同じ行動を4回繰り返す」をコードに落としたものです。
通常ではこのような処理は必要なく、得られた行動を1度だけ環境に対して実行してやれば良いと思います。

まとめ

FlappyBird を強化学習で攻略する第一歩として、ベースラインとなる DQN を実装しました。
学習させた結果、人間レベルのプレイが既に出来てきました。
とはいえ、学習が難しいと言われる強化学習を取り扱うためには、問題設定や前処理がかなり重要になってくることも分かりました。
報酬や行動をどのように定義すればうまくいくかなどについても、実問題への適用の際には熟考する必要がありそうです。

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away