LoginSignup
36
41

【Python Celery】本番運用時に知っておくべき10のこと

Last updated at Posted at 2023-05-08

Read this article in English
0_1.png

エンジニアのみなさん、ちゃんとセロリ食べてますか?

はじめに

CeleryはPython用の非同期タスク処理を行うためのライブラリです。タスクの同時実行数、ヘルスチェック、ブローカー、再スケジューリングなど、多くの設定項目があります。しかし、ドキュメントが古かったり探しづらかったりするため、最初のうちはうまくいかないことが多々あると思います。

Celeryを使いこなせるようになれば、大量のタスクを効率よく処理できます。この記事では、Celeryを本番環境で運用した経験から学んだことを共有したいと思います。

本番運用時に知っておくべき10のこと

1. ヘルスチェックは inspect ping コマンドを使う

Celeryのワーカーのヘルスチェックは、inspect ping コマンドで行うのがおすすめです。

このコマンドを打つと、Celeryはすべてのワーカーの状態を確認しようとします。ワーカーを複数のノードに分けている場合は、--destination celery@${HOSTNAME} のように、確認対象を絞ると、不要なオーバーヘッドを避けることができます。

celery -A myapp inspect ping --destination celery@${HOSTNAME}

Elastic Beanstalk (EB)、Kubernetes、Elastic Container Service (ECS) などのオーケストレーションツールには、ヘルスチェックの間隔・タイムアウト・リトライ回数を設定できます。Celeryのヘルスチェックは、重い計算でCPU使用率が高くなり、pingコマンドの実行に数秒かかることがあるため、以下のような余裕を持った設定がおすすめです。

項目 設定
interval 30
timeout 15
startPeriod 10
retries 5

CeleryのCPU使用率が高すぎる場合は、--concurrency フラグでタスクの同時実行数を制限するのも効果的です。詳しくは#9を参照してください。

2. Pythonコードからタスクを再実行する方法

ネットワーク接続やレート制限の影響でタスクが失敗した時に、リトライすることが有効な対策です。Pythonコードからタスクをリトライするには、まずタスクデコレーターに bind=True フラグを設定してタスクインスタンスを取得します。タスク関数の中で、raise self.retry(...) で処理を中断して再スケジューリングします。

@app.task(bind=True, max_retries=3)
def my_task(self)
    try:
        # .. 外部APIを呼ぶなど ..
    except (ConnectionError, RateLimitExceeded) as e:
        raise self.retry(exc=e, countdown=retry_delay)

retryメソッドに渡せる引数についてはこちらをご覧ください。

特に本番環境では、発生したエラーの詳細情報を残しておくことが重要なので、self.request.retries でタスクのリトライ回数を記録することをおすすめします。回数を記録することで、デバッグやリトライ回数と間隔の調整がしやすくなります。そして、リトライ回数に応じてログレベルを上げていくことで、エラーの緊急度をより自然に表現できます。

@app.task(bind=True, max_retries=3)
def my_task(self)
    try:
        # ...call API...
    except (ConnectionError, RateLimitExceeded) as e:
+       msg = f"User {user_id} API call failed (retries: {self.request.retries}/3)"
+       if self.request.retries < 3:
+           logger.warn(msg)
+       elif self.request.retries == 3:
+           logger.error(msg)
        raise self.retry(exc=e, countdown=retry_delay)

Celeryの観測性(observability)を高めるためには、flowerが有効です。より詳細な観測性を求めるなら分散トレーシングも見ておくといいと思います。

ログをよりロバストで扱いやすくするには、構造化ロギングがおすすめです。

3. タスクはアトミックかつ冪等(べきとう)なものにする

アトミックなタスクでは、すべての処理が成功するか、失敗するか、のどちらかしかあり得ません。

冪等なタスクは、何度実行しても同じ結果(システムに対して同じ変化)をもたらします。

アトミックかつ冪等なCeleryタスクを書くことで、システムの状態を複雑にすることなく気軽にリトライできるようになります。プログラムがよりロバストになります。また、タスクが途中でコケた時に、コケたところから手作業でコマンドを打っていく必要がなくなるので、メンテナンス負担の軽減にもつながります。

以下のサンプルコードで、アトミック/冪等なタスクとそうでないタスクの違いを説明します。

atomic_idempotent_example.py
@app.task
def not_atomic_not_idempotent_task():
    """
    アトミック/冪等ではないタスクの例
    """
    # (BAD) 片方のクエリーが成功し、もう片方が失敗する可能性がある..
    delete_rows()
    insert_rows()
    # (BAD) タスクが実行されるたびにファイル名が変わる..
    now = int(time.time())
    create_file(name="backup-{now}.csv")


