3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

さくらのクラウド シンプルMQを使ってDjangoのCeleryを動かす

Last updated at Posted at 2025-02-23

「シンプルMQ」が2025年2月6日にさくらのクラウドに追加されました。

「シンプルMQ」はソフトウェア同士を非同期に連携するために、ソフトウェアコンポーネント間でのデータの送受信ができるマネージド型のメッセージキューサービスです。

Pythonを使って非同期の処理を複数のワーカーで分散処理する場合には、タスクキューとして Celery を使うことがよくあると思います。

Celeryでは実際に利用するキューとしてオンメモリやデータベース、Amazon SQS、Redisなどのキューを利用することができます。

同様にしてさくらのクラウドのシンプルMQをCeleryのバックエンドとして使えるように、ドライバ chibiegg/kombu-sakura-simplemq を書きましたので紹介します。
(正確にはCeleryが利用しているMQのラッパーである kombu のTransportです)

Celeryは利用したことがある方をメインターゲットに書いています。Celery自体の使い方については他の情報もご参照ください。

インストール

PyPI にパッケージを登録していますので、pipでインストールできます。

pip install kombu-sakura-simplemq

さくらのクラウドの準備

事前にさくらのクラウドのコントロールパネルでシンプルMQのキューを作成しておきます。

キュー名はこの後使いますので、控えておいてください。
何のキューなのかわかりやすい名前をつけておきます。

image.png

キューを作成するとメッセージの送受信に必要なAPIキーが表示されます。
これも利用しますので控えておいてください。

image.png

DjangoにCeleryを設定する

Celeryパッケージのインストール

pip install celery

settings.pyにCeleryの設定を追加

settings.py
from kombu import transport
transport.TRANSPORT_ALIASES["sakura-simplemq"] = "kombu_sakura_simplemq.transport:Transport"

# celery configurations
CELERY_BROKER_URL = "sakura-simplemq://:{}@".format(
    os.environ.get("SIMPLE_MQ_API_KEY", ""),
)
CELERY_TASK_DEFAULT_QUEUE = "test-queue"  # 作成しておいたキューの名前
CELERY_RESULT_BACKEND = None
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Tokyo"

celery.pyを追加

djangoproject/celery.py
import os
import django
from django.conf import settings

# celeryで使うDjangoの設定ファイル(settings.py)を指定
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoproject.settings")
django.setup()

from celery import Celery  # noqa: E402

app = Celery("djangoproject")

# Djangoのconfigファイルをceleryのconfigとして使う宣言
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# 利用されるタスクをimportしておく
from djangoproject.tasks import test_task  # noqa: E402,F401

タスクを定義

djangoproject/tasks.py
import logging

from djangoproject.celery import app

logger = logging.getLogger(__name__)


@app.task
def test_task():
    logger.debug("テストタスクが実行されました")

ジョブの実行

非同期に実行するには定義したタスクを delay() で呼び出します。
非同期でタスクを実行すると、シンプルMQのキューにメッセージが蓄積されます。

from djangoproject.tasks import test_task

test_task.delay()

ワーカーの起動

他のキューを利用した場合と同様にCeleryのワーカーを起動すると、先ほど蓄積したタスクが実行されます。

