2
1

はじめに

どうも、y-tetsuです。

以前、Pythonでライフゲームを作った話を記事にした者です。

あれから1年半ほど経ちましたが、最近またライフゲームの創作欲が高まりまして、コードを少し触りました。折角なので、その内容を記事に残しておこうと思います。

今回の記事のテーマは、パフォーマンスの改善です。cProfileというPythonの標準モジュールを使って、足を引っ張っているボトルネックの部分を見つけ、そこに対策を打ち、コードをより高速化していく過程を書いてみようと思います。

できたもの

最終的に、以前のものより約5倍近く速く動作するものになりました(細かくは4.85倍)。前より大きな画面サイズで、わりとヌルヌル動くまでになったかなぁと思います。

以下は、長寿型の"どんぐり"を動かした様子です。なんとこの先、"5206世代"まで変化を続けるそうです。(なぜに"どんぐり"…)

完成形のコードは、以下のGitHubのリポジトリから取得できます。

ヨシ、いっちょ動かしてやろう!という奇特な御仁のために。

Python 3.9以上のWindows10環境で動作確認しております。

上記のGitHubリポジトリより、任意のフォルダに以下をダウンロードして下さい。

  • game_of_life.py
  • samples.json

ダウンロードしたフォルダで下記コマンドを実行すれば動くと思います。

> python game_of_life.py [オプション]

パフォーマンスを改善する

前提条件

「パフォーマンス改善」と一口に言っても、様々な手段が考えられると思います。今回は筆者の個人的な思いにより、以下の範囲で改善を試みたいと思います。

  • 使う言語はPythonのみ (CPython)
  • 画面表示は、もちろんCLI上で (CLIはコンソールと呼んだり、Windowsではコマンドプロンプトとも呼んだりします)
  • 標準モジュール以外は使わない

他の言語に比べ、処理速度の面で非力なPythonのみのロジックで、無骨なコンソール上に、どこまで鮮やかなライフゲームの世界を表現できるのか?今回はその部分を追求してみます。

動作確認に使用したPC環境のスペックは以下の通りです。

環境 スペック
ディスプレイサイズ 1366x768
OS Windows
プロセッサ 1.6GHz
メモリ 4.00GB
物理コア数 2

皆様のお使いのPCですと、性能が良すぎて、改善の効果があまり伝わらないかもしれませんが…。

元となるコード

以前の記事から、色々と手が入って形は変わっていますが、処理速度面は特に意識せずの作りっぱなしの状態です。

閉じておりますので、適宜開いてご確認下さい。Pythonに覚えのある方は、是非一緒に改善案を考えて頂けると、小躍りして喜びます。

★クリックで展開★ 改善前のコード
game_of_life.py
import time
from platform import system
from ctypes import windll
from random import random
from datetime import datetime
import json
import pprint


