Help us understand the problem. What is going on with this article?

Luigi逆引きリファレンス

PythonのジョブマネージャであるLuigi 2.0系のメモです。

基本

Taskの基本の型

standard_task.py
import luigi

class MyTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return MyDependentTask(self.date)

    def run(self):
        with self.output().open('w') as output:
            with self.input().open('r') as input:
                for line in input:
                    ret = do_something(line)
                    output.write(ret)
                    output.write('\n')

    def output(self):
        return luigi.LocalTarget('./out2_{0}.txt'.format(self.date.isoformat()))


class MyDependentTask(luigi.Task):
    date = luigi.DateParameter()

    def run(self):
        with self.output().open('w') as output:
            output.write("line1\n")
            output.write("line2\n")

    def output(self):
        return luigi.LocalTarget('./out1_{0}.txt'.format(self.date.isoformat()))


if __name__ == '__main__':
    luigi.run()

バイナリファイルをoutputにしたい

luigi.format.Nop を使う。
例えばpickleしたい場合。

import pickle

import luigi


class SomeTask(luigi.Task):
    def requires(self):
        return xxxx

    def run(self):
        some_obj = hoge()
        with self.output().open('w') as output:
            output.write(pickle.dumps(some_obj, protocol=pickle.HIGHEST_PROTOCOL))

    def output(self):
        return luigi.LocalTarget(
            format=luigi.format.Nop,
            path='xxxxxxxx')


class NextTask(luigi.Task):
    def requires(self):
        return SomeTask()

    def run(self):
        with self.input().open('r') as infile:
            ret = pickle.load(infile)

gzipされたファイルを入力とする

依存するTaskのoutputで luigi.format.GzipFormatを渡したTargetを返す。

出力ファイルをgzipする

入力時と同じく、Targetのformatにluigi.format.GzipFormatを渡しておく

class MyTask(luigi.Task):
    def run(self):
        with self.output().open('w') as output:
            output.write('aaaa')

    def output(self):
        return luigi.LocalTarget('./out.gz', format=luigi.format.GzipFormat())

PandasのDataFrameをoutputにしたい

outputのformatに luigi.format.Nop を指定してDataFrameをpickleして書き込む。型が失なわれるので to_csv 等は使わない。

def run(self):
    result_df = do_something()
    with self.output().open('w') as output:
        output.write(pickle.dumps(result_df, protocol=pickle.HIGHEST_PROTOCOL))

inputとして受けとる側は

def run(self):
    with self.input().open('r') as infile:
        input_df: pd.DataFrame = pickle.load(infile)
        do_something(input_df)    

タスクを実行するだけのタスク

luigi.WrapperTaskrunoutput も実装しない。

class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        return [FugaTask(), BarTask(), BuzTask(), FooTask()]

依存タスクを並行して実行する

class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        return [MyDepTask1(), MyDepTask2(), MyDepTask3()]

class MyDepTask1(luigi.Task):
    priority = 100

    # 以下略

しておいて、起動コマンドで --workers 2 とか付加すると良い。
各タスクの proprity を見て高い物から優先して実行する。

依存タスクを順番に実行する

luigi的な依存関係は定義しないけどシリアルに処理したい場合

class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        yield MyDepTask1()
        yield MyDepTask2()
        yield MyDepTask3()

依存タスクを連鎖起動しない

タスクオブジェクトを luigi.task.externalize すると run しないで、outputが生成されているかどうかをチェックするのみになる。

class MyTask(luigi.Task):
    def requires(self):
        return externalize(MyDependencyTask())

    def run(self):
        print('Someone has finished MyDependencyTask')

コケたジョブを手動でリトライする

VisualiserでタスクがPENDINGになっている、もしくは見えない(スケジューラーからリリース済み)の場合は再度コマンド実行すれば良い。依存ツリー上でoutputが生成されていないタスクのみ実行される。

コケたジョブを自動でリトライする設定

デフォルト設定ではリトライしてくれないので、設定ファイルの次の4項目を指定する。
注: Version 2.5だとリトライ周りの設定項目が変わっている

luigi.cfg
[core]
worker-keep-alive: true
max-reschedules: 20

