概要
-
celeryタスクを以下の観点で実行してみた。
- 「ネットワーク越し」に呼び出す
- さらに「複数個順番」に呼び出す
- さらに「成功時/エラー時のcallback用タスクも指定」して呼び出す
前提
- 環境
- python: v.3.7.7
- pip install
- celery: v.4.4.7
celeryタスクをネットワーク越しに呼び出す
以下のタスクが定義されているとする。
import time
@app.task(name="test", bind=True)
def test(self, arg1):
time.sleep(5)
res = arg1 + 1
print("test called. Result is :" + res)
return res
上記のタスクが、別サーバ等で定義され、わざわざmoduleとしてimportできない/したくない場合について考える。
send_task() を使う
send_taskを使って以下のように呼び出すことができる。
from celery.execute import send_task
res = send_task("test", args=[1])
res.get()
この時、send_task
の第一引数はタスク名の文字列、args引数は、タスクに与える引数のリストを与える。
結果は以下のようになる。
$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.303148100007093s: 2", "data": {...,"return_value": "2", "runtime": 5.303148100007093}, ...}
5秒待ち合わせてから、結果の 2
が得られたことがわかる。
Signatureを使う
Signatureはタスクをシリアル化してネットワーク越しに送信できるようにwrapしてくれるクラスとのこと。
コードを読むと、以下のように実行することで、タスクをSignatureオブジェクトにwrapし、ネットワーク越しに呼び出すことができるようである。
from celery.execute import Signature
si = Signature("test", args=[1])
# 非同期に実行
res = si.apply_async()
res.get()
この時、Signature
のコンストラクタに与える第一引数はタスク名の文字列、args引数は、タスクに与える引数のリストであっる。
結果は以下のようになる。
$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.304436000005808s: 2", "data": {..., "return_value": "2", "runtime": 5.304436000005808}, ...}
5秒待ち合わせてから、結果の 2
が得られたことがわかる。
さらに「複数個順番」に呼び出す
以下のタスクが定義されているとする。
import time
@app.task(name="test", bind=True)
def test(self, arg1):
time.sleep(5)
res = arg1 + 1
print("test called. result is : " + str(res))
return res
@app.task(name="test2", bind=True)
def test2(self, arg1):
time.sleep(3)
res = arg1 + 1
print("test2 called. result is : " + str(res))
return res
これらを、ネットワーク越しに、複数個を順番に実行したい場合を考える。
この場合は、chain
と Signature
を用いることで、実行することができる。
なお、ドキュメントによると、タスクの実行順序は、順番通りに直列に実行するchain
以外にも、並列実行(group)や、並列実行した結果の待ち合わせ(chords)などの指定もできる。
今回は、順番通りに直列実行する chain
にフォーカスして説明する。
send_task()を使う場合
send_task()
には、chain
引数がある。
このchain
に、Signature
オブジェクトのリストとして与えることで、複数のタスクを順番に実行可能になる。
from celery import Signature
res = send_task("test", args=[1], chain=[ Signature("test2", args=[]) ])
res.get()
この時、Signature
のargs引数には、直前のタスク(=test
)の実行結果が、自動的与えられる。
そのため、手動では引数を与えていない。(与えると引数の数が不正となりエラーになる)
結果は以下のようになる。
$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.444498699973337s: 2", "data": {..., "return_value": "2", "runtime": 5.444498699973337}, ...}
$ test2 called. result is : 3
$ {"name": "celery.app.trace", "message": "Task test2[...] succeeded in 3.3040928000118583s: 3", "data": {..., "return_value": "3", "runtime": 3.3040928000118583}, ...}
testの実行結果2
が5秒後に得られる。
test2の実行結果の3
がさらに3秒後に得られることがわかる。
直前のタスクの実行結果をSignatureのargs引数に引き継がせたくない場合。
Signatureコンストラクタの引数として、immutable=True
を与えればよい。
→ Signature("test2", args=[10], immutable=True)
Signatureを使う場合
Signatureオブジェクトだけでも完結可能。
コードを読むと(細かいところは割愛します)、
-
OR演算子 で
Signature
または_chain
オブジェクトを繋いであげると、_chain
オブジェクトが返る -
_chain
クラスの__call__()
では、Signature
オブジェクトをqueueに連結させ、Signature
クラスのapply_async()
を実行する。- つまり、前述の
Signature("test", args=[1]).apply_async()
のような呼び出しは不要
- つまり、前述の
from celery import Signature
si = (Signature("test", args=[1]) | Signature("test2", args=[]))
res = si()
res.get()
この時、Signature
のargs引数には、直前のタスク(=test
)の実行結果が、自動的与えられる。
そのため、手動では引数を与えていない。(与えると引数の数が不正となりエラーになる)
結果は以下のようになる。
$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.305401199992048s: 2", "data": {..., "return_value": "2", "runtime": 5.305401199992048},...}
$ test2 called. result is : 3
$ {"name": "celery.app.trace", "message": "Task test2[...] succeeded in 3.4333536000049207s: 3", "data": {..., "return_value": "3", "runtime": 3.4333536000049207}, ...}
testの実行結果2
が5秒後に得られる。
test2の実行結果の3
がさらに3秒後に得られることがわかる。
さらに「成功時/エラー時のcallback用タスクも指定」して呼び出す
以下のタスクが定義されているとする。
import time
@app.task(name="test", bind=True)
def test(self, arg1):
time.sleep(5)
res = arg1 + 1
print("test called. result is : " + str(res))
return res
@app.task(name="test2", bind=True)
def test2(self, arg1):
time.sleep(3)
raise Exception("Failed!!!!")
@app.task(name="success", bind=True)
def succ(self, arg1):
print("task success! Arg is: " + str(arg1))
return None
@app.task(name="failed", bind=True)
def fail(self, arg1):
print("task failed! Arg is: " + str(arg1))
return None
これらを組み合わせて、taskが成功した時/失敗した時のcallbackとしてタスクを与える。
celeryでは、link
/link_error
を用いることで、これら機能が実現可能になる。
send_task()を使う場合
send_task()
の引数に、link
とlink_error
がある。
link
はタスク成功時に実行するcallbackで、link_error
はタスク失敗時に実行するcallbackである。
以下のように実行できる。
from celery import Signature
res = send_task("test", args=[1], link=Signature("success", args=[]), link_error=Signature("failed", args=[]) )
res.get()
結果は以下の通り
$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.37825320000411s: 2", "data": {..., "return_value": "2", "runtime": 5.37825320000411}, ...}
$ task success! Arg is: 2
$ {"name": "celery.app.trace", "message": "Task success[...] succeeded in 0.36554580001393333s: None", "data": {..., "return_value": "None", "runtime": 0.36554580001393333}, ...}
5秒まって2が出力。
成功したので、"task success!"メッセージが出力されていることが確認できる。
また、エラー検知とchainを組み合わせると以下のように実行できる。
from celery import Signature
res = send_task("test", args=[1], \
link=Signature("success", args=[]), \
link_error=Signature("failed", args=[]), \
chain=[Signature("test2", args=[], link=Signature("success", args=[]), link_error=Signature("failed", args=[]) )] \
)
res.get()
結果は以下の通り
$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.331143499992322s: 2", "data": {..., "return_value": "2", "runtime": 5.331143499992322}, ...}
$ task success! Arg is: 2
$ {"name": "celery.app.trace", "message": "Task success[...] succeeded in 0.36938309998367913s: None", "data": {..., "return_value": "None", "runtime": 0.36938309998367913}, ...}
$ {"name": "celery.app.trace", "message": "Task test2[82cd199e-1489-4cb9-b544-f99c4c0054fa] raised unexpected: Exception('Failed!!!!')", "exc_info": ..., "data": {"hostname": "celery@hub", "id": "82cd199e-1489-4cb9-b544-f99c4c0054fa", "name": "test2", "exc": "Exception('Failed!!!!')", "traceback": ..., "args": "[2]", "kwargs": "{}", "description": "raised unexpected", "internal": false}, ...}
$ task failed! Arg is: 82cd199e-1489-4cb9-b544-f99c4c0054fa
$ {"name": "celery.app.trace", "message": "Task failed[...] succeeded in 0.22218690000590868s: None", "data": {..., "return_value": "None", "runtime": 0.22218690000590868}, ...}
3秒まって2
が得られる。
test
タスクが成功したので、"task success!"の文字列が現れる。
chainで、test2
タスクが実行されるが、Exception
をraiseする。
test2
タスクが失敗したので、"task failed!"の文字列が現れることが確認できる。
Signatureを使う場合
send_task()と同様の処理をしてみる。
(Signature("test", args=[1], link=Signature("success", args=[]), link_error=Signature("failed", args=[])) \
| Signature("test2", args=[], link=Signature("success", args=[]), link_error=Signature("failed", args=[])) \
)()
結果は以下の通り
$ test called. result is : 2
$ {"name": "celery.app.trace", "message": "Task test[...] succeeded in 5.3795256999728736s: 2", "data": {..., "return_value": "2", "runtime": 5.3795256999728736}, ...}
$ task success! Arg is: 2
$ {"name": "celery.app.trace", "message": "Task success[...] succeeded in 0.9231500999885611s: None", "data": {..., "return_value": "None", "runtime": 0.9231500999885611}, ...}
$ {"name": "celery.app.trace", "message": "Task test2[127b55dc-78d1-422b-a0f8-546600b65368] raised unexpected: Exception('Failed!!!!')", "exc_info": ..., "data": {"hostname": "celery@hub", "id": "127b55dc-78d1-422b-a0f8-546600b65368", "name": "test2", "exc": "Exception('Failed!!!!')", "traceback": ..., "args": "[2]", "kwargs": "{}", "description": "raised unexpected", "internal": false}, ...}
$ task failed! Arg is: 127b55dc-78d1-422b-a0f8-546600b65368
$ {"name": "celery.app.trace", "message": "Task failed[...] succeeded in 0.2638638000062201s: None", "data": {..., "return_value": "None", "runtime": 0.2638638000062201}, ...}
shared_task()と同じ結果が得られることがわかる。
【2022/3追記】 chainで呼び出したtaskのidを知りたい場合
chainで繋いだtaskを呼び出した場合、返ってくる情報は、chainの 最後 のtask情報(=AsyncResult
オブジェクト)である。
>>> r = (Signature("test", args=[1], link=Signature("success", args=[]), link_error=Signature("failed", args=[])) \
| Signature("test2", args=[], link=Signature("success", args=[]), link_error=Signature("failed", args=[])) \
)()
>>> r
<AsyncResult: 4ed97db4-96f1-408d-9818-2147903f5ed9>
>>> r.id
'4ed97db4-96f1-408d-9818-2147903f5ed9'
chainで繋いでいる全てのtaskのidを引っ張ってくるためには、parent
でアクセスすれば良い
>>> r.parent
<AsyncResult: 74e5730d-2504-4d64-a0f0-95eb663d43eb>
>>> r.parent.id
'74e5730d-2504-4d64-a0f0-95eb663d43eb'
もしparentのtaskが存在しない場合は、Noneが帰ってくる
>>> r.parent
<AsyncResult: 74e5730d-2504-4d64-a0f0-95eb663d43eb>
>>> r.parent.parent
>>> # → 何も帰ってこない(=結果としてはNoneTypeがNoneTypeが返ってきている)