Help us understand the problem. What is going on with this article?

【強化学習初心者向け】シンプルな実装例で学ぶQ学習、DQN、DDQN【CartPoleで棒立て:1ファイルで完結、Kearas使用】

More than 1 year has passed since last update.

※2018年06月23日追記
PyTorchを使用した最新版の内容を次の書籍にまとめました。
つくりながら学ぶ! 深層強化学習 ~PyTorchによる実践プログラミング~ 18年6月28日発売


「倒立振子(棒立て問題)」を、強化学習のQ学習、DQNおよびDDQN(Double DQN)で実装・解説したので、紹介します。

ディープラーニングのライブラリにはKerasを使用しました。

(※追記:17/09/27にHuber関数部分を修正しました)
(※追記:17/10/01にQ学習更新のr抜けを修正しました)
(※追記:17/10/03にQ学習報酬のrewardを修正しました)
(※追記:18/05/16にDDQNのターゲットの更新方法を修正しました)
(※追記:18/06/12にDQNのplayとDDQNのターゲットの更新方法を修正しました)
(※追記:18/10/20にDDQNの行動選択の引数がtargetQNになっていたのを、mainQNに修正しました)

DDQN.gif

概要

Open AI GymのCartPole(棒立て)をQ学習(Q-learning)、DQN(deep Q-learning)およびDDQN(Dobule DQN)で実装しました。

【対象者】
・強化学習に興味がある方
・強化学習を実装例から学びたい方
ゼロからDeepまで学ぶ強化学習を読み、次は簡単な実装例が見たい方
これからの強化学習(白い本)や強化学習(青い本)を読んだが、理解が難しく、簡単な実装例を知りたい方

【得られるもの】
各手法を使用したミニマム・シンプルなプログラムが実装できるようになります。

※保守性や汎用性を重視した「分かる人には扱いやすいプログラム」は書いていません。

強化学習の初学者が分かりやすいように、上から下に読んで、すんなり理解できるプログラムを書くように心がけました。

シンプルなクラス設計、関数設計にし、1ファイルでプログラムが完結しています。

コメントを多めに入れるようにしました。

※各プログラムは120行から170行程度です。
※各コードの詳細説明を掲載すると長くすぎるので、別ページに掲載しております。

棒立て、CartPoleについて

CartPoleはOpen AI Gymと呼ばれるライブラリの「棒を立て続けるタスク」です(冒頭の動画)。

子供の頃、掃除の時間に、ほうきを手のひらで立てて遊んだと思いますが、あれです。
手のひらの代わりに、Cart(車)になっています。

時刻tでの「状態s(t)」はcartの位置x、cartの速度v、棒の角度θ、棒の角速度ωの4次元で表現されます。

私たちにできる行動(Action)は、cartを右に押すか、左に押すかの2択です。

状態s(t)に応じて、適切な行動a(t)を選択し、棒を200stepの間、立て続けられたら成功です。

Open AI gymやディープラーニングのライブラリはWindowsでは使用しにくいので、Ubuntu環境をおすすめします。

私はVirtual Boxを使用したり、PCをWindowsとUbuntuのデュアルブートにしています。

強化学習について

今回やりたいことは、
状態s(t)=[x(t)、v(t)、θ(t)、ω(t)]において、適切な行動a(t)を求めることです。

Q学習、DQNについては、以下の解説が分かりやすいです。
まずはこちらを読んで、イメージをつかんでください。

ゼロからDeepまで学ぶ強化学習

上記サイトで分かりづらかった点のみ、補足的に説明します。

Q学習(Q-learning)

Q関数の表現方法

私は、Q関数の実装方法が最初よく分からなかったので、解説します。

Q学習のゴールは、正しい行動価値関数Q(s, a)を求めることです。

行動価値関数Q(s,a)は、状態sのときに行動aをした場合、その先最大で得られるであろう報酬合計R(t)を返す関数です。
※未来の報酬は、時間割引率をかけて、若干小さくなっています