[scheduler]
disable-num-failures: 10
retry-delay: 300

リトライが失敗し続けた場合にリトライを止める

disable-window-seconds で指定した時間内に disable-num-failures の回数だけコケたらタスクを無効化する。

luigi.cfg
disable-num-failures: 20
disable-window-seconds: 3600

外部タスクの出力を待ち続ける

luigi.cfgretry-external-tasks: true にするとExternalTaskもリトライしてくれる。retry-delay の指定はスケジューラー単位で、タスク毎に指定できない。

タスクの処理時間を収集する

luigi.Task.event_handlerデコレーターでフックが作れる。 PROCESSING_TIME に対してのハンドラ内で、タスク経過時間を集めてやれば、実装は一箇所で済む。

@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def on_processing_time(task, duration):
    logger.debug('Task {} proceed {:.1f} sec'.format(task, duration))
    # どこかメトリクス収集に投げる
    # ...

AWS

http://luigi.readthedocs.org/en/stable/_modules/luigi/s3.html

外部タスクがS3に置いたファイルを入力とする

luigi.s3.S3PathTask を使う

class MyTask(luigi.Task):
    def requires(self):
        return luigi.s3.S3PathTask('s3://xxxxxxx')

gzipされている場合は

class MyTask(luigi.Task):
    def requires(self):
        return GzipS3FileTask('s3://hoge/fuga.gz')

    def run(self):
        input = self.input().open('r') # で読める

class GzipS3FileTask(luigi.s3.S3PathTask):
    path = luigi.Parameter()

    def output(self):
        return luigi.s3.S3Target(self.path, format=luigi.format.GzipFormat())

S3に結果を出力する

outputを luigi.s3.S3Target にして書きこむ。

class MyTask(luigi.Task):
    def run(self):
        with self.output().open('w') as output:
            output.write('Hey')

    def output(self):
        return luigi.s3.S3Target('s3://hoge/fuga.txt')

S3にアクセスするのにSTS(Security Token Service)接続を使う

S3Target のclientにSTS接続用のクライアントを渡す

class MyS3FileTask(luigi.s3.S3PathTask):
    path = luigi.Parameter()

    def output(self):
        # assumed roleから取得したキーを渡す
        client = luigi.s3.S3Client(
            aws_access_key_id=xxxxx,
            aws_secret_access_key=yyyyy,
            security_token=zzzz)
        return luigi.s3.S3Target('s3://xxxx', client=client)

エラー通知をSNSに飛ばす

設定をこんな感じにして

luigi.cfg
[core]
error-email: arn:aws:sns:ap-northeast-1:0000000000:sns-LuigiError

[email]
type: sns
force-send: true #手動実行時にも飛ばしたい時にtrue

適宜、起動コマンドに AWS_DEFAULT_REGION 等を渡す。EC2インスタンスのIAMロールを使う場合はクレデンシャルの指定は不要。

AWS_DEFAULT_REGION=ap-northeast-1 python sns_test.py Invoke

GCP

GCPのクレデンシャルは環境変数の GOOGLE_APPLICATION_CREDENTIALS で渡す。

GCSのファイルを入力とする

luigi.contrib.gcs.GCSTarget を使う

GCSTargetはインスタンス作成時にネットワークアクセスが発生する割にエラーを想定していない作りなので、503が帰ってきた時にリトライがかかるようにすると良い。

GCSに出力する

luigi.contrib.gcs.GCSTarget に書き込む

import luigi
from luigi.contrib.gcs import GCSTarget

class MyTask(luigi.Task):
   def requires(self):
       return GCSPathTask(path='gs://hoge/fuga.txt')

    def run(self):
        with self.input().open('r') as input:
            # 何かやる

        with self.output().open('w') as output:
            # outputに何か書く

    def output(self):
        return GCSTarget('gs://hoge/fuga_result.txt')

class GCSPathTask(luigi.ExternalTask):
    path = luigi.Parameter()

    def output(self):
        return GCSTarget(self.path)

BigQueryのロードジョブを実行する

luigi.contrib.bigquery は使いにくいので時前で書いた方が良い。
特にBigQueryTargetはテーブルを削除しないとタスクの再実行ができない。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした