PythonのジョブマネージャであるLuigi 2.0系のメモです。
基本
Taskの基本の型
import luigi
class MyTask(luigi.Task):
date = luigi.DateParameter()
def requires(self):
return MyDependentTask(self.date)
def run(self):
with self.output().open('w') as output:
with self.input().open('r') as input:
for line in input:
ret = do_something(line)
output.write(ret)
output.write('\n')
def output(self):
return luigi.LocalTarget('./out2_{0}.txt'.format(self.date.isoformat()))
class MyDependentTask(luigi.Task):
date = luigi.DateParameter()
def run(self):
with self.output().open('w') as output:
output.write("line1\n")
output.write("line2\n")
def output(self):
return luigi.LocalTarget('./out1_{0}.txt'.format(self.date.isoformat()))
if __name__ == '__main__':
luigi.run()
バイナリファイルをoutputにしたい
luigi.format.Nop
を使う。
例えばpickleしたい場合。
import pickle
import luigi
class SomeTask(luigi.Task):
def requires(self):
return xxxx
def run(self):
some_obj = hoge()
with self.output().open('w') as output:
output.write(pickle.dumps(some_obj, protocol=pickle.HIGHEST_PROTOCOL))
def output(self):
return luigi.LocalTarget(
format=luigi.format.Nop,
path='xxxxxxxx')
class NextTask(luigi.Task):
def requires(self):
return SomeTask()
def run(self):
with self.input().open('r') as infile:
ret = pickle.load(infile)
gzipされたファイルを入力とする
依存するTaskのoutputで luigi.format.GzipFormat
を渡したTarget
を返す。
出力ファイルをgzipする
入力時と同じく、Targetのformatにluigi.format.GzipFormat
を渡しておく
class MyTask(luigi.Task):
def run(self):
with self.output().open('w') as output:
output.write('aaaa')
def output(self):
return luigi.LocalTarget('./out.gz', format=luigi.format.GzipFormat())
PandasのDataFrameをoutputにしたい
outputのformatに luigi.format.Nop
を指定してDataFrameをpickleして書き込む。型が失なわれるので to_csv
等は使わない。
def run(self):
result_df = do_something()
with self.output().open('w') as output:
output.write(pickle.dumps(result_df, protocol=pickle.HIGHEST_PROTOCOL))
inputとして受けとる側は
def run(self):
with self.input().open('r') as infile:
input_df: pd.DataFrame = pickle.load(infile)
do_something(input_df)
タスクを実行するだけのタスク
luigi.WrapperTask
は run
も output
も実装しない。
class MyInvokerTask(luigi.WrapperTask):
def requires(self):
return [FugaTask(), BarTask(), BuzTask(), FooTask()]
依存タスクを並行して実行する
class MyInvokerTask(luigi.WrapperTask):
def requires(self):
return [MyDepTask1(), MyDepTask2(), MyDepTask3()]
class MyDepTask1(luigi.Task):
priority = 100
# 以下略
しておいて、起動コマンドで --workers 2
とか付加すると良い。
各タスクの proprity
を見て高い物から優先して実行する。
依存タスクを順番に実行する
luigi的な依存関係は定義しないけどシリアルに処理したい場合
class MyInvokerTask(luigi.WrapperTask):
def requires(self):
yield MyDepTask1()
yield MyDepTask2()
yield MyDepTask3()
依存タスクを連鎖起動しない
タスクオブジェクトを luigi.task.externalize
すると run
しないで、outputが生成されているかどうかをチェックするのみになる。
class MyTask(luigi.Task):
def requires(self):
return externalize(MyDependencyTask())
def run(self):
print('Someone has finished MyDependencyTask')
コケたジョブを手動でリトライする
VisualiserでタスクがPENDINGになっている、もしくは見えない(スケジューラーからリリース済み)の場合は再度コマンド実行すれば良い。依存ツリー上でoutputが生成されていないタスクのみ実行される。
コケたジョブを自動でリトライする設定
デフォルト設定ではリトライしてくれないので、設定ファイルの次の4項目を指定する。
注: Version 2.5だとリトライ周りの設定項目が変わっている
[core]
worker-keep-alive: true
max-reschedules: 20
[scheduler]
disable-num-failures: 10
retry-delay: 300
リトライが失敗し続けた場合にリトライを止める
disable-window-seconds
で指定した時間内に disable-num-failures
の回数だけコケたらタスクを無効化する。
disable-num-failures: 20
disable-window-seconds: 3600
外部タスクの出力を待ち続ける
luigi.cfg
で retry-external-tasks: true
にするとExternalTaskもリトライしてくれる。retry-delay
の指定はスケジューラー単位で、タスク毎に指定できない。
タスクの処理時間を収集する
luigi.Task.event_handlerデコレーターでフックが作れる。 PROCESSING_TIME
に対してのハンドラ内で、タスク経過時間を集めてやれば、実装は一箇所で済む。
@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def on_processing_time(task, duration):
logger.debug('Task {} proceed {:.1f} sec'.format(task, duration))
# どこかメトリクス収集に投げる
# ...
AWS
外部タスクがS3に置いたファイルを入力とする
luigi.s3.S3PathTask
を使う
class MyTask(luigi.Task):
def requires(self):
return luigi.s3.S3PathTask('s3://xxxxxxx')
gzipされている場合は
class MyTask(luigi.Task):
def requires(self):
return GzipS3FileTask('s3://hoge/fuga.gz')
def run(self):
input = self.input().open('r') # で読める
class GzipS3FileTask(luigi.s3.S3PathTask):
path = luigi.Parameter()
def output(self):
return luigi.s3.S3Target(self.path, format=luigi.format.GzipFormat())
S3に結果を出力する
outputを luigi.s3.S3Target
にして書きこむ。
class MyTask(luigi.Task):
def run(self):
with self.output().open('w') as output:
output.write('Hey')
def output(self):
return luigi.s3.S3Target('s3://hoge/fuga.txt')
S3にアクセスするのにSTS(Security Token Service)接続を使う
S3Target
のclientにSTS接続用のクライアントを渡す
class MyS3FileTask(luigi.s3.S3PathTask):
path = luigi.Parameter()
def output(self):
# assumed roleから取得したキーを渡す
client = luigi.s3.S3Client(
aws_access_key_id=xxxxx,
aws_secret_access_key=yyyyy,
security_token=zzzz)
return luigi.s3.S3Target('s3://xxxx', client=client)
エラー通知をSNSに飛ばす
設定をこんな感じにして
[core]
error-email: arn:aws:sns:ap-northeast-1:0000000000:sns-LuigiError
[email]
type: sns
force-send: true #手動実行時にも飛ばしたい時にtrue
適宜、起動コマンドに AWS_DEFAULT_REGION
等を渡す。EC2インスタンスのIAMロールを使う場合はクレデンシャルの指定は不要。
AWS_DEFAULT_REGION=ap-northeast-1 python sns_test.py Invoke
GCP
GCPのクレデンシャルは環境変数の GOOGLE_APPLICATION_CREDENTIALS
で渡す。
- http://luigi.readthedocs.org/en/stable/_modules/luigi/contrib/gcs.html
- http://luigi.readthedocs.org/en/stable/_modules/luigi/contrib/bigquery.html
GCSのファイルを入力とする
luigi.contrib.gcs.GCSTarget
を使う
GCSTarget
はインスタンス作成時にネットワークアクセスが発生する割にエラーを想定していない作りなので、503が帰ってきた時にリトライがかかるようにすると良い。
GCSに出力する
luigi.contrib.gcs.GCSTarget
に書き込む
import luigi
from luigi.contrib.gcs import GCSTarget
class MyTask(luigi.Task):
def requires(self):
return GCSPathTask(path='gs://hoge/fuga.txt')
def run(self):
with self.input().open('r') as input:
# 何かやる
with self.output().open('w') as output:
# outputに何か書く
def output(self):
return GCSTarget('gs://hoge/fuga_result.txt')
class GCSPathTask(luigi.ExternalTask):
path = luigi.Parameter()
def output(self):
return GCSTarget(self.path)
BigQueryのロードジョブを実行する
luigi.contrib.bigquery
は使いにくいので時前で書いた方が良い。
特にBigQueryTargetはテーブルを削除しないとタスクの再実行ができない。