3
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

第12回 今更だけど基礎から強化学習を勉強する 連続状態空間モデルベース編

Posted at

前回、前々回はテーブル型の状態を扱っていましたが、今回は連続空間の状態を扱います。

第11回 モンテカルロ木探索編

※ネット上の情報をかき集めて自分なりに実装しているので正確ではない可能性がある点はご注意ください
※ライブラリはTensowflow2.0(+Keras)を使っています。

コード全体

本記事で作成したコードは以下です。

はじめに

モデルベース強化学習の情報ですが論文ベースの話が多く、まだ手法の関係性についてあまり見えていません。
今までとちょっと違いますが今回の目次は以下です。

  • モデルベース強化学習の分類(?)
  • 近似モデルの学習(独自モデル)
  • モデルベース強化学習の論文ベースの近似モデル
  • PETS

モデルベース強化学習の分類(?)

下記の9pに書かれていたモデルベース強化学習の概要をそのまままとめたものです。
とりあえずモデルベース強化学習の概要をつかみたかったので…

[DL輪読会]Model-Based Reinforcement Learning via Meta-Policy Optimization

分類 概要 手法例
Dyna-Style 実環境から近似モデルを学習し、近似モデルのシミュレーションから方策を更新 ME-TRPO,SLBO,MB-MPO,(モンテカルロ木探索もここだと思います)
Policy Search with Backpropagation through Time 近似モデルが不正確なもの=確率的モデルとして方策を更新 PILCO,iLQG,GPS,SVG
Shooting Algorithm ある特定のアクション列を元に近似モデルから報酬を予測する手法 RS,MB-FM,PETS-RS,RETS-CEM

参考

近似モデルの学習(独自モデル)

第10回で書いた近似モデルをそのまま表現しました。
報酬関数$R_{\eta}(s,a)$は回帰問題、状態遷移関数 $T_{\eta}$(s'|s,a) と終了関数 $D_{\eta}(d|s,a)$ は確率密度分布の推定問題として学習します。

今回の実装では以下のようにモデル化しました。

対象 入力 出力 モデル化
状態遷移関数 (状態,アクション) 次の状態の正規分布 ニューラルネットワーク
報酬関数 (状態,アクション) 得られる報酬(の期待値) ニューラルネットワーク
終了関数 (状態,アクション) 終了かどうか ランダムフォレスト

状態遷移関数と報酬関数のモデル

モデルは以下です。

bunrui2-Page-6.png

報酬関数はそのまま線形です。
状態遷移関数は正規分布を出力するのでモデルとしては平均と標準偏差を出力しています。

状態遷移関数から実際に次の状態を出力する場合に乱数の処理が入るので Reparameterization trick も使用しています。(第8回を参考)

コードは以下です。

class _A_MDP_trans_reward(keras.Model):
    def __init__(self, env):
        super().__init__()

        self.batch_size = 32
        dense_units = 16
        activation = "relu"

        # 共通レイヤー
        self.d1 = keras.layers.Dense(dense_units, activation=activation)
        
        # rewardレイヤー
        self.d2_reward = keras.layers.Dense(dense_units, activation=activation)
        self.o_reward = keras.layers.Dense(1, activation="linear")
        
        # 遷移レイヤー
        self.d2_trans = keras.layers.Dense(dense_units, activation=activation)
        self.o_trans_mean = keras.layers.Dense(env.observation_space.shape[0], activation="linear")
        self.o_trans_stddev = keras.layers.Dense(env.observation_space.shape[0], activation="linear")

        # optimizer
        self.optimizer = Adam(learning_rate=0.01)


    # Forward pass
    def call(self, states, actions, training=False):
        x = tf.concat([states, actions], axis=1)
        x = self.d1(x)

        #--- reward
        reward = self.d2_reward(x)
        reward = self.o_reward(reward)

        #--- trans
        t = self.d2_trans(x)
        mean = self.o_trans_mean(t)
        stddev = self.o_trans_stddev(t)

        # σ > 0 になるように変換(指数関数)
        stddev = tf.exp(stddev)

        return reward, mean, stddev


    # 次の状態を返す
    def sample_next_state(self, mean, stddev):
        # Reparameterization trick
        normal_random = tf.random.normal(mean.shape, mean=0., stddev=1.)
        next_states = mean + stddev * normal_random
        return next_states


    # モデルを更新する
    def update(self, state, n_state, action, reward):
        
        # ミニバッチ
        idx = np.random.randint(0, len(state), self.batch_size)
        state_batch = state[idx]
        n_state_batch = n_state[idx]
        action_batch = action[idx]
        reward_batch = reward[idx].reshape(-1, 1)
        
        # 勾配を計算
        with tf.GradientTape() as tape:
            pred_reward, mean, stddev = self(state_batch, action_batch, training=True)
            pred_next_states = self.sample_next_state(mean, stddev)

            # MSE
            reward_loss = tf.reduce_mean(tf.square(reward_batch - pred_reward))
            trans_loss = tf.reduce_mean(tf.square(n_state_batch - pred_next_states))

            loss = reward_loss + trans_loss

        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))

        return trans_loss.numpy(), reward_loss.numpy()