Q学習では、このQ関数はテーブル(表)形式で実装されることになります。

今回、カートの位置x(t)などの状態変数は全て連続値ですが、「xは-2.4から2.4までを6分割した離散値にする」などとして、「離散化」を行います。

仮に4つの状態変数を全て6分割したとすると、6^4 = 1296 通りの状態で表されます。

Q関数は、行方向が1296、列方向は右に押すか、左に押すかの2通りで、[1296×2]の行列で表現されます。
各行列のマスの中身はQ(s,a)の値となります。

実装:CartPoleをQ学習で解く

実際にQ学習でCartPoleを実装したコードがこちらです。
100行ちょっとになります。

qlearning.py
# coding:utf-8
# [0]ライブラリのインポート
import gym  #倒立振子(cartpole)の実行環境
from gym import wrappers  #gymの画像保存
import numpy as np
import time


# [1]Q関数を離散化して定義する関数 ------------
# 観測した状態を離散値にデジタル変換する
def bins(clip_min, clip_max, num):
    return np.linspace(clip_min, clip_max, num + 1)[1:-1]

# 各値を離散値に変換
def digitize_state(observation):
    cart_pos, cart_v, pole_angle, pole_v = observation
    digitized = [
        np.digitize(cart_pos, bins=bins(-2.4, 2.4, num_dizitized)),
        np.digitize(cart_v, bins=bins(-3.0, 3.0, num_dizitized)),
        np.digitize(pole_angle, bins=bins(-0.5, 0.5, num_dizitized)),
        np.digitize(pole_v, bins=bins(-2.0, 2.0, num_dizitized))
    ]
    return sum([x * (num_dizitized**i) for i, x in enumerate(digitized)])


# [2]行動a(t)を求める関数 -------------------------------------
def get_action(next_state, episode):
           #徐々に最適行動のみをとる、ε-greedy法
    epsilon = 0.5 * (1 / (episode + 1))
    if epsilon <= np.random.uniform(0, 1):
        next_action = np.argmax(q_table[next_state])
    else:
        next_action = np.random.choice([0, 1])
    return next_action


# [3]Qテーブルを更新する関数 -------------------------------------
def update_Qtable(q_table, state, action, reward, next_state):
    gamma = 0.99
    alpha = 0.5
    next_Max_Q=max(q_table[next_state][0],q_table[next_state][1] )
    q_table[state, action] = (1 - alpha) * q_table[state, action] +\
            alpha * (reward + gamma * next_Max_Q)

    return q_table

# [4]. メイン関数開始 パラメータ設定--------------------------------------------------------
env = gym.make('CartPole-v0')
max_number_of_steps = 200  #1試行のstep数
num_consecutive_iterations = 100  #学習完了評価に使用する平均試行回数
num_episodes = 2000  #総試行回数
goal_average_reward = 195  #この報酬を超えると学習終了(中心への制御なし)
# 状態を6分割^(4変数)にデジタル変換してQ関数(表)を作成
num_dizitized = 6  #分割数
q_table = np.random.uniform(
    low=-1, high=1, size=(num_dizitized**4, env.action_space.n))

total_reward_vec = np.zeros(num_consecutive_iterations)  #各試行の報酬を格納
final_x = np.zeros((num_episodes, 1))  #学習後、各試行のt=200でのxの位置を格納
islearned = 0  #学習が終わったフラグ
isrender = 0  #描画フラグ


