1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Celery入門

Last updated at Posted at 2024-11-24

1. Celeryとは

Pythonで記述された非同期実行ツールで、時間のかかるタスクをたくさん受け付けるようなシステムで非同期かつ順番に実行していくような使い方をしたり、スケジュールを決めておいて定期実行したりできます。DjangoやFlaskとの親和性が高く、組み合わせて使われることが多いようです

github
https://github.com/celery/celery

ドキュメント
https://docs.celeryq.dev/en/stable/index.html

2. Celeryの仕組み

2-1. 基本編

基本の仕組みでは以下の3つが登場します
2, 3に待機しておいてもらって1から指示を送ります

  1. プロデューサー(Pythonプログラムなど)
  2. メッセージブローカー(Redis, RabbitMQなど)
  3. ワーカー(Celery)

動作としては以下のようになります

  1. プロデューサーがタスクをメッセージブローカーに登録する
  2. メッセージブローカーはタスク情報置き場として機能する
  3. Celeryはメッセージブローカーにタスクが届いているか常時チェックする
  4. メッセージブローカーにタスクが届くとCeleryワーカーが実行する
  5. タスク終了後、Celeryが次のタスクが来てるかメッセージブローカーをチェックする
  6. タスクがなければ次のタスクがくるまで待機する

メッセージブローカーによってCelery側の監視方法が異なるので、選択したメッセージブローカーによってはタスク到着から少し遅れてCeleryが応答するものもありますが、Redisの場合はTCPでCeleryが送ったリクエストに応答しないままにして、タスクが届いたらそれを応答として渡すBLPOPなる仕組みで瞬時にタスク到着が連絡される仕組みになっているようです

2-2. 定期実行

プロデューサーとしてCelery Beatを使うと定期実行を行うことができます

Celery Beatに実行スケジュールを指示しておくと、実行するタイミングでメッセージブローカーにタスクを書きにいってくれて、それに応じてCeleryが実行してくれるという仕組みです

スケジュールは以下の方法で指定できます

  1. コードで書いておく
  2. DjangoのDBを借りてスケジュールを記録参照する
  3. Celery BeatのAPIで指示を受けて実行する

決まったスケジュールがあるならコードで書いておけば良いですし、管理者がちょくちょく変更するならDjangoのDBに記録しておくのが良さそうです。APIによる指示の場合もDjangoのDBに変更を記録するようにできますので、動的に変更したり、SPAから変更したりできるようです

3. 環境構築

今回使うRedis, Celery, Celery beat, プロデューサーはどれもPythonで動作しますので、ローカルにインストールだけしてやれば同じ環境で動かすことができます。実際の運用では死活監視やスケールを考えて別々のdockerコンテナで分けて動かすような運用の方が良さげです。特にCeleryワーカーは分けておくことで負荷分散が非常にやりやすくなりますので分けるべきだと思います

3-1. Celeryのインストール

CeleryはPythonで動きますので適当なPython環境を作ってpipで入れれば動きます

terminal
pip install Celery

特定のメッセージブローカーに限定したビルドを指定してインストールすることもできます

terminal
pip install celery[redis]

3-2. redisの環境構築

今回はRedisをメッセージブローカーとして用います
公式のdockerイメージがあるのでdocker runするだけで動きます
https://hub.docker.com/_/redis

terminal
docker run -d --name redis-server -p 6379:6379 redis:latest

4. 基本の使い方

4-1. Celeryワーカーで非同期処理する

4-1-1. 定義ファイルの準備

Celeryワーカーとプロデューサーを両方ローカルで用意して実行してみましょう
まず以下のようにCeleryアプリケーションを定義してタスクを登録するコードを準備します

hoge.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')
app.conf.result_backend = 'redis://localhost:6379/0'
app.conf.broker_connection_retry_on_startup = True

@app.task
def add(x, y):
    from time import sleep
    sleep(10)
    return x + y