class GameOfLife:
    def __init__(self, sample=None, name='game_of_life', x=30, y=15,
                 world=None, max_step=None, wait=0.03, delay=0.0,
                 alive='', ratio=0.5, loop=False, torus=False, mortal=False,
                 color=False, json_file=None):
        self.sample = sample
        self.name = name
        samples, colors = self._load_samples('samples.json'), None
        if sample in samples:
            self.name = sample
            if 'world' in samples[sample]:
                world = samples[sample]['world']
            if 'max_step' in samples[sample]:
                if max_step is None:
                    max_step = samples[sample]['max_step']
            if 'colors' in samples[sample]:
                colors = samples[sample]['colors']
            if 'wait' in samples[sample]:
                wait = samples[sample]['wait']

        self.x, self.y = x, y
        rand = False
        if world is not None:
            self.x, self.y = len(world[0]), len(world)
            self.world = [[x for x in row] for row in world]
        else:
            rand = True
            self.world = [
                [random() < ratio for _ in range(x)] for _ in range(y)
            ]

        self.max_step = max_step if max_step is not None else 100
        self.wait = wait
        self.delay = delay
        self.alive = alive
        self.ratio = ratio
        self.loop = loop
        self.torus = torus
        self.mortal = mortal
        self.color = color

        if json_file is not None:
            rand = False
            self._load(json_file)

        self.dirs = [
            (-1, -1), (0, -1), (1, -1),
            (-1,  0),          (1,  0),
            (-1,  1), (0,  1), (1,  1),
        ]
        self.step = 1
        self.ages = [[x for x in row] for row in self.world]
        self.colors = [[0 for _ in row] for row in self.world]
        if colors:
            self.colors = colors

        self.alives = (('  ', 0), (self.alive, 10), ('', 30), ('', 60))
        self.lifespans = [alive[1] for alive in self.alives]
        self.marks = [alive[0] for alive in self.alives]

        self.console = Console(self.x, self.y, self.name, self.marks)

        if rand:
            self._dump()

    def start(self):
        try:
            self.console.setup()
            while True:
                self.console.display(self.world, self.step, self.colors)
                if self.step == self.max_step:
                    if not self.loop:
                        # if loop option is enabled, game_of_life is never end.
                        break
                if self.step == 1:
                    time.sleep(self.delay)
                self._update()
                self._wait()
        except KeyboardInterrupt:
            return
        finally:
            self.console.teardown()

    def _load_samples(self, samples_file):
        samples = {}
        try:
            with open(samples_file, 'r') as f:
                samples = json.load(f)
        except FileNotFoundError:
            pass
        return samples

    def _load(self, json_file):
        try:
            with open(json_file, 'r') as f:
                settings = json.load(f)
                self.name = settings['name']
                self.x = settings['x']
                self.y = settings['y']
                self.world = settings['world']
                self.max_step = settings['step']
                self.wait = settings['wait']
                self.delay = settings['delay']
                self.alive = settings['alive']
                self.ratio = settings['ratio']
                self.loop = settings['loop']
                self.torus = settings['torus']
                self.mortal = settings['mortal']
                self.color = settings['color']
        except FileNotFoundError:
            pass

    def _update(self):
        max_y, max_x = self.y, self.x
        world, colors = self.world, self.colors
        mortal, ages = self.mortal, self.ages
        new_world = []
        for y in range(max_y):
            next_cells = []
            for x in range(max_x):
                if next_cell := self._next_cell(x, y):
                    if mortal:
                        # if mortal option is enabled, living cells are ageing.
                        if world[y][x]:
                            next_cell = self._ageing(x, y, next_cell)
                        else:
                            ages[y][x] = 1
                else:
                    ages[y][x], colors[y][x] = 0, 0
                next_cells += [next_cell]
            new_world += [next_cells]
        self.world = [[x for x in row] for row in new_world]
        self.step += 1

    def _ageing(self, x, y, cell):
        self.ages[y][x] += 1
        for index, lifespan in enumerate(self.lifespans):
            if self.ages[y][x] < lifespan:
                cell = index
                break
        else:
            # lifespan has expired.
            self.ages[y][x] = 0
            cell = 0
        return cell

    def _next_cell(self, x, y):
        alive, color = self._get_around(x, y)
        if self.world[y][x]:
            return 1 if alive == 2 or alive == 3 else 0
        if alive == 3:
            if self.color:
                # if color option is enabled,
                # the new cell evolves from the most evolved cell around it.
                # immediate reflection due to diversity.
                self.colors[y][x] = color + 1
            return 1
        return 0

    def _get_around(self, x, y):
        max_x, max_y = self.x, self.y
        world, colors, torus = self.world, self.colors, self.torus
        alive, color = 0, 0
        for dx, dy in self.dirs:
            next_x, next_y = x + dx, y + dy
            if torus:
                # if torus option is enabled,
                # top and bottom, left and right are connected.
                next_x, next_y = next_x % max_x, next_y % max_y
            if (0 <= next_x < max_x) and (0 <= next_y < max_y):
                if self.color:
                    # if color option is enabled,
                    # the color of most evolved cell around it is inherited.
                    color = max(color, colors[next_y][next_x])
                if world[next_y][next_x]:
                    alive += 1
        return alive, color

    def _wait(self):
        time.sleep(self.wait)

    def _dump(self):
        now = datetime.now().strftime('%Y%m%d%H%M%S')
        json_file = 'world' + now + '.json'
        settings = {
            'name': self.console.name,
            'x': self.x,
            'y': self.y,
            'world': self.world,
            'step': self.max_step,
            'wait': self.wait,
            'delay': self.delay,
            'alive': self.alive,
            'ratio': self.ratio,
            'loop': self.loop,
            'torus': self.torus,
            'mortal': self.mortal,
            'color': self.color,
        }
        with open(json_file, 'w') as f:
            output = pprint.pformat(settings, indent=4,
                                    width=1000, sort_dicts=False)
            output = output.replace("'", '"')
            output = output.replace('True', 'true')
            output = output.replace('False', 'false')
            f.write(output)


class Console:
    def __init__(self, x, y, name, marks):
        self.x = x
        self.y = y
        self.name = name
        self.marks = marks
        if 'win' in system().lower():
            self._enable_win_escape_code()
        self.color_list = [
            '\x1b[39m',                # 0: default
            '\033[38;2;255;255;255m',  # 1: white
            '\033[38;2;0;153;68m',     # 2: green
            '\033[38;2;230;0;18m',     # 3: red
            '\033[38;2;235;202;27m',   # 4: yellow
            '\033[38;2;145;0;0m',      # 5: brown
            '\033[38;2;0;160;233m',    # 6: cyan
            '\033[38;2;241;158;194m',  # 7: pink
            '\033[38;2;240;131;0m',    # 8: orange
            '\033[38;2;146;7;131m',    # 9: purple
        ]
        self.title = self._setup_title()

    def setup(self):
        self._clear_screen()
        self._cursor_hyde()

    def teardown(self):
        self._cursor_show()

    def display(self, world, step, colors):
        if step > 1:
            self._cursor_up(self.y + 4)
        self._display_title()
        self._display_world(world, colors)
        self._display_step(step)

    def _enable_win_escape_code(self):
        kernel = windll.kernel32
        kernel.SetConsoleMode(kernel.GetStdHandle(-11), 7)

    def _clear_screen(self):
        print("\033[;H\033[2J")

    def _cursor_hyde(self):
        print('\033[?25l', end='')

    def _cursor_show(self):
        print('\033[?25h', end='')

    def _cursor_up(self, n):
        print(f'\033[{n}A', end='')

    def _setup_title(self):
        name, x, y = self.name, self.x, self.y
        count = x * y
        max_cell_size = 3025
        title = f'{name} ({x} x {y})'
        if count > max_cell_size:
            title += f' * warning : max_cell_size({max_cell_size}) is over! *'
        return title

    def _display_title(self):
        print(f"{self.title}")

    def _display_world(self, world, colors):
        color_list = self.color_list
        marks = self.marks
        max_y, max_x = self.y, self.x
        max_color = len(color_list)
        line = []
        # display world on cli
        print('' + '' * (max_x * 2) + '')
        for y in range(max_y):
            cells = ''
            for x in range(max_x):
                color = colors[y][x] % max_color
                mark = marks[world[y][x]]
                cells += self.color_list[color] + mark + self.color_list[0]
            line += ['' + cells + '']
        print('\n'.join(line))
        print('' + '' * (max_x * 2) + '')

    def _display_step(self, step):
        print(f'step = {step}')


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(
                description='A game of life simulator on CLI')
    parser.add_argument('sample', nargs='?')
    parser.add_argument('-n', '--name')
    parser.add_argument('-x', type=int)
    parser.add_argument('-y', type=int)
    parser.add_argument('-j', '--json')
    parser.add_argument('-s', '--step', type=int)
    parser.add_argument('-w', '--wait', type=float)
    parser.add_argument('-d', '--delay', type=float)
    parser.add_argument('-a', '--alive')
    parser.add_argument('-r', '--ratio', type=float)
    parser.add_argument('-l', '--loop', action="store_true")
    parser.add_argument('-t', '--torus', action="store_true")
    parser.add_argument('-m', '--mortal', action="store_true")
    parser.add_argument('-c', '--color', action="store_true")
    args = parser.parse_args()

    setting = {}
    if args.sample:
        setting['sample'] = args.sample
    if args.name:
        setting['name'] = args.name
    if args.x:
        setting['x'] = args.x
    if args.y:
        setting['y'] = args.y
    if args.json:
        setting['json_file'] = args.json
    if args.step:
        setting['max_step'] = args.step
    if args.wait is not None:
        setting['wait'] = args.wait
    if args.delay:
        setting['delay'] = args.delay
    if args.alive:
        setting['alive'] = args.alive
    if args.ratio:
        setting['ratio'] = args.ratio
    setting['loop'] = args.loop
    setting['torus'] = args.torus
    setting['mortal'] = args.mortal
    setting['color'] = args.color

    GameOfLife(**setting).start()

