はじめに
上の画像のような試験管に入ったバラバラの色を揃えるパズルゲーム。water sort puzzleという名前で、世界的にもそこそこ名の知れているフリーゲームだそうです。
最近、暇つぶしにと思ってアプリを入れてプレイしたのですが思いのほか面白くてハマりました。そこで、完全に趣味なのですが、このパズルを解くソルバーを作ってみました。
ソルバーのアルゴリズムとしてはグラフ探索のA*アルゴリズムを使用します。
実装
方針
試験管クラス(Tube)と、試験管クラスを変数に持つボードクラス(Board)を実装します。
シャッフル済みboardを受け取り、最終状態までの手順を探索するsolv関数を実装します。
A*アルゴリズムなので、各時点における盤面の終点までの推定コストが必要で、それはevaluate_boardという関数に実装しました。
試験管クラスと盤面クラス
Boardの実装はOpenGymのEnvっぽい感じにしています。
Boardには水の移動のためのstepメソッドを実装しています。
class Tube:
def __init__(self,volume,state=[]):
assert len(state)<=volume
self.volume = volume
self.state = state
self.n_space = self.volume - len(self.state)
self.top_color = -1
self.n_top_color = 0
self.calc_parameter()
def calc_parameter(self):
self.n_space = self.volume-len(self.state)
self.top_color = self.state[-1] if self.state else -1
self.n_top_color = 0
for i in range(len(self.state)):
if self.state[-1]==self.state[-1-i]:
self.n_top_color = i+1
else:
break
def pop(self):
water = []
for _ in range(self.n_top_color):
water.append(self.state.pop())
self.calc_parameter()
return water
def push(self,water):
assert len(water)>0 and all([water[0]==water[i] for i in range(len(water))])
if self.top_color!=-1 and self.top_color != water[0]:return water
while water and len(self.state)<self.volume:
self.state.append(water.pop())
self.calc_parameter()
return water
def push_no_check(self,water):
assert self.n_space >= len(water)
while water:
self.state.append(water.pop())
self.calc_parameter()
def copy(self):
return Tube(self.volume,self.state[:])
class Board:
def __init__(self,n_tube,n_free_tube=2,volume=4,step_count=0):
self.n_tube = n_tube
self.n_free_tube = n_free_tube
self.n_color = self.n_tube - self.n_free_tube
self.volume = volume
self.tubes = [Tube(self.volume) for _ in range(self.n_tube)]
self.step_count = step_count
self.pre_board = None
self.score = 0
def set_tubes(self,tubes=None,water=None):
if tubes is not None:
assert len(tubes)<=self.n_tube
assert tubes[0].volume == self.volume
self.tubes = tubes
elif water is not None:
assert type(water) is list or type(water) is tuple
assert type(water[0]) is list or type(water[0]) is tuple
self.tubes = []
for i,w in enumerate(water):
assert len(w)<=self.volume
self.tubes.append(Tube(self.volume,w))
def reset(self):
water = []
for i in range(self.n_color):
water.extend([i]*self.volume)
shuffle(water)
self.tubes = []
for i in range(self.n_tube-self.n_free_tube):
s=i*self.volume
e=(i+1)*self.volume
self.tubes.append(Tube(self.volume,water[s:e]))
for _ in range(self.n_free_tube):
self.tubes.append(Tube(self.volume))
self.step_count = 0
def step(self,action):
self.step_count += 1
i,j=divmod(action,self.n_tube)
# i->jへ水を移動させる
water=self.tubes[i].pop()
water=self.tubes[j].push(water)
if water:
self.tubes[i].push_no_check(water)
info={
"step_count":self.step_count,
}
done=self.is_done()
reward=1 if done else 0
observation = self.get_observation()
return reward,observation,done,info
def is_done(self):
return all(tube.n_top_color in {0,self.volume} for tube in self.tubes)
def get_observation(self):
return self.tubes
def get_able_actions(self):
actions = []
use_free = 0
for i,tubei in enumerate(self.tubes):
if tubei.n_top_color==self.volume:continue
for j,tubej in enumerate(self.tubes):
if i==j:continue
if tubej.n_top_color==self.volume:continue
# j->iへの水の移動は可能か
if tubei.top_color==tubej.top_color!=-1 and tubei.n_space>0:
actions.append(j*self.n_tube + i)
elif tubei.top_color==-1 and tubej.top_color!=-1 and tubej.n_top_color!=tubej.n_space and use_free==0:
actions.append(j*self.n_tube + i)
if tubei.top_color==-1:use_free=1
return actions
def set_score(self,score):
self.score = score
def set_pre_board(self,board):
self.pre_board = board
def flatten(self):
ary = [self.tubes[i].state + [-1]*self.volume for i in range(self.n_tube)]
value=[]
for x in ary:
value+=x[:self.volume]
return value
def hash_value(self):
value = self.flatten()
value = "".join(list(map(str,value)))
return value
def copy(self):
ret = Board(self.n_tube,self.n_free_tube,self.volume,self.step_count)
ret.set_tubes([tube.copy() for tube in self.tubes])
return ret
def __eq__(self,other):
return self.score==other.score
def __ge__(self,other):
return self.score>=other.score
def __gt__(self,other):
return self.score>other.score
def __le__(self,other):
return self.score<=other.score
def __lt__(self,other):
return self.score<other.score
ソルバー
次にA*アルゴリズムの肝であるヒューリスティック関数とそれを使ったソルバーを実装します。
以下の推定コスト関数は場合によっては実際のコストよりも大きな値を返すので、残念ながら最短手を返す保証はないです。任意の盤面で実際コスト以下の推定コストを返す関数を作れれば最短を返すソルバーが作れます。
def evaluate_board(board):
observation=board.get_observation()
n_switch_color = 0 #
n_sorted_tube = 0 # 揃っている試験管の本数
n_free_tube = 0 # 空試験管の本数
for i,tubei in enumerate(observation):
for j in range(len(tubei.state)-1):
if tubei.state[j]!=tubei.state[j+1]:
n_switch_color += 1
if tubei.n_top_color==len(tubei.state)==board.volume:
n_sorted_tube += 1
if tubei.top_color==-1:
n_free_tube += 1
score = board.n_tube + n_switch_color - n_sorted_tube - n_free_tube
#score = board.n_tube - n_sorted_tube - n_free_tube
return score + board.step_count
def solv(board):
q=[]
heappush(q,board)
searched_board=set()
searched_board.add(board.hash_value())
while q:
try:
board=heappop(q)
actions=board.get_able_actions()
for action in actions:
board_=board.copy()
reward,observation,done,info=board_.step(action)
h = board_.hash_value()
if h in searched_board:continue
searched_board.add(h)
board_.set_pre_board(board)
if reward == 1:
info={"reward":1,"step_count":info["step_count"],"searched_board":len(searched_board)}
return board_,info
score = evaluate_board(board_)
board_.set_score(score)
heappush(q,board_)
except KeyboardInterrupt:
break
info={"reward":0,"step_count":info["step_count"],"searched_board":len(searched_board)}
return board,info
試験管が24本ぐらいまでなら軽く解けます。
簡単な可視化
GIFにして、解いていく様子を観察します。
上のような初期状態なら、下のように解けます。
試験管の数が増えても解けます
以下、可視化のコード。Boardクラスを渡して初期状態から現在の状態までのgifを保存します。
色をmatplotlibのcolor="C0","C1",..で指定している関係上、試験管の数が10を超えると可視化した時の色が被るので注意。
def get_tube_position(n_tube,height,width):
image_size=(width,height)
x_list=[
[1+i*8 for i in range(8)],
[4+i*8 for i in range(7)]
]
for i in range(len(x_list)):
x_list[i].sort(key=lambda x:abs(x-width/2))
y_list=[[height//2],[height//2-height//5,height//2+height//5],[height//2-height//4,height//2,height//2+height//4]]
tube_width=5
if n_tube<=6:# 1行
tube_height=int(height*0.6)
t=n_tube
ary=[t]
tube_position=[]
for y,t in zip(y_list[1-1],ary):
for x in x_list[t%2][:t]:
tube_position.append([x,y])
elif n_tube<=16:# 2行
tube_height=int(height*0.7/2)
s=n_tube%2
t=n_tube//2
if s==0:ary=[t,t]
else:ary=[t,t+1]
tube_position=[]
for y,t in zip(y_list[2-1],ary):
for x in x_list[t%2][:t]:
tube_position.append([x,y])
elif n_tube<=24:# 3行
tube_height=int(height*0.7/3)
s=n_tube%3
t=n_tube//3
if s==0:ary=[t,t,t]
elif s==1:ary=[t,t+1,t]
else:ary=[t+1,t,t+1]
tube_position=[]
for y,t in zip(y_list[3-1],ary):
for x in x_list[t%2][:t]:
tube_position.append([x,y])
else:
raise Exception("not supported.")
tube_position.sort(key=lambda x:x[0])
tube_position.sort(key=lambda x:x[1],reverse=True)
return tube_position,tube_height,tube_width
def save_solv_gif(board,fullpath):
height,width=64,64
tube_position,tube_height,tube_width=get_tube_position(board.n_tube,height,width)
fig, ax = plt.subplots(figsize=(4,4))
ax.set_xticks([0,64])
ax.set_yticks([0,64])
image_ary=[]
while board is not None:
for tube,position in zip(board.tubes,tube_position):
# 描画
x,y=position
x-=tube_width//2
y-=tube_height//2
water_height=tube_height//board.volume
# 水
for i,color in enumerate(tube.state):
r=patches.Rectangle(xy=(x,y + i*water_height),width=tube_width,height=water_height,color=f"C{color}")
ax.add_patch(r)
# 試験管の輪郭
ax.plot([x,x+tube_width],[y,y],color='gray')
ax.plot([x,x],[y,y+tube_height],color='gray')
ax.plot([x+tube_width,x+tube_width],[y,y+tube_height],color='gray')
ax.axes.xaxis.set_visible(False)
ax.axes.yaxis.set_visible(False)
fig.canvas.draw()
data=fig.canvas.tostring_rgb()
w,h=fig.canvas.get_width_height()
img=Image.frombytes("RGB",(w,h),data,"raw")
image_ary.append(img)
ax.cla()
board=board.pre_board
image_ary.reverse()
duration=[1]*len(image_ary)
image_ary[0].save(fullpath,save_all=True, append_images=image_ary[1:], optimize=True, duration=duration, loop=0)
image_ary[0].save(fullpath.replace(".gif",".jpg"))
plt.clf()
plt.close()
その他
・一度探索した盤面をハッシュ化して再び探索しないようにしていますが、やり方がダサい。
・プレイ画面から画像認識で盤面の初期状態の設定を自動でできたらいいなと思います。思っているだけです。
・A*の他、BFSとDFSで試しましたが、DFSが一番早くて驚きました。A*の50~80%の時間で探索できます。ただDFSで得た経路は最短でないことがかなり多いです。一方BFSはA*の50倍ぐらいの時間がかかりますますが、得た経路が必ず最短であり、A*は今回の設定では最短の保証はありませんが、ほとんどのケースで最短を返します。