最初に、app = Celery()でCeleryのインスタンス(公式にはCeleryアプリケーションと呼ぶようです)を作成します。第1引数が名称で、第2引数でメッセージブローカーを指定していて、上記ではlocalhostの6379ポートにあるredisの0番DBを指定しています

これに加えてapp.confに色々設定を書いていくことでログの残し方などの動作を設定できます。上記では処理結果をredisに書きだしておくように設定して、接続時のメッセージブローカーへの接続リトライを推奨通り有効にしています

最後に、@app.taskを付けて関数を定義することで実行するタスクを指定します。Celeryワーカーは起動した時点で設定されているタスクしか実行できない為、Celeryワーカー側もこの情報をもっておく必要があります

4-1-2. Celeryワーカーの準備

上記のhoge.pyファイルがあるディレクトリでCeleryが入っているPython環境を有効にして以下を実行するとCeleryワーカーが起動します

terminal
celery -A hoge worker --loglevel=info --concurrency=5

-A hogeでhoge.pyを指定しています
--loglevel=infoはログ書き出しレベルをINFOに設定
--concurrency=5はワーカー数を5に指定します

4-1-3. Celeryタスクの実行

hoge.pyファイルがあるディレクトリでCeleryが入っているPython環境を有効にしてPythonコンソールから以下のように実行するとCeleryタスクを実行できます

Pythonコンソール
>>> import hoge
>>> res = hoge.add.delay(1, 2)
>>> res.ready()
False
>>> res.ready()
True
>>> res.get()
3

hoge.addという関数は自分で定義したものですが、設定した覚えがないdelayというメソッドが付いています。これは@app.taskが追加してくれたCeleryの拡張メソッドで、delay()は非同期で実行してくれるメソッドです

ready()でTrueが返ってきたらタスクが終了しているのでget()で結果を受け取れるようになります

他にもリトライや中止などの拡張メソッドがあるようです
https://docs.celeryq.dev/en/stable/userguide/tasks.html
https://docs.celeryq.dev/en/stable/reference/celery.app.task.html

4-2. Celery beatで定期実行する

4-2-1. 手順

Celery beatにスケジュールを指定して起動させておくと、指示通りの時間にプロデューサーとしてタスクをメッセージブローカーに送ってくれます。設定はワーカーと同じでスケジュールを足すだけです

tasks.py
from celery import Celery
import datetime

# Celeryインスタンスを作成
app = Celery('tasks')
app.config_from_object('celery_config')  # 設定を読み込む

# タスクを定義
@app.task
def sample_task():
    print(f"タスク実行: {datetime.datetime.now()}")
    return "Task Completed"
celery_config.py
from celery.schedules import crontab

broker_url = 'redis://redis:6379/0'
result_backend = broker_url

beat_schedule = {
    'sample-task-every-minute': {
        'task': 'tasks.sample_task',
        'schedule': crontab(minute='*'),  # 毎分実行
    },
}

# タイムゾーン設定
timezone = 'Asia/Tokyo'
enable_utc = True
terminalからworkerを起動
celery -A tasks worker --loglevel=info
terminalからbeatを起動
celery -A tasks beat --loglevel=info

4-2-2. スケジュールの設定方法

以下のscheduleにcrontab()で渡して指定します
以下だと1分ごとに毎分実行します

celery_config.py
beat_schedule = {
    'sample-task-every-minute': {
        'task': 'tasks.sample_task',
        'schedule': crontab(minute='*'),  # 毎分実行
    },
}

それ以外だと以下のようになります

毎分実行
'schedule': crontab(minute='*'),
毎時17分に実行
'schedule': crontab(minute='17'),
毎日5時に実行
'schedule': crontab(hour=5),
毎週月曜日の朝9時0分に実行
'schedule': crontab(hour=9, minute=0, day_of_week=1)
毎月10日4時に実行
'schedule': crontab(hour=4, day_of_month=10)
毎年4月1日4時に実行
'schedule': crontab(hour=4, day_of_month=1, month_of_year=4)

4-3. dockerで使う

