0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Moto Server × boto3 でローカルSQS環境を構築してみた

0
Last updated at Posted at 2026-05-31

1. はじめに

前回の記事では、Moto Server を利用してローカル上に S3 モック環境を構築し、さらに exporter を使って S3 の中身をローカルファイルとして可視化する仕組みを作成しました。

今回はその続きとして、

  • SQS モック環境
  • キュー状態監視
  • ローカルでのメッセージ検証

を追加していきます。

SQS は内部状態が見えづらく「メッセージ本文」ではなく「キュー状態」を可視化する
という方針で、SQS monitor を作成してみました。

やりたいこと

  • SQS(boto3)を使った処理をローカルで検証したい
  • キュー状態をローカルで確認したい
  • S3 と同じ moto 環境で扱いたい

この構成のメリット

  • AWSに接続せず安全に開発できる
  • boto3 のコードをほぼそのまま利用できる
  • SQS の状態を可視化できる
  • S3 と統一構成で扱える
  • ローカルだけでキュー処理を検証できる

2. 今回作るもの

今回の構成

ローカル環境
├─ moto_server(S3 / SQS モックAPI)
├─ exporter(S3差分同期 ※前回作成)
├─ sqs_monitor(SQS状態監視 ※今回追加)
└─ 任意のアプリ(boto3で接続)

ポイント

  • Moto Server を使って SQS API をローカルで再現
  • boto3 からそのまま接続可能
  • sqs_monitor が SQS状態を定期監視
  • JSONログとして状態確認できる
  • S3 と同じローカルAWS環境で扱える

使用技術

  • Python
  • boto3
  • Moto(SQSモック)
  • subprocess(プロセス管理)

前回の記事では、Moto Server を使ってローカル S3 環境を構築したので
今回はその続きとして、 SQS のモック環境 + キュー監視機能を追加していきます。

作成ファイル 内容
exporter.py S3差分同期
sqs_monitor.py SQS状態監視
JSONログ出力

3. 今回の方針

receive_message()を使うと、実際にメッセージを受信してしまうため、
単純に「覗く」ことが難しいです。
そのため今回は、「メッセージ本文」ではなく「キュー状態」を監視する方針にしました。

監視する項目

属性 内容
ApproximateNumberOfMessages キューに見えているメッセージ数
ApproximateNumberOfMessagesNotVisible 処理中メッセージ数
ApproximateNumberOfMessagesDelayed Delay 中メッセージ数

4. ディレクトリ全体

今回やることは5点

  • config.py にSQSの内容追加
  • runner.py にSQSの内容追加
  • sqs_monitor.py を新規作成
  • send_sample_sqs_message.py を新規作成(サンプル作成用)
  • sqs_worker.py を新規作成(サンプル処理確認用)
.
├── moto_export #自動で作られる
│   └── manifest.json
├── moto_logs #自動で作られる
│   └── sqs_monitor.log
└── moto_server
    ├── config.py #SQSの内容追加
    ├── exporter.py
    ├── runner.py #SQSの内容追加
    ├── sqs_monitor.py #新規作成
    ├── samples
    |   ├── delete_sample_objects.py
    |   ├── put_sample_object.py
    |   └── send_sample_sqs_message.py #新規作成
    └── sqs_worker.py #新規作成

5. config.py へ追加

SQS用の設定値を追加します。

config.py
# moto server のポート番号
# runner.py でこのポートで起動される
MOTO_PORT = 9876

# boto3 から接続するエンドポイント
# 本来の AWS ではなくローカルの moto を指す
MOTO_ENDPOINT = f"http://localhost:{MOTO_PORT}"

# 使用するリージョン
# バケット作成時の設定にも使用される
AWS_REGION = "ap-northeast-1"

# S3の内容をローカルに保存するディレクトリ
# exporter がこの配下にファイルを書き出す
EXPORT_DIR = "./moto_export"

# ポーリング間隔(秒)
# exporter が S3 の変更をチェックする周期
EXPORT_INTERVAL = 15

