103
136

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

【強化学習】UnityとPythonを使ってDQNアルゴリズム実装してみた

Last updated at Posted at 2018-05-22

#前置き
Pythonには科学計算をするため様々なライブラリやフレームワークが揃っており、強化学習をやるのにとても便利です。
しかし、強化学習をするためにはアルゴリズムの実装だけでなく、行動するエージェントや、変化する環境などのゲーム部分も実装していく必要があります。
残念ながら、Pythonはそういったゲーム部分を実装するのはとても面倒です。

一方で、Unityは初心者でもわりとクオリティの高いゲームを作れる環境が整っています。
なので、Unityでゲーム部分を実装、Pythonでアルゴリズムの実装、というのがいいかなと個人的に考えています。

#Unity ML-Agents

しかし、UnityとPythonで強化学習を行っていくためには、両者の間でうまく連携させる仕組みを作る必要があります。
今の所その方法が私にはわからないので、今回はUnity ML-Agentsというものを利用していきます。

UnityとPythonを連携する仕組みについて解説されている記事って全くないんですよね・・・。
最近、Pythonは急激に人気が高まってきたので、そのあたり需要があるかと思ったんですが、そうでもないのかもしれません。

#Unity側の実装

ではさっそく実装していきたいと思います。

SnapCrab_NoName_2018-5-22_11-41-23_No-00.png

今回作るのは倒立振子です。

実装や設定に関しては、
【Unity強化学習】自作ゲームで強化学習
こちらで解説したのと同じものになります。

ちなみにAcademyの設定だけすこし変更しました。

SnapCrab_NoName_2018-5-22_11-44-16_No-00.png

これはPythonでUnityアプリを開いたときの画面の大きさです。デフォルトの80×80だと小さくて見えづらいので、1280×720にしました。

あとはBrainTypeをExternalにしてPythonフォルダにビルドします。
名前はCartPole_DQNにしました。

SnapCrab_NoName_2018-5-22_11-47-30_No-00.png SnapCrab_NoName_2018-5-22_11-50-44_No-00.png

#Python側の実装

Pythonフォルダにあるunityagentsの中身が、UnityとPythonを連携させるための、Python側のファイル群です。

ML-Agentsはデフォルトでは、PPOアルゴリズムを使用しており、unitytrainersフォルダは、そのモデルの実装と学習させるための諸々のファイル群です。今回はDQNアルゴリズムを実装するので使いません。

Pythonフォルダに次のようなpyファイルを書いて実行してみましょう。

import numpy as np

from unityagents import UnityEnvironment

env_path='CartPole_DQN'
brain_name='CartPoleBrain'

if __name__=='__main__':

    #環境を構築
    env=UnityEnvironment(file_name=env_path)
    #環境リセット
    env.reset(train_mode=False)
    for step in range(1000):
        #ランダム行動
        action=np.random.randint(0,2)
        #step
        action_info=env.step(vector_action=action)
        #状態
        state=action_info[brain_name].vector_observations
        #報酬
        reward=action_info[brain_name].rewards
        #終了条件
        done=action_info[brain_name].local_done
        #max_step数に達したか
        max_reach=action_info[brain_name].max_reached
        #表示
        print('\n ===== {} step ======'.format(step))
        print('\naction=', action)
        print('\nstate=', state)
        print('\nreward=', reward)
        print('\ndone=', done)
        print('\nmax_reach=', max_reach)


    #環境終了
    env.close()

ランダムな行動を1000step実行して、そのときの観測状態を表示します。

CartPole_test.gif

SnapCrab_NoName_2018-5-23_15-39-57_No-00.png

Unity、Python両方とも画像のように動けば問題ありません。

これで、Unity,Python間を連携させる仕組みができたので、あとはアルゴリズムを実装して強化学習を行っていきます。

DQNアルゴリズムに関しては、
【強化学習初心者向け】シンプルな実装例で学ぶQ学習、DQN、DDQN【CartPoleで棒立て:1ファイルで完結、Kearas使用】
こちらの記事を参考にさせていただきました。

# coding:utf-8
# 必要なライブラリのインポート
import numpy as np
import tensorflow as tf
import os

from collections import deque
from unityagents import UnityEnvironment

# Q関数をディープラーニングのネットワークをクラスとして定義
class QNetwork:
    def __init__(self, learning_rate=0.01, state_size=3, action_size=2, hidden_size=10):
        self.x = tf.placeholder(tf.float32, [None, state_size])

        self.W_d1 = tf.Variable(tf.truncated_normal([state_size, hidden_size], stddev=0.01))
        self.b_d1 = tf.Variable(tf.zeros([hidden_size]))
        self.h_d1 = tf.nn.relu(tf.matmul(self.x, self.W_d1) + self.b_d1)

        self.W_d2 = tf.Variable(tf.truncated_normal([hidden_size, hidden_size], stddev=0.01))
        self.b_d2 = tf.Variable(tf.zeros([hidden_size]))
        self.h_d2 = tf.nn.relu(tf.matmul(self.h_d1, self.W_d2) + self.b_d2)

        self.W_out = tf.Variable(tf.truncated_normal([hidden_size, action_size], stddev=0.01))
        self.b_out = tf.Variable(tf.zeros([action_size]))

        self.y = tf.matmul(self.h_d2, self.W_out) + self.b_out

        # loss function
        self.y_ = tf.placeholder(tf.float32, [None, action_size])
        self.loss = tf.reduce_mean(tf.losses.huber_loss(self.y_, self.y))

        # train operation
        optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
        self.training = optimizer.minimize(self.loss)

        # session
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())

        #saver
        self.saver=tf.train.Saver()
        self.cwd=os.getcwd()

    # 重みの学習
    def replay(self, memory, batch_size, gamma):
        inputs = np.zeros((batch_size, 3))
        targets = np.zeros((batch_size, 2))
        mini_batch = memory.sample(batch_size)

        #next_stateの状態が0のとき…rが教師信号
        #next_stateの状態が0以外のとき…r+γmaxQ(s,a)が教師信号
        for i, (state_b, action_b, reward_b, next_state_b) in enumerate(mini_batch):
            inputs[i:i + 1] = state_b
            target=reward_b

            if not (next_state_b==np.zeros(state_b.shape)).all(axis=1):
                retmainQs = self.sess.run(self.y, feed_dict={self.x: next_state_b})[0]
                next_action = np.argmax(retmainQs)  # 最大の報酬を返す行動を選択する
                target = reward_b + gamma * retmainQs[next_action]

            targets[i]=self.sess.run(self.y,feed_dict={self.x:state_b}) #Qネットワークの出力
            targets[i][action_b]=target #教師信号
            self.sess.run(self.training,feed_dict={self.x:inputs,self.y_:targets})

