Python3
FUJITSUDay 5

パズ○ラの盤面欠損率を計算するスクリプトを高速化する話

富士通全く関係ない話になってしまいましたが許してください。

皆さん、パ○ドラというスマホゲームをご存知でしょうか?

しばらく前に曲○師というめちゃめちゃ強いキャラが実装されて話題になったあのパズド○です。

ちなみに今魔法石100個ばらまいてます。やったことない人はこの機会にやりましょう

今回はタイトルの通り、パ○ドラにおける、各種リーダースキルの盤面欠損率を計算するpythonスクリプトを高速化する話です。

※注:筆者は普段Pythonをあまり使っていないので、いろいろ間違っているかもしれません。マサカリ大歓迎です。

また、今回使ったコードはすべてGitHubにあげてます。

https://github.com/kuwabataK/pazdora-cal

盤面欠損率を計算した結果も上げているので、「高速化なんて興味ないぜ!俺は欠損率が知りたいんだ!」という人はそちらを見てください。


盤面欠損率 is なに

といっても、○ズドラ知らない人にはわからないと思うので、今回テーマにする部分だけ軽く解説しときます。知ってるよって人は飛ばしてください。

本当はプレイ動画とか貼るのが一番早いのですが、著作権的にいろいろグレーなのでやめておきます。

知りたい方はググってください。


パズ○ラとは

パ○ドラはガン○ーさんから配信されているスマホ向けのパズルRPGゲームです。

パズルRPGゲームですというだけあって、パズルして敵を倒していくゲームです。具体的には以下のような感じです。


  1. ダンジョンに入ると、以下のような画面上に5×6マスのドロップが配置された盤面が現れます。
    2018-11-10 12.53.07.png

  2. ドロップの種類は6種類(火、水、木、光、闇、回復)あります。
    これらのドロップの内ひとつだけ選んで自由に動かすことができ、同じ色が3つ以上直線でつながるように並べ替えていきます。
    ↓のような感じ


  3. 最後に掴んでいたドロップを離すと3つ以上繋がっているドロップが消え、消えたドロップに応じて敵キャラに攻撃します。

以上が一回の戦闘の流れです。

このとき、自分のパーティによっては、例えば以下のようなリーダースキルが設定されている場合があります。

木と闇ドロップを同時に消すと攻撃力が5倍

つまり、盤面にどういうドロップがあるかでパーティの火力が変わってくるのです。

このリーダースキルには様々な種類があり、当然条件が厳しいものほど高い火力が出せるようになっています。

そうなってくると、気になるのは、「そのリーダースキルがどれぐらいの確率で発動できるのか?」ということです。

上の例だと、5×6の盤面にランダムにドロップを配置したとき、「木ドロップと闇ドロップがそれぞれ3個以上ある確率」を求めたい。ということになるでしょうか

そう、それが今回のテーマ、「パ○ドラの盤面欠損率を計算したい」です。(前置き終わり)


実装

そんなに難しいロジックでもないですし、何はともあれとりあえず実装です。

モンテカルロ法を使います。


paz-cal_1.py

from numpy.random import *

from itertools import chain

##============= パラメーター設定==============##

## 試行回数を指定
loop_cnt = 100000

## 盤面を指定
height = 5
width = 6

##=============================================##

def check_drops (drops):
## ドロップが3つ以上つながっているかどうかをチェックする
for idx_i, val_i in enumerate(drops):
for idx_j, val_j in enumerate(val_i):
if val_j == val_i[idx_j + 1] and val_i[idx_j + 1] == val_i[idx_j + 2]:
return False
if idx_j + 3 >= len(val_i):
break

## 配列を逆転して再度計算
drops = list(map(list, zip(*drops)))

for idx_i, val_i in enumerate(drops):
for idx_j, val_j in enumerate(val_i):
if val_j == val_i[idx_j + 1] and val_i[idx_j + 1] == val_i[idx_j + 2]:
return False
if idx_j + 3 >= len(val_i):
break
return True

def generate_drops(height, width):
return randint(0,6,(height,width))

## 指定した欠損条件に対して、モンテカルロ法により、欠損しなかった回数と欠損した回数の組を返す
def monte_carlo_freq(lack_cond):
num_ok = 0
num_ng = 0
for x in range(loop_cnt):
drops = generate_drops(height,width)
if check_drops(drops) == False:
continue

drops = list(chain.from_iterable(drops))