終了関数のモデル

終了関数は2値分類の学習となります。(終了するか終了しないかの学習)
ただ、そのまま学習しようとすると失敗しました。
理由は簡単で不均衡データを学習するからです。
ようするに終了するというデータに対して終了しないというデータが圧倒的に多くなります。

bunrui2-Page-7.png

今回の実装では不均衡データに対してアンダーサンプリング(多いほうのデータを減らす)を適用し、ランダムフォレストで学習しています。

import sklearn.ensemble
import sklearn.metrics
import imblearn.under_sampling
class _A_MDP_done():
    def __init__(self, env):
        # ランダムフォレストを定義
        self.model = sklearn.ensemble.RandomForestClassifier()
        self.is_fit = False  # 1回でもがくしゅうしていないとエラー

    # Forward pass
    def __call__(self, states, actions, training=False):
        if not self.is_fit:
            # 学習していない場合はとりあえずFalseを返す
            return np.full((len(states), 1), False)

        # 入力データを作成
        x_all = np.append(states, actions, axis=1)

        # 予測
        done = self.model.predict(x_all)
        return done.reshape((-1, 1))


    # モデルの更新
    def update(self, x_state, x_action, done):
        # 入力データを作成
        x_all = np.append(x_state, x_action, axis=1)

        # 教師データ
        y_all = done

        # 終了したデータがない場合は更新をskip
        if sum(y_all==True) == 0:
            return 0
        
        # アンダーサンプリング
        # 終了データ 100、終了しないデータ、3000
        #  → 終了データ 100、終了しないデータ、100
        under = imblearn.under_sampling.RandomUnderSampler(
            sampling_strategy ={
                0 : sum(y_all==True),
                1 : sum(y_all==True)
            })
        x_train_under, y_train_under = under.fit_resample(x_all, y_all)
        
        # モデルを学習
        self.model.fit(x_train_under, y_train_under)
        self.is_fit = True

        # 評価は適合率(precision)
        done_loss = sklearn.metrics.precision_score(y_all, self.model.predict(x_all))
        
        return done_loss

ハイパーパラメータのチューニングは特にしていません。

A_MDPモデル

上記2つのモデルを管理するクラスを作成しています。
アルゴリズムには関係ないのでコードのみ載せておきます。

