「シンプルMQ」が2025年2月6日にさくらのクラウドに追加されました。
「シンプルMQ」はソフトウェア同士を非同期に連携するために、ソフトウェアコンポーネント間でのデータの送受信ができるマネージド型のメッセージキューサービスです。
Pythonを使って非同期の処理を複数のワーカーで分散処理する場合には、タスクキューとして Celery を使うことがよくあると思います。
Celeryでは実際に利用するキューとしてオンメモリやデータベース、Amazon SQS、Redisなどのキューを利用することができます。
同様にしてさくらのクラウドのシンプルMQをCeleryのバックエンドとして使えるように、ドライバ chibiegg/kombu-sakura-simplemq を書きましたので紹介します。
(正確にはCeleryが利用しているMQのラッパーである kombu のTransportです)
Celeryは利用したことがある方をメインターゲットに書いています。Celery自体の使い方については他の情報もご参照ください。
インストール
PyPI にパッケージを登録していますので、pipでインストールできます。
pip install kombu-sakura-simplemq
さくらのクラウドの準備
事前にさくらのクラウドのコントロールパネルでシンプルMQのキューを作成しておきます。
キュー名はこの後使いますので、控えておいてください。
何のキューなのかわかりやすい名前をつけておきます。
キューを作成するとメッセージの送受信に必要なAPIキーが表示されます。
これも利用しますので控えておいてください。
DjangoにCeleryを設定する
Celeryパッケージのインストール
pip install celery
settings.pyにCeleryの設定を追加
from kombu import transport
transport.TRANSPORT_ALIASES["sakura-simplemq"] = "kombu_sakura_simplemq.transport:Transport"
# celery configurations
CELERY_BROKER_URL = "sakura-simplemq://:{}@".format(
os.environ.get("SIMPLE_MQ_API_KEY", ""),
)
CELERY_TASK_DEFAULT_QUEUE = "test-queue" # 作成しておいたキューの名前
CELERY_RESULT_BACKEND = None
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Tokyo"
celery.pyを追加
import os
import django
from django.conf import settings
# celeryで使うDjangoの設定ファイル(settings.py)を指定
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoproject.settings")
django.setup()
from celery import Celery # noqa: E402
app = Celery("djangoproject")
# Djangoのconfigファイルをceleryのconfigとして使う宣言
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 利用されるタスクをimportしておく
from djangoproject.tasks import test_task # noqa: E402,F401
タスクを定義
import logging
from djangoproject.celery import app
logger = logging.getLogger(__name__)
@app.task
def test_task():
logger.debug("テストタスクが実行されました")
ジョブの実行
非同期に実行するには定義したタスクを delay()
で呼び出します。
非同期でタスクを実行すると、シンプルMQのキューにメッセージが蓄積されます。
from djangoproject.tasks import test_task
test_task.delay()
ワーカーの起動
他のキューを利用した場合と同様にCeleryのワーカーを起動すると、先ほど蓄積したタスクが実行されます。
celery -A djangoproject worker -l DEBUG
% celery -A djangoproject worker -l DEBUG
[2025-02-23 11:56:06,765: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2025-02-23 11:56:06,782: DEBUG/MainProcess] | Worker: Building graph...
[2025-02-23 11:56:06,783: DEBUG/MainProcess] | Worker: New boot order: {Beat, StateDB, Timer, Hub, Pool, Autoscaler, Consumer}
[2025-02-23 11:56:06,848: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2025-02-23 11:56:06,849: DEBUG/MainProcess] | Consumer: Building graph...
[2025-02-23 11:56:06,883: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Agent, Mingle, Gossip, Tasks, Control, event loop}
[2025-02-23 11:56:06,884: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
-------------- celery@PCNAME v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- macOS-15.3-arm64-arm-64bit 2025-02-23 11:56:06
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: djangoproject:0x108fe1310
- ** ---------- .> transport: sakura-simplemq://:**@localhost//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> test-queue exchange=test-queue(direct) key=test-queue
[tasks]
. celery.accumulate
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
. djangoproject.tasks.test_task
[2025-02-23 11:56:06,935: DEBUG/MainProcess] | Worker: Starting Pool
[2025-02-23 11:56:07,010: DEBUG/MainProcess] ^-- substep ok
[2025-02-23 11:56:07,011: DEBUG/MainProcess] | Worker: Starting Consumer
[2025-02-23 11:56:07,013: DEBUG/MainProcess] | Consumer: Starting Connection
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
warnings.warn(
[2025-02-23 11:56:07,032: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2025-02-23 11:56:07,032: INFO/MainProcess] Connected to sakura-simplemq://:**@localhost//
[2025-02-23 11:56:07,032: DEBUG/MainProcess] ^-- substep ok
[2025-02-23 11:56:07,032: DEBUG/MainProcess] | Consumer: Starting Events
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
warnings.warn(
[2025-02-23 11:56:07,033: DEBUG/MainProcess] ^-- substep ok
[2025-02-23 11:56:07,033: DEBUG/MainProcess] | Consumer: Starting Heart
[2025-02-23 11:56:07,034: DEBUG/MainProcess] ^-- substep ok
[2025-02-23 11:56:07,034: DEBUG/MainProcess] | Consumer: Starting Tasks
[2025-02-23 11:56:07,034: DEBUG/MainProcess] Timer wake-up! Next ETA 1.0 secs.
[2025-02-23 11:56:07,045: DEBUG/MainProcess] ^-- substep ok
[2025-02-23 11:56:07,045: DEBUG/MainProcess] | Consumer: Starting event loop
[2025-02-23 11:56:07,045: INFO/MainProcess] celery@PCNAME ready.
[2025-02-23 11:56:07,045: DEBUG/MainProcess] basic.qos: prefetch_count->48
[2025-02-23 11:56:07,048: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,170: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 1507
[2025-02-23 11:56:07,175: INFO/MainProcess] Task djangoproject.tasks.test_task[51a56eef-ce55-467a-8950-38df307895b0] received
[2025-02-23 11:56:07,175: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x103e8aac0> (args:('djangoproject.tasks.test_task', '51a56eef-ce55-467a-8950-38df307895b0', {'lang': 'py', 'task': 'djangoproject.tasks.test_task', 'id': '51a56eef-ce55-467a-8950-38df307895b0', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '51a56eef-ce55-467a-8950-38df307895b0', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen98617@PCNAME', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '51a56eef-ce55-467a-8950-38df307895b0', 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'test-queue'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '019530b9-ddfd-7520-93f0-8d22b8b0e616'}, 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'correlation_id': '51a56eef-ce55-467a-8950-38df307895b0', 'hostname': 'celery@PCNAME', 'delivery_info': {'exchange': '', 'routing_key':... kwargs:{})
[2025-02-23 11:56:07,177: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,178: DEBUG/ForkPoolWorker-1] テストタスクが実行されました
[2025-02-23 11:56:07,180: INFO/ForkPoolWorker-1] Task djangoproject.tasks.test_task[51a56eef-ce55-467a-8950-38df307895b0] succeeded in 0.0026704999909270555s: None
[2025-02-23 11:56:07,267: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 1507
[2025-02-23 11:56:07,268: INFO/MainProcess] Task djangoproject.tasks.test_task[642cb8dc-da32-4061-bbe8-2870f7bdd5c9] received
[2025-02-23 11:56:07,269: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x103e8aac0> (args:('djangoproject.tasks.test_task', '642cb8dc-da32-4061-bbe8-2870f7bdd5c9', {'lang': 'py', 'task': 'djangoproject.tasks.test_task', 'id': '642cb8dc-da32-4061-bbe8-2870f7bdd5c9', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '642cb8dc-da32-4061-bbe8-2870f7bdd5c9', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen98617@PCNAME', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '642cb8dc-da32-4061-bbe8-2870f7bdd5c9', 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'test-queue'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '019530ba-0eb8-7ce7-bd82-09c905872813'}, 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'correlation_id': '642cb8dc-da32-4061-bbe8-2870f7bdd5c9', 'hostname': 'celery@PCNAME', 'delivery_info': {'exchange': '', 'routing_key':... kwargs:{})
[2025-02-23 11:56:07,271: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,271: DEBUG/ForkPoolWorker-2] テストタスクが実行されました
[2025-02-23 11:56:07,273: INFO/ForkPoolWorker-2] Task djangoproject.tasks.test_task[642cb8dc-da32-4061-bbe8-2870f7bdd5c9] succeeded in 0.0025518749898765236s: None
[2025-02-23 11:56:07,347: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "DELETE /v1/queues/test-queue/messages/019530b9-ddfd-7520-93f0-8d22b8b0e616 HTTP/1.1" 200 20
[2025-02-23 11:56:07,350: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,428: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "DELETE /v1/queues/test-queue/messages/019530ba-0eb8-7ce7-bd82-09c905872813 HTTP/1.1" 200 20
[2025-02-23 11:56:07,430: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,508: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 1507
[2025-02-23 11:56:07,509: INFO/MainProcess] Task djangoproject.tasks.test_task[4773ee11-84d7-4520-9d13-3ee598942203] received
[2025-02-23 11:56:07,510: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x103e8aac0> (args:('djangoproject.tasks.test_task', '4773ee11-84d7-4520-9d13-3ee598942203', {'lang': 'py', 'task': 'djangoproject.tasks.test_task', 'id': '4773ee11-84d7-4520-9d13-3ee598942203', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '4773ee11-84d7-4520-9d13-3ee598942203', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen98617@PCNAME', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '4773ee11-84d7-4520-9d13-3ee598942203', 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'test-queue'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '019530ba-13a2-75a5-a806-040ab5d53441'}, 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'correlation_id': '4773ee11-84d7-4520-9d13-3ee598942203', 'hostname': 'celery@PCNAME', 'delivery_info': {'exchange': '', 'routing_key':... kwargs:{})
[2025-02-23 11:56:07,511: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,512: DEBUG/ForkPoolWorker-3] テストタスクが実行されました
[2025-02-23 11:56:07,513: INFO/ForkPoolWorker-3] Task djangoproject.tasks.test_task[4773ee11-84d7-4520-9d13-3ee598942203] succeeded in 0.0018645829986780882s: None
[2025-02-23 11:56:07,589: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 1507
[2025-02-23 11:56:07,590: INFO/MainProcess] Task djangoproject.tasks.test_task[260acf17-543d-4c07-8083-bd7ec1f4f01c] received
[2025-02-23 11:56:07,590: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x103e8aac0> (args:('djangoproject.tasks.test_task', '260acf17-543d-4c07-8083-bd7ec1f4f01c', {'lang': 'py', 'task': 'djangoproject.tasks.test_task', 'id': '260acf17-543d-4c07-8083-bd7ec1f4f01c', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '260acf17-543d-4c07-8083-bd7ec1f4f01c', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen98617@PCNAME', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '260acf17-543d-4c07-8083-bd7ec1f4f01c', 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'test-queue'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '019530ba-066c-7583-8fad-7f49a9f13426'}, 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'correlation_id': '260acf17-543d-4c07-8083-bd7ec1f4f01c', 'hostname': 'celery@PCNAME', 'delivery_info': {'exchange': '', 'routing_key':... kwargs:{})
[2025-02-23 11:56:07,592: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,593: DEBUG/ForkPoolWorker-4] テストタスクが実行されました
[2025-02-23 11:56:07,597: INFO/ForkPoolWorker-4] Task djangoproject.tasks.test_task[260acf17-543d-4c07-8083-bd7ec1f4f01c] succeeded in 0.0045628749940078706s: None
[2025-02-23 11:56:07,662: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "DELETE /v1/queues/test-queue/messages/019530ba-13a2-75a5-a806-040ab5d53441 HTTP/1.1" 200 20
[2025-02-23 11:56:07,664: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,729: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "DELETE /v1/queues/test-queue/messages/019530ba-066c-7583-8fad-7f49a9f13426 HTTP/1.1" 200 20
[2025-02-23 11:56:07,731: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,807: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 1507
[2025-02-23 11:56:07,809: INFO/MainProcess] Task djangoproject.tasks.test_task[95495221-d89f-4b55-bd86-64c3d9d02f37] received
[2025-02-23 11:56:07,809: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x103e8aac0> (args:('djangoproject.tasks.test_task', '95495221-d89f-4b55-bd86-64c3d9d02f37', {'lang': 'py', 'task': 'djangoproject.tasks.test_task', 'id': '95495221-d89f-4b55-bd86-64c3d9d02f37', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '95495221-d89f-4b55-bd86-64c3d9d02f37', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen98617@PCNAME', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '95495221-d89f-4b55-bd86-64c3d9d02f37', 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'test-queue'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '019530ba-11af-78ea-a11f-9ffe9011c412'}, 'reply_to': '8df5f54e-6ddb-32b3-a3e1-2fc2991d457e', 'correlation_id': '95495221-d89f-4b55-bd86-64c3d9d02f37', 'hostname': 'celery@PCNAME', 'delivery_info': {'exchange': '', 'routing_key':... kwargs:{})
[2025-02-23 11:56:07,810: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:07,811: DEBUG/ForkPoolWorker-5] テストタスクが実行されました
[2025-02-23 11:56:07,813: INFO/ForkPoolWorker-5] Task djangoproject.tasks.test_task[95495221-d89f-4b55-bd86-64c3d9d02f37] succeeded in 0.002412707981420681s: None
[2025-02-23 11:56:07,872: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 34
[2025-02-23 11:56:08,035: DEBUG/MainProcess] Timer wake-up! Next ETA 0.9984511669899803 secs.
[2025-02-23 11:56:08,878: DEBUG/MainProcess] Starting new HTTPS connection (1): simplemq.tk1b.api.sacloud.jp:443
[2025-02-23 11:56:08,936: DEBUG/MainProcess] https://simplemq.tk1b.api.sacloud.jp:443 "GET /v1/queues/test-queue/messages HTTP/1.1" 200 34