4-2で実行した内容をdockerでやると以下のようになります
https://github.com/haneya-studio/celery_sample/tree/main

5. Djangoで使う

DjangoとCelery workerやCelery beatを統合して利用することで、ワーカーの動作ログやbeatのスケジュール管理をDjangoの管理画面から閲覧編集できるようになります

その場合は、メッセージブローカーやCeleryの設定、スケジュールをDjangoのsetting.pyに書く、tasks.pyもDjangoプロジェクト内に置くのが推奨のようです。同じ環境からDjango, Celeryを実行する場合はそのまま同じファイルを参照すれば良いですが、コンテナやサーバーを分ける場合はDjangoプロジェクトを複製するなどしてそれぞれに置いてやります

5-2. Celery beatで定期実行する

ディレクトリ構造
/mysite/
    /mysite
        setting.py
    /tasks
        tasks.py

5-2-1. setting.pyの準備

Django環境とCelery環境にdjango_celery_resultsをインストールします

terminal
pip install django_celery_results django_celery_beat

setting.pyに
・アプリ一覧へのdjango_celery_resultsの追加
・メッセージブローカーの設定
・Celeryワーカーの設定
を書きます

setting.py
from celery.schedules import crontab

INSTALLED_APPS = [
    ...
    'django_celery_results',
    'django_celery_beat',
    ...
]

CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://redis:6379/0')
CELERY_RESULT_BACKEND = 'django-db'
CELERY_RESULT_EXPIRES = 86400  # 1 day in seconds
CELERY_BEAT_SCHEDULE = {
    'sample-task-every-minute': {
        'task': 'tasks.tasks.sample_task',
        'schedule': crontab(minute='*'),
    },
}
CELERY_TIMEZONE = 'Asia/Tokyo'
CELERY_ENABLE_UTC = False

上記でCelery用のテーブルがDjangoのDBに追加されるので、以下で変更を反映します

terminal
python manage.py migrate

5-2-2. taskの定義

Celeryワーカーとプロデューサーを両方ローカルで用意して実行してみましょう
まず以下のようにCeleryアプリケーションを定義してタスクを登録するコードを準備します

/mysite/tasks/tasks.py
import os
import datetime
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
app = Celery('mysite')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task
def sample_task():
    print(f"タスク実行: {datetime.datetime.now()}")
    return "Task Completed"

5-2-3. DjangoとCeleryとCelery beatの起動

Djangoを起動します

Djangoを起動
python manage.py runserver

上記のtasks.pyファイルがあるディレクトリでCeleryが入っているPython環境を有効にして以下を実行するとCeleryワーカーとCelery beatが起動します

workerの起動
celery -A tasks worker --loglevel=info
beatの起動
celery -A tasks beat --loglevel=info

上手くいっていると http://localhost:8000/admin にCeleryの実行履歴やスケジュールタスク設定が表示されます。今回はsetting.pyに1分間隔で実行するスケジュールを書いていますが、ここにスケジュールを書くこともできます

image.png

5-3. タスクでDjangoモデルに書き込む

django.setup()するとCelery側からDjangoのモデルにアプローチできるようになります。modelのimportはモデルが重複してimportされないようにタスク関数の中で実行した方が良いようです

以下はweatherアプリ内のLogモデルについてcreate()しています

tasks.py
import os
import datetime
import django
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')  # プロジェクト名を適宜変更
app = Celery('mysite')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

django.setup()

@app.task
def sample_task():
  from weather.models import Log
  print(f"タスク実行: {datetime.datetime.now()}")
  Log.objects.create()
  return "Task Completed"

まとめ

少ない作業量で定期実行や非同期処理が実装できるのは魅力だと思います。ワーカーを別コンテナで動かせるのでロードバランサーも使いやすいしすごく良いフレームワークですが、動作や設定のやり方が分かりにくいので慣れるまでは大変かもしれません。とりあえずシンプルな環境でどうやったらちゃんと動くのか把握してからやり始めるのが良さげです

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?