# [5] メインルーチン--------------------------------------------------
for episode in range(num_episodes):  #試行数分繰り返す
    # 環境の初期化
    observation = env.reset()
    state = digitize_state(observation)
    action = np.argmax(q_table[state])
    episode_reward = 0

    for t in range(max_number_of_steps):  #1試行のループ
        if islearned == 1:  #学習終了したらcartPoleを描画する
            env.render()
            time.sleep(0.1)
            print (observation[0])  #カートのx位置を出力

        # 行動a_tの実行により、s_{t+1}, r_{t}などを計算する
        observation, reward, done, info = env.step(action)

        # 報酬を設定し与える
        if done:
            if t < 195:
                reward = -200  #こけたら罰則
            else:
                reward = 1  #立ったまま終了時は罰則はなし
        else:
            reward = 1  #各ステップで立ってたら報酬追加

        episode_reward += reward  #報酬を追加

        # 離散状態s_{t+1}を求め、Q関数を更新する
        next_state = digitize_state(observation)  #t+1での観測状態を、離散値に変換
        q_table = update_Qtable(q_table, state, action, reward, next_state)

        #  次の行動a_{t+1}を求める 
        action = get_action(next_state, episode)    # a_{t+1} 

        state = next_state

        #終了時の処理
        if done:
            print('%d Episode finished after %f time steps / mean %f' %
                  (episode, t + 1, total_reward_vec.mean()))
            total_reward_vec = np.hstack((total_reward_vec[1:],
                                          episode_reward))  #報酬を記録
            if islearned == 1:  #学習終わってたら最終のx座標を格納
                final_x[episode, 0] = observation[0]
            break

    if (total_reward_vec.mean() >=
            goal_average_reward):  # 直近の100エピソードが規定報酬以上であれば成功
        print('Episode %d train agent successfuly!' % episode)
        islearned = 1
        #np.savetxt('learned_Q_table.csv',q_table, delimiter=",") #Qtableの保存する場合
        if isrender == 0:
            #env = wrappers.Monitor(env, './movie/cartpole-experiment-1') #動画保存する場合
            isrender = 1
    #10エピソードだけでどんな挙動になるのか見たかったら、以下のコメントを外す
    #if episode>10:
    #    if isrender == 0:
    #        env = wrappers.Monitor(env, './movie/cartpole-experiment-1') #動画保存する場合
    #        isrender = 1
    #    islearned=1;

if islearned:
    np.savetxt('final_x.csv', final_x, delimiter=",")

実行結果の一例は以下の通りです。

100試行ほどで、立ちますが、どっか行ってしまいます。
(子供がホウキ立てをやってるときに似ていて興味深い)

learn100.gif

1000試行ほどで、200step立てるようになります。

learned_notcenter.gif

「Q学習の詳細な説明」および、「掲載したコードの詳細な解説」はこちらを御覧ください。

CartPoleでQ学習(Q-learning)を実装・解説【Phythonで強化学習:第1回】

DQN

DQNはQ学習のQ関数をDL(ディープラーニング)で表した方法です。

やはりまずはこちらを読んで、イメージをつかんでください。

ゼロからDeepまで学ぶ強化学習

また、こちらの新しい本にも、最新の強化学習を紹介する章が用意されており、DNQやDDQNからA3Cまで説明があります。

速習 強化学習: 基礎理論とアルゴリズム(書籍)

Qネットワーク

Q関数をディープニューラルネットワークで表したQネットワークについて説明します。

入力層のニューロン数は、状態の次元数となります。
今回であれば、cartの位置x、cartの速度v、棒の角度θ、棒の角速度ωの4つです。

状態の値をそのまま入力層の素子に与えます。
Q学習とは異なり、離散化はしません。
連続値をそのまま入力します。

出力層のニューロン数は、選択できる行動の数となります。
今回であれば、右か左の2つです。

そして出力ニューロンが出力する値は、「右に押すニューロン」であれば、Q(s(t), 右)の値です。
状態s(t)のときにa(t) = 右に押す
を行った場合に、その先に得られる報酬の総計が出力されます。

今回の例ですと以下のようなネットワークを使用します。

Qnetwork.png

Qネットワークの学習

Qネットワークの重みを変化させて、より良いQネットワークを実現します。

