LoginSignup
11
13

More than 1 year has passed since last update.

【強化学習】OpenAI Gym×Keras-rlで強化学習アルゴリズムを実装していくぞ(Q学習編)

Last updated at Posted at 2019-05-03

前回の続きです。
強化学習といえばQ学習ですね。
強化学習の実装例としてkerasの実装例はたくさんあるのですが、keras-rlの実装例はなかったので、まずはQ学習から実装してみたいと思います。

本シリーズ

追記:改めて記事にしています。

概要

  • ゲームの前処理としてProcessorを実装しgymの入出力を調整
  • Q学習用のkeras-rlとしてAgentを実装
  • keras-rlのハイパーパラメータをoptunaで最適化

コード全体

本記事で作成したコードは以下です。(GoogleColaboratoryは実行結果付き)
※1ファイル完結です。
※GoogleColaboratoryは実行結果付き

QLAgent(keras-rlのAgent)の実装

Q学習について

強化学習の歴史の中で古くからある手法の1つがTD学習という手法です。
TD学習では状態しか見ていなかった所をアクションまで見るようにしたのがQ学習(Q-Learning)という手法です。(多分…)

TD学習ではTD誤差というものを計算してある状態の評価値を更新していく学習となります。
TD誤差は以下の式となります。

$$r_{t}+\gamma V(s_{t+1}) - V(s_{t})$$

実際の更新式は以下です。

$$V(s_{t}) \leftarrow V(s_{t}) + \alpha(r_{t}+\gamma V(s_{t+1}) - V(s_{t}) )$$

$\gamma$が割引率(評価値の伝搬率)、$\alpha$が学習率(評価値の更新率)となります。
イメージとしては以下のように報酬を伝搬していく感じですね。

qiita_02_ql_td.PNG

TD学習では状態だけを見ていましたが、これをアクションまで拡張したものがQ学習となります。

$$Q(s_{t},a_{t}) \leftarrow Q(s_{t},a_{t}) + \alpha(r_{t}+\gamma \max_pQ(s_{t+1,p}) - Q(s_{t},a_{t}) )$$

ゲームの前処理

今回試すゲームは棒立て(Pendulum)です。

pendium_ql.py
env = gym.make("Pendulum-v0")

ただ、このゲームは直接Q学習で学習することはできないので、Q学習で学習できるように環境とアクションの離散化を行います。

Processorの実装

Q学習では、入力と出力(環境とアクション)は線形モデル(連続した数値)を扱えません。
ですが、今回のゲームは線形モデルのため1,2,3といった離散した数値に変換する必要があります。

離散化/線形化のためにProcessorクラスを継承して中身を実装していきます。

pendium_ql.py
import rl.core
class PendulumProcessor(rl.core.Processor):

    # アクションの再定義
    # 引数は、agentから出力されたactionの値(ここでは離散値)
    # 戻り値は、envに渡すactionの値  
    def process_action(self, action):
        アクションの線形化
        return action

    # 環境の再定義
    # 引数は、envから出力されたobservation
    # 戻り値が agent へ渡すobservation
    def process_observation(self, observation):
        環境の離散化
        return observation

アクションの線形化

Pendulumではアクションはジョイントの力として -2.0 ~ +2.0 の範囲を受け付けます。
Pendulum-v0のwiki
今回はこれを5パターンに分けます。

pendium_ql.py
def process_action(self, action):
    ACT_ID_TO_VALUE = {
        0: [-2.0], 
        1: [-1.0], 
        2: [0.0], 
        3: [+1.0],
        4: [+2.0],
    }
    return ACT_ID_TO_VALUE[action]
>>> p = PendulumProcessor()
>>> p.process_action(4)
[2.0]

環境の離散化

アクション同様に環境も離散化します。
Pendulumの環境は以下の値をとります。

Num Observation Min Max
0 cos(theta) -1.0 1.0
1 sin(theta) -1.0 1.0
2 theta dot -8.0 8.0

実装では分割数が指定出来るようにしています。

pendium_ql.py
def __init__(self, split_num=8):
    # 各値の最小,最大値
    self.env_low = np.array([-1.0, -1.0, -8.0])
    self.env_high = np.array([1.0, 1.0, 8.0])

    # 分割の基準となる値
    self.env_dx = (self.env_high - self.env_low) / split_num

