python の Celery を使って、ウェブサーバがクライアントからのリクエストを受けてタスクを生成し、そのタスクを非同期で実行する方法のメモ。
時間がかかる処理は、ウェブサーバが直接処理を行わず、キューにタスクを登録してレスポンスを返却する。
そして、別プロセスがタスクを処理して、DB 等に実行結果を登録する。
やりたいこと
- Flask のウェブサーバがクライアントからリクエストを受け付けると、Celery を使って RabbitMQ にタスクを登録し、クライアントにタスク登録完了のレスポンスを返却。
- Celery が RabbitMQ からメッセージを取得してタスクを実行。
必要なもの
RabbitMQ
sudo yum install rabbitmq-server
Celery
pip install celery
Flask
pip install Flask
プログラム構成
-
tasks.py
- タスクを定義
- ここでは task_add(num1, num2) と task_sub(nu1, num2) を定義
-
server.py
- Flask の http サーバ
- メッセージに応じて task_add、task_sub の apply_async() を実行してキューにタスクを登録
-
celery_app.py
- celery コマンドで実行する worker
プログラム例
tasks.py
Celery の worker で処理したい内容を記述する。
@shared_task() で bind=True を指定すると self でタスク情報を取得することができる。
tasks.py
import time
from celery import shared_task
@shared_task(
bind=True, # bind=True を指定すると self でタスク情報を取得可能
)
def task_add(self, num1, num2):
print(f"{self}")
time.sleep(1.0)
ret = num1 + num2
print(f"task_add: {num1} + {num2} = {ret}")
return ret
@shared_task(
bind=True
)
def task_sub(self, num1, num2):
print(f"{self}")
time.sleep(1.0)
ret = num1 - num2
print(f"task_sub: {num1} - {num2} = {ret}")
return ret
server.py
以下の 2 つの API を用意し、リクエストを受け付けるとキューにタスクを登録。
- /api/add/?num1={num1}&num2={num2}
- /api/sub/?num1={num1}&num2={num2}
server.py
from flask import Flask, request
from celery import Celery
from tasks import task_add, task_sub
from celery_app import celery_app
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = celery_app.conf.broker_url
@app.route('/api/add/', methods=['GET'])
def api_add():
print(f"api_add()")
num1 = int(request.args.get('num1'))
num2 = int(request.args.get('num2'))
task = task_add.apply_async(
args=[num1, num2],
queue='calc',
routing_key='calc',
exchange='',
)
return f"task_add: {task.id}"
@app.route('/api/sub/', methods=['GET'])
def api_sub():
print(f"api_sub()")
num1 = int(request.args.get('num1'))
num2 = int(request.args.get('num2'))
task = task_sub.apply_async(
args=(num1, num2),
queue='calc',
routing_key='calc',
exchange='',
)
return f"task_sub: {task.id}"
def main():
app.run(host='0.0.0.0', port=9000)
return 0
if __name__ == '__main__':
res = main()
exit(res)
celery_app.py
celery コマンドに指定する worker のプログラム。
Celery のコンストラクタの include に tasks を指定することで tasks.py を読み込み。
celery_app.py
from celery import Celery
celery_app = Celery(__name__,
broker='amqp://localhost:5672//',
include=['tasks'] # tasks.py を指定
)
実行例
Flask server、Celery worker を起動し、以下の curl コマンドで Flask server にリクエストを送信。
リクエスト
$ curl -X GET 'http://localhost:9000/api/add/?num1=1&num2=3'
task_add: cc0838cb-1b2f-4c91-a089-33c32dbe371f
$ curl -X GET 'http://localhost:9000/api/sub/?num1=5&num2=2'
task_sub: 883f3d53-3ce2-4855-bea2-d6930ef60704
Flask server
$ python server.py
api_add()
127.0.0.1 - - [20/Apr/2025 16:09:05] "GET /api/add/?num1=1&num2=3 HTTP/1.1" 200 -
api_sub()
127.0.0.1 - - [20/Apr/2025 16:09:24] "GET /api/sub/?num1=5&num2=2 HTTP/1.1" 200 -
Celery worker
$ celery -A celery_app worker -Q calc
[2025-04-20 16:09:05,416: WARNING/ForkPoolWorker-1] <@task: tasks.task_add of celery_app at 0x7fb393eff2f0>
[2025-04-20 16:09:06,416: WARNING/ForkPoolWorker-1] task_add: 1 + 3 = 4
[2025-04-20 16:09:24,551: WARNING/ForkPoolWorker-1] <@task: tasks.task_sub of celery_app at 0x7fb393eff2f0>
[2025-04-20 16:09:25,551: WARNING/ForkPoolWorker-1] task_sub: 5 - 2 = 3