はじめに
本記事は私が深層強化学習の練習をしていたときに作成したコードの紹介になります。
Tensorflowのver2で↓こんなものができました。
概要
このプログラムは 【強化学習】実装しながら学ぶA3C【CartPoleで棒立て:1ファイルで完結】 に記載されているコードをTensorFlow 2、Google Colaboratory上で動かすことができるように修正を加えたものです。
A3Cのアルゴリズムに関しては、上記の参照元か元論文を参照してください。
また、Google Colaboratory 上で gymの結果を正常に出力するために 【強化学習】OpenAI Gym を Google Corab上で描画する方法 (2020.6版) のgym-notebook-wrapperを使用しています。
なお、以下のコードを.ipynbにまとめたものを以下のgithubリポジトリで公開しています
プログラム
事前準備
# colaboratory上での結果描画用のgym-notebook-wrapperをインストール
!apt update && apt install xvfb
!pip install gym-notebook-wrapper
## gym-notebook-wrapperのテスト
## https://qiita.com/ymd_h/items/c393797deb72e1779269
import gnwrapper
import gym
env = gnwrapper.Monitor(gym.make('CartPole-v1'),directory="./test/")
o = env.reset()
for _ in range(100):
o, r, d, i = env.step(env.action_space.sample())
if d:
break
env.display()
A3C部分
# A3C sample: cart pole task
# 以下のURLの実装を TensorFlow 2に変更したもの。
# 参考:https://qiita.com/sugulu/items/acbc909dd9b74b043e45
import gnwrapper
import tensorflow as tf
import gym, time, random
from datetime import datetime
import os
import numpy as np
tf.keras.backend.set_floatx('float64')
start_datetime = datetime.now().strftime('%Y%m%d_%H%M%S')
print("Tensorflow version " + tf.__version__)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # TensorFlow高速化用のワーニングを表示させない
# -- constants of Game
ENV = 'CartPole-v0'
env = gym.make(ENV)
NUM_STATES = env.observation_space.shape[0] # CartPoleは4状態
NUM_ACTIONS = env.action_space.n # CartPoleは、右に左に押す2アクション
NONE_STATE = np.zeros(NUM_STATES)
# -- constants of LocalBrain
MIN_BATCH = 5
LOSS_V = .5 # v loss coefficient
LOSS_ENTROPY = .01 # entropy coefficient
LEARNING_RATE = 5e-3
RMSPropDecaly = 0.99
GOAL = 100
# -- params of Advantage-ベルマン方程式
GAMMA = 0.99
N_STEP_RETURN = 5
GAMMA_N = GAMMA ** N_STEP_RETURN
N_WORKERS = 8 # スレッドの数
Tmax = 10 # 各スレッドの更新ステップ間隔
# ε-greedyのパラメータ
EPS_START = 0.5
EPS_END = 0.0
EPS_STEPS = GOAL*N_WORKERS
from tensorflow.keras.layers import Dense, Flatten, Conv2D
from tensorflow.keras import Model
class Network(Model):
def __init__(self):
# ネットワークを定義
super(Network, self).__init__()
self.l_dense = Dense(16, activation='relu', autocast=False)
self.out_actions = Dense(NUM_ACTIONS, activation='softmax')
self.out_value = Dense(1, activation='linear')
def call(self, x):
x = self.l_dense(x)
y = self.out_actions(x)
z = self.out_value(x)
return y, z
class GlobalBrain:
def __init__(self):
self.model = Network()
self.optimizer = tf.keras.optimizers.Adam()
self._activate_weight()
self.isLearned = False
self.frames = 0
def _activate_weight(self):
x = np.zeros((1, NUM_STATES))
# x = np.random.randn(1, NUM_STATES)
predictions = self.model(x)
# グローバル・ブレインのネットワークを更新する
def update_global_weight_params(self, grads):
self.optimizer.apply_gradients(zip(grads, self.model.trainable_weights))
# グローバル・ブレインのネットワークをプルする
def pull_global_weight_params(self, local_model):
[l_p.assign(g_p) for l_p, g_p in zip(local_model.trainable_weights, self.model.trainable_weights)]
# グローバル・ブレインのネットワークへプッシュする
def push_local_weight_params(self, local_model):
[g_p.assign(l_p) for g_p, l_p in zip(self.model.trainable_weights, local_model.trainable_weights)]
class LocalBrain:
def __init__(self, name, global_brain): # globalなglobal_brainを変数として持つ
with tf.name_scope(name):
self.train_queue = [[], [], [], [], []] # s, a, r, s', s' terminal mask
self.model = Network()
self.global_brain = global_brain
self._activate_weight()
def _activate_weight(self):
x = np.zeros((1, NUM_STATES))
# x = np.random.randn(1, NUM_STATES)
self.model(x)
def loss_func(self, s, a, r, p, v):
# loss関数を定義します
log_prob = tf.math.log(tf.math.reduce_sum(p * a, axis=1, keepdims=True) + 1e-10)
advantage = r - v
loss_policy = - log_prob * tf.stop_gradient(advantage) # stop_gradientでadvantageは定数として扱います
loss_value = LOSS_V * tf.math.square(advantage) # minimize value error
entropy = LOSS_ENTROPY * tf.math.reduce_sum(p * tf.math.log(p + 1e-10), axis=1, keepdims=True) # maximize entropy (regularization)
return tf.math.reduce_mean(loss_policy + loss_value + entropy)
def update_parameter(self): # localbrainの勾配でParameterServerの重みを学習・更新します
if len(self.train_queue[0]) < MIN_BATCH: # データがたまっていない場合は更新しない
return False
s, a, r, s_, s_mask = self.train_queue
self.train_queue = [[], [], [], [], []] # 使ったQueryは初期化する
s = np.vstack(s) # vstackはvertical-stackで縦方向に行列を連結、いまはただのベクトル転置操作
a = np.vstack(a)
r = np.vstack(r)
s_ = np.vstack(s_)
s_mask = np.vstack(s_mask)
_, v = self.model.predict(s_)
# N-1ステップあとまでの時間割引総報酬rに、Nから先に得られるであろう総報酬vに割引N乗したものを足します
r = r + GAMMA_N * v * s_mask # set v to 0 where s_ is terminal state
# lossを取得
with tf.GradientTape() as tape:
p, v = self.model(s)
loss = self.loss_func(s, a, r, p, v)
# gradientの計算
gradients = tape.gradient(loss, self.model.trainable_variables)
# global_brainの重みを更新
self.global_brain.update_global_weight_params(gradients)
return True
def pull_global_parameter(self): # global_brainの重みを引き出す (global→local)
self.global_brain.pull_global_weight_params(self.model)
def push_local_weight_params(self): # global_brainに重みをコピーする(local→global)
self.global_brain.push_local_weight_params(self.model)
def train_push(self, s, a, r, s_): # 学習用のデータを貯めておくキュー
self.train_queue[0].append(s)
self.train_queue[1].append(a)
self.train_queue[2].append(r)
if s_ is None:
self.train_queue[3].append(NONE_STATE)
self.train_queue[4].append(0.)
else:
self.train_queue[3].append(s_)
self.train_queue[4].append(1.)
class Agent:
def __init__(self, name, global_brain, thread_type):
self.brain = LocalBrain(name, global_brain) # 行動を決定するための脳(ニューラルネットワーク)
self.thread_type = thread_type
self.memory = [] # s,a,r,s_の保存メモリ、used for n_step return
self.R = 0. # 時間割引した、「いまからNステップ分あとまで」の総報酬R
self.global_brain = global_brain
def act(self, s):
if self.global_brain.frames >= EPS_STEPS or not self.thread_type == 'learning': # ε-greedy法で行動を決定します 171115修正
eps = EPS_END
else:
eps = EPS_START + self.global_brain.frames * (EPS_END - EPS_START) / EPS_STEPS # linearly interpolate
if self.thread_type == 'learning':
if random.random() < eps:
action = random.randint(0, NUM_ACTIONS - 1) # ランダムに行動
else:
s = np.array([s])
p, _ = self.brain.model.predict(s)
# action = np.argmax(p) # これだと確率最大の行動を、毎回選択
action = np.random.choice(NUM_ACTIONS, p=p[0])
# probability = p のこのコードだと、確率p[0]にしたがって、行動を選択
# pにはいろいろな情報が入っていますが確率のベクトルは要素0番目
else:
s = np.array([s])
p, _ = self.brain.model.predict(s)
action = np.argmax(p) # これだと確率最大の行動を、毎回選択
# action = np.random.choice(NUM_ACTIONS, p=p[0])
return action
def advantage_push_local_brain(self, s, a, r, s_): # advantageを考慮したs,a,r,s_をbrainに与える
def get_sample(memory, n): # advantageを考慮し、メモリからnステップ後の状態とnステップ後までのRを取得する関数
s, a, _, _ = memory[0]
_, _, _, s_ = memory[n - 1]
return s, a, self.R, s_
# one-hotコーディングにしたa_catsをつくり、、s,a_cats,r,s_を自分のメモリに追加
a_cats = np.zeros(NUM_ACTIONS) # turn action into one-hot representation
a_cats[a] = 1
self.memory.append((s, a_cats, r, s_))
# 前ステップの「時間割引Nステップ分の総報酬R」を使用して、現ステップのRを計算
self.R = (self.R + r * GAMMA_N) / GAMMA # r0はあとで引き算している、この式はヤロミルさんのサイトを参照
# advantageを考慮しながら、LocalBrainに経験を入力する
if s_ is None:
while len(self.memory) > 0:
n = len(self.memory)
s, a, r, s_ = get_sample(self.memory, n)
self.brain.train_push(s, a, r, s_)
self.R = (self.R - self.memory[0][2]) / GAMMA
self.memory.pop(0)
self.R = 0 # 次の試行に向けて0にしておく
if len(self.memory) >= N_STEP_RETURN:
s, a, r, s_ = get_sample(self.memory, N_STEP_RETURN)
self.brain.train_push(s, a, r, s_)
self.R = self.R - self.memory[0][2] # # r0を引き算
self.memory.pop(0)
class Environment:
total_reward_vec = np.zeros(10) # 総報酬を10試行分格納して、平均総報酬をもとめる
count_trial_each_thread = 0 # 各環境の試行数
def __init__(self, name, thread_type, global_brain):
self.name = name
self.thread_type = thread_type
self.env = gym.make(ENV)
self.env._max_episode_steps = GOAL * 2
self.agent = Agent(name, global_brain, thread_type) # 環境内で行動するagentを生成
self.global_brain = global_brain
def run(self):
self.agent.brain.pull_global_parameter() # global_brainの重みを自身ThreadのLocalBrainにコピー
if (self.thread_type is 'test') and (self.count_trial_each_thread == 0):
self.env.reset()
# self.env = gym.wrappers.Monitor(self.env, directory="./{}/".format(start_datetime))
self.env = gnwrapper.Monitor(self.env, directory="./{}/".format(start_datetime))
# force=True, video_callable=(lambda ep: ep % 1 == 0)) # 動画保存する場合
# 全セッション内で共有
s = self.env.reset()
R = 0
step = 0
while True:
a = self.agent.act(s) # 行動を決定
s_, r, done, info = self.env.step(a) # 行動を実施
step += 1
self.global_brain.frames += 1 # セッショントータルの行動回数をひとつ増やします
r = 0
if done: # terminal state
s_ = None
if step < GOAL:
r = -1
else:
r = 1
# Advantageを考慮した報酬と経験を、localBrainにプッシュ
self.agent.advantage_push_local_brain(s, a, r, s_)
s = s_
R += r
if done or (step % Tmax == 0): # 終了時がTmaxごとに、parameterServerの重みを更新し、それをコピーする
if not(self.global_brain.isLearned) and self.thread_type == 'learning':
self.agent.brain.update_parameter() # Globalパラメータを更新
self.agent.brain.pull_global_parameter() # global_brainの重みを自身ThreadのLocalBrainにコピー
if done:
self.total_reward_vec = np.hstack((self.total_reward_vec[1:], step)) # トータル報酬の古いのを破棄して最新10個を保持
self.count_trial_each_thread += 1 # このスレッドの総試行回数を増やす
break
# 総試行数、スレッド名、今回の報酬を出力
# print("スレッド:"+self.name + "、試行数:"+str(self.count_trial_each_thread) + "、今回のステップ:" + str(step)+"、平均ステップ:"+str(self.total_reward_vec.mean()))
print('Step: {} traial: {} local step: {} avg step: {} r'.format(self.name, self.count_trial_each_thread, step, self.total_reward_vec.mean()))
if self.thread_type is 'test':
return self.env.display()
# スレッドで平均報酬が一定を越えたら終了
elif self.total_reward_vec.mean() >= GOAL:
self.global_brain.isLearned = True
time.sleep(3.0) # この間に他のlearningスレッドが止まります
self.agent.brain.push_local_weight_params() # この成功したスレッドのパラメータをparameter-serverに渡します
class Worker:
# スレッドは学習環境environmentを持ちます
def __init__(self, thread_name, thread_type, global_brain):
self.environment = Environment(thread_name, thread_type, global_brain)
self.thread_type = thread_type
self.global_brain = global_brain
def run(self):
if self.thread_type == 'learning':
while True:
if not self.global_brain.isLearned:
self.environment.run()
else:
return True
if self.thread_type == 'test':
self.environment.run()
学習の実行とテスト
from concurrent.futures import ThreadPoolExecutor as PoolExecutor
N_WORKERS = 4
# M1.スレッドを作成します
threads = []
with tf.device("/cpu:0"):
global_brain = GlobalBrain() # 全スレッドで共有するパラメータを持つエンティティです
# 学習するスレッドを用意
for i in range(N_WORKERS):
thread_name = 'local_{}'.format(i)
threads.append(Worker(thread_name=thread_name, thread_type="learning", global_brain=global_brain))
# threads.append(Worker(thread_name="end_signal", thread_type="test", global_brain=global_brain))
# COORD = tf.train.Coordinator() # TensorFlowでマルチスレッドにするための準備です
features = []
with PoolExecutor(max_workers=N_WORKERS) as executor:
for worker in threads:
job = lambda: worker.run()
features.append(executor.submit(job))
# テストここから
W = Worker(thread_name="test_thread", thread_type="test", global_brain=global_brain)
W.run()