1. Celeryとは
Pythonで記述された非同期実行ツールで、時間のかかるタスクをたくさん受け付けるようなシステムで非同期かつ順番に実行していくような使い方をしたり、スケジュールを決めておいて定期実行したりできます。DjangoやFlaskとの親和性が高く、組み合わせて使われることが多いようです
github
https://github.com/celery/celery
ドキュメント
https://docs.celeryq.dev/en/stable/index.html
2. Celeryの仕組み
2-1. 基本編
基本の仕組みでは以下の3つが登場します
2, 3に待機しておいてもらって1から指示を送ります
- プロデューサー(Pythonプログラムなど)
- メッセージブローカー(Redis, RabbitMQなど)
- ワーカー(Celery)
動作としては以下のようになります
- プロデューサーがタスクをメッセージブローカーに登録する
- メッセージブローカーはタスク情報置き場として機能する
- Celeryはメッセージブローカーにタスクが届いているか常時チェックする
- メッセージブローカーにタスクが届くとCeleryワーカーが実行する
- タスク終了後、Celeryが次のタスクが来てるかメッセージブローカーをチェックする
- タスクがなければ次のタスクがくるまで待機する
メッセージブローカーによってCelery側の監視方法が異なるので、選択したメッセージブローカーによってはタスク到着から少し遅れてCeleryが応答するものもありますが、Redisの場合はTCPでCeleryが送ったリクエストに応答しないままにして、タスクが届いたらそれを応答として渡すBLPOPなる仕組みで瞬時にタスク到着が連絡される仕組みになっているようです
2-2. 定期実行
プロデューサーとしてCelery Beatを使うと定期実行を行うことができます
Celery Beatに実行スケジュールを指示しておくと、実行するタイミングでメッセージブローカーにタスクを書きにいってくれて、それに応じてCeleryが実行してくれるという仕組みです
スケジュールは以下の方法で指定できます
- コードで書いておく
- DjangoのDBを借りてスケジュールを記録参照する
- Celery BeatのAPIで指示を受けて実行する
決まったスケジュールがあるならコードで書いておけば良いですし、管理者がちょくちょく変更するならDjangoのDBに記録しておくのが良さそうです。APIによる指示の場合もDjangoのDBに変更を記録するようにできますので、動的に変更したり、SPAから変更したりできるようです
3. 環境構築
今回使うRedis, Celery, Celery beat, プロデューサーはどれもPythonで動作しますので、ローカルにインストールだけしてやれば同じ環境で動かすことができます。実際の運用では死活監視やスケールを考えて別々のdockerコンテナで分けて動かすような運用の方が良さげです。特にCeleryワーカーは分けておくことで負荷分散が非常にやりやすくなりますので分けるべきだと思います
3-1. Celeryのインストール
CeleryはPythonで動きますので適当なPython環境を作ってpipで入れれば動きます
pip install Celery
特定のメッセージブローカーに限定したビルドを指定してインストールすることもできます
pip install celery[redis]
3-2. redisの環境構築
今回はRedisをメッセージブローカーとして用います
公式のdockerイメージがあるのでdocker runするだけで動きます
https://hub.docker.com/_/redis
docker run -d --name redis-server -p 6379:6379 redis:latest
4. 基本の使い方
4-1. Celeryワーカーで非同期処理する
4-1-1. 定義ファイルの準備
Celeryワーカーとプロデューサーを両方ローカルで用意して実行してみましょう
まず以下のようにCeleryアプリケーションを定義してタスクを登録するコードを準備します
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
app.conf.result_backend = 'redis://localhost:6379/0'
app.conf.broker_connection_retry_on_startup = True
@app.task
def add(x, y):
from time import sleep
sleep(10)
return x + y
最初に、app = Celery()でCeleryのインスタンス(公式にはCeleryアプリケーションと呼ぶようです)を作成します。第1引数が名称で、第2引数でメッセージブローカーを指定していて、上記ではlocalhostの6379ポートにあるredisの0番DBを指定しています
これに加えてapp.confに色々設定を書いていくことでログの残し方などの動作を設定できます。上記では処理結果をredisに書きだしておくように設定して、接続時のメッセージブローカーへの接続リトライを推奨通り有効にしています
最後に、@app.taskを付けて関数を定義することで実行するタスクを指定します。Celeryワーカーは起動した時点で設定されているタスクしか実行できない為、Celeryワーカー側もこの情報をもっておく必要があります
4-1-2. Celeryワーカーの準備
上記のhoge.pyファイルがあるディレクトリでCeleryが入っているPython環境を有効にして以下を実行するとCeleryワーカーが起動します
celery -A hoge worker --loglevel=info --concurrency=5
-A hogeでhoge.pyを指定しています
--loglevel=infoはログ書き出しレベルをINFOに設定
--concurrency=5はワーカー数を5に指定します
4-1-3. Celeryタスクの実行
hoge.pyファイルがあるディレクトリでCeleryが入っているPython環境を有効にしてPythonコンソールから以下のように実行するとCeleryタスクを実行できます
>>> import hoge
>>> res = hoge.add.delay(1, 2)
>>> res.ready()
False
>>> res.ready()
True
>>> res.get()
3
hoge.addという関数は自分で定義したものですが、設定した覚えがないdelayというメソッドが付いています。これは@app.taskが追加してくれたCeleryの拡張メソッドで、delay()は非同期で実行してくれるメソッドです
ready()でTrueが返ってきたらタスクが終了しているのでget()で結果を受け取れるようになります
他にもリトライや中止などの拡張メソッドがあるようです
https://docs.celeryq.dev/en/stable/userguide/tasks.html
https://docs.celeryq.dev/en/stable/reference/celery.app.task.html
4-2. Celery beatで定期実行する
4-2-1. 手順
Celery beatにスケジュールを指定して起動させておくと、指示通りの時間にプロデューサーとしてタスクをメッセージブローカーに送ってくれます。設定はワーカーと同じでスケジュールを足すだけです
from celery import Celery
import datetime
# Celeryインスタンスを作成
app = Celery('tasks')
app.config_from_object('celery_config') # 設定を読み込む
# タスクを定義
@app.task
def sample_task():
print(f"タスク実行: {datetime.datetime.now()}")
return "Task Completed"
from celery.schedules import crontab
broker_url = 'redis://redis:6379/0'
result_backend = broker_url
beat_schedule = {
'sample-task-every-minute': {
'task': 'tasks.sample_task',
'schedule': crontab(minute='*'), # 毎分実行
},
}
# タイムゾーン設定
timezone = 'Asia/Tokyo'
enable_utc = True
celery -A tasks worker --loglevel=info
celery -A tasks beat --loglevel=info
4-2-2. スケジュールの設定方法
以下のscheduleにcrontab()で渡して指定します
以下だと1分ごとに毎分実行します
beat_schedule = {
'sample-task-every-minute': {
'task': 'tasks.sample_task',
'schedule': crontab(minute='*'), # 毎分実行
},
}
それ以外だと以下のようになります
'schedule': crontab(minute='*'),
'schedule': crontab(minute='17'),
'schedule': crontab(hour=5),
'schedule': crontab(hour=9, minute=0, day_of_week=1)
'schedule': crontab(hour=4, day_of_month=10)
'schedule': crontab(hour=4, day_of_month=1, month_of_year=4)
4-3. dockerで使う
4-2で実行した内容をdockerでやると以下のようになります
https://github.com/haneya-studio/celery_sample/tree/main
5. Djangoで使う
DjangoとCelery workerやCelery beatを統合して利用することで、ワーカーの動作ログやbeatのスケジュール管理をDjangoの管理画面から閲覧編集できるようになります
その場合は、メッセージブローカーやCeleryの設定、スケジュールをDjangoのsetting.pyに書く、tasks.pyもDjangoプロジェクト内に置くのが推奨のようです。同じ環境からDjango, Celeryを実行する場合はそのまま同じファイルを参照すれば良いですが、コンテナやサーバーを分ける場合はDjangoプロジェクトを複製するなどしてそれぞれに置いてやります
5-2. Celery beatで定期実行する
/mysite/
/mysite
setting.py
/tasks
tasks.py
5-2-1. setting.pyの準備
Django環境とCelery環境にdjango_celery_resultsをインストールします
pip install django_celery_results django_celery_beat
setting.pyに
・アプリ一覧へのdjango_celery_resultsの追加
・メッセージブローカーの設定
・Celeryワーカーの設定
を書きます
from celery.schedules import crontab
INSTALLED_APPS = [
...
'django_celery_results',
'django_celery_beat',
...
]
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://redis:6379/0')
CELERY_RESULT_BACKEND = 'django-db'
CELERY_RESULT_EXPIRES = 86400 # 1 day in seconds
CELERY_BEAT_SCHEDULE = {
'sample-task-every-minute': {
'task': 'tasks.tasks.sample_task',
'schedule': crontab(minute='*'),
},
}
CELERY_TIMEZONE = 'Asia/Tokyo'
CELERY_ENABLE_UTC = False
上記でCelery用のテーブルがDjangoのDBに追加されるので、以下で変更を反映します
python manage.py migrate
5-2-2. taskの定義
Celeryワーカーとプロデューサーを両方ローカルで用意して実行してみましょう
まず以下のようにCeleryアプリケーションを定義してタスクを登録するコードを準備します
import os
import datetime
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
app = Celery('mysite')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.task
def sample_task():
print(f"タスク実行: {datetime.datetime.now()}")
return "Task Completed"
5-2-3. DjangoとCeleryとCelery beatの起動
Djangoを起動します
python manage.py runserver
上記のtasks.pyファイルがあるディレクトリでCeleryが入っているPython環境を有効にして以下を実行するとCeleryワーカーとCelery beatが起動します
celery -A tasks worker --loglevel=info
celery -A tasks beat --loglevel=info
上手くいっていると http://localhost:8000/admin にCeleryの実行履歴やスケジュールタスク設定が表示されます。今回はsetting.pyに1分間隔で実行するスケジュールを書いていますが、ここにスケジュールを書くこともできます
5-3. タスクでDjangoモデルに書き込む
django.setup()するとCelery側からDjangoのモデルにアプローチできるようになります。modelのimportはモデルが重複してimportされないようにタスク関数の中で実行した方が良いようです
以下はweatherアプリ内のLogモデルについてcreate()しています
import os
import datetime
import django
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings') # プロジェクト名を適宜変更
app = Celery('mysite')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
django.setup()
@app.task
def sample_task():
from weather.models import Log
print(f"タスク実行: {datetime.datetime.now()}")
Log.objects.create()
return "Task Completed"
まとめ
少ない作業量で定期実行や非同期処理が実装できるのは魅力だと思います。ワーカーを別コンテナで動かせるのでロードバランサーも使いやすいしすごく良いフレームワークですが、動作や設定のやり方が分かりにくいので慣れるまでは大変かもしれません。とりあえずシンプルな環境でどうやったらちゃんと動くのか把握してからやり始めるのが良さげです