0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Celery で非同期でタスクを実行

Last updated at Posted at 2025-04-20

python の Celery を使って、ウェブサーバがクライアントからのリクエストを受けてタスクを生成し、そのタスクを非同期で実行する方法のメモ。

時間がかかる処理は、ウェブサーバが直接処理を行わず、キューにタスクを登録してレスポンスを返却する。
そして、別プロセスがタスクを処理して、DB 等に実行結果を登録する。

やりたいこと

  • Flask のウェブサーバがクライアントからリクエストを受け付けると、Celery を使って RabbitMQ にタスクを登録し、クライアントにタスク登録完了のレスポンスを返却。
  • Celery が RabbitMQ からメッセージを取得してタスクを実行。

必要なもの

RabbitMQ

sudo yum install rabbitmq-server

Celery

pip install celery

Flask

pip install Flask

プログラム構成

  • tasks.py

    • タスクを定義
    • ここでは task_add(num1, num2) と task_sub(nu1, num2) を定義
  • server.py

    • Flask の http サーバ
    • メッセージに応じて task_add、task_sub の apply_async() を実行してキューにタスクを登録
  • celery_app.py

    • celery コマンドで実行する worker

プログラム例

tasks.py

Celery の worker で処理したい内容を記述する。
@shared_task() で bind=True を指定すると self でタスク情報を取得することができる。

tasks.py
import time
from celery import shared_task

@shared_task(
    bind=True, # bind=True を指定すると self でタスク情報を取得可能
)
def task_add(self, num1, num2):
    print(f"{self}")
    time.sleep(1.0)
    ret = num1 + num2
    print(f"task_add: {num1} + {num2} = {ret}")
    return ret

@shared_task(
    bind=True
)
def task_sub(self, num1, num2):
    print(f"{self}")
    time.sleep(1.0)
    ret = num1 - num2
    print(f"task_sub: {num1} - {num2} = {ret}")
    return ret

server.py

以下の 2 つの API を用意し、リクエストを受け付けるとキューにタスクを登録。

  • /api/add/?num1={num1}&num2={num2}
  • /api/sub/?num1={num1}&num2={num2}
server.py
from flask import Flask, request
from celery import Celery
from tasks import task_add, task_sub
from celery_app import celery_app


app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = celery_app.conf.broker_url


@app.route('/api/add/', methods=['GET'])
def api_add():
    print(f"api_add()")
    num1 = int(request.args.get('num1'))
    num2 = int(request.args.get('num2'))
    task = task_add.apply_async(
        args=[num1, num2],
        queue='calc',
        routing_key='calc',
        exchange='',
    )
    return f"task_add: {task.id}"


@app.route('/api/sub/', methods=['GET'])
def api_sub():
    print(f"api_sub()")
    num1 = int(request.args.get('num1'))
    num2 = int(request.args.get('num2'))
    task = task_sub.apply_async(
        args=(num1, num2),
        queue='calc',
        routing_key='calc',
        exchange='',
    )
    return f"task_sub: {task.id}"


def main():
    app.run(host='0.0.0.0', port=9000)
    return 0


if __name__ == '__main__':
    res = main()
    exit(res)

celery_app.py

celery コマンドに指定する worker のプログラム。
Celery のコンストラクタの include に tasks を指定することで tasks.py を読み込み。

celery_app.py
from celery import Celery

celery_app = Celery(__name__,
                    broker='amqp://localhost:5672//',
                    include=['tasks'] # tasks.py を指定
                    )

実行例

Flask server、Celery worker を起動し、以下の curl コマンドで Flask server にリクエストを送信。

リクエスト

$ curl -X GET 'http://localhost:9000/api/add/?num1=1&num2=3'
task_add: cc0838cb-1b2f-4c91-a089-33c32dbe371f

$ curl -X GET 'http://localhost:9000/api/sub/?num1=5&num2=2'
task_sub: 883f3d53-3ce2-4855-bea2-d6930ef60704

Flask server

$ python server.py
api_add()
127.0.0.1 - - [20/Apr/2025 16:09:05] "GET /api/add/?num1=1&num2=3 HTTP/1.1" 200 -
api_sub()
127.0.0.1 - - [20/Apr/2025 16:09:24] "GET /api/sub/?num1=5&num2=2 HTTP/1.1" 200 -

Celery worker

$ celery -A celery_app worker -Q calc
[2025-04-20 16:09:05,416: WARNING/ForkPoolWorker-1] <@task: tasks.task_add of celery_app at 0x7fb393eff2f0>
[2025-04-20 16:09:06,416: WARNING/ForkPoolWorker-1] task_add: 1 + 3 = 4
[2025-04-20 16:09:24,551: WARNING/ForkPoolWorker-1] <@task: tasks.task_sub of celery_app at 0x7fb393eff2f0>
[2025-04-20 16:09:25,551: WARNING/ForkPoolWorker-1] task_sub: 5 - 2 = 3
0
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?