LoginSignup
6
6

More than 1 year has passed since last update.

広告でよく見る試験管パズルゲームをプログラムで解いてみた

Last updated at Posted at 2022-04-08

はじめに

img.jpg

上の画像のような試験管に入ったバラバラの色を揃えるパズルゲーム。water sort puzzleという名前で、世界的にもそこそこ名の知れているフリーゲームだそうです。
最近、暇つぶしにと思ってアプリを入れてプレイしたのですが思いのほか面白くてハマりました。そこで、完全に趣味なのですが、このパズルを解くソルバーを作ってみました。
ソルバーのアルゴリズムとしてはグラフ探索のA*アルゴリズムを使用します。

実装

方針

試験管クラス(Tube)と、試験管クラスを変数に持つボードクラス(Board)を実装します。
シャッフル済みboardを受け取り、最終状態までの手順を探索するsolv関数を実装します。
A*アルゴリズムなので、各時点における盤面の終点までの推定コストが必要で、それはevaluate_boardという関数に実装しました。

試験管クラスと盤面クラス

Boardの実装はOpenGymのEnvっぽい感じにしています。
Boardには水の移動のためのstepメソッドを実装しています。

.py
class Tube:
    def __init__(self,volume,state=[]):
        assert len(state)<=volume
        self.volume = volume
        self.state = state
        self.n_space = self.volume - len(self.state)
        self.top_color = -1
        self.n_top_color = 0
        self.calc_parameter()

    def calc_parameter(self):
        self.n_space = self.volume-len(self.state)
        self.top_color = self.state[-1] if self.state else -1
        self.n_top_color = 0
        for i in range(len(self.state)):
            if self.state[-1]==self.state[-1-i]:
                self.n_top_color = i+1
            else:
                break

    def pop(self):
        water = []
        for _ in range(self.n_top_color):
            water.append(self.state.pop())
        self.calc_parameter()
        return water

    def push(self,water):
        assert len(water)>0 and all([water[0]==water[i] for i in range(len(water))])
        if self.top_color!=-1 and self.top_color != water[0]:return water
        while water and len(self.state)<self.volume:
            self.state.append(water.pop())
        self.calc_parameter()
        return water
    
    def push_no_check(self,water):
        assert self.n_space >= len(water)
        while water:
            self.state.append(water.pop())
        self.calc_parameter()
    
    def copy(self):
        return Tube(self.volume,self.state[:])

