LoginSignup
7
8

More than 1 year has passed since last update.

第3回 今更だけど基礎から強化学習を勉強する 価値推定編(TD法、モンテカルロ法、GAE)

Last updated at Posted at 2021-05-03

今回はモデルフリーの環境における価値の推定(評価)手法の話がメインとなります。

第2回 ディープラーニング編(Q学習、方策勾配法/REINFORCE、A3C/A2C)
第4回 連続行動空間編

※ネット上の情報をかき集めて自分なりに実装しているので正確ではない可能性がある点はご注意ください
※ライブラリはTensowflow2.0(+Keras)を使っています。

コード全体

本記事で作成したコードは以下です。

環境(CartPole-v0)

第2回と同じ CartPole-v0を使っていきます。

学習コード概要

第2回のA3C/A2Cアルゴリズムをベースに価値関数の評価(推定)手法を見ていきます。
まずはメインとなるコードを作成し、手法に影響ある部分だけを変えて実装していきます。

今回のモデルは以下です。

bunrui-Page-4-3.png

Actor側の出力を softmax にしています。
そのままだと学習がうまくいかなかったので前段階にDense層をはさみました。
学習のメインコードは以下です。

import gym
import numpy as np
import tensorflow as tf
from tensorflow.python import keras
from tensorflow.keras.optimizers import Adam

# softmaxでアクションを決定
def SoftmaxPolicy(model, state, nb_actions):
    action_probs, _ = model(state.reshape((1,-1)))
    return np.random.choice(list(range(nb_actions)), size=1, p=action_probs[0].numpy())[0]

with gym.make("CartPole-v0") as env:
    nb_actions = env.action_space.n
    obs_shape = env.observation_space.shape

    # モデルの定義
    c = input_ = keras.layers.Input(shape=obs_shape)
    c = keras.layers.Dense(10, activation="relu")(c)
    c1 = keras.layers.Dense(10, activation="relu")(c)
    actor_layer = keras.layers.Dense(nb_actions, activation="softmax")(c1)
    critic_layer = keras.layers.Dense(1, activation="linear")(c)
    model = keras.Model(input_, [actor_layer, critic_layer])
    model.compile(optimizer=Adam(lr=0.01))

    # 経験バッファ用
    experiences = []

    # 学習ループ
    for episode in range(300):
        state = np.asarray(env.reset())
        done = False
        total_reward = 0

        # 1episode
        while not done:
            # アクションを決定
            action = SoftmaxPolicy(model, state, nb_actions)

            # 1step進める
            n_state, reward, done, _ = env.step(action)
            n_state = np.asarray(n_state)
            total_reward += reward

            # 経験を追加
            experiences.append({
                "state": state,
                "action": action,
                "reward": reward,
                "n_state": n_state,
                "done": done,
            })
            state = n_state

            (a)1step毎に学習する場合の処理を記載

        (b)1episode毎に学習する場合の処理を記載

        print("episode: {}, reward: {}".format(episode, total_reward))

各手法において、毎ステップ学習する手法の場合は(a)に、1エピソード毎に学習する手法は(b)にコードを記載します。

次にモデル学習側のメインコードです。
(c)に各手法での推定価値の計算を記述します。

その後、推定価値は正規化をおこなっています。
正規化は強化学習の話ではなく機械学習全般の話ですね。
ここでは平均0、分散1になるように正規化しています。