# 削除同期を有効にするかどうか
# True にすると、S3 から削除されたファイルをローカルでも削除する
DELETE_SYNC = False

# 必須バケット一覧
# exporter 起動時に存在しなければ自動作成される
REQUIRED_BUCKETS = ["test-bucket", "report-bucket"]

+ # SQS
+ # 監視するキューのリスト
+ REQUIRED_QUEUES = ["sample-queue", "sample-dlq"]
+ # ログを保存するディレクトリ
+ SQS_LOG_DIR = "./moto_logs"
+ # SQS 監視の間隔(秒)
+ SQS_MONITOR_INTERVAL = 5

6. runner.py へ追加

S3のexporterと一緒に、SQSモニタも一緒に起動します。
合わせて、module 実行の形へ変更します。

runner.py
import signal
import socket
import subprocess
import sys
import time

- import config
+ from . import config

# 起動した子プロセスを管理するリスト
# (name, process) のタプルで保持する
processes = []


def wait_for_port(host: str, port: int, timeout: int = 10) -> None:
    """
    指定したホスト・ポートが接続可能になるまで待機する
    moto_server の起動完了待ちに使用
    """
    start = time.time()
    while True:
        try:
            # ソケット接続できれば起動済みと判断
            with socket.create_connection((host, port), timeout=1):
                return
        except OSError:
            # タイムアウトを超えたらエラー
            if time.time() - start > timeout:
                raise RuntimeError(f"Port {host}:{port} did not open in time")
            # 少し待って再試行
            time.sleep(0.5)


def stop_process(name: str, proc: subprocess.Popen) -> None:
    """
    子プロセスを安全に停止する
    terminate → wait → kill の順で確実に終了させる
    """
    # すでに終了している場合は何もしない
    if proc.poll() is not None:
        return

    print(f"Stopping {name}...")

    # まずは穏やかに終了を要求
    proc.terminate()

    try:
        # 一定時間待つ
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # 停止しない場合は強制終了
        print(f"{name} did not stop with terminate(); killing...")
        proc.kill()
        proc.wait(timeout=5)


def stop_all():
    """
    すべての子プロセスを停止する
    reversed にすることで、起動した順の逆で終了させる
    (依存関係を意識)
    """
    for name, proc in reversed(processes):
        try:
            stop_process(name, proc)
        except Exception as e:
            print(f"[warn] failed to stop {name}: {e}")


def handle_signal(signum, frame):
    """
    Ctrl+C(SIGINT)や SIGTERM を受けた時の処理
    すべての子プロセスを停止して終了する
    """
    print(f"Received signal {signum}, shutting down...")
    stop_all()
    sys.exit(0)


def main():
    # Ctrl+C や kill コマンドに対応
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    # --- moto server 起動 ---
    moto_proc = subprocess.Popen(
        ["moto_server", "-p", str(config.MOTO_PORT)],
    )
    processes.append(("moto", moto_proc))

    # moto server の起動完了を待つ(ポートが開くまで)
    wait_for_port("localhost", config.MOTO_PORT, timeout=10)

    # --- exporter 起動 ---
-    export_proc = subprocess.Popen(
-        ["python", "moto_server/exporter.py"],
-    )
+    export_proc = subprocess.Popen(
+        [
+            sys.executable,
+            "-m",
+            "moto_server.exporter",
+        ]
+    )
    processes.append(("exporter", export_proc))

+    # --- SQSモニタ起動 ---
+    sqs_monitor_proc = subprocess.Popen(
+        [
+            sys.executable,
+            "-m",
+            "moto_server.sqs_monitor",
+        ]
+    )
+    processes.append(("sqs-monitor", sqs_monitor_proc))

-    print("moto + exporter started. Press Ctrl+C to stop.")
+    # 全プロセス起動完了message
+    print("moto + exporter + sqs-monitor started. Press Ctrl+C to stop.")

    try:
        # 親プロセスは常駐して監視する
        while True:
            # 子プロセスが落ちていないか軽く監視
            for name, proc in processes:
                code = proc.poll()
                if code is not None:
                    # 子プロセスが異常終了した場合は例外を投げる
                    raise RuntimeError(f"{name} exited unexpectedly with code {code}")

            # CPU負荷を下げるため少し待つ
            time.sleep(1)

    finally:
        # 例外・終了時に必ず全プロセスを停止
        stop_all()


