🎄🎄🎄🎄🎄🎄〜メリー・クリスマス〜🦌🛷🎊🎅🦌🎁
フューチャーAdventCalender 2023の25日目(The last of this year!)です。
今年で入社8年目になりますが、趣味は相変わらずパズルとそれらを解くためのアルゴリズムです。
筆者は過去のAdvent Calendar (クリスマス当番)やフューチャー技術ブログで、パズルやAI関連の記事をいくつか発表しましたので、ぜひ合わせて一読してください。
HOWより、WHATを重視する時代
さて、生成AIやLLMが盛り上がっている今年、皆さんはどう感じているでしょうか。
私は、徐々にプログラミングの文法や書き方、そして既存のデータ構造やアルゴリズムにこだわらなくてもよい時代が来たかなと思います。
PCはまだそんなに普及できていない年代の頃に、タイピストという、文字を打ってプリントする職業も存在しました。
技術の発展によって、専門知識や技術の独占の壁をどんどんぶっ壊していく今時代は、昔から見ると頭脳労働であったが、現在もしくは近い将来には単純な重複体力労働になり、当然ながらそのような職業が徐々に消えていく途上ですね。
プログラミング言語の発展も同じく、機械語やアセンブリ言語が書ける人間はかっこいいですが、タスクを完成するために、あえてアセンブリ言語でゴリゴリ書く人はいないでしょう。
print("Hello world!")
で解決するぐらいのタスクに対して、機械語やアセンブリ、更にCPU原理やロジックゲートから学習するのはおかしいですよね。
巨人の肩の上に立つ我々は、車輪の再発明は要らないです。
最近開発フレームワークや自動生成の仕組みの発展で、作りたいものを構築する時に、プログラミングのタスクの大半に占めるのは、実行手順の実現よりも、何を作りたいのほうですね。
つまり、次世代のプログラミング言語は、HOWにこだわらなく、WHATを注目するだけでよいかと思います。
HOWに関して既存知識の部分は既存ライブラリやAIにまかせばよいですね。
一例を上げると、MongoDBのGUIクライアント「Compass」で、クエリ言語は全く知らなくても、自然言語でやりたいことを表現すれば、AIが自動的にそれなりのクエリ文が合成してくれます。
自然言語: find videos uploaded within 1 week (data format is YYYYMMDD)
↓
Mongoクエリ:{"upload_date": {"$gte": "20231217", "$lt": "20231224"}}
参考: Prompt a Natural Language Query
パズルを解くのも、HOWではなく、WHATの問題である
冒頭でリストアップした過去記事にもありましたが、汎用的にパズルのソルバーなど頑張って書いてみましたが、それらは全部HOWに注目していました。
一般人がパズルを解くために、深さ優先広さ優先、A*、双方向探索なんやらのアルゴリズムのお勉強からスタートするのは、ハードル高すぎるだろうと思い始めました。
では、2x2x2のルービックキューブは、ChatGPTに投げたら、解決してくれるでしょうか。
検証してみました。
まずは、表現方式を説明し、そしていくつかの例をあげました。最後に、新たな問題を投げて、はい、ChatGPT、どうぞ、解答してください。
結果は、よくある「真剣に嘘をついている」だけですね。
W G O Y O G W R G O Y B R B R Y B B Y W W O G R
の状態からChatGPTの解答のR' F' U L' F D R2 U2
を適用したら、R R W R B R B G Y W G Y O O G O B W G B Y W Y O
になりました。
バラバラのままじゃないですか。期待の揃った状態はW W W W O O G G R R B B O O G G R R B B Y Y Y Y
ですけど。
それは、実は予想通りです。16日目の @ShimizuJimmy さんのLLMと略語の同定の記事にもありましたが、実はLLMは算数が苦手ですね。
ChatGPTはこのような厳密さを求めるロジック系問題に向いていないこと分かりました。
では、もっと自動的に、知能的に、トレンドに乗る次世代プログラミング流でのパズル解決方法はありますか。
私は下記2ステップで分けて考えました。
- パズル問題 → 数学問題に変換 (WHAT部分を定義する)
- 数学問題 → 既存の汎用ソルバーで解決 (HOW部分を委託する)
SMTソルバーでやってみよう
マイクロソフト社が始めたオープンソースプロジェクトZ3は、定理证明やソフトウェア検証に有力で知られています。
それの論理基礎は「充足可能性モジュロ理論 Satisfiability Modulo Theories(SMT)」ですが、充足可能性問題(SAT)の拡張と簡単に理解すれば良いでしょう。
Z3のオフィシャルソースコードにexample.pyの一例を上げます。
Pythonソース及びそれの実行結果は下記のようです。
from z3 import *
x = Real('x')
y = Real('y')
s = Solver()
s.add(x + y > 5, x > 1, y > 1)
print(s.check())
print(s.model())
# output: [y = 2, x = 4]
限定条件をソルバーに追加したら、それは充足可能か(つまり矛盾はないか)、可能な場合に満たすモデル(具体値の例)も取得できます。
制限条件を入力 → 充足可能性をチェックする
Qiitaの記事Z3で数独パズルを解くとか、Z3で n-queens を解く¶とかの例は多数ありますが、興味があればご参考ください。
今回は、SPIテストなどによく登場する推論問題を、SMTソルバーで解く例を挙げます。
上記はネット上の推論問題例ですが、そのまま与えられた条件をソルバーに加え、必ず正しいかどうかを検証するために、宣言の否定が充足可能かどうかをチェックすれば良いです。(矛盾による証明)
from z3 import *
def is_highest(all, x):
return And([x >= i for i in all])
def is_lowest(all, x):
return And([x <= i for i in all])
def is_second(all, x, highest):
conditions = [x < highest]
for i in all:
if i == highest:
continue
if i == x:
continue
conditions.append(i < x)
return And(conditions)
p, q, r, s, t = Ints("p q r s t")
all = [p, q, r, s, t]
solver = Solver()
solver.add(r > s)
solver.add(t > r)
solver.add(Not(is_highest(all, t)))
solver.add(q > p)
solver.add(Distinct(all))
# prove q is the highest
if solver.check(Not(is_highest(all, q))) == unsat:
print("proved")
else:
print("not sure")
# prove s is the lowest
if solver.check(Not(is_lowest(all, s))) == unsat:
print("proved")
else:
print("not sure")
# since we have known q is the highest
# prove the second highest is p or t
if solver.check(Not(Or(is_second(all, p, q), is_second(all, t, q)))) == unsat:
print("proved")
else:
print("not sure")
# output:
# proved
# not sure
# proved
「ア」と「ウ」は証明できましたが、「イ」の否定も充足可能なので、必ず正しいとは言えないです。
答えはEですね。
ルービックキューブの制約を数学で表現
今回は2x2x2のルービックキューブを例とします。
ルービックキューブはツイスティパズルの一種として、この「汎用的にパズルのソルバーを実装してみた」の記事で紹介した状態と操作のデータ構造上を使って表現します。ぜひご確認ください。
下記cube_2x2x2.yaml
のように2x2x2のルービックキューブのパズル初期状態及び操作を定義します。
# ⓪①
# ②③
# ④⑤⑥⑦⑧⑨⑩⑪
# ⑫⑬⑭⑮⑯⑰⑱⑲
# ⑳㉑
# ㉒㉓
turnSet:
baseTurns:
- name: U
value: [0, 1, 3, 2, 0, 4, 10, 8, 6, 4, 5, 11, 9, 7, 5]
- name: D
value: [12, 14, 16, 18, 12, 13, 15, 17, 19, 13, 20, 21, 23, 22, 20]
- name: L
value: [0, 6, 20, 19, 0, 2, 14, 22, 11, 2, 4, 5, 13, 12, 4]
- name: R
value: [1, 18, 21, 7, 1, 3, 10, 23, 15, 3, 8, 9, 17, 16, 8]
- name: F
value: [2, 8, 21, 13, 2, 3, 16, 20, 5, 3, 6, 7, 15, 14, 6]
- name: B
value: [0, 12, 23, 9, 0, 1, 4, 22, 17, 1, 10, 11, 19, 18, 10]
extenders:
- name: U'
function: REVERSE
params: [U]
- name: U2
function: COMBINE
params: [U, U]
- name: D'
function: REVERSE
params: [D]
- name: D2
function: COMBINE
params: [D, D]
- name: L'
function: REVERSE
params: [L]
- name: L2
function: COMBINE
params: [L, L]
- name: R'
function: REVERSE
params: [R]
- name: R2
function: COMBINE
params: [R, R]
- name: F'
function: REVERSE
params: [F]
- name: F2
function: COMBINE
params: [F, F]
- name: B'
function: REVERSE
params: [B]
- name: B2
function: COMBINE
params: [B, B]
goalState: [W, W, W, W, O, O, G, G, R, R, B, B, O, O, G, G, R, R, B, B, Y, Y, Y, Y]
パズルロジック本体
パズルの読み込みや基本操作の定義は下記です。
from io import StringIO, TextIOWrapper
from itertools import chain
from random import choices, randrange
import yaml
class TwistyPuzzle:
@classmethod
def from_yaml(cls, file: TextIOWrapper):
with file:
obj = yaml.safe_load(file)
return cls(**obj)
@classmethod
def state_to_str(cls, state: list[str]):
return " ".join(state)
@classmethod
def turn_names_to_str(cls, turn_names: list[str]):
return " ".join(turn_names)
@classmethod
def perm_to_str(cls, perm: list[int]):
return " ".join(map(str, perm))
@classmethod
def turns_to_str(cls, turns: dict[str, list[int]]):
return '\n'.join(f' {k}: {cls.perm_to_str(v)}' for k, v in turns.items())
def __init__(self, **config):
self.goal_state = config.get('goalState')
self.state_size = len(self.goal_state)
self.turn_names: list[str] = []
# Permutaions are represented by a kind of cycle notation which is called "array cycle notation" by me.
# The cycle notation in math e.g. (1,2,3)(4,5) is represented as a one dimention array as [1,2,3,1,4,5,4] here.
# The "array cycle notation" is convinient when operating the swap.
# Iterate the array and swap with a tmp variable one by one.
self.array_cycle_turns: dict[str, list[int]] = {}
# word representation of permutation is usually called one line notation
self.word_repr_turns: dict[str, list[int]] = {}
self.__populate_base_turns(config['turnSet']['baseTurns'])
self.__populate_ext_turns(config['turnSet'].get('extenders', []))
def __populate_base_turns(self, baseTurns: list[dict[str, str | list[int]]]):
for turn in baseTurns:
turn_name: str = turn['name']
array_cycle: list[int] = turn['value']
word_repr = self.__array_cycle_to_word_repr(array_cycle)
normalized_array_cycle = array_cycle # TODO
# self.__word_repr_to_array_cycle(word_repr)
self.turn_names.append(turn_name)
self.word_repr_turns[turn_name] = word_repr
self.array_cycle_turns[turn_name] = normalized_array_cycle
def __populate_ext_turns(self, extenders: list[dict[str, str | list[str]]]):
for extender in extenders:
turn_name: str = extender['name']
if extender['function'] == 'REVERSE':
array_cycle = list(reversed(
self.array_cycle_turns[extender['params'][0]]
))
word_repr = self.__array_cycle_to_word_repr(array_cycle)
normalized_array_cycle = array_cycle # TODO
# self.__word_repr_to_array_cycle(word_repr)
self.turn_names.append(turn_name)
self.word_repr_turns[turn_name] = word_repr
self.array_cycle_turns[turn_name] = normalized_array_cycle
elif extender['function'] == 'COMBINE':
array_cycle = list(
chain(*[self.array_cycle_turns[p] for p in extender['params']]))
word_repr = self.__array_cycle_to_word_repr(array_cycle)
normalized_array_cycle = array_cycle # TODO
# self.__word_repr_to_array_cycle(word_repr)
self.turn_names.append(turn_name)
self.word_repr_turns[turn_name] = word_repr
self.array_cycle_turns[turn_name] = normalized_array_cycle
def __array_cycle_to_word_repr(self, array_cycle: list[int]):
word_repr = list(range(self.state_size))
tmp = None
for i in array_cycle:
tmp, word_repr[i] = word_repr[i], tmp
return word_repr
def __str__(self) -> str:
with StringIO() as output:
print("goalState:", self.state_to_str(
self.goal_state), file=output)
print("turns (array cycle notation):", file=output)
print(self.turns_to_str(self.array_cycle_turns), file=output)
print("turns (word representation):", file=output)
print(self.turns_to_str(self.word_repr_turns), file=output)
return output.getvalue()
def take_one_move(self, state: list[str], turn_name: str):
tmp = None
res = state[:]
for i in self.array_cycle_turns[turn_name]:
tmp, res[i] = res[i], tmp
return res
def take_moves(self, state: list[str], turn_names: list[str]):
res = state[:]
tmp = None
for turn_name in turn_names:
for i in self.array_cycle_turns[turn_name]:
tmp, res[i] = res[i], tmp
return res
def scramble(self, times=1000, random_more=True):
k = times + randrange(times) if random_more else times
return self.take_moves(self.goal_state, choices(self.turn_names, k=k))
if __name__ == '__main__':
p = TwistyPuzzle.from_yaml(open('examples/cube_2x2x2.yaml'))
print(p)
ソルバーの実装
下記はソルバーの部分ですが、後ほど説明を加えます。
from z3 import *
from twisty_puzzle import TwistyPuzzle
DEBUG = False
# DEBUG = True
def debug(msg: str):
if DEBUG:
print(msg)
class SolveLimitExceededException(Exception):
def __str__(self):
return 'solve step limit exceeded'
class PuzzleSolver:
def __init__(self, puzzle: TwistyPuzzle, start_state: list[str], goal_state: list[str], allowed_turn_names: list[str] = None):
if allowed_turn_names is None:
allowed_turn_names = puzzle.turn_names
self.puzzle = puzzle
self.allowed_turn_names = allowed_turn_names
self.ctx = Context()
self.solver = Solver(ctx=self.ctx)
# Timeout: 0.2 seconds. Make this longer if you cannot find a solution
self.solver.set("timeout", 200)
self.model: ModelRef = None
self.moves: list[ArithRef] = []
# The first element of state_iters is the start state variables.
# While deepening the move number, the state_iters[i] represents state variables after operating the i-th move.
self.state_iters = [[Int(s, ctx=self.ctx) for s in start_state]]
self.goal_state_vars = [Int(s, ctx=self.ctx) for s in goal_state]
# assign each color in the state a distict integer value
for i, s in enumerate(set(goal_state)):
self.solver.add(Int(s, ctx=self.ctx) == i)
# Iterative deepening solve, to find from the shortest solution
def solve_next(self, limit=15):
self.__reject_current_model()
while not self.__check_solved():
self.__deepen_one_move()
depth = len(self.moves)
if depth > limit:
raise SolveLimitExceededException()
debug(f'trying depth: {depth}')
return list(map(lambda move: self.allowed_turn_names[self.model[move].as_long()], self.moves))
def __check_solved(self):
# check satification which makes the last state iteration equal to the goal state
check_cond = [self.state_iters[-1][j] == self.goal_state_vars[j]
for j in range(self.puzzle.state_size)]
res = self.solver.check(check_cond)
if res == sat:
self.model = self.solver.model()
return True
if res == unknown:
debug(self.solver.reason_unknown())
self.model = None
return False
def __deepen_one_move(self):
last_iter = self.state_iters[-1]
i = len(self.state_iters) # next iteration index
move = Int(f'M_{i}', ctx=self.ctx)
new_iter = [Int(f'S_{i}_{j}', ctx=self.ctx)
for j in range(self.puzzle.state_size)]
self.solver.add(0 <= move, move < len(self.allowed_turn_names))
# constrains on last move and current move
if len(self.moves) > 0:
last_move = self.moves[-1]
no_adjacency_list = [
("U", "U2"),
("U", "U'"),
("D", "D2"),
("D", "D'"),
("L", "L2"),
("L", "L'"),
("R", "R2"),
("R", "R'"),
("F", "F2"),
("F", "F'"),
("B", "B2"),
("B", "B'"),
]
self.solver.add(last_move != move)
for a, b in no_adjacency_list:
if a not in self.allowed_turn_names or b not in self.allowed_turn_names:
continue
self.solver.add(Implies(
last_move == self.allowed_turn_names.index(a),
Not(move == self.allowed_turn_names.index(b))))
self.solver.add(Implies(
last_move == self.allowed_turn_names.index(b),
Not(move == self.allowed_turn_names.index(a))))
for i, turn_name in enumerate(self.allowed_turn_names):
word_repr = self.puzzle.word_repr_turns[turn_name]
for new_index, last_index in enumerate(word_repr):
self.solver.add(
Implies(move == i, last_iter[last_index] == new_iter[new_index]))
self.moves.append(move)
self.state_iters.append(new_iter)
# add additional constraints, which is to reject the current model (move_vars), for finding more solutions
def __reject_current_model(self):
if self.model is None:
return
self.solver.add(Or(*[move != self.model[move].as_long()
for move in self.moves], self.ctx))
class TwoPhaseSolver:
def __init__(self, puzzle: TwistyPuzzle, start_state: list[str], goal_state: list[str]):
self.puzzle = puzzle
self.start_state = start_state
self.goal_state = goal_state
self.replace_map = {
'W': 'a',
'Y': 'a',
'R': 'b',
'O': 'b',
'G': 'b',
'B': 'b',
}
self.second_phase_turns = [
'U', 'U2', "U'",
'D', 'D2', "D'",
'F2', 'B2', 'L2', 'R2'
]
self.max_steps = 11
replace_start = list(map(lambda c: self.replace_map[c], start_state))
replace_goal = list(map(lambda c: self.replace_map[c], goal_state))
self.first_phase_solver = PuzzleSolver(
self.puzzle, replace_start, replace_goal)
def solve_next(self):
while True:
debug('Solving phase 1 ...')
first_phase_moves = self.first_phase_solver.solve_next(
self.max_steps)
debug(first_phase_moves)
second_phase_start_state = self.puzzle.take_moves(
self.start_state, first_phase_moves)
debug('Solving phase 2 ...')
second_phase_solver = PuzzleSolver(
self.puzzle, second_phase_start_state, self.goal_state, self.second_phase_turns)
try:
second_phase_moves = second_phase_solver.solve_next(
self.max_steps - len(first_phase_moves))
except SolveLimitExceededException:
continue
break
return first_phase_moves + second_phase_moves
def validate_answer(puzzle: TwistyPuzzle, start_state: list[str], res: list[str]):
actual = puzzle.take_moves(start_state, res)
try:
assert actual == puzzle.goal_state
except AssertionError:
print('Wrong answer!', actual)
exit(1)
if __name__ == '__main__':
p = TwistyPuzzle.from_yaml(open('examples/cube_2x2x2.yaml'))
start_state = p.scramble()
print('start:', TwistyPuzzle.state_to_str(start_state))
print('goal:', TwistyPuzzle.state_to_str(p.goal_state))
print()
s = TwoPhaseSolver(p, start_state, p.goal_state)
res = s.solve_next()
print(f'result: {TwistyPuzzle.turn_names_to_str(res)}')
validate_answer(p, start_state, res)
# output:
# start: W G O Y O G W R G O Y B R B R Y B B Y W W O G R
# goal: W W W W O O G G R R B B O O G G R R B B Y Y Y Y
# result: R' D L U B' D L2 U2 B2 U F2
ソルバー実装の説明
ルービックキューブの状態をz3.Int
のlistで表現します。
同じ色だったら同じ整数を付与したいです。
z3.Int
は名称で識別しますので、整数の名称は色の名称そのままになります。
当然ながら、ルービックキューブを操作したら、状態が変わりますので、1操作ずつ状態の世代を管理する必要があります。1世代の状態はz3.Int
のlistになっているので、解けるまでの全世代の状態の履歴は、2次元の整数のlistになります。(ソース上のself.state_iters
)
最初の1世代はルービックキューブの現在の状態です。
最後世代はの揃った状態に期待しますので、__check_solved
の時、最後の世代の状態がgoal_state_vars
の定義と一致するか(一致することが充足可能か)でチェックします。
充足可能であれば、モデルを取得し、それが答えになります。充足可能ではない場合、解答の長さが足りないことで、答えのステップ数(状態の履歴数も)をプラス1にします。
ステップ数を最初0にしてスタートし、結果が見つかるまで1ずつ長くして、結果に最短の答えが求められます。
異なる結果を求め続けたい場合、制限条件に現在の結果を否定すればよいですので、solve_next()
の冒頭に
__reject_current_model()
を呼んで、前回の答えmoves
の値を合致しないことを条件に加えています。
操作の表現も同じく、各種類の操作に整数z3.Int
を割り当てます。例えばU=0, D=1, L=2, ...
状態の世代間のマッピング関係は操作で定義しています。
操作がPermutationとOne-line notation (Word representation)の2つの表現がサポートしています。Word representationのほうは自然に状態変更前後のマッピング関係を表現しています。
そのロジックはPuzzleSolver.__deepen_one_move()
の最後の部分になります。
上記のソースコードと合わせて読んでください。
Two Phaseの実装で加速
論理上2x2x2のルービックキューブは、最悪のケースでも11ステップ以内に解けますが(God's Number)、それでもソルバーに変数が多すぎます。なかなか有限時間内で求められなかったです。
Kociemba氏が提案した3x3x3ルービックキューブのTwo Phase解法を2x2x2キューブにも適用しています。
上と下の白(W)と黄(Y)をa
に置換、他の4面の色をb
に置換し、a
とb
2色のルービックキューブをまず解きます。その段階をPhase 1と呼びます。
次に、色を従来通りに戻し、'U', 'U2', "U'", 'D', 'D2', "D'", 'F2', 'B2', 'L2', 'R2'
のみ使って最後まで解けます。その段階をPhase 2と呼びます。
Phase 1は色数が少なく、Phase 2は操作可能の数が少ないので、計算量がだいぶ減ります。
その考え方を実装しているのはTwoPhaseSolver
です。Phase 1用とPhase 2用のPuzzleSolver
をそれぞれ用意し、パイプラインのように、Phase 1の結果を基づいてPhase 2で続けて計算します。2 Phase合わせたステップ数が11に超えた場合、もしくはタイムアウト(一旦200
msと設定していますが、ローカル環境によって調整したほうが良いでしょう)が発生した場合、Phase 1のsolve_next()
を呼んで次の解答を出力し続けます。
結果的に、ChatGPTが解答できないルービックキューブ問題は、Z3 SMTソルバーで解けるようになりました。
実行時間が数秒から1分ぐらいかかりますが、クラシックの解法よりは、メモリ消費量が比較的に少ない(100Mぐらい)です。
そしてヒューリスティックは一切使っていないので、事前計算したパターンDBなども全部不要です。
つまり、パズルの定義の変化に強くて、他のパズルにもすぐ適用できて、汎用性が高いと思います。(Two Phase技法だけは汎用できないです。)
改善したいポイント
- あり得ない操作をパズル上で定義したい。それをソルバーの制限条件として加えたら、無駄が減らせる見込み。例えば、(下記2ルールは上の
__deepen_one_move()
でベタ書きになっている)- 同じ操作を繰り返すことがありえない (
U2
はあるのに、U U
を試さなくても良いため) - 同じ平面の操作を連続で実施しえない (
U
の次にU'
やU2
がありえない相殺されてしまうため)
- 同じ操作を繰り返すことがありえない (
- マルチスレッド化して高速化したい
- Phase 1とPhase 2がパイプラインになっていますので、お互いに待ち合うのがもったいない
- Phase 2の結果を待たなくてもPhase 1を計算し続けて、結果をqueueにためておくのが良いだろう
- パズルを完全に解くのは、手順が長いパズルに向いていない(変数の数爆発)
- そのため、決め手のアルゴリズムを発見するエンジンとしてSMTを使いたい
- 参考:パズルを解ける用のアルゴリズムの発見エンジン
では、みなさん、素敵なクリスマスを、良いお年を〜