テストを用意する

パフォーマンスを改善するにあたり、コードを改良していくわけですが、新たに加えた変更にどの程度の効果があるのかを、はっきり確認できるよう前もって準備しておく必要があります。

私の目指すところとしては、なるべく大画面のコンソール上で、最も負荷のかかるフルオプションの状況において、ヌルヌル&サクサク動くライフゲームの世界を実現したいです。(もちろん、独自の機能は維持したままです)

今回は、47x47サイズで作った長寿型の"ノアの方舟"を使ってみようと思います。用意したサンプルの中ではそこそこ大きいですし、動きもありますので。

ちなみに、ウェイトを入れた素の"ノアの方舟"の様子は以下です。(ネーミング、理由は分かりませんがセンスを感じます)

この"ノアの方舟"に対して、本コードで用意している以下のオプションを設定し、最も負荷のかかる状態とします。(オプションの内容については前回の記事を参照して下さい)

オプション名 設定内容
トーラス世界 有効
寿命 有効
色進化 有効
ウェイト時間 0.0秒(なし)

そして、これを3回実行した際の「処理時間」を一つの指標とします。

これから、この指標をより短縮するよう、コードを改変していきます。

指標となる処理を実行するために、testsフォルダを用意してtest_game_of_life.pyファイルを配置します。そして、以下のテストコードを追加します。

test_game_of_life.py
import unittest
from game_of_life import GameOfLife


class TestGameOfLife(unittest.TestCase):
    def test_noahs_ark_elp(self):
        setting = {}
        setting['sample'] = 'noahs-ark'
        setting['torus'] = True
        setting['mortal'] = True
        setting['color'] = True
        setting['wait'] = 0.0
        for _ in range(3):
            GameOfLife(**setting).start()

この時、test_game_of_life.pyにはライフゲームの基本機能のユニットテストも同時に入れておりますが、ここでは割愛いたします。

本記事では、コードを書き変える際、あらかじめ追加しておいたユニットテストを全てパスする前提で、話を進めます。呼吸するように、そうできたら良いなといつも思います。(コードを変えていくら速くなっても、元の動作が狂ってしまっては、意味がありませんからね)

改善のための4つのStep

元となるコードと、その性能を示す指標が用意出来ました。これからココを起点に、次の4つのStepを通してコードの処理速度を改善していきたいと思います。

Step1 : 処理時間を計測する

冒頭でご紹介したcProfileが、ここでいよいよ登場します。コイツを使うと何がいいのかというと、一口に、どの処理にどれだけ時間がかかっているのかを、きっちり計って示してくれるところです。

「cProfileとは」というところは一旦後回しにして、兎にも角にも動かして、現状のコードの実態を早急(さっきゅう)に見てみましょう。

以下を実行します。

> python -m cProfile -s cumtime -m unittest tests.test_game_of_life.TestGameOfLife.test_noahs_ark_elp

これは、先ほど用意したテストコードを走らせて、処理時間の計測結果を出力するコマンドです。

ちなみに現状のコードは、コンソール上で以下のような動作をしています。

ウェイトなしのはずが、随分とモッサリした印象です。

そしてお待ちかね、cProfileの計測結果が出ました!(長いので一部抜粋しております)

         9940406 function calls (9939627 primitive calls) in 53.831 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     39/1    0.000    0.000   53.904   53.904 {built-in method builtins.exec}
        1    0.000    0.000   53.903   53.903 runpy.py:200(run_module)
        1    0.000    0.000   53.836   53.836 runpy.py:64(_run_code)
        1    0.000    0.000   53.836   53.836 __main__.py:1(<module>)
        1    0.000    0.000   53.836   53.836 main.py:65(__init__)
        1    0.000    0.000   53.673   53.673 main.py:246(runTests)
        1    0.000    0.000   53.673   53.673 runner.py:151(run)
      2/1    0.000    0.000   53.662   53.662 suite.py:83(__call__)
      2/1    0.000    0.000   53.662   53.662 suite.py:102(run)
        1    0.000    0.000   53.662   53.662 case.py:652(__call__)
        1    0.000    0.000   53.662   53.662 case.py:558(run)
        1    0.000    0.000   53.659   53.659 case.py:549(_callTestMethod)
        1    0.001    0.001   53.659   53.659 test_game_of_life.py:275(test_noahs_ark_elp)
        3    0.023    0.008   53.544   17.848 game_of_life.py:75(start)
      450    0.050    0.000   32.899    0.073 game_of_life.py:247(display)
      450    1.748    0.004   32.430    0.072 game_of_life.py:282(_display_world)
     2707   31.074    0.011   31.074    0.011 {built-in method builtins.print}
      447    1.544    0.003   20.598    0.046 game_of_life.py:122(_update)
   987423    1.378    0.000   18.895    0.000 game_of_life.py:156(_next_cell)
   987423   13.281    0.000   17.517    0.000 game_of_life.py:169(_get_around)
  7899487    4.236    0.000    4.237    0.000 {built-in method builtins.max}

最初に見る箇所は冒頭の下記記載になります。全体の処理時間が53.831秒だったことが分かります。これが筆者のスタート地点です。

         9940406 function calls (9939627 primitive calls) in 53.831 seconds

cProfileについては、以下の記事などを参考にしました。
https://qiita.com/meshidenn/items/4dbde22d1e7a13a255bb

本家のドキュメントを押さえておくなら以下から。
https://docs.python.org/3/library/profile.html

Step2 : ボトルネックを探す

コードのパフォーマンスを上げるためのセオリーは、そのコードが最も時間を要する箇所──つまり、ボトルネック──を見つけて、そこを改善することです。"勘"で決めた(残念ながらボトルネックじゃなかった)箇所を、物凄く頑張って10倍速くしても、全体から見ればほぼ変わらなかった…、なんてこともあり得ますからね。最も改善し甲斐のあるところに優先して手を打ちましょう。

それでは、現状のコードのボトルネックを探していきます。

cProfileが出力したログの内容で、次に見るべきポイントはざっくり以下です。

カラム名 内容
filename:lineno(function) ファイル名:行番号(実行された関数/メソッド)
ncalls コールされた回数
cumtime 関数の累積実行時間(別の関数コールも含む)
tottime その関数自体の累積実行時間

ライフゲーム全体を実行するstartメソッドを除き、個別のメソッドで最も時間のかかっている箇所は、displayメソッドのようです。計450回呼ばれ、累積で約32秒かかっています。

      450    0.050    0.000   32.899    0.073 game_of_life.py:247(display)

ここまでから、ライフゲーム全体の処理時間の53秒に対して、32秒もの大半(約6割)をこの処理が占めていると分かりました。

さらにもう少しよく見てみると、displayからは、_display_worldを経由して、print関数がたくさん呼ばれていているのですが、displayのほぼすべて(32秒中の31秒)は、print関数で占められている事も読み取れます。

      450    0.050    0.000   32.899    0.073 game_of_life.py:247(display)
      450    1.748    0.004   32.430    0.072 game_of_life.py:282(_display_world)
     2707   31.074    0.011   31.074    0.011 {built-in method builtins.print}

おそらく、今倒すべき"敵の尻尾"を掴んだかもしれません。このprint関数を呼ぶ処理に最も時間がかかっており、ボトルネックとなっていることが判明しました。

ゲームの状況を逐次描画しなければならない性質上、妥当なものである可能性も十分あります。ですが、ここを改善することが出来れば、コード全体の性能を改善する上で、最も効果の見込めるポイントであることに違いはありません!

Step3 : コードを直す

ボトルネックとなっている箇所は見つかりました。それでは速度が改善するようにコードを直しましょう。…といっても、何をどう改善すればよいのでしょうか。

cProfileの結果から、print関数に大半の時間を費やしているという事でした。それではここで、画面描画のためにprint関数を呼んでいる箇所を全て見てみましょう。

game_of_life.py
class Console:
    def display(self, world, step, colors):
        if step > 1:
            self._cursor_up(self.y + 4)
        self._display_title()
        self._display_world(world, colors)
        self._display_step(step)

    def _cursor_up(self, n):
        print(f'\033[{n}A', end='')

    def _display_title(self):
        print(f"{self.title}")

    def _display_world(self, world, colors):
        color_list = self.color_list
        marks = self.marks
        max_y, max_x = self.y, self.x
        max_color = len(color_list)
        line = []
        # display world on cli
        print('' + '' * (max_x * 2) + '')
        for y in range(max_y):
            cells = ''
            for x in range(max_x):
                color = colors[y][x] % max_color
                mark = marks[world[y][x]]
                cells += self.color_list[color] + mark + self.color_list[0]
            line += ['' + cells + '']
        print('\n'.join(line))
        print('' + '' * (max_x * 2) + '')

    def _display_step(self, step):
        print(f'step = {step}')

全部で6回のprint関数を呼ぶコードになっています。…ひょっとして、何回もprint関数を呼ぶのがまずい…、とか?

── ChatGPT曰く ──

大量のデータを出力する場合や、パフォーマンスが重要なシステムでは、print文をできるだけ1回にまとめることが推奨されます。これにより、I/O操作のオーバーヘッドを削減し、バッファリングの効率を最大化することで、全体のパフォーマンスが向上します。

な、なるほど。良い後押しを得ました。とりあえず、ほんとかどうかは分かりませんが、print関数を1回にまとめるよう修正してみますね、ChatGPT先生!

変更後のコードは以下のようになりました。

