
Notes on Celery, a Distributed Task Queue [Python]

Posted at 2025-11-05

Introduction

Celery is a distributed task queue for Python. Redis is used here as the message broker.
The overall picture is as follows.

A worker is a process that actually executes the asynchronous tasks it receives from the task queue. The message broker is the middleware that relays and delivers messages (tasks) between the application and the workers.

Introducing Celery brings benefits such as the following:

| Benefit | Description | Example |
| --- | --- | --- |
| Asynchronous processing | Run heavy work in the background | Sending email, image processing, video encoding |
| Scalability | Add workers to increase throughput | Add workers when traffic increases |
| Scheduling | Periodic or cron-like execution | Daily report generation, periodic backups |
| Retries | Automatic retry on failure | External API calls, network operations |
| Distributed processing | Spread load across multiple servers | Processing large volumes of data on multiple machines |

Setup

First, install the library.

pip install "celery[redis]"

Next, start Redis with Docker.

docker run -d --name redis -p 6379:6379 redis:latest

⚠️ Note: in production, always configure authentication on Redis.
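One way to do this, for example, is to require a password when starting the container and include it in the connection URI. A minimal sketch ("change-me" is a placeholder; the official redis image passes extra arguments on to redis-server):

docker run -d --name redis -p 6379:6379 redis:latest redis-server --requirepass "change-me"

The Celery broker/backend URIs then take the form redis://:change-me@localhost:6379/0.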

Sample project structure

This article builds a small project that submits simple arithmetic tasks.
The project structure is as follows.

project/
├── app.py        (the Celery application itself)
├── tasks.py      (task definitions)
└── producer.py   (the code that submits tasks)

The sections below walk through each file.

app.py

Write app.py as follows.

app.py
# app.py
from celery import Celery

# Use Redis as both broker and backend (separate instances recommended in production)
celery_app = Celery(
    "proj",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
    include=["tasks"],
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="Asia/Tokyo",
    enable_utc=True,
    task_acks_late=True,  # ACK after the task completes (more robust against worker crashes / re-execution)
    worker_prefetch_multiplier=1,  # fair scheduling
    task_time_limit=300,  # hard time limit
    task_soft_time_limit=240,  # soft time limit
)
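A note on the two time limits just set: when task_soft_time_limit is exceeded, Celery raises SoftTimeLimitExceeded inside the task (with the default prefork pool), so the task can catch it and clean up; task_time_limit then kills the task outright. A minimal sketch, assuming the celery_app defined above (long_running is a hypothetical task):

# Hypothetical task illustrating the soft time limit
import time
from celery.exceptions import SoftTimeLimitExceeded

@celery_app.task
def long_running(seconds):
    try:
        time.sleep(seconds)  # stand-in for real work
        return "done"
    except SoftTimeLimitExceeded:
        # release resources / record partial progress here before giving up
        return "soft time limit hit"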

The broker handles frequent reads and writes, while the backend is used to store and fetch results. Putting them on separate instances makes scaling more flexible.
The Celery application object (celery_app) is imported and used later. Set the broker and backend to the Redis URIs of your environment.
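If you do split them, a sketch of the same constructor pointing the broker and backend at separate Redis instances (the hostnames are placeholders):

# Sketch: broker and result backend on separate Redis instances (placeholder hosts)
celery_app = Celery(
    "proj",
    broker="redis://broker-host:6379/0",
    backend="redis://results-host:6379/0",
    include=["tasks"],
)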

tasks.py

Write the task definitions in tasks.py.

tasks.py
# tasks.py
from app import celery_app

@celery_app.task
def add(x, y):
    return x + y


@celery_app.task
def multiply(x, y):
    return x * y


@celery_app.task
def sum_all(numbers):
    """リストの合計"""
    return sum(numbers)


@celery_app.task
def aggregate(results):
    """統計情報を返す"""
    return {
        "count": len(results),
        "sum": sum(results),
        "min": min(results),
        "max": max(results),
    }
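As a side note on the retry benefit mentioned earlier, a task can also opt into automatic retries through decorator options. A minimal sketch, assuming the celery_app above (flaky_add is hypothetical; autoretry_for, retry_backoff, and max_retries are standard Celery task options):

import random

@celery_app.task(bind=True, autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=3)
def flaky_add(self, x, y):
    """Fails randomly to simulate a transient error; Celery retries it automatically."""
    if random.random() < 0.5:
        raise ConnectionError("simulated transient failure")
    return x + y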

Decorating a function with @celery_app.task automatically adds the following methods to it:

| Method | Description | Example |
| --- | --- | --- |
| add.delay(x, y) | Asynchronous execution (simplest form) | add.delay(2, 3) |
| add.apply_async() | Asynchronous execution with detailed options | add.apply_async((2, 3), countdown=10) |
| add.s() | Create a signature (for chain/group) | add.s(2, 3) |

These are what make asynchronous execution possible.

# Calling the function directly runs it synchronously (Celery is not involved)
result = add(2, 3)  # 5

# ✅ Asynchronous execution (runs on a Celery worker)
result = add.delay(2, 3)  # AsyncResult object
print(result.get())  # 5 (waits for the result)

They are used like this.
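The apply_async form from the table also accepts scheduling options; a brief sketch (countdown and the get timeout are standard options):

# Run the task no earlier than 10 seconds from now, then wait up to 30 seconds for the result
result = add.apply_async((2, 3), countdown=10)
print(result.get(timeout=30))  # 5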

producer.py

The code that submits the tasks is shown below.
It implements serial processing with chain, parallel processing with group, and parallel-then-aggregate processing with chord.

producer.py
# producer.py
from celery import chain, group, chord
from tasks import add, multiply, sum_all, aggregate

# chain (serial processing)
print("=== Chain ===")
c1 = add.s(2, 2) | multiply.s(3) | add.s(10)
result = c1.delay()
print(f"Result: {result.get()}")  # (2+2)*3+10 = 22

# group (parallel processing)
print("\n=== Group ===")
g = group(add.s(i, i * 2) for i in range(5))
result = g.delay()
print(f"Results: {result.get()}")  # [0, 3, 6, 9, 12]

# chord (parallel → aggregate)
print("\n=== Chord ===")
job = chord(group(add.s(i, i) for i in range(10)))(sum_all.s())
print(f"Sum: {job.get()}")  # 0+2+4+6+8+10+12+14+16+18 = 90

# chord with aggregate
job2 = chord(group(multiply.s(i, i) for i in range(1, 6)))(aggregate.s())
print(f"Stats: {job2.get()}")  # {count:5, sum:55, min:1, max:25}

# a more complex combination
print("\n=== Complex ===")
workflow = chain(
    group(add.s(i, i) for i in range(5)),  # [0,2,4,6,8]
    sum_all.s(),  # 20
    multiply.s(2),  # 40
)
print(f"Final: {workflow.delay().get()}")  # 40

There are two ways to write a chain.

# Pattern 1: using chain()
c = chain(add.s(2, 2), add.s(10))

# Pattern 2: using the pipe operator
c = add.s(2, 2) | add.s(10)
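Either pattern builds the same chain, and it is executed the same way:

result = c.delay()
print(result.get())  # (2 + 2) + 10 = 14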

Running the sample

With Redis running, start the workers.

# Start a single Celery worker
celery -A app.celery_app worker --loglevel=info --concurrency=20

# Start multiple workers
celery -A app.celery_app worker --loglevel=info --concurrency=10 -n worker1@%h &
celery -A app.celery_app worker --loglevel=info --concurrency=10 -n worker2@%h &
celery -A app.celery_app worker --loglevel=info --concurrency=10 -n worker3@%h &
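To confirm the workers came up, the Celery CLI can ping them (same app module as above):

# Each live worker should reply with "pong"
celery -A app.celery_app inspect ping

# Or print a short status summary
celery -A app.celery_app status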

Then run producer.py.

% python producer.py 
=== Chain ===
Result: 22

=== Group ===
Results: [0, 3, 6, 9, 12]

=== Chord ===
Sum: 90
Stats: {'count': 5, 'sum': 55, 'min': 1, 'max': 25}

=== Complex ===
Final: 40

The worker log shows entries like the ones below. When multiple workers are running, the tasks are distributed across them, so the log lines are spread out as well.

Full log
[2025-11-04 15:13:57,925: INFO/MainProcess] Task tasks.add[8b085378-66ac-4cf0-b3ca-73798e887a17] received
[2025-11-04 15:13:57,934: INFO/MainProcess] Task tasks.multiply[cf6e808a-25bd-4aa1-92cc-3fc297b80fe7] received
[2025-11-04 15:13:57,935: INFO/ForkPoolWorker-8] Task tasks.add[8b085378-66ac-4cf0-b3ca-73798e887a17] succeeded in 0.00917254199157469s: 4
[2025-11-04 15:13:57,943: INFO/MainProcess] Task tasks.add[ceac1047-9e02-45ea-8012-e809d4795ef4] received
[2025-11-04 15:13:57,944: INFO/ForkPoolWorker-8] Task tasks.add[ceac1047-9e02-45ea-8012-e809d4795ef4] succeeded in 0.0010475829767528921s: 22
[2025-11-04 15:13:57,945: INFO/ForkPoolWorker-1] Task tasks.multiply[cf6e808a-25bd-4aa1-92cc-3fc297b80fe7] succeeded in 0.009986290999222547s: 12
[2025-11-04 15:13:57,947: INFO/MainProcess] Task tasks.add[ded9e44d-3623-43f4-a4c2-bd55493159d4] received
[2025-11-04 15:13:57,948: INFO/MainProcess] Task tasks.add[379c6035-a6d7-46be-9fca-829c14a048f9] received
[2025-11-04 15:13:57,948: INFO/ForkPoolWorker-8] Task tasks.add[ded9e44d-3623-43f4-a4c2-bd55493159d4] succeeded in 0.001000124990241602s: 0
[2025-11-04 15:13:57,949: INFO/MainProcess] Task tasks.add[86fcc316-6fe7-400a-9289-e2bdcb8c6375] received
[2025-11-04 15:13:57,949: INFO/ForkPoolWorker-1] Task tasks.add[379c6035-a6d7-46be-9fca-829c14a048f9] succeeded in 0.0008713330025784671s: 3
[2025-11-04 15:13:57,950: INFO/MainProcess] Task tasks.add[30fab9cd-3546-47fe-a0cd-2016efcd7d02] received
[2025-11-04 15:13:57,950: INFO/ForkPoolWorker-8] Task tasks.add[86fcc316-6fe7-400a-9289-e2bdcb8c6375] succeeded in 0.0009778329986147583s: 6
[2025-11-04 15:13:57,951: INFO/MainProcess] Task tasks.add[3ad9ec75-eeb8-419c-b0ce-24854649eff2] received
[2025-11-04 15:13:57,951: INFO/ForkPoolWorker-1] Task tasks.add[30fab9cd-3546-47fe-a0cd-2016efcd7d02] succeeded in 0.0008664169872645289s: 9
[2025-11-04 15:13:57,953: INFO/ForkPoolWorker-8] Task tasks.add[3ad9ec75-eeb8-419c-b0ce-24854649eff2] succeeded in 0.0008945840236265212s: 12
[2025-11-04 15:13:58,462: INFO/MainProcess] Task tasks.add[c49d9ba2-6a22-48cb-b191-4c02833ed38f] received
[2025-11-04 15:13:58,465: INFO/MainProcess] Task tasks.add[3835fc6b-68bc-4fd3-b309-c27c47634787] received
[2025-11-04 15:13:58,475: INFO/MainProcess] Task tasks.add[d6dd91b4-8a9c-4a90-b56a-0a755d28bcac] received
[2025-11-04 15:13:58,472: INFO/ForkPoolWorker-8] Task tasks.add[c49d9ba2-6a22-48cb-b191-4c02833ed38f] succeeded in 0.008371459000045434s: 0
[2025-11-04 15:13:58,477: INFO/MainProcess] Task tasks.add[e88dc10d-6706-4d51-8a23-32d7ddd4961d] received
[2025-11-04 15:13:58,476: INFO/ForkPoolWorker-1] Task tasks.add[3835fc6b-68bc-4fd3-b309-c27c47634787] succeeded in 0.009088207996683195s: 2
[2025-11-04 15:13:58,479: INFO/MainProcess] Task tasks.add[e1e592e4-ce45-434c-bfea-30d26ddf0aa9] received
[2025-11-04 15:13:58,479: INFO/ForkPoolWorker-8] Task tasks.add[e88dc10d-6706-4d51-8a23-32d7ddd4961d] succeeded in 0.002237334003439173s: 6
[2025-11-04 15:13:58,481: INFO/MainProcess] Task tasks.add[d8d44646-21f9-41ff-a356-ca5d8909b56e] received
[2025-11-04 15:13:58,481: INFO/ForkPoolWorker-1] Task tasks.add[e1e592e4-ce45-434c-bfea-30d26ddf0aa9] succeeded in 0.0017189170175697654s: 8
[2025-11-04 15:13:58,483: INFO/MainProcess] Task tasks.add[eff6da21-89f4-41c4-b17a-1ff9987b7468] received
[2025-11-04 15:13:58,483: INFO/ForkPoolWorker-8] Task tasks.add[d8d44646-21f9-41ff-a356-ca5d8909b56e] succeeded in 0.0020373330044094473s: 10
[2025-11-04 15:13:58,485: INFO/MainProcess] Task tasks.add[20838029-5e14-4704-9d29-461606c92496] received
[2025-11-04 15:13:58,485: INFO/ForkPoolWorker-9] Task tasks.add[d6dd91b4-8a9c-4a90-b56a-0a755d28bcac] succeeded in 0.007833916984964162s: 4
[2025-11-04 15:13:58,485: INFO/ForkPoolWorker-1] Task tasks.add[eff6da21-89f4-41c4-b17a-1ff9987b7468] succeeded in 0.0016967499977909029s: 12
[2025-11-04 15:13:58,487: INFO/MainProcess] Task tasks.add[6e66a0e5-7600-4222-bfa3-d6d55d0753d4] received
[2025-11-04 15:13:58,487: INFO/ForkPoolWorker-8] Task tasks.add[20838029-5e14-4704-9d29-461606c92496] succeeded in 0.001541542005725205s: 14
[2025-11-04 15:13:58,489: INFO/MainProcess] Task tasks.add[c7af87a4-3355-4c8b-aa55-ebd9d0743ec6] received
[2025-11-04 15:13:58,490: INFO/ForkPoolWorker-8] Task tasks.add[6e66a0e5-7600-4222-bfa3-d6d55d0753d4] succeeded in 0.001410917000612244s: 16
[2025-11-04 15:13:58,494: INFO/MainProcess] Task tasks.sum_all[72907345-2703-4929-a5a8-bcffd64fc3c2] received
[2025-11-04 15:13:58,494: INFO/ForkPoolWorker-1] Task tasks.add[c7af87a4-3355-4c8b-aa55-ebd9d0743ec6] succeeded in 0.0038579999818466604s: 18
[2025-11-04 15:13:58,495: INFO/ForkPoolWorker-8] Task tasks.sum_all[72907345-2703-4929-a5a8-bcffd64fc3c2] succeeded in 0.0006507079815492034s: 90
[2025-11-04 15:13:58,496: INFO/MainProcess] Task tasks.multiply[0fc8731c-f3ee-4d69-bd47-a4ad565573f4] received
[2025-11-04 15:13:58,497: INFO/MainProcess] Task tasks.multiply[1475b671-7952-4427-9d72-6a7e2163f39c] received
[2025-11-04 15:13:58,498: INFO/MainProcess] Task tasks.multiply[766fdbbd-891c-4871-a0d9-dcf51fe8be85] received
[2025-11-04 15:13:58,498: INFO/ForkPoolWorker-8] Task tasks.multiply[0fc8731c-f3ee-4d69-bd47-a4ad565573f4] succeeded in 0.0012470830115489662s: 1
[2025-11-04 15:13:58,499: INFO/ForkPoolWorker-1] Task tasks.multiply[1475b671-7952-4427-9d72-6a7e2163f39c] succeeded in 0.0013905829982832074s: 4
[2025-11-04 15:13:58,500: INFO/MainProcess] Task tasks.multiply[b3a6b8ce-4c43-4ad9-b9e8-c2604cea2f38] received
[2025-11-04 15:13:58,501: INFO/ForkPoolWorker-8] Task tasks.multiply[766fdbbd-891c-4871-a0d9-dcf51fe8be85] succeeded in 0.0015266250120475888s: 9
[2025-11-04 15:13:58,501: INFO/MainProcess] Task tasks.multiply[ebc2b6e7-927e-421f-9e9b-acd49238000a] received
[2025-11-04 15:13:58,502: INFO/ForkPoolWorker-1] Task tasks.multiply[b3a6b8ce-4c43-4ad9-b9e8-c2604cea2f38] succeeded in 0.0011340839846525341s: 16
[2025-11-04 15:13:58,504: INFO/MainProcess] Task tasks.aggregate[385b8fff-edc3-4ac5-9985-1b3d9637c931] received
[2025-11-04 15:13:58,504: INFO/ForkPoolWorker-8] Task tasks.multiply[ebc2b6e7-927e-421f-9e9b-acd49238000a] succeeded in 0.0024131250102072954s: 25
[2025-11-04 15:13:58,505: INFO/ForkPoolWorker-1] Task tasks.aggregate[385b8fff-edc3-4ac5-9985-1b3d9637c931] succeeded in 0.0008877499785739928s: {'count': 5, 'sum': 55, 'min': 1, 'max': 25}
[2025-11-04 15:13:58,507: INFO/MainProcess] Task tasks.add[9af7c5fb-c0dc-494a-83ef-71642bfafc35] received
[2025-11-04 15:13:58,508: INFO/MainProcess] Task tasks.add[f09d9e6f-790b-4b02-95a1-d9a6e46965fa] received
[2025-11-04 15:13:58,509: INFO/ForkPoolWorker-8] Task tasks.add[9af7c5fb-c0dc-494a-83ef-71642bfafc35] succeeded in 0.0013531250006053597s: 0
[2025-11-04 15:13:58,510: INFO/MainProcess] Task tasks.add[8affc0f4-1133-4039-b080-346ef2d0c9d1] received
[2025-11-04 15:13:58,510: INFO/ForkPoolWorker-1] Task tasks.add[f09d9e6f-790b-4b02-95a1-d9a6e46965fa] succeeded in 0.0016105420072562993s: 2
[2025-11-04 15:13:58,511: INFO/MainProcess] Task tasks.add[61cf6011-c4aa-495e-9382-7aa8f06502cb] received
[2025-11-04 15:13:58,512: INFO/ForkPoolWorker-8] Task tasks.add[8affc0f4-1133-4039-b080-346ef2d0c9d1] succeeded in 0.0014727090019732714s: 4
[2025-11-04 15:13:58,513: INFO/MainProcess] Task tasks.add[067ead5e-b36e-4665-a8af-5ab2f50049ef] received
[2025-11-04 15:13:58,513: INFO/ForkPoolWorker-1] Task tasks.add[61cf6011-c4aa-495e-9382-7aa8f06502cb] succeeded in 0.001269708009203896s: 6
[2025-11-04 15:13:58,516: INFO/MainProcess] Task tasks.sum_all[af77db70-06bf-453e-b081-d71c10233cfb] received
[2025-11-04 15:13:58,516: INFO/ForkPoolWorker-8] Task tasks.add[067ead5e-b36e-4665-a8af-5ab2f50049ef] succeeded in 0.0031255419889930636s: 8
[2025-11-04 15:13:58,517: INFO/MainProcess] Task tasks.multiply[dcb26555-493d-42af-8011-a08246b95618] received
[2025-11-04 15:13:58,518: INFO/ForkPoolWorker-1] Task tasks.sum_all[af77db70-06bf-453e-b081-d71c10233cfb] succeeded in 0.0013412090193014592s: 20
[2025-11-04 15:13:58,518: INFO/ForkPoolWorker-8] Task tasks.multiply[dcb26555-493d-42af-8011-a08246b95618] succeeded in 0.000707999977748841s: 40

Summary

When you want to run heavy processing asynchronously in the background, the distributed task queue Celery is a good choice.