def process_observation(self, observation):
    d = []
    for i, v in enumerate(observation):
        if v < self.env_low[i]:
            v  = self.env_low[i]
        elif v > self.env_high[i]:
            v = self.env_high[i]
        d.append(int(round((v - self.env_low[i]) / self.env_dx[i])))
    return d
>>> p = PendulumProcessor(split_num=8)
>>> p.process_observation((0.5, -0.1, 4.2))
[6, 4, 6]

Agentの実装

QL用のagentを実装していきます。
あまりkeras-rlのAgentを実装する資料が見つからなかったので基本は公式のソースコードを見つつ実装しています。

まずはAgentの継承からです。

QLAgent.py
import rl.core
class QLAgent(rl.core.Agent):
    def __init__(self, **kwargs):
        super(QLAgent, self).__init__(**kwargs)
        self.compiled = False
    
    def reset_states(self):
        pass

    def compile(self, optimizer=None, metrics=[]):
        self.compiled = True
   
    def load_weights(self, filepath):
        pass
        
    def save_weights(self, filepath, overwrite=False):
        pass

    def forward(self, observation):
        action = 0
        return action

    def backward(self, reward, terminal):
        return []
    
    @property
    def layers(self):
        return []

上記が主な実装関数となり、順番に実装していきたいと思います。
self.compiled はないと怒られました。
また、reset_states関数だけは必須ではなく任意となります。

Qテーブルの実装

まずはQ値を保存するQテーブルを実装していきます。
アルゴリズムとしては状態を文字列として、ハッシュテーブルで表現しています。
速度は分からないですが、環境の形式に対してかなり幅広く対応できる点と分かりやすい点より採用しています。

以下のようなイメージです。

(1, 2, 7)  # 状態(環境)
  ↓
"1_2_7"  # 文字列にする
  ↓
q_actions = q_table["1_2_7"]  # Qテーブルのindexになる。
  ↓
q_actions[0] -> 0.75
q_actions[1] -> 0.3
q_actions[2] -> 0.1

※q_actionsは、アクションがindexになり、それぞれQ値が入る。
※今回だと"1_2_7"状態の0アクションのQ値は0.75

まずはQテーブルの初期化を、コンストラクタで実装します。
また、初期化にあたりアクション数が必要なので引数に nb_actions を追加しています。

QLAgent.py
def __init__(self, nb_actions, **kwargs):
    self.nb_actions = nb_actions

    self.q_table = {}
    self.q_table[""] = [np.random.uniform(low=-1, high=1) for _ in range(self.nb_actions)]

何もない状態""も最初に作成しています。
Q値の初期値は-1~1の間のランダムな値です。

forward関数の実装

次はforward関数を実装していきます。
学習のメインとなる場所ですね。
forwardは環境の1step(env.step)が実行される前に呼ばれる関数で、引数には状態(環境)が入ってきます。
戻り値は実行するアクションの値を返す必要があります。

forwardの実装は以下のようなフローで実装します。

def forward(self, observation):
    
    1. Qテーブルを文字列化
    2. Qテーブルにない状態(環境)なら新規追加

    if self.training:
        3. Q値を更新する
    
    if self.training:
        4. ϵ-greedy法でアクションを決める
    else:
        5. テスト環境なのでQ値が最大のアクションを返す
    
    return action

self.training変数はkeras-rl側で定義されており、トレーニング中ならTrueが入っています。

1. Qテーブルを文字列化。

strで型変換して実現しています。

QLAgent.py
# 文字列化して一意にする。
observation = "_".join([str(o) for o in observation])

2. Qテーブルにない状態(環境)なら新規追加

QLAgent.py
# Qテーブルになければ追加(無限に増えます)
if observation not in self.q_table:
    # Q値の初期化
    self.q_table[observation] = [ np.random.uniform(low=-1, high=1) for _ in range(self.nb_actions) ]

文字列化したobservationに対してQテーブルになければ追加しています。
何もない状態と同様にQ値は-1~1の間のランダムな値にしています。

3. Q値を更新する。

Q値に必要な値は1つ前のobservationとaction、その結果のreward(報酬)と現在のobservationです。

まずは1つ前の状態とアクションをforward関数の最後で保存します。

QLAgent.py
def forward(self, observation):
    

    self.prev_observation = observation
    self.prev_action = action
    return action

rewardはbackward関数から取得します。
こちらは環境の1step(env.step)が実行された後に呼ばれる関数で、引数にはreward(報酬)が入っています。
戻り値はmetricsを返すらしいのですが…、よく分かっていません(調査を後回しにしています)