if __name__ == "__main__":
    main()

6. sqs_monitor.py の新規作成

sqs_monitor.py は以下を担当します。

  • 必須キュー作成
  • キュー状態取得
  • JSONログ出力
  • 定期監視
sqs_monitor.py
import json
import signal
import time
from datetime import UTC, datetime
from pathlib import Path

import boto3
from botocore.exceptions import BotoCoreError, ClientError

from . import config

# -----------------------------
# config.py から設定値を読み込む
# -----------------------------

# moto server のエンドポイント
# boto3 が AWS 本番ではなくローカル moto に接続するために使用する
MOTO_ENDPOINT = config.MOTO_ENDPOINT

# AWS リージョン
AWS_REGION = config.AWS_REGION

# 起動時に存在保証したいキュー一覧
# 存在しない場合は monitor 側で自動作成する
REQUIRED_QUEUES = config.REQUIRED_QUEUES

# ログ出力先ディレクトリ
LOG_DIR = Path(config.SQS_LOG_DIR)

# SQS監視ログファイル
LOG_PATH = LOG_DIR / "sqs_monitor.log"

# SQS状態をチェックする間隔(秒)
INTERVAL = config.SQS_MONITOR_INTERVAL

# メインループ継続フラグ
# Ctrl+C や SIGTERM を受けたら False にして安全停止する
running = True


def handle_signal(signum, frame):
    """
    Ctrl+C(SIGINT)や SIGTERM を受けた時に呼ばれる

    running を False にして
    メインループを安全に終了させる
    """
    global running
    running = False


# シグナル受信時の処理を登録
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)


def create_sqs_client():
    """
    moto server に接続する SQS クライアントを作成する

    endpoint_url を指定することで、
    boto3 が本物の AWS ではなく moto に接続する
    """
    return boto3.client(
        "sqs",
        region_name=AWS_REGION,
        aws_access_key_id="dummy",
        aws_secret_access_key="dummy",
        endpoint_url=MOTO_ENDPOINT,
    )


def write_log(event: dict) -> None:
    """
    JSON Lines 形式でログを書き込む

    1行ごとに JSON を出力することで、
    後から grep や jq などで解析しやすくしている
    """

    # ログディレクトリがなければ作成
    LOG_DIR.mkdir(parents=True, exist_ok=True)

    # ログレコード作成
    # logged_at を付与して記録時刻を残す
    record = {
        "logged_at": datetime.now(UTC).isoformat(),
        **event,
    }

    # append モードで追記
    with LOG_PATH.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")


def ensure_required_queues(sqs) -> dict[str, str]:
    """
    必須キューを存在保証する

    REQUIRED_QUEUES に指定されたキューを作成し、
    QueueName -> QueueUrl の dict を返す

    create_queue は既存キューに対しても安全に呼べるため、
    毎回実行しても問題ない
    """

    queue_urls = {}

    for queue_name in REQUIRED_QUEUES:

        # キュー作成
        response = sqs.create_queue(
            QueueName=queue_name,
        )

        # QueueUrl を取得
        queue_url = response["QueueUrl"]

        queue_urls[queue_name] = queue_url

        # ログ出力
        write_log({
            "event": "ensure_queue",
            "queue_name": queue_name,
            "queue_url": queue_url,
        })

        print(f"[created queue] {queue_name}", flush=True)

    return queue_urls


def get_queue_attributes(sqs, queue_url: str) -> dict:
    """
    SQSキューの状態を取得する

    Approximate 系は SQS が持つ概算値で、
    現在のキュー状態を把握するために使用する
    """

    response = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            # キューに見えているメッセージ数
            "ApproximateNumberOfMessages",

            # receive_message 後で
            # VisibilityTimeout 中のメッセージ数
            "ApproximateNumberOfMessagesNotVisible",

            # DelaySeconds により遅延中のメッセージ数
            "ApproximateNumberOfMessagesDelayed",
        ],
    )

    return response.get("Attributes", {})