def train(model, experiences):

    (c) 各手法で推定価値(v_val)を計算

    # 推定価値の正規化(平均0,分散1)
    std = v_val.std()
    if std != 0:
        v_val = (v_val - v_val.mean()) / std

    # データ整形
    states = np.asarray([e["state"] for e in experiences])
    actions = np.asarray([e["action"] for e in experiences])

    # actionをonehot化
    onehot_actions = tf.one_hot(action_batch, nb_actions)

    # 勾配を計算
    with tf.GradientTape() as tape:
        action_probs, v = model(state_batch, training=True)

        # π(a|s)を計算
        selected_action_probs = tf.reduce_sum(onehot_actions * action_probs, axis=1, keepdims=True)

        # log(π(a|s)) * Q(s,a) を計算
        selected_action_probs = tf.clip_by_value(selected_action_probs, 1e-10, 1.0)  # 0にならないようにclip
        policy_loss = tf.math.log(selected_action_probs) * advantage

        # Value loss
        value_loss = tf.reduce_mean((v_val - v) ** 2, axis=1, keepdims=True)

        #--- 方策エントロピー
        entropy = tf.math.log(selected_action_probs) * selected_action_probs
        entropy = tf.reduce_sum(entropy, axis=1, keepdims=True)

        #--- total loss
        value_loss_weight = 1.0
        entropy_weight = 0.1
        loss2 = -policy_loss + value_loss_weight * value_loss - entropy_weight * entropy

        # ミニバッチ処理(平均をlossに)
        loss = tf.reduce_mean(loss2)

    # 勾配を元にoptimizerでモデルを更新
    gradients = tape.gradient(loss, model.trainable_variables)
    model.optimizer.apply_gradients(zip(gradients, model.trainable_variables))

価値関数の推定の仕方

第1回でも話しましたが強化学習は現在の価値を推定し、その差分を価値関数または方策関数という形で学習していく手法となります。
現在の価値の推定にはベルマン方程式が使われますが、未来の報酬 $V_\pi(s_{t+1})$ がありそのままでは計算できません。
今回はこの価値の推定方法についてまとめてみました。

TD法

1step進めた結果を元に価値を予測する手法です。
ExperienceMemoryと相性がよく、Q学習でよく使われる手法ですね。

現在の価値は以下の式で推定されます。

$$ V(s_t) = r_{t+1} + \gamma V(s_{t+1}) $$

次の状態の価値 $V(s_{t+1})$ が推定値となり、現在の価値関数から予測した値となります。

コードは以下です。

batch_size = 16

# 学習ループ
experiences = []
for episode in range(300):
    # 1episode
    while not done:
        
        experiences.append(経験)

        # (a) batchサイズたまったら学習
        if len(experiences) >= batch_size:
            train_TD(model, experiences)
            # On-policyアルゴリズムは学習毎に方策が変わるので、経験は学習毎に初期化する
            experiences = []


def train_TD(model, experiences):
    gamma = 0.9

    # 次の状態の推定価値を取得
    _, n_v = model(np.asarray([e["n_state"] for e in experiences]))
    n_v = n_v.numpy()

    # 各経験毎に価値を推定
    v_vals = []
    for i, e in enumerate(experiences):
        if e["done"]:
            # 終了時は次の状態がないので報酬のみ
            r = e["reward"]
        else:
            r = e["reward"] + gamma * n_v[i][0]
        v_vals.append(r)
    v_vals = np.asarray(v_vals).reshape((-1, 1))  # 整形

    (以下学習コード)

学習結果です。
On-policy 手法とは相性が悪く学習できていませんね。

cartpole3-1-TD.png

モンテカルロ法

1エピソードの結果を元に価値を推定する手法です。
推定といっていますが、実際に観測した報酬を使って推定しています。
TD法であった次の状態価値の推定値 $V(s_{t+1})$ が無くなっているのが特徴です。

$$ V(s_t) = r_{t+1} + \gamma r_{t+2} ... + \gamma^{T-t-1} r_{T-t} $$

# 学習ループ
experiences = []
for episode in range(300):
    # 1episode
    while not done:
        
        experiences.append(経験)

    # (b) 1episode毎に処理する
    if len(experiences) >= batch_size:
        train_MC(model, experiences)
        # On-policyアルゴリズムは学習毎に方策が変わるので、経験は学習毎に初期化する
        experiences = []


def train_MC(model, experiences):
    gamma = 0.9

    # 各経験毎に価値を推定、後ろから計算
    v_vals = []
    r = 0
    for e in reversed(experiences):
        if e["done"]:
            # 終了時は次の状態がないので報酬のみ
            r = e["reward"]
        else:
            r = e["reward"] + gamma * r
        v_vals.append(r)
    v_vals.reverse()  # 反転して元に戻す
    v_vals = np.asarray(v_vals).reshape((-1, 1))  # 整形

    (以下学習コード)

価値の計算は後ろから計算する事で計算しやすくなります。
学習結果です。