@app.task
def atomic_and_idempotent_task(filename):
    """
    アトミック/冪等なタスクの例(transaction.atomic は Djangoフレームワークの関数)
    """
    # (GOOD) 重要なオペレーションが全部成功するか全部失敗する
    with transaction.atomic():
        delete_rows()
        insert_rows()
    # (GOOD) タスクを再実行してもファイル名は変わらない
    create_file(name=filename)

データの完全性を保証するために、外部データに対するCRUD操作を行う時にトランザクションを使った方がいいです。また、Djangoビュー関数からCeleryタスクを実行する時に、レースコンディション(競合状態)が発生することがあるので、回避する方法については データベーストランザクションに関するCeleryのドキュメント をご参照ください。

4. Celeryワーカーが異常終了した時にタスクを再スケジューリングする方法(スポットインスタンスの中断対応)

スポットインスタンスで起動しているCeleryワーカーが中断通知を受けたり、リソース不足で異常終了したりした場合は、デフォルト設定のままではタスクはリトライされません。

スポットインスタンスのコスト削減効果を活かしたいなら、ワーカーが異常終了しても、進行中のタスクが別ワーカーに再スケジューリングされるようにする必要があります。こうするには、以下の2つの設定を有効にする必要があります。

# タスクが完了してからAckする(タスクはべき等であるべき)
task_acks_late = True
# ワーカーの異常終了で、Ackされていないタスクを際スケジューリングする
task_reject_on_worker_lost = True

task_acks_late

task_acks_late は、タスクをAckするタイミングを制御するフラグです。

Ackって何?
Ack は Acknowledge の略で、Celeryワーカー側からブローカー(Redisなど)に対して「このタスク処理は終わったから、キューから削除して大丈夫だよ」を示す合図です。

False (デフォルト) の場合は、タスクが始まったらAckされ、True の場合は、タスクが終了した後にAckされます。タスクが終わった後にAckすることで、進行中のタスクと完了したタスクがきっちり見分けられるようになります。Ackを後にすると、タスクがリトライの対象になるので、べき等にする必要があります。この負担をユーザーに押し付けないために、あえてデフォルト設定を False にしてあるそうです。

task_reject_on_worker_lost

なんらかの理由でCeleryワーカーが異常終了した場合は、そのワーカーが持っているタスクは「失敗」と Ack され、再実行されません。

task_acks_late が有効でも、タスクを実行しているワーカープロセスが異常終了したり、シグナル(KILL/INTなど)を受けたりすると、ワーカーはタスクをAckします(Celeryドキュメントから)。

task_reject_on_worker_lost は、ワーカーが SIGTERM などによって殺されたときに、タスクを再スケジューリングするための設定項目です。task_acks_late と組み合わせることで、Celeryワーカーが異常終了した時に、タスクが自動的に再スケジューリングされます。

Celeryワーカーを100%の確率で異常終了させるようなタスクは、無限に再スケジューリングされます!成功であろうが失敗であろうが、Celeryタスクが必ず完了できるようにすることがポイントです。

visibility_timeout

もうひとつ重要な設定として、ブローカーの visibility_timeout があります。設定する値は「Ackされないタスクを再スケジューリングするまでの秒数」です。デフォルト値は1時間なので、Celeryワーカーが死んだら、実行中だったタスクが一時間後に再スケジューリングされます。この待ち時間を短縮する(もしくは長くする)には、以下のように書きます。

# Ackされないタスクを再スケジューリングするまで5分待ちます。
CELERY_BROKER_TRANSPORT_OPTIONS = {"visibility_timeout": 60 * 5}

visibility_timeout を短く設定しすぎると、タスクが終わる前に再スケジューリングされてしまいます。

原則、短く設定しすぎるより、長く設定しすぎる方がトラブルに繋がりにくいです。私は、最も長いタスクの実行時間にバッファー(例えば30秒)を加えて設定しています。

stopTimeout(ECS) & terminationGracePeriodSeconds(Kubernetes)

Celeryワーカーが SIGTERM などの停止命令を受けても、進行中のタスクが終わるまで止まりません。ECSやKubernetesのようなコンテナオーケストレーションツールは、デフォルトで30秒くらいまで許容してくれますが、それでもCeleryが処理を終えなかったらプロセスを強制的に終了させます。

この「待つ時間」を長くしたい場合は、stopTimeout (ECS) や terminationGracePeriodSeconds (Kubernetes) で調整できます。私はできるだけ Celery に正常終了してほしいので、ありうる最も長いタスク実行時間(例: 2分)に設定しています。

