この記事は自作している強化学習フレームワークの解説記事です。
次:PlaNet
World Models
参考
・論文:https://arxiv.org/abs/1803.10122
・作者のblog:https://worldmodels.github.io/
・コードサンプル:https://paperswithcode.com/paper/world-models
・(論文)World Models(2018) | Qiita
・機械学習論文読み:World Models | Qiita
・World Models (the long version) | ADG Efficiency
1. 動機
私たちの脳は日常生活にある膨大な情報を扱うために、空間的・時間的な側面を抽象化して学習しています。
(画像は論文より引用、脳内では自転車が抽象化されている)
この脳内で抽象化された予測モデルに従うことで、例えば反射行動など無意識化で行うような迅速な行動を行う必要があるタスクに対応できていると考えられています。
World Models(世界モデル)ではこの仮説を元に、大規模なモデルで環境を適切に抽象化できれば意思決定は最小限のモデルで学習できるのではないか、という考えの元作り出されたアルゴリズムです。
2. World Models の構成
Wold Models は V(Vision)、M(Memory)、C(Controller) の3つのコンポーネントから成り立ちます。
環境から送られた状態は WorldModel(VとM)を通して抽象化され、それを受け取った C がアクションを決定します。
各コンポーネントは独立しており、分けて考えることができるので順に見ていきます。
2-1. V(Vision、VAE)
V の役割は、環境から渡される高次元な状態を低次元な状態に圧縮する事です。
次元圧縮には変分オートエンコーダー(Variational AutoEncoder; VAE)が使われます。
VAEを通してオリジナルの画像から次元圧縮された状態 z を獲得するのが目的です。
本記事ではVAE側の詳細な内容については省略し、関係ある部分のみ触れます。
(VAEについては以下の記事が参考になりました)
・Variational Autoencoder徹底解説 | Qiita
・【Tensorflowによる実装付き】Variational Auto-Encoder(VAE)を理解する | 楽しみながら理解するAI・機械学習入門
・Variational AutoEncoder | Keras
・畳み込み変分オートエンコーダ | Tensorflow
・TensorFlow Probability で作る変分オートエンコーダ(VAE)| Google Developers
具体的なネットワークの例は以下です。
コード例は以下です。
import tensorflow.keras as keras
from tensorflow.keras import layers as kl
class _VAE(keras.Model):
def __init__(self):
super().__init__()
self.z_size = 32 # 潜在空間のサイズ
# --- encoder
input_shape = (64, 64, 3)
in_layer = c = kl.Input(shape=input_shape)
c = kl.Conv2D(filters=32, kernel_size=4, strides=2, activation="relu")(c)
c = kl.Conv2D(filters=64, kernel_size=4, strides=2, activation="relu")(c)
c = kl.Conv2D(filters=128, kernel_size=4, strides=2, activation="relu")(c)
c = kl.Conv2D(filters=256, kernel_size=4, strides=2, activation="relu")(c)
c = kl.Flatten()(c)
z_mean = kl.Dense(self.z_size)(c)
z_log_var = kl.Dense(self.z_size)(c)
self.encoder = keras.Model(in_state, [z_mean, z_log_var])
# --- decoder
in_state = c = kl.Input(shape=(self.z_size,))
c = kl.Dense(2 * 2 * 256, activation="relu")(c)
c = kl.Reshape((1, 1, 2 * 2 * 256))(c)
c = kl.Conv2DTranspose(128, kernel_size=5, strides=2, padding="valid", activation="relu")(c)
c = kl.Conv2DTranspose(64, kernel_size=5, strides=2, padding="valid", activation="relu")(c)
c = kl.Conv2DTranspose(32, kernel_size=6, strides=2, padding="valid", activation="relu")(c)
c = kl.Conv2DTranspose(3, kernel_size=6, strides=2, padding="valid", activation="sigmoid")(c)
self.decoder = keras.Model(in_state, c)
def call(self, x):
return self.decode(self.encode(x))
def encode(self, x, training=False):
z_mean, z_log_var = self.encoder(x, training=training)
# reparameterize
e = tf.random.normal(z_mean.shape)
z = z_mean + tf.exp(0.5 * z_log_var) * e
if training:
return z_mean, z_log_var, z
else:
return z
def decode(self, z, training=False):
return self.decoder(z, training=training)
def sample(self, size=1):
z = np.random.normal(size=(size, self.z_size))
return self.decode(z), z
2-2. M(Memory、MDN-RNN)
M の役割は未来を予測する事で、今の状態から次の状態を予測します。
予測方法は、各ステップの状態を時系列データとし、次の状態をRNNで予測します。
次の状態は(マルコフ決定過程のモデルでは)確率的に決まるので確率モデルで表現する必要があります。
なので、混合密度ネットワーク(Mixture Density Network; MDN)をはさんで次の状態を出力します。
VAEと同様にRNNとMDNの詳細は省略します。
別途記事を書いているので詳細はそちらをご覧ください。
・LSTM(RNN)で可変長な時系列と隠れ状態について調べてみた(Tensorflow2.0)
・混合密度ネットワーク(Tensorflow2.0)
ネットワークの例は以下です。
MDNは1つのステップに対して適用されるので、LSTMからMDNに渡すタイミングでバッチ×ステップ数に変換しています。
温度パラメータは乱数の強さで0なら決定的になります。
コード例は以下です。
class _MDNRNN(keras.Model):
def __init__(self):
super().__init__()
self.action_num = アクション数
self.z_size = VAEの潜在空間のサイズ
self.num_mixture = 正規分布の数
self.temperature = 温度パラメータ(乱数の反映率)
# --- RNN(LSTM)
self.lstm_layer = kl.LSTM(256, return_sequences=True, return_state=True)
# --- MDN
self.mdn_layer = kl.Dense(self.z_size * self.num_mixture * 3)
def call(self, z, onehot_actions, hidden_state, training=False):
batch_size = z.shape[0]
timesteps = z.shape[1]
# z + action
x = tf.concat([z, onehot_actions], axis=2)
# (batch, timesteps, z + action) -> (batch, timesteps, lstm_dim)
x, h, c = self.lstm_layer(x, initial_state=hidden_state, training=training)
# -> (batch * timesteps, lstm_dim)
x = tf.reshape(x, (batch_size * timesteps, -1))
# -> (batch * timesteps, z * num_mix * 3)
x = self.mdn_layer(x, training=training)
# -> (batch * timesteps, z, num_mix * 3)
x = tf.reshape(x, (-1, self.z_size, self.num_mixture * 3))
# -> (batch * timesteps, z, num_mix) * 3
pi, mu, log_sigma = tf.split(x, 3, axis=2)
return pi, mu, log_sigma, [h, c]
def forward(self, z, action, hidden_state, rnn_only: bool = True):
""" 1ステップ進める(主にhidden_state用) """
onehot_actions = tf.one_hot(np.array([action]), self.action_num, axis=1)
# (batch, shape) -> (batch, 1, shape)
z = z[:, np.newaxis, ...]
onehot_actions = onehot_actions[:, np.newaxis, ...]
pi, mu, log_sigma, hidden_state = self(z, onehot_actions, hidden_state)
# --- hidden_state を進めるだけの場合はここまで
if rnn_only:
return hidden_state
# --- 次の状態も予測する場合
batch = z.shape[0]
z_size = z.shape[2]
sigma = np.exp(log_sigma)
if self.temperature > 0:
# softmax
pi /= self.temperature # adjust temperatures
pi = pi - tf.reduce_max(pi, axis=1, keepdims=True) # overflow_protection
exp_pi = tf.exp(pi)
pi = exp_pi / tf.reduce_sum(exp_pi, axis=1, keepdims=True)
samples = np.zeros((batch, z_size))
for i in range(batch):
for j in range(z_size):
if self.temperature == 0:
# 最大値(決定的)
idx = np.argmax(pi[i][j])
z = mu[i][j][idx]
else:
idx = random.choices([i for i in range(self.num_mixture)], weights=pi[i][j])
z = np.random.normal(mu[i][j][idx], sigma[i][j][idx] * self.temperature)
samples[i][j] = z
return samples, hidden_state
def get_initial_state(self):
return self.lstm_layer.cell.get_initial_state(batch_size=1, dtype=tf.float32)
[余談] sketch RNN
MDN+RNN による未来予測は既にある手法で、過去の事例の一つに sketch RNN があります。
これは Gigazine でも取り上げられていました。
お絵かき途中の線をニューラルネットワークが受け継いでイラストを完成させてくれる「Sketch-RNN」 | Gigazine
まだデモページは生きており遊ぶことができました。結構面白かったです。
https://magenta.tensorflow.org/sketch-rnn-demo
2-3. C(Controller、Linear)
Controllerの役割は報酬が最大になるようなアクションを決めることです。
仮説が正しければCは小さい構成でも問題ないはずなので、単純な線形モデルとして表現します。
$$ a_t = W_c [z_t h_t] + b_c$$
1次関数ですね、$W_c$ と $b_c$ が学習するパラメータを表し $[z_t h_t]$ は関数でいう所の $x$ に相当します。
コード例は以下です。
class _Controller(keras.Model):
def __init__(self):
super().__init__()
self.out_layer = kl.Dense(アクション数)
def call(self, z, hidden_state):
x = tf.concat([z, hidden_state[1]], axis=1)
return self.out_layer(x)
hidden_state ですが、以下の図をベースにいうと中身は [h, c] です。
h は出力の値で、時系列情報を記憶しているのは c となります。
なのでControllerの入力には c のみを使用しています。
3. 学習
3-1. VAE - Vの学習
ここからは学習の話です。
VAEの学習データはランダムに行動した結果を使い、そのデータを元に教師あり学習を行います。
lossは以下です。
$$
loss(\theta) = -E_{z \sim E_{\theta}} \Big[ \log D_\theta(x'|z) \Big] + KLD \Big(E_{\theta}(z|x) || P(z) \Big)
$$
数式の細かい話は参考サイトにまかせて、意味だけを見ていきます。
第1項は再構築された対数尤度で、xの結果出力された潜在変数zと予測されたx'上の分布が与えられます。
これを最大化することで、教師データ $P(x|z)$ の分布と予測結果 $P(x'|z)$ の分布の類似性が最大化されます。
ここはガウス分布を仮定している場合、対数尤度の最大化は平均二乗誤差の最小化(MSE)にしても同じ結果になるらしく、実装はMSEにしています。(https://adgefficiency.com/world-models/)
$$ -E_{z \sim E_{\theta}} \Big[ \log D_\theta(x'|z) \Big] \rightarrow E_{z \sim E_{\theta}} \Big[ ||x' -x|| \Big]^2$$
第2項は、想定している潜在空間の確率分布 $P(z)$ と実際の潜在空間の確率分布とのKL距離です。
KL距離は確率分布の類似度を表す指標で、これを0に近づける事で潜在空間を想定している確率分布に近づけます。
直感的には潜在空間に更に条件を付与しているので、正則化項または圧縮項と解釈できます。
標準正規分布を想定した場合のKL項は以下となります。
$$
KLD \Big(E_{\theta}(z|x) || P(z) \Big) = \frac{1}{2}(1 + \log(\sigma^2_{\theta}) - \sigma^2_{\theta} - \mu_{\theta})
$$
まとめると、第1項が予測を正確にする項で、第2項が z を正規分布内に押し込める項です。
コード例は以下になります。
x = states = 状態(shape=(batch_size, w, h, ch))
optimizer = keras.optimizers.Adam()
with tf.GradientTape() as tape:
z_mean, z_log_var, z = vae.encode(x, training=True)
pred_x = vae.decode(z, training=True)
# reconstruction loss (logistic), commented out.
"""
eps = 1e-6 # avoid taking log of zero
rc_loss = tf.reduce_mean(
tf.reduce_sum(
-(x * tf.math.log(pred_x + eps) + (1.0 - x) * tf.math.log(1.0 - pred_x + eps)),
axis=[1, 2, 3],
)
)
"""
# reconstruction loss (MSE)
rc_loss = tf.reduce_sum(tf.square(x - pred_x), axis=[1, 2, 3])
rc_loss = tf.reduce_mean(rc_loss)
# KL loss
kl_tolerance = 0.5
kl_loss = -0.5 * tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=1)
kl_loss = tf.maximum(kl_loss, kl_tolerance * vae.z_size)
kl_loss = tf.reduce_mean(kl_loss)
loss = rc_loss + kl_loss
grads = tape.gradient(loss, vae.trainable_variables)
optimizer.apply_gradients(zip(grads, vae.trainable_variables))
kl_tolerance は再構築項(第1項)に比べてKL項(第2項)が過度に最適化するのを防ぐ制限項です。
再構築項はKL項に比べて比較的弱いので制限を設けて改善しているとの事でした。(https://github.com/hardmaru/WorldModelsExperiments/issues/8)
ただ、潜在空間のサイズを掛けてる理由は見当たりませんでした…。
(Free Bits側の論文もSketchRNN側の論文も kl_tolerance は定数です)
3-2. MDN-RNN - Mの学習
V と同じく学習データはランダムに行動して取得します。
1エピソード長は固定として1エピソード=1batchとしているようです。
(R2D2のburn-inみたいなことはしていない1)
loss は MDN と同じで以下です。
(詳細は以前書いた記事を見てください)
$$
loss = - \log \sum_k{\pi_k(x) \phi(y,\mu_k(x), \sigma_k(x))}
$$
コード例は以下です。
states = 時系列の状態 shape=(batch_size, sequence_length + 1, w, h, ch)
actions = 時系列のアクション shape=(batch_size, sequence_length, a)
optimizer = keras.optimizers.Adam()
# onehot
onehot_actions = tf.one_hot(actions, アクション数, axis=2)
# すべての状態を一気にencodeする
states = states.reshape((batch_size * (sequence_length + 1),) + states.shape[2:])
z = vae.encode(states).numpy()
z = z.reshape((batch_size, sequence_length + 1, -1))
# stateとnext_stateに分ける
z1 = z[:, :-1, ...]
z2 = z[:, 1:, ...]
# next_stateはMDNで使うので、時系列情報をなくす
z2 = z2.reshape((batch_size * sequence_length, -1, 1))
with tf.GradientTape() as tape:
pi, mu, log_sigma, _ = rnn(z1, onehot_actions, None, training=True)
# log softmax
pi = pi - tf.reduce_max(pi, axis=2, keepdims=True) # overflow_protection
log_pi = pi - tf.math.log(tf.reduce_sum(tf.exp(pi), axis=2, keepdims=True))
# log gauss
log_gauss = -0.5 * (np.log(2 * np.pi) + 2 * log_sigma + (z2 - mu) ** 2 / tf.exp(log_sigma) ** 2)
# loss
loss = tf.reduce_sum(tf.exp(log_pi + log_gauss), axis=2, keepdims=True)
loss = tf.maximum(loss, 1e-6) # log(0) 回避
loss = -tf.math.log(loss)
loss = tf.reduce_mean(loss)
grads = tape.gradient(loss, rnn.trainable_variables)
optimizer.apply_gradients(zip(grads, rnn.trainable_variables))
3-3. 進化戦略 - Cの学習
Cの学習はパラメータが少ないことを利用して、ディープラーニングでよく使われる勾配降下法ではなく、進化戦略(Evolution Strategy; ES)で学習します。
進化戦略はメタヒューリスティックなアルゴリズムの1つで、他に有名なアルゴリズムとして遺伝的アルゴリズム等があります。2
論文では進化戦略の1つである CMA-ES で学習していますが、コードの実装を見ると以下から選べるようでした。
- PEPG(Parameter-exploring Policy Gradients)
- GA(Genetic Algorithm; 遺伝的アルゴリズム)
- CMA-ES(Covariance Matrix Adaptation Evolution Strategy; 共分散行列適応進化戦略)
- シンプルな進化戦略(Basic Version of OpenAI Evolution Strategies)
フレームワーク上は遺伝的アルゴリズムで実装してみました。
実数値をとるので以前記事を書いたBLX-α交叉のGAを用いています。
コード例は以下です。
(数エピソード回して評価するので少し変則的な書き方になっています、ちゃんとしたブラックボックス的な学習もフレームワーク側でいつか実装するかもしれません)
各個体の遺伝子はパラメータとし、パラメータを固定した状態で数エピソードを実行した後の報酬の平均を評価値とします。
その評価結果を元に個体を選んで交叉させる流れです。
# ニューラルネット(Tensorflow)を1次元の配列にして返す
def get_flat_params(model)
params = model.get_weights()
params_list = [tf.reshape(layer, [1, -1]) for layer in params]
params_list = tf.concat(params_list, axis=1)
return tf.reshape(params_list, [-1]).numpy()
# 1次元の配列をニューラルネット(Tensorflow)の重さに反映する
def set_flat_params(model, flat_params):
n = 0
weights = []
for layer in model.trainable_variables:
# shape のサイズを計算(各要素を掛け合わせる)
size = reduce(lambda a, b: a * b, layer.shape)
w = tf.reshape(flat_params[n : n + size], layer.shape)
weights.append(w)
n += size
model.set_weights(weights)
class Worker(DiscreteActionWorker):
def __init__(self, *args):
# 現在のパラメータをベスト個体とする
best_params = get_flat_params(self.controller)
self.param_length = len(best_params) # 遺伝子の長さ
# --- 初期個体を生成
self.elite_rewards = [[] for _ in range(self.num_individual)]
self.elite_params = [best_params]
for _ in range(self.num_individual - 1):
# 初期個体は正規分布に従った乱数で生成
p = np.random.randn(self.param_length) * self.randn_sigma
self.elite_params.append(p)
self.params_idx = 0
def call_on_reset(self, state: np.ndarray, invalid_actions: List[int]) -> None:
# エピソードの最初に、評価したい個体のパラメータをセットする
self.controller.set_flat_params(self.elite_params[self.params_idx])
self.total_reward = 0
# RNNの初期状態
self.hidden_state = self.rnn.get_initial_state()
def call_policy(self, state: np.ndarray, invalid_actions: List[int]) -> int:
# VAE より z を取得
z = vae.encode(state[np.newaxis, ...])
# z と hidden_state より、controller から action を取得
q = self.controller(z, self.hidden_state)[0].numpy()
action = np.argmax(q) # 重複はほぼないので無視
# hidden_state を進める
self.hidden_state = self.rnn.forward(z, action, self.hidden_state)
return action
def call_on_step(
self,
next_state: np.ndarray,
reward: float,
done: bool,
next_invalid_actions: List[int],
):
self.total_reward += reward
if done:
# 結果を保存
self.elite_rewards[self.params_idx].append(self.total_reward)
if len(self.elite_rewards[self.params_idx]) == self.num_simulations:
self.params_idx += 1
# 一通り個体が評価されたら交叉する
if self.params_idx >= len(self.elite_params):
self._eval()
self.params_idx = 0
self.elite_rewards = [[] for _ in range(self.num_individual)]
return {}
def _eval(self):
# 数エピソード実行した平均が評価値
elite_rewards = np.array(self.elite_rewards).mean(axis=1)
# --- エリート戦略
next_elite_params = []
best_idx = random.choice(np.where(elite_rewards == elite_rewards.max())[0])
best_params = self.elite_params[best_idx]
next_elite_params.append(best_params)
weights = elite_rewards - elite_rewards.min()
if weights.sum() == 0:
weights = np.full(len(elite_rewards), 1 / len(elite_rewards))
else:
weights = weights / weights.sum()
# --- 子の作成
while len(next_elite_params) < self.num_individual:
# --- 親個体の選択(ルーレット方式、重複あり)
idx1 = np.argmax(np.random.multinomial(1, weights))
idx2 = np.argmax(np.random.multinomial(1, weights))
# --- BLX-α交叉
c = []
for i in range(self.param_length):
if self.elite_params[idx1][i] < self.elite_params[idx2][i]:
xmin = self.elite_params[idx1][i]
xmax = self.elite_params[idx2][i]
else:
xmin = self.elite_params[idx2][i]
xmax = self.elite_params[idx1][i]
dx = xmax - xmin
rmin = xmin - self.blx_a * dx
rmax = xmax + self.blx_a * dx
_c = (rmax - rmin) * random.random() + rmin
# 突然変異
if random.random() < self.mutation:
_c = np.random.randn() * self.randn_sigma
c.append(_c)
next_elite_params.append(c)
self.elite_params = next_elite_params
ハイパーパラメータ
class Config(DiscreteActionConfig):
train_mode: int = 1 # 学習項目変更用です
lr: float = 0.001
batch_size: int = 32
capacity: int = 100_000
memory_warmup_size: int = 100
# VAE
z_size: int = 32
kl_tolerance: float = 0.5
# MDN-RNN
sequence_length: int = 10
rnn_units: int = 256
num_mixture: int = 5 # number of mixtures in MDN
temperature: float = 1.15
# GA
num_simulations: int = 16
num_individual: int = 16
mutation: float = 0.01
randn_sigma: float = 1.0
blx_a: float = 0.1
勝手にQ&A
論文を読んだり実装した時に思い浮かんだ疑問です。(自問自答)
-
なぜVAE?AEでもいいのでは?
論文ではVAEを使うとAEより、圧縮後の潜在次元zがよりロバストになるとありました。
私の解釈ですが、これは潜在次元zがある程度意味のあるまとまりに押し込まれるからだと思います。
zが近い者同士は似た画像になっていてほしいので、これを表現するためにVAEを適用したのかなと思います。 -
reward(報酬)とdone(終了状態)は学ばなくていいの?
doomではrewardを学んでいますが、car-racingでは学んでいません。
doneは1エピソードのstep数を固定にしている事で不要にしているっぽいです。
論文での実験では比較的単純なタスクでしか実験できていないので、reward/done の学習までまだ実施できていないような感じでした。 -
温度パラメータの役割は?
MDNの入力になっている温度パラメータ $\tau$(乱数の強さ)ですが、これはRNNの学習に影響します。
$\tau = 0$ で学習した場合は次の状態が決定的に決まるように学習されます。
図は論文内の比較結果です。
左が温度パラメータ、右側が実環境のスコアです。(真ん中はwold models上でのスコアなので参考程度です)
図は目安ですが、なるべく実環境に近い乱数の強さが望ましいように思います。
4. 学習
環境(grid)
以下のような環境です。
猫は上下左右に動けますが、ちゃんと動く確率は80%です。
魚にたどり着くと+1、穴に落ちると-1、動くたびに-0.04の報酬が手に入ります。
1エピソードの累計報酬は -3 ~ 0.8 ほどの値をとり、ランダムに行動すると -1.48 ぐらいの報酬になります。
(学習できていないと正の報酬は難しいですが、難易度は優しめです)
学習コード
4段階に分けて学習します。
- ランダム行動で、サンプルを収集する
- 収集したサンプルで VAE を学習
- 収集したサンプルで MDNRNN を学習
- 学習したネットワークを元に Controller を学習
学習に使用したコードは github を参照してください。
train_mode で学習方法を切り替えています。
パラメータ数
今回の構成だと以下になります。Cは隠れ層もないので流石のパラメータ数ですね。
Model | ParameterCount |
---|---|
VAE | 692,194(encoder) + 3,561,059(decoder) |
MDN-RNN | 448(LSTM) + 81(MDN) |
Controller | 40 |
VAE の可視化
コードでいうと plot_vae() を実行した結果です。
ランダムな潜在変数 z を25個生成し、それをデコードした画像です。
結構カラフルな画像ですが、ちゃんと復元できていますね。
(時々猫がおかしかったり分身したりする画像が生成されますが、概ね問題ない画像です)
学習後の評価結果と可視化
評価結果は以下でした。
Average reward for 100 episodes: 0.4840000081807375
Controller はたった40個のパラメータしかないのにも関わらず学習できています。
1エピソードの詳細な状況を可視化してみました。
画像の説明ですが、図の左上がオリジナルの環境です。
original とあるのが強化学習が受け取る状態です。(64×64にリサイズされた後を受け取っています)
decode は original 画像を VAE を通して復元した結果です。
action の下にある画像は MDN-RNN を通して予測された次の状態 z を復元したものです。
復元結果は毎回変わる(MDNなので)のでサンプルとして縦に3個出力しています。
100%正確とまではいきませんが、かなりいい感じに学習できているように見えます。
おわりに
モデルベース学習も勉強を進めたかったのでまずは有名なアルゴリズムから実装してみました。
WorldModelsよりVAEやMDNといった既存手法の学習のほうが大変でした…。
-
WorldModelsは2018年3月、R2D2は2018年9月提出 ↩