fire_drops_num = len([i for i in drops if i == 0])
blue_drops_num = len([i for i in drops if i == 1])
green_drops_num = len([i for i in drops if i == 2])
light_drops_num = len([i for i in drops if i == 3])
black_drops_num = len([i for i in drops if i == 4])
recovery_drops_num = len([i for i in drops if i == 5])

if lack_cond(
fire_drops_num,
blue_drops_num,
green_drops_num,
light_drops_num,
black_drops_num,
recovery_drops_num):
num_ok = num_ok +1
else:
num_ng = num_ng +1
return num_ok, num_ng

## 試行回数、OKの回数、NGの回数、確率を出力する
def print_prob(num_ok, num_ng):
print("試行回数は" + str(num_ok + num_ng))
print("OKの回数は" + str(num_ok))
print("NGの回数は" + str(num_ng))
print("確率は" + str(num_ok/(num_ok + num_ng) * 100))

##==============ここから処理開始================##

## 指定2色がある確率
print("=========================")
print("指定2色がある確率")
num_ok, num_ng = monte_carlo_freq(lambda f, b, g, l, d, r: f >= 3 and d >= 3)
print_prob(num_ok, num_ng)

# ・
# ・
# ・
# 他の条件で延々と処理が続く


考え方は単純で、


  1. ランダムな盤面を生成する

  2. 生成した盤面内に3つ以上繋がったドロップが存在しないことを確かめる

  3. 盤面内の各色ドロップの個数を数え、リーダースキルの発動条件を満たしているかどうかチェックする

  4. 1.~3.の処理を十分な数(今回は10万回)繰り返し、どれぐらいの割合でリーダースキルが発動するかを数え上げる

の流れで、各リーダースキルの盤面欠損率を計算します。

今見るといろいろひどいコードですが、とりあえず計算はできるはずです。

実行時間 1m4.786s

処理速度計測にはMacのtimeコマンドを使用しています。

ちなみに、指定2色がある確率は確率は83.3%程度でした。大体、5~6回に一回は欠損する計算ですね。

これが大きいか小さいかは個人の感覚によるところが大きいですが、個人的には「割と欠損するな・・・」という感覚です。

最近はダンジョンのインフレも激しく、倒すべきタイミングで敵を倒せないと、逆にこちらがやられてしまうことも多いのです。世知辛い・・・


高速化

さて、今回は「パズ○ラの盤面欠損率を計算するスクリプトを高速化する話」なので、これからこのコードを高速化していきます。

短いコードですが、改善点はいろいろあります。


Step1 盤面生成を最初だけにする

上記のコードのmonte_carlo_freq関数の中身を見てみると以下のようなコードがあることがわかります。

for x in range(loop_cnt):

drops = generate_drops(height,width)

このコードは、試行回数分だけランダムな盤面を生成していますが、monte_carlo_freq関数の中に書かれているということは、異なる条件で盤面欠損率を計算するたびに、毎回ランダムな盤面を生成しなおしていることになります。

これは明らかに無駄ですね。異なる条件の盤面欠損率を計算するたびに何も毎回盤面を一から生成する必要はないはずです。

というわけで、盤面生成をスクリプトの最初に持ってきて、すべての条件で使いまわすことにしましょう。

コードは以下のような感じです。


paz-cal_2.py

def monte_carlo_freq(lack_cond,fields):  ## 盤面を引数でもらうように変更

## 盤面を生成する関数を独立させた
def generate_fields():

fields = []

for x in range(loop_cnt):
drops = generate_drops(height,width)
if check_normal_drops(drops) == False:
continue

drops = list(chain.from_iterable(drops))
fields.append(drops)
return fields

##=============================================##

fields = generate_fields() ## 処理の最初に盤面を生成する

## 指定2色がある確率
print("=========================")
print("指定2色がある確率")
num_ok, num_ng = monte_carlo_freq(lambda f, b, g, l, d, r: f <= 2 or d <= 2, fields)
print_prob(num_ok, num_ng)


実行時間: 0m23.948s

たったこれだけで、実行時間が3分の一程度まで小さくなりました。

まあこの修正は同時に計算したいリーダースキル条件がどれだけあるかで変わってくるのですが、

今回は25種類程度の条件で計算しています。条件が多ければ多いほど効果は大きくなるはずです。


Step2 リストを使う

次に以下の関数に注目してみます。

def generate_drops(height, width):

return randint(0,6,(height,width))

これはnumpyrandint関数を使って、0~5までの乱数を生成し、numpyの2次元配列データndarrayとして返しています。