5. スポットインスタンスで動かすタスクの実行時間をなるべく短くする

スポットインスタンスでCeleryワーカーを動かす場合、中断通知を受けてからインスタンスが終了するまでの時間内に、タスクが完了できるようにしましょう。例えば、AWSではスポットインスタンスの中断通知は2分前に来るので(Azureは30秒、GCPは通知なし :cry: )、すべてのCeleryタスクが2分以内に完了するように設計すると良いです。

このやり方には、以下の利点があります。

  1. Celeryは、中断通知(SIGTERM)を受けると、新しいタスクのスケジューリングを拒否し、実行中のタスクが完了するまで待ちます(これをウォームシャットダウンという)。したがって、すべてのタスクが中断通知〜インスタンス終了までの時間内に終われば、時間切れで失敗することはないはずです
  2. 一つ一つのタスクの実行時間を短く抑えるために、長いタスクの中から並列化できる処理を切り抜く方法があります。処理の並列化によって、ワークロードの全体的な実行時間が速くなることがあります。
  3. 実行時間が長いタスクより、短いタスクの方が平等なスケジューリングがしやすいです。例えば、実行時間が1秒で100個あるタスクと、実行時間が5分で4つあるタスクがあるとします。長いタスクが先に実行されたら、短いタスクは5分も待たないといけないので、長いタスクがCPUを独り占めしているわけです。タスクの実行時間を短く抑えれば、優先度の高いタスクが実行されるまでの待ち時間を短縮できます。

6. flower(モニタリングツール)を使う

flowerはCeleryワーカーの健康状態(health)を監視するためのツールです。flowerを利用すると、Celeryワーカーや実行中のタスクの状況を可視化できて、デバッグやパフォーマンスチューニングに役立ちます。

インストールは簡単です。コンテナを使っている場合は、mher/flower コンテナイメージをおすすめします。以下は、compose-specに則ったサービス定義を記載します。

docker-compose.yaml
services:
  flower:
    image: mher/flower:1.2.0
    command: celery flower -A my_app --address=0.0.0.0 --port=5555
    environment:
      FLOWER_ADDRESS: 0.0.0.0
      FLOWER_PORT: 5555
      CELERY_RESULT_BACKEND: redis://redis:6379  # このサンプルコードには redis サービスの
      CELERY_BROKER_URL: redis://redis:6379      # 定義はありません
      FLOWER_PURGE_OFFLINE_WORKERS: 60
    ports:
      - 5555:5555
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "wget --tries 1 --spider http://0.0.0.0:5555/flower/healthcheck || exit"
        ]
      interval: 10s
      timeout: 3s
      retries: 3
    profiles:
      - flower
    # このサンプルコードには redis または celery サービスの定義はありません
    depends_on:
      redis:
        condition: service_healthy
      celery:
        condition: service_healthy

FLOWER_PURGE_OFFLINE_WORKERS を60秒くらいに設定しておくことをおすすめします。これで停止されたワーカーが1分後に画面から消えてくれます。特に新しい Celery ワーカーを頻繁に起動したり停止したりするようなオートスケーリング環境では役に立ちます。

コンテナ化されていないデプロイメントの場合は、systemd のようなサービスマネージャで flower をデーモン化することをおすすめします。

flower.service
[Unit]
Description=Flower - Celery monitoring tool

[Service]
User=celery
Group=celery
WorkingDirectory=/path/to/project
ExecStart=/path/to/project/.venv/bin/flower -A my_app
Environment=FLOWER_ADDRESS=127.0.0.1
Environment=FLOWER_PORT=5555
Environment=CELERY_RESULT_BACKEND=redis://redis:6379
Environment=CELERY_BROKER_URL=redis://redis:6379
Environment=FLOWER_PURGE_OFFLINE_WORKERS=60
Restart=on-failure
Type=simple

[Install]
WantedBy=multi-user.target

Celeryには、通常の Task Event や Worker Event の他、task-startedtask-sent のような追加で有効にできるイベントがあります。例えば、「task-started」イベントを有効にすると、Celeryタスクの実行が始まったことを flower から確認できるようになります。

CELERY_TASK_TRACK_STARTED = True  # ワーカーがタスクを実行する直前に送信されます。
CELERY_TASK_SEND_SENT_EVENT = True  # Sent when a task message is published
CELERY_WORKER_SEND_TASK_EVENTS = True

正直なところ、このイベント情報を業務で使用したことはまだありませんが、オブザーバビリティをよりよくするために念のため有効にしています。

Leek(リーク)について一言

