AdventarのPython Advent Calendar 2015 25日目の記事です。
はじめに
本記事ではLuigiを用いたジョブパイプライン構築の簡単な実例として、Pytestのテスト再実行について記述する。
Luigiとは
LuigiはPython製のジョブパイプライン構築ツールである。Luigiを用いることで、ジョブパイプライン構築に必要な以下の事柄をPythonコードで表現することができる。
- タスクの実行
- タスク間の依存関係
- タスク実行結果の保存
HadoopやSparkのジョブ実行、データベースからの/へのデータロードなど、それなりに時間がかかるタスクを構成要素としたジョブパイプラインの構築をメインターゲットとしているようで、それらのツールと連携するためのモジュールはデフォルトでサポートされている(http://luigi.readthedocs.org/en/stable/api/luigi.contrib.html#submodules)。
今回取りあげる例では牛刀であるのは紛れもない事実だが、決められた枠組みに乗ってパイプラインの拡張ができる、という点にメリットを感じ、慣れる意味も込めて使用してみることにした。
タスクの定義
Luigiのタスク定義の基本は以下の通りである。
-
luigi.Task
を継承したクラスを定義する。 -
luigi.Task
を継承したクラスで以下のメソッドを定義する。- run(self):タスクの実行処理
- requires(self):タスクの依存関係
- output(self):タスク実行結果の保存処理
- タスクに引数が必要な場合は、
luigi.Parameter()
あるいはluigi.<型:ex. Int>Parameter()
をクラス変数に持たせる。
以下は、pytest
実行のタスク定義である。
root = os.path.normpath(os.path.abspath(os.path.dirname(__file__)))
class PytestTask(luigi.Task):
# タスクの引数
pytest_args = luigi.Parameter(default='tests')
repeat_id = luigi.IntParameter()
# タスクの実行処理
def run(self):
cmd = ['py.test']
cmd.extend(self.pytest_args.split(' '))
os.chdir(root)
process = Popen(cmd, stdout=PIPE, stderr=PIPE)
for line in iter(process.stdout.readline, ''):
print(line.rstrip())
# self.output()から、実行結果を書き込むストリームを取得できる。
out = self.output().open('w')
with open(lastfailed) as f:
out.write(f.read())
out.close()
# タスクの依存関係
# 依存するタスクのリストを返す。(ex. return [A(), B()])
# 今回は諸事情から空のリスト(
def requires(self):
return []
# タスクの実行結果の保存処理
# luigi.Targetから派生したクラスを返す。以下の例ではローカルのファイルシステムに実行結果を保存する
# (ex) http://luigi.readthedocs.org/en/stable/api/luigi.html#luigi.Target
def output(self):
return luigi.LocalTarget('test_repeat_{0}.txt'.format(self.repeat_id))
タスクの依存関係の動的定義
今回は、ただ単にLuigiからpytest
を実行するだけでなく、以下のような要件を満たし自動的にテストの再実行をかけるパイプラインを構築したかった。
- テストが全件成功していない場合は再実行する。再実行する回数の上限は引数で与える。
- テストの再実行の際には、
--lf
オプションを用いて失敗したテストのみ実行する。(参考)
Luigiは前述したrequires(self)
による静的な依存関係の追加のみならず、条件に応じて動的にタスクの依存関係を追加することもできる。
# 前回実行の際に失敗したテストが記録されているファイル
lastfailed = '.cache/v/cache/lastfailed'
class RepeatPytestTask(luigi.Task):
pytest_args = luigi.Parameter(default='tests')
repeat = luigi.IntParameter(default=1)
def is_success(self, target):
i = target.open('r')
# 全件成功している場合、空のディクショナリが生成される
success = bool(not json.load(i))
i.close()
return success
def run(self):
# 一回実行して成功したら終了
out = self.output().open('w')
target = yield PytestTask(
pytest_args=self.pytest_args,
repeat_id=1)
if self.is_success(target):
out.write('success')
out.close()
return
# 二回目以降はlfオプション付きで実行
for i in range(0, self.repeat - 1):
# yield <タスクのインスタンス> で動的な依存関係を追加できる
target = yield PytestTask(
pytest_args='{0} --lf'.format(self.pytest_args),
repeat_id=i + 2)
# 成功した時点で実行終了
if self.is_success(target):
out.write('success')
out.close()
return
# 最後まで失敗が残った
out.write('failure')
out.close()
def output(self):
return luigi.LocalTarget('test_repeats.txt')
パイプラインの実行
前述のタスク定義に加え、パイプラインの起動処理を加えたプログラム全体が以下である。
import json
import os
import sys
from contextlib import contextmanager
from subprocess import Popen, PIPE
import luigi
root = os.path.normpath(os.path.abspath(os.path.dirname(__file__)))
lastfailed = '.cache/v/cache/lastfailed'
class PytestTask(luigi.Task):
pytest_args = luigi.Parameter(default='tests')
repeat_id = luigi.IntParameter()
def output(self):
return luigi.LocalTarget('test_repeat_{0}.txt'.format(self.repeat_id))
def run(self):
cmd = ['py.test']
cmd.extend(self.pytest_args.split(' '))
os.chdir(root)
process = Popen(cmd, stdout=PIPE, stderr=PIPE)
for line in iter(process.stdout.readline, ''):
print(line.rstrip())
out = self.output().open('w')
with open(lastfailed) as f:
out.write(f.read())
out.close()
class RepeatPytestTask(luigi.Task):
pytest_args = luigi.Parameter(default='tests')
# 繰り返し回数は引数として外部から与える
repeat = luigi.IntParameter(default=1)
def is_success(self, target):
i = target.open('r')
success = bool(not json.load(i))
i.close()
return success
def output(self):
return luigi.LocalTarget('test_repeats.txt')
def run(self):
out = self.output().open('w')
target = yield PytestTask(
pytest_args=self.pytest_args,
repeat_id=1)
if self.is_success(target):
out.write('success')
out.close()
return
for i in range(0, self.repeat - 1):
target = yield PytestTask(
pytest_args='{0} --lf'.format(self.pytest_args),
repeat_id=i + 2)
if self.is_success(target):
out.write('success')
out.close()
return
out.write('failure')
out.close()
# パイプラインの起動処理
if __name__ == '__main__':
argv = ['RepeatPytestTask']
if len(sys.argv) > 1:
argv.extend(sys.argv[1:])
luigi.run(argv)
上記のプログラムに繰り返し回数(--repeat
)を与えて実行することで、失敗時に自動で再実行するテストのパイプラインが実現できる。
# Luigiはoutput(self)が出力されている=タスクが終了していると見なす。
# 最初からタスクを実行したい場合は全てのアウトプットを消去する。
$ rm -rf test_repeat_1.txt test_repeats.txt test_repeat_2.txt
# 大規模処理の場合、タスクスケジューラを別途構築することができる。
# 今回は小さい処理なのでローカルでスケジュールする(--local-schedulerオプション)
# http://luigi.readthedocs.org/en/stable/central_scheduler.html?highlight=scheduler%20server
$ python pytest_pipeline.py --local-scheduler --repeat 3