状態s(t)でa(t)=右に押す の場合、Qネットワークの出力層、右ニューロンはQ(s(t), 右)という値を出力します。

時刻tの時点で出力してほしいのは、r(t)+γ・MAX[Q(s_{t+1}, a_{t+1})]です。
※この教師信号も本当は学習途中です
※r(t)は時刻tでもらう報酬、γは時間割引率

Q(s(t), 右)= r(t)+γ・MAX[Q(s_{t+1}, a_{t+1})]
となれば、学習は終了していることになります。

この、Q(s(t), 右に押す)と r(t)+γ・MAX[Q(s_{t+1}, a_{t+1})]の差が小さくなる方向にQネットワークの重みを更新します。

DQN特有の4つの工夫

DQNは単純にQ関数をDL(ディープラーニング)にする以外に、4つの工夫があります。

こちらのページが分かりやすいので、まずイメージをつかんでください。
DQNをKerasとTensorFlowとOpenAI Gymで実装する

1つ目の工夫Experience Replayは、学習内容をメモリに保存して、ランダムにとりだして学習します。

2つ目の工夫Fixed Target Q-Networkは、1step分ずつ学習するのでなく、複数ステップ分をまとめて学習(バッチ学習)します。

3つ目の工夫報酬のclippingは、各ステップでの報酬を-1から1の間にします。
今回は各ステップで立っていたら報酬0、こけたら報酬-1、195 step以上立って終了したら報酬+1とクリップしました。

4つ目の誤差関数の工夫は、誤差が1以上では二乗誤差でなく絶対値誤差を使用するHuber関数を実装します。

DDQN

DDQN(Double DQN)は行動価値関数Qを、価値と行動を計算するメインのQmainと、MAX[Q(s_{t+1}, a_{t+1})]を評価するQtargetに分ける方法です。

分けることで、Q関数の誤差が増大するのを防ぎます。

今回は、試行ごとにQtargetを更新することでDDQNを実現しました。
各試行ではQtargetはひとつ前の試行のQmainの最終値を使用します。

DQN、DDQNの実装

実装したコードがこちらです。

DQN、DDQNは同じファイルです。
途中のDQN_MODEという変数で切り替えています。

dqnddqn.py
# coding:utf-8
# [0]必要なライブラリのインポート
import gym  # 倒立振子(cartpole)の実行環境
import numpy as np
import time
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
from keras.utils import plot_model
from collections import deque
from gym import wrappers  # gymの画像保存
from keras import backend as K
import tensorflow as tf


# [1]損失関数の定義
# 損失関数にhuber関数を使用します 参考https://github.com/jaara/AI-blog/blob/master/CartPole-DQN.py
def huberloss(y_true, y_pred):
    err = y_true - y_pred
    cond = K.abs(err) < 1.0
    L2 = 0.5 * K.square(err)
    L1 = (K.abs(err) - 0.5)
    loss = tf.where(cond, L2, L1)  # Keras does not cover where function in tensorflow :-(
    return K.mean(loss)