Leekはもう1つのCeleryモニタリングツールです。flowerよりもUIが見えやすく、イベント情報がElasticsearchに保存されるので、永久ストレージと高速な検索を実現しています。すでにElasticsearchを使っていて、Celeryのオブザーバービリティをさらに向上させたいのであれば、Leekを検討してみてください(注意:私はLeekを本番運用で使ったことはありません)。

7. ブローカー(Redis,RabbitMQ,SQS)の特徴と注意点

名前 リモートコマンド バックエンドとして利用できる 配信セマンティクス
Redis At-most-once
RabbitMQ At-least-once
Amazon SQS × × At-least-once

Redis

Redisは多くのソフトウェアプロジェクトで使われている有名なインメモリデータベースです。ブローカーとして利用するために、Celeryは Redis Pub/Sub という機能を使っています。

Redisはリモートコマンドに対応しているので、celery inspect ping でヘルスチェックしたり、celery control purge でキューからメッセージを削除したりできます。また、ストレージバックエンドとしても使えるので、chord() 関数でタスクのコールバックを定義したり Task.result() メソッドでタスクの実行結果を取得したりできます。

注意点

Redisは小さめのメッセージを扱うのに向いているので、メッセージが大きすぎるとパフォーマンスが悪くなる可能性があるそうです。リクエストパラメータや戻り値に多くのデータを含めないように設計する必要があります。

