#前置き
Pythonには科学計算をするため様々なライブラリやフレームワークが揃っており、強化学習をやるのにとても便利です。
しかし、強化学習をするためにはアルゴリズムの実装だけでなく、行動するエージェントや、変化する環境などのゲーム部分も実装していく必要があります。
残念ながら、Pythonはそういったゲーム部分を実装するのはとても面倒です。
一方で、Unityは初心者でもわりとクオリティの高いゲームを作れる環境が整っています。
なので、Unityでゲーム部分を実装、Pythonでアルゴリズムの実装、というのがいいかなと個人的に考えています。
#Unity ML-Agents
しかし、UnityとPythonで強化学習を行っていくためには、両者の間でうまく連携させる仕組みを作る必要があります。
今の所その方法が私にはわからないので、今回はUnity ML-Agentsというものを利用していきます。
UnityとPythonを連携する仕組みについて解説されている記事って全くないんですよね・・・。
最近、Pythonは急激に人気が高まってきたので、そのあたり需要があるかと思ったんですが、そうでもないのかもしれません。
#Unity側の実装
ではさっそく実装していきたいと思います。
今回作るのは倒立振子です。
実装や設定に関しては、
【Unity強化学習】自作ゲームで強化学習
こちらで解説したのと同じものになります。
ちなみにAcademyの設定だけすこし変更しました。
これはPythonでUnityアプリを開いたときの画面の大きさです。デフォルトの80×80だと小さくて見えづらいので、1280×720にしました。
あとはBrainTypeをExternalにしてPythonフォルダにビルドします。
名前はCartPole_DQNにしました。
#Python側の実装
Pythonフォルダにあるunityagentsの中身が、UnityとPythonを連携させるための、Python側のファイル群です。
ML-Agentsはデフォルトでは、PPOアルゴリズムを使用しており、unitytrainersフォルダは、そのモデルの実装と学習させるための諸々のファイル群です。今回はDQNアルゴリズムを実装するので使いません。
Pythonフォルダに次のようなpyファイルを書いて実行してみましょう。
import numpy as np
from unityagents import UnityEnvironment
env_path='CartPole_DQN'
brain_name='CartPoleBrain'
if __name__=='__main__':
#環境を構築
env=UnityEnvironment(file_name=env_path)
#環境リセット
env.reset(train_mode=False)
for step in range(1000):
#ランダム行動
action=np.random.randint(0,2)
#step
action_info=env.step(vector_action=action)
#状態
state=action_info[brain_name].vector_observations
#報酬
reward=action_info[brain_name].rewards
#終了条件
done=action_info[brain_name].local_done
#max_step数に達したか
max_reach=action_info[brain_name].max_reached
#表示
print('\n ===== {} step ======'.format(step))
print('\naction=', action)
print('\nstate=', state)
print('\nreward=', reward)
print('\ndone=', done)
print('\nmax_reach=', max_reach)
#環境終了
env.close()
ランダムな行動を1000step実行して、そのときの観測状態を表示します。
Unity、Python両方とも画像のように動けば問題ありません。
これで、Unity,Python間を連携させる仕組みができたので、あとはアルゴリズムを実装して強化学習を行っていきます。
DQNアルゴリズムに関しては、
【強化学習初心者向け】シンプルな実装例で学ぶQ学習、DQN、DDQN【CartPoleで棒立て:1ファイルで完結、Kearas使用】
こちらの記事を参考にさせていただきました。
# coding:utf-8
# 必要なライブラリのインポート
import numpy as np
import tensorflow as tf
import os
from collections import deque
from unityagents import UnityEnvironment
# Q関数をディープラーニングのネットワークをクラスとして定義
class QNetwork:
def __init__(self, learning_rate=0.01, state_size=3, action_size=2, hidden_size=10):
self.x = tf.placeholder(tf.float32, [None, state_size])
self.W_d1 = tf.Variable(tf.truncated_normal([state_size, hidden_size], stddev=0.01))
self.b_d1 = tf.Variable(tf.zeros([hidden_size]))
self.h_d1 = tf.nn.relu(tf.matmul(self.x, self.W_d1) + self.b_d1)
self.W_d2 = tf.Variable(tf.truncated_normal([hidden_size, hidden_size], stddev=0.01))
self.b_d2 = tf.Variable(tf.zeros([hidden_size]))
self.h_d2 = tf.nn.relu(tf.matmul(self.h_d1, self.W_d2) + self.b_d2)
self.W_out = tf.Variable(tf.truncated_normal([hidden_size, action_size], stddev=0.01))
self.b_out = tf.Variable(tf.zeros([action_size]))
self.y = tf.matmul(self.h_d2, self.W_out) + self.b_out
# loss function
self.y_ = tf.placeholder(tf.float32, [None, action_size])
self.loss = tf.reduce_mean(tf.losses.huber_loss(self.y_, self.y))
# train operation
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
self.training = optimizer.minimize(self.loss)
# session
self.sess = tf.Session()
self.sess.run(tf.global_variables_initializer())
#saver
self.saver=tf.train.Saver()
self.cwd=os.getcwd()
# 重みの学習
def replay(self, memory, batch_size, gamma):
inputs = np.zeros((batch_size, 3))
targets = np.zeros((batch_size, 2))
mini_batch = memory.sample(batch_size)
#next_stateの状態が0のとき…rが教師信号
#next_stateの状態が0以外のとき…r+γmaxQ(s,a)が教師信号
for i, (state_b, action_b, reward_b, next_state_b) in enumerate(mini_batch):
inputs[i:i + 1] = state_b
target=reward_b
if not (next_state_b==np.zeros(state_b.shape)).all(axis=1):
retmainQs = self.sess.run(self.y, feed_dict={self.x: next_state_b})[0]
next_action = np.argmax(retmainQs) # 最大の報酬を返す行動を選択する
target = reward_b + gamma * retmainQs[next_action]
targets[i]=self.sess.run(self.y,feed_dict={self.x:state_b}) #Qネットワークの出力
targets[i][action_b]=target #教師信号
self.sess.run(self.training,feed_dict={self.x:inputs,self.y_:targets})
# Experience ReplayとFixed Target Q-Networkを実現するメモリクラス
class Memory:
def __init__(self, max_size=1000):
self.buffer = deque(maxlen=max_size)
def add(self, experience):
self.buffer.append(experience)
def sample(self, batch_size):
idx = np.random.choice(np.arange(len(self.buffer)), size=batch_size, replace=False)
return [self.buffer[ii] for ii in idx]
def len(self):
return len(self.buffer)
# カートの状態に応じて、行動を決定するクラス
class Actor:
def get_action(self, state, targetQN): # [C]t+1での行動を返す
# 徐々に最適行動のみをとる、ε-greedy法
epsilon = 0.01
if epsilon <= np.random.uniform(0, 1):
retTargetQs = targetQN.sess.run(targetQN.y,feed_dict={targetQN.x:state})[0]
action = np.argmax(retTargetQs) # 最大の報酬を返す行動を選択する
else:
action = np.random.choice([0, 1]) # ランダムに行動する
return action
if __name__=='__main__':
# メイン関数開始----------------------------------------------------
# 初期設定--------------------------------------------------------
FILE_PATH='./models/CartPole_DQN'
ENV_PATH = 'CartPole_DQN'
BRAIN_NAME = 'CartPoleBrain'
env = UnityEnvironment(file_name=ENV_PATH)
num_episodes = 3000 # 総試行回数
max_number_of_steps = 1000 # 1試行の最大step数
gamma = 0.99 # 割引係数
# ---
hidden_size = 16 # Q-networkの隠れ層のニューロンの数
learning_rate = 0.00001 # Q-networkの学習係数
memory_size = 10000 # バッファーメモリの大きさ
batch_size = 32 # Q-networkを更新するバッチの大きさ
# Qネットワークとメモリ、Actorの生成--------------------------------------------------------
mainQN = QNetwork(hidden_size=hidden_size, learning_rate=learning_rate) # メインのQネットワーク
memory = Memory(max_size=memory_size)
actor = Actor()
# メインルーチン--------------------------------------------------------
for episode in range(num_episodes): # 試行数分繰り返す
env.reset(train_mode=True) # cartPoleの環境初期化
action_info=env.step(vector_action=[-1])
state=action_info[BRAIN_NAME].vector_observations #状態
reward_sum=0 #報酬合計
for t in range(max_number_of_steps + 1): # 1試行のループ
#5step毎に行動を決定
if t%5==0:
action = actor.get_action(state, mainQN) # 時刻tでの行動を決定する
#それ以外は何も行動しない
else:
action = -1
action_info = env.step(vector_action=[action]) # 行動a_tの実行による、s_{t+1}, _R{t}を計算する
next_state = action_info[BRAIN_NAME].vector_observations # 状態
reward = action_info[BRAIN_NAME].rewards[0] # 報酬
done = action_info[BRAIN_NAME].local_done[0] # 終了条件
max_reach = action_info[BRAIN_NAME].max_reached[0] # max_stepに達したか
reward_sum+=reward #報酬の合計
#終了条件かmax_stepに達したらnext_stateの状態は0とする
if done or max_reach:
next_state=np.zeros(next_state.shape)
if t%5==0:
memory.add((state, action, reward, next_state)) # メモリの更新する
# Qネットワークの重みを学習・更新する replay
if (memory.len() > batch_size):
mainQN.replay(memory, batch_size, gamma)
state = next_state # 状態更新
# 1施行終了時の処理
if done or max_reach:
print('%d Episode finished after %f time steps, %f reward_sum'
% (episode, t + 1,reward_sum))
break
#重みの保存
if (episode+1)%100==0:
if not os.path.exists(FILE_PATH):
os.mkdir(FILE_PATH)
mainQN.saver.save(mainQN.sess, mainQN.cwd + FILE_PATH + '/model_{}.ckpt'.format(episode+1))
print('\nsaved model')
env.close()
#結果
500episode
初期値によって学習の収束までにかかる時間は変わりますが、1000episodeまでにはおおよそ収束するのではないかと思います。
このときは500episodeで倒れることはなくなりました。
#参考