LoginSignup
6
1

More than 1 year has passed since last update.

8パズルで強化学習

Last updated at Posted at 2021-09-23

0. はじめに

Pythonで学ぶ強化学習を読みましたので、8パズルを題材に強化学習を実装してみたいと思います。
正直まだよくわかっていませんが、頭の整理のためここに記事を残したいと思います。
間違いや実装のよくない点などございましたら、ご指摘いただけますと幸いです。

0-1. この記事でやること

8パズルを題材に強化学習の各手法を実装します。
扱うのはValue Iteration、Q-Learning、ニューラルネットワークを使ったQ-Learningです。

以前、8パズルで各種グラフ探索系のアルゴリズムを整理しました。今回は強化学習で8パズルを解きます。

8パズルは基本的な探索(BFS)で十分解くことのできるので、わざわざ強化学習を使う必要はありません(ニューラルネットワークでFizzBuzzをやるようなものです)。ですが、自分の手で強化学習を実装するための、いい感じのチュートリアルになるのではと考えやってみました。 
数式少なめ、お気持ちや実装がメインです。
「強化学習の手始めになにか実装してみたい」という方向けです。
新しい工夫や手法を実装したら随時加筆していく予定です。

コードはgithubに載せています。
https://github.com/persimmon-persimmon/rl-eight-puzzle

0-2.強化学習って?

強化学習については以下のような素晴らしい記事がたくさんありますので、この記事では詳細な説明は割愛させていただきます。

強化学習には大きく分けて価値ベース、戦略ベースという分類がありますが、この記事で扱うのはすべて価値ベースの手法です。

私の理解でおおざっぱに説明しますと以下みたいな感じです。


環境とAgentがある。Agentは環境に対して何かしらの行動を選択し、行動の結果を得る。その経験に基づき学習していく。
用語として、環境の状態(state)、環境に対するAgentの行動(action)、行動の報酬(reward)がある。
環境の状態(state)に対しAgentが行動(action)を行い、報酬(reward)を得て、状態が次の状態に遷移する(next_state)。
これらの(state,action,reward,next_state)をまとめたものを経験(experience)と呼ぶ。

例えば経験(s0,a0,r0,s1)を得たとする。

・報酬r0 > 0なら、状態s0において行動a0は良い行動と評価される。反対に負の報酬なら良くない行動と評価される。
・報酬r0 > 0 でなくても、遷移先の状態s1が良い状態なら、状態s0において行動a0は良い行動と評価される。反対にs1が良くない状態なら、良くない行動と評価される。
・状態s0での行動a0が良い行動と評価されたら、以後、状態s0で行動a0を選択する確率を上げる。良くない行動なら、選択確率を下げる。
・以上を繰り返し、良い行動、良い状態を評価する目利きを得る。


例として、ウィニングイレブン的なサッカーの環境を考えます。状態は選手の位置、ボールの位置などです。行動は選手の操作です。Agentはウィニングイレブン的に選手を操作します。ゴールを決めた時のみ報酬が発生するものとします。
ある瞬間、コーナーキックからゴールを決めたとします。Agentはコーナーキックという状態は報酬が発生しやすいのだと学習し、コーナーキックの状態価値を上げます。次からコーナーキックに遷移しやすい行動を取るようになります。

まあ、言ってることはなんとなくわかると思います。
ですが問題は「具体的にどうやって実装するんですか?」という点ではないでしょうか。

上の説明の中で「状態の価値」「状態における行動の価値」という言葉が出ました。価値ベースの強化学習ではこれらを返す関数を推定することを目的とします。

状態の価値を返す関数
$V:state$ -> 価値

状態における行動の価値を返す関数
$Q:(state,action)$ -> 価値

0-3. 8パズルって?

スライドパズルとも呼ばれます。以下の画像のようなパズルのことです。画像のものは4x4の15パズルです。これのひとつ小さい3x3のものは8パズルと呼ばれます。
正方形に並んだマスのうち一つだけ空白マスがあり、それを利用して数字を順番に並べるパズルです

スクリーンショット2021-08-20 23.09.22.png

以下の記事でも解説しているので参照ください。

0-4. 8パズルの実装