cartpole3-1-monte.png

Multistep lerning

モンテカルロ法では1エピソード全ての経験が対象でしたが、n-step先を対象に価値を推定する手法が Multistep lerning です。
1-stepの場合はTD法と同じになり、エピソード長と同じステップならモンテカルロ法と同じになります。

$$ V(s_t) = r_{t+1} + \gamma r_{t+2} ... + \gamma^{t+n-1} V(s_{t+n}) $$

n_step = 16

# 学習ループ
experiences = []
for episode in range(300):
    # 1episode
    while not done:
        
        experiences.append(経験)

        # (a) N step毎に処理する
        if len(experiences) >= n_step:
            train_ML(model, experiences)
            # On-policyアルゴリズムは学習毎に方策が変わるので、経験は学習毎に初期化する
            experiences = []


def train_ML(model, experiences):
    gamma = 0.9

    # 各経験毎に価値を推定、後ろから計算
    if experiences[-1]["done"]:
        # 最後が終わりの場合は全部使える
        r = 0
    else:
        # 最後が終わりじゃない場合は予測値vで補完する
        _, n_v = model(np.atleast_2d(experiences[-1]["n_state"]))
        r = n_v[0][0].numpy()

    v_vals = []
    for e in reversed(experiences):
        if e["done"]:
            r = 0
        r = e["reward"] + gamma * r
        v_vals.append(r)
    v_vals.reverse()  # 反転して元に戻す
    v_vals = np.asarray(v_vals).reshape((-1, 1))  # 整形

    (以下学習コード)

学習結果です。

cartpole3-1-multi.png

TD(λ)法

上記3つの手法における各stepでの推定価値ですが、本質的には同じ価値を推定しているので重みを使って平均化することができます。
どういうことかというとあるタイミングの価値はTD法だと (1-step価値)、モンテカルロ法だと (エピソード最後までの価値) ですが、全stepを足した平均 (1-step価値 + 2-step価値 + ... + エピソード最後までの価値) を使っても問題ないという考えです。

各step $n$毎の推定価値 $G_t^n$ は以下です。

\begin{align}
1step &: \ G_t^1 = r_{t+1} + \gamma V(s_{t+1}) \\
2step &: \ G_t^2 = r_{t+1} + \gamma r_{t+2} + \gamma^2 V(s_{t+2}) \\
nstep &: \ G_t^n = r_{t+1} + \gamma r_{t+2} + ... + \gamma^{n-1} r_{t+n} + \gamma^n V(s_{t+n}) \\
episode end &: \ G_t^{T-n} = r_{t+1} + \gamma r_{t+2} + ... + \gamma^{T-t} r_{T-t} \\
\end{align}

TD(λ)法ではこれら各stepに $\lambda^{n-1}$ に比例して重みづけをします。

$$ G_t^1 + \lambda G_t^2 + \lambda^2 G_t^3 ... $$

ここで $1 + \lambda + \lambda^2 + ... \to \frac{1}{1-\lambda}$ なので正規化するために $1-\lambda$ を掛けます。
すると以下となります。

$$ G_t^\lambda = (1-\lambda)(G_t^1 + \lambda G_t^2 + ... + \lambda^{T-n-2} G_t^{T-n-1}) + \lambda^{T-n-1} G_t^{T-n}$$

エピソード最後の $G_t^{T-n}$ に $1-\lambda$ がかかっていないのは、
$$(1-\lambda)(1 + \lambda + \lambda^2 + ... + \lambda^{T-t-2}) = 1- \lambda^{T-t-1}$$
になるからです。

各stepの重み $\lambda$ ですが $\lambda=0$ の場合TD法と同じになり、$\lambda=1$ の場合モンテカルロ法と同じになります。

コードは以下です。

# 学習ループ
experiences = []
for episode in range(300):
    # 1episode
    while not done:
        
        experiences.append(経験)

    # (b) 1エピソード毎に処理する
    if len(experiences) >= n_step:
        train_TDh(model, experiences)
        # On-policyアルゴリズムは学習毎に方策が変わるので、経験は学習毎に初期化する
        experiences = []