今回のスクリプトではこの2次元配列に対して、同じドロップが上下左右に3つ以上繋がっていないか

forループを回してチェックしているのですが、このようなfor文を使った処理はndarrayには向きません。

なので、以下のように内包表記を使ってリストを返すようにしてみます。

def generate_drops(height, width):

return [[randint(0, 6) for _ in range(width)] for _ in range(height)]

実行時間: 0m11.725s

Step1のときに比べて半分程度になりました。ndarrayをfor文で回すのはアンチパターンだみたいな話はよく聞きますが、リストにするだけでも結構変わりますね。


Step3 count()メソッドを使う

元のコードをよく見てみると以下のようなコードがあるのがわかります。

fire_drops_num = len([i for i in drops if i == 0])

ここのlen([i for i in drops if i == 0])は、dropsリストの中から条件に合うドロップの個数を抽出していますが、内包表記を使って一度リストを生成してから長さを数えています。

pythonではこんなまわりくどいやり方をしなくても、条件に合う要素の個数を数え上げてくれるcount()メソッドが用意されています。なので以下のように書き換えましょう

fire_drops_num = drops.count(0)

実行時間: 0m7.874s

だいたい30%ぐらい高速化しました。元のコードだとリストをいちいち生成して数え上げてるから遅いんですかね。

組み込みのメソッドが用意されているならちゃんとそれを使いましょうということですね。


Step4 処理を簡略化する

monte_carlo_freq()の引数にしているlack_cond()関数は各色のドロップ数からリーダースキルの条件にあるかどうかをbool型で返す関数です。

        if lack_cond(

fire_drops_num,
blue_drops_num,
green_drops_num,
light_drops_num,
black_drops_num,
recovery_drops_num):
num_ok = num_ok +1
else:
num_ng = num_ng +1

実はパ○ドラのリーダースキルは盤面にある各色のドロップ数からのみ計算でき、各色の盤面上の配置はリーダースキル発動条件に影響しません。これは操作時間が無限にある場合は、任意の場所に任意のドロップを動かすことが簡単にできるからです。(実際には操作時間は有限なのでこの限りではないのですが、ここでは考えないことにします。)

つまりパ○ドラの欠損率を調べるためには、最初に生成した盤面の各ドロップの数さえわかればよく、各ドロップの配置情報は必要ないということです。実際、盤面が欠損しているかどうかの判定に用いているlack_cond()関数も各色のドロップ数しか使っていませんよね。

今までは、monte_carlo_freq()の中でドロップの数を数えていましたが、この処理を最初の盤面生成のときに一緒に数え上げてしまいましょう。ついでに生成した盤面の情報はいらないので、generate_fields()は以下のように書き換えることができます。

def generate_field():

drops = generate_drops(height, width)
if not check_normal_drops(drops):
return []
drops = list(chain.from_iterable(drops))
return [drops.count(0),drops.count(1),drops.count(2),drops.count(3),drops.count(4),drops.count(5)]

def generate_fields():
r = [ generate_field() for i in range(loop_cnt)]
return [x for x in r if x != []]

ついでにmonte_carlo_freq()でドロップの数を数える必要はなくなったので、monte_carlo_freq()も以下のように書き換えます。

def monte_carlo_freq(lack_cond, fields):

num_ok = 0
num_ng = 0
for drops in fields:

if lack_cond(
drops[0],
drops[1],
drops[2],
drops[3],
drops[4],
drops[5]):
num_ok = num_ok + 1
else:
num_ng = num_ng + 1
return num_ok, num_ng

実行時間: 0m4.823s

最初は1分以上かかっていたので嘘みたいです。


Step5 Numbaを使ってJITコンパイルする

pythonには高速化のためにNumbaというJITコンパイラが用意されています。

公式サイトを見てみると


Numba is a just-in-time compiler for Python that works best on code that uses NumPy arrays and functions, and loops.


とのことなので、主にNumpyの関数や配列計算なんかを行っているコードに対して使うと効果があるようです。

今回は乱数を生成しているgenerate_drops()関数に適用してみます。

使い方は簡単で、numbaパッケージをインポートしたあと、JITコンパイルする関数に@jitデコレータをつけるだけです。

from numba import jit

@jit

def generate_drops(height, width):
return [[randint(0, 5) for _ in range(width)] for _ in range(height)]

実行時間: 0m2.234s

だいたい半分ぐらいになりました。デコレータつけるだけで実行時間が半分になるのはなかなかすごいんじゃないでしょうか