class A_MDP():
    def __init__(self, env, buffer_size=1024):
        
        # 終了関数モデルの更新間隔
        self.done_update_interval = 50
        
        # 各モデル
        self.reward_state_model = _A_MDP_trans_reward(env)
        self.done_model = _A_MDP_done(env)
        
        # 経験キュー
        self.experiences = collections.deque(maxlen=buffer_size)

        # アクション情報
        self.nb_actions = env.action_space.n
        self.env_actions = [i for i in range(self.nb_actions)]

        self.update_count = 0

    # 全状態を返す
    # 実際は全情報は不明なので、経験キューにある情報を返す
    @property
    def states(self):
        return [e["state"] for e in self.experiences]
    
    # 全アクションを返す
    @property
    def actions(self):
        return self.env_actions

    # 実環境からの経験を追加
    def add(self, experience):
        self.experiences.append(experience)

    # 近似モデルを更新する
    def update(self):
        
        # データ整形
        states = np.asarray([e["state"] for e in self.experiences])
        n_states = np.asarray([e["n_state"] for e in self.experiences])
        actions = np.asarray([e["action"] for e in self.experiences])
        rewards = np.asarray([e["reward"] for e in self.experiences])
        done_list = np.asarray([e["done"] for e in self.experiences])

        # action one-hot
        actions_onehot = np.identity(self.nb_actions)[actions]

        # doneモデルの更新
        # データにあまり変更がない状態で更新しても効果が薄いので、ある程度間隔を置いて更新する
        if self.update_count % self.done_update_interval == 0:
            done_loss = self.done_model.update(states, actions_onehot, done_list)
            self.history_done_loss_episode.append(done_loss)

        # trans,rewardモデルの更新
        trans_loss, reward_loss = self.reward_state_model.update(states, n_states, actions_onehot, rewards)
        self.history_train_loss_episode.append(trans_loss)
        self.history_reward_loss_episode.append(reward_loss)

        self.update_count += 1


    # 予測した次の状態を返す
    def sample_step(self, state, action):
        # action one-hot
        action_onehot = np.identity(self.nb_actions)[action]

        rewards, mean, stddev = self.reward_state_model(states, actions)
        n_states = self.reward_state_model.sample_next_state(mean, stddev)
        done = self.done_model(states, actions)

        return n_states, rewards, done


Q学習クラス

プランニングの手法はQ学習としています。

class Q_learning():
    def __init__(self, env):
        self.nb_actions = env.action_space.n

        self.epsilon = 0.1
        self.test_epsilon = 0.001
        self.batch_size = 32   # バッチサイズ
        self.gamma = 0.9       # 割引率

        # Qモデル
        c = input_ = keras.layers.Input(shape=env.observation_space.shape)
        c = keras.layers.Dense(16, activation="relu")(c)
        c = keras.layers.Dense(16, activation="relu")(c)
        c = keras.layers.Dense(self.nb_actions, activation="linear")(c)
        self.q_model = keras.Model(input_, c)
        self.q_model.compile(optimizer=Adam(learning_rate=0.01), loss='mse')


    def sample_action(self, state, training=False):
        if training:
            epsilon = self.epsilon
        else:
            epsilon = self.test_epsilon
        
        if np.random.random() < epsilon:
            # epsilonより低いならランダムに移動
            return np.random.randint(self.nb_actions)
        else:
            # Q値が最大のアクションを実行
            q = self.q_model(state.reshape(1,-1))[0].numpy()
            return np.argmax(q)
    

    # 実環境から学習
    def update_from_env(self, experiences):

        # ランダムに経験を取得してバッチを作成
        batchs = random.sample(experiences, self.batch_size)

        # データ形式を変形
        states = np.asarray([e["state"] for e in batchs])
        actions = np.asarray([e["action"] for e in batchs])
        n_states = np.asarray([e["n_state"] for e in batchs])
        rewards = np.asarray([e["reward"] for e in batchs])
        done_list = np.asarray([e["done"] for e in batchs])

        return self._update_model(states, actions, n_states, rewards, done_list)


    # 近似モデルから学習
    def update_from_a_mdp(self, a_mdp):

        # ランダムに状態とアクションを取得
        states = random.sample(a_mdp.states, self.batch_size)
        actions = [ random.sample(a_mdp.actions, 1)[0] for _ in range(self.batch_size)]
        states = np.asarray(states)
        actions = np.asarray(actions)

        # 予測
        n_states, rewards, done_list = a_mdp.sample_step(states, actions)

        return self._update_model(states, actions, n_states, rewards, done_list)

    

    # 学習
    def _update_model(self, states, actions, n_states, rewards, done_list):
        
        # Q値をだす        
        q = self.q_model(states).numpy()
        n_q = self.q_model(n_states).numpy()
        
        # 各バッチでQ値を計算
        for i in range(len(states)):
            action = actions[i]
            reward = rewards[i]
            if done_list[i]:
                q[i][action] = reward
            else:
                q[i][action] = reward + self.gamma * np.max(n_q[i])

        # モデルをミニバッチ学習する
        q_loss = self.q_model.train_on_batch(states, q)