def train_TDh(model, experiences):
    gamma = 0.9
    td_lambda = 0.9

    # 次の状態の推定価値を出しておく
    _, n_v = model(np.asarray([e["n_state"] for e in experiences]))
    n_v = n_v.numpy()

    v_vals = []
    for i in range(len(experiences)):
        # 各step
        r = 0
        t = 0
        G = 0
        for j in range(i, len(experiences)-1):
            # 割引報酬を計算
            r += (gamma ** t) * experiences[j]["reward"]
            t += 1

            # 推定価値を計算
            G = (td_lambda ** (t-1)) * (r + (gamma ** t) * n_v[j][0])

        # G全体の計算とエピソード最後の報酬
        G = (1 - td_lambda) * G + (r + (gamma ** t) * experiences[-1]["reward"])

        v_vals.append(G)
    v_vals = np.asarray(v_vals).reshape((-1, 1))  # 整形

    (以下学習コード)

学習結果です。

cartpole3-1-TDh.png

方策勾配法における価値関数の推定

方策勾配法では行動価値 $Q^{\pi_\theta}(s,a)$ を推定する必要があります。

$$
\nabla J(θ) \propto E_{\pi_\theta} [ log \pi_{\theta}(a|s) Q^{\pi_\theta}(s,a) ]
$$

基本的にこの推定方法はTD法等上記で紹介した方法で推定しても問題ありません。
ただ、より学習を安定するためにいくつか手法がるようです。

参考

Baseline

方策勾配法では分散を小さくすることが重要となります。(機械学習全般にいえますが)
分散が大きいと学習後の方策が大きく異なってしまい、性能が大きく向上する可能性もありますが大きく低下する可能性もあります。

分散を小さくする手法の1つがBaseline $b(s_t)$ の導入です。

$$
\nabla J(θ) \propto E_{\pi_\theta} [ log \pi_{\theta}(a|s) (Q^{\pi_\theta}(s,a) - b(s_t))]
$$

Baseline $b(s_t)$ はどんな値をつかってもいいのですが、実装では価値の平均を採用しています。

コードは以下です。
経験の収集はモンテカルロ法で実装しています。

# 学習ループ
experiences = []
for episode in range(300):
    # 1episode
    while not done:
        
        experiences.append(経験)

    # (b) 1エピソード毎に処理する
    if len(experiences) >= n_step:
        train_B(model, experiences)
        # On-policyアルゴリズムは学習毎に方策が変わるので、経験は学習毎に初期化する
        experiences = []


def train_B(model, experiences):
    gamma = 0.9

    # 各経験毎に価値を推定、後ろから計算
    v_vals = []
    r = 0
    for e in reversed(experiences):
        if e["done"]:
            # 終了時は次の状態がないので報酬のみ
            r = e["reward"]
        else:
            r = e["reward"] + gamma * r
        v_vals.append(r)
    v_vals.reverse()  # 反転して元に戻す
    v_vals = np.asarray(v_vals).reshape((-1, 1))  # 整形

    # ベースラインとして平均を引く
    v_vals -= np.mean(v_vals)

    (以下学習コード)

学習結果です。

cartpole3-1-baseline.png

Advantage function

Advantage functionは以下で定義されます。

$$
A^{\pi}(s,a) = Q^{\pi}(s,a) - V^{\pi}(s)
$$

この Advantage function を用いて更新する手法となります。

$$
\nabla J(θ) \propto E_{\pi_\theta} [ log \pi_{\theta}(a|s) A^{\pi}(s,a)]
$$

Baselineの $b(s)$ を $V^{\pi}(s)$ にした物と同じですね。
コードは以下です。
経験の収集はモンテカルロ法で実装しています。

# 学習ループ
experiences = []
for episode in range(300):
    # 1episode
    while not done:
        
        experiences.append(経験)

    # (b) 1エピソード毎に処理する
    if len(experiences) >= n_step:
        train_Adv(model, experiences)
        # On-policyアルゴリズムは学習毎に方策が変わるので、経験は学習毎に初期化する
        experiences = []


