0. はじめに
自粛期間中に競技プログラミングにすっかりはまり、日々データ構造やアルゴリズムの勉強に勤しんでおります。
グラフ探索系のアルゴリズムといえば、AtCoderでは幅優先探索(BFS)、ダイクストラ法が頻出ですが、他にもさまざまなアルゴリズムがあります。整理のため8パズルを題材に各アルゴリズムの概要を解説していきたいと思います。
間違いや実装のよくない点などございましたら、ご指摘いただけますと幸いです。
0-1. この記事でやること
プログラムで8パズルを作り、それを解くsolverを作成する形で各アルゴリズムを整理、比較します。
扱うアルゴリズムは幅優先探索(BFS)、A*アルゴリズム、IDDFS、IDA*です。
BFSについては他のアルゴリズムの導入として軽く触る程度です。
この記事ではBFS、DFSおよびグラフや木の知識は前提とさせていただきます。
使用する言語はpython3です。記事内にpythonのコードを載せていますが、コードは読み飛ばしてもらっても大丈夫かと思います。わかりにくいところがあれば教えてください。
コードはgithubに載せています。
とりあえず以下が各アルゴリズムのざっくりまとめです。
アルゴリズム | |
---|---|
BFS | 始点に近いところから順に探索する。実装にはキューを使うのが一般的 |
A*アルゴリズム | BFSやダイクストラ法の知識あり探索版。終点までの推定コストで探索するノードに優先順位をつける。ダイクストラ同様、実装にはヒープを使う。ある条件を満たさないと最短経路を返す保証はない |
IDDFS | 深さに制限をつけてDFSする。終点が見つからなければ、制限を緩めて再度DFSする。使用メモリが深さに対して線形になるのが特徴 |
IDA* | IDDFSの知識あり探索版 |
0-2. まず、8パズルって?
スライドパズルとも呼ばれます。以下の画像のようなパズルのことです。画像のものは4x4の15パズルです。これのひとつ小さい3x3のものは8パズルと呼ばれます。
正方形に並んだマスのうち一つだけ空白マスがあり、それを利用して数字を順番に並べるパズルです。
8パズルは取りうる盤面を頂点としたグラフとみなし、グラフ探索のアルゴリズムで解くことができます。
例えば以下の盤面は、マスのスライドによって遷移できるので、隣接している頂点とみなします。
この記事では以下の状態を8パズルのゴールとして進めていきます。
ちなみに8パズルは取りうる盤面の状態が18万程度(9の階乗割る2)で、どの盤面でも31手以内に解けるのでプログラムで扱いやすいです。これが15パズルになると状態の数がめちゃくちゃ増えて(16の階乗割る2)、扱うのが大変です。
0-3. 8パズルのプログラム
8パズル、15パズルを正方形パズルに一般化した正方形パズルクラスです。
状態として保持するのは、盤面を表す変数(board)と、正方形の一辺の長さ(edge_length)などです。なお、boardは盤面を一次元化したものとし、空白マスは0で表します。
メソッドは盤面を動かすstep、盤面をシャッフルするresetなどがあります。
(のちに「強化学習で8パズル」という記事を書く予定なので、強化学習で使うenvironment風の実装にしています)
from random import randint
class SquarePuzzle:
"""
正方形パズルクラス
一辺の長さ=3の場合,8パズルになる. 以下が最終状態. 空白マスは0で表す.
1 2 3
4 5 6
7 8 0
一辺の長さ=4の場合,15パズルになる.
"""
def __init__(self,edge_length=3,board=None):
"""
コンストラクタ.
edge_length:一辺の長さ.
board:初期状態を指定する場合に使う. マスの配置を一次元化したもの.
"""
if board is not None:
assert len(board)==edge_length**2,f"invalid square. edge_length={edge_length} and board={board}"
self.space = [x for x in range(edge_length ** 2) if board[x]==0][0]
board = list(board)
else:
board=[i+1 for i in range(edge_length**2)]
board[-1]=0
self.space = edge_length ** 2 - 1
self.edge_length = edge_length
self.board = board
self.actions = [[0,1],[0,-1],[1,0],[-1,0]]
self.step_count = 0
def reset(self,shuffle_count=100):
"""
板を初期化する.
最終状態からshuffle_count回シャッフルする.shuffle_countが増えるほどパズルの難易度が上がる.
"""
self.board=[i + 1 for i in range(self.edge_length ** 2)]
self.board[-1]=0
self.space = self.edge_length ** 2 - 1
self.step_count = 0
pre_space = -1
for _ in range(shuffle_count):
i, j = divmod(self.space, self.edge_length)
di, dj = self.actions[randint(0, len(self.actions) - 1)]
ni, nj = i + di,j + dj
if 0 <= ni < self.edge_length and 0 <= nj < self.edge_length and ni * self.edge_length + nj != pre_space:
self.board[self.space], self.board[ni * self.edge_length+nj] = self.board[ni * self.edge_length + nj], self.board[self.space]
pre_space = self.space
self.space = ni * self.edge_length + nj
return tuple(self.board)
def step(self,action,air=False):
"""
行動の結果(状態,報酬,終了フラグ,info)を返す.
指定の方向へ動かせない場合,状態を変えず返す.
action:行動.空白マスを動かす方向を表す. {0:右,1:左,2:下,3:上}
air:状態を変えずに行動の結果を取得したい場合Trueにする.
"""
if not air:self.step_count += 1
i,j = divmod(self.space,self.edge_length)
di,dj = self.actions[action]
ni,nj = i+di,j+dj
if air:
board_ = self.board.copy()
else:
board_ = self.board
if 0 <= ni < self.edge_length and 0 <= nj < self.edge_length:
board_[self.space],board_[ni*self.edge_length+nj] = board_[ni*self.edge_length+nj],board_[self.space]
if not air:self.space = ni * self.edge_length+nj
done = all(board_[i] == (i + 1) % (self.edge_length ** 2) for i in range(self.edge_length ** 2))
reward = 1 if done else 0
info = {"step_count":self.step_count}
return tuple(board_), reward, done, info
def get_state(self):
"""
現在の状態を返す.
"""
return tuple(self.board)
def get_able_actions(self):
"""
可能なアクションを返す.
"""
ret = []
i,j = divmod(self.space,self.edge_length)
if j < self.edge_length - 1:ret.append(0) # 右
if 0 < j:ret.append(1) # 左
if i < self.edge_length - 1:ret.append(2) # 下
if 0 < i:ret.append(3) # 上
return ret
def show(self):
"""
現在の状態をコンソールに表示する.
"""
for i in range(self.edge_length):
print(self.board[i*self.edge_length:(i+1)*self.edge_length])
1. 幅優先探索(BFS)
言わずもがなのBFSです。始点に近いノードから順にすべて探索していきます。探索は終点にたどり着いたら終了です。インスタンス化した8パズルオブジェクトを与えると、最終状態までの行動の配列を返します。
from collections import deque
def bfs_solver(env,logger=None):
"""
幅優先探索で解く.
env:square puzzleインスタンス.
logger:loggerインスタンス.使わない場合はNone.
-> return:解けたかどうかのフラグ,初期状態から最終状態までのaction配列,探索回数.
"""
def log(message):
if logger is not None:
logger.info(message)
board = env.get_state()
# はじめから最終状態のときの処理
if all(board[i]==(i+1)%(len(board)) for i in range(len(board))):
return True,[],0
node = Node(board,pre=None,action=None)
dist = {}
dist[board] = node
q = deque()
q.append(node)
log("search start.")
search_count = 0
end_node = None
while end_node is None and q:
search_count += 1
if search_count % 1000 == 0:log(f"..search states = {search_count}")
node = q.popleft()
env_ = SquarePuzzle(env.edge_length,node.board)
for action in env_.get_able_actions():
next_state, reward, done, info = env_.step(action,air=True)
next_node = Node(next_state,node,action)
if done:
end_node = next_node
break
if next_state in dist:continue
dist[next_state] = next_node
q.append(next_node)
log("search end.")
if end_node is None:
log("no answer.")
return False,[],search_count
node = end_node
actions = []
while node.pre is not None:
actions.append(node.action)
node = node.pre
actions.reverse()
return True,actions,search_count
8パズルの盤面をノードとして扱うため、Nodeというクラスを定義しています。上のbfs_solverではこのNodeを頂点として探索を行なっています。Nodeクラスは始点からの経路ノードをpre変数に全て保持しています。
class Node():
"""
盤面をノード化する.
"""
def __init__(self,board,pre=None,action=None):
"""
コンストラクタ.
board:盤面.
pre:親ノード.
action:親ノードからこのノードへ遷移する時の行動.
"""
self.board = board
self.edge_length = int(len(board)**0.5)
self.pre = pre
self.action = action
self.cost = pre.cost + 1 if pre is not None else 0 # 初期状態からこのノードまでの実コスト
BFSを実行します。
if __name__=='__main__':
import time
env = SquarePuzzle(edge_length=3)
env.reset()
start = time.time()
flg,actions,search_count = bfs_solver(env)
process_time = time.time() - start
for action in actions:
env.show()
env.step(action)
print("↓")
env.show()
print(f"hands={len(actions)}, search_count={search_count}, process_time = {process_time}")
hands=26, search_count=142053, process_time = 4.499105215072632
手数26回で解ける盤面なら探索盤面数14万、処理時間4.5秒ぐらいで解けます。
ちなみにですが、このbfsの実装はあまり速くないです。いちいち8パズルをインスタンス化したり、%演算で探索回数のログを取ったりと、遅くなる箇所がいくつかあります。面倒なので放置しています。
##1-1. BFSの探索イメージ
BFSの探索イメージ図です。始点から近いところを順にすべて見ています。
一手の段の一番左と一番右の盤面を見てください。なんとなく「改善できるんじゃね?」と思わないでしょうか。パッと見た感じ、右の盤面の方が終了状態に近いですから、こっちを優先して探索した方が効率が上がりそうな気がしないでしょうか。
次に紹介するA*アルゴリズムでは、こういう考えが取り入れられています。
2. A*アルゴリズム
A*アルゴリズムは「エー スター アルゴリズム」と読みます。
各ノードについて終点までの推定距離(終点までどれぐらい距離がありそうか)を計算して、始点からの実コストとの和で、経路長の推定値を出します。この推定値が小さいものから優先して探索します。
A*アルゴリズムのために、盤面から終点までの推定コストを算出する関数$h$を定義します。
$h$:状態 -> 終点までの推定コスト
$h$ をどういうものにするかは作成者の判断になります。的外れな関数にするとBFSより却って遅くなりますし、いい感じにするとめちゃくちゃ速くなります。
8パズルでは各マスの最終状態のマス位置までマンハッタン距離の和を$h$とするのが一般的ですので、ここでもそれを実装します。
この$h$をNodeに実装し、経路長の推定値で比較できるようにします。
class Node():
"""
盤面をノード化する.
ノード同士を比較できるよう比較演算用関数をつける.
比較する値scoreは,スタートから自ノードまでの実コスト+自ノードからゴールまでの推定コスト.
"""
def __init__(self,board,pre=None,action=None):
"""
コンストラクタ.
board:盤面.
pre:親ノード.
action:親ノードからこのノードへ遷移する時の行動.
"""
self.board = board
self.edge_length = int(len(board)**0.5)
self.pre = pre
self.action = action
self.cost = pre.cost + 1 if pre is not None else 0 # 初期状態からこのノードまでの実コスト
self.heuristic = self._get_heuristic() # boardからゴールまでの推定コスト(ヒューリスティック値)
self.score = self.heuristic + self.cost
def _get_heuristic(self):
"""
最終状態までの推定コストを返す.
"""
ret=0
for i in range(self.edge_length):
for j in range(self.edge_length):
t = self.board[i * self.edge_length+j] - 1
t = len(self.board) - 1 if t == -1 else t
ti,tj = divmod(t, self.edge_length)
ret += abs(i - ti) + abs(j - tj)
return ret
# Node比較用関数
def __le__(self,other):
return self.score <= other.score
def __ge__(self,other):
return self.score >= other.score
def __lt__(self,other):
return self.score < other.score
def __gt__(self,other):
return self.score > other.score
def __eq__(self,other):
return self.score == other.score
pythonの補足。上のコードのle,geみたいな特定の名前でクラスメソッドを実装すれば自作クラスで比較演算ができるようになります。
2-1. A*アルゴリズムの実装
実装には優先度付きキューを使います。
以下がA*アルゴリズムsolverです。BFSではキューを使っていましたがA*では優先度付きキューを使用しています。終点までの推定コスト算出と比較をNodeクラスに任せているので、BFSの実装とほとんど変わりません。
from heapq import heappop, heappush
def A_star_solver(env,logger=None):
"""
A starアルゴリズムで解く.
env:square puzzleインスタンス.
logger:loggerインスタンス.
-> return:解けたかどうかのフラグ,初期状態から最終状態までのaction配列,探索回数.
"""
def log(message):
if logger is not None:
logger.info(message)
board = env.get_state()
# はじめから最終状態のときの処理
if all(board[i]==(i+1)%(len(board)) for i in range(len(board))):
return True,[],0
node = Node(board,pre=None,action=None)
dist = {}
dist[board] = node
q = []
heappush(q,node)
log("search start.")
search_count = 0
end_node = None
while end_node is None and q:
search_count += 1
if search_count % 1000 == 0:log(f"..search states = {search_count}")
node = heappop(q)
env_ = SquarePuzzle(env.edge_length,node.board)
for action in env_.get_able_actions():
next_state, reward, done, info = env_.step(action,air=True)
next_node = Node(next_state,pre=node,action=action)
if done:
end_node = next_node
break
if next_state in dist and dist[next_state]<=next_node:continue
dist[next_state] = next_node
heappush(q,next_node)
log("search end.")
if end_node is None:
log("no answer.")
return False,[],search_count
node = end_node
actions = []
while node.pre is not None:
actions.append(node.action)
node = node.pre
actions.reverse()
return True,actions,search_count
実行結果は以下です。BFSより速くなります。
hands=26, search_count=50382, process_time = 1.8390610218048096
ピンと来た方もいると思いますが、推定関数$h$が常に0を返す関数なら、これはダイクストラと一致します。
つまりA*アルゴリズムはダイクストラを知識あり探索版に拡張したものということになります。
2-2. A*アルゴリズムは最短経路を返すか
一般にA*アルゴリズムは最短経路を返す保証はありません。たとえば以下のような$h$をめちゃくちゃなものに設定した例では最短経路を返しません。
最短経路はA->B->D->Eですが、Bから終点への推定コストが10000とめちゃくちゃな値になっているので、A*アルゴリズムはBの探索を後回しにし、結局A->C->D->Eの経路を返します。
最短経路を返すのは、以下の条件を満たす場合です。(ダイクストラ法はこの条件を満たします)
$h'(n):$ノード$n$から終点までのコスト
$h(n):$ノード$n$から終点までの推定コスト
任意のノード$n$について、$h(n)<=h'(n)$
2-3. A*アルゴリズムの問題点
BFS、ダイクストラの問題点でもありますが、メモリをめちゃくちゃ使います。幸い、8パズルでは取り得る状態の総数が18万程度なのでなんとかなりますが、これが15パズルになると状態の総数がめちゃくちゃ増え、BFSもA*アルゴリズムも歯が立ちません。
次に説明するIDDFSを使うと、なんとかこの問題を回避することができます。
3. IDDFS
IDDFSはiterative deepening depth-first searchの略です。「反復深化深さ優先探索」という和訳が当てられています。
雑に説明すると以下のようになります。
深さに制限を課した始点からDFSを行い、終点を探索する。終点が見つかれば終わり、見つからなければ深さ制限をひとつゆるくして再度始点からDFSを行う。これを終点が見つかるまで行う。
探索イメージは以下のようになります。
以上を終点が見つかるまで深さを大きくしながら繰り返します。
始点付近のノードは何度も探索することになります。さらに、グラフが木でない場合、以下の赤線のような効率のよくない探索も発生したりします。
「効率悪くない?」「素直にBFSすればよくない?」と思った方もいるかと思います。実際、BFSより探索時間は長くなります。しかし使用メモリを考えるとBFSやA*アルゴリズムよりもはるかに小さいメモリで済みます。
例えば、各ノードの平均子ノード数が$b$のグラフの探索を考えます。このグラフの深さ$d$まで探索しているとき、使用するメモリはBFS、A*だと$O(d^b)$になりますが、IDDFSでは$O(db)$で済みます。
BFS、A*で15パズルを解こうとするとメモリエラーになります。一方、IDDFSでは時間はかかりますが解くことができます。
通常のDFSとの違いで言えば、通常のDFSははじめに終点にたどり着いた経路が最短であるという保証はありませんが、IDDFSの場合、最短の保証があります。
他にもゲーム木などでもよく使われるようです。
3-1. IDDFSの実装
深さ(=始点からのコスト)に制限を設けDFSし、終点に辿り着かなかったら打ち切り、制限を緩くして再度DFSします。
深さはNodeが持っている始点からの実コスト(cost)を使います。これがthを超えたらそのノードの探索を打ち切ります。終点に辿り着くまで、thをすこしずつ増やしていきます。
def IDDFS_solver(env,logger=None):
"""
IDDFSアルゴリズムで解く.
env:square puzzleインスタンス.
logger:loggerインスタンス.使わない場合はNone.
-> return:解けたかどうかのフラグ,初期状態から最終状態までのaction配列,探索回数.
"""
def log(message):
if logger is not None:
logger.info(message)
board = env.get_state()
# はじめから最終状態のときの処理
if all(board[i]==(i+1)%(len(board)) for i in range(len(board))):
return True,[],0
log("search start.")
th = 0
search_count = 0
end_node = None
while end_node is None and th <= 31:
th += 1
board = env.get_state()
node = Node(board,pre=None,action=None)
stc = []
stc.append(node)
while end_node is None and stc:
search_count += 1
if search_count % 1000 == 0:log(f"..search count = {search_count}. th = {th}.")
node = stc.pop()
env_ = SquarePuzzle(env.edge_length,node.board)
for action in env_.get_able_actions():
next_state, reward, done, info = env_.step(action,air=True)
next_node = Node(next_state,pre=node,action=action)
if done:
end_node = next_node
break
if node.pre is not None and next_state == node.pre.board:continue
if next_node.cost > th:continue
stc.append(next_node)
log("search end.")
if end_node is None:
log("no answer.")
return False,[],search_count
node = end_node
actions = []
while node.pre is not None:
actions.append(node.action)
node = node.pre
actions.reverse()
return True,actions,search_count
以下が実行結果です。25手で解ける盤面に対し、探索状態数が356万、処理時間が117秒です。
hands=25, search_count=3568906, process_time = 117.46405720710754
4. IDA*
iterative deepening A*です。IDDFSの知識あり探索版に拡張したものです。
IDDFSは始点からのコストに上限を設け、探索を打ち切っていましたが、IDA*は推定経路長に上限を設け、探索を打ち切ります。
4-1. IDA*の実装
実装はIDDFSとほとんど変わりませんので変更箇所だけ載せます。Nodeが持っている推定経路長(score)がthを超えたら、そのノードの探索を打ち切ります。
if next_node.score > th:continue # IDDFSでは「if next_node.cost > th:continue」だった
以下が実行結果です。25手で解ける盤面に対し、探索状態数が35万、処理時間が11.8秒です。IDDFSからかなり改善されました。
hands=25, search_count=347708, process_time = 11.77236795425415
5. それぞれのアルゴリズムの比較
同じ盤面に対して各ソルバーを実行してみます。
使用メモリを調べるためmemory_profilerを使います。
if __name__=='__main__':
"""
各ソルバーを比較する.
"""
import time
env = SquarePuzzle(edge_length=3)
env.reset()
board = env.board
from solver.bfs_solver import bfs_solver
from solver.a_star_solver import A_star_solver
from solver.iddfs_solver import IDDFS_solver
from solver.id_a_star_solver import ID_A_star_solver
solvers = (bfs_solver,A_star_solver,IDDFS_solver,ID_A_star_solver)
from memory_profiler import memory_usage
import numpy as np
for solver in solvers:
env = SquarePuzzle(edge_length=3 ,board=board)
start = time.time()
flg,actions,search_count = solver(env)
process_time = time.time() - start
mem = memory_usage((solver,(env,)))
print(f"{solver.__name__}:hands={len(actions)}, search_count={search_count}, process_time = {process_time}, memory = {np.mean(mem)}")
bfs_solver:hands=20, search_count=32174, process_time = 1.192046880722046, memory = 57.51255580357143
A_star_solver:hands=20, search_count=6211, process_time = 0.2990536689758301, memory = 56.347005208333336
IDDFS_solver:hands=20, search_count=232156, process_time = 8.695612907409668, memory = 56.36328125
ID_A_star_solver:hands=20, search_count=34054, process_time = 1.0756721496582031, memory = 56.374441964285715
A*アルゴリズムが最も速く、IDDFSが最も時間がかかっています。
次にA*とIDA*に15パズルを解かせてみます。ただ、あまりに難しい盤面だと時間がかかりますので、簡単な盤面で試します。
A_star_solver:hands=21, search_count=227512, process_time = 15.191603899002075, memory = 165.33802480916032
ID_A_star_solver:hands=21, search_count=508032, process_time = 24.561185121536255, memory = 102.73046875
実行時間はA*の方が短いですが、使用メモリはIDA*の方が少なく済んでいます。
まとめ
8パズルを題材にグラフ探索のアルゴリズムを見ていきました。
次は8パズルを題材に強化学習をする記事を書きたいと思います。