メインフロー

メインフローのコードです。

env = gym.make("CartPole-v0")

buffer_size = 2048
warmup_size_a_mdp = 128
warmup_size_ql = 256
epochs = 300

# 近似モデル
a_mdp = A_MDP(env, buffer_size)

# Q学習
q_learning = Q_learning(env)


# 学習ループ
for episode in range(epochs):
    state = np.asarray(env.reset())
    done = False
    total_reward = 0
    step = 0

    while not done:
        
        action = q_learning.sample_action(state, training=True)

        n_state, reward, done, _ = env.step(action)
        n_state = np.asarray(n_state)
        step += 1
        total_reward += reward

        # 経験を保存する
        a_mdp.add({
            "state": state,
            "action": action,
            "reward": reward,
            "n_state": n_state,
            "done": done,
        })
        state = n_state

        # 近似モデルの学習
        if len(a_mdp.experiences) == warmup_size_a_mdp-1:
            print("train start A_MDP")
        if len(a_mdp.experiences) >= warmup_size_a_mdp:
            a_mdp.update()

        # Q学習
        if len(a_mdp.experiences) == warmup_size_ql-1:
            print("train start Qlearning")
        if len(a_mdp.experiences) >= warmup_size_ql:
            #q_learning.update_from_env(a_mdp.experiences)
            q_learning.update_from_a_mdp(a_mdp)
            
env.close()

結果

  • 近似モデルを用いない場合の学習例

Figure_52_1.png

  • 近似モデルのみで学習した場合

Figure_52_2.png

近似モデルを用いない場合と同じ学習傾向ですね。
近似モデルでも学習に問題なさそうです。

  • 近似モデルのloss

Figure_52_3.png

状態遷移関数と報酬関数は問題なく学習できていそうです。
ただ終了関数はあまり学習できていません。(正解率10%ほど)
けどこれがないとQ学習も進まなかったので、正解率は低いですが重要な役割を持っていそうです。

  • Q学習のloss

Figure_52_4.png

lossから見てもちゃんと学習できていそうですね。

モデルベース強化学習の論文ベースの近似モデル

Neural Network Dynamics for Model-Based Deep Reinforcement Learning with Model-Free Fine-Tuning

上記論文のコードを参考にしています。
間違っているかも知れませんが、論文内では状態遷移関数のみが言及されているように思います。
報酬関数は環境から与えられているものとし、終了関数については特に触れられていません。
ですので、報酬関数と終了関数は自作のものを使います。

状態遷移関数モデル

モデルとしては今と次の状態の差分を学習します。
また学習時にはガウスノイズをまぜて学習させます。

また論文と違う点ですが、論文では多分バッチの学習は連続状態を想定していそうですが、実装ではランダムに取り出して学習しています。


class _A_MDP_trans(keras.Model):
    def __init__(self, env):
        super().__init__()

        self.batch_size = 32
        dense_units = 16
        activation = "relu"

        # 共通レイヤー
        self.d1 = keras.layers.Dense(dense_units, activation=activation)
        self.d2 = keras.layers.Dense(dense_units, activation=activation)
        self.o = keras.layers.Dense(env.observation_space.shape[0], activation="linear")

        # optimizer
        self.optimizer = Adam(learning_rate=0.01)


    # Forward pass
    def call(self, states, actions, training=False):
        x = tf.concat([states, actions], axis=1)
        x = self.d1(x)
        x = self.d2(x)
        diff_next_state = self.o(x)

        return diff_next_state


    # 次の状態を返す
    def sample_next_state(self, state, action):
        diff_next_state = self(state, action)
        next_state = state + diff_next_state
        return next_state


    # モデルを更新する
    def update(self, state, n_state, action, reward):
        
        # ミニバッチ
        idx = np.random.randint(0, len(state), self.batch_size)
        state_batch = state[idx]
        n_state_batch = n_state[idx]
        action_batch = action[idx]
        
        diff_state = n_state_batch - state_batch

        # ガウスノイズ
        state_batch += np.random.normal(state_batch.mean(axis=0), state_batch.std(axis=0), size=state_batch.shape)
        diff_state += np.random.normal(diff_state.mean(axis=0), diff_state.std(axis=0), size=diff_state.shape)
        
        # 勾配を計算
        with tf.GradientTape() as tape:
            mse = 0
            for i in range(len(state_batch)):
                pred_diff_state = self(
                    state_batch[i].reshape((1, -1)), 
                    action_batch[i].reshape((1, -1)), 
                    training=True)
                # MSE
                mse += tf.reduce_mean(tf.square(diff_state[i] - pred_diff_state))

            loss = mse / len(state)

        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))

        return loss.numpy()

MPC(Model Predictive Control)

モデル予測制御(MPC: Model Predictive Control)は未来をシミュレーションしながら最適な行動を予測する制御方法です。
現在の状態から近似モデルを用いて最適なアクションを選ぶ事をMPCというらしいです。

今回はMPCのアルゴリズムとして、各アクションから始めるランダムなアクション長を生成します。
それらをシミュレーションし、得た割引報酬が一番多いアクションを採用する方法を取ります。

def sample_action(a_mdp, init_satate):
    
    nb_actions = アクション数
    gamma = 0.9      # 割引率
    step_length = 3  # アクション長

    # 各アクションを評価
    action_reward = []
    for a in range(nb_actions):
        
        # ランダムにアクション長を作成、先頭は今回のアクション
        actions = [a]
        actions.extend([random.randint(0, nb_actions-1) for _ in range(step_length-1)])

        # そのアクションを使ってシミュレーション
        step = 0
        state = init_satate.reshape((1, -1))
        total_reward = 0
        for action in actions:
            action = np.asarray([action])

            # 近似モデルから次の状態を予測
            reward, n_state, done = a_mdp.sample_step(state, action)

            reward = reward.numpy()[0][0]
            done = done[0][0]
            total_reward += (gamma ** step) * reward
            step += 1
            if done:
                break
            state = n_state
        action_reward.append(total_reward)
    
    # 最大のアクションを選択
    return np.argmax(action_reward)

実行結果

Figure_52_5.png

MPCがランダムな予測なので学習はできていません。。。

PETS

PETS(Probabilistic ensembles with trajectory sampling)は紹介のみとなります。
PETSはPEとTSで分かれています。

まずPEです。
今までの手法は状態の予測を1つのモデルで予測していました。
しかし、複数のモデルを学習させてその平均結果を用いて状態を予測する手法がPEとなります。
(アンサンブル学習)

次にTSですが、MPC部分となります。
ランダムにアクション列を決めるのではなく交差エントロピー法という手法で決める方法です。

交差エントロピー法は、確率分布を $P_r(\theta)$ とした場合、この確率分布に従って複数のアクション列を生成します。

$$ a_t = P_r(\theta_t) $$

複数のアクション列を実際にシミュレーションし、得られた報酬が一番いいアクション列で確率分布を更新する手法が交差エントロピー法です。

あとがき

後半少し手抜きしました。
意外にまとまった資料がないものですね。

3
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?