3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Django+Celery+SQSでキュー処理を行う

Last updated at Posted at 2022-10-15

CeleryでSQSを使う場合は注意点がいくつかあるのでまとめました

基本的には以下のドキュメントを参考に進めます
First steps with Django
Using Amazon SQS

インストール

$ pip install celery[sqs]

(django-celeryというものもありますが、対応しているCeleryのバージョンが古いようです)

モジュールの作成

プロジェクト直下に以下のようなファイルを作成します
projの箇所は自分のプロジェクトに合わせてください

celery.py
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

次にプロジェクト直下の__init__.pyに以下を追加します

__init__.py
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

こんな感じの構成になると思います

- path/
  - manage.py
  - proj/
    - __init__.py
    - celery.py
    - settings.py
    - urls.py

SQS設定

settings.pyに以下を追加します
AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYは環境変数などで渡してください

settings.py
import urllib

CELERY_BROKER_URL = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=urllib.parse.quote(AWS_ACCESS_KEY_ID, safe=''), 
    aws_secret_key=urllib.parse.quote(AWS_SECRET_ACCESS_KEY, safe=''),
)

CELERY_BROKER_TRANSPORT_OPTIONS = {
    'region': 'ap-northeast-1',
    'predefined_queues': {
        # 任意のキュー名(FIFOキューの場合は.fifoをつけるが必要ある)
        'csv-upload.fifo': {
            # キューのURL
            'url': 'https://sqs.ap-northeast-1.amazonaws.com/000000000/csv-upload.fifo',
        },
        'send-message': {
            'url': 'https://sqs.ap-northeast-1.amazonaws.com/000000000/send-message',
        }
    }
}

# 処理が完了したらメッセージを削除する
CELERY_TASK_ACKS_LATE = True
# 例外エラーが発生したらメッセージは削除しない
CELERY_TASK_ACKS_ON_FAILURE_OR_TIMEOUT = False

predefined_queuesに任意のキュー名とキューのURLを連想配列の形で定義してください
キュー名はキューにメッセージを送信するときや、ワーカーを起動するときに使用するだけなので、内容がわかる任意の名前で問題ないですが、FIFOキューの場合は.fifoを付ける必要があるようです

公式ドキュメントには触れられてませんが以下の設定部分があった方がいいです

# 処理が完了したらメッセージを削除する
CELERY_TASK_ACKS_LATE = True
# 例外エラーが発生したらメッセージは削除しない
CELERY_TASK_ACKS_ON_FAILURE_OR_TIMEOUT = False

Celeryはデフォルトでは

キューからメッセージを取得
↓
キューからメッセージを削除
↓
処理実行

という動きのようで、処理が終わる前にメッセージが削除されます
スタンダードキューならまだしも、FIFOキューは基本並列処理させたくないはずなので、この仕様は致命的になります(次のメッセージがある場合、即時実行される)
CELERY_TASK_ACKS_LATE = Trueとすることで

キューからメッセージを取得
↓
処理実行
↓
キューからメッセージを削除

になります
ただし、このままだと処理が成功しようが例外エラーが発生しようがメッセージは削除されます
デッドレターキューを使っている場合など、例外エラーの場合にメッセージを削除したくない場合は
CELERY_TASK_ACKS_ON_FAILURE_OR_TIMEOUT = False
にします

キューへのメッセージ送信

適当な場所にtasks.pyというファイルを以下のような内容で作成します
自分は、celery.pyと同じディレクトリに作成しました

tasks.py
from celery import shared_task

@shared_task
def csv_register(record_id):
    # 必要な処理

以下のような処理で上記処理を呼びます

from path import tasks

class hoge:
    @classmethod
    def csv_register_task(cls, record_id, shop_id):
        tasks.csv_register.apply_async(
            args=[record_id], 
            queue='csv-upload.fifo',
            MessageGroupId=f'shop_id-{shop_id}')

argsで呼んでるメソッドの引数の指定
queueで送信したいキュー名
MessageGroupIdでメッセージグループの指定(必要があれば)
が設定できます

ワーカーの起動

メッセージの送信までできたので、メッセージを取得して処理を実行させるワーカーを以下のようなコマンドで起動させます

celery -A proj worker -l INFO -Q csv-upload.fifo --concurrency=2

projは自分のプロジェクト名にしてください
-Qで取得するキューを指定します
--concurrencyはプロセス数です、この場合二つにしてますが一つでよこれば不要です
キューが複数ある場合はキュー分起動させます
その場合はsupervisorなどを使った方がいいかと思います(参考となるconfもここにあります)

実際にメッセージを送信してみて実行されれば成功です

関連

Django+Celery+SQSをLocalStackで動かす

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?