概要
- djangoとceleryを組み合わせて非同期処理をした結果を、django-celery-resultsを使って保存するまでの流れ
- statusがPENDINGままだったり、DBへの結果の書き込みが非同期処理が終わるまで行われなかったりと躓いた箇所の備忘録
前提
- 環境
- python: v.3.7.7
- redis: v.6.0.9
- pip install
- django: v.2.0.3
- celery: v.4.4.7
- django-redis: v.4.12.1
- django-celery-results: v.1.2.1
インストール
- settings.py編集
INSTALLED_APPS = [
...
'django_celery_results', # 追加
]
CELERY_BROKER_URL = "redis:// [redis host]:6379" # ブローカーにredisを指定
CELERY_RESULT_BACKEND = "django-db" # 結果はdjango指定のDBに保存。本記事ではMySQLを想定。
CELERY_TASK_TRACK_STARTED = True # taskが開始状態になったことを確認できるための設定(後述)
構築
- 実行ファイル構成
- 以降で作成するファイルの構成は以下の通り
├── myapp
│ ├── myapp
│ │ ├── __init__.py
│ │ ├── settings.py
│ │ ├── celery.py
│ │ ├── urls.py
│ │ ├── tasks
│ │ │ ├── __init__.py
│ │ │ └── task.py
│ │ └── wsgi.py
│ └── manage.py
├── docker-compose.yml
└── requirements.txt
- celery.py作成
- アプリケーションディレクトリ配下(urls.pyと同じ階層)に以下
celery.py
を作成する(公式ドキュメントとほぼ同じ)
- アプリケーションディレクトリ配下(urls.pyと同じ階層)に以下
# celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault(
'DJANGO_SETTINGS_MODULE',
'myapp.settings'
)
app = Celery('myapp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object(
'django.conf:settings',
namespace='CELERY'
)
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
- taskファイル作成
- 複数の非同期スクリプトを管理するために、tasksディレクトリを切って、配下にタスク用のスクリプトを配置する構成にしている
- 以下の通り
task.py
を用意する
## 呼び出し先(=Task)
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def hello(proj_id):
time.sleep(10) # 非同期に処理が終わることを確認するために10秒まつ
message = "hello"
print(message) # 標準出力
return message # 結果を返す
立ち上げ
- djangoアプリケーションの実行
$python manage.py makemigrations # migrationファイルを作成(必要なら
$python manage.py migrate # migrate実行
$python manage.py rumserver # アプリケーション実行
- redis立ち上げ
# redis-server
- celery立ち上げ
- manage.pyがあるディレクトリにcdして以下コマンドを実行
$celery -A myapp worker --concurrency=1 -l info
Taskを実行して、IDを取得する
- タスクの実行は、
[your_task_function].delay()
で実行可能- 以下の例では、10秒待ってから "hello" を返しているだけ
- タスクの内容を確認するためには、タスクに割り振られるIDを取得する必要がある
-
hello.delay()
の返り値として、タスクオブジェクトが返って来て、idの参照が可能。 - タスクIDをDBなどに保持しておくことで、アプリ内部でいつでも・別プロセスのロジックからもタスクの状態チェックが可能になる
-
from django_app.tasks import hello
from celery.result import AsyncResult
def hoge():
task = hello.delay(project_id) # 非同期処理の呼び出しはこれだけでOK。後は非同期に処理が流れていく
print("===============")
print(task.id) # 一意に割り振られたIDが確認できる。
Taskの状態を確認する
- 上記例で取得したタスクIDを使って、タスクの状態をチェックしてみる
-
AsyncResult
を使って、状態と結果を確認する- タスクIDを渡すことで、タスクのオブジェクトが取得できる
-
status
メソッドでタスクの状態を取得できる-
PENDING
: 実行待ちの状態。 -
STARTED
: 実行開始の状態。(デフォルトでは出力されない。後述で出力方法を記します。) -
SUCCESS
: 実行が正常終了した状態。 -
FAILED
: 実行が異常終了した状態。
-
from django_app.tasks import hello
from celery.result import AsyncResult
def hoge():
task = hello.delay(project_id) # 非同期処理の呼び出しはこれだけでOK。後は非同期に処理が流れていく
print("===============")
task_id = task.id
print(task_id) # 一意に割り振られたIDが確認できる。
task_1 = AsyncResult(task_id)
print(task_1.status) # 処理開始直後のstatusを確認
time.sleep(1)
print("===============")
task_2 = AsyncResult(task_id)
print(task_2.status) # 処理1秒後のstatusを確認
- 結果をDBに保存させる
- あまり使わないと思うけど、やり方として。。
- settings.pyにて、
CELERY_RESULT_BACKEND
を指定したことで、非同期処理の状態・結果はDBに自動で保存される。本記事では、MySQLを想定として確認する。 - タスクIDを指定して、djangoからタスク情報の検索が可能
from django_app.tasks import hello
from django_celery_results.models import TaskResult
def hoge():
task = hello.delay(project_id) # 非同期処理の呼び出しはこれだけでOK。後は非同期に処理が流れていく
print("===============")
task_id = task.id # 一意に割り振られたIDが確認できる。
print(task_id)
task_model_1 = TaskResult.objects.filter(task_id=task_id)
if len(task_model_1) > 0:
print(task_model_1[0].status) # 処理開始直後のstatusを確認
time.sleep(1)
task_model_2 = TaskResult.objects.filter(task_id=task_id)
if len(task_model_2) > 0:
print(task_model_2[0].status) # 処理1秒後のstatusを確認
# MySQLの中身
## tableは以下の通り。
mysql> show tables;
+----------------------------------+
| Tables_in_nodeai |
+----------------------------------+
| auth_group |
| auth_group_permissions |
| auth_permission |
| django_admin_log |
| django_celery_results_taskresult | ## <- これが追加されている!
...
## django_celery_results_taskresultのschemaは以下の通り。
mysql> desc django_celery_results_taskresult;
+------------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+------------------+--------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| task_id | varchar(255) | NO | UNI | NULL | |
| status | varchar(50) | NO | MUL | NULL | |
| content_type | varchar(128) | NO | | NULL | |
| content_encoding | varchar(64) | NO | | NULL | |
| result | longtext | YES | | NULL | |
| date_done | datetime(6) | NO | MUL | NULL | |
| traceback | longtext | YES | | NULL | |
| meta | longtext | YES | | NULL | |
| task_args | longtext | YES | | NULL | |
| task_kwargs | longtext | YES | | NULL | |
| task_name | varchar(255) | YES | MUL | NULL | |
| worker | varchar(100) | YES | MUL | NULL | |
| date_created | datetime(6) | NO | MUL | NULL | |
+------------------+--------------+------+-----+---------+----------------+
Taskの状態がPENDINGままになるのを防ぐ
- settings.pyに
CELERY_TASK_TRACK_STARTED
が指定されていないと、タスクを実行しても結果が出るまで-
AsyncResult
を使って取得したstatusはPENDING
のままになっている - DBにタスクの内容が保持されていない。(結果が出て初めて保存される)
-
- 解消するために、
settings.py
に以下の設定をいれている
CELERY_TASK_TRACK_STARTED = True
Taskを失敗させてみる
- raiseでエラーを挙げればタスクの結果は
FAILED
となる - raiseした結果のエラーメッセージは、
task.result
に格納される
## 呼び出し先(=Task)
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def hello(proj_id):
time.sleep(10) # 非同期に処理が終わることを確認するために10秒まつ
message = "hello"
raise Exception("my error message") # エラー発報
# MySQLに格納された結果
mysql> select * from django_celery_results_taskresult\G
...
*************************** 31. row ***************************
id: 31
task_id: be294008-d2fc-4760-9055-483efdaa4970
status: FAILURE
content_type: application/json
content_encoding: utf-8
result: {"exc_type": "Exception", "exc_message": ["my error message"], "exc_module": "builtins"}
date_done: 2020-11-10 08:06:32.848782
traceback: Traceback (most recent call last):...
meta: {"children": []}
task_args: (4,)
task_kwargs: {}
task_name: myapp.tasks.task.hello
worker: celery@05ab2e4b5ee1
date_created: 2020-11-10 08:06:22.829301