game_of_life.py
class Console:
    def display(self, world, step, colors):
        screen = ''
        if step > 1:
            screen += self._cursor_up(self.y + 4)
        screen += self._get_title()
        screen += self._get_world(world, colors)
        screen += self._get_step(step)
        print(screen, end='')

    def _cursor_up(self, n):
        return f'\033[{n}A'

    def _setup_title(self):
        name, x, y = self.name, self.x, self.y
        count = x * y
        max_cell_size = 3025
        title = f'{name} ({x} x {y})'
        if count > max_cell_size:
            title += f' * warning : max_cell_size({max_cell_size}) is over! *'
        return title

    def _get_title(self):
        return f"{self.title}\n"

    def _get_world(self, world, colors):
        color_list = self.color_list
        marks = self.marks
        max_y, max_x = self.y, self.x
        max_color = len(color_list)
        line = []
        # setup screen for display world on cli
        screen = '' + ('' * (max_x * 2)) + '\n'
        for y in range(max_y):
            cells = ''
            for x in range(max_x):
                color = colors[y][x] % max_color
                mark = marks[world[y][x]]
                cells += self.color_list[color] + mark + self.color_list[0]
            line += ['' + cells + '']
        screen += '\n'.join(line) + '\n'
        screen += '' + ('' * (max_x * 2)) + '\n'
        return screen

    def _get_step(self, step):
        return f'step = {step}\n'

print関数を呼ぶのは1回だけにしています。print関数を呼ぶまでは、表示させたい内容を、どんどん連結していく作りにしてみました。

Step4 : 効果を確認する

コードのボトルネックを見つけ、改善のための予想を立てて、それに基づきコードを変更しました。その次にする事は、効果の測定です。

今回のライフゲームのような目で楽しむようなものの場合、ついつい見た目の感覚で「あ、速くなってそう!」とやってしまいそうですね。ですが、そこはグッと堪えます。最初に決めた指標に対して、今回の結果がどうだったのかを冷静に見る必要があります。もう一度cProfileを実行し、はっきりとしたボトルネックの実行時間の変化を見定めましょう。

以下が結果です。

         9938154 function calls (9937375 primitive calls) in 49.655 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     39/1    0.001    0.000   49.727   49.727 {built-in method builtins.exec}
        1    0.000    0.000   49.726   49.726 runpy.py:200(run_module)
        1    0.000    0.000   49.657   49.657 runpy.py:64(_run_code)
        1    0.000    0.000   49.657   49.657 __main__.py:1(<module>)
        1    0.000    0.000   49.657   49.657 main.py:65(__init__)
        1    0.000    0.000   49.504   49.504 main.py:246(runTests)
        1    0.000    0.000   49.504   49.504 runner.py:151(run)
      2/1    0.000    0.000   49.488   49.488 suite.py:83(__call__)
      2/1    0.000    0.000   49.488   49.488 suite.py:102(run)
        1    0.000    0.000   49.487   49.487 case.py:652(__call__)
        1    0.000    0.000   49.487   49.487 case.py:558(run)
        1    0.000    0.000   49.484   49.484 case.py:549(_callTestMethod)
        1    0.001    0.001   49.484   49.484 test_game_of_life.py:275(test_noahs_ark_elp)
        3    0.031    0.010   49.358   16.453 game_of_life.py:75(start)
      450    0.027    0.000   29.118    0.065 game_of_life.py:247(display)
      460   27.458    0.060   27.458    0.060 {built-in method builtins.print}
      447    1.517    0.003   20.184    0.045 game_of_life.py:122(_update)
   987423    1.345    0.000   18.514    0.000 game_of_life.py:156(_next_cell)
   987423   12.963    0.000   17.169    0.000 game_of_life.py:169(_get_around)
  7899487    4.206    0.000    4.207    0.000 {built-in method builtins.max}
      450    1.632    0.004    1.651    0.004 game_of_life.py:284(_get_world)

変更前から変更後で、全体の処理時間が53.831秒から49.655秒へと、約"4.1秒"短縮されました。

(変更前)

         9940406 function calls (9939627 primitive calls) in 53.831 seconds

(変更後)

         9938154 function calls (9937375 primitive calls) in 49.655 seconds

そして、効果を狙ったprint関数の実行時間も以下の通り、31.074秒から27.5458と"3.6秒"の短縮が見られます。ボトルネックへの改善が、処理全体への改善に繋がっている事が見て取れました。(print関数のコール回数も減ってます)

(変更前)

     2707   31.074    0.011   31.074    0.011 {built-in method builtins.print}

(変更後)

      460   27.458    0.060   27.458    0.060 {built-in method builtins.print}

これにて、改善のための4Stepは無事完了です。幸い、速度向上に繋がる良い結果が出ました!

このように、狙った部分に狙った改善効果が見られて初めて、「コードが良くなった」と言えるのように思います(理由が分からず速い場合、それ以上コードを触れませんからね…)。もちろん、毎回狙った通り改善することも稀だったりしますが…。

お疲れ様でした。改善のためのStepを1サイクル回し終えました。

ですが、まだまだ私の求める速度には満たないようです。それでは次の敵を倒しに、再び最初のStepに戻りましょう。気力の続く限り、このサイクルを続けることが、求めるモノに近づく最短の道だと思います。

次なるボトルネックをつぶす!

サイクルをひと回しして改善はできたものの、print関数の実行にはまだまだ時間がかかっているようです。他に出来ることはないか?と考えたところ、1つ案が浮かんできました。

「表示させる文字列を減らせないか」

そうすれば当然print関数の実行時間は減らせるはず。ということで、変更したコードが以下になります。

game_of_life.py
class Console:
    def _get_world(self, world, colors):
        color_list = self.color_list
        marks = self.marks
        max_y, max_x = self.y, self.x
        max_color = len(color_list)
        line = []
        # setup screen for display world on cli
        screen = '' + ('' * (max_x * 2)) + '\n'
        for y in range(max_y):
            cells, pre_color = '', 0
            for x in range(max_x):
                mark = marks[world[y][x]]
                if mark == '  ':
                    # if no mark, color must be not care
                    cells += mark
                else:
                    # change color if it is different from previous
                    if color := colors[y][x] % max_color:
                        if color == pre_color:
                            cells += mark
                        else:
                            cells += color_list[color] + mark
                    else:
                        if pre_color:
                            cells += color_list[0] + mark
                        else:
                            cells += mark
                    pre_color = color
            if pre_color:
                line += ['' + cells + color_list[0] + '']
            else:
                line += ['' + cells + '']
        screen += '\n'.join(line) + '\n'
        screen += '' + ('' * (max_x * 2)) + '\n'
        return screen

変更したのは、表示文字列に色を付ける際のエスケープシーケンスを、なるべく減らすという部分です。

以前は、1セルずつ以下のようにして色付けしておりました。

[色付けのエスケープシーケンス] + [セル表示文字] + [デフォルトの色に戻すエスケープシーケンス]

これは、冗長なので、以下の対策を入れて文字数の多さを緩和しました。

  • 前回表示した色を覚えておいて、今回表示させる色と異なる場合のみ、色付けする
  • セルがない部分は空白文字なので、色付けはしない

この変更が、どの程度の効果なのか、結果は以下となりました。

         9938154 function calls (9937375 primitive calls) in 23.952 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     39/1    0.001    0.000   24.024   24.024 {built-in method builtins.exec}
        1    0.000    0.000   24.024   24.024 runpy.py:200(run_module)
        1    0.000    0.000   23.956   23.956 runpy.py:64(_run_code)
        1    0.000    0.000   23.956   23.956 __main__.py:1(<module>)
        1    0.000    0.000   23.956   23.956 main.py:65(__init__)
        1    0.000    0.000   23.800   23.800 main.py:246(runTests)
        1    0.000    0.000   23.799   23.799 runner.py:151(run)
      2/1    0.000    0.000   23.787   23.787 suite.py:83(__call__)
      2/1    0.000    0.000   23.787   23.787 suite.py:102(run)
        1    0.000    0.000   23.787   23.787 case.py:652(__call__)
        1    0.000    0.000   23.787   23.787 case.py:558(run)
        1    0.000    0.000   23.784   23.784 case.py:549(_callTestMethod)
        1    0.001    0.001   23.784   23.784 test_game_of_life.py:275(test_noahs_ark_elp)
        3    0.023    0.008   23.646    7.882 game_of_life.py:75(start)
      447    1.472    0.003   19.898    0.045 game_of_life.py:122(_update)
   987423    1.321    0.000   18.271    0.000 game_of_life.py:156(_next_cell)
   987423   12.825    0.000   16.951    0.000 game_of_life.py:169(_get_around)
  7899487    4.126    0.000    4.127    0.000 {built-in method builtins.max}
      450    0.016    0.000    3.706    0.008 game_of_life.py:247(display)
      460    3.188    0.007    3.188    0.007 {built-in method builtins.print}
      450    0.513    0.001    0.520    0.001 game_of_life.py:284(_get_world)

なんと、トータルの処理時間が、49.655秒から23.952秒と、25.7秒(約2倍)ほど速くなりました。

(変更前)

         9938154 function calls (9937375 primitive calls) in 49.655 seconds

(変更後)

         9938154 function calls (9937375 primitive calls) in 23.952 seconds

改善を狙ったprint関数の処理時間も、同じく24秒ほどの高速化が達成できています。

(変更前)

      460   27.458    0.060   27.458    0.060 {built-in method builtins.print}

(変更後)

      460    3.188    0.007    3.188    0.007 {built-in method builtins.print}

表示させていた文字列の長さも合わせてまとめると、以下となりました。

指標 変更前 変更後 削減率
全体の処理時間 49.655秒 23.952秒 -25.703秒 -51.7%
print関数の処理時間 27.458秒 3.188秒 -24.270秒 -88.3%
トータルの表示文字数 12,466,350字 2,718,894字 -9,747,456字 -78.1%

ある程度、表示文字数の削減率がprint関数の処理時間の削減率とリンクしてそうです。今回は思った以上の効果が得られました。

さらなるボトルネックをつぶす!

この調子で次のボトルネックに向かいます。先ほどの改善で現在のボトルネックが変わっています。

        1    0.001    0.001   23.784   23.784 test_game_of_life.py:275(test_noahs_ark_elp)
        3    0.023    0.008   23.646    7.882 game_of_life.py:75(start)
      447    1.472    0.003   19.898    0.045 game_of_life.py:122(_update)
   987423    1.321    0.000   18.271    0.000 game_of_life.py:156(_next_cell)
   987423   12.825    0.000   16.951    0.000 game_of_life.py:169(_get_around)
  7899487    4.126    0.000    4.127    0.000 {built-in method builtins.max}
      450    0.016    0.000    3.706    0.008 game_of_life.py:247(display)

_updateメソッドから紐づく、次の世代のセルの計算に時間を要しているようです。特に、セルの周囲をチェックする_get_aroundメソッドが今回のターゲットのようです。このメソッドは987423回と大量に呼ばれています。そしてさらに、その8倍ほどmax関数がコールされているようです。

一度、該当箇所を見てます。

game_of_life.py
class GameOfLife:
    def _get_around(self, x, y):
        max_x, max_y = self.x, self.y
        world, colors, torus = self.world, self.colors, self.torus
        alive, color = 0, 0
        for dx, dy in self.dirs:
            next_x, next_y = x + dx, y + dy
            if torus:
                # if torus option is enabled,
                # top and bottom, left and right are connected.
                next_x, next_y = next_x % max_x, next_y % max_y
            if (0 <= next_x < max_x) and (0 <= next_y < max_y):
                if self.color:
                    # if color option is enabled,
                    # the color of most evolved cell around it is inherited.
                    color = max(color, colors[next_y][next_x])
                if world[next_y][next_x]:
                    alive += 1
        return alive, color

