ChainerRLで三目並べを深層強化学習(Double DQN)してみた

  • 21
    Like
  • 1
    Comment

ChainerRLのベータ版がリリースされたとのことで、早速使わせていただきました。ここでは、Quick Start Guideのソースを参考に三目並べ(○×ゲーム)用に変更しています。

パッケージのインストール

はじめにChainerRLをインストールします。

pip install chainerrl

cmakeが必要になりますので、もし未導入のときは事前にインストールしておいてください。

brew install cmake

なお私の環境は以下の通りです。

  • macOS Sierra 10.12.3 (MBP Late2016)
  • Anaconda3-4.1.0
  • Chainer 1.21.0

ゲームボードの準備

プレーヤーの種類(DQN、ランダム打ち、人間など)に関わらず、○×ゲームをするにはゲームボードが必要になりますので、はじめに作成しておきます。
なお今回はファイルを分割せずに1つのファイルに全ソースを書いていきますので、冒頭に必要ライブラリもimportしておきます。

dqn.py
import chainer
import chainer.functions as F
import chainer.links as L
import chainerrl
import numpy as np

#ゲームボード
class Board():
    def reset(self):
        self.board = np.array([0] * 9, dtype=np.float32)
        self.winner = None
        self.missed = False
        self.done = False

    def move(self, act, turn):
        if self.board[act] == 0:
            self.board[act] = turn
            self.check_winner()
        else:
            self.winner = turn*-1
            self.missed = True
            self.done = True

    def check_winner(self):
        win_conditions = ((0,1,2),(3,4,5),(6,7,8),(0,3,6),(1,4,7),(2,5,8),(0,4,8),(2,4,6))
        for cond in win_conditions:
            if self.board[cond[0]] == self.board[cond[1]] == self.board[cond[2]]:
                if self.board[cond[0]]!=0:
                    self.winner=self.board[cond[0]]
                    self.done = True
                    return
        if np.count_nonzero(self.board) == 9:
            self.winner = 0
            self.done = True

    def get_empty_pos(self):
        empties = np.where(self.board==0)[0]
        if len(empties) > 0:
            return np.random.choice(empties)
        else:
            return 0

    def show(self):
        row = " {} | {} | {} "
        hr = "\n-----------\n"
        tempboard = []
        for i in self.board:
            if i == 1:
                tempboard.append("○")
            elif i == -1:
                tempboard.append("×")
            else:
                tempboard.append(" ")
        print((row + hr + row + hr + row).format(*tempboard))

機能は以下の5つ。Python初心者につきおぼつかないソースで恐縮ですが、何をやっているかは見ていただければわかると思います。

  • reset ゲームボードの初期化。各エピソードの開始前に実行
  • move 手の配置の実行。配置後に勝敗判定やミス(置けないマスへの配置)、ゲーム終了を判定
  • check_winner 勝利判定
  • get_empty_pos 配置可能なマスのインデックスのうち一つをランダムで取得。後述しますがランダム打ちさせる時に使います
  • show ボードの状態を表示します。人間との対戦用です

Explorer時に使用するランダム打ちの準備

局所解に陥らないよう、たまに冒険させるのがよいそうですし、Quickstartもそのような実装になっていましたので、ここでもそれにならいます。Quickstartではgymのそれを利用していましたが、ここでは自作するほかありませんので、以下のコードを末尾に追加します。

dqn.py
#explorer用のランダム関数オブジェクト
class RandomActor:
    def __init__(self, board):
        self.board = board
        self.random_count = 0
    def random_action_func(self):
        self.random_count += 1
        return self.board.get_empty_pos()

random_action_funcがこのオブジェクトの大切なところです。先ほど作成したBoardのget_empty_posを呼び出して、配置可能なマスを取得し呼び出し元に返却します。また、あとで統計情報としてどの程度この関数が使用されたか(DQNが考えた手でなくランダムで返したか)を把握できるように、カウンターをインクリメントします。この程度のものをなぜわざわざ別オブジェクトとしたか?についてはこのあとで説明します。