8パズルの実装を載せます。ポイントはstepという関数です。
行動を引数に指定すると、(状態、報酬、終了フラグ、情報)を返しますが、報酬が発生するのは終了状態に遷移できた時のみで、そうでないとき報酬は常に0になります。
終了状態に近づくような良い行動をとっても、環境から報酬は得られないということです。
環境外で「今の行動で終了に近づいた。これは良い行動だった」と判断できるようにならないといけません。

square_puzzle.py

from random import randint
class SquarePuzzle:
    """
    正方形パズルクラス
    一辺の長さ=3の場合,8パズルになる. 以下が最終状態. 空白マスは0で表す.
    1 2 3
    4 5 6
    7 8 0
    一辺の長さ=4の場合,15パズルになる.
    """
    def __init__(self,edge_length=3,board=None):
        """
        コンストラクタ.
        edge_length:一辺の長さ.
        board:初期状態を指定する場合に使う. マスの配置を一次元化したもの.
        """
        if board is not None:
            assert len(board)==edge_length**2,f"invalid square. edge_length={edge_length} and board={board}"
            self.space = [x for x in range(edge_length ** 2) if board[x]==0][0]
            board = list(board)
        else:
            board=[i+1 for i in range(edge_length**2)]
            board[-1]=0  
            self.space = edge_length ** 2 - 1
        self.edge_length = edge_length
        self.board = board
        self.actions = [[0,1],[0,-1],[1,0],[-1,0]]
        self.step_count = 0

    def reset(self,shuffle_count=100):
        """
        板を初期化する.
        最終状態からshuffle_count回シャッフルする.shuffle_countが増えるほどパズルの難易度が上がる.
        """
        self.board=[i + 1 for i in range(self.edge_length ** 2)]
        self.board[-1]=0  
        self.space = self.edge_length ** 2 - 1
        self.step_count = 0
        pre_space = -1
        for _ in range(shuffle_count):
            i, j = divmod(self.space, self.edge_length)
            di, dj = self.actions[randint(0, len(self.actions) - 1)]
            ni, nj = i + di,j + dj
            if 0 <= ni < self.edge_length and 0 <= nj < self.edge_length and ni * self.edge_length + nj != pre_space:
                self.board[self.space], self.board[ni * self.edge_length+nj] = self.board[ni * self.edge_length + nj], self.board[self.space]
                pre_space = self.space
                self.space = ni * self.edge_length + nj
        return tuple(self.board)

    def step(self,action,air=False):
        """
        行動の結果(状態,報酬,終了フラグ,info)を返す.
        指定の方向へ動かせない場合,状態を変えず返す.
        action:行動.空白マスを動かす方向を表す. {0:右,1:左,2:下,3:上}
        air:状態を変えずに行動の結果を取得したい場合Trueにする.
        """
        if not air:self.step_count += 1
        i,j = divmod(self.space,self.edge_length)
        di,dj = self.actions[action]
        ni,nj = i+di,j+dj
        if air:
            board_ = self.board.copy()
        else:
            board_ = self.board
        if 0 <= ni < self.edge_length and 0 <= nj < self.edge_length:
            board_[self.space],board_[ni*self.edge_length+nj] = board_[ni*self.edge_length+nj],board_[self.space]
            if not air:self.space = ni * self.edge_length+nj
        done = all(board_[i] == (i + 1) % (self.edge_length ** 2) for i in range(self.edge_length ** 2))
        reward = 1 if done else 0
        info = {"step_count":self.step_count}
        return tuple(board_), reward, done, info
    
    def get_state(self):
        """
        現在の状態を返す.
        """
        return tuple(self.board)
        
    def get_able_actions(self):
        """
        可能なアクションを返す.
        """
        ret = []
        i,j = divmod(self.space,self.edge_length)
        if j < self.edge_length - 1:ret.append(0) # 右
        if 0 < j:ret.append(1) # 左
        if i < self.edge_length - 1:ret.append(2) # 下
        if 0 < i:ret.append(3) # 上
        return ret

    def show(self):
        """
        現在の状態をコンソールに表示する.
        """
        for i in range(self.edge_length):
            print(self.board[i*self.edge_length:(i+1)*self.edge_length])

1.Value Iteration