色進化のオプション有効時、周囲の色番号の最大値を取るために8方向をチェックした際、max関数を使っています。ここでやりたいことは、最大値の取得なので、必ずしもmax関数を毎回呼ぶ必要はなさそうです。

あと他に処理を軽くできそうなものとして、以下を思いつきました。

  • トーラス世界有効時、範囲チェックはなくす
  • 色進化を有効にする箇所は、別にもあるので条件判定を無くす(色進化なし時のパフォーマンス低下は目をつむる)

全部の変更を織り込んだコードが以下になります。(本来は1変更ずつ効果を見るべきですが、紙面の都合で割愛します)

game_of_life.py
class GameOfLife:
    def _get_around(self, x, y):
        max_x, max_y = self.x, self.y
        world, colors, torus = self.world, self.colors, self.torus
        alive, color = 0, 0
        for dx, dy in self.dirs:
            next_x, next_y = x + dx, y + dy
            calc = False
            if torus:
                # if torus option is enabled,
                # top and bottom, left and right are connected.
                next_x, next_y = next_x % max_x, next_y % max_y
                calc = True
            else:
                calc = (0 <= next_x < max_x) and (0 <= next_y < max_y)
            if calc:
                next_color = colors[next_y][next_x]
                if next_color > color:
                    color = next_color
                if world[next_y][next_x]:
                    alive += 1
        return alive, color

続いて、計測結果は以下のとおりでした。

         2038644 function calls (2037865 primitive calls) in 14.862 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     39/1    0.001    0.000   14.936   14.936 {built-in method builtins.exec}
        1    0.000    0.000   14.936   14.936 runpy.py:200(run_module)
        1    0.000    0.000   14.867   14.867 runpy.py:64(_run_code)
        1    0.000    0.000   14.867   14.867 __main__.py:1(<module>)
        1    0.000    0.000   14.867   14.867 main.py:65(__init__)
        1    0.000    0.000   14.712   14.712 main.py:246(runTests)
        1    0.000    0.000   14.712   14.712 runner.py:151(run)
      2/1    0.000    0.000   14.709   14.709 suite.py:83(__call__)
      2/1    0.000    0.000   14.709   14.709 suite.py:102(run)
        1    0.000    0.000   14.709   14.709 case.py:652(__call__)
        1    0.000    0.000   14.709   14.709 case.py:558(run)
        1    0.000    0.000   14.708   14.708 case.py:549(_callTestMethod)
        1    0.001    0.001   14.708   14.708 test_game_of_life.py:275(test_noahs_ark_elp)
        3    0.024    0.008   14.574    4.858 game_of_life.py:75(start)
      447    1.420    0.003   10.768    0.024 game_of_life.py:122(_update)
   987423    1.333    0.000    9.194    0.000 game_of_life.py:156(_next_cell)
   987423    7.861    0.000    7.861    0.000 game_of_life.py:169(_get_around)
      450    0.016    0.000    3.769    0.008 game_of_life.py:250(display)
      460    3.242    0.007    3.242    0.007 {built-in method builtins.print}
      450    0.514    0.001    0.522    0.001 game_of_life.py:287(_get_world)

今回もうまく速度UP達成できたようです。

(変更前)

         9938154 function calls (9937375 primitive calls) in 23.952 seconds

(変更後)

         2038644 function calls (2037865 primitive calls) in 14.862 seconds

トータル処理時間が、23.952秒から14.862秒と、9.09秒(約1.6倍)の高速化でした。割愛しますが、_get_aroundメソッドも同様に高速化できてそうです。

参考のため、各変更の高速化の内訳を以下に残しておきます。

変更内容 変更前 変更後 高速化
max関数なくす 23.952秒 17.953秒 -5.999秒 1.3倍
トーラス世界の時は範囲チェックしない 17.953秒 15.565秒 -2.388秒 1.15倍
色進化の条件判定なくす 15.565秒 14.862秒 -0.703秒 1.04倍

ここまで順調に来ましたが、実はもう筆者的にはこれ以上のロジックの改善は厳しそうです。…が、最後の一滴を無理やり絞ってみます。

最後の一押し!

ボトルネックとなるメソッドの改善は、もう十分ってくらい考え尽くしました。悲しいですがココが筆者の限界。他に改善しても効果のある箇所はなさそうにしか見えません。…ところが、一つだけ最後の切り札を残している筆者でした。それは処理のスレッドを分けて並列処理させるという案です。

具体的には、世代交代のセルを計算する部分と、コンソールへの描画を別のスレッドに分けるというものです。(画面を分割することも考えましたが、描画は1つの方が良さそうとの事で今回はパスです)

ただこれをしてしまうと、今後のメンテや改善効果の確認に色々と支障をきたすことになってしまいます。ですので最後まで取っておきました。

いよいよ、この案を使うときが来たのかもしれません。(できればまだ取っておきたい)

以下が、コード変更した結果です。

game_of_life.py
import queue
import threading


class GameOfLife:
    def start(self):
        try:
            # 1st screen
            self.console.setup()
            world, step, colors = self.world, self.step, self.colors
            screen = self.console._get_screen(world, step, colors)
            print(screen, end='')
            if self.delay:
                time.sleep(self.delay)

            # thread setup
            q = queue.Queue(1)
            thread = threading.Thread(
                    target=self._update, args=(q,), daemon=True)
            thread.start()

            # update screen
            max_step, loop = self.max_step, self.loop
            while True:
                self._wait()
                screen, step = q.get(timeout=0.5)
                print(str(screen), end='')
                if step >= max_step:
                    if not loop:
                        # if loop option is enabled, game_of_life is never end.
                        break
        except KeyboardInterrupt:
            return
        finally:
            self.console.teardown()

        def _update(self, q):
        while True:
            self._next_step()
            world, step, colors = self.world, self.step, self.colors
            screen = self.console._get_screen(world, step, colors)
            q.put((screen, step))

    def _next_step(self):
        max_y, max_x = self.y, self.x
        world, colors = self.world, self.colors
        mortal, ages = self.mortal, self.ages
        new_world = []
        for y in range(max_y):
            next_cells = []
            for x in range(max_x):
                if next_cell := self._next_cell(x, y):
                    if mortal:
                        # if mortal option is enabled, living cells are ageing.
                        if world[y][x]:
                            next_cell = self._ageing(x, y, next_cell)
                        else:
                            ages[y][x] = 1
                else:
                    ages[y][x], colors[y][x] = 0, 0
                next_cells += [next_cell]
            new_world += [next_cells]
        self.world = [[x for x in row] for row in new_world]
        self.step += 1