class Board:
    def __init__(self,n_tube,n_free_tube=2,volume=4,step_count=0):
        self.n_tube = n_tube
        self.n_free_tube = n_free_tube
        self.n_color = self.n_tube - self.n_free_tube
        self.volume = volume
        self.tubes = [Tube(self.volume) for _ in range(self.n_tube)]
        self.step_count = step_count
        self.pre_board = None
        self.score = 0
    
    def set_tubes(self,tubes=None,water=None):
        if tubes is not None:
            assert len(tubes)<=self.n_tube
            assert tubes[0].volume == self.volume
            self.tubes = tubes
        elif water is not None:
            assert type(water) is list or type(water) is tuple
            assert type(water[0]) is list or type(water[0]) is tuple
            self.tubes = []
            for i,w in enumerate(water):
                assert len(w)<=self.volume
                self.tubes.append(Tube(self.volume,w))
                
    def reset(self):
        water = []
        for i in range(self.n_color):
            water.extend([i]*self.volume)
        shuffle(water)
        self.tubes = []
        for i in range(self.n_tube-self.n_free_tube):
            s=i*self.volume
            e=(i+1)*self.volume
            self.tubes.append(Tube(self.volume,water[s:e]))
        for _ in range(self.n_free_tube):
            self.tubes.append(Tube(self.volume))
        self.step_count = 0

    def step(self,action):
        self.step_count += 1
        i,j=divmod(action,self.n_tube)
        # i->jへ水を移動させる
        water=self.tubes[i].pop()
        water=self.tubes[j].push(water)
        if water:
            self.tubes[i].push_no_check(water)
        info={
            "step_count":self.step_count,
        }
        done=self.is_done()
        reward=1 if done else 0
        observation = self.get_observation()
        return reward,observation,done,info
   
    def is_done(self):
        return all(tube.n_top_color in {0,self.volume} for tube in self.tubes)

    def get_observation(self):
        return self.tubes
        
    def get_able_actions(self):
        actions = []
        use_free = 0
        for i,tubei in enumerate(self.tubes):
            if tubei.n_top_color==self.volume:continue
            for j,tubej in enumerate(self.tubes):
                if i==j:continue
                if tubej.n_top_color==self.volume:continue
                # j->iへの水の移動は可能か
                if tubei.top_color==tubej.top_color!=-1 and tubei.n_space>0:
                    actions.append(j*self.n_tube + i)
                elif tubei.top_color==-1 and tubej.top_color!=-1 and tubej.n_top_color!=tubej.n_space and use_free==0:
                    actions.append(j*self.n_tube + i)
            if tubei.top_color==-1:use_free=1
        return actions

    def set_score(self,score):
        self.score = score

    def set_pre_board(self,board):
        self.pre_board = board

    def flatten(self):
        ary = [self.tubes[i].state + [-1]*self.volume for i in range(self.n_tube)]
        value=[]
        for x in ary:
            value+=x[:self.volume]
        return value

    def hash_value(self):
        value = self.flatten()
        value = "".join(list(map(str,value)))
        return value
    
    def copy(self):
        ret = Board(self.n_tube,self.n_free_tube,self.volume,self.step_count)
        ret.set_tubes([tube.copy() for tube in self.tubes])
        return ret
    
    def __eq__(self,other):
        return self.score==other.score

    def __ge__(self,other):
        return self.score>=other.score

    def __gt__(self,other):
        return self.score>other.score

    def __le__(self,other):
        return self.score<=other.score

    def __lt__(self,other):
        return self.score<other.score

ソルバー

次にA*アルゴリズムの肝であるヒューリスティック関数とそれを使ったソルバーを実装します。
以下の推定コスト関数は場合によっては実際のコストよりも大きな値を返すので、残念ながら最短手を返す保証はないです。任意の盤面で実際コスト以下の推定コストを返す関数を作れれば最短を返すソルバーが作れます。

.py
def evaluate_board(board):
    observation=board.get_observation()
    n_switch_color = 0 #
    n_sorted_tube = 0  # 揃っている試験管の本数
    n_free_tube = 0    # 空試験管の本数
    for i,tubei in enumerate(observation):
        for j in range(len(tubei.state)-1):
            if tubei.state[j]!=tubei.state[j+1]:
                n_switch_color += 1
        if tubei.n_top_color==len(tubei.state)==board.volume:
            n_sorted_tube += 1
        if tubei.top_color==-1:
            n_free_tube += 1
    score = board.n_tube + n_switch_color - n_sorted_tube - n_free_tube
    #score = board.n_tube - n_sorted_tube - n_free_tube
    return score + board.step_count

def solv(board):
    q=[]
    heappush(q,board)
    searched_board=set()
    searched_board.add(board.hash_value())
    while q:
        try:
            board=heappop(q)
            actions=board.get_able_actions()
            for action in actions:
                board_=board.copy()
                reward,observation,done,info=board_.step(action)
                h = board_.hash_value()
                if h in searched_board:continue
                searched_board.add(h)
                board_.set_pre_board(board)
                if reward == 1:
                    info={"reward":1,"step_count":info["step_count"],"searched_board":len(searched_board)}
                    return board_,info
                score = evaluate_board(board_)
                board_.set_score(score)
                heappush(q,board_)
        except KeyboardInterrupt:
            break
    info={"reward":0,"step_count":info["step_count"],"searched_board":len(searched_board)}
    return board,info

試験管が24本ぐらいまでなら軽く解けます。

簡単な可視化

GIFにして、解いていく様子を観察します。
img.jpg
上のような初期状態なら、下のように解けます。

img.gif

試験管の数が増えても解けます

以下、可視化のコード。Boardクラスを渡して初期状態から現在の状態までのgifを保存します。
色をmatplotlibのcolor="C0","C1",..で指定している関係上、試験管の数が10を超えると可視化した時の色が被るので注意。

