前回のTransformerに続き、そろそろDeepな強化学習も理解したいなということで、実装してみました。
この記事では、古典的なテーブルのQ学習とそれをニューラルネットで表したDQNの実装をします。
更新式の導出は他の記事に任せますが、その式の意味合いと気持ちを汲み取って説明します。
こちらにコードを公開しています↓
問題設定
WWWWWWWWWWWWWWWWWWWWW
WS W W W
W W WWWWW W W W WWWWW
W W W W W W
W W W WWW WWWWW W WWW
W W W W W
W W W WWW WWW W WWWWW
W W GW
WWWWWWWWWWWWWWWWWWWWW
- S: スタート
- G: ゴール
- W: 壁
このような複数経路ある迷路を、スタートからゴールまで最短で到達することを目指します。
ちなみにこの迷路は迷路自動作成ツールを使って作りました。
エージェントはマップ全体を見ることができず、上、下、左、右の4つのどれかの行動をした結果、移動できれば移動し、移動できなければ(壁なら)その場に留まります。
暗闇で移動しているイメージです。
移動した結果、ゴールならゴールであると認識できます。
テーブルのQ学習
Qテーブル
テーブルのQ学習では、「状態$s$ $\times$ 行動$a$」の価値を表すテーブル$Q(s, a)$を持ち、これを更新していくことで学習を進めます。
迷路の例では、状態$s$は位置情報、行動$a$は上下左右の4つです。
学習(更新式)
状態$s$で行動$a$を選択して、状態$s'$になったときに、価値テーブル$Q(s, a)$を次式で更新します。
$$
\begin{align}
Q(s, a) &\leftarrow Q(s, a) + \alpha\left(R(s, a) + \gamma \max_{a'} Q(s', a') - {Q(s, a)}\right) \\
&= (1 - \alpha)Q(s, a) + \alpha\left(R(s, a) + \gamma \max_{a'} Q(s', a')\right)
\end{align}
$$
- $R(s, a)$: 状態$s$で行動$a$を選択したときの報酬
- $\alpha$: 学習率
- $\gamma$: 割引率
式の導出は他の記事に任せて、気持ちだけ読み取っておきます。
この更新式は、「現在の評価値$Q(s,a)$」と「今受け取った報酬$R(s,a)$と移動先の最大価値$\max_{a'} Q(s',a')$」を内分したものと考えることができます。
気持ちとしては、現在の評価値を汲み取りつつ、たった今受け取った価値も考慮したいというものですね。
その調整パラメータが$\alpha$です。$\alpha=1$なら完全に今受けとった価値だけを考慮することになります。
$\gamma$は移動先の価値をどれくらい考慮するかという係数です。$\gamma=0$なら、今受け取った報酬だけを価値とすることになります。
迷路の例では、行動した結果ゴールしたら報酬が与えられ、それ以外の場合では報酬はありません。
すなわちほとんどの行動は報酬が与えられません。
$\gamma\ne 0$とすることで、ゴールしたマスから価値が伝播して、正しい行動を学習できることになります。
行動選択
状態$s$にいるエージェントは、価値が最も高い行動を選択します。
$$
\mathrm{arg}\max_{a} Q(s, a)
$$
ただし、常にこれをやってしまうと、「現在最も価値が高いと思われる行動」を取り、「さらに良い行動の可能性」を捨ててしまうことになります。
そのため、一定確率でランダムな行動を選択するようにします。
実装
import numpy as np
class Q_learning:
"""単純なQ-learning"""
def __init__(self, maze):
# 迷路
self.maze = maze
# 状態(=エージェントの位置)は迷路の盤面数
row, col = self.maze.board_size()
self.num_state = row * col
# 行動の数は上下左右の4種類
self.num_action = 4
# Qは 状態数 x 行動数
self.Q = np.zeros((self.num_state, self.num_action))
# 現在の状態
self.state = self.get_state()
def select_best_action(self):
"""評価値の最も高い行動を探す"""
return self.Q[self.state, :].argmax()
def select_action(self, random_rate=0.01):
"""一定の確率で、ベストでない動きをする"""
if np.random.rand() < random_rate:
return np.random.randint(self.num_action)
return self.select_best_action()
def get_state(self):
"""状態を取得"""
_, col = self.maze.board_size()
x, y = self.maze.get_position()
return x * col + y
def reward(self):
"""報酬"""
return 0 if self.maze.is_goal() else -1
def from_start(self):
"""スタートからやり直す"""
self.maze.reset()
self.state = self.get_state()
def step(self, learning_rate, discount_rate, random_rate):
# 行動の選択。ベストアクションとは限らない。
action = self.select_action(random_rate)
# 選択された行動に従い動く。ただし、壁がある場合は無視される
self.maze.move(action)
# 移動後の状態を取得
next_state = self.get_state()
# ベストアクションを選択
next_action = self.select_best_action()
# Q[s][a] += 学習率 * ( 報酬 + 割引率 * ( max_{s'} Q[s'][a'] ) - Q[s][a] )
self.Q[self.state][action] += learning_rate * (
self.reward()
+ discount_rate * self.Q[next_state][next_action]
- self.Q[self.state][action]
)
# 移動後の状態を現在の状態に記録
self.state = next_state
呼び出し側のコード(抜粋)
# 何回ゴールするか
EPISODE_MAX = 1000
# ゴールまでの打ち切りステップ数
STEP_MAX = 3000
# 学習率
LEARNING_RATE = 0.1
# 割引率
DISCOUNT_RATE = 0.95
# 描画スピード
SLEEP_TIME = 0.001
maze = Maze(BOARD)
q_learn = Q_learning(maze)
for episode in range(EPISODE_MAX):
step = 0
q_learn.from_start()
# ランダムに最善でない行動を取る
random_rate = 0.01 + 0.9 / (1 + episode)
while not maze.is_goal() and step < STEP_MAX:
# エージェントの1ステップ(行動、評価値の更新)
q_learn.step(LEARNING_RATE, DISCOUNT_RATE, random_rate)
# 迷路描画
maze.draw()
step += 1
time.sleep(SLEEP_TIME)
print("\x1b[K") # 行末までをクリア
print(f"episode : {episode} step : {step} ")
コードの解説はコメント通りです。
学習の様子
はじめのほう
途中経過1
途中経過2
学習できてきた
Deep Q Network(DQN)
いよいよDeepなQ学習です。
Qネットワーク
先程のQテーブルがニューラルネットワーク$Q_\theta(s,a)$に置き換わります。($\theta$はニューラルネットで学習するパラメータ)
状態を入力として、各行動の価値を出力するネットワークです。
迷路の例では、2次元の位置座標$(i,j)$が入力で、行動数4次元の出力になります。
今回の実装では、全結合のネットワークを使いました。
学習
Qネットワーク$Q_{\theta}$を学習するときの学習データとして、次のものを使います。
- 入力:$s$
- ターゲットの$a$次元目:
$$
R(s, a) + \gamma \max_{a'} Q_{\theta'}(s', a')
$$
ターゲットの$a$次元目以外は、現在のネットワークの出力そのままをターゲットとします。(変更しないように)
$R(s, a), \gamma, a', s'$の意味は、Qテーブルのときと同様です。
ここでも導出は他の記事に任せて、式の気持ちを説明します。
この式の形は、Qテーブルの方でも出てきた式と同じ形をしています。
Qテーブルの方では、$1-\alpha:\alpha$で内分していた式ですね。
ニューラルネットでは、これをターゲットとしてその方向に更新することで、少しだけターゲットに近づくことになります。(更新しまくるとオーバーフィットして、ターゲットとほぼ一致するでしょう。)
Qテーブルの$\alpha$が学習率という名前だったとおり、まさにニューラルネット側でも学習ステップ幅に対応することがわかります。
式中の$\theta'$は少し前の$\theta$の値です。
$\theta$の更新に$\theta'$を使っているのは、ニューラルネットがFitすべきターゲットがぶれないようにするためです。
毎回ターゲットが変わるとニューラルネットがどこに合わせればいいかわからなくなるので、適当な間隔で$\theta'\leftarrow \theta$としてあげます。
例外として、$s'$が終了状態(迷路の例ではゴール)では、ターゲットを$R(s,a)$とします。
これは、次の状態の最大価値$Q(s',a')$を伝播させる必要がないためだと思われます。(ゴールしたら次の行動は無い)
学習データ
ニューラルネットの学習には学習データが必要なので、これまで行動した履歴を蓄えておく必要があります。
蓄える情報は、$(s, a, R(s,a), s')$です。
これを一定数蓄えて、先の方法でターゲットを作り学習させるわけです。
学習に使うデータは、蓄えられたメモリからランダムにサンプリングして使います。
より学習がうまくいくように、損失が大きいものを重点的にサンプリングする工夫をしているものもあります。
学習は毎回行動のたびに更新するのではなく、一定の間隔でサンプリングしたミニバッチで学習します。
行動選択
テーブルのQ学習と同様に、状態$s$にいるエージェントは、価値が最も高い行動を選択します。
$$
\mathrm{arg}\max_{a} Q_{\theta}(s, a)
$$
ただし、一定確率でランダムな行動を選択します。
実装
import numpy as np
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
from keras.layers.advanced_activations import PReLU
from collections import deque
class Memory:
def __init__(self, max_size):
self.memory = deque(maxlen=max_size)
def add(self, experience):
self.memory.append(experience)
def sample(self, batch_size):
select = np.random.choice(self.len(), size=batch_size, replace=False)
return [self.memory[i] for i in select]
def len(self):
return len(self.memory)
def DQN(loss, input_dim, output_dim, hidden_size, learning_rate):
model = Sequential()
model.add(Dense(hidden_size, input_dim=input_dim))
model.add(PReLU())
model.add(Dense(hidden_size))
model.add(PReLU())
model.add(Dense(output_dim))
model.compile(optimizer=Adam(learning_rate), loss=loss)
return model
class Deep_Q_learning:
"""Deep Q-learning"""
def __init__(self, maze, loss, hidden_size, learning_rate, memory_size):
self.maze = maze
# 状態の次元数は、(i, j)座標の2次元
self.state_dim = 2
# 行動の数は上下左右の4種類
self.num_action = 4
# 現在の状態
self.state = self.get_state()
# Q が NNになる
self.Qmain = DQN(
loss, self.state_dim, self.num_action, hidden_size, learning_rate
)
self.Qtarget = DQN(
loss, self.state_dim, self.num_action, hidden_size, learning_rate
)
# メモリ
self.memory = Memory(memory_size)
def update(self, batch_size, discount_rate, epoch):
if self.memory.len() < batch_size:
return
mini_batch = self.memory.sample(batch_size)
# 状態を入力として
inputs = np.array([state for (state, _, _, _, _) in mini_batch])
# 基本はQネットワークの出力をそのまま予測する
targets = self.Qmain.predict(inputs)
actions = np.array([action for (_, action, _, _, _) in mini_batch])
rewards = np.array([reward for (_, _, reward, _, _) in mini_batch])
next_states = np.array([next_state for (_, _, _, next_state, _) in mini_batch])
not_fins = np.array([int(not fin) for (_, _, _, _, fin) in mini_batch])
next_actions = self.Qmain.predict(next_states).argmax(axis=1)
pred_max = self.Qtarget.predict(next_states)[
np.arange(batch_size), next_actions
]
# 報酬 + 割引率 * ( max_{a'} Q(s', a') ) がターゲット
targets[np.arange(batch_size), actions] = rewards + discount_rate * pred_max * not_fins
self.Qmain.fit(inputs, targets, epochs=epoch, verbose=0)
def select_action(self, random_rate):
"""一定の確率で、ベストでない動きをする"""
if np.random.rand() < random_rate:
return np.random.randint(self.num_action)
return self.select_best_action()
def select_best_action(self):
"""評価値の最も高い行動を探す"""
pred = self.Qmain.predict([self.state])[0]
action = pred.argmax()
return action
def get_state(self):
"""状態を取得"""
row, col = self.maze.board_size()
x, y = self.maze.get_position()
return x / row, y / col
# return x, y
def reward(self):
"""報酬"""
return 1 if self.maze.is_goal() else -1
def from_start(self):
"""スタートからやり直す"""
self.maze.reset()
self.state = self.get_state()
def step(self, random_rate):
# 行動の選択。ベストアクションとは限らない。
action = self.select_action(random_rate)
# 選択された行動に従い動く。ただし、壁がある場合は無視される
self.maze.move(action)
# 移動後の状態を取得
next_state = self.get_state()
# ゴールしたかどうか
fin = self.maze.is_goal()
# 報酬
reward = self.reward()
# メモリに追加
self.memory.add((self.state, action, reward, next_state, fin))
# 移動後の状態を現在の状態に記録
self.state = next_state
def Qmain_to_Qtarget(self):
self.Qtarget.set_weights(self.Qmain.get_weights())
呼び出し側コード(抜粋)
# 何回ゴールするか
EPISODE_MAX = 1000
# ゴールまでの打ち切りステップ数
STEP_MAX = 3000
# 学習率
LEARNING_RATE = 0.001
# 割引率
DISCOUNT_RATE = 0.9
# 描画スピード
SLEEP_TIME = 0.0001
# 隠れ層のサイズ
HIDDEN_SIZE = 32
# 蓄える行動履歴データの数
MEMORY_SIZE = 500000
# バッチサイズ
BATCH_SIZE = 1024
# 損失関数
LOSS = "mse"
# ミニバッチで繰り返す学習回数
EPOCH = 2
# 更新する間隔
UPDATE_INTERVAL = 10
# ターゲットネットワークを更新する間隔
update_target_epsode_interval = 10
maze = Maze(BOARD)
deep_q_learn = Deep_Q_learning(maze, LOSS, HIDDEN_SIZE, LEARNING_RATE, MEMORY_SIZE)
for episode in range(EPISODE_MAX):
step = 0
deep_q_learn.from_start()
# ランダムに最善でない行動を取る
random_rate = max(0.98 ** episode, 0.1)
if episode % update_target_epsode_interval == 0:
deep_q_learn.Qmain_to_Qtarget()
while not maze.is_goal() and step < STEP_MAX:
# エージェントの1ステップ(行動、評価値の更新)
deep_q_learn.step(random_rate)
# 迷路描画
maze.draw()
# Qネットワークの重みを更新する
if step % UPDATE_INTERVAL == 0:
deep_q_learn.update(BATCH_SIZE, DISCOUNT_RATE, EPOCH)
step += 1
time.sleep(SLEEP_TIME)
print("\x1b[K") # 行末までをクリア
print(f"episode : {episode} step : {step} ")
このサイズの迷路では、300エピソード(300回ゴール)したあたりで、まともな動きをするようになりました。
あとがき
今回は何回も同じ迷路を解くことで、学習をしました。
汎用の迷路を解くには、問題設定を変えて、マップ全体が見える状態でマップ全体をネットワークに突っ込み、CNNなどを使って学習する必要がありそうです。
また、DQNのような価値ベースの手法ではなくて、方策勾配法という方策ベースのものがあります。
確実な断言はできませんが、いろんな問題に対して方策勾配法のほうが経験的には良いそうです。(聞いた話)
Deepな強化学習を実際にやってみて、学習がうまくいかないときに、何が原因かよくわからなくて、かなり苦戦しました。
バッチサイズやメモリーサイズを増やすだけでうまくいくようになったりするみたいです。
色んな要素が絡んでて難しいですね。