def monitor_once(sqs, queue_urls: dict[str, str]) -> None:
    """
    1回分の監視処理を実行する

    各キューの状態を取得し、
    ログファイルおよび標準出力へ出力する
    """

    for queue_name, queue_url in queue_urls.items():

        # 現在のキュー状態を取得
        attrs = get_queue_attributes(sqs, queue_url)

        # JSONログへ出力
        write_log({
            "event": "queue_status",
            "queue_name": queue_name,
            "queue_url": queue_url,
            "attributes": attrs,
        })

        # ターミナルにも表示
        print(
            f"[sqs] {queue_name}: {attrs}",
            flush=True,
        )


def main():
    """
    SQS monitor のメイン処理

    1. SQSクライアント作成
    2. 必須キュー作成
    3. 一定間隔で状態監視
    """

    print("SQS monitor started.", flush=True)

    # SQSクライアント作成
    sqs = create_sqs_client()

    # 必須キューを作成
    queue_urls = ensure_required_queues(sqs)

    # メイン監視ループ
    while running:
        try:
            # 1回分の監視
            monitor_once(sqs, queue_urls)

        except (ClientError, BotoCoreError, OSError) as e:

            # 想定される boto3 / I/O 系エラー
            write_log({
                "event": "warn",
                "message": str(e),
            })

            print(
                f"[warn] sqs monitor failed: {e}",
                flush=True,
            )

        except Exception as e:

            # 想定外エラー
            write_log({
                "event": "unexpected_error",
                "message": str(e),
            })

            print(
                f"[warn] unexpected error: {e}",
                flush=True,
            )

        # 1秒ずつ待つことで
        # Ctrl+C に素早く反応できるようにしている
        for _ in range(INTERVAL):
            if not running:
                break
            time.sleep(1)

    print("SQS monitor stopped cleanly.", flush=True)


if __name__ == "__main__":
    main()

sqs_monitor.py の全体像

① SQS client 作成
② 必須キュー作成
③ キュー状態取得
④ JSONログ出力
⑤ 一定間隔で繰り返し

①SQS client 作成

boto3 client を作成します。

def create_sqs_client():
    return boto3.client(
        "sqs",
        region_name=AWS_REGION,
        aws_access_key_id="dummy",
        aws_secret_access_key="dummy",
        endpoint_url=MOTO_ENDPOINT,
    )

endpoint_url=MOTO_ENDPOINT を指定することで、ローカル moto server へ向けています。

②必須queue 作成

今回の monitor では、 ensure_required_queues() を利用して、
起動時にキューを自動作成しローカル開発で毎回 queue 作成しなくて良い形になっています。

SQS の create_queue() は便利で、

  • 既存ならそのまま返す
  • なければ作成する

という動きをしてくれます。つまり毎回呼んでも安全です。

③ キュー状態取得

今回取得している監視する項目は以下です。

項目 内容
ApproximateNumberOfMessages キューに見えている件数
ApproximateNumberOfMessagesNotVisible 処理中件数
ApproximateNumberOfMessagesDelayed Delay 中件数

最初はreceive_message() で中身を見ようとしましたが
これは実際に「受信」扱いとなり

  • VisibilityTimeout が始まる
  • アプリ側処理へ影響する

ため、今回は採用しませんでした。

④ JSONログ出力

取得した内容は write_log() でJSON Lines 形式で出力しています。

通常のテキストログではなく、
{"event":"queue_status", ...}
のように1行ずつ JSON を出すことで

  • grep しやすい
  • jq で解析できる
  • CloudWatch Logs に近い
  • 後から機械処理しやすい

ためです。

