概要
-
https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html
-
以上のチュートリアルに情報を補足させた記事です。
-
ソースコードもGithubに置いておきます。
DjangoとCeleryの連携
- シンプルなDockerコンテナを用意します。
FROM python:3.8
RUN apt-get update
RUN apt-get install -y vim less tmux
RUN pip install --upgrade pip
EXPOSE 8000
version: '3'
services:
python:
build: .
container_name: 'python3'
working_dir: '/usr/local/src'
tty: true
ports:
- "8000:8000"
volumes:
- .:/usr/local/src
redis:
image: "redis:latest"
ports:
- "6379:6379"
volumes:
- "./data/redis:/data"
- Djangoプロジェクトを用意します。ブローカーはタスクのIDがuuidだったりしてsqlite3ではきつい可能性があります。
$ pip install django==3.2
$ pip install Redis psycopg2
$ django-admin startproject proj .
- proj/celery.py
import os
from celery import Celery
# 'celery' プログラムのためのデフォルトの Django 設定モジュールを設定します。
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
# settings.pyにCelory関連の設定を書くときにCELERY_*という接頭辞をつけることができる
app.config_from_object('django.conf:settings', namespace='CELERY')
# Djangoに登録された全ての タスクモジュールをロードします。
app.autodiscover_tasks()
# bind=Trueは現在のタスクインスタンスを簡単に取得でいる
# このタスク自体はリクエスト情報を出力する
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
- proj/__init__.py
# django起動時にアプリが読み込まれ
# @shared_taskデコレータを利用できるようになる
from .celery import app as celery_app
__all__ = ('celery_app',)
Djangoアプリの作成
$ python manage.py startapp demoapp
- demoapp/models.py
from django.db import models
class Widget(models.Model):
name = models.CharField(max_length=140)
- proj/settings.py
INSTALLED_APPS = [
...
'demoapp.apps.DemoappConfig'
]
- demoapp/tasks.py
from demoapp.models import Widget
from celery import shared_task
# チュートリアルから2つモデルが絡んでいるタスクを持ってきました。
# @shared_taskデコレータをつけると具体的なCeleryインスタンスがなくてもタスクを定義できる
@shared_task
def count_widgets():
return Widget.objects.count()
@shared_task
def rename_widget(widget_id, name):
w = Widget.objects.get(id=widget_id)
w.name = name
w.save()
Result Backend
- django-celery-results拡張機能でDjangoo ORM、DjangoCacheフレームワークを選択。
$ pip install django-celery-results
- proj/settings.py
INSTALLED_APPS = (
...
'django_celery_results',
)
CELERY_RESULT_BACKEND = 'django-db'
# DjangoCacheにするには
CELERY_RESULT_BACKEND = 'django-cache'とする
CELERY_CACHE_BACKEND = 'default'
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
'LOCATION': 'my_cache_table',
}
}
- Celeryのデータベーステーブルを作ります。さらに追加したWidgetモデルもデータベースに反映させます
$ python manage.py migrate django_celery_results
$ python manage.py makemigrations
$ python manage.py maigrate demoapp
Workerプロセスの開始
$ celery -A proj worker -l INFO
$ pyython manage.py migrate
$ python manage.py createsuperuser
$ python manage.py runserver 0.0.0.0:8000
- adminに行くとTask Results、Group resultsというテーブルが作られています。
- あとはViewからタスクを実行するとよさそうか
- チュートリアルはここで終了しているがViewとテンプレートも定義してみます。
ページを作る
- 今回CreateViewの実装は割愛するのでshellでWidgetデータを2つほど作っておきます。
$ python manage.py shell
>>> from demoapp.models import Widget
>>> w = Widget(name="WidgetA")
>>> w.save()
>>> w = Widget(name="WidgetB")
>>> w.save()
- proj/urls.py
urlpatterns = [
...
path('demoapp/', inclcue('demoapp.urls'))
]
- demoapp/urls.py
appname="demoapp"
urlpatterns = [
path('widget_count/', WidgetListView.as_view()),
path('widget_rename/<int:pk>/', WidgetRenameView.as_view()),
path('tasks/<str:task_id>/', WidgetTaskView.as_view())
]
- deomapp/templates/demoapp/rename.html
<html>
<body>
<form method="post">
<p>Widgetの名前変更</p>
{{ form.as_table }}
{% csrf_token %}
<button type="submit">送信</button>
</form>
</body>
</html>
- demoapp/forms.py
class WidgetForm(forms.ModelForm):
class Meta:
model = Widget
fields = ['name']
- demoapp/views.py
from django.views import View
from django.shortcuts import render
from django.http import HttpResponse
from celery.result import AsyncResult
from demoapp.models import Widget
from demoapp.forms import WidgetForm
from proj.tasks import count_widgets, rename_widget
class WidgetListView(View):
"""
Widgetの総数を計算するタスクを呼び出す
"""
def get(self, request, *args, **kwargs):
# タスクの呼び出し
result = count_widgets.delay()
response = HttpResponse()
response.write('<a href="/demoapp/tasks/{}/">タスクの結果</a>'.format(result.task_id))
return response
class WidgetRenameView(View):
"""
Widgetをリネームするタスクを呼び出す
"""
def get(self, request, pk, *args, **kwargs):
instance = Widget.objects.get(pk=pk)
form = WidgetForm(instance=instance)
return render(request, 'demoapp/rename.html', {'form': form})
def post(self, request, pk, *args, **kwargs):
name = request.POST['name']
# タスクの呼び出し
result = rename_widget.delay(pk, name)
response = HttpResponse()
response.write('<a href="/demoapp/tasks/{}">タスクの結果</a>'.format(result.task_id))
return response
class WidgetTaskView(View):
"""
タスクIDからタスクの状態を追跡する
"""
def get(self, request, task_id, *args, **kwargs):
task = AsyncResult(task_id)
return HttpResponse(
'<ul>' +
f'<li>id = {task.id} </li>' +
f'<li>status = {task.status} </li>' +
f'<li>result = {task.result} </li>'+
'</ul>'
)
django-celery-results.models.TaskResultsモデルから結果をDBから引っ張ってこれるかと思いましたが、タスクが完了するまではDoesNotExistになるようです。
したがって、タスクの状態はcelery.result.AsyncResultから取得します。
タスクについてより多くの情報が欲しい場合は、statusがPENDNGの間はAsyncResultからデータを取得して、statusがSUCCESSになったら同じtask_idでdjango-celery-resultsのTaskResultを使うのが良いかもしれません。
- また、django-celery-resultsを使っているので、管理画面からタスクの状態と結果を見ることができます。(/admin/django_celery_results/taskresult/)