はじめに
OpenAI Gymのブロック崩しを挑戦します。
OpenAIからお借りしたイメージ
https://gym.openai.com/videos/2019-10-21--mqt8Qj1mwo/Breakout-v0/poster.jpg
今回はKerasを使ってActor-Criticモデルを構築して訓練ます。Kerasはより簡単にディープラーニングを導入するAPI、自分で調整する必要がある変数や関数が少ないので初心者向けいいサンプルだと思います。
Actor-Criticモデルは強化学習モデルの一つです。詳しい説明はTensorflowのブログに載せています。 (https://www.tensorflow.org/tutorials/reinforcement_learning/actor_critic) しかしリンクのCartPole例と違って、今回ブロック崩しをやります。CartPoleのような4つの変数より、ブロック崩し(每フレーム1枚84x84の画像)の方が遥かに多いのでCNNネットワークを使います。詳しい構成は後で説明します。
この記事は7月にGithubに投稿した記事に基づいて作成します。
https://github.com/leolui2004/atari_rl_ac
結果
先に結果を載せます。1回訓練 = 1ゲームで、8時間をかけて5000回訓練した(episode)。結果はラスト50回の平均を取ります。1枚目はスコア、2枚目はステップ数(timestep)となります。
スコアは若干増えた感じがありますがステップ数の方がかなりいい結果が出ました。ちなみに最初は50回を平均取れないので異常に見えてしまいます。
やり方
ゲームプレイと強化学習分かれて説明します。ゲームプレイの部分はOpenAIのGym環境の中にゲームをやります。強化学習の部分はゲームからもらった変数を訓練して、予測したアクションをゲームに反映されます。
ゲームプレイ
import gym
import random
import numpy as np
env = gym.make('Breakout-v4')
episode_limit = 5000
random_step = 20
timestep_limit = 100000 #永遠にプレーできないように制限
model_train = 1 #0にすると訓練しない(ただのランダムプレー)
log_save = 1 #0にするとログ保存しない
log_path = 'atari_ac_log.txt'
score_list = []
step_list = []
for episode in range(episode_limit):
#毎回プレーする前に環境をリセット
observation = env.reset()
score = 0
#ボールの位置をランダムさせるために最初のランダムのステップ数で何もしない
for _ in range(random.randint(1, random_step)):
observation_last = observation
observation, _, _, _ = env.step(0)
#行動したの観測データをエンコード(後ほど説明)
state = encode_initialize(observation, observation_last)
for timestep in range(timestep_limit):
observation_last = observation
#予測するアクションをモデルから取得(後ほど説明)
action = action_choose(state[np.newaxis, :], epsilon, episode, action_space)
#予測したアクションに基づいて行動
observation, reward, done, _ = env.step(action)
#行動したの観測データをエンコード(後ほど説明)
state_next = encode(observation, observation_last, state)
if model_train == 1:
#行動したの観測データをモデルに送って学習させる(後ほど説明)
network_learn(state[np.newaxis, :], action, reward, state_next[np.newaxis, :], done)
state = state_next
score += reward
#ゲーム終わりもしくはtimestep_limit到達(強制終了)
if done or timestep == timestep_limit - 1:
#結果を記録
score_list.append(score)
step_list.append(timestep)
if log_save == 1:
log(log_path, episode, timestep, score)
print('Episode {} Timestep {} Score {}'.format(episode + 1, timestep, score))
break
#アクションを一定の程度にランダムさせる関数(後ほど説明)
epsilon = epsilon_reduce(epsilon, episode)
env.close()
強化学習
エンコードの部分はグレースケール転換、リサイズと4つ連続の84x84の画像(フレーム)を合成します。これによるとよりボールのアクションを記録できて、訓練しやすいということです。
from skimage.color import rgb2gray
from skimage.transform import resize
frame_length = 4
frame_width = 84
frame_height = 84
def encode_initialize(observation, last_observation):
processed_observation = np.maximum(observation, last_observation)
processed_observation_resize = np.uint8(resize(rgb2gray(processed_observation), (frame_width, frame_height)) * 255)
state = [processed_observation_resize for _ in range(frame_length)]
state_encode = np.stack(state, axis=0)
return state_encode
def encode(observation, last_observation, state):
processed_observation = np.maximum(observation, last_observation)
processed_observation_resize = np.uint8(resize(rgb2gray(processed_observation), (frame_width, frame_height)) * 255)
state_next_return = np.reshape(processed_observation_resize, (1, frame_width, frame_height))
state_encode = np.append(state[1:, :, :], state_next_return, axis=0)
return state_encode
ネットワークと訓練の部分は一番難しいですが、最初に図で表現するとこんな感じです。
先エンコードしたデータを2階層のConv2D層に送ります。そして平坦化して、2階層のDense層に送ります。最後に出力は4つ(OpenAI Gym Breakout-v4の仕様上で NOPE, FIRE, LEFT, RIGHT)となります。ちなみに活性化関数はreluで、ロス関数は論文通り、学習率はactor, critic両方とも0.001、それぞれを設定します。
from keras import backend as K
from keras.layers import Dense, Input, Flatten, Conv2D
from keras.models import Model, load_model
from keras.optimizers import Adam
from keras.utils import plot_model
verbose = 0
action_dim = env.action_space.n
action_space = [i for i in range(action_dim)] # ['NOOP', 'FIRE', 'RIGHT', 'LEFT']
discount = 0.97
actor_lr = 0.001 #actorの学習率
critic_lr = 0.001 #criticの学習率
pretrain_use = 0 #1にすると訓練されたモデルを使う
actor_h5_path = 'atari_ac_actor.h5'
critic_h5_path = 'atari_ac_critic.h5'
#モデル構築
input = Input(shape=(frame_length, frame_width, frame_height))
delta = Input(shape=[1])
con1 = Conv2D(32, (8, 8), strides=(4, 4), padding='same', activation='relu')(input)
con2 = Conv2D(64, (4, 4), strides=(2, 2), padding='same', activation='relu')(con1)
fla1 = Flatten()(con2)
dense = Dense(128, activation='relu')(fla1) #prob, valueをシェア
prob = Dense(action_dim, activation='softmax')(dense) #actor部分
value = Dense(1, activation='linear')(dense) #critic部分
#ロス関数の定義
def custom_loss(y_true, y_pred):
out = K.clip(y_pred, 1e-8, 1-1e-8) #限界を設定
log_lik = y_true * K.log(out) #方策勾配
return K.sum(-log_lik * delta)
if pretrain_use == 1:
#訓練されたモデルを使う
actor = load_model(actor_h5_path, custom_objects={'custom_loss': custom_loss}, compile=False)
critic = load_model(critic_h5_path)
actor = Model(inputs=[input, delta], outputs=[prob])
critic = Model(inputs=[input], outputs=[value])
policy = Model(inputs=[input], outputs=[prob])
actor.compile(optimizer=Adam(lr=actor_lr), loss=custom_loss)
critic.compile(optimizer=Adam(lr=critic_lr), loss='mean_squared_error')
#アクションを予測
def action_choose(state, epsilon, episode, action_space):
#epsilonは最初に1に設定して徐々に下げる
#毎回行動する時ランダム数字と比べて
#epsilonの方が大きいならランダムアクションを取る
if epsilon >= random.random() or episode < initial_replay:
action = random.randrange(action_dim)
else:
probabiliy = policy.predict(state)[0]
#予測した結果は4つのアクションに対してそれぞれ確率がある
#その確率に沿ってアクションを選ぶ
action = np.random.choice(action_space, p=probabiliy)
return action
#データを学習
def network_learn(state, action, reward, state_next, done):
reward_clip = np.sign(reward)
critic_value = critic.predict(state)
critic_value_next = critic.predict(state_next)
target = reward_clip + discount * critic_value_next * (1 - int(done))
delta = target - critic_value
actions = np.zeros([1, action_dim])
actions[np.arange(1), action] = 1
actor.fit([state, delta], actions, verbose=verbose)
critic.fit(state, target, verbose=verbose)
この部分は他の機能として強化学習との直接関係がないですけどを合わせて書きます。
import matplotlib.pyplot as plt
model_save = 1 #0にするとモデルを保存しない
score_avg_freq = 50
epsilon_start = 1.0 #epsilon開始時の確率
epsilon_end = 0.1 #epsilon最低の確率(最低でも10%でランダムアクション)
epsilon_step = episode_limit
epsilon = 1.0
epsilon_reduce_step = (epsilon_start - epsilon_end) / epsilon_step
initial_replay = 200
actor_graph_path = 'atari_ac_actor.png'
critic_graph_path = 'atari_ac_critic.png'
policy_graph_path = 'atari_ac_policy.png'
#epsilon下げさせるの関数
def epsilon_reduce(epsilon, episode):
if epsilon > epsilon_end and episode >= initial_replay:
epsilon -= epsilon_reduce_step
return epsilon
#ログを書く
def log(log_path, episode, timestep, score):
logger = open(log_path, 'a')
if episode == 0:
logger.write('Episode Timestep Score\n')
logger.write('{} {} {}\n'.format(episode + 1, timestep, score))
logger.close()
if pretrain_use == 1:
if model_save == 1:
actor.save(actor_h5_path)
critic.save(critic_h5_path)
else:
if model_save == 1:
actor.save(actor_h5_path)
critic.save(critic_h5_path)
#モデル構成を図に出力
plot_model(actor, show_shapes=True, to_file=actor_graph_path)
plot_model(critic, show_shapes=True, to_file=critic_graph_path)
plot_model(policy, show_shapes=True, to_file=policy_graph_path)
#結果を図に出力
xaxis = []
score_avg_list = []
step_avg_list = []
for i in range(1, episode_limit + 1):
xaxis.append(i)
if i < score_avg_freq:
score_avg_list.append(np.mean(score_list[:]))
step_avg_list.append(np.mean(step_list[:]))
else:
score_avg_list.append(np.mean(score_list[i - score_avg_freq:i]))
step_avg_list.append(np.mean(step_list[i - score_avg_freq:i]))
plt.plot(xaxis, score_avg_list)
plt.show()
plt.plot(xaxis, step_avg_list)
plt.show()