ログ例
{
    "logged_at": "2026-05-31T07:57:09.519223+00:00",
    "event": "queue_status",
    "queue_name": "sample-queue",
    "queue_url": "http://localhost:9876/123456789012/sample-queue",
    "attributes": {
        "ApproximateNumberOfMessages": "10",
        "ApproximateNumberOfMessagesNotVisible": "0",
        "ApproximateNumberOfMessagesDelayed": "0"
    }
}

⑤ 一定間隔で繰り返し

監視は以下で定期実行しています。

while running: 

なぜ1秒単位で sleep しているのか

単純に:

time.sleep(INTERVAL) 

でも動きます。

ただ今回は、 Ctrl+C に素早く反応したい ため、
1秒単位で待機しています。

for _ in range(INTERVAL):
    if not running:
        break
    time.sleep(1)

signal.signal(SIGINT, ...) を利用して、

  • Ctrl+C
  • terminate

時に安全停止しています。

7. runner.py との連携

最終的には runner.py から起動しています。

sqs_monitor_proc = subprocess.Popen(
    [
        sys.executable,
        "-m",
        "moto_server.sqs_monitor",
    ]
)

これにより、

  • moto server
  • exporter
  • sqs_monitor

をまとめて起動できます。

8. サンプルメッセージ作成

下記サンプル作成スクリプトを利用してキューにサンプルメッセージを投入します。

send_sample_sqs_message.py
from datetime import UTC, datetime
import json

import boto3

from .. import config

# moto server 接続先
MOTO_ENDPOINT = config.MOTO_ENDPOINT

# AWS リージョン
AWS_REGION = config.AWS_REGION

# サンプル投入先キュー
QUEUE_NAME = "sample-queue"

# 送信件数
MESSAGE_COUNT = 10


def create_sqs_client():
    """
    moto server に接続する SQS クライアントを作成する
    """
    return boto3.client(
        "sqs",
        region_name=AWS_REGION,
        aws_access_key_id="dummy",
        aws_secret_access_key="dummy",
        endpoint_url=MOTO_ENDPOINT,
    )


def get_queue_url(sqs) -> str:
    """
    QueueName から QueueUrl を取得する
    """

    response = sqs.get_queue_url(
        QueueName=QUEUE_NAME,
    )

    return response["QueueUrl"]


def build_message(index: int) -> dict:
    """
    サンプルメッセージを作成する

    index を入れることで、
    どのメッセージか識別しやすくする
    """

    return {
        "event_type": "sample_event",
        "message_no": index,
        "message": f"hello moto sqs {index}",
        "created_at": datetime.now(UTC).isoformat(),
    }


def send_messages(sqs, queue_url: str) -> None:
    """
    複数メッセージを SQS へ投入する
    """

    for i in range(1, MESSAGE_COUNT + 1):

        # メッセージ生成
        message = build_message(i)

        # SQSへ送信
        response = sqs.send_message(
            QueueUrl=queue_url,

            MessageBody=json.dumps(
                message,
                ensure_ascii=False,
            ),
        )

        print(
            f"[sent] "
            f"queue={QUEUE_NAME} "
            f"message_no={i} "
            f"message_id={response['MessageId']}"
        )


def main():
    """
    動作確認用の複数メッセージを SQS へ投入する
    """

    print(
        f"Sending {MESSAGE_COUNT} messages "
        f"to queue={QUEUE_NAME}"
    )

    # SQS client 作成
    sqs = create_sqs_client()

    # QueueUrl 取得
    queue_url = get_queue_url(sqs)

    # 複数メッセージ送信
    send_messages(sqs, queue_url)

    print("Send completed.")


if __name__ == "__main__":
    main()

こちらを実行すると、sqs_monitor.logにてメッセージが追加されたことが確認出来ます。

├── moto_logs #設定した出力先
│   └── sqs_monitor.log

9. SQS Worker を追加して動作確認

ここまでで、

  • SQS へメッセージ投入
  • キュー状態監視

まではできるようになったので
簡易的な SQS Worker を追加して動作確認をしてみます。

Workerの処理の流れ

今回の Worker は以下のような処理をします。

SQS
 ↓