Step6 行列演算を使って高速化する

numpyのndarrayはfor文の処理が苦手です。逆に言えば、numpy組み込みの行列演算用のメソッドを使っている限りは高速に動作します。なので、for文を行列演算に置き換えることで処理を高速化する事を考えてみましょう。

今回高速化の対象にするのは以下のcheck_drops関数です。


check_drops.py

def check_drops (drops):

## ドロップが3つ以上つながっているかどうかをチェックする
for idx_i, val_i in enumerate(drops):
for idx_j, val_j in enumerate(val_i):
if val_j == val_i[idx_j + 1] and val_i[idx_j + 1] == val_i[idx_j + 2]:
return False
if idx_j + 3 >= len(val_i):
break

## 配列を逆転して再度計算
drops = list(map(list, zip(*drops)))

for idx_i, val_i in enumerate(drops):
for idx_j, val_j in enumerate(val_i):
if val_j == val_i[idx_j + 1] and val_i[idx_j + 1] == val_i[idx_j + 2]:
return False
if idx_j + 3 >= len(val_i):
break
return True


この関数は引数に指定されたdrops(盤面)に同じドロップが3つ以上隣接して存在していないかをチェックします。

(パスドラでは初期盤面に3つ以上繋がったドロップは存在しないようになっています。)

これを行列演算を使って解くのであれば以下のようなコードが考えられます。

# ドロップが3つ以上つながっているかどうかをチェックする

def check_drops (drops):
diff1 = np.roll(drops,1,axis=1) - drops
diff2 = np.roll(drops,-1,axis=1) - drops
diff3 = (100 * diff1[:,1:-1]) - diff2[:,1:-1]
if np.sum(diff3 == 0) != 0:
return False

diff1 = np.roll(drops,1,axis=0) - drops
diff2 = np.roll(drops,-1,axis=0) - drops
diff3 = (100 * diff1[1:-1]) - diff2[1:-1]
if np.sum(diff3 == 0) != 0:
return False
return True

考え方としては、例えば以下のような盤面があったとして、

| 3 | 2 | 4 | 4 | 4 |

| 3 | 5 | 4 | 6 | 1 |
| 1 | 3 | 4 | 2 | 5 |
| 6 | 2 | 5 | 1 | 1 |

この盤面をx軸方向1だけ回転させた行列

| 4 | 3 | 2 | 4 | 4 |

| 1 | 3 | 5 | 4 | 6 |
| 5 | 1 | 3 | 4 | 2 |
| 1 | 6 | 2 | 5 | 1 |

との差分を取ると隣り合ったドロップが同じ時のみ0になることがわかります。

この操作を上下左右に行うことで3つ以上繋がったドロップが存在するかどうかを判定します。

上のコードは2次元配列に対してチェックを行っていますが、ndarrayの配列操作はn次元に拡張できるので、

今回の場合だと、10万回生成した盤面(5×6×100000の行列)に対して、一度に上記のチェックを行うことができます。最終的なコードは以下になりました。


from numpy.random import *
import numpy as np

##============= パラメーター設定==============##

## 試行回数を指定
loop_cnt = 100000

## 盤面を指定
height = 5
width = 6

##=============================================##

## 5×6×100000の3次元配列に対して、x,y方向に3つ以上繋がったドロップがある場合に、その盤面を削除して返す
def filter_drops(fields):
## x軸方向に3以上連結したドロップがないかどうかチェック
diff1 = np.roll(fields,1,axis=2) - fields
diff2 = np.roll(fields,-1,axis=2) - fields
diff3 = (100 * diff1[:,:,1:-1]) - diff2[:,:,1:-1]

## y軸方向に3以上連結したドロップがないかどうかチェック
diff4 = np.roll(fields,1,axis=1) - fields
diff5 = np.roll(fields,-1,axis=1) - fields
diff6 = (100 * diff4[:,1:-1]) - diff5[:,1:-1]

## 一つでも3つ以上繋がったドロップがある場合には、対象の盤面を削除して返す
b = np.all(np.all(diff3 != 0, axis=1),axis=1).astype(int) * np.all(np.all(diff6 != 0, axis=1),axis=1).astype(int)
return fields[b.astype(bool)]

## 3次元に拡張した盤面を作成
def generate_fields(height, width,loop_cnt):
return randint(1,7,(loop_cnt,height,width))