価値反復法です。状態価値関数$V$を推定する手法です。
「ありうる全ての状態で、取りうる全ての行動をやってみよう」という感じの方法です。
ありうる全ての状態に対し可能な行動を全て行い、以下の要領で$V$を推定します。
・ある状態から行動して報酬が発生したら、その状態の価値を上げる。
・ある状態から、価値の高い状態に遷移できるなら、その状態の価値を上げる。
(詳しくはBellman Equationを参照)
これを行うため、全状態の価値を記録するテーブルを用意します。
pythonの実装では、$V$はdictで表現します。
また、どれぐらい価値を上げるかを割引率で指定します。0より大きく1未満の値です。報酬期待値の現在価値を計算するのに使用します、例えば、一手先で報酬1を得られる状態と、十手先で報酬1を得られる状態では前者の方が価値が高いはずです。割引率でこれを表現します。

具体的な方法は以下の実装内のコメントに書いていますので、ご参照ください。
この手法ではまだAgentは登場しません。

1-1.Value Iterationの実装

value_iteration.py
def ValueIteration(gamma=0.95, th=0.01):
    """
    value iterationで全状態の価値評価をする.
    はじめ,すべての状態は価値0とする.
    以下の価値更新処理をすべての状態に行う.何度も繰り返し,更新幅がth未満になったら終了.
    ・終了状態に遷移できたら,その状態の価値を1とする.
    ・遷移可能な状態のうち最大価値に割引率gammaをかけたものをその状態の価値とする.
    繰り返しごとに終了状態に近いものから順に価値が伝搬していくイメージ.
    return:状態価値テーブルV
    """
    actions = list(range(4))
    states = all_solvable_boards() #全状態の列挙
    V = {}
    for state in states:
        V[tuple(state)] = 0 #はじめすべての状態は価値0
    count = 0
    while True:
        delta = 0 # 更新幅
        for state in V:
            if state == (1,2,3,4,5,6,7,8,0):continue #終了状態は対象外
            expected_rewards = [] #各行動の報酬の推定値を記録する. ->1-2. 報酬の推定値について
            env_ = SquarePuzzle(edge_length=3, board=state)
            for action in actions:
                next_state, reward, done, info = env_.step(action,air=True)
                r = reward + gamma * V[next_state]
                expected_rewards.append(r)
            max_reward = max(expected_rewards)
            delta = max(delta, abs(max_reward - V[state]))
            V[state] = max_reward
        count+=1
        if delta < th:# 更新幅が指定値未満なら収束したとみなし処理を終える
            break
    return V

実行するとだいたい1分で収束し、$V$を得られます。
中身を確認すると終了状態に近い状態の価値は高く推定されていることがわかります。
現在の状態から価値が高い状態に遷移するよう行動を選択していけば、8パズルを解くことができます。

1-2. Value Iterationの厳しい点 報酬の推定値について

今回の環境は、行動aを選択したら必ず行動aができるという環境設定ですが、そうでない環境もあります。

たとえばOpenAiにあるFrozenLakeという環境は、ある行動を選択しても地面が凍っているため確率的に違う行動をして意図しない状態に遷移してしまう、という環境です。

こういう場合、行動aの報酬推定値は、行動aを選択した時に遷移しうる状態たちの「遷移確率 × 遷移先状態の価値」の和になります。