receive_message()
 ↓
Worker
 ↓
処理
 ↓
delete_message()

ポイント

  • ロングポーリング
  • 複数メッセージ取得
  • ThreadPoolExecutor による並列処理
  • 処理成功後のみ delete_message
import json
import random
import signal
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
from botocore.exceptions import BotoCoreError, ClientError

from . import config

MOTO_ENDPOINT = config.MOTO_ENDPOINT
AWS_REGION = config.AWS_REGION
QUEUE_NAME = "sample-queue"

# 並列ワーカー数
MAX_WORKERS = 5

running = True


def handle_signal(signum, frame):
    """
    Ctrl+C や terminate を受けたら、
    次のループで安全に停止する
    """
    global running
    running = False


signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)


def create_sqs_client():
    """
    ローカルの moto server に接続する SQS クライアントを作成する

    本番AWSで動かす場合は endpoint_url を外す
    """
    return boto3.client(
        "sqs",
        region_name=AWS_REGION,
        aws_access_key_id="dummy",
        aws_secret_access_key="dummy",
        endpoint_url=MOTO_ENDPOINT,
    )


def get_queue_url(sqs) -> str:
    response = sqs.get_queue_url(QueueName=QUEUE_NAME)
    return response["QueueUrl"]


def process_message(message: dict) -> None:
    """
    ここに実際の業務処理を書く
    今回はメッセージをコンソールに表示するだけ
    """
    body = message.get("Body", "")

    try:
        parsed_body = json.loads(body)
    except json.JSONDecodeError:
        parsed_body = body

    message_id = message.get("MessageId")

    # 処理に時間がかかることを想定して、あえてランダムに1〜3秒待つ
    sleep_seconds = random.randint(1, 3)

    print(
        f"[process] message_id={message_id} sleeping {sleep_seconds} seconds...",
        flush=True,
    )

    time.sleep(sleep_seconds)

    log_message = (
        "\n"
        "===== received message =====\n"
        f"MessageId: {message_id}\n"
        f"Body: {parsed_body}\n"
        "============================\n"
    )
    print(log_message, flush=True)


def handle_message(message: dict, queue_url: str) -> None:
    """
    1件分のメッセージ処理を行う

    処理に成功した場合のみ delete_message する。
    失敗した場合は削除しないため、VisibilityTimeout 後に再取得される。
    """
    sqs = create_sqs_client()
    message_id = message.get("MessageId")

    try:
        process_message(message)

        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message["ReceiptHandle"],
        )

        print(
            f"[deleted] message_id={message_id}",
            flush=True,
        )

    except Exception as e:
        print(
            f"[error] failed to process message_id={message_id}: {e}",
            flush=True,
        )


def poll_messages(sqs, queue_url: str, executor: ThreadPoolExecutor) -> None:
    """
    SQSをロングポーリングしてメッセージを取得し、
    取得したメッセージを複数ワーカーで並列処理する
    """
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,
        VisibilityTimeout=30,
    )

    messages = response.get("Messages", [])

    if not messages:
        print("[poll] no messages", flush=True)
        return

    print(f"[poll] received {len(messages)} messages", flush=True)

    futures = [
        executor.submit(handle_message, message, queue_url)
        for message in messages
    ]

    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            print(f"[error] worker failed: {e}", flush=True)


def main():
    print(
        f"SQS worker started. max_workers={MAX_WORKERS}",
        flush=True,
    )

    sqs = create_sqs_client()
    queue_url = get_queue_url(sqs)

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        while running:
            try:
                poll_messages(sqs, queue_url, executor)

            except (ClientError, BotoCoreError) as e:
                print(f"[warn] SQS error: {e}", flush=True)
                time.sleep(5)

            except Exception as e:
                print(f"[warn] unexpected error: {e}", flush=True)
                time.sleep(5)

    print("SQS worker stopped cleanly.", flush=True)


if __name__ == "__main__":
    main()

ロングポーリング、複数メッセージ取得

response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,
)

