1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

FastAPI + Celery + Redisでローカルマシンで非同期処理(ジョブキュー)をやってみる!

Last updated at Posted at 2025-04-23

はじめに

例えば、Webバックエンドを構築する際、ユーザーからのリクエストに対してはなるべく即座にレスポンスを返す必要があります。
画像処理やデータベースのクエリ、外部APIとの通信など、時間がかかる処理を行う場合、ユーザーは待たされることになってしまうので、これを解決するために非同期処理を導入するのが良いでしょう!

AWSなどを利用するのであれば、SQSとLambdaを利用したり、AWS Batchを使ってみたりといろいろ手法があると思いますが、ローカルでのテストがしづらいので、ローカル環境で動作するプログラムを作ってみます。

Pythonでは、ジョブキューとしてCeleryがよく利用されます。各種ブローカー(RabbitMQやRedisなど)が利用できますが、Redisは比較的導入が容易かつ軽量なのでCelery + Redisの組み合わせが一般的のようです。

今回はFastAPI + Celery + Redisで簡易的なジョブキューアプリをつくって見ます!
FastAPI上のエンドポイントからジョブをCeleryにエンキューし、Redisを使ってバックグラウンド処理した結果を保存するまでやってみます。

以下に示すファイル一覧と説明をベースに、構築・セットアップ手順と運用のポイントについて見ていきましょう。

書いたコードは以下に配置したので、参考にしてみてください!

プロジェクト構成・主要ファイル一覧

async-task-queue-sample/
├── celery_app.py
├── docker-compose.yml
├── Dockerfile
├── main.py
├── tasks.py
├── pyproject.toml
└── uv.lock

各ファイルの役割

  • celery_app.py
    Celery のインスタンス(celery_app)を作り、ブローカーや結果バックエンドを設定するファイル。すべてのタスクをこのCeleryインスタンス経由で実行する。
  • docker-compose.yml
    Redisコンテナや Celery、FastAPI を起動するコンテナ構成を定義する。ここでは 3 つのサービス(redis, worker, api)を立ち上げる。
  • Dockerfile
    CeleryのWorkerが起動できるような設定を記述している。
  • main.py
    FastAPI本体を定義するファイル。GET /add?x=...&y=...を叩くことで、Celeryタスクをエンキューする。
  • tasks.py
    Celery タスクを定義するファイル。ここでは、非同期処理を行うためのタスクを実装します。

DockerfileでのCeleryのWorkerの起動

以下が Dockerfile の内容です。

FROM python:3.13-slim

WORKDIR /app

RUN pip install uv

COPY pyproject.toml .
COPY uv.lock .

RUN uv pip install --system .

COPY . .

CMD ["celery", "-A", "celery_app", "worker", "--loglevel=info", "--concurrency=4"]

CMD ["celery", "-A", "celery_app", "worker", "--loglevel=info", "--concurrency=4"] の部分でCeleryのWorkerを起動しています。

pyproject.tomlでは以下のような依存関係を定義しています。

  • celery[redis]: Celery本体と、Redisのサポートを含むためのパッケージ。
  • fastapi: PythonのモダンなWebフレームワーク。
  • redis: PythonからRedisに接続するためのクライアント。
  • uvicorn: ASGIサーバー。FastAPIを実行する際に使用。

docker composeの設定

以下が docker-compose.yml の内容です。

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  worker:
    build: .
    depends_on:
      - redis
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/1

  api:
    build: .
    command: uvicorn main:app --host 0.0.0.0 --port 8080 --reload
    depends_on:
      - redis
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/1
    ports:
      - "8080:8080"

基本的には、Redisを立ち上げて、CeleryとFastAPIから利用するための設定を書いているのみです。
apiサービスは、FastAPIのエンドポイントを提供するために立ち上げていますが、(今回は面倒なので) Celeryと同じイメージを使っています。
コマンドだけは上書きして、FastAPIを起動するようにしています。

簡易APIの作成

以下のような小さなFastAPIのコードを書いています。

from fastapi import FastAPI, HTTPException
from tasks import add

app = FastAPI(title="Async Task Queue Sample")


@app.get("/add")
def enqueue_add(x: int, y: int):
    try:
        res = add.delay(x, y)
        return {"task_id": res.id, "status": "queued"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Celeryタスクのadd関数を非同期呼び出しするためにdelayメソッドをいます。これによってタスクがRedisにキューイングされ、ワーカーが処理するのを待つ状態になる。

celeryの設定

Celery インスタンスを初期化するコードは以下の通りです。

from celery import Celery
import os

BROKER = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1")


celery_app = Celery("celery_app", broker=BROKER, backend=BACKEND, include=["tasks"])


celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="Asia/Tokyo",
    task_track_started=True,
)

celery_app = Celery("celery_app", broker=BROKER, backend=BACKEND, include=["tasks"])の部分で、Celeryのインスタンスを作成しています。
brokerにはタスク実行のためのRedisのURLを指定し、backendには結果を保存するためのRedisのURLを指定しています。これにより、CeleryはRedisをブローカーとして使用し、タスクの結果もRedisに保存します。

またinclude=["tasks"]を設定してあげることによって、Worker起動時に自動的に tasks.py 内のタスクが登録されます。

タスクの設定

具体的に実行されるタスクを書きます。
重たい処理を表現するために、time.sleep(2)を入れています。

import time

from celery_app import celery_app


@celery_app.task
def add(x, y):
    time.sleep(2)
    return x + y

実行方法

ここまでの設定が完了したら、以下のコマンドでコンテナを立ち上げます。

docker compose up --build -d

これで、Redis、Celery、FastAPI の各サービスが立ち上がります。
次に、FastAPI のエンドポイントにアクセスしてみましょう。

curl -X GET "http://localhost:8080/add?x=10&y=20"

これで、{"task_id": "xxxx", "status": "queued"} のようなレスポンスが返ってくるはずです。
タスクはキューイングされ、Celeryがバックグラウンドで処理を行います。

docker compose logs -f worker

このコマンドで、Worker のログを確認できます。タスクがキューイングされてから、実行されるまでの様子がわかります。

[2023-10-01 12:00:00,000: INFO/MainProcess] Received task: tasks.add[xxxx]
[2023-10-01 12:00:02,000: INFO/MainProcess] Task tasks.add[xxxx] succeeded in 2.0s: 30

これで、タスクの結果が表示されます。

最後にちゃんと足し算結果の30が表示されていますね!

まとめ

本記事では、FastAPI + Celery + Redisの構成で非同期でのタスク処理をやってみました!

docker compose up --build -dでサービス群が起動し、GET /add?x=10&y=20のようにアクセスするとタスクがキューイングされます。Workerがバックグラウンドでタスクを処理し、結果はRedisに保存されるという流れです。

冒頭にも述べた通り、SQS + LmabdaやAWS Batchなどを利用するのであればその前提でビジネスロジックだけ実装してデプロイするだけでも良いのですが、結局ローカルでスムーズに動作しなければテスト・デバッグ・リファクタリングしづらく、開発速度がめちゃくちゃ落ちてしまうので、何かしらの手段でローカル開発環境を用意するのが良いと思います!

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?