QLAgent.py
def backward(self, reward, terminal):
    self.prev_reward = reward
    return []

現在の状態(環境)はforwardの引数にあるのでこれで準備が整いました。
Q値の更新式を再掲しておきます。

$$Q(s_{t},a_{t}) \leftarrow Q(s_{t},a_{t}) + \alpha(r_{t}+\gamma \max_pQ(s_{t+1,p}) - Q(s_{t},a_{t}) )$$

QLAgent.py
q_val = self.q_table[self.prev_observation][self.prev_action]  # Q(St,At)
next_maxq = self.q_table[observation][np.argmax(self.q_table[observation])]  # MAX(Q(St+1,At))

# 更新
self.q_table[self.prev_observation][self.prev_action] = q_val + self.traning_rate * (self.prev_reward + self.gammma * next_maxq - q_val)

traning_rateとgammaはハイパーパラメータになるのでユーザが指定できるようにコンストラクタに追加しておきます。

QLAgent.py
def __init__(self, nb_actions, traning_rate=0.5, gamma=0.99, **kwargs):
    self.traning_rate = traning_rate
    self.gamma = gamma

最後に忘れずに初期値を設定しておきます。
初期値はreset_states関数を利用します。

QLAgent.py
def reset_states(self):
    self.prev_observation  = ""
    self.prev_action = 0
    self.prev_reward = 0

4. ϵ-greedy法でアクションを決める

ϵ-greedy法はシンプルで、確率ϵでランダムなアクションを実行し、そうじゃなければQ値に従ってアクションを決めます。

QLAgent.py
if self.epsilon > np.random.uniform(0, 1):
    # ランダム
    action = np.random.randint(0, self.nb_actions)
else:
    # Q値が最大のアクションを取得
    action = np.argmax(self.q_table[observation])

epsilonの値はハイパーパラメータとなるのでユーザが指定できるようにコンストラクタに追加しています。
デフォルトは適当に0.1(10%)としています。

QLAgent.py
def __init__(self, nb_actions, traning_rate=0.5, gamma=0.99, epsilon=0.1, **kwargs):
    self.epsilon = epsilon

5. テスト環境時にQ値が最大のアクションを返す。

Qテーブルを参照し、Q値が最大のアクションを取得します。

QLAgent.py
action = np.argmax(self.q_table[observation])

save/loadの実装

Qテーブルを保存するだけです。
今回は手軽さを重視してpython標準ライブラリのpickleを使いました。

QLAgent.py
import pickle
import os



def load_weights(self, filepath):
    with open(filepath, 'rb') as f:
        self.q_table = pickle.load(f)

def save_weights(self, filepath, overwrite=False):
    if overwrite or not os.path.isfile(filepath):
        with open(filepath, 'wb') as f:
            pickle.dump(self.q_table, f)

agentの使い方

実際に使ってみます。

Pendulum_QL.py
import gym

# 別ファイルにあると仮定しています。コード全体では1つのファイルにまとめています。
from PendulumProcessor import PendulumProcessor
from QLAgent import QLAgent

env = gym.make("Pendulum-v0")
nb_actions = 5  # PendulumProcessorで5個と定義しているので5

processor = PendulumProcessor(split_num=8)

# processorはAgentのコンストラクタの引数で渡します。
agent = QLAgent(nb_actions=nb_actions, traning_rate=0.5, gamma=0.99, epsilon=0.1, processor=processor)
agent.compile()

#--- 以下、前回の記事と同様です ---
# 訓練
print("--- start ---")
print("'Ctrl + C' is stop.")
history = agent.fit(env, nb_steps=1_000_000, visualize=False, verbose=1)

# 結果を表示
plt.subplot(2,1,1)
plt.plot(history.history["nb_episode_steps"])
plt.ylabel("step")

plt.subplot(2,1,2)
plt.plot(history.history["episode_reward"])
plt.xlabel("episode")
plt.ylabel("reward")

plt.show()

# 訓練結果を見る
agent.test(env, nb_episodes=5, visualize=True)

結果

qiita_02_test_no_optuna.png

qiita_02_test_no_optuna_anime.gif

一応学習はできているようです。

Optunaによるハイパーパラメータの調整

ここからはちょっとしたおまけです。
今回ハイパーパラメータがいくつか出てきました。
今後も増えることが予想されるのでハイパーパラメータを自動で最適化してくれるOptunaを試してみたいと思います。

参考:Optuna+KerasでCNNのハイパーパラメータを最適化

