はじめに
どうも、y-tetsuです。
これまでに2つの記事を通して、Pythonでライフゲームを作ってきました。
前回は、"標準Pythonのみ"という条件で処理の高速化を試してみたのですが、すぐに限界が来てしまいました。そろそろ違う取り組みを試してもいい頃かなと思いまして、今回は外部ライブラリを導入してコードの改良をやってみたいと思います。
どうやらQiitaの記事の中には、便利なライブラリを用いた高速なライフゲームの実装例が、いくつかあるようです。
とても面白そうなので、これらを参考にして、私もNumPyとOpenCVに触れてみようと思います。(ついでに、どれくらい高速化できるのかも確かめてみたいと思います)
おさらい
本題へ入る前に、これまで作ってきたものの"おさらい"を、軽くしておきたいと思います。
ライフゲームとは
ライフゲームの仕様については、以下のWikipediaに詳しく書かれています。
簡単に言うと、ライフゲームとは格子状の世界の中で生死を繰り返すセルの状態をシミュレーションするものです。セルは"生き物"の事で、格子のマス目の上で生きているのか死んでいるのかを表しています。
シミュレーションのルールは以下のみのシンプルなものです。
- 生きたセルの周囲のどこかに、生きたセルが2つか3ついる場合、そのセルは生き続ける(それ以外は死んでしまう)
- 死んだセルの周囲のどこかに、生きたセルが3ついる場合、そこに新しいセルが生まれる
代表的な事例を図にまとめたものが以下になります。それぞれ、黒いマスが生きたセル、白いマスが死んだセル、黄色い枠は注目するセルである事を表しています。
このルールを元に、前の世代から次の世代を次々に計算することで、鮮やかなライフゲームの世界が繰り広げられることになります。生きたセルの微妙な配置次第で、予想もつかない様々な過程が生まれてくるのが、ライフゲームの非常に面白いところです。
ちょっと変わった仕様の実装
本記事では筆者の思い付きで、通常のライフゲームのルールにアレンジを加えています。今回は、以下の仕様も含めて実装したいと思います。
名称 | 内容 |
---|---|
トーラス世界 | 上下と左右の端っこがくっついた形状の世界(3次元で表すとドーナツ型の表面上の世界) |
色進化 | 新しくセルが生まれる際、周囲の生きたセルの色に応じて、新たな色になる |
寿命 | 生き続けているセルが、年を取るようになり、時間とともに見た目が変化し、やがて寿命を迎えるようになる |
詳細は、過去に書いた以下の記事を参照して下さい。
https://qiita.com/y-tetsu/items/264d263717f933ad3cb2
デモ
ライフゲームのサンプルとして、「グライダー銃」を紹介します。通常のルールの元では以下のような感じになります。(生きたセルは"■"で表されています)
左右に動く何か大きな物体がぶつかり合った結果、右下に向けて、小さな飛行機のようなもの(グライダーと呼ばれています)が、次々に発射されるパターンです。(ずっとこれが続きます)
そして、このグライダー銃に、アレンジした仕様を追加したものが以下になります。
上下左右が繋がった世界の中で、新しく生まれるセルには色が付き、かつ寿命で形状(セルの見た目)が変わりつつ、世代交代を繰り返します。通常のルールとは全く別の様相が展開されるようになります。
このように、いつもとはちょっとルールが変わったライフゲームを、標準PythonのみのコードからNumPyとOpenCVを使ったものへと、作り変えたいと思います。
前回から、新たに"-c2"オプションを追加しました。これにより、パステル調の色彩が設定できるようになりました。上記のgifも"-c2"オプションの実行結果となっております。
ライブラリの導入
NumPyやOpenCVを使うと、ライフゲームの処理を高速化できるようです。これらのライブラリがどういったものか、簡単に触れておきます。
NumPyとは
本記事では、ライフゲームの世界を2次元のリスト(配列)で表現しています。NumPyを使う事によって、この配列のデータ処理を高速化することが期待できそうです。
(Copilotより)
NumPy(ナムパイ)は、Pythonでの多次元配列を扱う数値計算ライブラリです。
主に科学技術計算やWebデータの分析などで大規模なデータを効率的に扱う際に利用されます。
具体的には、以下のような用途で活用されます。
- データの計算や統計値計算
- データの異常度計算
- 画像処理
- 機械学習
NumPyの特徴は、高速なデータ処理が可能であることです。
PythonはCやJavaに比べて遅いため、NumPyの配列オブジェクト「ndarray」を使用することで、
効率的な数値計算が実現できます。
OpenCVとは
本来OpenCVは画像処理用のライブラリですが、ライフゲームの世界も2次元の"画像"とみなしてしまえば、これを利用できそうです。NumPy配列と連携して、画像に特化した効率の良い機能(関数)の活用が期待できそうです。
(Copilotより)
OpenCV(Open Source Computer Vision Library)は、
画像処理や画像認識のためのオープンソースのライブラリです。
PythonやC++、Java、Android、Node.jsなどで使用できます。
このライブラリは、画像や動画から情報を抽出し、処理するための豊富な機能を提供しています。
具体的には、以下のようなことができます。
- 画像の読み込み・表示・保存: 画像を読み込んで表示したり、保存したりできます。
- 画像のトリミング・リサイズ: 画像の一部を切り取ったり、サイズを変更したりできます。
- グレースケール化: カラー画像を白黒画像に変換できます。
- 画像の2値化: 画像を白と黒の2色に変換できます。
- 顔認識や物体検出: 顔の検出や物体の追跡なども簡潔なコードで実現できます。
OpenCVは、AI開発や画像処理において非常に重要なツールであり、
Pythonで利用されることの多いライブラリです。
環境の準備
コードを動かす場合、あらかじめ以下を実行して、各ライブラリをインストールしておいて下さい。
> pip install numpy
> pip install opencv-python
筆者が動作確認した時の環境は、以下の通りです。
- Windows10
- ディスプレイサイズ 1366x768
- プロセッサ 1.6GHz
- メモリ 4.00GB
- Python 3.9
- numpy 1.21.1
- opencv-python 4.10.0.84
コードを作り変える
これからライブラリを使ってコードを作り変えるにあたり、今回は「現在の世代(セルの状態)を元に、次の世代を計算する部分」に特化して説明したいと思います。
変更前
次の世代を計算する処理は、_update
メソッドで実施するようにしています。ここで計算した次の世代を、後の処理でコンソール上に描画するという流れです。これを繰り返すことで、ライフゲームの世界が動き出します。
★クリックで展開★ 変更前のコード
class GameOfLife:
def _update(self):
max_y, max_x, dirs = self.y, self.x, self.dirs
world, colors, torus = self.world, self.colors, self.torus
mortal, ages = self.mortal, self.ages
pre_world = [[x for x in row] for row in world]
pre_colors = [[x for x in row] for row in colors]
for y in range(max_y):
for x in range(max_x):
next_cell, alive, color = 0, 0, 0
# check around
for dx, dy in dirs:
next_x, next_y = x + dx, y + dy
calc = False
if torus:
# if torus option is enabled,
# top and bottom, left and right are connected.
next_x, next_y = next_x % max_x, next_y % max_y
calc = True
else:
calc = (0 <= next_x < max_x) and (0 <= next_y < max_y)
if calc:
next_color = pre_colors[next_y][next_x]
if next_color > color:
color = next_color
if pre_world[next_y][next_x]:
alive += 1
# judge next cell
if pre_world[y][x] and (alive == 2 or alive == 3):
next_cell = 1
elif alive == 3:
colors[y][x] = color + 1
next_cell = 1
if next_cell:
if mortal:
# if mortal option is enabled, living cells are aging.
if pre_world[y][x]:
next_cell = self._aging(x, y, next_cell)
else:
ages[y][x] = 1
else:
ages[y][x], colors[y][x] = 0, 0
# update world
world[y][x] = next_cell
self.step += 1
def _aging(self, x, y, cell):
self.ages[y][x] += 1
for index, lifespan in enumerate(self.lifespans):
if self.ages[y][x] < lifespan:
cell = index
break
else:
# lifespan has expired.
self.ages[y][x] = 0
cell = 0
return cell
セルの生死
ライフゲームの世界は2次元リスト(self.world
)で保持しています。まず、xとyの2重ループにて各セルを探索します。そして周囲8方向(self.dirs
)のセルの状態をチェックし、生きたセルの個数に応じて、生き延びるセルと新たに生まれるセルを計算します。
次の世代が求まると、描画用にライフゲームの世界(self.world
)を更新します。
class GameOfLife:
def __init__(self, ...):
:
:
self.x, self.y = x, y
self.dirs = [
(-1, -1), (0, -1), (1, -1),
(-1, 0), (1, 0),
(-1, 1), (0, 1), (1, 1),
]
def _update(self):
max_y, max_x, dirs = self.y, self.x, self.dirs
:
:
for y in range(max_y):
for x in range(max_x):
next_cell, alive, color = 0, 0, 0
# check around
for dx, dy in dirs:
next_x, next_y = x + dx, y + dy
↑のあたりで、xとyの二重ループで2次元の各セルを順にめぐり、さらにdirsのループにて8方向の探索を進めていきます。
class GameOfLife:
def _update(self):
:
:
pre_world = [[x for x in row] for row in world]
for y in range(max_y):
for x in range(max_x):
next_cell, alive, color = 0, 0, 0
:
:
# judge next cell
if pre_world[y][x] and (alive == 2 or alive == 3):
next_cell = 1
elif alive == 3:
colors[y][x] = color + 1
next_cell = 1
:
:
# update world
world[y][x] = next_cell
↑のあたりで、周囲の生きたセル数を元に、次に生き延びるセルと新たに生まれるセルを求めています。
トーラス世界
上下左右をくっつける処理は以下の部分で計算しています。
class GameOfLife:
def _update(self):
:
:
for y in range(max_y):
for x in range(max_x):
next_cell, alive, color = 0, 0, 0
# check around
for dx, dy in dirs:
next_x, next_y = x + dx, y + dy
calc = False
if torus:
# if torus option is enabled,
# top and bottom, left and right are connected.
next_x, next_y = next_x % max_x, next_y % max_y
(x, y)
の座標をそれぞれの最大値でmod(%
)を取り、端を超えていたら(最大値より大きい or 0より小さい場合)、反対側の端に移動させて循環するようにしています。
色進化
色進化は、セルが生まれる際、周囲で一番進化した(値の大きい)ものから、さらに1歩進化する(1足す)処理により実現しています。
class GameOfLife:
def _update(self):
:
:
pre_colors = [[x for x in row] for row in colors]
for y in range(max_y):
for x in range(max_x):
next_cell, alive, color = 0, 0, 0
# check around
for dx, dy in dirs:
next_x, next_y = x + dx, y + dy
:
:
if calc:
next_color = pre_colors[next_y][next_x]
if next_color > color:
color = next_color
:
:
# judge next cell
if pre_world[y][x] and (alive == 2 or alive == 3):
next_cell = 1
elif alive == 3:
colors[y][x] = color + 1
これまでは、現世代で進化したセルを逐次反映させて、次の世代を計算していましたが、今回記事からは仕様を変更しています。
次世代の計算の際に逐次反映をやめて、現世代のみから次世代を計算するように変更しています。以降で出てくる、フィルタリングとの相性を鑑みての対応となります。
寿命
寿命の処理は、前世代から生き続けたものに対して実施しています。_aging
メソッドにて、前回から生き続けた場合は年齢(ages
)を加算し、寿命(lifespans
)で定義した既定の年齢に応じて見た目の形状(marks
)を変えています。また、最大値を超えると寿命を全うするよう処理しています。
self.alives = [(' ', 0), (self.alive, 10), ('□', 30), ('・', 60)]
self.lifespans = [alive[1] for alive in self.alives]
self.marks = [alive[0] for alive in self.alives]
class GameOfLife:
def _update(self):
:
:
for y in range(max_y):
for x in range(max_x):
:
:
if next_cell:
if mortal:
# if mortal option is enabled, living cells are aging.
if pre_world[y][x]:
next_cell = self._aging(x, y, next_cell)
def _aging(self, x, y, cell):
self.ages[y][x] += 1
for index, lifespan in enumerate(self.lifespans):
if self.ages[y][x] < lifespan:
cell = index
break
else:
# lifespan has expired.
self.ages[y][x] = 0
cell = 0
return cell
地味ですが、通常なら固定となるパターンが、寿命によりやがて消えていくという動きが加わります。
変更後
続いて、NumPyとOpenCVを使って書き換えたコードが以下になります。
★クリックで展開★ 変更後のコード
class GameOfLife:
def __init__(self):
# numpy & open-cv
self.world = np.array(self.world, dtype=np.uint8)
self.ages = np.copy(self.world)
self.colors = np.array(self.colors, dtype=np.uint8)
self.kernel = np.array(
[[1, 1, 1], [1, 0, 1], [1, 1, 1]], dtype=np.uint8)
def _update(self):
# get previous
pre_world = np.copy(self.world)
ones = np.ones_like(pre_world, dtype=np.uint8)
pre_cells = (pre_world >= 1) * ones
pre_colors = np.copy(self.colors)
if self.torus:
# wrap around
pre_cells = np.pad(pre_cells, 1, 'wrap')
pre_colors = np.pad(pre_colors, 1, 'wrap')
# get alive and born
kernel = self.kernel
border_type = cv2.BORDER_ISOLATED
around_cells = cv2.filter2D(pre_cells, -1,
kernel, borderType=border_type)
max_colors = cv2.dilate(pre_colors, kernel, borderType=border_type)
if self.torus:
# remove around
pre_cells = pre_cells[1:-1, 1:-1]
pre_colors = pre_colors[1:-1, 1:-1]
around_cells = around_cells[1:-1, 1:-1]
max_colors = max_colors[1:-1, 1:-1]
alive = (pre_cells == 1) * ((around_cells == 2) | (around_cells == 3))
born = (pre_cells == 0) * (around_cells == 3)
# update next cells
next_cells = alive | born
self.world = next_cells * ones
self.colors = alive * pre_colors + born * (max_colors + ones)
if self.mortal:
# aging
max_index = len(self.lifespans) - 1
for index, lifespan in enumerate(reversed(self.lifespans)):
aging_cells = np.where(alive & (self.ages < lifespan))
self.world[aging_cells] = max_index - index
self.ages[np.where(next_cells)] += 1
# check if expiring lifespan
max_lifespan = self.lifespans[-1]
self.world[np.where(self.ages >= max_lifespan)] = 0
self.ages[np.where(self.world == 0)] = 0
self.step += 1
セルの生死
これまでリストで保持していたself.world
をNumPy配列に置き換えました。そして周囲の生きたセルのカウントは、OpenCVのフィルタリングを使って実現するように変更しています。
class GameOfLife:
def __init__(self):
# numpy & open-cv
self.world = np.array(self.world, dtype=np.uint8)
self.kernel = np.array(
[[1, 1, 1], [1, 0, 1], [1, 1, 1]], dtype=np.uint8)
def _update(self):
# get previous
pre_world = np.copy(self.world)
ones = np.ones_like(pre_world, dtype=np.uint8)
pre_cells = (pre_world >= 1) * ones
:
:
# get alive and born
kernel = self.kernel
border_type = cv2.BORDER_ISOLATED
around_cells = cv2.filter2D(pre_cells, -1,
kernel, borderType=border_type)
:
:
alive = (pre_cells == 1) * ((around_cells == 2) | (around_cells == 3))
born = (pre_cells == 0) * (around_cells == 3)
# update next cells
next_cells = alive | born
self.world = next_cells * ones
NumPy配列を使う事で、配列同士の演算(条件に合うものの抽出や、乗算など)を簡単に表現できるようになります。変更前のコードにあった「3重ループのいかつい構造」はすっきり取り払われました。
周囲のセル数のカウントには、cv2.filter2D
という画像のフィルタリングを行う関数を利用しています。この関数の仕組みを簡単に説明しておきたいと思います。
filter2D関数について
filter2D
関数は、xとyの二次元の行列(画像データ)に対して、フィルタリングするためのものです(畳み込みとも呼ばれます)。フィルタリングによって、画像をぼかしたりエッジのみを抽出したり、といった画像処理が行えます。
使い方は以下のとおりです。(今回扱ったものに絞っています)
dst = cv2.filter2D(src, ddepth, kernel[, borderType])
引数 | 用途 |
---|---|
src | 入力画像(NumPy配列) |
ddepth | 出力画像の型を指定する。-1の場合、入力画像と同じ型。 |
kernel | カーネル(NumPy配列) |
borderType | 画像端の境界の扱い方を指定する。今回はcv2.BORDER_ISOLATEDを指定し、端を超える部分は無視する設定としました。 |
詳しくは、以下のサイトが参考になりました。
https://pystyle.info/opencv-filtering/
セル数のカウントについて
フィルタリングにはカーネル(kernel
)と呼ばれるものを用います。今回は中心が"0"、周囲8マスを"1"としたものを用意しました。filter2D
を実行する際に、このカーネルを使ってフィルタリングすることで、各セルの周囲のセル数が求まります。
カーネルの見た目は以下の通りです。
例として、次の2次元配列の入力について考えてみます。
"1"のオレンジ色の部分が、生きたセルを表しています。
フィルタリングは、入力された配列(input)の各要素(セル)に対して、カーネルを適用し処理していきます。イメージを以下に示します。
上図は"input"の赤色のセルに対して、フィルタリングを行う場合を表しています。まず対象のセルの周りから、カーネルと同じサイズの領域(黄色と赤色の部分)を考えます。
ここに対して、単純に以下を実施します。
- 黄色と赤色の部分の値とカーネルの値を、ちょうど重なる場所同士で掛ける
- 1.の結果をすべて足し合わせて、対象セルのフィルタリング結果とする
- 1.と2.を全セルに対して行い、"input"と同じサイズの結果を得る
前の図で"target"としていたセルの結果を赤色で強調しています。この場合、対象セルの周囲には2つの生きたセルがあるという事を示しています。
ちなみに、一番左上のセルを計算する場合など、カーネルが端からはみ出す部分については、今回は無視する(足し合わせない)設定としています。
今回処理した目的は画像のぼかしなどではなくセルのカウントですが、周囲8マスのセルの数がフィルタリングによって、正しく求められる事が確認できました。
トーラス世界
上下左右がくっついたトーラス世界の実装は、以下の3ステップで実現しています。
- 入力の外周に反対側の端の値をパディングする(付け足す)
- 各セルの周囲の計算を行う
- パディングした部分を取り除く
パディングにはnp.pad
を用いています。wrap
オプションを指定する事で、反対側の端の値をパディングするようにしています。
最外周のパディング部分は、一つ内側のセルの周囲を正しく計算するために入れたものなので、用が済んだらすぐに除去しています。当初は、filter2D
の境界のオプション指定(borderType
)でラップしてくれるものがないか試したのですが、現状は対応していないようで諦めました。
class GameOfLife:
def _update(self):
# get previous
pre_world = np.copy(self.world)
ones = np.ones_like(pre_world, dtype=np.uint8)
pre_cells = (pre_world >= 1) * ones
:
:
if self.torus:
# wrap around
pre_cells = np.pad(pre_cells, 1, 'wrap')
:
:
# get alive and born
kernel = self.kernel
border_type = cv2.BORDER_ISOLATED
around_cells = cv2.filter2D(pre_cells, -1,
kernel, borderType=border_type)
:
:
if self.torus:
# remove around
pre_cells = pre_cells[1:-1, 1:-1]
around_cells = around_cells[1:-1, 1:-1]
:
:
alive = (pre_cells == 1) * ((around_cells == 2) | (around_cells == 3))
born = (pre_cells == 0) * (around_cells == 3)
np.ones_like
は既存の配列と同じサイズの、要素が全て1となっている配列を取得する関数です。
色進化
色進化は新しくセルが生まれる際に、周囲の色番号の最大値を取得し、さらに1を足す事で実現しています。
最大値の取得にはcv2.dilate
関数を使いました。
class GameOfLife:
def _update(self):
# get previous
pre_colors = np.copy(self.colors)
:
:
# get alive and born
kernel = self.kernel
border_type = cv2.BORDER_ISOLATED
around_cells = cv2.filter2D(pre_cells, -1,
kernel, borderType=border_type)
max_colors = cv2.dilate(pre_colors, kernel, borderType=border_type)
:
:
alive = (pre_cells == 1) * ((around_cells == 2) | (around_cells == 3))
born = (pre_cells == 0) * (around_cells == 3)
:
:
# update next cells
self.colors = alive * pre_colors + born * (max_colors + ones)
dilateは膨張(dilation)を表しているそうです。画像処理では、輪郭の線を太くするなどの使い方があるそうです。詳細は以下が参考になりました。
https://pystyle.info/opencv-morpology-operation/
寿命
寿命の処理は、大きく分けて2つあります。1つは、生きたセルの年齢を更新し、その中から既定の値に達したものの形状(見た目)を変える部分です。そしてもう1つが、寿命が尽きたかどうかを判定する部分です。
形状 | 年齢 |
---|---|
■ | 1以上、10未満 |
□ | 10以上、30未満 |
・ | 30以上、60未満 |
' ' | 60以上(寿命を全う) |
class GameOfLife:
def _update(self):
:
:
if self.mortal:
# aging
max_index = len(self.lifespans) - 1
for index, lifespan in enumerate(reversed(self.lifespans)):
aging_cells = np.where(alive & (self.ages < lifespan))
self.world[aging_cells] = max_index - index
self.ages[np.where(next_cells)] += 1
# check if expiring lifespan
max_lifespan = self.lifespans[-1]
self.world[np.where(self.ages >= max_lifespan)] = 0
self.ages[np.where(self.world == 0)] = 0
np.where
は、条件を満たす要素のインデックス(位置)を取得するために使いました。以下のサイトが参考になりました。
https://note.nkmk.me/python-numpy-where/
効果のほどは?
ここまでで、NumPyとOpenCVを使ったコードの書き換えがひと通り終わりました。はたして、このコードは本当に高速化されているのでしょうか。
今回はline_profiler
を使って、_update
メソッドの処理時間を計測してみます。
前回記事ではcProfile
を取り上げましたが、これらのプロファイリングツールの特徴は以下のようになります。
ツール | 特徴 |
---|---|
cProfile | 関数単位で処理時間を計測できる |
line_profiler | 関数のコード行ごとに処理時間を計測できる(その分cProfileよりも、結果が出るまでに時間がかかる) |
これらの特徴から、まずcProfile
でおおよそのあたりを付けた後に、line_profiler
で詳細にプロファイリングする、といった使い方がおススメです(受け売りです)。
Pythonのプロファイリングについては下記の記事を参考にしました。
https://qiita.com/meshidenn/items/4dbde22d1e7a13a255bb
前準備
先ずは以下で、line_profiler
をインストールします。
pip install line_profiler
次に計測用に下記のコードを足します。
- プロファイルを実施しない場合にもソースが動くよう対策を入れる
-
@profile
デコレータを計測したいメソッドにつける
# for lineprofiler
try:
profile
except NameError:
# dummy for no profile
def profile(func):
return func
class GameOfLife:
:
:
@profile
def _update(self):
:
:
プロファイルを行わない場合、@profile
が未定義となってエラーが出てしまうので、冒頭の処理で対策しています。
次に、プロファイル時に動かすコードを用意します。
from game_of_life import GameOfLife
if __name__ == '__main__':
setting = {}
setting['sample'] = 'glider-gun'
setting['torus'] = True
setting['mortal'] = True
setting['color2'] = True
setting['wait'] = 0.0
GameOfLife(**setting).start()
冒頭の色付きグライダー銃を動かした時の、パフォーマンスを計測するためのコードです。
以上で準備完了です。それでは、早速プロファイルしてみます。
プロファイル結果
実行方法
以下のコマンドで実行できます。
kernprof -l -v kernprof_glider-gun.py
オプション名 | 内容 |
---|---|
-l | 行単位のプロファイリングを行う |
-v | 詳細な出力を行う |
変更前の結果
変更前は、トータルで13秒かかる結果となりました。
Wrote profile results to kernprof_update.py.lprof
Timer unit: 1e-06 s
Total time: 13.487 s
File: C:\game_of_life.py
Function: _update at line 139
Line # Hits Time Per Hit % Time Line Contents
==============================================================
139 @profile
140 def _update(self):
141 149 991.9 6.7 0.0 max_y, max_x, dirs = self.y, self.x, self.dirs
142 149 519.1 3.5 0.0 world, colors, torus = self.world, self.colors, self.torus
143 149 432.3 2.9 0.0 mortal, ages = self.mortal, self.ages
144
145 149 115117.3 772.6 0.9 pre_world = [[x for x in row] for row in world]
146 149 102762.4 689.7 0.8 pre_colors = [[x for x in row] for row in colors]
147 3576 4869.8 1.4 0.0 for y in range(max_y):
148 133653 154227.1 1.2 1.1 for x in range(max_x):
149 130226 143334.8 1.1 1.1 next_cell, alive, color = 0, 0, 0
150 # check around
151 1172034 1429410.7 1.2 10.6 for dx, dy in dirs:
152 1041808 1310531.5 1.3 9.7 next_x, next_y = x + dx, y + dy
153 1041808 980774.6 0.9 7.3 calc = False
154 1041808 995379.6 1.0 7.4 if torus:
155 # if torus option is enabled,
156 # top and bottom, left and right are connected.
157 1041808 1356260.8 1.3 10.1 next_x, next_y = next_x % max_x, next_y % max_y
158 1041808 1039338.5 1.0 7.7 calc = True
159 else:
160 calc = (0 <= next_x < max_x) and (0 <= next_y < max_y)
161 1041808 1013964.6 1.0 7.5 if calc:
162 1041808 1247507.3 1.2 9.2 next_color = pre_colors[next_y][next_x]
163 1041808 1161610.5 1.1 8.6 if next_color > color:
164 42524 43424.6 1.0 0.3 color = next_color
165 1041808 1315183.5 1.3 9.8 if pre_world[next_y][next_x]:
166 71352 82620.5 1.2 0.6 alive += 1
167
168 # judge next cell
169 130226 166853.8 1.3 1.2 if pre_world[y][x] and (alive == 2 or alive == 3):
170 5244 6037.9 1.2 0.0 next_cell = 1
171 124982 146520.6 1.2 1.1 elif alive == 3:
172 3756 5634.9 1.5 0.0 colors[y][x] = color + 1
173 3756 3953.0 1.1 0.0 next_cell = 1
174 130226 161653.1 1.2 1.2 if next_cell:
175 9000 9633.1 1.1 0.1 if mortal:
176 # if mortal option is enabled, living cells are aging.
177 9000 11765.2 1.3 0.1 if pre_world[y][x]:
178 5244 83312.2 15.9 0.6 next_cell = self._aging(x, y, next_cell)
179 else:
180 3756 5088.5 1.4 0.0 ages[y][x] = 1
181 else:
182 121226 206192.3 1.7 1.5 ages[y][x], colors[y][x] = 0, 0
183
184 # update world
185 130226 181440.6 1.4 1.3 world[y][x] = next_cell
186 149 619.6 4.2 0.0 self.step += 1
細かな結果の見方は、以下の表を参考にして下さい。
列名 | 内容 |
---|---|
Line | コードの行番号 |
Hits | 実行回数 |
Time | 累計の実行時間(us) |
Per Hit | 1回あたりの実行時間(us) |
% Time | 全体に占める実行時間の割合(%) |
Line Contents | Pythonのコード |
変更後の結果
そして変更後は?…というと、…なんと、トータルで0.3秒という✨圧倒的な高速化✨が達成されていました!!
Wrote profile results to kernprof_update.py.lprof
Timer unit: 1e-06 s
Total time: 0.367252 s
File: C:\game_of_life.py
Function: _update at line 143
Line # Hits Time Per Hit % Time Line Contents
==============================================================
143 @profile
144 def _update(self):
145 # get previous
146 149 9270.8 62.2 2.5 pre_world = np.copy(self.world)
147 149 15848.1 106.4 4.3 ones = np.ones_like(pre_world, dtype=np.uint8)
148 149 10158.3 68.2 2.8 pre_cells = (pre_world >= 1) * ones
149 149 4321.2 29.0 1.2 pre_colors = np.copy(self.colors)
150
151 149 338.4 2.3 0.1 if self.torus:
152 # wrap around
153 149 92527.3 621.0 25.2 pre_cells = np.pad(pre_cells, 1, 'wrap')
154 149 80789.3 542.2 22.0 pre_colors = np.pad(pre_colors, 1, 'wrap')
155
156 # get alive and born
157 149 380.0 2.6 0.1 kernel = self.kernel
158 149 449.9 3.0 0.1 border_type = cv2.BORDER_ISOLATED
159 298 17840.1 59.9 4.9 around_cells = cv2.filter2D(pre_cells, -1,
160 149 194.9 1.3 0.1 kernel, borderType=border_type)
161 149 7617.9 51.1 2.1 max_colors = cv2.dilate(pre_colors, kernel, borderType=border_type)
162
163 149 369.2 2.5 0.1 if self.torus:
164 # remove around
165 149 820.3 5.5 0.2 pre_cells = pre_cells[1:-1, 1:-1]
166 149 572.7 3.8 0.2 pre_colors = pre_colors[1:-1, 1:-1]
167 149 501.9 3.4 0.1 around_cells = around_cells[1:-1, 1:-1]
168 149 483.2 3.2 0.1 max_colors = max_colors[1:-1, 1:-1]
169
170 149 14521.6 97.5 4.0 alive = (pre_cells == 1) * ((around_cells == 2) | (around_cells == 3))
171 149 8040.1 54.0 2.2 born = (pre_cells == 0) * (around_cells == 3)
172
173 # update next cells
174 149 1210.8 8.1 0.3 next_cells = alive | born
175 149 3307.5 22.2 0.9 self.world = next_cells * ones
176 149 10054.5 67.5 2.7 self.colors = alive * pre_colors + born * (max_colors + ones)
177
178 149 419.4 2.8 0.1 if self.mortal:
179 # aging
180 149 630.4 4.2 0.2 max_index = len(self.lifespans) - 1
181 745 2964.2 4.0 0.8 for index, lifespan in enumerate(reversed(self.lifespans)):
182 596 36242.3 60.8 9.9 aging_cells = np.where(alive & (self.ages < lifespan))
183 596 11570.3 19.4 3.2 self.world[aging_cells] = max_index - index
184 149 11702.7 78.5 3.2 self.ages[np.where(next_cells)] += 1
185
186 # check if expiring lifespan
187 149 456.3 3.1 0.1 max_lifespan = self.lifespans[-1]
188 149 9195.9 61.7 2.5 self.world[np.where(self.ages >= max_lifespan)] = 0
189 149 13833.9 92.8 3.8 self.ages[np.where(self.world == 0)] = 0
190
191 149 618.9 4.2 0.2 self.step += 1
結果の比較
Pythonの2次元リストの実装を、NumPyとOpenCVを使った実装に変えることで、13秒→0.3秒と43倍も高速化する結果となりました。
変更前のコードは、3重ループの中の各処理の実行回数がとても多く、重くなっているようでした。
一方、変更後の方はトーラス世界用の外周のパディングに、最も(50%近く)時間がかかっている結果でした。ですが、NumPyの行列演算やOpenCVのフィルタ処理は圧倒的に軽く、各コード行のTime
の列の累積実行時間は、全体的に変更前と変更後で1~2桁くらい違っていました。
やはり以下の要因が、主に高速化につながったものと思われます。
- NumPy配列を使う事で、配列の型を最適化したこと(型を限定することで、Pythonに備わっている汎用性を高めるための手厚い操作を削減したこと)
- Pythonのループを使わず、NumPyやOpenCVの関数(内部はCやC++が動作)に全てを任せたこと
Pythonの高速化に関する全般的な知識については、「ハイパフォーマンスPython 第2版」という以下の書籍がとても参考になりました。
https://www.oreilly.co.jp/books/9784873119908/
コード全文
今回の最終結果のコード全文を、折りたたんで置いております。
★クリックで展開★ 最終結果のコード全文
import time
from platform import system
from ctypes import windll
from random import random
from datetime import datetime
import json
import pprint
import numpy as np
import cv2
# for lineprofiler
try:
profile
except NameError:
# dummy for no profile
def profile(func):
return func
class GameOfLife:
def __init__(self, sample=None, name='game_of_life', x=30, y=15,
world=None, max_step=None, wait=0.03, delay=0.0,
alive='■', ratio=0.5, loop=False, torus=False, mortal=False,
color=False, color2=False, json_file=None):
self.sample = sample
self.name = name
samples, colors = self._load_samples('samples.json'), None
if sample in samples:
self.name = sample
if 'world' in samples[sample]:
world = samples[sample]['world']
if 'max_step' in samples[sample]:
if max_step is None:
max_step = samples[sample]['max_step']
if 'colors' in samples[sample]:
colors = samples[sample]['colors']
if 'wait' in samples[sample]:
wait = samples[sample]['wait']
self.x, self.y = x, y
rand = False
if world is not None:
self.x, self.y = len(world[0]), len(world)
self.world = [[x for x in row] for row in world]
else:
rand = True
self.world = [
[random() < ratio for _ in range(x)] for _ in range(y)
]
self.max_step = max_step if max_step is not None else 100
self.wait = wait
self.delay = delay
self.alive = alive
self.ratio = ratio
self.loop = loop
self.torus = torus
self.mortal = mortal
self.color = color
self.color2 = color2
if json_file is not None:
rand = False
self._load(json_file)
self.step = 1
self.colors = [[0 for _ in row] for row in self.world]
if colors:
self.colors = colors
self.alives = [(' ', 0), (self.alive, 10), ('□', 30), ('・', 60)]
self.lifespans = [alive[1] for alive in self.alives]
self.marks = [alive[0] for alive in self.alives]
color_type = None
if color:
color_type = 'normal'
elif color2:
color_type = 'pastel'
self.console = Console(self.x, self.y, self.name,
self.marks, color_type)
if rand:
self._dump()
# numpy & open-cv
self.world = np.array(self.world, dtype=np.uint8)
self.ages = np.copy(self.world)
self.colors = np.array(self.colors, dtype=np.uint8)
self.kernel = np.array(
[[1, 1, 1], [1, 0, 1], [1, 1, 1]], dtype=np.uint8)
def start(self):
try:
self.console.setup()
while True:
self.console.display(self.world, self.step, self.colors)
if self.step == self.max_step:
if not self.loop:
# if loop option is enabled, game_of_life is never end.
break
if self.step == 1:
time.sleep(self.delay)
self._update()
self._wait()
except KeyboardInterrupt:
return
finally:
self.console.teardown()
def _load_samples(self, samples_file):
samples = {}
try:
with open(samples_file, 'r') as f:
samples = json.load(f)
except FileNotFoundError:
pass
return samples
def _load(self, json_file):
try:
with open(json_file, 'r') as f:
settings = json.load(f)
self.name = settings['name']
self.x = settings['x']
self.y = settings['y']
self.world = settings['world']
self.max_step = settings['step']
self.wait = settings['wait']
self.delay = settings['delay']
self.alive = settings['alive']
self.ratio = settings['ratio']
self.loop = settings['loop']
self.torus = settings['torus']
self.mortal = settings['mortal']
self.color = settings['color']
self.color2 = settings['color2']
except FileNotFoundError:
pass
@profile
def _update(self):
# get previous
pre_world = np.copy(self.world)
ones = np.ones_like(pre_world, dtype=np.uint8)
pre_cells = (pre_world >= 1) * ones
pre_colors = np.copy(self.colors)
if self.torus:
# wrap around
pre_cells = np.pad(pre_cells, 1, 'wrap')
pre_colors = np.pad(pre_colors, 1, 'wrap')
# get alive and born
kernel = self.kernel
border_type = cv2.BORDER_ISOLATED
around_cells = cv2.filter2D(pre_cells, -1,
kernel, borderType=border_type)
max_colors = cv2.dilate(pre_colors, kernel, borderType=border_type)
if self.torus:
# remove around
pre_cells = pre_cells[1:-1, 1:-1]
pre_colors = pre_colors[1:-1, 1:-1]
around_cells = around_cells[1:-1, 1:-1]
max_colors = max_colors[1:-1, 1:-1]
alive = (pre_cells == 1) * ((around_cells == 2) | (around_cells == 3))
born = (pre_cells == 0) * (around_cells == 3)
# update next cells
next_cells = alive | born
self.world = next_cells * ones
self.colors = alive * pre_colors + born * (max_colors + ones)
if self.mortal:
# aging
max_index = len(self.lifespans) - 1
for index, lifespan in enumerate(reversed(self.lifespans)):
aging_cells = np.where(alive & (self.ages < lifespan))
self.world[aging_cells] = max_index - index
self.ages[np.where(next_cells)] += 1
# check if expiring lifespan
max_lifespan = self.lifespans[-1]
self.world[np.where(self.ages >= max_lifespan)] = 0
self.ages[np.where(self.world == 0)] = 0
self.step += 1
def _wait(self):
time.sleep(self.wait)
def _dump(self):
now = datetime.now().strftime('%Y%m%d%H%M%S')
json_file = 'world' + now + '.json'
settings = {
'name': self.console.name,
'x': self.x,
'y': self.y,
'world': self.world,
'step': self.max_step,
'wait': self.wait,
'delay': self.delay,
'alive': self.alive,
'ratio': self.ratio,
'loop': self.loop,
'torus': self.torus,
'mortal': self.mortal,
'color': self.color,
'color2': self.color2,
}
with open(json_file, 'w') as f:
output = pprint.pformat(settings, indent=4,
width=1000, sort_dicts=False)
output = output.replace("'", '"')
output = output.replace('True', 'true')
output = output.replace('False', 'false')
f.write(output)
class Console:
def __init__(self, x, y, name, marks, color_type):
self.x = x
self.y = y
self.name = name
self.marks = marks
if 'win' in system().lower():
self._enable_win_escape_code()
if color_type == 'normal':
self.color_list = [
'\033[39m', # 0: default
'\033[38;2;255;255;255m', # 1: white
'\033[38;2;0;153;68m', # 2: green
'\033[38;2;230;0;18m', # 3: red
'\033[38;2;235;202;27m', # 4: yellow
'\033[38;2;145;0;0m', # 5: brown
'\033[38;2;0;160;233m', # 6: cyan
'\033[38;2;241;158;194m', # 7: pink
'\033[38;2;240;131;0m', # 8: orange
'\033[38;2;146;7;131m', # 9: purple
]
elif color_type == 'pastel':
self.color_list = [
'\033[39m', # 0: default
'\033[38;2;255;255;255m', # 1: white
'\033[38;2;168;255;211m', # 2: green
'\033[38;2;255;168;211m', # 3: red
'\033[38;2;255;255;168m', # 4: yellow
'\033[38;2;255;168;168m', # 5: brown
'\033[38;2;168;255;255m', # 6: cyan
'\033[38;2;255;168;255m', # 7: pink
'\033[38;2;255;211;168m', # 8: orange
'\033[38;2;211;168;255m', # 9: purple
]
self.title = self._setup_title()
self._get_world = self._get_uncolorized_world
if color_type is not None:
self._get_world = self._get_colorized_world
self.max_col = 18 if self.x * self.y > 3000 else 50
if y < self.max_col:
self.max_col = y
else:
self.max_col = 80
self.print_cnt = (y + self.max_col - 1) // self.max_col
def setup(self):
self._clear_screen()
self._cursor_hyde()
def teardown(self):
self._cursor_show()
def display(self, world, step, colors):
cursor_up = ''
if step > 1:
cursor_up = self._cursor_up(self.y + 4)
screens = self._get_world(world, colors)
screens[0] = cursor_up + self._get_title() + screens[0]
screens[-1] = screens[-1] + self._get_step(step)
for screen in screens:
print(screen, end='')
def _enable_win_escape_code(self):
kernel = windll.kernel32
kernel.SetConsoleMode(kernel.GetStdHandle(-11), 7)
def _clear_screen(self):
print("\033[;H\033[2J")
def _cursor_hyde(self):
print('\033[?25l', end='')
def _cursor_show(self):
print('\033[?25h', end='')
def _cursor_up(self, n):
return f'\033[{n}A'
def _setup_title(self):
name, x, y = self.name, self.x, self.y
count = x * y
max_cell_size = 6552
title = f'{name} ({x} x {y})'
if count > max_cell_size:
title += f' * warning : max_cell_size({max_cell_size}) is over! *'
return title
def _get_title(self):
return f"{self.title}\n"
def _get_uncolorized_world(self, world, _):
marks = self.marks
max_y, max_x = self.y, self.x
line = []
# setup screen for display world on cli
screens = ['┌' + ('─' * (max_x * 2)) + '┐\n']
for y in range(max_y):
cells = ''
for x in range(max_x):
cells += marks[world[y][x]]
line += ['│' + cells + '│']
for i in range(self.print_cnt):
start = i * self.max_col
end = (i + 1) * self.max_col
screens += ['\n'.join(line[start:end]) + '\n']
screens += ['└' + ('─' * (max_x * 2)) + '┘\n']
return screens
@profile
def _get_colorized_world(self, world, colors):
color_list = self.color_list
marks = self.marks
max_y, max_x = self.y, self.x
max_color = len(color_list)
line = []
# setup screen for display world on cli
screens = ['┌' + ('─' * (max_x * 2)) + '┐\n']
for y in range(max_y):
cells, pre_color = '', 0
world_y = world[y]
colors_y = colors[y]
cells = [' '] * max_x
for x, cell in enumerate(world_y):
if cell:
mark = marks[cell]
# change color if it is different from previous
if color := colors_y[x] % max_color:
if color == pre_color:
cells[x] = mark
else:
cells[x] = color_list[color] + mark
else:
if pre_color:
cells[x] = color_list[0] + mark
else:
cells[x] = mark
pre_color = color
if pre_color:
line += ['│' + ''.join(cells) + color_list[0] + '│']
else:
line += ['│' + ''.join(cells) + '│']
for i in range(self.print_cnt):
start = i * self.max_col
end = (i + 1) * self.max_col
screens += ['\n'.join(line[start:end]) + '\n']
screens += ['└' + ('─' * (max_x * 2)) + '┘\n']
return screens
def _get_step(self, step):
return f'step = {step}\n'
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(
description='A game of life simulator on CLI')
# requied arg
parser.add_argument('sample', nargs='?')
# requied optional text arg
options = (('-n', '--name'), ('-j', '--json'), ('-a', '--alive'))
for option in options:
parser.add_argument(*option)
# requied optional int arg
options = (('-x',), ('-y',), ('-s', '--step'))
for option in options:
parser.add_argument(*option, type=int)
# requied optional float arg
options = (('-w', '--wait'), ('-d', '--delay'), ('-r', '--ratio'))
for option in options:
parser.add_argument(*option, type=float)
# optional flag
options = (
('-l', '--loop'), ('-t', '--torus'), ('-m', '--mortal'),
('-c', '--color'), ('-c2', '--color2'))
for option in options:
parser.add_argument(*option, action="store_true")
args = parser.parse_args()
setting = {}
# set args if defined
options = (
('sample', args.sample), ('name', args.name), ('x', args.x),
('y', args.y), ('json_file', args.json), ('max_step', args.step),
('delay', args.delay), ('alive', args.alive), ('ratio', args.ratio))
for key, value in options:
if value:
setting[key] = value
# set args if not None
if args.wait is not None:
setting['wait'] = args.wait
# set args
options = (
('loop', args.loop), ('torus', args.torus), ('mortal', args.mortal),
('color', args.color), ('color2', args.color2))
for key, value in options:
setting[key] = value
GameOfLife(**setting).start()
コマンドプロンプトのフォントサイズを"6"にして、100x100サイズを通常のルールで動かすと以下のようになりました。(こんなに文字の小さいコマンドプロンプトを見たのは、この時が初めてでした)
おわりに
NumPyやOpenCVに触れてみて、配列の演算やフィルタリングについて少し知ることができました。特に、ループを使わずに2次元配列を一気に計算する感覚はとても新鮮でした。また、通常から少しアレンジしたライフゲームの仕様を、「どのようにライブラリを活用すれば実現できるのか?」という事を考えている時間も非常にエキサイティングでした。
今回の変更を通して、世代交代の処理を大幅に高速化できましたが、実は全体からみると描画の処理がボトルネックになっており、まだまだ改善の余地があります。筆者が求める物にたどり着くには、今しばらく頭を悩ます必要がありそうです。
それでは、NumPyとOpenCVを使った実装を記念して、仕様をアレンジしたバージョンの「シュシュポッポ列車」を御覧いただき、締めくくりたいと思います。
最後までお読みいただきありがとうございました!
なお、ソースコード全体の最新版は以下のGitHubリポジトリにございます。
つづき
その後、続きの記事を書きましたので、良かったら読んでください。
高速化も落ち着いたので、ただ眺めるだけのネタ記事となっています。