# [2]Q関数をディープラーニングのネットワークをクラスとして定義
class QNetwork:
    def __init__(self, learning_rate=0.01, state_size=4, action_size=2, hidden_size=10):
        self.model = Sequential()
        self.model.add(Dense(hidden_size, activation='relu', input_dim=state_size))
        self.model.add(Dense(hidden_size, activation='relu'))
        self.model.add(Dense(action_size, activation='linear'))
        self.optimizer = Adam(lr=learning_rate)  # 誤差を減らす学習方法はAdam
        # self.model.compile(loss='mse', optimizer=self.optimizer)
        self.model.compile(loss=huberloss, optimizer=self.optimizer)

    # 重みの学習
    def replay(self, memory, batch_size, gamma, targetQN):
        inputs = np.zeros((batch_size, 4))
        targets = np.zeros((batch_size, 2))
        mini_batch = memory.sample(batch_size)

        for i, (state_b, action_b, reward_b, next_state_b) in enumerate(mini_batch):
            inputs[i:i + 1] = state_b
            target = reward_b

            if not (next_state_b == np.zeros(state_b.shape)).all(axis=1):
                # 価値計算(DDQNにも対応できるように、行動決定のQネットワークと価値観数のQネットワークは分離)
                retmainQs = self.model.predict(next_state_b)[0]
                next_action = np.argmax(retmainQs)  # 最大の報酬を返す行動を選択する
                target = reward_b + gamma * targetQN.model.predict(next_state_b)[0][next_action]

            targets[i] = self.model.predict(state_b)    # Qネットワークの出力
            targets[i][action_b] = target               # 教師信号

        # shiglayさんよりアドバイスいただき、for文の外へ修正しました
        self.model.fit(inputs, targets, epochs=1, verbose=0)  # epochsは訓練データの反復回数、verbose=0は表示なしの設定


# [3]Experience ReplayとFixed Target Q-Networkを実現するメモリクラス
class Memory:
    def __init__(self, max_size=1000):
        self.buffer = deque(maxlen=max_size)

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        idx = np.random.choice(np.arange(len(self.buffer)), size=batch_size, replace=False)
        return [self.buffer[ii] for ii in idx]

    def len(self):
        return len(self.buffer)


# [4]カートの状態に応じて、行動を決定するクラス
# アドバイスいただき、引数にtargetQNを使用していたのをmainQNに修正しました
class Actor:
    def get_action(self, state, episode, mainQN):   # [C]t+1での行動を返す
        # 徐々に最適行動のみをとる、ε-greedy法
        epsilon = 0.001 + 0.9 / (1.0+episode)

        if epsilon <= np.random.uniform(0, 1):
            retTargetQs = mainQN.model.predict(state)[0]
            action = np.argmax(retTargetQs)  # 最大の報酬を返す行動を選択する

        else:
            action = np.random.choice([0, 1])  # ランダムに行動する

        return action


# [5] メイン関数開始----------------------------------------------------
# [5.1] 初期設定--------------------------------------------------------
DQN_MODE = 1    # 1がDQN、0がDDQNです
LENDER_MODE = 1 # 0は学習後も描画なし、1は学習終了後に描画する

env = gym.make('CartPole-v0')
num_episodes = 299  # 総試行回数
max_number_of_steps = 200  # 1試行のstep数
goal_average_reward = 195  # この報酬を超えると学習終了
num_consecutive_iterations = 10  # 学習完了評価の平均計算を行う試行回数
total_reward_vec = np.zeros(num_consecutive_iterations)  # 各試行の報酬を格納
gamma = 0.99    # 割引係数
islearned = 0  # 学習が終わったフラグ
isrender = 0  # 描画フラグ
# ---
hidden_size = 16               # Q-networkの隠れ層のニューロンの数
learning_rate = 0.00001         # Q-networkの学習係数
memory_size = 10000            # バッファーメモリの大きさ
batch_size = 32                # Q-networkを更新するバッチの大記載

# [5.2]Qネットワークとメモリ、Actorの生成--------------------------------------------------------
mainQN = QNetwork(hidden_size=hidden_size, learning_rate=learning_rate)     # メインのQネットワーク
targetQN = QNetwork(hidden_size=hidden_size, learning_rate=learning_rate)   # 価値を計算するQネットワーク
# plot_model(mainQN.model, to_file='Qnetwork.png', show_shapes=True)        # Qネットワークの可視化
memory = Memory(max_size=memory_size)
actor = Actor()

