LoginSignup
2
2

More than 1 year has passed since last update.

celeryタスクをネットワーク越しにフローとして実行する

Last updated at Posted at 2022-02-22

概要

  • celeryタスクを以下の観点で実行してみた。
    • 「ネットワーク越し」に呼び出す
    • さらに「複数個順番」に呼び出す
    • さらに「成功時/エラー時のcallback用タスクも指定」して呼び出す

前提

  • 環境
    • python: v.3.7.7
  • pip install
    • celery: v.4.4.7

celeryタスクをネットワーク越しに呼び出す

以下のタスクが定義されているとする。

import time
@app.task(name="test", bind=True)
def test(self, arg1):
    time.sleep(5)
    res = arg1 + 1
    print("test called. Result is :" + res)
    return res

上記のタスクが、別サーバ等で定義され、わざわざmoduleとしてimportできない/したくない場合について考える。

send_task() を使う

send_taskを使って以下のように呼び出すことができる。

from celery.execute import send_task

res = send_task("test", args=[1])

res.get()

この時、send_taskの第一引数はタスク名の文字列、args引数は、タスクに与える引数のリストを与える。

結果は以下のようになる。

$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.303148100007093s: 2", "data": {...,"return_value": "2", "runtime": 5.303148100007093}, ...}

5秒待ち合わせてから、結果の 2 が得られたことがわかる。

Signatureを使う

Signatureはタスクをシリアル化してネットワーク越しに送信できるようにwrapしてくれるクラスとのこと。

コードを読むと、以下のように実行することで、タスクをSignatureオブジェクトにwrapし、ネットワーク越しに呼び出すことができるようである。

from celery.execute import Signature

si = Signature("test", args=[1])
# 非同期に実行
res = si.apply_async()

res.get()

この時、Signatureのコンストラクタに与える第一引数はタスク名の文字列、args引数は、タスクに与える引数のリストであっる。

結果は以下のようになる。

$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.304436000005808s: 2", "data": {..., "return_value": "2", "runtime": 5.304436000005808}, ...}

5秒待ち合わせてから、結果の 2 が得られたことがわかる。


さらに「複数個順番」に呼び出す

以下のタスクが定義されているとする。

import time
@app.task(name="test", bind=True)
def test(self, arg1):
    time.sleep(5)
    res = arg1 + 1
    print("test called. result is : " + str(res))
    return res

@app.task(name="test2", bind=True)
def test2(self, arg1):
    time.sleep(3)
    res = arg1 + 1
    print("test2 called. result is : " + str(res))
    return res

これらを、ネットワーク越しに、複数個を順番に実行したい場合を考える。

この場合は、chainSignature を用いることで、実行することができる。

なお、ドキュメントによると、タスクの実行順序は、順番通りに直列に実行するchain以外にも、並列実行(group)や、並列実行した結果の待ち合わせ(chords)などの指定もできる。
今回は、順番通りに直列実行する chain にフォーカスして説明する。

send_task()を使う場合

send_task()には、chain引数がある。
このchainに、Signatureオブジェクトのリストとして与えることで、複数のタスクを順番に実行可能になる。

from celery import Signature
res = send_task("test", args=[1], chain=[ Signature("test2", args=[]) ])
res.get()

この時、Signatureのargs引数には、直前のタスク(=test)の実行結果が、自動的与えられる。
そのため、手動では引数を与えていない。(与えると引数の数が不正となりエラーになる)

結果は以下のようになる。

$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.444498699973337s: 2", "data": {..., "return_value": "2", "runtime": 5.444498699973337}, ...}
$ test2 called. result is : 3
$ {"name": "celery.app.trace", "message": "Task test2[...] succeeded in 3.3040928000118583s: 3", "data": {..., "return_value": "3", "runtime": 3.3040928000118583}, ...}

testの実行結果2が5秒後に得られる。
test2の実行結果の3がさらに3秒後に得られることがわかる。

直前のタスクの実行結果をSignatureのargs引数に引き継がせたくない場合。

Signatureコンストラクタの引数として、immutable=Trueを与えればよい。

→ Signature("test2", args=[10], immutable=True)

Signatureを使う場合

Signatureオブジェクトだけでも完結可能。

コードを読むと(細かいところは割愛します)、

  • OR演算子Signatureまたは_chainオブジェクトを繋いであげると、_chainオブジェクトが返る
  • _chainクラスの__call__()では、Signatureオブジェクトをqueueに連結させ、Signatureクラスのapply_async()を実行する。
    • つまり、前述の Signature("test", args=[1]).apply_async()のような呼び出しは不要
from celery import Signature
si = (Signature("test", args=[1]) | Signature("test2", args=[]))
res = si()
res.get()

この時、Signatureのargs引数には、直前のタスク(=test)の実行結果が、自動的与えられる。
そのため、手動では引数を与えていない。(与えると引数の数が不正となりエラーになる)

結果は以下のようになる。

$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.305401199992048s: 2", "data": {..., "return_value": "2", "runtime": 5.305401199992048},...}
$ test2 called. result is : 3
$ {"name": "celery.app.trace", "message": "Task test2[...] succeeded in 3.4333536000049207s: 3", "data": {..., "return_value": "3", "runtime": 3.4333536000049207}, ...}

testの実行結果2が5秒後に得られる。
test2の実行結果の3がさらに3秒後に得られることがわかる。


さらに「成功時/エラー時のcallback用タスクも指定」して呼び出す

以下のタスクが定義されているとする。

import time
@app.task(name="test", bind=True)
def test(self, arg1):
    time.sleep(5)
    res = arg1 + 1
    print("test called. result is : " + str(res))
    return res

@app.task(name="test2", bind=True)
def test2(self, arg1):
    time.sleep(3)
    raise Exception("Failed!!!!")