Q-functionの準備

DQNさせる上での本丸であり、ChainerRLの出番です。

dqn.py
#Q関数
class QFunction(chainer.Chain):
    def __init__(self, obs_size, n_actions, n_hidden_channels=81):
        super().__init__(
            l0=L.Linear(obs_size, n_hidden_channels),
            l1=L.Linear(n_hidden_channels, n_hidden_channels),
            l2=L.Linear(n_hidden_channels, n_hidden_channels),
            l3=L.Linear(n_hidden_channels, n_actions))
    def __call__(self, x, test=False):
        #-1を扱うのでleaky_reluとした
        h = F.leaky_relu(self.l0(x))
        h = F.leaky_relu(self.l1(h))
        h = F.leaky_relu(self.l2(h))
        return chainerrl.action_value.DiscreteActionValue(self.l3(h))

・・・以上です。ちょっと拍子抜けするほどシンプルですね。普通にNNを定義するのとほとんど変わりありません。

環境とAgentの準備

なんと、周辺に作り込むべきものはもうできあがったので、あとは環境とAgentを準備してゲームの進行を作り込むのみです。まずは環境とAgentから。

dqn.py
# ボードの準備
b = Board()
# explorer用のランダム関数オブジェクトの準備
ra = RandomActor(b)
# 環境と行動の次元数
obs_size = 9
n_actions = 9
# Q-functionとオプティマイザーのセットアップ
q_func = QFunction(obs_size, n_actions)
optimizer = chainer.optimizers.Adam(eps=1e-2)
optimizer.setup(q_func)
# 報酬の割引率
gamma = 0.95
# Epsilon-greedyを使ってたまに冒険。50000ステップでend_epsilonとなる
explorer = chainerrl.explorers.LinearDecayEpsilonGreedy(
    start_epsilon=1.0, end_epsilon=0.3, decay_steps=50000, random_action_func=ra.random_action_func)
# Experience ReplayというDQNで用いる学習手法で使うバッファ
replay_buffer = chainerrl.replay_buffer.ReplayBuffer(capacity=10 ** 6)
# Agentの生成(replay_buffer等を共有する2つ)
agent_p1 = chainerrl.agents.DoubleDQN(
    q_func, optimizer, replay_buffer, gamma, explorer,
    replay_start_size=500, update_frequency=1,
    target_update_frequency=100)
agent_p2 = chainerrl.agents.DoubleDQN(
    q_func, optimizer, replay_buffer, gamma, explorer,
    replay_start_size=500, update_frequency=1,
    target_update_frequency=100)

さて、Epsilon-greedyのところで、先ほど作ったRandomActorが登場します。冒険する際に利用する関数への参照をexplorerに予め渡しておく必要がありますが、その関数へ引数を渡せないっぽい?ので、事前にインスタンス化したRandomActorオブジェクトのメンバ変数にゲームボードへの参照を渡しておき、explorerの内部処理では引数なしにrandom_action_funcを呼んでもらって大丈夫にしてみた次第です。もっとスマートなやり方があると思いますので、むしろ教えてください。。

また、ε-greedyのやり方を、コンスタントな値とするのではなく徐々に減らしていく方式(LinearDecayEpsilonGreedy)に変更しています。最初1.0=常にランダムからはじめて、50000ステップかけて最終的に0.3まで減らします。この数字も妥当かどうかわからないのでいろいろ変えながらやるといいかもしれません。

Agentはoptimizerやreplay_bufferを共有するP1とP2を生成し互いに戦わせます。

ゲーム進行部分の作成

もう実行したくてだいぶうずうずしていることとお察ししますが、これを追記すれば実行できますので、もうちょっとガマンしてください。

dqn.py
#学習ゲーム回数
n_episodes = 20000
#カウンタの宣言
miss = 0
win = 0
draw = 0
#エピソードの繰り返し実行
for i in range(1, n_episodes + 1):
    b.reset()
    reward = 0
    agents = [agent_p1, agent_p2]
    turn = np.random.choice([0, 1])
    last_state = None
    while not b.done:
        #配置マス取得
        action = agents[turn].act_and_train(b.board.copy(), reward)
        #配置を実行
        b.move(action, 1)
        #配置の結果、終了時には報酬とカウンタに値をセットして学習
        if b.done == True:
            if b.winner == 1:
                reward = 1
                win += 1
            elif b.winner == 0:
                draw += 1
            else:
                reward = -1
            if b.missed is True:
                miss += 1
            #エピソードを終了して学習
            agents[turn].stop_episode_and_train(b.board.copy(), reward, True)
            #相手もエピソードを終了して学習。相手のミスは勝利として学習しないように
            if agents[1 if turn == 0 else 0].last_state is not None and b.missed is False:
                #前のターンでとっておいたlast_stateをaction実行後の状態として渡す
                agents[1 if turn == 0 else 0].stop_episode_and_train(last_state, reward*-1, True)
        else:
            #学習用にターン最後の状態を退避
            last_state = b.board.copy()
            #継続のときは盤面の値を反転
            b.board = b.board * -1
            #ターンを切り替え
            turn = 1 if turn == 0 else 0

    #コンソールに進捗表示
    if i % 100 == 0:
        print("episode:", i, " / rnd:", ra.random_count, " / miss:", miss, " / win:", win, " / draw:", draw, " / statistics:", agent_p1.get_statistics(), " / epsilon:", agent_p1.explorer.epsilon)
        #カウンタの初期化
        miss = 0
        win = 0
        draw = 0
        ra.random_count = 0
    if i % 10000 == 0:
        # 10000エピソードごとにモデルを保存
        agent_p1.save("result_" + str(i))

print("Training finished.")

20000ゲームを繰り返すfor文とゲーム内のターンを繰り返すwhile文の入れ子で構成されています。ポイントとしては、先攻も後攻もAgent自身です。このゲームでは内部的に○×の変わりに自身の手を1、相手の手を-1としてボードに配置していきますが、先攻後攻どちらの環境・アクションも学習したいので、ゲーム進行内で符号を出し分けるのではなく、常にボードには1を配置するようにします。

        #配置を実行
        b.move(action, 1)

当然そのままだとボードが1だらけになってしまいますので、ターン交代時にボードの符号を反転させています。

        #継続のときは盤面の値を反転
        else:
            b.board = b.board * -1

そして最後に、学習済みモデルを保存しています。このディレクトリは存在しない場合もChainerRLが作ってくれるようなので、10000エピソード毎にエピソード数を末尾に付したディレクトリで履歴を保存していくようにしました。同じexperienceで学習させてるので、保存するのはagent_p1のみとしています。

学習の実行

それでは、いざ実行してみましょう・・・!
はじめのうちはepsilonの値が大きいので、ほとんどがランダム打ち(rndの値がランダムで打った回数)となります。そのためミスも少ないのですが、徐々にランダム打ちが減るとDQNさんが自ら考えた手で打つ機会が増えるため一時的にミスも増えてきますが、学習が進むとそれも収束して15000回を超えるとほぼ1桁前半になりました。

episode: 100  / rnd: 761  / miss: 1  / win: 85  / draw: 14  / statistics: [('average_q', 0.11951273068342624), ('average_loss', 0.09235552993858538)]  / epsilon: 0.994778
episode: 200  / rnd: 722  / miss: 3  / win: 85  / draw: 12  / statistics: [('average_q', 0.35500590929140996), ('average_loss', 0.12790488153218765)]  / epsilon: 0.9895
episode: 300  / rnd: 756  / miss: 6  / win: 82  / draw: 12  / statistics: [('average_q', 0.6269444783473722), ('average_loss', 0.12164947750267516)]  / epsilon: 0.984278
 :(中略)
episode: 19800  / rnd: 212  / miss: 1  / win: 69  / draw: 30  / statistics: [('average_q', 0.49387913595157096), ('average_loss', 0.07891365175610675)]  / epsilon: 0.3
episode: 19900  / rnd: 229  / miss: 1  / win: 61  / draw: 38  / statistics: [('average_q', 0.49195677296191365), ('average_loss', 0.07796313042393459)]  / epsilon: 0.3
episode: 20000  / rnd: 216  / miss: 0  / win: 70  / draw: 30  / statistics: [('average_q', 0.509864846571749), ('average_loss', 0.07866546801090374)]  / epsilon: 0.3
Training finished.

いざ自分と対戦!

ミスせず打てているようですし、たまにランダム打ちしていながらも結構Drawになりますので、強さを確かめるべく自分と対戦してみます。

HumanPlayerの作成

まずは人間が打てるようにするためのインターフェイスとして、HumanPlayerなるオブジェクトを作成します。

dqn.py
#人間のプレーヤー
class HumanPlayer:
    def act(self, board):
        valid = False
        while not valid:
            try:
                act = input("Please enter 1-9: ")
                act = int(act)
                if act >= 1 and act <= 9 and board[act-1] == 0:
                    valid = True
                    return act-1
                else:
                    print ("Invalid move")
            except Exception as e:
                    print (act +  " is invalid")

対人ゲーム進行部分の作成

進行部分です。DQNのエージェントは1、人間は-1になるように固定しつつ、先攻・後攻はエピソード開始前に「DQNエージェントが先攻かどうか」ということを決定して初回をスキップする/しないを制御しています。その関係で、先攻・後攻に関わらずエージェントは常に○で人間は常に×です。

dqn.py
#検証
human_player = HumanPlayer()
for i in range(10):
    b.reset()
    dqn_first = np.random.choice([True, False])
    while not b.done:
        #DQN
        if dqn_first or np.count_nonzero(b.board) > 0:
            b.show()
            action = agent_p1.act(b.board.copy())
            b.move(action, 1)
            if b.done == True:
                if b.winner == 1:
                    print("DQN Win")
                elif b.winner == 0:
                    print("Draw")
                else:
                    print("DQN Missed")
                agent_p1.stop_episode()
                continue
        #人間
        b.show()
        action = human_player.act(b.board.copy())
        b.move(action, -1)
        if b.done == True:
            if b.winner == -1:
                print("HUMAN Win")
            elif b.winner == 0:
                print("Draw")
            agent_p1.stop_episode()

print("Test finished.")

ポイントは、agentはここでは学習しないので、act()およびstop_episode()を使用するようになっている点でしょうか。これもQuickstart通りです。

さて、これで対戦の準備は整いましたが、再度2万回学習させるのも不毛なので保存したAgentを読み込むようにします。本当はdqn.pyの実行パラメータで学習する/既存モデルを読み込むを切り替えるのがスマートですが、早くプレイしたいので以下の通り学習エピソード数を0にすることで学習処理をスキップします。

dqn.py
#学習ゲーム回数
n_episodes = 0

そして、学習処理の完了後に以下のコードを追加してモデルを読み込みます。

dqn.py
print("Training finished.")

agent_p1.load("result_20000")  #←これを追加

準備ができたら、いざ対戦です!

Training finished.
   |   |   
-----------
   |   |   
-----------
   |   |   
   |   |   
-----------
   |   |   
-----------
 ○ |   |   
Please enter 1-9: 1
 × |   |   
-----------
   |   |   
-----------
 ○ |   |   
 × |   |   
-----------
   |   |   
-----------
 ○ |   | ○ 
Please enter 1-9: 8

対戦できますね!やったー!!

おわりに

DQNもPythonもかじりたての私にお付き合いいただきありがとうございます。
「ルールを教えなくてもほぼ間違いなく・定石通り打てる」というところまで成長してくれたのはなかなか嬉しいものです。
しかも、Chainerをそのまま利用してDQNを実装するより、はるかにスッキリとしました。ChainerRLすごい!!見通しがよくなったことで、いろいろ改良しようとしてバグを混在させる・・・といったことが防げそうです。

間違っていたり「本来こうすべき」「こうした方が学習が進む」「これだと学習できていないよ」みたいなものが山ほどあると思いますので、いろいろご指摘をいただけると幸いです。何卒よろしくお願いします。

特に課題に思っているのは、Agentの打ち方がほぼ毎回同じになっているように見えることです。もっと冒険させなければいけないのかな?35万エピソード学習させると、定石通り打つので強く、εを0にするとほぼ毎回Drawとなるので、良いことではあるのですが。。なお15万〜20万エピソードからは結果もlossも一定になりました。

ソース全体

一応、ソースの全体を掲載しておきます。環境が揃っていればコピペしてすぐに動かせると思います。

dqn.py
import chainer
import chainer.functions as F
import chainer.links as L
import chainerrl
import numpy as np

#ゲームボード
class Board():
    def reset(self):
        self.board = np.array([0] * 9, dtype=np.float32)
        self.winner = None
        self.missed = False
        self.done = False

    def move(self, act, turn):
        if self.board[act] == 0:
            self.board[act] = turn
            self.check_winner()
        else:
            self.winner = turn*-1
            self.missed = True
            self.done = True

    def check_winner(self):
        win_conditions = ((0,1,2),(3,4,5),(6,7,8),(0,3,6),(1,4,7),(2,5,8),(0,4,8),(2,4,6))
        for cond in win_conditions:
            if self.board[cond[0]] == self.board[cond[1]] == self.board[cond[2]]:
                if self.board[cond[0]]!=0:
                    self.winner=self.board[cond[0]]
                    self.done = True
                    return
        if np.count_nonzero(self.board) == 9:
            self.winner = 0
            self.done = True

    def get_empty_pos(self):
        empties = np.where(self.board==0)[0]
        if len(empties) > 0:
            return np.random.choice(empties)
        else:
            return 0

    def show(self):
        row = " {} | {} | {} "
        hr = "\n-----------\n"
        tempboard = []
        for i in self.board:
            if i == 1:
                tempboard.append("○")
            elif i == -1:
                tempboard.append("×")
            else:
                tempboard.append(" ")
        print((row + hr + row + hr + row).format(*tempboard))

#explorer用のランダム関数オブジェクト
class RandomActor:
    def __init__(self, board):
        self.board = board
        self.random_count = 0
    def random_action_func(self):
        self.random_count += 1
        return self.board.get_empty_pos()

#Q関数
class QFunction(chainer.Chain):
    def __init__(self, obs_size, n_actions, n_hidden_channels=81):
        super().__init__(
            l0=L.Linear(obs_size, n_hidden_channels),
            l1=L.Linear(n_hidden_channels, n_hidden_channels),
            l2=L.Linear(n_hidden_channels, n_hidden_channels),
            l3=L.Linear(n_hidden_channels, n_actions))
    def __call__(self, x, test=False):
        #-1を扱うのでleaky_reluとした
        h = F.leaky_relu(self.l0(x))
        h = F.leaky_relu(self.l1(h))
        h = F.leaky_relu(self.l2(h))
        return chainerrl.action_value.DiscreteActionValue(self.l3(h))

# ボードの準備
b = Board()
# explorer用のランダム関数オブジェクトの準備
ra = RandomActor(b)
# 環境と行動の次元数
obs_size = 9
n_actions = 9
# Q-functionとオプティマイザーのセットアップ
q_func = QFunction(obs_size, n_actions)
optimizer = chainer.optimizers.Adam(eps=1e-2)
optimizer.setup(q_func)
# 報酬の割引率
gamma = 0.95
# Epsilon-greedyを使ってたまに冒険。50000ステップでend_epsilonとなる
explorer = chainerrl.explorers.LinearDecayEpsilonGreedy(
    start_epsilon=1.0, end_epsilon=0.3, decay_steps=50000, random_action_func=ra.random_action_func)
# Experience ReplayというDQNで用いる学習手法で使うバッファ
replay_buffer = chainerrl.replay_buffer.ReplayBuffer(capacity=10 ** 6)
# Agentの生成(replay_buffer等を共有する2つ)
agent_p1 = chainerrl.agents.DoubleDQN(
    q_func, optimizer, replay_buffer, gamma, explorer,
    replay_start_size=500, update_frequency=1,
    target_update_frequency=100)
agent_p2 = chainerrl.agents.DoubleDQN(
    q_func, optimizer, replay_buffer, gamma, explorer,
    replay_start_size=500, update_frequency=1,
    target_update_frequency=100)

#学習ゲーム回数
n_episodes = 20000
#カウンタの宣言
miss = 0
win = 0
draw = 0
#エピソードの繰り返し実行
for i in range(1, n_episodes + 1):
    b.reset()
    reward = 0
    agents = [agent_p1, agent_p2]
    turn = np.random.choice([0, 1])
    last_state = None
    while not b.done:
        #配置マス取得
        action = agents[turn].act_and_train(b.board.copy(), reward)
        #配置を実行
        b.move(action, 1)
        #配置の結果、終了時には報酬とカウンタに値をセットして学習
        if b.done == True:
            if b.winner == 1:
                reward = 1
                win += 1
            elif b.winner == 0:
                draw += 1
            else:
                reward = -1
            if b.missed is True:
                miss += 1
            #エピソードを終了して学習
            agents[turn].stop_episode_and_train(b.board.copy(), reward, True)
            #相手もエピソードを終了して学習。相手のミスは勝利として学習しないように
            if agents[1 if turn == 0 else 0].last_state is not None and b.missed is False:
                #前のターンでとっておいたlast_stateをaction実行後の状態として渡す
                agents[1 if turn == 0 else 0].stop_episode_and_train(last_state, reward*-1, True)
        else:
            #学習用にターン最後の状態を退避
            last_state = b.board.copy()
            #継続のときは盤面の値を反転
            b.board = b.board * -1
            #ターンを切り替え
            turn = 1 if turn == 0 else 0

    #コンソールに進捗表示
    if i % 100 == 0:
        print("episode:", i, " / rnd:", ra.random_count, " / miss:", miss, " / win:", win, " / draw:", draw, " / statistics:", agent_p1.get_statistics(), " / epsilon:", agent_p1.explorer.epsilon)
        #カウンタの初期化
        miss = 0
        win = 0
        draw = 0
        ra.random_count = 0
    if i % 10000 == 0:
        # 10000エピソードごとにモデルを保存
        agent_p1.save("result_" + str(i))

print("Training finished.")

#人間のプレーヤー
class HumanPlayer:
    def act(self, board):
        valid = False
        while not valid:
            try:
                act = input("Please enter 1-9: ")
                act = int(act)
                if act >= 1 and act <= 9 and board[act-1] == 0:
                    valid = True
                    return act-1
                else:
                    print("Invalid move")
            except Exception as e:
                print(act +  " is invalid")

#検証
human_player = HumanPlayer()
for i in range(10):
    b.reset()
    dqn_first = np.random.choice([True, False])
    while not b.done:
        #DQN
        if dqn_first or np.count_nonzero(b.board) > 0:
            b.show()
            action = agent_p1.act(b.board.copy())
            b.move(action, 1)
            if b.done == True:
                if b.winner == 1:
                    print("DQN Win")
                elif b.winner == 0:
                    print("Draw")
                else:
                    print("DQN Missed")
                agent_p1.stop_episode()
                continue
        #人間
        b.show()
        action = human_player.act(b.board.copy())
        b.move(action, -1)
        if b.done == True:
            if b.winner == -1:
                print("HUMAN Win")
            elif b.winner == 0:
                print("Draw")
            agent_p1.stop_episode()

print("Test finished.")

参考サイト