Webアプリケーションのタスクをバックグラウンドで流したいのでCeleryを少し触ってみたので最初の一歩的な記事を書いた。
1. 前段
1-1. ジョブとかバッチジョブとかジョブマネージメントとかジョブスケジューラとか
コンピュータの世界で何かしらの処理をジョブと呼ぶことがある。cronで実行される処理もcronジョブとか言われたりする。postfixとかnginxとかなにかのサービスを提供するアプリケーションもdaemonジョブとか呼ばれることもある。
で今回やるのはユーザの何かしらの操作を契機として比較的長い時間バックグラウンドで実行される処理。バッチジョブとか呼んだりする。HPCとかスパコンの業界でよく使われるのを聞いたことがある。人によっては別のものをバッチジョブと呼んだりする(例えば小売物流金融の世界だと1日の定時や定間隔に行われる処理のことをバッチ(ジョブ)とか日次バッチ(ジョブ)とか呼んだりする)こともあるので、割とコンテキストによって意味が変わる言葉なのかもしれない。
で、そのバッチジョブの実行を開始したり、資源が空いたときに実行しておいてと待ち行列(キュー)に入れたり、実行結果を後で呼び出したりすることができるアプリケーションをジョブマネージャとかジョブスケジューラとか呼んだりする。知っている中では (Open)PBS, Condor, Sun(Oracle) Grid Engine とか。Wikipediaにもリストがあるけどこれに限らないと思う。
1-2. Cerelyと他のジョブスケジューラの違いとか良し悪しとか個人的なお気持ち
Celeryは定義としてはジョブスケジューラではなく、Distributed Task Queueと呼ばれているらしい。RabbitMQ, Redis, RDBなどをブローカとして、複数のワーカーを動かして、投入されたジョブを非同期に動かすことができる。
一番いいところとしては、今回書くようなFlaskやDjangoとコードベースを共有して、Pythonの関数をジョブとして実行することができること。例えば関数に渡す引数とか、戻り値とかをよろしくシリアライズしてくれて、他の関数から読み書きしたりすることができる。
他にジョブスケジューラとして期待されるようなことは結構サポートされている。複数キューとか失敗時のリトライとか、ジョブの実行状態を定期的に渡したりとか。やったことないけど他の言語でワーカーを書いて、Webhookで動かすこともできるらしい。
さらっと使ってみてうーんと思ったのは以下の点
- 学習のハードルが少し高い
ドキュメントは詳細に書かれていて見つからないことはないんだけど、ちょっとわかりづらい。ジョブスケジューラの知識ゼロの人がいきなりCeleryに取り組むと結構しんどそうな感じ。 - 割と大きめの変更が入る
この記事執筆時点のバージョンは5.3.6だけど、4.x台からタスクの管理のモジュールが変わっていたりと、割と大きめに変更が入っているようで、4.x台のノウハウをそのままでは使えないことがある。裏を返せばそれだけアクティブに開発されているということなので、どう受け止めるは使う人の状況次第になるのかもしれない。
感覚としてはAWSのLambdaのようなことをオンプレでやりたいというようなWebスケール的なものをやるには比較的あっているのかもしれない。
もともと自分の希望としては、時間のかかる処理をバックグラウンドで実行したい、途中経過を知りたい、スケールアウトさせたい、というシンプルなものだったので、Celeryは少しoverkillなのかもしれないな、というのが自分の印象。小規模でコードベースが分かれてもいいのであれば、OpenPBSをベースに簡便なwrapperをかぶせるのでもよかったのかもしれないが、作るのもそれはそれで面倒だしありものでさくっとできるのは魅力ではある。この辺は個々の事情によって違うので、Celeryがスイートスポットでハマるところもあるのかもしれない。
今回はさわってないけど監視にはFlowerというアプリケーションを使うことができるらしい。Webの監視コンソールのようなものがあって、ジョブの実行状況などを見ることができる。(例)
他にもPrometheusやGrafanaとつなげることもできるらしい。
ということで以下やっていく。
2. 事前準備:ブローカのインストール
Celeryを使い始める前にブローカを何にするか選ぶ。選択肢はいろいろあるが、ざっと見た中で実例として多いのはRabbitMQ or Redis。私はジョブの途中経過(State)を呼び出し元に伝えることを重視していたのでRedisにした。単にジョブのキューイングだけできればいいのであればRabbitMQでもいいのかもしれないし、RabbitMQでもStateの処理ができないわけではないようだ。
apt install
でさくっとインストールできる。認証かけたりいろいろするのはお好みで。
$ sudo apt install redis-server
3. 作る
3-1. Hello Celery
Flaskとの連携パターンはFlask公式にかかれているが私を含むnot熟練者がひと目見て作れる感じの記述レベルではないのでしばらく試行錯誤した。最小パターンとしては以下のような感じがいいと思った。
内容としてはジョブが投入されたら実行開始まで10秒待機し、その後指定秒数スリープして、'Hello Celery!' という文字列を戻す。
コードを書き始める前にFlaskとCeleryのPythonパッケージを入れておく。Ubuntuが提供しているaptパッケージpython3-celery
だとcelery
コマンドが動かなかったので、Celeryパッケージはpipで入れている。
$ sudo apt install python3-flask
$ pip3 install --user --break-system-packages celery
でコードを書く。作るファイルは3つ。
./
├── app.py
├── config.py
└── tasks.py
from celery import Celery, Task
from flask import Flask
# Celeryの初期化
def celery_init_app(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)
celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config["CELERY"])
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app
# Flaskの初期化
def create_app() -> Flask:
app = Flask(__name__)
app.config.from_mapping(
CELERY=dict(
# see available option:
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#new-lowercase-settings
broker_url="redis://localhost",
result_backend="redis://localhost",
task_ignore_result=True,
task_track_started=True,
),
)
app.config.from_prefixed_env()
celery_init_app(app)
return app
config.py の内容は初期化なのだがおまじないに近い。Flaskの拡張としてCeleryを登録している。ブローカの場所とか、Celery自体の挙動はここで設定する。
from tasks import flask_app, long_running_task
from celery.result import AsyncResult
from flask import request,jsonify
@flask_app.post("/submit")
def submit_task() -> dict[str, object]:
seconds = request.args.get('seconds')
result = long_running_task.apply_async([int(seconds)], countdown=10)
return {"result_id": result.id}
@flask_app.get("/result")
def task_result() -> dict[str, object]:
result_id = request.args.get('result_id')
result = AsyncResult(result_id)
print("Current Status:", result.state)
return jsonify({"status": result.state})
if __name__ == "__main__":
flask_app.run(debug=True)
app.pyにはFlaskのルーティングを記述している。ジョブを登録するエンドポイントと、ジョブIDを受け取って状態を戻すエンドポイントの2つ。
from config import create_app
from celery import shared_task
from time import sleep
flask_app = create_app()
celery_app = flask_app.extensions["celery"]
@shared_task(ignore_result= False)
def long_running_task (seconds) -> int:
result = 0
for i in range (seconds):
result += 1
sleep(1)
return "Hello Celery!"
tasks.pyにCeleryのタスクを記述している。
実行は以下のようにする
$ flask run --debugger --reload
* Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on http://127.0.0.1:5000
Press CTRL+C to quit
* Restarting with stat
* Debugger is active!
* Debugger PIN: 144-989-716
(待機状態になる)
$ ~/.local/bin/celery -A tasks worker --loglevel=INFO
-------------- celery@desktop v5.3.6 (emerald-rush)
--- ***** -----
-- ******* ---- Linux-6.5.0-14-generic-x86_64-with-glibc2.38 2024-01-17 14:24:51
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: config:0x7fe27187e310
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 20 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.long_running_task
[2024-01-17 14:24:51,517: INFO/MainProcess] Connected to redis://localhost:6379//
[2024-01-17 14:24:51,518: INFO/MainProcess] mingle: searching for neighbors
[2024-01-17 14:24:52,521: INFO/MainProcess] mingle: all alone
[2024-01-17 14:24:52,526: INFO/MainProcess] celery@desktop ready.
(待機状態になる)
前提条件(見えるソースコードを同じにしたり、パスを統一したり、ブローカにアクセスできるなど)を揃えれば、Celery workerは他のホストでも動かすことができる。
で、ジョブの投入は以下のように行う
$ curl -XPOST 'http://localhost:5000/submit?seconds=30'
{"result_id":"1a8e2e74-83b1-4ee7-9bdb-f71acd3c6dc1"} # <= Job IDが戻ってくる
得られたJod IDをもとに /result に問い合わせすると、現在のジョブの状態が戻ってくる
$ curl -XPOST 'http://localhost:5000/submit?seconds=30'
{"result_id":"1a8e2e74-83b1-4ee7-9bdb-f71acd3c6dc1"} # <= Job IDが戻ってくる
$ curl -XGET 'http://localhost:5000/result?result_id=1a8e2e74-83b1-4ee7-9bdb-f71acd3c6dc1'
{"status":"PENDING"} # <= 実行前の状態
$ curl -XGET 'http://localhost:5000/result?result_id=1a8e2e74-83b1-4ee7-9bdb-f71acd3c6dc1'
{"status":"STARTED"} # <= 実行中の状態
$ curl -XGET 'http://localhost:5000/result?result_id=1a8e2e74-83b1-4ee7-9bdb-f71acd3c6dc1'
{"status":"SUCCESS"} # <= 実行完了の状態
Celery workerの画面にもジョブの状態が出力される
[2024-01-17 14:35:08,433: INFO/MainProcess] Task tasks.long_running_task[1a8e2e74-83b1-4ee7-9bdb-f71acd3c6dc1] received
[2024-01-17 14:35:48,422: INFO/ForkPoolWorker-16] Task tasks.long_running_task[1a8e2e74-83b1-4ee7-9bdb-f71acd3c6dc1] succeeded in 30.007238832011353s: 'Hello Celery!'
3-2. 実行中の進捗状態を知る
長いジョブの場合、現時点でどの程度進んでいるかを知ることができると精神衛生上よい。それを実装する。
Celeryの言葉ではCustom Stateと呼ぶ。
tasks.py
とapp.py
を変更する
from config import create_app
from celery import shared_task
from time import sleep
flask_app = create_app()
celery_app = flask_app.extensions["celery"]
@shared_task(ignore_result=False, bind=True)
def long_running_task (self, seconds) -> int:
#print("request is", self.request)
result = 0
for i in range (seconds):
if not self.request.called_directly:
self.update_state(
state='RUNNING',
meta={
'progress_percentage': i/seconds*100
})
result += 1
sleep(1)
return "Hello Celery!"
from tasks import flask_app, long_running_task
from celery.result import AsyncResult
from flask import request,jsonify
@flask_app.post("/submit")
def submit_task() -> dict[str, object]:
seconds = request.args.get('seconds')
result = long_running_task.apply_async([int(seconds)], countdown=10)
return {"result_id": result.id}
@flask_app.get("/result")
def task_result() -> dict[str, object]:
result_id = request.args.get('result_id')
result = AsyncResult(result_id)
if result.state == "RUNNING":
return jsonify({
"status": result.state,
"progress": result.info["progress_percentage"]
})
return jsonify({"status": result.state})
実行すると以下のような感じに処理状態が見えるようになる
$ curl -XPOST 'http://localhost:5000/submit?seconds=30'
{"result_id":"1a8827c5-60f4-403b-93a4-8c0b340400ad"} # <=ジョブを投入したらIDが発行される
$ curl -XGET 'http://localhost:5000/result?result_id=1a8827c5-60f4-403b-93a4-8c0b340400ad'
{"status":"PENDING"} # <= 実行前
$ curl -XGET 'http://localhost:5000/result?result_id=1a8827c5-60f4-403b-93a4-8c0b340400ad'
{"progress":6.666666666666667,"status":"RUNNING"} # <= 実行中
$ curl -XGET 'http://localhost:5000/result?result_id=1a8827c5-60f4-403b-93a4-8c0b340400ad'
{"progress":63.33333333333333,"status":"RUNNING"} # <= 実行中
$ curl -XGET 'http://localhost:5000/result?result_id=1a8827c5-60f4-403b-93a4-8c0b340400ad'
{"status":"SUCCESS"} # <= 実行完了
3-3. 実行前、実行中のジョブをキャンセルする
AbortableTask, AbortableAsyncResultを使うことで実装できる
from config import create_app
from celery import shared_task
from celery.contrib.abortable import AbortableTask
from time import sleep
flask_app = create_app()
celery_app = flask_app.extensions["celery"]
@shared_task(ignore_result=False, bind=True, base=AbortableTask)
def long_running_task (self, seconds) -> int:
result = 0
for i in range (seconds):
if self.is_aborted():
print("Task aborted")
return "Aborted"
if not self.request.called_directly:
self.update_state(
state='RUNNING',
meta={
'progress_percentage': i/seconds*100
})
result += 1
sleep(1)
return "Hello Celery!"
from tasks import flask_app, long_running_task
from celery.result import AsyncResult
from celery.contrib.abortable import AbortableAsyncResult
from flask import request,jsonify
@flask_app.post("/submit")
def submit_task() -> dict[str, object]:
seconds = request.args.get('seconds')
result = long_running_task.apply_async([int(seconds)], countdown=20)
return {"result_id": result.id}
@flask_app.get("/result")
def task_result() -> dict[str, object]:
result_id = request.args.get('result_id')
result = AsyncResult(result_id)
if result.state == "RUNNING":
return jsonify({
"status": result.state,
"progress": result.info["progress_percentage"]
})
return jsonify({"status": result.state})
@flask_app.get("/abort")
def task_abort() -> dict[str, object]:
result_id = request.args.get('result_id')
result = AbortableAsyncResult(result_id)
if result.state == "PENDING":
result.revoke()
return jsonify({'status': "SEND"})
if result.state == "RUNNING":
result.abort()
return jsonify({'status': "SEND"})
return jsonify({
'status': "INVALID",
"message": f"specified job is in invalid state: {result.state}"})
if __name__ == "__main__":
flask_app.run(debug=True)
タスクが実行前のときは、ジョブに対してrevoke()
を送ることで、実行されず無視される。ステータスはREVOKEDとなる。
タスクが実行中のときは、ジョブに対してabort()
を送ることで、ジョブのis_aborted
フラグがTrueになるので、タスク側でそれを拾って中断する。中断後のステータスはSUCCESSになる。これはカスタムステータスで上書きできないようだ。
(パターン1: ジョブ実行開始前にrevoke()した場合)
$ curl -XPOST 'http://localhost:5000/submit?seconds=30'
{"result_id":"8a20bfda-83c0-4f61-a881-ac47eae70e63"} # <= ジョブの投入
$ curl -XGET 'http://localhost:5000/abort?result_id=8a20bfda-83c0-4f61-a881-ac47eae70e63'
{"status":"SEND"} # <= revoke()の送信
$ curl -XGET 'http://localhost:5000/result?result_id=8a20bfda-83c0-4f61-a881-ac47eae70e63'
{"status":"PENDING"} # <= ジョブの実行の順番が来るまではPENDINGになる
$ curl -XGET 'http://localhost:5000/result?result_id=8a20bfda-83c0-4f61-a881-ac47eae70e63'
{"status":"REVOKED"} # <= 実行の順番が来ても実際には実行されず、ステータスはREVOKEDになる
(パターン2: ジョブ実行開始後にabort()した場合)
$ curl -XPOST 'http://localhost:5000/submit?seconds=30'
{"result_id":"e90c75a9-2f10-4d75-a46c-ddef46075aa0"} # <= ジョブの投入
$ curl -XGET 'http://localhost:5000/result?result_id=e90c75a9-2f10-4d75-a46c-ddef46075aa0'
{"status":"PENDING"} # <= ジョブ実行開始前
$ curl -XGET 'http://localhost:5000/result?result_id=e90c75a9-2f10-4d75-a46c-ddef46075aa0'
{"progress":6.666666666666667,"status":"RUNNING"} # <= ジョブ実行中
$ curl -XGET 'http://localhost:5000/abort?result_id=e90c75a9-2f10-4d75-a46c-ddef46075aa0'
{"status":"SEND"} # <= abort()を送信
$ curl -XGET 'http://localhost:5000/result?result_id=e90c75a9-2f10-4d75-a46c-ddef46075aa0'
{"status":"SUCCESS"} # <= ジョブは中断されるがステータスはSUCCESSになる
4. おわり
おわりです。