0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Pythonのテスト実行を並列化する

Last updated at Posted at 2022-05-13

テストと並列化

ソフトウェアエンジニアリングを行う上で開発者テストは欠かせないものとなっています。テストはコードに対する素早いフィードバックを開発者に与え、高いアジリティを持った開発を支援します。また、CIによりテストを実行することで、コードのチェックインのたびにリグレッションがないかを検査することが容易になります。

ところが開発を続けていくとやがてテストの数は増え、実行にかかる時間も増えていきます。テストの実行にあまりに時間がかかると、開発者へのフィードバックは遅くなり、テストから得られる恩恵も減ってしまいます。

この記事では並列化によるテスト実行の高速化を、Pythonのコードを例に紹介したいと思います。

並列化の実装

ここでは2段階の並列化を行います。1つ目は単一の計算インスタンス内におけるマルチプロセッシングによる並列化、2つ目は複数の計算インスタンスを使うことによる並列化です。

マルチプロセスによる並列化

テストの収集

まずはテストを集めるところからです。Test Discoveryによる方法と、Test Enumerationによる方法があります。以下はTest Discoveryによる方法です。tests/ディレクトリ以下のテストクラスをTestSuiteとして収集します。後の便利のためリストに変換しています。

from unittest import TestLoader

test_suites = list(TestLoader().discover("tests"))

テストの分割

収集されたテストクラスの集合を、並列化するプロセスに割り当てるために分割します。できるだけ計算負荷が均等になるように分割するのが望ましいですが、ここでは各分割(テストグループ)に含まれるテストケースの数ができるだけ均等になるように、テストクラス群を分割します。

import heapq
from collections import namedtuple

TestGroup = namedtuple("TestGroup", "num_classes num_cases classes")

def split_test_classes(test_suites, num_groups):
    # テストクラスの集合(TestSuiteのリスト)を含まれるテストケース数の降順にソートする
    test_suites.sort(key=lambda x: x.countTestCases(), reverse=True)
    # テストグループ用のリストの初期化
    heap = [(0, 0, []) for _ in range(num_groups)]
    # サイズの大きいテストクラスから順に分配する
    for n, test_suite in enumerate(test_suites):
        # 最小ヒープを使い、最も空いているテストグループを取り出す
        group_size, _, test_group = heapq.heappop(heap)
        test_group.append(test_suite)
        # テストクラスを追加後、テストグループのサイズ(テストケース数)を更新してヒープに戻す
        # タプルの2番目の要素は、複数のグループが同数のテストケースを含む場合の順序決定のため
        heapq.heappush(heap, (group_size + test_suite.countTestCases(), n, test_group))
    # TestGroupのリストとして返却
    return [TestGroup(len(test_group), group_size, test_group) for group_size, _, test_group in heap]

分割は貪欲に行っています。すなわち、テストクラス群を、クラスが含むテストケース数の多い順にソートし、サイズの大きいテストクラスから順に、一番空きのあるテストグループに入れていきます。一番空きのあるテストグループを得るために最小ヒープを使用しています。

テストの実行

分割されたテストをマルチプロセッシングにより並列実行します。各プロセスでは、個々のテストグループに含まれるテストクラス群をTestSuiteオブジェクトとしてまとめ、テストランナーに渡し実行します。

from concurrent.futures import ProcessPoolExecutor
from unittest import TestSuite, TextTestRunner

def run_tests(test_suites, num_groups):
    # テストクラス群をテストグループに分割
    test_groups = split_test_classes(test_suites, num_groups)
    # テスト実行
    with ProcessPoolExecutor(max_workers=len(test_groups)) as executor:
        results = list(executor.map(run_test_group, test_groups))
        print("[Test Results]")
        for idx, result in enumerate(results):
            print("Group-{}: {}".format(idx, ['NG', 'OK'][result]))


def run_test_group(test_group):
    # テストグループ内のテストクラス群を一つのテストスイートにまとめて実行する
    suite = TestSuite(test_group.classes)
    result = TextTestRunner().run(suite)
    return result.wasSuccessful()

CPUバウンドなテストであれば、並列数は計算インスタンスのコア数にするのが良いと思います。I/Oバウンドな場合はコア数より少し多めにするのが有効かもしれません。

複数CIジョブを用いた並列化

以上により単一インスタンスでの並列実行ができるようになりました。しかし同時にそれが限界でもあります。さらにテスト実行をスケールさせるためには複数インスタンスでの並列実行をする必要があります。ここではGitHub Actionsのmatrixを使った並列化を紹介します。以下にYAMLファイルの例を示します。

name: ci
on:
  push
jobs:
  run-tests:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        params:
          - {test_group_indices: "0,1"}
          - {test_group_indices: "2,3"}
    env:
      NUM_PARALLELIZATION: 4  # 2 jobs x 2 processes
    steps:
      - uses: actions/checkout@v2
      - name: Setup python 3.9
        uses: actions/setup-python@v2
        with:
          python-version: '3.9.9'
      - name: Run tests
        env:
          TEST_GROUP_INDICES: ${{ matrix.params.test_group_indices }}
        run: |
          python parallel_test_run.py

strategymatrixを指定し、test_group_indicesというコンマ区切りの整数を指定しています。環境変数として、並列数を表すNUM_PARALLELIZATIONと、先のmatrixで定義したパラメータを持つTEST_GROUP_INDICESを設定しています。TEST_GROUP_INDICESは、分割されたテストグループのうち、各Jobがどれを実行するかを指定するものです。Python側のコードは以下のようになります。

import os
from unittest import TestLoader

if __name__ == '__main__':
    default_parallelization_factor = 4
    test_group_indices = os.environ.get("TEST_GROUP_INDICES")
    if test_group_indices:
        test_group_indices = [int(idx) for idx in test_group_indices.split(",")]

    num_parallelization = int(os.environ.get("NUM_PARALLELIZATION", default_parallelization_factor))
    test_classes = list(TestLoader().discover("tests"))
    run_tests(test_classes, num_parallelization, test_group_indices)

ここで、run_testsはテストグループのインデックスを指定できるように拡張しています。

def run_tests(test_suites, num_groups, test_group_indices):
    test_groups = split_test_classes(test_suites, num_groups)
    if test_group_indices:
        # インデックスで指定されたグループのみをピックアップする
        test_groups = [
            test_groups[idx] for idx in test_group_indices
            if idx < len(test_groups)
        ]
    if not test_groups:
        return

    # 以下はマルチプロセスによる実行。前と同じ

matrixのパラメータとNUM_PARALLELIZATIONを修正することで並列数を調整することができます。GitHub Actionsの同時実行上限までスケールすることが期待できます。

まとめ

以上、テスト実行の並列化手法をPythonを用いて紹介しました。こちらにサンプルコードをおいています。サンプルではダミーのテストファイルを生成しています。
https://github.com/shu-yusa/python-test-parallelization
並列化は2つのレベルで行いました。単一インスタンス内での並列化は、外部ライブラリに依存しない比較的シンプルな実装です。Test Discoveryを用いましたが、Test Enumerationでも同様に適用できます。Python以外の言語であっても同様な実装が可能だと思います。

複数インスタンスを用いた並列化ではGitHub Actionsのmatrixを使った例を示しました。今回はテストの実行だけですが、coverageライブラリを並列モードで実行し、各Jobでの結果をキャッシュ経由で後続のJobで渡すことで、テストカバレッジを統合して算出することも可能です。

テストの実行時間で困っている方、高速化をしたいと考えている方の参考になれば幸いです。

参考

unittest — Unit testing framework — Python 3.10.4 documentation
Using a matrix for your jobs - GitHub Docs

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?