また、Redis Pub/Subは、At-most-once配信セマンティクスを実装しています。つまり、メッセージは最大で1回送信されるということです。例えば、ブローカーからCeleryワーカーにメッセージ送信した直後に、Celeryワーカーとの接続が切れてメッセージが届かなかった場合は、そのメッセージは失われます。こういう時に、タスクが再スケジューリングされるように、task_reject_on_worker_lost (#4参照) を有効にしましょう。

RabbitMQ

RabbitMQは、広く使われているメッセージブローカーのOSSです。Redisと同様に、リモートコマンドに対応しています。また、At-least-once配信セマンティクスを実装しているため、メッセージは最低でも1回送信される保証があります。

目的はあくまでメッセージの送受信なので、Redisのようにin-memoryデータベースとして併用できるものではありません。

注意点

RabbitMQをストレージバックエンドとして利用する場合は、Redisと挙動が異なるため、注意が必要です。

具体的に、RabbitMQは rpc:// バックエンドを使うため、タスク結果を「保存する」のではなく、メッセージとして送信します。そのため、タスクの実行結果取得について以下の2つの条件が課されます。

  1. メッセージを発行したクライアントのみから取得できる
  2. 1回しか取得できない

つまり、他のシステムからCeleryタスクの結果を確認したり、コマンドラインからアドホックに結果を取得したりするユースケースには向いていないかもしれません。

SQS

Amazon SQSは、AWSのメッセージキューサービスです。スケーリングのしやすさでいうと、RedisとRabbitMQより優れていると言えます。また、RabbitMQと同じようにAt-least-once配信セマンティクスを実装しているので、ネットワークエラーやメモリー不足でメッセージが失われる心配はないです。AWSユーザーとして、最初は魅力的な選択肢です。

注意点

SQSは、Celeryイベントやリモートコントロールコマンドに対応していないため、celery inspect ping でワーカーのヘルスチェックができず、flowerのような監視ツールも使えません。Celeryの Monitoring and Management Guide が実質役に立たなくなるので、ご認識ください。

私は上記の理由でSQSを避けますが、以下のような工夫をして独自の監視体制を整えれば、なんとかなるでしょう。

責任 Redis/RabbitMQの場合 SQSの場合
ヘルスチェック celery inspect ping SQSとの接続状況やPIDを確認するPythonスクリプトを定期実行する
Celeryワーカー監視 flower or celery status EC2やECSのCloudWatchメトリクスをCelery用のダッシュボードに追加する
キューの監視 flower SQSのCloudWatchメトリクスをCelery用のダッシュボードに追加する
キューから特定のタスクの消去 flower / celery control revoke 不可 :cry:
キューからすべてのタスクの消去 celery purge -Q foo PurgeQueue APIを叩くスクリプトを用意するか、手動でSQSのマネコンから「キューの消去」を行う

8. 長いタスクがある場合は worker_prefetch_multiplier=1 を設定する

デフォルト設定では、Celeryはブローカーから一度に4つのタスクをあらかじめ取得し、順番に実行します。このやり方は「プリフェッチ」と呼ばれます。タスクをプリフェッチすることで、ブローカーへのリクエスト回数が減りますし、タスクはすでにメモリ上にあるため、次のタスクの実行開始までの時間が短縮されます。

しかし、長いタスクがある場合、プリフェッチによって一部のCeleryワーカーがすべてのタスクを奪ってしまい、他のCeleryワーカーが待ち状態になってしまう可能性があります。これを防ぐためにプリフェッチを無効にする必要があります。

Celeryの worker_prefetch_multiplier という設定は、一度にプリフェッチできるタスク数を制御します。1 に設定することで、実質プリフェッチが無効化され、タスクを一つ一つブローカーから取得するようになります。数秒以上のような長いタスクがある場合は、1 に設定しましょう。

# 長いタスクがあるため、Celeryワーカーが待ち状態にならないようにプリフェッチを無効化します。
worker_prefetch_multiplier = 1

一方で、すべての Celery タスクが短い (たとえば 1 秒未満) かつブローカーとCeleryワーカーのやり取りで生じるオーバーヘッドを最小限にしたいなら、むしろ worker_prefetch_multiplier64128 などの高い値に設定することが推奨されます。

9. --concurrency に CPU コア数を設定する

--concurrency パラメータは、一つのCeleryワーカーが一度に処理できるタスク数を制御します。この値が高すぎると、Celeryワーカーが頑張りすぎて、ヘルスチェックの失敗の原因に繋がることがあります。安定的な実行を実現するために、ほとんどの場合は --concurrency にマシンの 物理的なCPUコア数 を設定することをおすすめします。

--concurrency が未指定の場合、os.cpu_count() が使われます。ただ、このカウントには「vCPU」という仮想CPUが含まれるため、実際の物理的コア数と比べて膨れ上がっている可能性があります。CPUに負荷がかかるタスクがある場合、Celeryが処理しきれないほどの仕事を背負ってしまうかもしれません。

以下、--concurrency の設定例をいくつか紹介します。(詳細は、AWSのドキュメント「インスタンスタイプ別の CPU コアごとの CPU コア数とスレッド数」をご覧ください。)

ランタイム vCPU --concurrency
ECS (Fargate) 1024 1
EC2 (>=t4) 64 64
EC2 (<=t3) 64 32

--concurrency=物理的コア数 という設定は万全な対策ではありませんし、Celeryワーカーに1つ以上のタスクを処理できる場合もあるでしょう。ただ、異なるスペックを必要とするタスクを1つのCeleryワーカーが実行しなければいけない場合は、この戦略はより安全だと思います。最悪の場合、CPUの使用率が低下しますが、そうなった場合はスポットインスタンスで費用を安くしてみてはいかがでしょうか。

10. 複数のタスクの状態を確認する方法(get() を使用しない)

groupchunks などのCanvas関数でタスクのグループを作ることができます。この関数は、GroupResult オブジェクトに解決されるプロミスを返します。このオブジェクトには、グループ内のタスクの状態を確認するためのメソッドがいくつか用意されています。また、このメソッドはキャッシュを利用しているため、ブローカーへの負荷が減ります。

メソッド 説明
waiting Ready でないタスクがある場合、True を返す
ready すべてのタスクが完了したかどうか(成功/失敗問わず)
completed_count 成功したタスクの数
successful 全てのタスクが成功した場合、True を返す
failed いずれかのタスクが失敗した場合、True を返す

タスクの戻り値が不要な場合は、get() の代わりに waiting() を使ってタスクの進捗を確認できます。

task_group = group([long_job.s(1, 2), long_job.s(3, 4)])
group_result = task_group()

ts_start = time.time()
while group_result.waiting():
    ts_now = time.time()
    elapsed = round(ts_now - ts_start, 1)
    logger.info(f"実行中。経過時間: {elapsed}")
    time.sleep(60)

# 不要な結果を破棄する
group_result.forget()

タスクの戻り値がなくて GroupResult.get() を実行すると、None だけが入っているリストが返ってきます。

>>> group_of_tasks_with_no_return_values.get()
[None, None, None, None, None, None, None, None, None, None, ...]

この無意味なデータはブローカーから取得されているので、効率悪いですよね。代わりに waiting()forget() を使ってみましょう。

終わりに

Celeryは素晴らしいフレームワークですが、設定項目が多くてコツを掴むまで時間がかかります。この記事では、ヘルスチェック、タスクの再スケジューリング、プリフェッチするタスク数の制限、ブローカーの選択など、主に本番環境での運用経験から学んできたことを説明しました。Celeryキューの分割方法やタスクのプライオリティ(優先度)の設定など、触れていないトピックもあるので網羅的なリストではないですが、少しでもお役に立てれば幸いです。

Celeryメンテナはちょっとしたドキュメント修正も歓迎するので、ぜひOSS貢献を検討してください。

その他参考になる情報

36
41
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
36
41