はじめに
例えば、Webバックエンドを構築する際、ユーザーからのリクエストに対してはなるべく即座にレスポンスを返す必要があります。
画像処理やデータベースのクエリ、外部APIとの通信など、時間がかかる処理を行う場合、ユーザーは待たされることになってしまうので、これを解決するために非同期処理を導入するのが良いでしょう!
AWSなどを利用するのであれば、SQSとLambdaを利用したり、AWS Batchを使ってみたりといろいろ手法があると思いますが、ローカルでのテストがしづらいので、ローカル環境で動作するプログラムを作ってみます。
Pythonでは、ジョブキューとしてCeleryがよく利用されます。各種ブローカー(RabbitMQやRedisなど)が利用できますが、Redisは比較的導入が容易かつ軽量なのでCelery + Redisの組み合わせが一般的のようです。
今回はFastAPI + Celery + Redisで簡易的なジョブキューアプリをつくって見ます!
FastAPI上のエンドポイントからジョブをCeleryにエンキューし、Redisを使ってバックグラウンド処理した結果を保存するまでやってみます。
以下に示すファイル一覧と説明をベースに、構築・セットアップ手順と運用のポイントについて見ていきましょう。
書いたコードは以下に配置したので、参考にしてみてください!
プロジェクト構成・主要ファイル一覧
async-task-queue-sample/
├── celery_app.py
├── docker-compose.yml
├── Dockerfile
├── main.py
├── tasks.py
├── pyproject.toml
└── uv.lock
各ファイルの役割
-
celery_app.py
Celery のインスタンス(celery_app
)を作り、ブローカーや結果バックエンドを設定するファイル。すべてのタスクをこのCeleryインスタンス経由で実行する。 -
docker-compose.yml
Redisコンテナや Celery、FastAPI を起動するコンテナ構成を定義する。ここでは 3 つのサービス(redis
,worker
,api
)を立ち上げる。 -
Dockerfile
CeleryのWorkerが起動できるような設定を記述している。 -
main.py
FastAPI本体を定義するファイル。GET /add?x=...&y=...
を叩くことで、Celeryタスクをエンキューする。 -
tasks.py
Celery タスクを定義するファイル。ここでは、非同期処理を行うためのタスクを実装します。
DockerfileでのCeleryのWorkerの起動
以下が Dockerfile
の内容です。
FROM python:3.13-slim
WORKDIR /app
RUN pip install uv
COPY pyproject.toml .
COPY uv.lock .
RUN uv pip install --system .
COPY . .
CMD ["celery", "-A", "celery_app", "worker", "--loglevel=info", "--concurrency=4"]
CMD ["celery", "-A", "celery_app", "worker", "--loglevel=info", "--concurrency=4"]
の部分でCeleryのWorkerを起動しています。
pyproject.toml
では以下のような依存関係を定義しています。
- celery[redis]: Celery本体と、Redisのサポートを含むためのパッケージ。
- fastapi: PythonのモダンなWebフレームワーク。
- redis: PythonからRedisに接続するためのクライアント。
- uvicorn: ASGIサーバー。FastAPIを実行する際に使用。
docker composeの設定
以下が docker-compose.yml
の内容です。
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
worker:
build: .
depends_on:
- redis
environment:
CELERY_BROKER_URL: redis://redis:6379/0
CELERY_RESULT_BACKEND: redis://redis:6379/1
api:
build: .
command: uvicorn main:app --host 0.0.0.0 --port 8080 --reload
depends_on:
- redis
environment:
CELERY_BROKER_URL: redis://redis:6379/0
CELERY_RESULT_BACKEND: redis://redis:6379/1
ports:
- "8080:8080"
基本的には、Redisを立ち上げて、CeleryとFastAPIから利用するための設定を書いているのみです。
apiサービスは、FastAPIのエンドポイントを提供するために立ち上げていますが、(今回は面倒なので) Celeryと同じイメージを使っています。
コマンドだけは上書きして、FastAPIを起動するようにしています。
簡易APIの作成
以下のような小さなFastAPIのコードを書いています。
from fastapi import FastAPI, HTTPException
from tasks import add
app = FastAPI(title="Async Task Queue Sample")
@app.get("/add")
def enqueue_add(x: int, y: int):
try:
res = add.delay(x, y)
return {"task_id": res.id, "status": "queued"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
Celeryタスクのadd
関数を非同期呼び出しするためにdelay
メソッドをいます。これによってタスクがRedisにキューイングされ、ワーカーが処理するのを待つ状態になる。
celeryの設定
Celery インスタンスを初期化するコードは以下の通りです。
from celery import Celery
import os
BROKER = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1")
celery_app = Celery("celery_app", broker=BROKER, backend=BACKEND, include=["tasks"])
celery_app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"],
timezone="Asia/Tokyo",
task_track_started=True,
)
celery_app = Celery("celery_app", broker=BROKER, backend=BACKEND, include=["tasks"])
の部分で、Celeryのインスタンスを作成しています。
broker
にはタスク実行のためのRedisのURLを指定し、backend
には結果を保存するためのRedisのURLを指定しています。これにより、CeleryはRedisをブローカーとして使用し、タスクの結果もRedisに保存します。
またinclude=["tasks"]
を設定してあげることによって、Worker起動時に自動的に tasks.py
内のタスクが登録されます。
タスクの設定
具体的に実行されるタスクを書きます。
重たい処理を表現するために、time.sleep(2)
を入れています。
import time
from celery_app import celery_app
@celery_app.task
def add(x, y):
time.sleep(2)
return x + y
実行方法
ここまでの設定が完了したら、以下のコマンドでコンテナを立ち上げます。
docker compose up --build -d
これで、Redis、Celery、FastAPI の各サービスが立ち上がります。
次に、FastAPI のエンドポイントにアクセスしてみましょう。
curl -X GET "http://localhost:8080/add?x=10&y=20"
これで、{"task_id": "xxxx", "status": "queued"}
のようなレスポンスが返ってくるはずです。
タスクはキューイングされ、Celeryがバックグラウンドで処理を行います。
docker compose logs -f worker
このコマンドで、Worker のログを確認できます。タスクがキューイングされてから、実行されるまでの様子がわかります。
[2023-10-01 12:00:00,000: INFO/MainProcess] Received task: tasks.add[xxxx]
[2023-10-01 12:00:02,000: INFO/MainProcess] Task tasks.add[xxxx] succeeded in 2.0s: 30
これで、タスクの結果が表示されます。
最後にちゃんと足し算結果の30
が表示されていますね!
まとめ
本記事では、FastAPI + Celery + Redisの構成で非同期でのタスク処理をやってみました!
docker compose up --build -d
でサービス群が起動し、GET /add?x=10&y=20
のようにアクセスするとタスクがキューイングされます。Workerがバックグラウンドでタスクを処理し、結果はRedisに保存されるという流れです。
冒頭にも述べた通り、SQS + LmabdaやAWS Batchなどを利用するのであればその前提でビジネスロジックだけ実装してデプロイするだけでも良いのですが、結局ローカルでスムーズに動作しなければテスト・デバッグ・リファクタリングしづらく、開発速度がめちゃくちゃ落ちてしまうので、何かしらの手段でローカル開発環境を用意するのが良いと思います!