Python
Celery

Celery 3.1のワークフローデザイン

More than 1 year has passed since last update.

Celeryのドキュメント読みつつどういうのがあるのか公式のドキュメントを読みつつ自分的に咀嚼。

The primitives

  • Chains
  • Groups
  • Chords
  • Map & Starmap
  • Chunks

Map & Starmap, Chunksは調べてません。

Chains

タスクを直列に実行する。後続のタスクは直前のタスクの実行結果を受け取る。

個々のタスクのシグネチャに注意すること。

from celery import chain

# `add.s(4, 4)`の結果が`mul.s(8)`へ渡る。その結果が`mul.s(10)`へ渡る。
chain(add.s(4, 4), mul.s(8), mul.s(10))

Groups

複数のタスクを並列に実行する。

from celery import group

# `task.s(2, 2)`と`task.s(4, 4)`が並列に実行される
group(
    task.s(2, 2),
    task.s(4, 4)
)

Chords

複数のタスクの実行結果をコールバックに渡すことが出来る。

from celery import chord

# 複数の実行結果をtsum.s()に渡す
chord(add.s(i, i) for i in xrange(100))(tsum.s()).get()

この例ではadd.s(i, i) for i in xrange(100)が並列実行するタスクで、その実行結果(リスト?)がコールバックtsum.s()に渡される。

組み合わせ

1 pre-processor + n post-processors

これがやりたくて調べてた!

タイムスケールとしてはこんな感じ。

kobito.1423119852.301252.png

前処理として画像保存を実行、後処理として画像処理、リサイズなど複数同時に走らせられるようなタスクを想定。

@task
def main_task(object_id):
    # 前処理を行うタスク

    # 素敵な処理

    # 後続タスクのためにobject_idを返す
    return object_id

@task
def sub_task_1(object_id):
    # 後処理を行うタスク
    pass

@task
def sub_task_2(object_id):
    # 後処理を行うタスク
    pass


# タスク全体のチェインを組み立てる。chain()とgroupを使用する。
chains = chain(
    main_task.s(object.pk),
    group(
        sub_task_1.s(),
        sub_task_2.s()
    )
)

# チェインを実行する
# main_taskの実行完了後、sub_task_1, sub_task_2は並列に実行される。
chains.apply_async()

グループ内の後続タスクが受け取るシグネチャを合わせておくことがポイント。

1 processor + mapped post processors

前処理から複数の出力を行って後続タスクを複数並行に走らせる。先のフローと似ているが、pre processが複数の結果(数は不定)を出力する点が異なる。

kobito.1423120577.848210.png

@task
def pre_process_task(object_id):
    # 前処理を行うタスク

    # 素敵な処理

    # 処理結果となるオブジェクトのリストを返す
    return [1, 2, 3, 4, 5 ...]

@task
def post_process_task(object_id):
    # 後処理を行うタスク
    # 個別のオブジェクトを受け取るように設計する
    pass


@task
def dmap(it, callback):
    # リストを受け取ってcallbakに渡す
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()


# タスク全体のチェインを組み立てる
chains = chain(
    pre_process_task.s(object.pk),
    dmap.s(post_process_task.s()
)

# チェインを実行する
# pre_process_taskの実行完了後、post_process_taskが複数並行で処理を行う
chains.apply_async()

Completed task

si()を使用するとimmutable taskとなるため、前方タスクの戻り値を無視してタスクを実行することが出来る。

Kobito.v0wo8Y.png

@task
def main_task(object_id):
    # 何かのタスク
    return (object_id, result)

@task
def sub_task_1(args):
    # 何かのタスク
    object_id, result = args
    return True

@task
def sub_task_2(args):
    # 何かのタスク
    object_id, result = args
    return True

@task
def finalize_task(object_id):
    # タスク完了のログを出力する
    logger.info('Task completed')
    return True


object_id = 123

chain(
    main_task.s(object_id),
    group(
        sub_task_1.s(),  # main_taskの戻り値を使用する
        sub_task_2.s()   # main_taskの戻り値を使用する
    ),
    main_completed_task.si(object_id)       # s()ではなく、si()を使う点に注意
).apply_async()

参考