save_solv_gif.py
def get_tube_position(n_tube,height,width):
    image_size=(width,height)
    x_list=[
        [1+i*8 for i in range(8)],
        [4+i*8 for i in range(7)]
    ]
    for i in range(len(x_list)):
        x_list[i].sort(key=lambda x:abs(x-width/2))
    y_list=[[height//2],[height//2-height//5,height//2+height//5],[height//2-height//4,height//2,height//2+height//4]]
    tube_width=5
    if n_tube<=6:# 1行
        tube_height=int(height*0.6)
        t=n_tube
        ary=[t]
        tube_position=[]
        for y,t in zip(y_list[1-1],ary):
            for x in x_list[t%2][:t]:
                tube_position.append([x,y])

    elif n_tube<=16:# 2行
        tube_height=int(height*0.7/2)
        s=n_tube%2
        t=n_tube//2
        if s==0:ary=[t,t]
        else:ary=[t,t+1]
        tube_position=[]
        for y,t in zip(y_list[2-1],ary):
            for x in x_list[t%2][:t]:
                tube_position.append([x,y])

    elif n_tube<=24:# 3行
        tube_height=int(height*0.7/3)
        s=n_tube%3
        t=n_tube//3
        if s==0:ary=[t,t,t]
        elif s==1:ary=[t,t+1,t]
        else:ary=[t+1,t,t+1]
        tube_position=[]
        for y,t in zip(y_list[3-1],ary):
            for x in x_list[t%2][:t]:
                tube_position.append([x,y])
    else:
        raise Exception("not supported.")
    tube_position.sort(key=lambda x:x[0])
    tube_position.sort(key=lambda x:x[1],reverse=True)
    return tube_position,tube_height,tube_width

def save_solv_gif(board,fullpath):
    height,width=64,64
    tube_position,tube_height,tube_width=get_tube_position(board.n_tube,height,width)
    fig, ax = plt.subplots(figsize=(4,4))
    ax.set_xticks([0,64])
    ax.set_yticks([0,64])
    image_ary=[]
    while board is not None:
        for tube,position in zip(board.tubes,tube_position):
            # 描画
            x,y=position
            x-=tube_width//2
            y-=tube_height//2
            water_height=tube_height//board.volume
            # 水
            for i,color in enumerate(tube.state):
                r=patches.Rectangle(xy=(x,y + i*water_height),width=tube_width,height=water_height,color=f"C{color}")
                ax.add_patch(r)
            # 試験管の輪郭
            ax.plot([x,x+tube_width],[y,y],color='gray')
            ax.plot([x,x],[y,y+tube_height],color='gray')
            ax.plot([x+tube_width,x+tube_width],[y,y+tube_height],color='gray')
        ax.axes.xaxis.set_visible(False)
        ax.axes.yaxis.set_visible(False)
        fig.canvas.draw()
        data=fig.canvas.tostring_rgb()
        w,h=fig.canvas.get_width_height()
        img=Image.frombytes("RGB",(w,h),data,"raw")
        image_ary.append(img)
        ax.cla()
        board=board.pre_board
    image_ary.reverse()
    duration=[1]*len(image_ary)
    image_ary[0].save(fullpath,save_all=True, append_images=image_ary[1:], optimize=True, duration=duration, loop=0)
    image_ary[0].save(fullpath.replace(".gif",".jpg"))
    plt.clf()
    plt.close()

その他

・一度探索した盤面をハッシュ化して再び探索しないようにしていますが、やり方がダサい。
・プレイ画面から画像認識で盤面の初期状態の設定を自動でできたらいいなと思います。思っているだけです。
・A*の他、BFSとDFSで試しましたが、DFSが一番早くて驚きました。A*の50~80%の時間で探索できます。ただDFSで得た経路は最短でないことがかなり多いです。一方BFSはA*の50倍ぐらいの時間がかかりますますが、得た経路が必ず最短であり、A*は今回の設定では最短の保証はありませんが、ほとんどのケースで最短を返します。

6
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
6