LoginSignup
1
1

More than 1 year has passed since last update.

Celery公式チュートリアルを簡単に試せるように要約してみた

Posted at

概要

DjangoとCeleryの連携

  • シンプルなDockerコンテナを用意します。
FROM python:3.8

RUN apt-get update
RUN apt-get install -y vim less tmux
RUN pip install --upgrade pip
EXPOSE 8000
version: '3'
services:
  python:
    build: .
    container_name: 'python3'
    working_dir: '/usr/local/src'
    tty: true
    ports:
      - "8000:8000"
    volumes:
      - .:/usr/local/src
  redis:
    image: "redis:latest"
    ports:
      - "6379:6379"
    volumes:
      - "./data/redis:/data"
  • Djangoプロジェクトを用意します。ブローカーはタスクのIDがuuidだったりしてsqlite3ではきつい可能性があります。
$ pip install django==3.2
$ pip install Redis psycopg2
$ django-admin startproject proj .
  • proj/celery.py
import os

from celery import Celery


# 'celery' プログラムのためのデフォルトの Django 設定モジュールを設定します。
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# settings.pyにCelory関連の設定を書くときにCELERY_*という接頭辞をつけることができる
app.config_from_object('django.conf:settings', namespace='CELERY')

# Djangoに登録された全ての タスクモジュールをロードします。
app.autodiscover_tasks()

# bind=Trueは現在のタスクインスタンスを簡単に取得でいる
# このタスク自体はリクエスト情報を出力する
@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
  • proj/__init__.py
# django起動時にアプリが読み込まれ
# @shared_taskデコレータを利用できるようになる
from .celery import app as celery_app

__all__ = ('celery_app',)

Djangoアプリの作成

$ python manage.py startapp demoapp
  • demoapp/models.py
from django.db import models

class Widget(models.Model):
    name = models.CharField(max_length=140)
  • proj/settings.py
INSTALLED_APPS = [
	...
	
    'demoapp.apps.DemoappConfig'
]
  • demoapp/tasks.py
from demoapp.models import Widget

from celery import shared_task

# チュートリアルから2つモデルが絡んでいるタスクを持ってきました。
# @shared_taskデコレータをつけると具体的なCeleryインスタンスがなくてもタスクを定義できる
@shared_task
def count_widgets():
    return Widget.objects.count()

@shared_task
def rename_widget(widget_id, name):
    w = Widget.objects.get(id=widget_id)
    w.name = name
    w.save()

Result Backend

  • django-celery-results拡張機能でDjangoo ORM、DjangoCacheフレームワークを選択。
$ pip install django-celery-results
  • proj/settings.py
INSTALLED_APPS = (
	...
	'django_celery_results',
)

CELERY_RESULT_BACKEND = 'django-db'

# DjangoCacheにするには
CELERY_RESULT_BACKEND = 'django-cache'とする
CELERY_CACHE_BACKEND = 'default'
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'my_cache_table',
    }
}
  • Celeryのデータベーステーブルを作ります。さらに追加したWidgetモデルもデータベースに反映させます
$ python manage.py migrate django_celery_results
$ python manage.py makemigrations
$ python manage.py maigrate demoapp

Workerプロセスの開始

$ celery -A proj worker -l INFO
$ pyython manage.py migrate
$ python manage.py createsuperuser
$ python manage.py runserver 0.0.0.0:8000
  • adminに行くとTask Results、Group resultsというテーブルが作られています。
  • あとはViewからタスクを実行するとよさそうか
  • チュートリアルはここで終了しているがViewとテンプレートも定義してみます。

ページを作る

  • 今回CreateViewの実装は割愛するのでshellでWidgetデータを2つほど作っておきます。
$ python manage.py shell
>>> from demoapp.models import Widget
>>> w = Widget(name="WidgetA")
>>> w.save()

>>> w = Widget(name="WidgetB")
>>> w.save()
  • proj/urls.py
urlpatterns = [
	...
	path('demoapp/', inclcue('demoapp.urls'))
]
  • demoapp/urls.py
appname="demoapp"
urlpatterns = [
    path('widget_count/', WidgetListView.as_view()),    
    path('widget_rename/<int:pk>/', WidgetRenameView.as_view()),
    path('tasks/<str:task_id>/', WidgetTaskView.as_view())
]
  • deomapp/templates/demoapp/rename.html
<html>
<body>
	<form method="post">
	<p>Widgetの名前変更</p>
	{{ form.as_table }}
	{% csrf_token %}
	<button type="submit">送信</button>
	</form>
</body>
</html>
  • demoapp/forms.py
class WidgetForm(forms.ModelForm):
    class Meta:
        model = Widget
        fields = ['name']
  • demoapp/views.py
from django.views import View
from django.shortcuts import render
from django.http import HttpResponse
from celery.result import AsyncResult

from demoapp.models import Widget
from demoapp.forms import WidgetForm
from proj.tasks import count_widgets, rename_widget


class WidgetListView(View):
    """
    Widgetの総数を計算するタスクを呼び出す
    """
    def get(self, request, *args, **kwargs):
        # タスクの呼び出し
        result = count_widgets.delay()
        response = HttpResponse()
        response.write('<a href="/demoapp/tasks/{}/">タスクの結果</a>'.format(result.task_id))
        return response


class WidgetRenameView(View):
    """
    Widgetをリネームするタスクを呼び出す
    """
    def get(self, request, pk, *args, **kwargs):
        instance = Widget.objects.get(pk=pk)
        form = WidgetForm(instance=instance)
        return render(request, 'demoapp/rename.html', {'form': form})

    def post(self, request, pk, *args, **kwargs):
        name = request.POST['name']
        # タスクの呼び出し
        result = rename_widget.delay(pk, name)
        response = HttpResponse()
        response.write('<a href="/demoapp/tasks/{}">タスクの結果</a>'.format(result.task_id))
        return response


class WidgetTaskView(View):
    """
    タスクIDからタスクの状態を追跡する
    """
    def get(self, request, task_id, *args, **kwargs):
        task = AsyncResult(task_id)
        return HttpResponse(
                '<ul>' +
                f'<li>id = {task.id} </li>' +
                f'<li>status = {task.status} </li>' +
                f'<li>result = {task.result} </li>'+
                '</ul>'
        )

django-celery-results.models.TaskResultsモデルから結果をDBから引っ張ってこれるかと思いましたが、タスクが完了するまではDoesNotExistになるようです。
したがって、タスクの状態はcelery.result.AsyncResultから取得します。

タスクについてより多くの情報が欲しい場合は、statusがPENDNGの間はAsyncResultからデータを取得して、statusがSUCCESSになったら同じtask_idでdjango-celery-resultsのTaskResultを使うのが良いかもしれません。

  • また、django-celery-resultsを使っているので、管理画面からタスクの状態と結果を見ることができます。(/admin/django_celery_results/taskresult/)

参考

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1