LoginSignup
50
42

More than 5 years have passed since last update.

【強化学習中級者向け】実装例から学ぶ優先順位付き経験再生 prioritized experience replay DQN 【CartPoleで棒立て:1ファイルで完結】

Last updated at Posted at 2017-10-09

※2018年06月23日追記
PyTorchを使用した最新版の内容を次の書籍にまとめました。
つくりながら学ぶ! 深層強化学習 ~PyTorchによる実践プログラミング~ 18年6月28日発売


強化学習DQNの発展編である「優先順位付き経験再生 prioritized experience replay」を実装・解説したので、紹介します。

openaigym.video.0.9237.video000000.gif

概要

Open AI GymのCartPoleで、優先順位付き経験再生 prioritized experience replayにしたDQNの実装・解説をします。

プログラムが1ファイルで完結し、学習・理解しやすいようにしています。

【対象者】
・強化学習DQNの発展版に興味がある方
速習 強化学習: 基礎理論とアルゴリズム(書籍)を読んで、Dueling Networkを知ったが、実装方法がよく分からない方
・実装例を見たほうが、アルゴリズムを理解しやすい方

【得られるもの】
ミニマム・シンプルなプログラムが実装例から、優先順位付き経験再生 prioritized experience replayを理解・使用できるようになります。

【注意】
本記事に入る前に、以下の記事で、「Open AI gymのCartPoleの使い方」と「DQNの理論と実装」をなんとなく理解しておいてください。
CartPoleでDQN(deep Q-learning)、DDQNを実装・解説【Phythonで強化学習:第2回】

以下の記事で「優先順位付き経験再生」の概要をなんとなく感じておいてください。
Introduction to Prioritized Experience Replay(日本語)

速習 強化学習: 基礎理論とアルゴリズム(書籍)の、優先順位付き経験再生を説明も読んでおくと、なお良いです。

優先順位付き経験再生について

prioritized experience replayは、DQNでメモリに保存していた状態(s(t), a(t), r(t), s(t+1), a(t+1) )をexperience Replayする際に、優先順位をつけましょうって方法です。

では、何で優先順位をつけるかというと、TD誤差の大きさです。

