CeleryでSQSを使う場合は注意点がいくつかあるのでまとめました
基本的には以下のドキュメントを参考に進めます
First steps with Django
Using Amazon SQS
インストール
$ pip install celery[sqs]
(django-celeryというものもありますが、対応しているCeleryのバージョンが古いようです)
モジュールの作成
プロジェクト直下に以下のようなファイルを作成します
proj
の箇所は自分のプロジェクトに合わせてください
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
次にプロジェクト直下の__init__.py
に以下を追加します
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
こんな感じの構成になると思います
- path/
- manage.py
- proj/
- __init__.py
- celery.py
- settings.py
- urls.py
SQS設定
settings.pyに以下を追加します
AWS_ACCESS_KEY_ID
、AWS_SECRET_ACCESS_KEY
は環境変数などで渡してください
import urllib
CELERY_BROKER_URL = "sqs://{aws_access_key}:{aws_secret_key}@".format(
aws_access_key=urllib.parse.quote(AWS_ACCESS_KEY_ID, safe=''),
aws_secret_key=urllib.parse.quote(AWS_SECRET_ACCESS_KEY, safe=''),
)
CELERY_BROKER_TRANSPORT_OPTIONS = {
'region': 'ap-northeast-1',
'predefined_queues': {
# 任意のキュー名(FIFOキューの場合は.fifoをつけるが必要ある)
'csv-upload.fifo': {
# キューのURL
'url': 'https://sqs.ap-northeast-1.amazonaws.com/000000000/csv-upload.fifo',
},
'send-message': {
'url': 'https://sqs.ap-northeast-1.amazonaws.com/000000000/send-message',
}
}
}
# 処理が完了したらメッセージを削除する
CELERY_TASK_ACKS_LATE = True
# 例外エラーが発生したらメッセージは削除しない
CELERY_TASK_ACKS_ON_FAILURE_OR_TIMEOUT = False
predefined_queues
に任意のキュー名とキューのURLを連想配列の形で定義してください
キュー名はキューにメッセージを送信するときや、ワーカーを起動するときに使用するだけなので、内容がわかる任意の名前で問題ないですが、FIFOキューの場合は.fifo
を付ける必要があるようです
公式ドキュメントには触れられてませんが以下の設定部分があった方がいいです
# 処理が完了したらメッセージを削除する
CELERY_TASK_ACKS_LATE = True
# 例外エラーが発生したらメッセージは削除しない
CELERY_TASK_ACKS_ON_FAILURE_OR_TIMEOUT = False
Celeryはデフォルトでは
キューからメッセージを取得
↓
キューからメッセージを削除
↓
処理実行
という動きのようで、処理が終わる前にメッセージが削除されます
スタンダードキューならまだしも、FIFOキューは基本並列処理させたくないはずなので、この仕様は致命的になります(次のメッセージがある場合、即時実行される)
CELERY_TASK_ACKS_LATE = True
とすることで
キューからメッセージを取得
↓
処理実行
↓
キューからメッセージを削除
になります
ただし、このままだと処理が成功しようが例外エラーが発生しようがメッセージは削除されます
デッドレターキューを使っている場合など、例外エラーの場合にメッセージを削除したくない場合は
CELERY_TASK_ACKS_ON_FAILURE_OR_TIMEOUT = False
にします
キューへのメッセージ送信
適当な場所にtasks.py
というファイルを以下のような内容で作成します
自分は、celery.pyと同じディレクトリに作成しました
from celery import shared_task
@shared_task
def csv_register(record_id):
# 必要な処理
以下のような処理で上記処理を呼びます
from path import tasks
class hoge:
@classmethod
def csv_register_task(cls, record_id, shop_id):
tasks.csv_register.apply_async(
args=[record_id],
queue='csv-upload.fifo',
MessageGroupId=f'shop_id-{shop_id}')
args
で呼んでるメソッドの引数の指定
queue
で送信したいキュー名
MessageGroupId
でメッセージグループの指定(必要があれば)
が設定できます
ワーカーの起動
メッセージの送信までできたので、メッセージを取得して処理を実行させるワーカーを以下のようなコマンドで起動させます
celery -A proj worker -l INFO -Q csv-upload.fifo --concurrency=2
proj
は自分のプロジェクト名にしてください
-Q
で取得するキューを指定します
--concurrency
はプロセス数です、この場合二つにしてますが一つでよこれば不要です
キューが複数ある場合はキュー分起動させます
その場合はsupervisorなどを使った方がいいかと思います(参考となるconfもここにあります)
実際にメッセージを送信してみて実行されれば成功です