## filterしたあと各色の数を数えた盤面を返す
def generate_fields_and_cnt_drops():
drops = generate_fields(height, width,loop_cnt)
drops = filter_drops(drops)

cnt1 = np.sum(drops == 1, axis=(1,2))
cnt2 = np.sum(drops == 2, axis=(1,2))
cnt3 = np.sum(drops == 3, axis=(1,2))
cnt4 = np.sum(drops == 4, axis=(1,2))
cnt5 = np.sum(drops == 5, axis=(1,2))
cnt6 = np.sum(drops == 6, axis=(1,2))
return np.array([cnt1,cnt2,cnt3,cnt4,cnt5,cnt6]).T.tolist() ## for文での処理を高速化するためリストにして返す

def monte_carlo_freq(lack_cond, fields):
num_ok = sum(lack_cond(*drops) for drops in fields)
num_ng = len(fields) - num_ok
return num_ok, num_ng

# 試行回数、OKの回数、NGの回数、確率を出力する
def print_prob(num_ok, num_ng):
print("試行回数は" + str(num_ok + num_ng))
print("OKの回数は" + str(num_ok))
print("NGの回数は" + str(num_ng))
print("確率は" + str(num_ok/(num_ok + num_ng) * 100))

##==============ここから処理開始================##

fields = generate_fields_and_cnt_drops()

## 指定2色がある確率
print("=========================")
print("指定2色がある確率")
num_ok, num_ng = monte_carlo_freq(lambda f, b, g, l, d, r: f >= 3 and d >= 3,fields)
print_prob(num_ok, num_ng)

# ・
# ・
# ・
# 他の条件で延々と処理が続く

最初の盤面生成のロジックからfor文がなくなっていることがわかると思います

実行時間: 0m1.008s

更に半分ぐらいになりました。numpyの本領発揮といったところでしょうか

ただ、上記のコードを見ればわかるように、行列演算を用いた実装は複雑になりやすいので、実際のプロジェクトでこのような実装を許すかどうかは、議論の余地があるでしょう。(実際filter_drops関数の実装はかなり複雑で、私もこの実装であっている自信があまりありません、半年後に見れば確実にわからないと思います)。一回か二回使うだけの書き捨てのコードであれば、ここまでの最適化はやりすぎだと思いますし、ほんとに高速に実装したいのであれば、部分的にでもC++goなどの実行速度に優れたコンパイル言語を使って実装したほうが、可読性、実行速度の両面で良い場合も多いと思います。


まとめ

まとめると、できるだけ処理を簡略化できないか考えることは当然として、pythonに関して言えば、


  • pythonは配列やリストの生成にコストがかかるので、できる限り生成を抑える

  • リストを作る際にはfor文ではなく内包表記を使うと高速に動作する場合が多い


  • ndarrayを使う場合には、for文ではなく行列演算を使って解けるように工夫する

  • for文を使わなくてはいけない場合はndarrayではなくリストにするか、numbaなどを使って高速化できないか試してみる

あたりを気をつけると高速なコードを書くことができそうです。

最終的な実行時間は以下のようになりました。(数字はあくまで参考値です)

オリジナル
Step1
Step2
Step3
Step4
Step5
Step6

1m4.78s
0m23.94s
0m11.72s
0m7.87s
0m4.82s
0m2.23s
0m1.01s

最初は一分以上かかっていた処理がだいたい1秒ぐらいで終わるようになりました。

ちなみにgo言語を使って同じプログラムを組んでみたら1.3秒ぐらいでした。まあgoの方は雑なコードですし、どんな実装するかにもよるかと思いますが、pythonでgoのようなコンパイル言語を超える実行速度を出せるというのは中々すごいですね。

あと、やってて思ったのですが、こういうベンチ取りながらコードを高速化していく作業って割と楽しいので、新人教育とかでやってみてもおもしろそうかなと思いました。初期コード自体はそんなに難しくないですし、もうちょっと簡単な問題にすれば最初のハードルも低そうです。僕はやったことないので想像ですが、競技プログラミングとかもこんな感じで最適化していくんですかね?

以上、正直言って役に立つ記事になっているかはかなり疑問ですが、退屈しのぎにでもなれば幸いです。


参考URL

パズドラ 運営サイト | パズル&ドラゴンズ - (パズドラ)公式サイト

【NumPy入門】使い方・サンプル集

NumPy v1.15 Manual

Pythonでリストをflattenする方法まとめ

どうすればPythonをJuliaと同じくらい速く動かせるのか? : 様々なやり方で計算の高速化を図る