class Console:
    def _get_screen(self, world, step, colors):
        screen = ''
        if step > 1:
            screen += self._cursor_up(self.y + 4)
        screen += self._get_title()
        screen += self._get_world(world, colors)
        screen += self._get_step(step)
        return screen

コードの概要としては、以下の役割でスレッドを分けて並列処理するよう作り変えました。

役割 メソッド名
世代(画面)計算 self._update()
画面表示 self.start()の無限ループ部分

世代計算のスレッドは、スクリーンに表示させるテキストを作成し、Queueに格納します。画面表示スレッドはQueueに格納されたテキストを取り出し、print関数で表示させます。

画面表示のIO待ちの際に、次の画面計算に移れる分、速くなるだろうという算段です。

         45981 function calls (45191 primitive calls) in 11.081 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     40/1    0.001    0.000   11.083   11.083 {built-in method builtins.exec}
        1    0.000    0.000   11.083   11.083 runpy.py:200(run_module)
        1    0.000    0.000   11.014   11.014 runpy.py:64(_run_code)
        1    0.000    0.000   11.014   11.014 __main__.py:1(<module>)
        1    0.000    0.000   11.014   11.014 main.py:65(__init__)
        1    0.000    0.000   10.853   10.853 main.py:246(runTests)
        1    0.000    0.000   10.853   10.853 runner.py:151(run)
      2/1    0.000    0.000   10.850   10.850 suite.py:83(__call__)
      2/1    0.000    0.000   10.850   10.850 suite.py:102(run)
        1    0.000    0.000   10.849   10.849 case.py:652(__call__)
        1    0.000    0.000   10.849   10.849 case.py:558(run)
        1    0.000    0.000   10.849   10.849 case.py:549(_callTestMethod)
        1    0.001    0.001   10.849   10.849 test_game_of_life.py:275(test_noahs_ark_elp)
        3    0.015    0.005   10.731    3.577 game_of_life.py:77(start)
      460    9.521    0.021    9.521    0.021 {built-in method builtins.print}
      447    0.004    0.000    0.964    0.002 game_of_life.py:213(_wait)
      447    0.960    0.002    0.960    0.002 {built-in method time.sleep}

やはり、速くなりました。全体で約3.8秒(1.34倍)ほどの改善が見られました。

(変更前)

         2038644 function calls (2037865 primitive calls) in 14.862 seconds

(変更後)

         45981 function calls (45191 primitive calls) in 11.081 seconds

しかし残念なことに、スレッドを分けた世代計算側の内訳は表示されなくなっています。これが筆者的に、最後まで変更せず取っておきたかったポイントです。(調べてみると、メインスレッド以外もちゃんと計測する方法はあるようですが、今回はトライしきれず…)

threadingの場合、厳密には1CPUしか使わないため並行処理と呼ばれるそうです。複数CPUで並列処理を行う場合はmultiprosessingが使えるそうですが、こちらを試したところ起動までの待ち時間がネックで採用しませんでした。

スタート地点の53.831秒から始めて、11.081秒にまで到達しました。4.85倍の高速化です。ここが筆者の終着点と相成りました。

ということで、最終形の動く様子を見て下さい!

体感では随分速くなりました(gifファイルだと伝わりづらい部分がありますが)。ただ、私の目指す全画面サイズ(100x50程度)での動作には及ばなかったようです。また妙案が浮かぶまで、ここで一旦手を止めようと思います。

とはいえ、ここまで、コードを改善し効果を確認するサイクルを通して、自分のコードが良くなっていくのを見るのは実にワクワクしました。実際のところ、コードの改善案を思いつくためには、"理屈"なのか"閃き"なのかどちらが強いんでしょう…。

おわりに

ちょっとやれば誰でもできる拙い内容でしたが、ここまでお読みいただきありがとうございました。処理の速さだけを追い求めるなら、Pythonである必要もコンソール表示である必要も無くなります。ただそうすると、私が作ろうとする必要性も無くなってしまうように感じます。世の中には、私より遥かに優れたライフゲームのプログラムが、既に存在してますのでね。

それだと、とてもさみしいよなぁ…という気持ちが、今回あえての制約条件のもと、コードや本記事を書く原動力になったように思います。Pythonはシンプルさが好きで、ただなんとなくでもいいので書いていたいです。

そしてやはり、ライフゲームは面白いなと思います。代表的なパターンの1つである"グライダー"も、ぶつかり方次第で思いもよらぬ変化が生まれる、なんてことにも気付けて、益々ライフゲームの奥深さを再認識しました。(とはいえ、ガチガチのコアな内容には足元にも及ばないですがね…)

それでは最後に、"グライダー"の衝突を利用し、かつ、独自のルールを織り込んだ自作のライフゲーム、"hanabi"で締めくくります。どうもありがとうございました!!

つづき

その後、続きの記事を書きましたので、良かったら読んでください。

外部ライブラリのNumPyとOpenCVを活用して、さらなる高速化を目指した内容となっております。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1