# Experience ReplayとFixed Target Q-Networkを実現するメモリクラス
class Memory:
    def __init__(self, max_size=1000):
        self.buffer = deque(maxlen=max_size)

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        idx = np.random.choice(np.arange(len(self.buffer)), size=batch_size, replace=False)
        return [self.buffer[ii] for ii in idx]

    def len(self):
        return len(self.buffer)


# カートの状態に応じて、行動を決定するクラス
class Actor:
    def get_action(self, state, targetQN):   # [C]t+1での行動を返す
        # 徐々に最適行動のみをとる、ε-greedy法
        epsilon = 0.01

        if epsilon <= np.random.uniform(0, 1):
            retTargetQs = targetQN.sess.run(targetQN.y,feed_dict={targetQN.x:state})[0]
            action = np.argmax(retTargetQs)  # 最大の報酬を返す行動を選択する

        else:
            action = np.random.choice([0, 1])  # ランダムに行動する

        return action


if __name__=='__main__':
    # メイン関数開始----------------------------------------------------
    #  初期設定--------------------------------------------------------
    FILE_PATH='./models/CartPole_DQN'
    ENV_PATH = 'CartPole_DQN'
    BRAIN_NAME = 'CartPoleBrain'

    env = UnityEnvironment(file_name=ENV_PATH)
    num_episodes = 3000  # 総試行回数
    max_number_of_steps = 1000  # 1試行の最大step数
    gamma = 0.99  # 割引係数
    # ---
    hidden_size = 16  # Q-networkの隠れ層のニューロンの数
    learning_rate = 0.00001  # Q-networkの学習係数
    memory_size = 10000  # バッファーメモリの大きさ
    batch_size = 32  # Q-networkを更新するバッチの大きさ

    # Qネットワークとメモリ、Actorの生成--------------------------------------------------------
    mainQN = QNetwork(hidden_size=hidden_size, learning_rate=learning_rate)  # メインのQネットワーク
    memory = Memory(max_size=memory_size)
    actor = Actor()

    # メインルーチン--------------------------------------------------------
    for episode in range(num_episodes):  # 試行数分繰り返す
        env.reset(train_mode=True)  # cartPoleの環境初期化
        action_info=env.step(vector_action=[-1])
        state=action_info[BRAIN_NAME].vector_observations #状態

        reward_sum=0 #報酬合計

        for t in range(max_number_of_steps + 1):  # 1試行のループ
            #5step毎に行動を決定
            if t%5==0:
                action = actor.get_action(state, mainQN)  # 時刻tでの行動を決定する
            #それ以外は何も行動しない
            else:
                action = -1

            action_info = env.step(vector_action=[action])  # 行動a_tの実行による、s_{t+1}, _R{t}を計算する

            next_state = action_info[BRAIN_NAME].vector_observations  # 状態
            reward = action_info[BRAIN_NAME].rewards[0]  # 報酬
            done = action_info[BRAIN_NAME].local_done[0]  # 終了条件
            max_reach = action_info[BRAIN_NAME].max_reached[0]  # max_stepに達したか

            reward_sum+=reward #報酬の合計

            #終了条件かmax_stepに達したらnext_stateの状態は0とする
            if done or max_reach:
                next_state=np.zeros(next_state.shape)

            if t%5==0:
                memory.add((state, action, reward, next_state))  # メモリの更新する
                # Qネットワークの重みを学習・更新する replay
                if (memory.len() > batch_size):
                    mainQN.replay(memory, batch_size, gamma)

            state = next_state  # 状態更新

            # 1施行終了時の処理
            if done or max_reach:
                print('%d Episode finished after %f time steps, %f reward_sum'
                      % (episode, t + 1,reward_sum))
                break

        #重みの保存
        if (episode+1)%100==0:
            if not os.path.exists(FILE_PATH):
                os.mkdir(FILE_PATH)
            mainQN.saver.save(mainQN.sess, mainQN.cwd + FILE_PATH + '/model_{}.ckpt'.format(episode+1))
            print('\nsaved model')

    env.close()

#結果

500episode

CartPole.gif

初期値によって学習の収束までにかかる時間は変わりますが、1000episodeまでにはおおよそ収束するのではないかと思います。

このときは500episodeで倒れることはなくなりました。

#参考

  1. 【強化学習初心者向け】シンプルな実装例で学ぶQ学習、DQN、DDQN【CartPoleで棒立て:1ファイルで完結、Kearas使用】
  2. 【Unity強化学習】自作ゲームで強化学習
  3. Tensorflow Deep Q-Network
103
136
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
103
136

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?