SQS ではWaitTimeSeconds=20を指定しています。
これにより、1回のポーリングで最大20秒待機し

  • メッセージがない時に高速ループしない
  • CPU負荷を下げる
  • 無駄なポーリングを減らす

ことができます。

またMaxNumberOfMessages=10を設定しているため、最大10個を一括で取得できます。

並列処理

取得したメッセージは ThreadPoolExecutor で並列処理しています。

executor.submit(
    handle_message,
    message,
    queue_url,
)

今回の処理は

  • API通信
  • SQS待ち
  • sleep

などI/O 待ち中心を想定し、並列処理を入れてます。

delete_message のタイミング

SQS では非常に重要です。

process_message() が成功後のみ
delete_message()しています。

もし処理失敗時にも delete すると、
メッセージが失われます。

逆に delete しなければ、
VisibilityTimeout 後に再取得
されてしまいます。

今回のサンプル処理用のスリープ

今回は動作確認のため、time.sleep(random.randint(1, 3))を入れています。

これにより、

  • 処理時間ばらつき
  • 並列動作
  • キュー滞留

を確認しやすくしています。

テスト実行

まずは、runnerを起動

python -m moto_server.runner

次にメッセージ投入

python -m moto_server.samples.send_sample_sqs_message

ここでlogを確認すると投入したメッセージが溜まっているのが分かります。

├── moto_logs #設定した出力先
│   └── sqs_monitor.log

最後にWorker 起動:

python -m moto_server.sqs_worker

実行結果

[poll] received 10 messages
[process] message_id=efa0dd76-4c89-4310-9342-30665a241a86 sleeping 2 seconds...
[process] message_id=64ed8308-d690-4676-a9e8-7d41bba1d203 sleeping 2 seconds...
[process] message_id=5d88367e-342b-4104-95a3-2292c6d955c6 sleeping 2 seconds...
[process] message_id=70c02e1c-223c-485a-acc3-266b6494b2c9 sleeping 1 seconds...
[process] message_id=e86a2691-2eba-4b8d-b6b4-674eef58db39 sleeping 1 seconds...

===== received message =====
MessageId: 70c02e1c-223c-485a-acc3-266b6494b2c9
Body: {'event_type': 'sample_event', 'message_no': 4, 'message': 'hello moto sqs 4', 'created_at': '2026-05-31T12:26:01.272195+00:00'}
============================


===== received message =====
MessageId: e86a2691-2eba-4b8d-b6b4-674eef58db39
Body: {'event_type': 'sample_event', 'message_no': 5, 'message': 'hello moto sqs 5', 'created_at': '2026-05-31T12:26:01.275648+00:00'}
============================

[deleted] message_id=70c02e1c-223c-485a-acc3-266b6494b2c9
[process] message_id=86e39bc8-64dc-4594-a305-1afdf97edb95 sleeping 2 seconds...
[deleted] message_id=e86a2691-2eba-4b8d-b6b4-674eef58db39
[process] message_id=a4ffc1c4-d1a6-40a5-a434-79f0a235754c sleeping 2 seconds...

===== received message =====
MessageId: 5d88367e-342b-4104-95a3-2292c6d955c6
Body: {'event_type': 'sample_event', 'message_no': 2, 'message': 'hello moto sqs 2', 'created_at': '2026-05-31T12:26:01.264599+00:00'}
============================


===== received message =====
MessageId: efa0dd76-4c89-4310-9342-30665a241a86
Body: {'event_type': 'sample_event', 'message_no': 1, 'message': 'hello moto sqs 1', 'created_at': '2026-05-31T12:26:01.259607+00:00'}
============================

並列で処理されていることが確認できます。

今回追加した Worker は非常にシンプルですが、

  • ロングポーリング
  • 並列処理
  • delete_message の考え方

など、SQS Worker の基本構成を含んでいます。

おわりに

SQS は内部状態が見えづらく、ローカル検証しにくいサービスですが、
Moto + sqs_monitor を組み合わせることで、
簡易的に状態を見ながら開発できる環境を作ることができました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?