@app.task(name="success", bind=True)
def succ(self, arg1):
    print("task success! Arg is: " + str(arg1))
    return None

@app.task(name="failed", bind=True)
def fail(self, arg1):
    print("task failed! Arg is: " + str(arg1))
    return None

これらを組み合わせて、taskが成功した時/失敗した時のcallbackとしてタスクを与える。
celeryでは、link/link_errorを用いることで、これら機能が実現可能になる。

send_task()を使う場合

send_task()の引数に、linklink_errorがある。
linkはタスク成功時に実行するcallbackで、link_errorはタスク失敗時に実行するcallbackである。

以下のように実行できる。

from celery import Signature
res = send_task("test", args=[1],  link=Signature("success", args=[]), link_error=Signature("failed", args=[]) )
res.get()

結果は以下の通り

$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.37825320000411s: 2", "data": {..., "return_value": "2", "runtime": 5.37825320000411}, ...}
$ task success! Arg is: 2
$ {"name": "celery.app.trace", "message": "Task success[...] succeeded in 0.36554580001393333s: None", "data": {..., "return_value": "None", "runtime": 0.36554580001393333}, ...}

5秒まって2が出力。
成功したので、"task success!"メッセージが出力されていることが確認できる。

また、エラー検知とchainを組み合わせると以下のように実行できる。

from celery import Signature
res = send_task("test", args=[1],  \
link=Signature("success", args=[]), \
link_error=Signature("failed", args=[]), \
chain=[Signature("test2", args=[], link=Signature("success", args=[]), link_error=Signature("failed", args=[]) )] \
)
res.get()

結果は以下の通り

$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.331143499992322s: 2", "data": {..., "return_value": "2", "runtime": 5.331143499992322}, ...}
$ task success! Arg is: 2
$ {"name": "celery.app.trace", "message": "Task success[...] succeeded in 0.36938309998367913s: None", "data": {..., "return_value": "None", "runtime": 0.36938309998367913}, ...}
$ {"name": "celery.app.trace", "message": "Task test2[82cd199e-1489-4cb9-b544-f99c4c0054fa] raised unexpected: Exception('Failed!!!!')", "exc_info": ..., "data": {"hostname": "celery@hub", "id": "82cd199e-1489-4cb9-b544-f99c4c0054fa", "name": "test2", "exc": "Exception('Failed!!!!')", "traceback": ..., "args": "[2]", "kwargs": "{}", "description": "raised unexpected", "internal": false}, ...}
$ task failed! Arg is: 82cd199e-1489-4cb9-b544-f99c4c0054fa
$ {"name": "celery.app.trace", "message": "Task failed[...] succeeded in 0.22218690000590868s: None", "data": {..., "return_value": "None", "runtime": 0.22218690000590868}, ...}

3秒まって2が得られる。
testタスクが成功したので、"task success!"の文字列が現れる。
chainで、test2タスクが実行されるが、Exceptionをraiseする。
test2タスクが失敗したので、"task failed!"の文字列が現れることが確認できる。

Signatureを使う場合

send_task()と同様の処理をしてみる。

(Signature("test", args=[1], link=Signature("success", args=[]), link_error=Signature("failed", args=[])) \
| Signature("test2", args=[], link=Signature("success", args=[]), link_error=Signature("failed", args=[])) \
)()

結果は以下の通り

$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.3795256999728736s: 2", "data": {..., "return_value": "2", "runtime": 5.3795256999728736}, ...}
$ task success! Arg is: 2
$ {"name": "celery.app.trace", "message": "Task success[...] succeeded in 0.9231500999885611s: None", "data": {..., "return_value": "None", "runtime": 0.9231500999885611}, ...}
$ {"name": "celery.app.trace", "message": "Task test2[127b55dc-78d1-422b-a0f8-546600b65368] raised unexpected: Exception('Failed!!!!')", "exc_info": ..., "data": {"hostname": "celery@hub", "id": "127b55dc-78d1-422b-a0f8-546600b65368", "name": "test2", "exc": "Exception('Failed!!!!')", "traceback": ..., "args": "[2]", "kwargs": "{}", "description": "raised unexpected", "internal": false}, ...}
$ task failed! Arg is: 127b55dc-78d1-422b-a0f8-546600b65368
$ {"name": "celery.app.trace", "message": "Task failed[...] succeeded in 0.2638638000062201s: None", "data": {..., "return_value": "None", "runtime": 0.2638638000062201}, ...}

shared_task()と同じ結果が得られることがわかる。


【2022/3追記】 chainで呼び出したtaskのidを知りたい場合


chainで繋いだtaskを呼び出した場合、返ってくる情報は、chainの 最後 のtask情報(=AsyncResultオブジェクト)である。

>>> r = (Signature("test", args=[1], link=Signature("success", args=[]), link_error=Signature("failed", args=[])) \
| Signature("test2", args=[], link=Signature("success", args=[]), link_error=Signature("failed", args=[])) \
)()
>>> r
<AsyncResult: 4ed97db4-96f1-408d-9818-2147903f5ed9>
>>> r.id
'4ed97db4-96f1-408d-9818-2147903f5ed9'

chainで繋いでいる全てのtaskのidを引っ張ってくるためには、parentでアクセスすれば良い

>>> r.parent
<AsyncResult: 74e5730d-2504-4d64-a0f0-95eb663d43eb>
>>> r.parent.id
'74e5730d-2504-4d64-a0f0-95eb663d43eb'

もしparentのtaskが存在しない場合は、Noneが帰ってくる

>>> r.parent
<AsyncResult: 74e5730d-2504-4d64-a0f0-95eb663d43eb>
>>> r.parent.parent
>>> # → 何も帰ってこない(=結果としてはNoneTypeがNoneTypeが返ってきている)
2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2