インストール

pip install optuna

※GoogleColaboratory

!pip install optuna

最適化関数の準備

まずは最適化するための関数を定義します。

Pendulum_QL_optuna.py
def objective(trial):
    env = gym.make("Pendulum-v0")
    nb_actions = 5  # PendulumProcessorで5個と定義しているので5
    processor = PendulumProcessor(split_num=trial.suggest_int('split_num', 10, 20))

    agent = QLAgent(nb_actions=nb_actions, 
        traning_rate=trial.suggest_uniform('traning_rate', 0.0, 1.0), 
        gamma=trial.suggest_uniform('gamma', 0.0, 1.0), 
        epsilon=0.001,
        processor=processor)
    agent.compile()

    agent.fit(env, nb_steps=100_000, visualize=False, verbose=0)
    history = agent.test(env, nb_episodes=50, visualize=False, verbose=0)

    ave_reward = sum([ n for n in history.history["episode_reward"]]) / 50
    return -ave_reward

最適化ポイントは、以下の3箇所です。
epsilonは乱数の要素が大きく最適化と相性が悪そうなので低めにして固定しています。

trial.suggest_int('split_num', 10, 20)
trial.suggest_uniform('traning_rate', 0.0, 1.0)
trial.suggest_uniform('gamma', 0.0, 1.0)

fitは情報が表示されないように visualize=False , verbose=0 にし、時間がかかるのでnb_stepsは少し小さい値にしています。

agent.fit(env, nb_steps=100_000, visualize=False, verbose=0)

testも同様に visualize=False , verbose=0 にし、評価用に戻り値を保存しています。

history = agent.test(env, nb_episodes=50, visualize=False, verbose=0)

最後にtestの結果からエピソードの平均報酬をだしてそれを返します。
マイナスをつけているのはOptunaが値が最小になるように最適化されるためとなります。

ave_reward = sum([ n for n in history.history["episode_reward"]]) / 50
return -ave_reward

最適化の実行

Pendulum_QL_optuna.py
import optuna

study = optuna.create_study()
study.optimize(objective, n_trials=100)
print(study.best_params)

Optimizeで最適化が実行されます。
n_trialsは試行回数となります。
結果は study.best_params で確認できるようです。

枝刈り(Pruning)の実施

Optunaは探索しても意味がなさそうなパラメータを判断して途中で切り上げる機能があるそうです。
とりあえず作成してみましたが効果はよく分かっていません。(ちょっと早くなってる?)

参考
Optuna+Kerasで転移学習のオプティマイザーと学習率を探してみた
optunaでPruningする時に早すぎる枝刈りをされないようにする方法

まずは枝刈り用のcallback関数を定義します。
keras-rlのAgentなら使いまわせる実装です。

Pendulum_QL_optuna.py
import rl.callbacks
class OptunaCallback(rl.callbacks.Callback):
    def __init__(self, trial):
        self.trial = trial

    def on_episode_end(self, episode, logs):
        episode_reward = -logs["episode_reward"]
        self.trial.report(episode_reward, step=episode)

        # 打ち切り判定
        if self.trial.should_prune(episode):
            raise optuna.structs.TrialPruned()

fitの引数に定義したcallbackを渡します。

Pendulum_QL_optuna.py
agent.fit(env, nb_steps=100_000, visualize=False, verbose=0, callbacks=[OptunaCallback(trial)])

最後にウォームアップ期間を設定します。
不要かもしれませんが。。。

Pendulum_QL_optuna.py
study = optuna.create_study(pruner=optuna.pruners.MedianPruner(n_warmup_steps=1000))

結果

{'split_num': 10, 'traning_rate': 0.4761866993824728, 'gamma': 0.8901017844845089}

パラメータを変更して再度学習させてみました。

qiita_02_test_optuna.png

qiita_02_test_optuna_anime.gif

・・・悪くなってますねorz

まとめ

keras-rlでQ学習の実装ができました。
またoptunaによってハイパーパラメータの最適化もできるようになりました。
次はDQNを実装したいと思います。

参考

TD学習の考え方
強化学習
TD 学習による ゲームの進行度ごとの 評価関数の調整
これから強化学習を勉強する人のための「強化学習アルゴリズム・マップ」と、実装例まとめ
CartPoleでQ学習(Q-learning)を実装・解説
【3目並べで学ぶ強化学習】Q-LearningとDQNを徹底解説 (1/2)

11
13
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
13