ただ、これをするには、行動aにおける遷移先状態たちの(遷移確率,遷移先状態)の情報が必要になります。つまり以下の$T$がわかっている必要があります。
$T(state'|state,action)$:状態stateで行動actionを選択した時、状態state'に遷移する確率

これはなかなか厳しい制約です。(まあ、8パズルでは問題にならないですが、)
サッカーで言えば、たとえば全ての局面においてパスを出してきちんと通る確率がわかっている必要があります。

$T$がわかっている状態での学習をモデルベース学習、そうでないものをモデルフリー学習といいます。
順序が前後した説明になりますが、上記の$T$が環境をモデル化したものというわけです。

2. Q-Learning

遷移関数$T$が未知の状態で学習するべくAgentを登場させます。
Agentに環境を与え、実際に行動をさせます。遷移関数$T$がわからなくても行動すれば状態は遷移し報酬が発生します。その結果から行動価値関数$Q(state,action)$を推定します。

2-1. Qの更新方法 TD誤差

ある状態$s_0$で行動$a_0$を取り、報酬$r_0$を得て状態$s_1$に遷移したとします。
この経験($s_0,a_0,r_0,s_1$)から$Q(s_0,a_0)$の値を以下のように更新します。

$Q(s_0,a_0) ← Q(s_0,a_0) + lr (r_0 + γ max_a(Q(s_1,a))-Q(s_0,a_0))$

$lr:$学習率。TD誤差をどの程度反映させるかを指定します。
$γ:$割引率。報酬期待値の現在価値を計算するのに使用します。

上式右側の二項目がTD誤差(Temporal Difference Error)と呼ばれるものです。
状態$s_0$における行動$a_0$の更新前の評価値$Q(s_0,a_0)$と、行動後の評価値$r_0 + γ max_a(Q(s_1,a))$の誤差です。
これを反映させてQを更新します。

2-2. ε-Greedy法

Agentは一度報酬を得るとその行動をいい方法として学習し、次からその行動を選択するようになります。このとき、__過去の経験から最適とされた行動のみ選択する__ようになったらどうなるでしょうか。
学習した行動が最適な行動なら問題ないですが、それができずに「報酬は得られるが非常に効率の悪い行動」を学習した場合、具合が悪いです。効率が悪いのにその行動を繰り返して、他の最適な行動を探索しようとしません。

このような状況を回避するため、確率的にランダムな行動をとるようにします。これをε-Greedy法といいます。

2-3. Q-Learningの実装

以下が実装です。Agentは学習が済んでいない状況ではランダムな行動をとります。
8パズルはどんな盤面でも26手以内に解けますが、ランダムな行動で解くのは厳しいです。今回の環境では解けた時だけ報酬が発生するので、一度でも解けないと学習が進みません。なので、はじめはランダムな行動でも解ける簡単な盤面を与え、学習ができたら与える盤面を難しくしていきます(カリキュラム学習という手法です)。
また、難易度の割に手数を重ねても解けないという場合、エピソードを打ち切るようにしています。
エピソード:環境の開始から終了までの期間。

q_learning_agent.py
import json
from square_puzzle import SquarePuzzle
from collections import deque,defaultdict
import numpy as np
import pickle
class QLearningAgent:
    """
    Q-Learningのエージェント.
    """
    def __init__(self,epsilon=0.1):
        """
        コンストラクタ.
        epsilon:epsilon-greedy法のepsilon. ここで指定した割合だけ探索的行動をする.
        """
        self.epsilon = epsilon
        self.Q = {}

    def policy(self,state,actions):
        """
        epsilon-greedy法で決定した行動を返す.
        epsilonの割合だけランダムに行動を決める.
        それ以外は過去の経験から算出した価値の高い行動を取る.
        """
        if np.random.random() < self.epsilon:
            return np.random.randint(len(actions))
        else:
            if state in self.Q and sum(self.Q[state]) != 0:
                return np.argmax(self.Q[state])
            else:
                return np.random.randint(len(actions))

    def learn(self,env,episode_count=100,gamma=0.9,learning_rate=0.1,read_model=False):
        """
        学習する.
        簡単な状態から学習するため,シャッフル回数の初期値を3として,十分学習できたら状態の難易度を上げる.
        50回勝率が9割以上になった時にシャッフル回数をプラスする.
        """
        actions = list(range(len(env.actions)))
        self.Q = defaultdict(lambda: [0]*len(actions))
        if read_model:self.read_model()
        shuffle_count = 3
        win_ratio = 0
        reward_ary = deque(maxlen=50)
        self.log_ary = []
        for i in range(episode_count):
            if win_ratio > 0.9:
                reward_ary = deque(maxlen=50)
                shuffle_count+=1
            state = env.reset(shuffle_count)
            done = False
            while not done:
                action = self.policy(state, actions)
                next_state, reward, done, info = env.step(action)
                gain = reward + gamma * max(self.Q[next_state])
                estimated = self.Q[state][action]
                self.Q[state][action] += learning_rate * (gain - estimated)
                state = next_state
                if info["step_count"] >= shuffle_count * 2:break
            reward_ary.append(reward)
            win_ratio = sum(reward_ary)/50
            if i%100==0:print([i,reward,win_ratio,shuffle_count,info["step_count"],len(self.Q)])
        self.save_model()
    
    def output_log(self,):
        """
        log_aryを出力する.
        """
        current_dir = os.path.dirname(__file__)
        with open(os.path.join(current_dir,"q_learning.json"),"w") as f:
            f.write(json.dumps(self.log_ary))

    def save_model(self,):
        """
        Qテーブルを保存する.
        """
        current_dir = os.path.dirname(__file__)
        pickle.dump(dict(self.Q), open(os.path.join(current_dir,"q_table.pkl"),"wb"))
        
    def read_model(self,):
        """
        Qテーブルを読み込む.
        """
        current_dir = os.path.dirname(__file__)
        Q_ = pickle.load(open(os.path.join(current_dir,"q_table.pkl"),"rb"))
        action_length = len(list(Q_.keys())[0])
        self.Q = defaultdict(lambda: [0]*action_length)
        for k in Q_:
            self.Q[k] = Q_[k]

def train():
    agent = QLearningAgent()
    env = SquarePuzzle()
    agent.learn(env, episode_count=150000,gamma=0.95)

if __name__ == "__main__":
    train()

上の実装では15万回Agentにプレイさせています。どんな盤面でも8~9割ぐらいで解けるようになります。

2-4. SARSA

Q-Learningでは経験($s_0,a_0,r_0,s_1$)を得た時、Qを以下で更新していました。
$Q(s_0,a_0) ← Q(s_0,a_0) + lr (r_0 + γ max_a(Q(s_1,a))-Q(s_0,a_0))$

$max_a(Q(s_1,a))$は遷移先状態で最大価値の行動を選択する想定で価値を見積もっているということを意味します。
しかし実際にはAgentはε-Greedy法に基づいて行動を選択するので、最大価値の行動を選択する保証はありません。εの確率でランダムな行動をします。
そこでQの更新方法を変えたパターンを考えてみます。

$policy:state$→ε-Greedy法で選択した行動
$Q(s_0,a_0) ← Q(s_0,a_0) + lr (r_0 + γ Q(s_1,policy(s_1))-Q(s_0,a_0))$

この更新方法の手法をSARSAと言います。
更新に使う価値見積もりにε-Greedy法という戦略に基づいた行動を使っています。
このように戦略に基づいて遷移先状態の価値見積もりを行う方法をOn-Policyと言います。
一方、Q-LearningのようにそうでないものをOff-Policyと言います。

3. NNを使ったQ-Learing

これまで見た手法では状態価値や行動価値をテーブルに記録していました。全パターンの状態、行動がメモリに乗り切る程度の数なら問題ないですが、そうでない場合きついです。状態や行動が連続値を取る場合も同様にきついです。
そこでニューラルネットワーク(NN)で$Q$関数を表現します。
強化学習でよく聞くDQN(Deep Q-Network)とは、ざっくり言えば、「Q-Learningの$Q$関数をDNNで表現し、学習を安定させるためにいろいろ工夫をしたもの」という感じです。

3-1.ExperienceRaplay

行動を選択するたびに得られた経験で$Q$テーブルを更新していましたが、NNを使う場合はこの経験を蓄積してバッチ学習をさせます。

3-2.実装

q_learning_nn_agent.py

import json
import random
from collections import deque,defaultdict,namedtuple
import numpy as np
from square_puzzle import SquarePuzzle
from sklearn.neural_network import MLPRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder,StandardScaler
import pickle
Experience = namedtuple("Experience",["state", "action", "reward", "next_state", "done"])
class QLearningNnAgent:
    """
    Q-Learningのエージェント.
    NNを使うためにExperience Replay機能を実装する.
    Fixed Target Q-Netwrok.
    """
    def __init__(self,epsilon=0.1,buffer_size=1024,batch_size=64):
        """
        コンストラクタ.
        epsilon:epsilon-greedy法のepsilon. ここで指定した割合だけ探索的行動をする.
        buffer_size:Experience Replayのため,バッファしておく経験の量.
        batch_size:一度の学習で使用する経験のサイズ.
        """
        self.epsilon = epsilon
        self.estimate_probs = False
        self.initialized = False
        self.model = None
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.experiences = deque(maxlen=self.buffer_size)
        self.actions = []

    def initialize(self, experiences):
        # 数字をOneHoeエンコーディングする
        ary=[]
        for i in range(9):
            ary.append([(j+i)%9 for j in range(9)])
        enc=OneHotEncoder(sparse=False)
        enc.fit(ary)
        estimator = MLPRegressor(hidden_layer_sizes=(512,512,512,), max_iter=1)
        self.model = Pipeline([("preprocessing", enc), ("estimator", estimator)])

        self.update([experiences[0]], gamma=0)
        self.initialized = True

    def policy(self,state,actions):
        """
        epsilon-greedy法で決定した行動を返す.
        epsilonの割合だけランダムに行動を決める.
        それ以外は過去の経験から算出した価値の高い行動を取る.
        """
        if np.random.random() < self.epsilon or not self.initialized:
            return np.random.randint(len(actions))
        else:
            estimates = self.estimate(state)
            if self.estimate_probs:
                action = np.random.choice(actions,size=1,p=estimates)[0]
                return action
            else:
                return np.argmax(estimates)

    def estimate(self, s):
        """
        self.policy内で使用する.
        """
        estimated = self.model.predict([s])[0]
        return estimated

    def learn(self,env,episode_count=100,gamma=0.9,learning_rate=0.1,render=False,report_interval=50):
        """
        簡単な状態から学習するため,シャッフル回数の初期値を3として,十分学習できたら状態の難易度を上げる.
        50回勝率が9割以上になった時にシャッフル回数をプラスする.
        """
        actions = list(range(len(env.actions)))
        self.actions = actions
        shuffle_count = 3
        win_ratio = 0
        reward_ary = deque(maxlen=50)
        self.log_ary = []
        for i in range(episode_count):
            if win_ratio > 0.9:
                reward_ary = deque(maxlen=50)
                shuffle_count+=1
            state = env.reset(shuffle_count)
            done = False
            action_count = 0
            experiences_ = []
            while not done:
                action = self.policy(state, actions)
                next_state, reward, done, info = env.step(action)
                action_count += 1
                if done:
                    reward = 1
                else:
                    reward = 0
                exp = Experience(state,action,reward,next_state,done)
                self.experiences.append(exp)

                if len(self.experiences) >= self.batch_size and self.initialized == False:
                    self.initialize(self.experiences)

                if len(self.experiences) >= self.batch_size:
                    batch = random.sample(self.experiences, self.batch_size)
                    self.update(batch,gamma)

                state = next_state
                if action_count >= min(shuffle_count*2,100):break
            reward_ary.append(reward)
            win_ratio = sum(reward_ary)/50
            if i%100==0:print([i,reward,win_ratio,shuffle_count,info["step_count"]])
        self.save_model()
        self.output_log()
    
    def output_log(self,):
        """
        log_aryを出力する.
        """
        current_dir = os.path.dirname(__file__)
        with open(os.path.join(current_dir,"q_learning_nn.json"),"w") as f:
            f.write(json.dumps(self.log_ary))

    def save_model(self,):
        """
        モデルを保存する.
        """
        current_dir = os.path.dirname(__file__)
        print(os.path.join(current_dir,"model_q_nn.pkl"))
        pickle.dump(self.model, open(os.path.join(current_dir,"model_q_nn.pkl"),"wb"))

    def read_model(self,):
        """
        モデルを読み込む.
        """
        current_dir = os.path.dirname(__file__)
        self.model = pickle.load(open(os.path.join(current_dir,"model_q_nn.pkl"),"rb"))

    def _predict(self, states):# 各状態における、各行動の価値を予測し、返す。
        """
        学習時に使用する.
        """
        if self.initialized:
            predicteds = self.model.predict(states)
        else:
            size = len(self.actions) * len(states)
            predicteds = np.random.uniform(size=size)
            predicteds = predicteds.reshape((-1, len(self.actions)))
        return predicteds

    def update(self, experiences, gamma):
        """
        学習する.
        """
        states = np.vstack([e.state for e in experiences])
        next_states = np.vstack([e.next_state for e in experiences])

        estimateds = self._predict(states)
        future = self._predict(next_states)

        for i, e in enumerate(experiences):
            reward = e.reward # 報酬を取得
            if not e.done:
                reward += gamma * np.max(future[i]) # 終了状態でない場合、新しい状態の予測価値に割引率をかけたものを加算
            estimateds[i][e.action] = reward # これで予測値を上書き
        estimateds = np.array(estimateds)
        states = self.model.named_steps["preprocessing"].transform(states)
        self.model.named_steps["estimator"].partial_fit(states, estimateds) # 上書きした予測値で学習

def train():
    agent = QLearningNnAgent()
    env = SquarePuzzle()
    agent.learn(env, episode_count=10000)

if __name__ == "__main__":
    train()

多少学習は進みますが、難しい盤面になると解けなくなります。使っているNNの大きさが足りていないのかもしれません。
8パズルは取りうる盤面が18万程度なのでNNを大きくして過学習させれば一応は解けるようにはなるはずですが、あまり意味ないのでやっていません。

3-3.他改善方法

Q-Learning Agentから経験を採取し、それでNNのQ-Leaning Agentを学習させるといったことも可能です。模倣学習という手法です。

4.CNNを使ったQ-Learing

盤面の数字の位置を見て学習できる畳み込み層のネットワーク(CNN)を使えばいい感じになるのではと考えました。

4-1.実装

実装にはkeras-rlを使用します。
8パズルクラスをkeras-rl向けに作り替え、学習時におこなっていた以下の工夫をCallbackクラスおよびProcessorクラスで実装します。
・はじめは簡単な盤面を与え、学習が進み具合で難易度を調整する→Callbackクラスのon_episode_endで実装
・難易度の割になかなか解けない場合、エピソードを打ち切る→Processorクラスのprocess_stepで実装
・パズルの盤面をOneHotエンコーディングする→Processorクラスのprocess_stepで実装
以下、全体の実装です。

square_puzzle_keras.py

from random import randint
import gym.spaces
import copy
import numpy as np
import random
import rl.callbacks
import rl.core
from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Flatten, Conv2D
from tensorflow.keras.optimizers import Adam
from rl.agents.dqn import DQNAgent
from rl.policy import EpsGreedyQPolicy,BoltzmannQPolicy
from rl.memory import SequentialMemory

class SquarePuzzleEnv(gym.core.Env):
    """
    正方形パズルクラス
    一辺の長さ=3の場合,8パズルになる. 以下が最終状態. 空白マスは0で表す.
    1 2 3
    4 5 6
    7 8 0
    一辺の長さ=4の場合,15パズルになる.
    """
    def __init__(self,edge_length=3,board=None):
        """
        edge_length:一辺の長さ.
        board:初期状態を指定する場合に使う. マスの配置を一次元化したもの.
        """
        if board is not None:
            assert len(board)==edge_length**2,f"invalid square. edge_length={edge_length} and board={board}"
            self.space = [x for x in range(edge_length ** 2) if board[x]==0][0]
            board = list(board)
        else:
            board=[i+1 for i in range(edge_length**2)]
            board[-1]=0  
            self.space = edge_length ** 2 - 1
        self.edge_length = edge_length
        self.board = board
        self.actions = [[0,1],[0,-1],[1,0],[-1,0]]
        self.n_action = 4
        self.action_space = gym.spaces.Discrete(self.n_action)
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(self.edge_length,self.edge_length,self.edge_length**2))
        self.step_count = 0
        self.shuffle_count = 100

    def reset(self):
        """
        板を初期化する.
        最終状態からshuffle_count回シャッフルする.shuffle_countが増えるほどパズルの難易度が上がる.
        """
        self.board=[i + 1 for i in range(self.edge_length ** 2)]
        self.board[-1]=0  
        self.space = self.edge_length ** 2 - 1
        self.step_count = 0
        pre_space = -1
        for _ in range(self.shuffle_count):
            i, j = divmod(self.space, self.edge_length)
            di, dj = self.actions[randint(0, len(self.actions) - 1)]
            ni, nj = i + di,j + dj
            if 0 <= ni < self.edge_length and 0 <= nj < self.edge_length and ni * self.edge_length + nj != pre_space:
                self.board[self.space], self.board[ni * self.edge_length+nj] = self.board[ni * self.edge_length + nj], self.board[self.space]
                pre_space = self.space
                self.space = ni * self.edge_length + nj
        return self.get_state()

    def step(self,action):
        """
        行動の結果(状態,報酬,終了フラグ,info)を返す.
        指定の方向へ動かせない場合,状態を変えず返す.
        action:行動.空白マスを動かす方向を表す. {0:右,1:左,2:下,3:上}
        """
        self.step_count += 1
        i,j = divmod(self.space,self.edge_length)
        di,dj = self.actions[action]
        ni,nj = i+di,j+dj
        if 0 <= ni < self.edge_length and 0 <= nj < self.edge_length:
            self.board[self.space],self.board[ni*self.edge_length+nj] = self.board[ni*self.edge_length+nj],self.board[self.space]
            self.space = ni * self.edge_length+nj
        done = all(self.board[i] == (i + 1) % (self.edge_length ** 2) for i in range(self.edge_length ** 2))
        reward = 1 if done else 0
        info = {"step_count":self.step_count,"shuffle_count":self.shuffle_count}
        return self.get_state(), reward, done, info
    
    def get_state(self):
        """
        現在の状態を返す.
        """
        return tuple(self.board)
        
    def get_able_actions(self):
        """
        可能なアクションを返す.
        """
        ret = []
        i,j = divmod(self.space,self.edge_length)
        if j < self.edge_length - 1:ret.append(0) # 右
        if 0 < j:ret.append(1) # 左
        if i < self.edge_length - 1:ret.append(2) # 下
        if 0 < i:ret.append(3) # 上
        return ret

    def show(self):
        """
        現在の状態をコンソールに表示する.
        """
        for i in range(self.edge_length):
            print(self.board[i*self.edge_length:(i+1)*self.edge_length])

    def set_shuffle_count(self,count):
        self.shuffle_count = count