def train_Adv(model, experiences):
    gamma = 0.9

    # 各経験毎に価値を推定、後ろから計算
    v_vals = []
    r = 0
    for e in reversed(experiences):
        if e["done"]:
            # 終了時は次の状態がないので報酬のみ
            r = e["reward"]
        else:
            r = e["reward"] + gamma * r
        v_vals.append(r)
    v_vals.reverse()  # 反転して元に戻す
    v_vals = np.asarray(v_vals).reshape((-1, 1))  # 整形

    # Vを引く
    _, v = model(np.asarray([e["state"] for e in experiences]))
    v_vals -= v.numpy()

    (以下学習コード)

学習結果です。

cartpole3-1-adv.png

GAE

Advantage function でも未来の価値をどう推定するかは決まっていませんでした。(上記実装ではモンテカルロ法で推定しています)
GAE(Generalized Advantage Estimator)では未来の価値をTD誤差を元に推定する方法となります。

TD誤差 $\delta_t^V$ は以下です。

$$ \delta_t^V = r_t + \gamma V(s_{t+1}) - V(s_t)$$

これを元にTD(λ)法のように各ステップをλで重みづけした手法がGAEとなります。

\hat{A}_t^{GAE(\gamma, \lambda)} = (1 - \lambda)(\delta_t^V + \lambda (\delta_t^V + \gamma \delta_{t+1}^V) + \lambda^2 (\delta_t^V + \gamma \delta_{t+1}^V + \gamma^2 \delta_{t+2}^V) + ... )

この式を解くと以下になります。

\hat{A}_t^{GAE(\gamma, \lambda)} = \sum^{\infty}_{l=0}( \gamma \lambda)^l \delta^V_{t+l}
n_step = 16

# 学習ループ
experiences = []
for episode in range(300):
    # 1episode
    while not done:
        
        experiences.append(経験)

        # (a) n-step毎に処理する
        if len(experiences) >= n_step:
            train_GAE(model, experiences)
            # On-policyアルゴリズムは学習毎に方策が変わるので、経験は学習毎に初期化する
            experiences = []


def train_GAE(model, experiences):
    gamma = 0.9
    gae_lambda = 0.9

    # 推定価値をだす
    _, v = model(np.asarray([e["state"] for e in experiences]))
    _, n_v = model(np.asarray([e["n_state"] for e in experiences]))
    v = v.numpy()
    n_v = n_v.numpy()

    # 逆から計算する
    last = 0
    v_vals = []
    for i in reversed(range(len(experiences))):
        e = experiences[i]
        if e["done"]:
            delta = e["reward"] - v[i]
            last = 0
        else:
            delta = e["reward"] + gamma * n_v[i] - v[i]
        last = delta + gamma * gae_lambda * last
        v_vals.append(last)
    v_vals.reverse()
    v_vals = np.asarray(v_vals).reshape((-1, 1))  # 整形

    (以下学習コード)

学習結果です。

cartpole3-1-GAE.png

あとがき

結果ですが、環境側の報酬や使うアルゴリズムによってかなり性能が変わります。
参考程度ですね。

価値推定方法は各手法に既知として組み込まれている場合が多く理解するまでに苦労しました。。。

Baseline についてちょっとだけ追記

他の実装をいくつか見た中から Baseline の実装について紹介しておきます。
こういう実装があるっていうだけで比較等は行いません(対象の環境にかなり左右されるので比較が難しい)
また、GAEは基本Baselineと合わせて使う事が多いようです。(GAEの結果からさらにBaselineを引く)

  • 平均

本記事の実装ですね。推定価値の平均を引く手法です。

v_vals = v_vals - np.mean(v_vals)
  • 状態価値

Advantage function です。
予測状態価値を引くやり方です。
状態価値モデルが実装されている手法しか適用できません。

v = value_model(states)  # 状態価値モデルから現状の価値を予測
v_vals = v_vals - v      # 推定した価値から予測した価値を引く
  • 標準正規分布に正規化

標準正規分布に正規化する方法です。

v_vals = (v_vals - np.mean(v_vals)) / (np.std(v_vals) + 1e-8)
  • 分散を1に正規化

平均は無視して分散のみ正規化する方法です。

v_vals = v_vals / (np.std(v_vals) + 1e-4)
7
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
8