今回はDDPGを実装してみました。
※ネット上の情報をかき集めて自分なりに実装しているので正確ではない可能性がある点はご注意ください
※ライブラリはTensowflow2.0(+Keras)を使っています。
コード全体
本記事で作成したコードは以下です。
追記:自作フレームワークを作成しています。そちらにも実装があります。
DDPG(Deep Deterministic Policy Gradient)
DPGは連続行動空間を制御するために考案されたアルゴリズムで、Actor-Criticなモデルを用いて行動価値と方策を学習しますが、方策勾配法を使わずに学習するというちょっと変わった手法になります。
DPGにディープラーニングを適用した手法がDDPGです。
参考
- DDPGでPendulum-v0(強化学習, tensorflow2)
- Deterministic Policy Gradient Algorithms(論文)
- Continuous control with deep reinforcement learning(論文)
- [OpenAI Spinning Up] Deep Deterministic Policy Gradient
従来の方策勾配法(Actor-Critic含む)では方策の出力は各アクションの確率でした。(第2回を参照)
連続行動空間ではこれを確率分布に拡張することで連続値を実現していました。(第4回を参照)
DPGでは確率分布ではなく直接値(スカラー値)を出力します。
ですのでDPGでは方策に関して、従来の確率的な方策と比較し、決定論的方策(Deterministic Policies)と言ったりします。
方策を学習するので方策勾配法と比較されますが、手法としてはQ学習に近い内容なのでまずはQ学習からみていきます。
Q学習は以下の行動価値関数を最大化するように学習する手法でした。
$$
Q(s_t,a_t) = E_{s_{t+1}}[ r(s_t) + \gamma \max_{a_{t+1}} Q(s_{t+1},a_{t+1})]
$$
$E_{s_{t+1}}$は次に取りうる全ての状態に対する期待値で、$\max_{a_{t+1}} Q(s_{t+1},a_{t+1})$が取りうるアクションの中から最大のQ値(行動価値)を返す関数です。
ここでDPGではアクションポリシーを $\mu(s)$ と置いた場合以下のように近似します。
$$ \max_{a} Q(s,a) \approx Q(s,\mu(s))$$
Q値が最大のアクションを選ぶのではなく、Q値が最大となるアクションをアクションポリシー $\mu(s)$ が選ぶ(学習する)といった感じです。
DDPGのモデルを見ると分かりやすいと思います。
ActorとCriticは別モデルです。
Actorは状態からアクションを出力し、Criticは状態とアクションを入力にQ値を出力します。
DDPGの主要部分は以上ですが、学習を安定させるために3つのテクニックを使っています。
Replay buffer
DDPGは決定論的方策のため、学習に過去の経験を使いまわせます。
Replay buffer は、経験をメモリー上に保存し、その中からランダムに取り出してミニバッチ学習する手法です。
コードは以下です。
warmup_size = 100 # 最低限貯めるキューサイズ
buffer_size = 1000 # キューの最大サイズ
batch_size = 32 # バッチサイズ
train_interval = 10 # 学習間隔
# 収集する経験は上限を決め、古いものから削除する
from collections import deque
experiences = deque(maxlen=buffer_size)
all_step_count = 0
# 学習ループ
for episode in range(300):
(略)
# 1episodeループ
while not done:
action = 何かしらアクションを決める
n_state, reward, done, _ = env.step(env_action) # 1step進める
# 経験を保存する
episode_exp.append({
"state": state,
"action": action,
"reward": reward,
"n_state": n_state,
"done": done,
})
# wamup_size 分経験が貯まれば学習を開始する
if len(experiences) >= warmup_size and all_step_count % train_interval == 0:
# ランダムに経験を取得してバッチを作成
batchs = random.sample(experiences, batch_size)
# 学習
train(batchs)
all_step_count += 1
Soft-Target
DQNと同様に Target Network を作成して更新に使うモデルの学習を遅らせる手法です。
行動価値関数を直接学習しようとすると環境によってはQ値が発散し、学習が安定しないようです。
(多分、更新目標値=行動価値関数の値がころころ変わってしまうと、学習が行ったり来たりして不安定になる感じ?)
これを低減するために推定する行動価値関数の学習を遅らせる手法が Target Network です。
一定step毎にネットワークの重みを同期する方法を Hard-Target、ゆっくり近づける方法を Soft-Target と言います。
- Hard-Target
model = 価値関数モデル
target_model = 価値関数モデル
#--- 学習
for episode in range(300):
if 一定回数ごと:
# 同期する
target_model.set_weights(model.get_weights())
- Soft-Target
$$ \theta^{target} \leftarrow \tau \theta + (1 - \tau) \theta^{target} $$
ここで$\tau$は重みを近づける割合です。
model = 価値関数モデル
target_model = 価値関数モデル
tau = 0.02 # 重みを近づける割合
#--- 学習
for episode in range(300):
# 重みを近づける
target_model.set_weights(
(tau) * np.array(model.get_weights(), dtype=object)
+ (1-tau) * np.array(target_model.get_weights(), dtype=object)
)
参考:【深層強化学習】【DQN】Target Network
探索ノイズ
Q学習では ε-Greedy(一定確率でランダム行動する)で探索を実現していました。
DDPGは決定論的方策なので常に最適解を選び探索を行いません。
そこでDDPGでは探索用にアクションにノイズを混ぜて学習させます。
ノイズは、論文ではOUノイズを採用していますが OpenAI Spinning Up では平均0のガウス分布を採用しています。(性能が良いそうです)
stddev = 0.2 # ガウス分布の標準偏差
def sample_action(self, state, training=False):
# Actorからアクションを出力
actions = actor_model(state.reshape((1,-1)))
action = actions[0].numpy()
# ガウス分布に従った乱数を出す
noise = np.random.normal(0, stdev, size=env.action_space.shape)
# actionにノイズを混ぜる(念のため-1~1でclip)
action = np.clip(action + noise, -1, 1)
return action
また、ノイズはスケールさせて学習が進む毎にスケールを小さくするといいとの事ですが、本実装では省略しています。
TD3(Twin Delayed DDPG)
TD3はDDPGを改良した手法で、以下3つの手法を取り入れより学習性能をあげた手法になります。
参考
Clipped Double Q learning
DQN の Double Q learning と同じ考えです。
Q学習ではTD誤差を計算する際の $\max_{a_{t+1}} Q(s_{t+1},a_{t+1})$ 項にて次の状態の価値 $Q(s_{t+1},a_{t+1})$ が過大評価(overestimation)される場合に学習が安定しない問題があります。
過大評価(真の価値より良いと見積もる)された値を推定に使うと、間違った評価が伝搬されネットワーク全体の学習が遅くなる事が予測されます。
Double Q learning ではアクションの選択と推定で同じモデルを使うことが過大評価に繋がると予想し、これを切り離して過大評価を防ぐ手法です。
参考:Deep Reinforcement Learning with Double Q-learning(論文)
Double Q learning では行動選択に Q network を使い、Q値推定に Target Q network を使いました。
TD3では Q network を2つ用いてQ値が小さいほうを採用することで対処しています。
- DDPG
$$
L_{critic} = \frac{1}{N} \sum ( r_{t+1} + \gamma Q(s_{t+1}, \mu(s_{t+1})) - Q(s_t, a_t) )^2
$$
- TD3
Q'(s, a) = \min(Q_1(s, \mu(s)), Q_2(s, \mu(s))) \\
L_{critic} = \frac{1}{N} \sum ( r_{t+1} + \gamma Q'(s_{t+1}, s_{t+1}) - Q(s_t, a_t) )^2
class CriticModel(keras.Model):
def __init__(self):
super().__init__()
# 各レイヤーを定義
self.dense1 = keras.layers.Dense(32, activation="relu")
self.value1 = keras.layers.Dense(1, activation="linear")
self.dense2 = keras.layers.Dense(32, activation="relu")
self.value2 = keras.layers.Dense(1, activation="linear")
# Forward pass
def call(self, states, actions, training=False):
# 接続層
x = tf.concat([states, actions], axis=1)
# Qネットワーク1
x1 = self.dense1(x)
q1 = self.value1(x1)
# Qネットワーク2
x2 = self.dense2(x)
q2 = self.value2(x2)
return q1, q2
def update_model():
# 学習タイミング
states = 今の状態
n_states = 次の状態
actions = アクション
# targetモデルから次の状態のアクションを取得
n_actions = target_actor_model(n_states)
# Q値を出力
n_qvals1, n_qvals2 = target_critic_model(n_states, n_actions)
# 小さいほうのQ値を採用
n_qvals = [min(q1, q2) for q1, q2 in zip(n_qvals1.numpy(), n_qvals2.numpy())]
(略)
#--- Actorの学習
with tf.GradientTape() as tape:
# Actorの学習はQ1かQ2どちらかを最大化するように学習
actor_actions = actor_model(states, training=True)
q1, _ = critic_model(states, actor_actions)
actor_loss = -tf.reduce_mean(q1) # 最大化
(略)
#--- Criticの学習
with tf.GradientTape() as tape:
# 両方のネットワークを学習
q1, q2 = critic_model(states, actions, training=True)
loss1 = tf.reduce_mean(tf.square(q_vals - q1))
loss2 = tf.reduce_mean(tf.square(q_vals - q2))
critic_loss = loss1 + loss2
(略)
Target Policy Smoothing
Target Policy にノイズを混ぜます。
これによりQ関数がなめらか(smooth)になり、ポリシー学習時にQ関数のエラー(外れ値や過大評価値かな?)を利用しにくくし、学習の頑健性の向上を図ります。
ノイズは平均0のガウスノイズを0.5を超えないようにclipさせて乗せます。(アクションが-1~1の範囲の場合)
また、アクション範囲を超えないようにノイズを乗せた後にさらにclipするようです。
a'(s') = clip( \mu_{\theta_{target}}(s') + clip(\epsilon, -c, c), a_{low}, a_{high}) \\
\epsilon \sim N(0, \sigma^2)
$N$ は平均0、分散 $\sigma^2$ の正規分布、$c$ がノイズのclip範囲です。
def update_model():
# 学習タイミング
n_states = 次の状態
# targetモデルから次の状態のアクションを取得
n_actions = target_actor_model(n_states)
# ノイズを追加
c = 0.5
noise_stddev = 0.2
clipped_noise = np.clip(np.random.normal(0, noise_stddev, n_actions.shape), -c, c)
n_actions = np.clip(n_actions + clipped_noise, -1, 1)
# Q値を出力
n_qvals1, n_qvals2 = target_critic_model(n_states, n_actions)
(略)
Delayed Policy Update
Q関数に比べてポリシーは更新頻度が低いので、ポリシーの更新頻度を下げるという内容です。
具体的にはCritic2回の更新につきActorは1回更新するようです。
実装
環境は Pendulum-v0
を採用しました。
モデル
実装モデルは以下です。
アクションは tanh
で-1~1の範囲に限定しています。
学習は-1~1ですが、環境に渡す時は環境にあった範囲に直して渡します。
コードは以下です。
- ActorModel
class ActorModel(keras.Model):
def __init__(self, action_space):
super().__init__()
self.noise_stdev = 0.2 # ノイズ用の標準偏差
self.action_space = action_space
# Envアクション用
self.action_centor = (action_space.high + action_space.low)/2
self.action_scale = action_space.high - self.action_centor
# 各レイヤーを定義
self.dense1 = keras.layers.Dense(32, activation="relu")
self.dense2 = keras.layers.Dense(32, activation="relu")
self.dense3 = keras.layers.Dense(32, activation="relu")
self.actions = keras.layers.Dense(action_space.shape[0], activation="tanh")
# optimizer
self.optimizer = Adam(lr=0.003)
# Forward pass
def call(self, inputs, training=False):
x = self.dense1(inputs)
x = self.dense2(x)
x = self.dense3(x)
actions = self.actions(x)
return actions
# 状態を元にactionを算出
def sample_action(self, state, training=False):
actions = self(state.reshape((1,-1)))
action = actions[0].numpy()
if training:
# 学習用
# ノイズを混ぜる
noise = np.random.normal(0, self.noise_stdev, size=self.action_space.shape)
action = np.clip(action + noise, -1, 1)
# 環境用のアクションと学習用のアクションを返す
return (action * self.action_scale + self.action_centor), action
else:
# テスト用、環境に渡すアクションのみを返す
return action * self.action_scale + self.action_centor
- CriticModel
class CriticModel(keras.Model):
def __init__(self):
super().__init__()
# 各レイヤーを定義
self.dense1 = keras.layers.Dense(32, activation="relu")
self.dense2 = keras.layers.Dense(32, activation="relu")
self.dense3 = keras.layers.Dense(32, activation="relu")
self.value1 = keras.layers.Dense(1, activation="linear")
self.dense4 = keras.layers.Dense(32, activation="relu")
self.dense5 = keras.layers.Dense(32, activation="relu")
self.dense6 = keras.layers.Dense(32, activation="relu")
self.value2 = keras.layers.Dense(1, activation="linear")
# optimizer
self.optimizer = Adam(lr=0.003)
# Forward pass
def call(self, states, actions, training=False):
x = tf.concat([states, actions], axis=1)
x1 = self.dense1(x)
x1 = self.dense2(x1)
x1 = self.dense3(x1)
q1 = self.value1(x1)
x2 = self.dense4(x)
x2 = self.dense5(x2)
x2 = self.dense6(x2)
q2 = self.value2(x2)
return q1, q2
学習
Actorの学習とCriticの学習があります。
def update_model(
actor_model,
target_actor_model,
critic_model,
target_critic_model,
experiences,
batch_size,
gamma,
all_train_count,
actor_update_interval,
target_policy_noise_stddev,
target_policy_clip_range,
):
# ランダムに経験を取得してバッチを作成
batchs = random.sample(experiences, batch_size)
# Target Networkを用いて次の状態の価値を出す
n_states = np.asarray([e["n_state"] for e in batchs])
n_actions = target_actor_model(n_states)
# Target Actionのノイズ
clipped_noise = np.clip(np.random.normal(0, target_policy_noise_stddev, n_actions.shape), -target_policy_clip_range, target_policy_clip_range)
n_actions = np.clip(n_actions + clipped_noise, -1, 1)
# 2つのQ値から小さいほうを採用
n_qvals1, n_qvals2 = target_critic_model(n_states, n_actions)
n_qvals = [min(q1, q2) for q1, q2 in zip(n_qvals1.numpy(), n_qvals2.numpy())]
# Qを計算 : reward if done else (reward + gamma * n_qval)
q_vals = np.asarray([
[reward] if done else [reward] + gamma * n_qval
for reward, done, n_qval in zip(
[e["reward"] for e in batchs],
[e["done"] for e in batchs],
n_qvals,
)
])
# データ整形
states = np.asarray([e["state"] for e in batchs])
actions = np.asarray([e["action"] for e in batchs])
#--- Actorの学習
# Actorの学習は少し減らす
if all_train_count % actor_update_interval == 0:
with tf.GradientTape() as tape:
actor_actions = actor_model(states, training=True)
q, _ = critic_model(states, actor_actions)
actor_loss = -tf.reduce_mean(q) # 最大化
grads = tape.gradient(actor_loss, actor_model.trainable_variables)
actor_model.optimizer.apply_gradients(zip(grads, actor_model.trainable_variables))
#--- Criticの学習 MSEで学習
with tf.GradientTape() as tape:
q1, q2 = critic_model(states, actions, training=True)
loss1 = tf.reduce_mean(tf.square(q_vals - q1))
loss2 = tf.reduce_mean(tf.square(q_vals - q2))
critic_loss = loss1 + loss2
grads = tape.gradient(critic_loss, critic_model.trainable_variables)
critic_model.optimizer.apply_gradients(zip(grads, critic_model.trainable_variables))
Targetモデルの同期(Soft-Target)
def update_target_model(actor_model, target_actor_model, critic_model, target_critic_model, soft_tau):
target_actor_model.set_weights(
(1 - soft_tau) * np.array(target_actor_model.get_weights(), dtype=object)
+ (soft_tau) * np.array(actor_model.get_weights(), dtype=object))
target_critic_model.set_weights(
(1 - soft_tau) * np.array(target_critic_model.get_weights(), dtype=object)
+ (soft_tau) * np.array(critic_model.get_weights(), dtype=object))
全体の流れ
env = gym.make("Pendulum-v0")
# ハイパーパラメータ
buffer_size = 10000 # キューの最大容量
warmup_size = 500 # 最低限キューに入れる数
train_interval = 10 # 学習間隔
batch_size = 32 # バッチサイズ
gamma = 0.9 # 割引率
soft_tau = 0.02 # Target network の近づく割合
actor_update_interval = 2 # Actorの更新間隔
target_policy_noise_stddev = 0.2 # Target policy ノイズの標準偏差
target_policy_clip_range = 0.5 # Target policy ノイズのclip範囲
# モデルの定義
actor_model = ActorModel(env.action_space)
target_actor_model = ActorModel(env.action_space)
critic_model = CriticModel()
target_critic_model = CriticModel()
# モデルは一度伝搬させないと重みが作成されない
dummy_state = np.random.normal(0, 0.1, size=(1,) + env.observation_space.shape)
actor_model(dummy_state)
target_actor_model(dummy_state)
target_actor_model.set_weights(actor_model.get_weights())
dummy_action = np.random.normal(0, 0.1, size=(1,) + env.action_space.shape)
critic_model(dummy_state, dummy_action)
target_critic_model(dummy_state, dummy_action)
target_critic_model.set_weights(critic_model.get_weights())
# 収集する経験は上限を決め、古いものから削除する
experiences = deque(maxlen=buffer_size)
all_step_count = 0
all_train_count = 0
# 学習ループ
for episode in range(500):
state = np.asarray(env.reset())
done = False
total_reward = 0
step = 0
# 1episode
while not done:
# アクションを決定
env_action, action = actor_model.sample_action(state, True)
# 1step進める
n_state, reward, done, _ = env.step(env_action)
n_state = np.asarray(n_state)
step += 1
total_reward += reward
experiences.append({
"state": state,
"action": action,
"reward": reward,
"n_state": n_state,
"done": done,
})
state = n_state
# warmup貯まったら train_interval 毎に学習する
if len(experiences) >= warmup_size and all_step_count % train_interval == 0:
# モデルの更新
update_model(
actor_model,
target_actor_model,
critic_model,
target_critic_model,
experiences,
batch_size,
gamma,
all_train_count,
actor_update_interval,
target_policy_noise_stddev,
target_policy_clip_range,
)
# Soft-target
update_target_model(
actor_model,
target_actor_model,
critic_model,
target_critic_model,
soft_tau
)
all_train_count += 1
all_step_count += 1
#--- 5回テストする例
for episode in range(5):
state = np.asarray(env.reset())
env.render()
done = False
total_reward = 0
step = 0
# 1episode
while not done:
action = actor_model.sample_action(state)
n_state, reward, done, _ = env.step(action)
env.render()
state = np.asarray(n_state)
step += 1
total_reward += reward
print("{} step, reward: {}".format(step, total_reward))
env.close()
学習結果
200 step, reward: -135.62393931697093
200 step, reward: -133.9956068205278
200 step, reward: -267.9843515164706
200 step, reward: -1499.221939660923
200 step, reward: -3.617902091432299
あとがき
TRPOやPPOで苦労していたPendulum-v0がすごい簡単に学習できてしまいました。
やはり確率的なものを扱うより値自体を学習させたほうがニューラルネットワークと相性がいいんでしょうかね。