# 難易度を調整するCallback
class ControlCallbak(rl.callbacks.Callback):
    def __init__(self,start_shuffle_count = 3):
        self.reward_history = deque([0]*50,maxlen=50)
        self.shuffle_count = start_shuffle_count

    def on_episode_end(self, episode, logs):
        self.reward_history.append(logs["episode_reward"])
        wp = sum(self.reward_history)/len(self.reward_history)
        if wp > 0.9:
            self.shuffle_count += 1
            self.env.set_shuffle_count(self.shuffle_count)
            self.reward_history = deque([0]*50,maxlen=50)

    def on_train_begin(self,logs):
        self.env.set_shuffle_count(self.shuffle_count)

# エピソードを打ち切るProcessor
class TrainProcessor(rl.core.Processor):
    def process_step(self, observation, reward, done, info):
        if info["step_count"]>2*info["shuffle_count"]:
            done = 1
        observation = self.process_observation(observation)
        return observation, reward, done, info

    def process_observation(self, observation):
        # 盤面をOneHotエンコーディングしテンソルにする
        ret = []
        ary = [0]*len(observation)
        l = int(round(len(observation)**0.5))
        for i in observation:
            ary[i] = 1
            ret.extend(ary.copy())
            ary[i] = 0
        ret = np.array(ret)
        ret = ret.reshape((l,l,l**2))
        return ret

env = SquarePuzzleEnv()
model = Sequential()
model.add(Conv2D(16,(2,2),activation="relu",input_shape=(1,) + env.observation_space.shape))
model.add(Conv2D(32,(2,2),activation="relu"))
model.add(Flatten())
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dense(env.n_action))
model.add(Activation('linear'))
print(model.summary())

# モデルのコンパイル
pc = TrainProcessor()
memory = SequentialMemory(limit=50000, window_length=1)
policy = EpsGreedyQPolicy(0.1)
dqn = DQNAgent(model=model, nb_actions=env.n_action, memory=memory, nb_steps_warmup=500, target_model_update=1e-2, policy=policy, processor=pc)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])

cc = ControlCallbak()
history = dqn.fit(env, nb_steps=10000, visualize=False, verbose=2, nb_max_episode_steps=300, callbacks=[cc])

結果としては思ったほど学習は改善しませんでした。

参考

6
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
1