celery -A djangoproject worker -l DEBUG
実行例
% celery -A djangoproject worker -l DEBUG
[2025-02-23 11:56:06,765: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2025-02-23 11:56:06,782: DEBUG/MainProcess] | Worker: Building graph...
[2025-02-23 11:56:06,783: DEBUG/MainProcess] | Worker: New boot order: {Beat, StateDB, Timer, Hub, Pool, Autoscaler, Consumer}
[2025-02-23 11:56:06,848: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2025-02-23 11:56:06,849: DEBUG/MainProcess] | Consumer: Building graph...
[2025-02-23 11:56:06,883: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Agent, Mingle, Gossip, Tasks, Control, event loop}
[2025-02-23 11:56:06,884: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
 
 -------------- celery@PCNAME v5.4.0 (opalescent)
--- ***** ----- 
-- ******* ---- macOS-15.3-arm64-arm-64bit 2025-02-23 11:56:06
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         djangoproject:0x108fe1310
- ** ---------- .> transport:   sakura-simplemq://:**@localhost//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> test-queue       exchange=test-queue(direct) key=test-queue
                

[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . djangoproject.tasks.test_task

[2025-02-23 11:56:06,935: DEBUG/MainProcess] | Worker: Starting Pool
[2025-02-23 11:56:07,010: DEBUG/MainProcess] ^-- substep ok
[2025-02-23 11:56:07,011: DEBUG/MainProcess] | Worker: Starting Consumer
[2025-02-23 11:56:07,013: DEBUG/MainProcess] | Consumer: Starting Connection
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2025-02-23 11:56:07,032: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2025-02-23 11:56:07,032: INFO/MainProcess] Connected to sakura-simplemq://:**@localhost//
[2025-02-23 11:56:07,032: DEBUG/MainProcess] ^-- substep ok
[2025-02-23 11:56:07,032: DEBUG/MainProcess] | Consumer: Starting Events
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2025-02-23 11:56:07,033: DEBUG/MainProcess] ^-- substep ok
[2025-02-23 11:56:07,033: DEBUG/MainProcess] | Consumer: Starting Heart
[2025-02-23 11:56:07,034: DEBUG/MainProcess] ^-- substep ok
[2025-02-23 11:56:07,034: DEBUG/MainProcess] | Consumer: Starting Tasks
[2025-02-23 11:56:07,034: DEBUG/MainProcess] Timer wake-up! Next ETA 1.0 secs.
[2025-02-23 11:56:07,045: DEBUG/MainProcess] ^-- substep ok
[2025-02-23 11:56:07,045: DEBUG/MainProcess] | Consumer: Starting event loop
[2025-02-23 11:56:07,045: INFO/MainProcess] celery@PCNAME ready.
[2025-02-23 11:56:07,045: DEBUG/MainProcess] basic.qos: prefetch_count->48
[2025-02-23 11:56:07,048: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,170: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 1507
[2025-02-23 11:56:07,175: INFO/MainProcess] Task djangoproject.tasks.test_task[51a56eef-ce55-467a-8950-38df307895b0] received
[2025-02-23 11:56:07,175: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x103e8aac0> (args:('djangoproject.tasks.test_task', '51a56eef-ce55-467a-8950-38df307895b0', {'lang': 'py', 'task': 'djangoproject.tasks.test_task', 'id': '51a56eef-ce55-467a-8950-38df307895b0', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '51a56eef-ce55-467a-8950-38df307895b0', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen98617@PCNAME', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '51a56eef-ce55-467a-8950-38df307895b0', 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'test-queue'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '019530b9-ddfd-7520-93f0-8d22b8b0e616'}, 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'correlation_id': '51a56eef-ce55-467a-8950-38df307895b0', 'hostname': 'celery@PCNAME', 'delivery_info': {'exchange': '', 'routing_key':... kwargs:{})
[2025-02-23 11:56:07,177: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,178: DEBUG/ForkPoolWorker-1] テストタスクが実行されました
[2025-02-23 11:56:07,180: INFO/ForkPoolWorker-1] Task djangoproject.tasks.test_task[51a56eef-ce55-467a-8950-38df307895b0] succeeded in 0.0026704999909270555s: None
[2025-02-23 11:56:07,267: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 1507
[2025-02-23 11:56:07,268: INFO/MainProcess] Task djangoproject.tasks.test_task[642cb8dc-da32-4061-bbe8-2870f7bdd5c9] received
[2025-02-23 11:56:07,269: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x103e8aac0> (args:('djangoproject.tasks.test_task', '642cb8dc-da32-4061-bbe8-2870f7bdd5c9', {'lang': 'py', 'task': 'djangoproject.tasks.test_task', 'id': '642cb8dc-da32-4061-bbe8-2870f7bdd5c9', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '642cb8dc-da32-4061-bbe8-2870f7bdd5c9', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen98617@PCNAME', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '642cb8dc-da32-4061-bbe8-2870f7bdd5c9', 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'test-queue'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '019530ba-0eb8-7ce7-bd82-09c905872813'}, 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'correlation_id': '642cb8dc-da32-4061-bbe8-2870f7bdd5c9', 'hostname': 'celery@PCNAME', 'delivery_info': {'exchange': '', 'routing_key':... kwargs:{})
[2025-02-23 11:56:07,271: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,271: DEBUG/ForkPoolWorker-2] テストタスクが実行されました
[2025-02-23 11:56:07,273: INFO/ForkPoolWorker-2] Task djangoproject.tasks.test_task[642cb8dc-da32-4061-bbe8-2870f7bdd5c9] succeeded in 0.0025518749898765236s: None
[2025-02-23 11:56:07,347: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "DELETE /v1/queues/test-queue/messages/019530b9-ddfd-7520-93f0-8d22b8b0e616 HTTP/1.1" 200 20
[2025-02-23 11:56:07,350: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,428: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "DELETE /v1/queues/test-queue/messages/019530ba-0eb8-7ce7-bd82-09c905872813 HTTP/1.1" 200 20
[2025-02-23 11:56:07,430: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,508: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 1507
[2025-02-23 11:56:07,509: INFO/MainProcess] Task djangoproject.tasks.test_task[4773ee11-84d7-4520-9d13-3ee598942203] received
[2025-02-23 11:56:07,510: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x103e8aac0> (args:('djangoproject.tasks.test_task', '4773ee11-84d7-4520-9d13-3ee598942203', {'lang': 'py', 'task': 'djangoproject.tasks.test_task', 'id': '4773ee11-84d7-4520-9d13-3ee598942203', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '4773ee11-84d7-4520-9d13-3ee598942203', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen98617@PCNAME', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '4773ee11-84d7-4520-9d13-3ee598942203', 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'test-queue'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '019530ba-13a2-75a5-a806-040ab5d53441'}, 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'correlation_id': '4773ee11-84d7-4520-9d13-3ee598942203', 'hostname': 'celery@PCNAME', 'delivery_info': {'exchange': '', 'routing_key':... kwargs:{})
[2025-02-23 11:56:07,511: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,512: DEBUG/ForkPoolWorker-3] テストタスクが実行されました
[2025-02-23 11:56:07,513: INFO/ForkPoolWorker-3] Task djangoproject.tasks.test_task[4773ee11-84d7-4520-9d13-3ee598942203] succeeded in 0.0018645829986780882s: None
[2025-02-23 11:56:07,589: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 1507
[2025-02-23 11:56:07,590: INFO/MainProcess] Task djangoproject.tasks.test_task[260acf17-543d-4c07-8083-bd7ec1f4f01c] received
[2025-02-23 11:56:07,590: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x103e8aac0> (args:('djangoproject.tasks.test_task', '260acf17-543d-4c07-8083-bd7ec1f4f01c', {'lang': 'py', 'task': 'djangoproject.tasks.test_task', 'id': '260acf17-543d-4c07-8083-bd7ec1f4f01c', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '260acf17-543d-4c07-8083-bd7ec1f4f01c', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen98617@PCNAME', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '260acf17-543d-4c07-8083-bd7ec1f4f01c', 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'test-queue'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '019530ba-066c-7583-8fad-7f49a9f13426'}, 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'correlation_id': '260acf17-543d-4c07-8083-bd7ec1f4f01c', 'hostname': 'celery@PCNAME', 'delivery_info': {'exchange': '', 'routing_key':... kwargs:{})
[2025-02-23 11:56:07,592: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,593: DEBUG/ForkPoolWorker-4] テストタスクが実行されました
[2025-02-23 11:56:07,597: INFO/ForkPoolWorker-4] Task djangoproject.tasks.test_task[260acf17-543d-4c07-8083-bd7ec1f4f01c] succeeded in 0.0045628749940078706s: None
[2025-02-23 11:56:07,662: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "DELETE /v1/queues/test-queue/messages/019530ba-13a2-75a5-a806-040ab5d53441 HTTP/1.1" 200 20
[2025-02-23 11:56:07,664: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,729: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "DELETE /v1/queues/test-queue/messages/019530ba-066c-7583-8fad-7f49a9f13426 HTTP/1.1" 200 20
[2025-02-23 11:56:07,731: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,807: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 1507
[2025-02-23 11:56:07,809: INFO/MainProcess] Task djangoproject.tasks.test_task[95495221-d89f-4b55-bd86-64c3d9d02f37] received
[2025-02-23 11:56:07,809: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x103e8aac0> (args:('djangoproject.tasks.test_task', '95495221-d89f-4b55-bd86-64c3d9d02f37', {'lang': 'py', 'task': 'djangoproject.tasks.test_task', 'id': '95495221-d89f-4b55-bd86-64c3d9d02f37', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '95495221-d89f-4b55-bd86-64c3d9d02f37', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen98617@PCNAME', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '95495221-d89f-4b55-bd86-64c3d9d02f37', 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'test-queue'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '019530ba-11af-78ea-a11f-9ffe9011c412'}, 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'correlation_id': '95495221-d89f-4b55-bd86-64c3d9d02f37', 'hostname': 'celery@PCNAME', 'delivery_info': {'exchange': '', 'routing_key':... kwargs:{})
[2025-02-23 11:56:07,810: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,811: DEBUG/ForkPoolWorker-5] テストタスクが実行されました
[2025-02-23 11:56:07,813: INFO/ForkPoolWorker-5] Task djangoproject.tasks.test_task[95495221-d89f-4b55-bd86-64c3d9d02f37] succeeded in 0.002412707981420681s: None
[2025-02-23 11:56:07,872: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 34
[2025-02-23 11:56:08,035: DEBUG/MainProcess] Timer wake-up! Next ETA 0.9984511669899803 secs.
[2025-02-23 11:56:08,878: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:08,936: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 34
3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?