はじめに
今回はAWS SQSとCeleryを連携させる方法について記載していきます。
CeleryはDjangoで使用していることを想定の上、作業をしてきます。
※基本的な設定は他の導入記事がたくさんあるため省略します。
基本的な設定編
まずは、settings.pyに下記の設定を記載しましょう。
重要なのは主に以下です。
CELERY_BROKER_URL = "sqs://"# AWS内のEC2などにアクセス権限が付与されていればこれだけで大丈夫です。(付与されていない場合は、権限情報を記載する必要があります。)
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_ACKS_LATE = True
CELERY_TASK_ACKS_ON_FAILURE_OR_TIMEOUT = False
CELERY_RESULT_BACKEND = None
CELERY_TASK_DEFAULT_QUEUE = 'celery.fifo' # デフォルトのqueueを設定
CELERY_TIMEZONE = 'Asia/Tokyo'
次に、celery.pyに下記を追記していきます。
import uuid
from django.conf import settings
from kombu import Queue
sqs_url = settings.SQS_URL # (https://sqs.ap-northeast-1.amazonaws.com/00000000/)
queues = {
# FIFOQueueの場合は、.fifoをつける必要がある。
'hoge.fifo': {
'url': f'{sqs_url}hoge.fifo', # コンソールから確認できるQueueのURL
'queue_name': 'hoge.fifo',
'message_group_id': 'hoge_group',
'message_deduplication_id': str(uuid.uuid4())
},
'celery.fifo': {
'url': f'{sqs_url}celery.fifo',
'queue_name': 'celery.fifo',
'message_group_id': 'celery.group',
'message_deuplication_id': str(uuid.uuid4())
}
}
task_routes = {
'hallo_hoge': {
'queue': hoge.fifo'
}
}
broker_transport_options = {
'region': 'ap-northeast-1',
'predefined_queues': queues,
'visibility_timeout': 3600,
'polling_interval': 1
}
app: Celery = Celery("app_backend")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.conf.update(
broker_transport_options=broker_transport_options,
task_routes=task_routes,
task_queues=[Queue(name, exchange=name, routing_key=name) for name in queues.keys()]
)
app.autodiscover_tasks()
上記設定の補足です。
queuesには、SQSで作成したQueueをCelery上のどのqueueで使用するかを設定します。
task_routesには、Celeryのどのqueueをどのtaskに送信するかを設定します。
今回の場合は、以下のような流れになります。
SQS上のhoge.fifoqueueにmessageが入ったことをCeleryがキャッチ
↓↑ ここは相互にやりとりする
Celeryの'hoge.fifo'でそのmessageを読み込み
↓
Celeryがhello_hogeタスクを呼び出し、非同期処理を実行
このcelery.pyの内容をいい感じに設定すると、taskのルーティングやqueueのURL設定などを動的に設定することも可能です。
但し、Celeryを起動した後に設定内容を更新することはできないので、DjangoからDB内のレコードを更新し、そのレコード内容に基づいてqueue内容を設定したいなどの要件の場合は、DB内容を監視し、Celeryを再起動させるCustom Commandを作成するなどで対応する必要があります。
デフォルトキューについて
settings.pyでcelery.fifoをデフォルトキューに設定しましたが、これはtask_routesで設定されているものではないtaskを非同期処理でSQSにエンキューした際にこのqueueに入るという設定となります。
実運用上では、ここのQueueを作成して利用することになると思うためあまり使わないと思いますが、開発上ではとりあえず新しい非同期処理が正しく動くか確認するために使用することができます。
外部からSQSへメッセージを送信する
ここからは、外部アプリケーションから先ほど設定したSQSへ向けてエンキューし、そのキューをDjangoのCeleryでキャッチして処理する方法について記載していきます。
この方法を使用すると、AのアプリからBのアプリへジョブを投げることができるようになります。
ここでは、外部アプリケーションもpythonで作成していきますが、適宜他の言語に読み替えてください。
メッセージ内容の作成
Celeryで読み込める形式のメッセージを作成するにはメッセージの内容をbase64でエンコードする必要があります。また、メッセージのbodyもbase64でエンコードする必要があります。
エンコードを行わないとうまい具合にデコードが行われず例外が発生します。
import json
import base64
import uuid
import boto3
hoge_id = str(uuid.uuid4())
args = []
kwargs = {
'hoge': 'hoge',
'fuga': 'fuga',
}
task_message = (args, kwargs, None)
task_message_body = json.dumps(task_message)
encoded_body = base64.b64encode(task_message_body.encode()).decode()
message_body = json.dumps({
'body': encoded_body,
"content-encoding": "utf-8",
"content-type": "application/json",
'headers': {
'lang': 'py',
'task': 'test_app.tasks.hello_hoge',
'id': hoge_id, # unique値
"argsrepr": args,
"kwargsrepr": kwargs,
"ignore_result": False,
"replaced_task_nesting": 0
},
'properties': {
'correlation_id': hoge_id, # unique値
"MessageGroupId": f'hello_{hoge_id}', # unique値
"delivery_mode": 2,
"delivery_info": {
"exchange": "",
"routing_key": 'hoge.fifo'
},
"body_encoding": "base64"
}
})
sqs_client = boto3.client("sqs")
sqs_url = sqs_url + 'hoge.fifo'
sqs_client.send_message(
QueueUrl=sqs_url,
MessageBody=base64.b64encode(message_body.encode()).decode(),
MessageGroupId=hoge_id,
MessageDeduplicationId=hoge.id,
)
上記のコードについて解説します。
args, kwargsは非同期処理の対象メソッドの引数に渡したいものを入れることができます。
argsは配列型
kwargsはオブジェクト型で入れることができるので、必要に応じて引数を設定してください。
今回は、hogeとfugaというキーワード引数を設定しました。
message_bodyの内容について詳しく知りたい方は、公式ドキュメントを参考にしてください。(https://docs.celeryq.dev/en/stable/internals/protocol.html#task-message-format)
この処理を実行すると、SQSのhoge.fifoキューにメッセージがエンキューされます。
それをCeleryがポーリングして、処理を開始します。
タスクの定義について
最後にタスク定義について簡単な例を紹介します。
task.pyに以下を記載します。
キーワード引数は入ってこないことも想定したハンドリングをしたほうが良いです。(外部からSQSにエンキューできるため)
from celery import shared_task
@shared_task
def hello_hoge(hoge=None, fuga=None):
if hoge and fuga:
print(f'Hello {hoge} and {fuga}')
これで、外部からSQSへジョブを投げて、hello_hogeという非同期処理を実行させることができるようになりました!
まとめ
今回は、SQSとCeleryを連携させる方法についてまとめました。
AWSでCeleryを使用するときは、redisをElasticCacheとして使用するよりもSQSを使用するほうがランニングコストが低く済むため、要件が大丈夫なようであれば積極的に活用していきたいところです。
外部からエンキューした処理をCeleryでキャッチして処理する実装内容については参考文献が少なく、ライブラリ内のコードを読み進めて行く必要があるところに苦労しました。
この記事がどなたかの参考になれば幸いです!
参考
https://docs.celeryq.dev/en/stable/internals/protocol.html#task-message-format
https://github.com/celery/celery
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/sqs.html