Luigi逆引きリファレンス

More than 1 year has passed since last update.

PythonのジョブマネージャであるLuigi 2.0系のメモです。


基本


Taskの基本の型


standard_task.py

import luigi

class MyTask(luigi.Task):
date = luigi.DateParameter()

def requires(self):
return MyDependentTask(self.date)

def run(self):
with self.output().open('w') as output:
with self.input().open('r') as input:
for line in input:
ret = do_something(line)
output.write(ret)
output.write('\n')

def output(self):
return luigi.LocalTarget('./out2_{0}.txt'.format(self.date.isoformat()))

class MyDependentTask(luigi.Task):
date = luigi.DateParameter()

def run(self):
with self.output().open('w') as output:
output.write("line1\n")
output.write("line2\n")

def output(self):
return luigi.LocalTarget('./out1_{0}.txt'.format(self.date.isoformat()))

if __name__ == '__main__':
luigi.run()



バイナリをoutputに書きたい

luigi.format.Nop を使う。

例えばpickleしたい場合。

import pickle

import luigi

class SomeTask(luigi.Task):
def requires(self):
return xxxx

def run(self):
some_obj = hoge()
with self.output().open('w') as output:
output.write(pickle.dumps(some_obj, protocol=2))

def output(self):
return luigi.LocalTarget(
format=luigi.format.Nop,
path='xxxxxxxx')

class NextTask(luigi.Task):
def requires(self):
return SomeTask()

def run(self):
with self.input().open('r') as infile:
ret = pickle.load(infile)


gzipされたファイルを入力とする

依存するTaskのoutputで luigi.format.GzipFormatを渡したTargetを返す。


出力ファイルをgzipする

入力時と同じく、Targetのformatにluigi.format.GzipFormatを渡しておく

class MyTask(luigi.Task):

def run(self):
with self.output().open('w') as output:
output.write('aaaa')

def output(self):
return luigi.LocalTarget('./out.gz', format=luigi.format.GzipFormat())


PandasのDataFrameからoutputに書きたい

def run(self):

result_df = do_something()
with self.output().open('w') as output:
result_df.to_csv(output, index=None)

inputとして受けとる側は

import pandas as pd

...

def run(self):
input_df = pd.read_csv(self.input().open('r'))
do_something(input_df)


タスクを実行するだけのタスク

luigi.WrapperTaskrunoutput も実装しない。

class MyInvokerTask(luigi.WrapperTask):

def requires(self):
return [FugaTask(), BarTask(), BuzTask(), FooTask()]


依存タスクを並行して実行する

class MyInvokerTask(luigi.WrapperTask):

def requires(self):
return [MyDepTask1(), MyDepTask2(), MyDepTask3()]

class MyDepTask1(luigi.Task):
priority = 100

# 以下略

しておいて、起動コマンドで --workers 2 とか付加すると良い。

各タスクの proprity を見て高い物から優先して実行する。


依存タスクを順番に実行する

luigi的な依存関係は定義しないけどシリアルに処理したい場合

class MyInvokerTask(luigi.WrapperTask):

def requires(self):
yield MyDepTask1()
yield MyDepTask2()
yield MyDepTask3()


依存タスクを連鎖起動しない

タスクオブジェクトを luigi.task.externalize すると run しないで、outputが生成されているかどうかをチェックするのみになる。

class MyTask(luigi.Task):

def requires(self):
return externalize(MyDependencyTask())

def run(self):
print('Someone has finished MyDependencyTask')


コケたジョブを手動でリトライする

VisualiserでタスクがPENDINGになっている、もしくは見えない(スケジューラーからリリース済み)の場合は再度コマンド実行すれば良い。依存ツリー上でoutputが生成されていないタスクのみ実行される。


コケたジョブを自動でリトライする設定

デフォルト設定ではリトライしてくれないので、設定ファイルの次の4項目を指定する。

注: Version 2.5だとリトライ周りの設定項目が変わっている


luigi.cfg

[core]

worker-keep-alive: true
max-reschedules: 20

[scheduler]
disable-num-failures: 10
retry-delay: 300