TD誤差の大きさとは、
[r(t) + γ × max[Q(s(t+1), a(t+1))] - Q(s(t), a(t))

のことです。

このTD誤差が大きいサンプルを優先的に学習して、DQNのネットワークの誤差が小さくなるようにしましょうって作戦になります。

やりたいことはとてもシンプルです。

優先順位付き経験再生の実装

実装には以下のサイトを参考にしました。
LET’S MAKE A DQN: DOUBLE LEARNING AND PRIORITIZED EXPERIENCE REPLAY

Jaromiruさんのサイトでは、二分木でTD誤差を格納すると最も早いと説明されていますが、分かりづらくなるので、今回はシンプルなリスト(deque)で実装しています。

まずは、全コードを紹介して、その後、重要な部分を解説します。

duelingNetwork.py
# coding:utf-8
# [0]必要なライブラリのインポート
import gym  # 倒立振子(cartpole)の実行環境
import numpy as np
import time
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
from keras.utils import plot_model
from collections import deque
from gym import wrappers  # gymの画像保存
from keras import backend as K
import tensorflow as tf


# [1]損失関数の定義
# 損失関数にhuber関数を使用します 参考https://github.com/jaara/AI-blog/blob/master/CartPole-DQN.py
def huberloss(y_true, y_pred):
    err = y_true - y_pred
    cond = K.abs(err) < 1.0
    L2 = 0.5 * K.square(err)
    L1 = (K.abs(err) - 0.5)
    loss = tf.where(cond, L2, L1)  # Keras does not cover where function in tensorflow :-(
    return K.mean(loss)


# [2]Q関数をディープラーニングのネットワークをクラスとして定義
class QNetwork:
    def __init__(self, learning_rate=0.01, state_size=4, action_size=2, hidden_size=10):
        self.model = Sequential()
        self.model.add(Dense(hidden_size, activation='relu', input_dim=state_size))
        self.model.add(Dense(hidden_size, activation='relu'))
        self.model.add(Dense(action_size, activation='linear'))
        self.optimizer = Adam(lr=learning_rate)  # 誤差を減らす学習方法はAdamとし、勾配は最大1にクリップする
        # self.model.compile(loss='mse', optimizer=self.optimizer)
        self.model.compile(loss=huberloss, optimizer=self.optimizer)

    # 重みの学習
    def replay(self, memory, batch_size, gamma, targetQN):
        inputs = np.zeros((batch_size, 4))
        targets = np.zeros((batch_size, 2))
        mini_batch = memory.sample(batch_size)

        for i, (state_b, action_b, reward_b, next_state_b) in enumerate(mini_batch):
            inputs[i:i + 1] = state_b
            target = reward_b

            if not (next_state_b == np.zeros(state_b.shape)).all(axis=1):
                # 価値計算(DDQNにも対応できるように、行動決定のQネットワークと価値観数のQネットワークは分離)
                retmainQs = self.model.predict(next_state_b)[0]
                next_action = np.argmax(retmainQs)  # 最大の報酬を返す行動を選択する
                target = reward_b + gamma * targetQN.model.predict(next_state_b)[0][next_action]

            targets[i] = self.model.predict(state_b)  # Qネットワークの出力
            targets[i][action_b] = target  # 教師信号
            self.model.fit(inputs, targets, epochs=1, verbose=0)  # epochsは訓練データの反復回数、verbose=0は表示なしの設定


    # [※p1] 優先順位付き経験再生で重みの学習
    def pioritized_experience_replay(self, memory, batch_size, gamma, targetQN, memory_TDerror):

        # 0からTD誤差の絶対値和までの一様乱数を作成(昇順にしておく)
        sum_absolute_TDerror = memory_TDerror.get_sum_absolute_TDerror()
        generatedrand_list = np.random.uniform(0, sum_absolute_TDerror,batch_size)
        generatedrand_list = np.sort(generatedrand_list)

        # [※p2]作成した乱数で串刺しにして、バッチを作成する
        batch_memory = Memory(max_size=batch_size)
        idx = 0
        tmp_sum_absolute_TDerror = 0
        for (i,randnum) in enumerate(generatedrand_list):
            while tmp_sum_absolute_TDerror < randnum:
                tmp_sum_absolute_TDerror += abs(memory_TDerror.buffer[idx]) + 0.0001
                idx += 1

            batch_memory.add(memory.buffer[idx])


        # あとはこのバッチで学習する
        inputs = np.zeros((batch_size, 4))
        targets = np.zeros((batch_size, 2))
        for i, (state_b, action_b, reward_b, next_state_b) in enumerate(batch_memory.buffer):
            inputs[i:i + 1] = state_b
            target = reward_b

            if not (next_state_b == np.zeros(state_b.shape)).all(axis=1):
                # 価値計算(DDQNにも対応できるように、行動決定のQネットワークと価値観数のQネットワークは分離)
                retmainQs = self.model.predict(next_state_b)[0]
                next_action = np.argmax(retmainQs)  # 最大の報酬を返す行動を選択する
                target = reward_b + gamma * targetQN.model.predict(next_state_b)[0][next_action]

            targets[i] = self.model.predict(state_b)  # Qネットワークの出力
            targets[i][action_b] = target  # 教師信号
            self.model.fit(inputs, targets, epochs=1, verbose=0)  # epochsは訓練データの反復回数、verbose=0は表示なしの設定


# [2]Experience ReplayとFixed Target Q-Networkを実現するメモリクラス
class Memory:
    def __init__(self, max_size=1000):
        self.buffer = deque(maxlen=max_size)

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        idx = np.random.choice(np.arange(len(self.buffer)), size=batch_size, replace=False)
        return [self.buffer[ii] for ii in idx]

    def len(self):
        return len(self.buffer)


# [※p3] Memoryクラスを継承した、TD誤差を格納するクラスです
class Memory_TDerror(Memory):
    def __init__(self, max_size=1000):
        super().__init__(max_size)

    # add, sample, len は継承されているので定義不要

    # TD誤差を取得
    def get_TDerror(self, memory, gamma, mainQN, targetQN):
        (state, action, reward, next_state) = memory.buffer[memory.len() - 1]   #最新の状態データを取り出す
        # 価値計算(DDQNにも対応できるように、行動決定のQネットワークと価値観数のQネットワークは分離)
        next_action = np.argmax(mainQN.model.predict(next_state)[0])  # 最大の報酬を返す行動を選択する
        target = reward + gamma * targetQN.model.predict(next_state)[0][next_action]
        TDerror = target - targetQN.model.predict(state)[0][action]
        return TDerror

    # TD誤差をすべて更新
    def update_TDerror(self, memory, gamma, mainQN, targetQN):
        for i in range(0, (self.len() - 1)):
            (state, action, reward, next_state) = memory.buffer[i]  # 最新の状態データを取り出す
            # 価値計算(DDQNにも対応できるように、行動決定のQネットワークと価値観数のQネットワークは分離)
            next_action = np.argmax(mainQN.model.predict(next_state)[0])  # 最大の報酬を返す行動を選択する
            target = reward + gamma * targetQN.model.predict(next_state)[0][next_action]
            TDerror = target - targetQN.model.predict(state)[0][action]
            self.buffer[i] = TDerror

    # TD誤差の絶対値和を取得
    def get_sum_absolute_TDerror(self):
        sum_absolute_TDerror = 0
        for i in range(0, (self.len() - 1)):
            sum_absolute_TDerror += abs(self.buffer[i]) + 0.0001  # 最新の状態データを取り出す

        return sum_absolute_TDerror


# [3]カートの状態に応じて、行動を決定するクラス
class Actor:
    def get_action(self, state, episode, targetQN):  # [C]t+1での行動を返す
        # 徐々に最適行動のみをとる、ε-greedy法
        epsilon = 0.001 + 0.9 / (1.0 + episode)

        if epsilon <= np.random.uniform(0, 1):
            retTargetQs = targetQN.model.predict(state)[0]
            action = np.argmax(retTargetQs)  # 最大の報酬を返す行動を選択する

        else:
            action = np.random.choice([0, 1])  # ランダムに行動する

        return action


# [4] メイン関数開始----------------------------------------------------
# [4.1] 初期設定--------------------------------------------------------
DQN_MODE = 1  # 1がDQN、0がDDQNです
LENDER_MODE = 1  # 0は学習後も描画なし、1は学習終了後に描画する

env = gym.make('CartPole-v0')
num_episodes = 299  # 総試行回数
max_number_of_steps = 200  # 1試行のstep数
goal_average_reward = 195  # この報酬を超えると学習終了
num_consecutive_iterations = 10  # 学習完了評価の平均計算を行う試行回数
total_reward_vec = np.zeros(num_consecutive_iterations)  # 各試行の報酬を格納
gamma = 0.99  # 割引係数
islearned = 0  # 学習が終わったフラグ
isrender = 0  # 描画フラグ
# ---
hidden_size = 16  # Q-networkの隠れ層のニューロンの数
learning_rate = 0.00001  # Q-networkの学習係数
memory_size = 1000  # バッファーメモリの大きさ
batch_size = 32  # Q-networkを更新するバッチの大記載

# [4.2]Qネットワークとメモリ、Actorの生成--------------------------------------------------------
mainQN = QNetwork(hidden_size=hidden_size, learning_rate=learning_rate)  # メインのQネットワーク
targetQN = QNetwork(hidden_size=hidden_size, learning_rate=learning_rate)  # 価値を計算するQネットワーク
# plot_model(mainQN.model, to_file='Qnetwork.png', show_shapes=True)        # Qネットワークの可視化
memory = Memory(max_size=memory_size)
memory_TDerror = Memory_TDerror(max_size=memory_size)

actor = Actor()

# [4.3]メインルーチン--------------------------------------------------------
for episode in range(num_episodes):  # 試行数分繰り返す
    env.reset()  # cartPoleの環境初期化
    state, reward, done, _ = env.step(env.action_space.sample())  # 1step目は適当な行動をとる
    state = np.reshape(state, [1, 4])  # list型のstateを、1行4列の行列に変換
    episode_reward = 0

    for t in range(max_number_of_steps + 1):  # 1試行のループ
        if (islearned == 1) and LENDER_MODE:  # 学習終了したらcartPoleを描画する
            env.render()
            time.sleep(0.1)
            print(state[0, 0])  # カートのx位置を出力するならコメントはずす

        action = actor.get_action(state, episode, mainQN)  # 時刻tでの行動を決定する
        next_state, reward, done, info = env.step(action)  # 行動a_tの実行による、s_{t+1}, _R{t}を計算する
        next_state = np.reshape(next_state, [1, 4])  # list型のstateを、1行4列の行列に変換

        # 報酬を設定し、与える
        if done:
            next_state = np.zeros(state.shape)  # 次の状態s_{t+1}はない
            if t < 195:
                reward = -1  # 報酬クリッピング、報酬は1, 0, -1に固定
            else:
                reward = 1  # 立ったまま195step超えて終了時は報酬
        else:
            reward = 0  # 各ステップで立ってたら報酬追加(はじめからrewardに1が入っているが、明示的に表す)

        episode_reward += 1  # reward  # 合計報酬を更新

        memory.add((state, action, reward, next_state))  # メモリの更新する

        # [※p4]TD誤差を格納する
        TDerror = memory_TDerror.get_TDerror(memory, gamma, mainQN, targetQN)
        memory_TDerror.add(TDerror)

        state = next_state  # 状態更新

        # [※p5]Qネットワークの重みを学習・更新する replay
        if (memory.len() > batch_size) and not islearned:
            if  total_reward_vec.mean() < 20:
                mainQN.replay(memory, batch_size, gamma, targetQN)
            else:
                mainQN.pioritized_experience_replay(memory, batch_size, gamma, targetQN, memory_TDerror)

        if DQN_MODE:
            targetQN = mainQN  # 行動決定と価値計算のQネットワークをおなじにする

        # 1施行終了時の処理
        if done:
            # [※p6]TD誤差のメモリを最新に計算しなおす
            targetQN = mainQN  # 行動決定と価値計算のQネットワークをおなじにする
            memory_TDerror.update_TDerror(memory, gamma, mainQN, targetQN)

            total_reward_vec = np.hstack((total_reward_vec[1:], episode_reward))  # 報酬を記録
            print('%d Episode finished after %f time steps / mean %f' % (episode, t + 1, total_reward_vec.mean()))
            break

    # 複数施行の平均報酬で終了を判断
    if total_reward_vec.mean() >= goal_average_reward:
        print('Episode %d train agent successfuly!' % episode)
        islearned = 1
        if isrender == 0:  # 学習済みフラグを更新
            isrender = 1
            env = wrappers.Monitor(env, './movie/cartpole_prioritized')  # 動画保存する場合

およそ70試行と、DQNよりちょっと早く学習できます。

実行結果の一例は以下の通りです。

openaigym.video.0.9237.video000000.gif

コードの中で重要な部分を解説します。

コードの解説

DQNから変化している部分は[※p]でコメントしています。

DQNはこちら●CartPoleでDQN(deep Q-learning)、DDQNを実装・解説

まず、※p1の優先順位付き経験再生で重みの学習の部分です。
コードをピックアップすると次の通りです。

今回はdequeに状態(s(t), a(t), r(t), s(t+1), a(t+1) )を格納するのに加えて、別のメモリmemory_TDerrorを用意して、そのときのTD誤差も保存しています。

 # [※p1] 優先順位付き経験再生で重みの学習
    def pioritized_experience_replay(self, memory, batch_size, gamma, targetQN, memory_TDerror):

        # 0からTD誤差の絶対値和までの一様乱数を作成(昇順にしておく)
        sum_absolute_TDerror = memory_TDerror.get_sum_absolute_TDerror()
        generatedrand_list = np.random.uniform(0, sum_absolute_TDerror,batch_size)
        generatedrand_list = np.sort(generatedrand_list)

        # [※p2]作成した乱数で串刺しにして、バッチを作成する
        batch_memory = Memory(max_size=batch_size)
        idx = 0
        tmp_sum_absolute_TDerror = 0
        for (i,randnum) in enumerate(generatedrand_list):
            while tmp_sum_absolute_TDerror < randnum:
                tmp_sum_absolute_TDerror += abs(memory_TDerror.buffer[idx]) + 0.0001
                idx += 1

            batch_memory.add(memory.buffer[idx])


        # あとはこのバッチで学習する
        inputs = np.zeros((batch_size, 4))
        targets = np.zeros((batch_size, 2))
        for i, (state_b, action_b, reward_b, next_state_b) in enumerate(batch_memory.buffer):
            inputs[i:i + 1] = state_b
            target = reward_b

            if not (next_state_b == np.zeros(state_b.shape)).all(axis=1):
                # 価値計算(DDQNにも対応できるように、行動決定のQネットワークと価値観数のQネットワークは分離)
                retmainQs = self.model.predict(next_state_b)[0]
                next_action = np.argmax(retmainQs)  # 最大の報酬を返す行動を選択する
                target = reward_b + gamma * targetQN.model.predict(next_state_b)[0][next_action]

            targets[i] = self.model.predict(state_b)  # Qネットワークの出力
            targets[i][action_b] = target  # 教師信号
            self.model.fit(inputs, targets, epochs=1, verbose=0)  # epochsは訓練データの反復回数、verbose=0は表示なしの設定

上では、最初に、メモリのTD誤差の絶対値和を求めて、その範囲内でバッチ処理分の一様乱数を生成しています。

そして、その一様乱数に当てはまる状態(s(t), a(t), r(t), s(t+1), a(t+1) )をバッチ処理に利用するものとして、ピックアップしています。
図示すると以下の通りです。

pri.png

上の図ですが、2つのメモリがあります。
まず、TD誤差のメモリの誤差の絶対値の和=sum_absolute_TDerrorを求めます。
次に、0からsum_absolute_TDerrorの範囲で、バッチ処理数分の一様乱数を生成します。
そしてその乱数が突き刺さる部分の (s(t), a(t), r(t), s(t+1), a(t+1))をバッチ処理に使用します。

なお絶対値の和を求めていくときに、0.0001を毎回足して、あまりに誤差が小さいものが無視されるのを防いでいます(※割り算しないので、なくても良いですが、ある方が安定する気がする)。

優先順位付き経験再生を実装する工夫は、ほぼ以上となります。

次に、※p3では、Memoryクラスを継承した、TD誤差を格納するクラスを定義しています。

# [※p3] Memoryクラスを継承した、TD誤差を格納するクラスです
class Memory_TDerror(Memory):

このクラスでは3つメソッドを新たに定義しています。
・最新の(s(t), a(t), r(t), s(t+1), a(t+1))のTD誤差を計算し、格納
・メモリ内の全てのTD誤差を更新
・TD誤差の絶対値和=sum_absolute_TDerrorを取得

そしてメインルーチンのなかでは、※p4でTD誤差を格納しています。

TDerror = memory_TDerror.get_TDerror(memory, gamma, mainQN, targetQN)
memory_TDerror.add(TDerror)

またメインルーチンでは、※p5でQネットワークの重みを学習・更新するexperience replayを行いますが、最初は普通のDQNにし、少しQ関数の更新が進んでから、優先順位付き経験再生を使用しています。

# [※p5]Qネットワークの重みを学習・更新する replay
        if (memory.len() > batch_size) and not islearned:
            if  total_reward_vec.mean() < 20:
                mainQN.replay(memory, batch_size, gamma, targetQN)
            else:
                mainQN.pioritized_experience_replay(memory, batch_size, gamma, targetQN, memory_TDerror)

Q関数が全然更新されていない状態で、優先順位付き経験再生をすると、そもそものQ(s,a)が、あまりにめちゃくちゃなので、不安定になるのを防ぐ工夫です。

最後に1試行が終わったときには、 ※p6でTD誤差のメモリを最新に計算しなおしています。

# 1施行終了時の処理
        if done:
            # [※p6]TD誤差のメモリを最新に計算しなおす
            targetQN = mainQN  # 行動決定と価値計算のQネットワークをおなじにする
            memory_TDerror.update_TDerror(memory, gamma, mainQN, targetQN)

これは1試行が終わってQ関数の精度が高くなると、格納していたTD誤差が実態と合わなくなるので、最新のQ関数でTD誤差を計算し直しています。

まとめ

以上、CartPoleで優先順位付き経験再生 prioritized experience replay DQN を実装・解説しました。

次回はディープラーニングを用いた強化学習である
A3Cを実装する予定です。

以上、ご一読いただき、ありがとうございました。

50
42
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
50
42