Celeryとは
- 非同期のタスクキューです。Rubyだと
ActiveJob
、Sidekiq
、Delayed_job
に相当します。ざっくり言うと非同期でアプリケーションを動かすためのフレームワークです。
DjangoにCeleryを導入する
パッケージを追加
celery
flower # Celeryのダッシュボード
django-celery-results # タスクの状況を見れる
django-redis # タスクのキューをredisにする場合
cached-property
/config/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'xxxxxx') # Django側の設定と同じにする
app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY') # Celeryの設定ファイルの場所
app.autodiscover_tasks()
config/__init__.py
- このファイルはceleryを起動するときに利用します。
from .celery import app as celery_app
__all__ = ('celery_app',)
config/settings.py
DJANGO_APPS = [
# 〜〜〜
'django_celery_results',
]
CELERY_BROKER_URL = 'redis://redis:6379/0'
CELERY_TIMEZONE = 'Asia/Tokyo'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_TASK_TRACK_STARTED = True
CELERY_IMPORTS = ['worker.sample_task'] # タスクの場所
worker/sample_task.py
from celery import shared_task
@shared_task(name='sample_task_execute') # 外部から呼び出ししたい場合は、nameをつける
def execute(something: str):
print(f'============= something={something} =============')
app/views/sample_view.py タスク呼び出しもと
from worker.sample_task import execute
execute.delay('hogefuga') # メソッド名にdelayをつけて呼び出す
Celeryを起動する。
- CeleryはDjangoとは別に起動する必要があります。
# config/__init__.pyで定義したので、 -A config.celeryではなく、-A configと出来る
celery -A config worker -l INFO
Flowerを起動したい場合
celery -A config flower --address=0.0.0.0 --port=5555
Djangoの外側からCeleryを呼び出したい場合
-
send_task
を使うことにより外部から呼び出せる
from celery import Celery
celery = Celery('webapi')
celery.conf.broker_url = 'redis://redis:6379/0'
celery.send_task('sample_task_execute', kwargs={'something': 'Hallo World'})
※ DBと接続できるなら、celery.conf.result_backend
を利用して結果をDBに保存できる