リトライが失敗し続けた場合にリトライを止める

disable-window-seconds で指定した時間内に disable-num-failures の回数だけコケたらタスクを無効化する。


luigi.cfg

disable-num-failures: 20

disable-window-seconds: 3600


外部タスクの出力を待ち続ける

luigi.cfgretry-external-tasks: true にするとExternalTaskもリトライしてくれる。retry-delay の指定はスケジューラー単位で、タスク毎に指定できない。


タスクの処理時間を収集する

luigi.Task.event_handlerデコレーターでフックが作れる。 PROCESSING_TIME に対してのハンドラ内で、タスク経過時間を集めてやれば、実装は一箇所で済む。

@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)

def on_processing_time(task, duration):
logger.debug('Task {} proceed {:.1f} sec'.format(task, duration))
# どこかメトリクス収集に投げる
# ...


AWS

http://luigi.readthedocs.org/en/stable/_modules/luigi/s3.html


外部タスクがS3に置いたファイルを入力とする

luigi.s3.S3PathTask を使う

class MyTask(luigi.Task):

def requires(self):
return luigi.s3.S3PathTask('s3://xxxxxxx')

gzipされている場合は

class MyTask(luigi.Task):

def requires(self):
return GzipS3FileTask('s3://hoge/fuga.gz')

def run(self):
input = self.input().open('r') # で読める

class GzipS3FileTask(luigi.s3.S3PathTask):
path = luigi.Parameter()

def output(self):
return luigi.s3.S3Target(self.path, format=luigi.format.GzipFormat())


S3に結果を出力する

outputを luigi.s3.S3Target にして書きこむ。

class MyTask(luigi.Task):

def run(self):
with self.output().open('w') as output:
output.write('Hey')

def output(self):
return luigi.s3.S3Target('s3://hoge/fuga.txt')


S3にアクセスするのにSTS(Security Token Service)接続を使う

S3Target のclientにSTS接続用のクライアントを渡す

class MyS3FileTask(luigi.s3.S3PathTask):

path = luigi.Parameter()

def output(self):
# assumed roleから取得したキーを渡す
client = luigi.s3.S3Client(
aws_access_key_id=xxxxx,
aws_secret_access_key=yyyyy,
security_token=zzzz)
return luigi.s3.S3Target('s3://xxxx', client=client)


エラー通知をSNSに飛ばす

設定をこんな感じにして


luigi.cfg

[core]

error-email: arn:aws:sns:ap-northeast-1:0000000000:sns-LuigiError

[email]
type: sns
force-send: true #手動実行時にも飛ばしたい時にtrue


適宜、起動コマンドに AWS_DEFAULT_REGION 等を渡す。EC2インスタンスのIAMロールを使う場合はクレデンシャルの指定は不要。

AWS_DEFAULT_REGION=ap-northeast-1 python sns_test.py Invoke


GCP

GCPのクレデンシャルは環境変数の GOOGLE_APPLICATION_CREDENTIALS で渡す。


GCSのファイルを入力とする

luigi.contrib.gcs.GCSTarget を使う

GCSTargetはインスタンス作成時にネットワークアクセスが発生する割にエラーを想定していない作りなので、503が帰ってきた時にリトライがかかるようにすると良い。


GCSに出力する

luigi.contrib.gcs.GCSTarget に書き込む

import luigi

from luigi.contrib.gcs import GCSTarget

class MyTask(luigi.Task):
def requires(self):
return GCSPathTask(path='gs://hoge/fuga.txt')

def run(self):
with self.input().open('r') as input:
# 何かやる

with self.output().open('w') as output:
# outputに何か書く

def output(self):
return GCSTarget('gs://hoge/fuga_result.txt')

class GCSPathTask(luigi.ExternalTask):
path = luigi.Parameter()

def output(self):
return GCSTarget(self.path)


BigQueryのロードジョブを実行する

luigi.contrib.bigquery は使いにくいので時前で書いた方が良い。

特にBigQueryTargetはテーブルを削除しないとタスクの再実行ができない。