# [5.3]メインルーチン--------------------------------------------------------
for episode in range(num_episodes):  # 試行数分繰り返す
    env.reset()  # cartPoleの環境初期化
    state, reward, done, _ = env.step(env.action_space.sample())  # 1step目は適当な行動をとる
    state = np.reshape(state, [1, 4])   # list型のstateを、1行4列の行列に変換
    episode_reward = 0


    # 2018.05.16
    # skanmeraさんより間違いを修正いただきました
    # targetQN = mainQN   # 行動決定と価値計算のQネットワークをおなじにする
    # ↓
    targetQN.model.set_weights(mainQN.model.get_weights())

    for t in range(max_number_of_steps + 1):  # 1試行のループ
        if (islearned == 1) and LENDER_MODE:  # 学習終了したらcartPoleを描画する
            env.render()
            time.sleep(0.1)
            print(state[0, 0])  # カートのx位置を出力するならコメントはずす

        action = actor.get_action(state, episode, mainQN)   # 時刻tでの行動を決定する
        next_state, reward, done, info = env.step(action)   # 行動a_tの実行による、s_{t+1}, _R{t}を計算する
        next_state = np.reshape(next_state, [1, 4])     # list型のstateを、1行4列の行列に変換

        # 報酬を設定し、与える
        if done:
            next_state = np.zeros(state.shape)  # 次の状態s_{t+1}はない
            if t < 195:
                reward = -1  # 報酬クリッピング、報酬は1, 0, -1に固定
            else:
                reward = 1  # 立ったまま195step超えて終了時は報酬
        else:
            reward = 0  # 各ステップで立ってたら報酬追加(はじめからrewardに1が入っているが、明示的に表す)

        episode_reward += 1 # reward  # 合計報酬を更新

        memory.add((state, action, reward, next_state))     # メモリの更新する
        state = next_state  # 状態更新


        # Qネットワークの重みを学習・更新する replay
        if (memory.len() > batch_size) and not islearned:
            mainQN.replay(memory, batch_size, gamma, targetQN)

        if DQN_MODE:
        # 2018.06.12
        # shiglayさんさんより間違いを修正いただきました
        # targetQN = mainQN   # 行動決定と価値計算のQネットワークをおなじにする
        # ↓
            targetQN.model.set_weights(mainQN.model.get_weights())

        # 1施行終了時の処理
        if done:
            total_reward_vec = np.hstack((total_reward_vec[1:], episode_reward))  # 報酬を記録
            print('%d Episode finished after %f time steps / mean %f' % (episode, t + 1, total_reward_vec.mean()))
            break

    # 複数施行の平均報酬で終了を判断
    if total_reward_vec.mean() >= goal_average_reward:
        print('Episode %d train agent successfuly!' % episode)
        islearned = 1
        if isrender == 0:   # 学習済みフラグを更新
            isrender = 1

            # env = wrappers.Monitor(env, './movie/cartpoleDDQN')  # 動画保存する場合
            # 10エピソードだけでどんな挙動になるのか見たかったら、以下のコメントを外す
            # if episode>10:
            #    if isrender == 0:
            #        env = wrappers.Monitor(env, './movie/cartpole-experiment-1') #動画保存する場合
            #        isrender = 1
            #    islearned=1;

実行結果の一例は以下の通りです。
100試行ほどで、200step立てるようになります。

Q学習のときは学習に1000試行かかりましたが、DQN、DDQNは1/10の100試行ほどで立てるようになりました。

DQNよりDDQNの方が収束が早い気がします。

DQN

DQN.gif

DDQN

DDQN.gif

「DQN、DDQNの詳細な説明」および、「掲載したコードの詳細な解説」はこちらを御覧ください。

CartPoleでDQN(deep Q-learning)、DDQNを実装・解説【Phythonで強化学習:第2回】

以上、CartPoleでQ学習、DQN、DDQNをシンプルに実装する方法を紹介しました。

強化学習の実装イメージを持ってもらえれば幸いです。

次回はディープラーニングを用いたより発展的な強化学習である
dueling network、prioritized experience replay、A3C
あたりを実装する予定です。

しばしお待ち下さい。

以上、ご一読いただき、ありがとうございました。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした