Python
機械学習
強化学習

三目並べを強化学習する

More than 3 years have passed since last update.

参加させて頂いている勉強会にて三目並べを強化学習する話が出ていたのでコード書いてみました。

参考文献

モンテカルロ法による強化学習超概要

今回使用したのがモンテカルロ法(方策オン型)なので、モンテカルロ法周辺だけ書きます。
(勉強会で話を聞いたのと、ちょろっと本読んだだけなので内容には自信がありませんが。。)
モンテカルロ法について一言で書くと、サンプルエピソード形式の経験から価値観数と最適方策を学習する方法で、政策反復を行いながら、政策評価と政策改善を行っていく手法(らしい)です。
(コードも大まかに政策反復、政策評価、政策改善のブロックに分ける事が出来ます)
以下、利点と欠点を記載します。

モンテカルロ法の利点

  • 環境のダイナミクスの事前知識(モデル)を必要としない
  • それでいながら最適な挙動を達成できる
  • 動的計画法(DP)のように全ての遷移の完全な確率分布を必要としない

モンテカルロ法の問題点

  • エピソードが終わるまで収益がわからないため、完全な漸進型の手続きに実装できない  (TD法であれば完全な漸進型の手続きに実装が可能)
  • エピソードの途中での行動は評価されたとしても、割引が必要となる
  • 学習に十分な量のエピソードが必要で、標本抽出の時間コストが大きい  (上の参考書籍にはTD法と比較して収束性が良くないとも記載がある)

ただ、「強くなるロボティック・ゲームプレイヤーの作り方」に記載されている三目並べの勝率を見ると、モンテカルロ法が最も高かったため、モンテカルロ法のコードを書いています。

参考コード

三目並べなので離散的な空間での学習で、かつ、最終時間ステップTが存在する場合のコードになります。
以下のコードは「強くなるロボティック・ゲームプレイヤーの作り方」に記載されているものをパクった(ryとほぼ同じです。
では、以下にコードを記載していきます。

# coding: utf-8

import numpy as np
from math import floor
import matplotlib.pyplot as plt

class MonteCarloPolycyIteration:
    n_states = 3**9 # 状態数
    n_actions = 9   # 行動数
    T = 5           # 最大ステップ数

    visites = None
    states = None
    actions = None
    rewards = None
    drewards = None
    results = None
    rate = []

    # 盤面を回転させると同じ状態になる
    # その変換を行い状態数を減らす
    convert = [[0,1,2,3,4,5,6,7,8], # 元の状態
               [2,1,0,5,4,3,8,7,6], # 変換(2)
               [6,3,0,7,4,1,8,5,2], # 変換(3)
               [0,3,8,1,4,7,2,5,8], # 変換(4)
               [8,7,6,5,4,3,2,1,0], # 変換(5)
               [6,7,8,3,4,5,0,1,2], # 変換(6)
               [2,5,8,1,4,7,0,3,6], # 変換(7)
               [8,5,2,7,4,1,6,3,0]  # 変換(8)
               ]
    # 3進数から10進数への変換用ベクトル
    power = np.array([ 3**i for i in xrange(8,-1,-1) ], dtype=np.float64)

    def __init__(self,L,M,options):
        self.L = L # 政策反復回数
        self.M = M # エピソード数
        self.options = options
        self.Q =  self.init_Q()
        np.random.seed(555)

    def init_Q(self):
        """ Q関数の初期化(initialize look up table)  """
        return np.zeros((self.n_states,self.n_actions))

    def train(self):
        # policy iteration
        for l in xrange(self.L):
            self.visites = self.init_visites()
            self.states = self.init_matrix()
            self.actions = self.init_matrix()
            self.rewards = self.init_matrix()
            self.drewards = self.init_matrix()
            self.results = self.init_results()

            # episode
            for m in xrange(self.M):
                state3 = self.init_state3()

                # step
                for t in xrange(self.T):
                    state = self.encode(state3)
                    policy = self.generate_policy()
                    policy = self.policy_improvement(policy,state)

                    # 行動選択および実行
                    action, reward, state3, fin = self.action_train(policy, t, state3)

                    self.update(m, t, state, action, reward)

                    if self.isfinished(fin):
                        #print 'fin_state',state3
                        self.results[m] = fin

                        # 割引報酬和の計算
                        self.calculate_discounted_rewards(m, t)
                        break
            self.Q = self.calculate_state_action_value_function()
            self.output_results(l)
            self.rate.append( float(len(self.results[self.results==2]))/self.M )

    def init_visites(self):
        return np.ones((self.n_states, self.n_actions))

    def init_matrix(self):
        return np.ones((self.M, self.T))

    def init_results(self):
        return np.zeros(self.M)

    def init_state3(self):
        return np.zeros(self.n_actions)

    def generate_policy(self):
        return np.zeros(self.n_actions)

    def policy_improvement(self,policy,state):
        if self.options['pmode']==0:
            q = self.Q[state] # 現在の政策での状態価値
            v = max(q) # 現在の状態での最適な行動における価値
            a = np.where(q==v)[0][0] # 最適行動
            policy[a] = 1
        elif self.options['pmode']==1:
            q = self.Q[state] # 現在の政策での状態価値
            v = max(q) # 現在の状態での最適な行動における価値
            a = np.where(q==v)[0][0] # 最適行動
            policy = np.ones(self.n_actions)*self.options['epsilon']/self.n_actions
            policy[a] = 1-self.options['epsilon']+self.options['epsilon']/self.n_actions
        elif self.options['pmode']==2:
            policy = np.exp(self.Q[state]/self.options['tau'])/ \
                        sum(np.exp(self.Q[state]/self.options['tau']))
        return policy

    def action_train(self, policy, t, state3):
        npc_action = self.select_npc_action(t, state3, policy)
        state3[npc_action] = 2
        fin = self.check_game(state3)
        reward = self.calculate_reward(fin)
        if reward is not None:
            return npc_action, reward, state3, fin

        enemy_action = self.select_enemy_action(t, state3)
        state3[enemy_action] = 1
        fin = self.check_game(state3)
        reward = self.calculate_reward(fin)
        return npc_action, reward, state3, fin

    def select_npc_action(self, step, state3, policy):
        a = None
        if step==0:
            a = 0
        else:
            while 1:
                random = np.random.rand()
                cprob = 0
                for a in xrange(self.n_actions):
                    cprob += policy[a]
                    if random<cprob: break  
                # 既にマスが埋まってないかどうか確認
                if state3[a]==0:break
        return a

    def select_enemy_action(self, step, state3):
        reach = 0
        pos = [[0,1,2],[3,4,5],[6,7,8],[0,3,6],[1,4,7],[1,5,8],[0,4,8],[2,4,6]]
        a = None
        for i in xrange(len(pos)):
            state_i = state3[pos[i]]
            val = sum(state_i)
            num = len(state_i[state_i==0])
            if val==2 and num==1:
                idx = int( state_i[state_i==0][0] )
                a = pos[i][idx]
                reach = 1
                break
        if reach==0:
            while 1:
                a = floor(np.random.rand()*8)+1
                if state3[a]==0: break
        return a        

    def check_game(self, state3):
        pos = [[0,1,2],[3,4,5],[6,7,8],[0,3,6],[1,4,7],[1,5,8],[0,4,8],[2,4,6]]
        for i in xrange(len(pos)):
            state_i = state3[pos[i]]
            val_npc = sum(state_i==1)
            val_enemy = sum(state_i==2)
            if val_npc==3:
                return 1 # 勝ち
            elif val_enemy==3:
                return 2 # 負け
        if sum(state3==0)==0:
            return 3 # 引き分け
        return 0 # ゲーム続行

    def calculate_reward(self, fin):
        if fin==2:
            return 10
        elif fin==1:
            return -10
        elif fin==3:
            return 0
        elif fin==0:
            return None        

    def update(self,m,t,state,action,reward):
        """ 状態、行動、報酬、出現回数の更新 """
        self.states[m,t] = state
        self.actions[m,t] = action
        self.rewards[m,t] = reward
        self.visites[state, action] += 1

    def isfinished(self,fin):
        if fin>0: return True
        else: return False

    def calculate_discounted_rewards(self, m, t):
        for pstep in xrange(t-1,-1,-1):
            self.drewards[m,t] = self.rewards[m,t]
            self.drewards[m,pstep] = \
                    self.options['gamma']*self.drewards[m,pstep+1]

    def calculate_state_action_value_function(self):
        Q = self.init_Q()
        for m in xrange(self.M):
            for t in xrange(self.T):
                s = self.states[m,t]
                if s==0: break # 9手目までに終わっている場合は処理終了
                a =self.actions[m,t]
                Q[s,a] += self.drewards[m,t]
        return Q/self.visites

    def output_results(self,l):
        print 'l=%d: Win=%d/%d, Draw=%d/%d, Lose=%d/%d\n' % (l, \
                        len(self.results[self.results==2]),self.M, \
                        len(self.results[self.results==3]),self.M, \
                        len(self.results[self.results==1]),self.M)        

    def encode(self, state3):        
        # stateに(2)~(8)の8種類の変換を加えた後、10進数へ変換
        cands = [ sum(state3[self.convert[i]]*self.power) # indexを入れ替えて、10進数に変換
                    for i in xrange(len(self.convert))]
        # 8個の候補のうち一番小さいものを選ぶ
        return min(cands)+1

if __name__=='__main__':
    options = {'pmode':1,'epsilon':0.05,'tau':2,'gamma':0.9}
    mcpi= MonteCarloPolycyIteration(100,100,options)
    mcpi.train()

    plt.plot(range(len(mcpi.rate)), mcpi.rate)
    plt.show()

実験結果

100stepを1セットとしてεグリーディ法を使用して実験しました。
以下、学習中の100セット分の勝率を掲載します。
win_rate.png

まとめ

学習は上手くいっているようですが、計算遅いです。。
モンテカルロ法をpythonで実装するのは無謀ですね。。
ちなみに、softmax方でも同じような結果が得られました。

お手数ですが間違いありましたら、ご指摘いただけますと助かります。