1. はじめに
前回の記事では、Moto Server を利用してローカル上に S3 モック環境を構築し、さらに exporter を使って S3 の中身をローカルファイルとして可視化する仕組みを作成しました。
今回はその続きとして、
- SQS モック環境
- キュー状態監視
- ローカルでのメッセージ検証
を追加していきます。
SQS は内部状態が見えづらく「メッセージ本文」ではなく「キュー状態」を可視化する
という方針で、SQS monitor を作成してみました。
やりたいこと
- SQS(boto3)を使った処理をローカルで検証したい
- キュー状態をローカルで確認したい
- S3 と同じ moto 環境で扱いたい
この構成のメリット
- AWSに接続せず安全に開発できる
- boto3 のコードをほぼそのまま利用できる
- SQS の状態を可視化できる
- S3 と統一構成で扱える
- ローカルだけでキュー処理を検証できる
2. 今回作るもの
今回の構成
ローカル環境
├─ moto_server(S3 / SQS モックAPI)
├─ exporter(S3差分同期 ※前回作成)
├─ sqs_monitor(SQS状態監視 ※今回追加)
└─ 任意のアプリ(boto3で接続)
ポイント
- Moto Server を使って SQS API をローカルで再現
- boto3 からそのまま接続可能
- sqs_monitor が SQS状態を定期監視
- JSONログとして状態確認できる
- S3 と同じローカルAWS環境で扱える
使用技術
- Python
- boto3
- Moto(SQSモック)
- subprocess(プロセス管理)
前回の記事では、Moto Server を使ってローカル S3 環境を構築したので
今回はその続きとして、 SQS のモック環境 + キュー監視機能を追加していきます。
| 作成ファイル | 内容 |
|---|---|
| exporter.py | S3差分同期 |
| sqs_monitor.py | SQS状態監視 JSONログ出力 |
3. 今回の方針
receive_message()を使うと、実際にメッセージを受信してしまうため、
単純に「覗く」ことが難しいです。
そのため今回は、「メッセージ本文」ではなく「キュー状態」を監視する方針にしました。
監視する項目
| 属性 | 内容 |
|---|---|
| ApproximateNumberOfMessages | キューに見えているメッセージ数 |
| ApproximateNumberOfMessagesNotVisible | 処理中メッセージ数 |
| ApproximateNumberOfMessagesDelayed | Delay 中メッセージ数 |
4. ディレクトリ全体
今回やることは5点
- config.py にSQSの内容追加
- runner.py にSQSの内容追加
- sqs_monitor.py を新規作成
- send_sample_sqs_message.py を新規作成(サンプル作成用)
- sqs_worker.py を新規作成(サンプル処理確認用)
.
├── moto_export #自動で作られる
│ └── manifest.json
├── moto_logs #自動で作られる
│ └── sqs_monitor.log
└── moto_server
├── config.py #SQSの内容追加
├── exporter.py
├── runner.py #SQSの内容追加
├── sqs_monitor.py #新規作成
├── samples
| ├── delete_sample_objects.py
| ├── put_sample_object.py
| └── send_sample_sqs_message.py #新規作成
└── sqs_worker.py #新規作成
5. config.py へ追加
SQS用の設定値を追加します。
# moto server のポート番号
# runner.py でこのポートで起動される
MOTO_PORT = 9876
# boto3 から接続するエンドポイント
# 本来の AWS ではなくローカルの moto を指す
MOTO_ENDPOINT = f"http://localhost:{MOTO_PORT}"
# 使用するリージョン
# バケット作成時の設定にも使用される
AWS_REGION = "ap-northeast-1"
# S3の内容をローカルに保存するディレクトリ
# exporter がこの配下にファイルを書き出す
EXPORT_DIR = "./moto_export"
# ポーリング間隔(秒)
# exporter が S3 の変更をチェックする周期
EXPORT_INTERVAL = 15
# 削除同期を有効にするかどうか
# True にすると、S3 から削除されたファイルをローカルでも削除する
DELETE_SYNC = False
# 必須バケット一覧
# exporter 起動時に存在しなければ自動作成される
REQUIRED_BUCKETS = ["test-bucket", "report-bucket"]
+ # SQS
+ # 監視するキューのリスト
+ REQUIRED_QUEUES = ["sample-queue", "sample-dlq"]
+ # ログを保存するディレクトリ
+ SQS_LOG_DIR = "./moto_logs"
+ # SQS 監視の間隔(秒)
+ SQS_MONITOR_INTERVAL = 5
6. runner.py へ追加
S3のexporterと一緒に、SQSモニタも一緒に起動します。
合わせて、module 実行の形へ変更します。
import signal
import socket
import subprocess
import sys
import time
- import config
+ from . import config
# 起動した子プロセスを管理するリスト
# (name, process) のタプルで保持する
processes = []
def wait_for_port(host: str, port: int, timeout: int = 10) -> None:
"""
指定したホスト・ポートが接続可能になるまで待機する
moto_server の起動完了待ちに使用
"""
start = time.time()
while True:
try:
# ソケット接続できれば起動済みと判断
with socket.create_connection((host, port), timeout=1):
return
except OSError:
# タイムアウトを超えたらエラー
if time.time() - start > timeout:
raise RuntimeError(f"Port {host}:{port} did not open in time")
# 少し待って再試行
time.sleep(0.5)
def stop_process(name: str, proc: subprocess.Popen) -> None:
"""
子プロセスを安全に停止する
terminate → wait → kill の順で確実に終了させる
"""
# すでに終了している場合は何もしない
if proc.poll() is not None:
return
print(f"Stopping {name}...")
# まずは穏やかに終了を要求
proc.terminate()
try:
# 一定時間待つ
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
# 停止しない場合は強制終了
print(f"{name} did not stop with terminate(); killing...")
proc.kill()
proc.wait(timeout=5)
def stop_all():
"""
すべての子プロセスを停止する
reversed にすることで、起動した順の逆で終了させる
(依存関係を意識)
"""
for name, proc in reversed(processes):
try:
stop_process(name, proc)
except Exception as e:
print(f"[warn] failed to stop {name}: {e}")
def handle_signal(signum, frame):
"""
Ctrl+C(SIGINT)や SIGTERM を受けた時の処理
すべての子プロセスを停止して終了する
"""
print(f"Received signal {signum}, shutting down...")
stop_all()
sys.exit(0)
def main():
# Ctrl+C や kill コマンドに対応
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
# --- moto server 起動 ---
moto_proc = subprocess.Popen(
["moto_server", "-p", str(config.MOTO_PORT)],
)
processes.append(("moto", moto_proc))
# moto server の起動完了を待つ(ポートが開くまで)
wait_for_port("localhost", config.MOTO_PORT, timeout=10)
# --- exporter 起動 ---
- export_proc = subprocess.Popen(
- ["python", "moto_server/exporter.py"],
- )
+ export_proc = subprocess.Popen(
+ [
+ sys.executable,
+ "-m",
+ "moto_server.exporter",
+ ]
+ )
processes.append(("exporter", export_proc))
+ # --- SQSモニタ起動 ---
+ sqs_monitor_proc = subprocess.Popen(
+ [
+ sys.executable,
+ "-m",
+ "moto_server.sqs_monitor",
+ ]
+ )
+ processes.append(("sqs-monitor", sqs_monitor_proc))
- print("moto + exporter started. Press Ctrl+C to stop.")
+ # 全プロセス起動完了message
+ print("moto + exporter + sqs-monitor started. Press Ctrl+C to stop.")
try:
# 親プロセスは常駐して監視する
while True:
# 子プロセスが落ちていないか軽く監視
for name, proc in processes:
code = proc.poll()
if code is not None:
# 子プロセスが異常終了した場合は例外を投げる
raise RuntimeError(f"{name} exited unexpectedly with code {code}")
# CPU負荷を下げるため少し待つ
time.sleep(1)
finally:
# 例外・終了時に必ず全プロセスを停止
stop_all()
if __name__ == "__main__":
main()
6. sqs_monitor.py の新規作成
sqs_monitor.py は以下を担当します。
- 必須キュー作成
- キュー状態取得
- JSONログ出力
- 定期監視
import json
import signal
import time
from datetime import UTC, datetime
from pathlib import Path
import boto3
from botocore.exceptions import BotoCoreError, ClientError
from . import config
# -----------------------------
# config.py から設定値を読み込む
# -----------------------------
# moto server のエンドポイント
# boto3 が AWS 本番ではなくローカル moto に接続するために使用する
MOTO_ENDPOINT = config.MOTO_ENDPOINT
# AWS リージョン
AWS_REGION = config.AWS_REGION
# 起動時に存在保証したいキュー一覧
# 存在しない場合は monitor 側で自動作成する
REQUIRED_QUEUES = config.REQUIRED_QUEUES
# ログ出力先ディレクトリ
LOG_DIR = Path(config.SQS_LOG_DIR)
# SQS監視ログファイル
LOG_PATH = LOG_DIR / "sqs_monitor.log"
# SQS状態をチェックする間隔(秒)
INTERVAL = config.SQS_MONITOR_INTERVAL
# メインループ継続フラグ
# Ctrl+C や SIGTERM を受けたら False にして安全停止する
running = True
def handle_signal(signum, frame):
"""
Ctrl+C(SIGINT)や SIGTERM を受けた時に呼ばれる
running を False にして
メインループを安全に終了させる
"""
global running
running = False
# シグナル受信時の処理を登録
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
def create_sqs_client():
"""
moto server に接続する SQS クライアントを作成する
endpoint_url を指定することで、
boto3 が本物の AWS ではなく moto に接続する
"""
return boto3.client(
"sqs",
region_name=AWS_REGION,
aws_access_key_id="dummy",
aws_secret_access_key="dummy",
endpoint_url=MOTO_ENDPOINT,
)
def write_log(event: dict) -> None:
"""
JSON Lines 形式でログを書き込む
1行ごとに JSON を出力することで、
後から grep や jq などで解析しやすくしている
"""
# ログディレクトリがなければ作成
LOG_DIR.mkdir(parents=True, exist_ok=True)
# ログレコード作成
# logged_at を付与して記録時刻を残す
record = {
"logged_at": datetime.now(UTC).isoformat(),
**event,
}
# append モードで追記
with LOG_PATH.open("a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
def ensure_required_queues(sqs) -> dict[str, str]:
"""
必須キューを存在保証する
REQUIRED_QUEUES に指定されたキューを作成し、
QueueName -> QueueUrl の dict を返す
create_queue は既存キューに対しても安全に呼べるため、
毎回実行しても問題ない
"""
queue_urls = {}
for queue_name in REQUIRED_QUEUES:
# キュー作成
response = sqs.create_queue(
QueueName=queue_name,
)
# QueueUrl を取得
queue_url = response["QueueUrl"]
queue_urls[queue_name] = queue_url
# ログ出力
write_log({
"event": "ensure_queue",
"queue_name": queue_name,
"queue_url": queue_url,
})
print(f"[created queue] {queue_name}", flush=True)
return queue_urls
def get_queue_attributes(sqs, queue_url: str) -> dict:
"""
SQSキューの状態を取得する
Approximate 系は SQS が持つ概算値で、
現在のキュー状態を把握するために使用する
"""
response = sqs.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=[
# キューに見えているメッセージ数
"ApproximateNumberOfMessages",
# receive_message 後で
# VisibilityTimeout 中のメッセージ数
"ApproximateNumberOfMessagesNotVisible",
# DelaySeconds により遅延中のメッセージ数
"ApproximateNumberOfMessagesDelayed",
],
)
return response.get("Attributes", {})
def monitor_once(sqs, queue_urls: dict[str, str]) -> None:
"""
1回分の監視処理を実行する
各キューの状態を取得し、
ログファイルおよび標準出力へ出力する
"""
for queue_name, queue_url in queue_urls.items():
# 現在のキュー状態を取得
attrs = get_queue_attributes(sqs, queue_url)
# JSONログへ出力
write_log({
"event": "queue_status",
"queue_name": queue_name,
"queue_url": queue_url,
"attributes": attrs,
})
# ターミナルにも表示
print(
f"[sqs] {queue_name}: {attrs}",
flush=True,
)
def main():
"""
SQS monitor のメイン処理
1. SQSクライアント作成
2. 必須キュー作成
3. 一定間隔で状態監視
"""
print("SQS monitor started.", flush=True)
# SQSクライアント作成
sqs = create_sqs_client()
# 必須キューを作成
queue_urls = ensure_required_queues(sqs)
# メイン監視ループ
while running:
try:
# 1回分の監視
monitor_once(sqs, queue_urls)
except (ClientError, BotoCoreError, OSError) as e:
# 想定される boto3 / I/O 系エラー
write_log({
"event": "warn",
"message": str(e),
})
print(
f"[warn] sqs monitor failed: {e}",
flush=True,
)
except Exception as e:
# 想定外エラー
write_log({
"event": "unexpected_error",
"message": str(e),
})
print(
f"[warn] unexpected error: {e}",
flush=True,
)
# 1秒ずつ待つことで
# Ctrl+C に素早く反応できるようにしている
for _ in range(INTERVAL):
if not running:
break
time.sleep(1)
print("SQS monitor stopped cleanly.", flush=True)
if __name__ == "__main__":
main()
sqs_monitor.py の全体像
① SQS client 作成
② 必須キュー作成
③ キュー状態取得
④ JSONログ出力
⑤ 一定間隔で繰り返し
①SQS client 作成
boto3 client を作成します。
def create_sqs_client():
return boto3.client(
"sqs",
region_name=AWS_REGION,
aws_access_key_id="dummy",
aws_secret_access_key="dummy",
endpoint_url=MOTO_ENDPOINT,
)
endpoint_url=MOTO_ENDPOINT を指定することで、ローカル moto server へ向けています。
②必須queue 作成
今回の monitor では、 ensure_required_queues() を利用して、
起動時にキューを自動作成しローカル開発で毎回 queue 作成しなくて良い形になっています。
SQS の create_queue() は便利で、
- 既存ならそのまま返す
- なければ作成する
という動きをしてくれます。つまり毎回呼んでも安全です。
③ キュー状態取得
今回取得している監視する項目は以下です。
| 項目 | 内容 |
|---|---|
| ApproximateNumberOfMessages | キューに見えている件数 |
| ApproximateNumberOfMessagesNotVisible | 処理中件数 |
| ApproximateNumberOfMessagesDelayed | Delay 中件数 |
最初はreceive_message() で中身を見ようとしましたが
これは実際に「受信」扱いとなり
- VisibilityTimeout が始まる
- アプリ側処理へ影響する
ため、今回は採用しませんでした。
④ JSONログ出力
取得した内容は write_log() でJSON Lines 形式で出力しています。
通常のテキストログではなく、
{"event":"queue_status", ...}
のように1行ずつ JSON を出すことで
- grep しやすい
- jq で解析できる
- CloudWatch Logs に近い
- 後から機械処理しやすい
ためです。
{
"logged_at": "2026-05-31T07:57:09.519223+00:00",
"event": "queue_status",
"queue_name": "sample-queue",
"queue_url": "http://localhost:9876/123456789012/sample-queue",
"attributes": {
"ApproximateNumberOfMessages": "10",
"ApproximateNumberOfMessagesNotVisible": "0",
"ApproximateNumberOfMessagesDelayed": "0"
}
}
⑤ 一定間隔で繰り返し
監視は以下で定期実行しています。
while running:
なぜ1秒単位で sleep しているのか
単純に:
time.sleep(INTERVAL)
でも動きます。
ただ今回は、 Ctrl+C に素早く反応したい ため、
1秒単位で待機しています。
for _ in range(INTERVAL):
if not running:
break
time.sleep(1)
signal.signal(SIGINT, ...) を利用して、
- Ctrl+C
- terminate
時に安全停止しています。
7. runner.py との連携
最終的には runner.py から起動しています。
sqs_monitor_proc = subprocess.Popen(
[
sys.executable,
"-m",
"moto_server.sqs_monitor",
]
)
これにより、
- moto server
- exporter
- sqs_monitor
をまとめて起動できます。
8. サンプルメッセージ作成
下記サンプル作成スクリプトを利用してキューにサンプルメッセージを投入します。
from datetime import UTC, datetime
import json
import boto3
from .. import config
# moto server 接続先
MOTO_ENDPOINT = config.MOTO_ENDPOINT
# AWS リージョン
AWS_REGION = config.AWS_REGION
# サンプル投入先キュー
QUEUE_NAME = "sample-queue"
# 送信件数
MESSAGE_COUNT = 10
def create_sqs_client():
"""
moto server に接続する SQS クライアントを作成する
"""
return boto3.client(
"sqs",
region_name=AWS_REGION,
aws_access_key_id="dummy",
aws_secret_access_key="dummy",
endpoint_url=MOTO_ENDPOINT,
)
def get_queue_url(sqs) -> str:
"""
QueueName から QueueUrl を取得する
"""
response = sqs.get_queue_url(
QueueName=QUEUE_NAME,
)
return response["QueueUrl"]
def build_message(index: int) -> dict:
"""
サンプルメッセージを作成する
index を入れることで、
どのメッセージか識別しやすくする
"""
return {
"event_type": "sample_event",
"message_no": index,
"message": f"hello moto sqs {index}",
"created_at": datetime.now(UTC).isoformat(),
}
def send_messages(sqs, queue_url: str) -> None:
"""
複数メッセージを SQS へ投入する
"""
for i in range(1, MESSAGE_COUNT + 1):
# メッセージ生成
message = build_message(i)
# SQSへ送信
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(
message,
ensure_ascii=False,
),
)
print(
f"[sent] "
f"queue={QUEUE_NAME} "
f"message_no={i} "
f"message_id={response['MessageId']}"
)
def main():
"""
動作確認用の複数メッセージを SQS へ投入する
"""
print(
f"Sending {MESSAGE_COUNT} messages "
f"to queue={QUEUE_NAME}"
)
# SQS client 作成
sqs = create_sqs_client()
# QueueUrl 取得
queue_url = get_queue_url(sqs)
# 複数メッセージ送信
send_messages(sqs, queue_url)
print("Send completed.")
if __name__ == "__main__":
main()
こちらを実行すると、sqs_monitor.logにてメッセージが追加されたことが確認出来ます。
├── moto_logs #設定した出力先
│ └── sqs_monitor.log
9. SQS Worker を追加して動作確認
ここまでで、
- SQS へメッセージ投入
- キュー状態監視
まではできるようになったので
簡易的な SQS Worker を追加して動作確認をしてみます。
Workerの処理の流れ
今回の Worker は以下のような処理をします。
SQS
↓
receive_message()
↓
Worker
↓
処理
↓
delete_message()
ポイント
- ロングポーリング
- 複数メッセージ取得
- ThreadPoolExecutor による並列処理
- 処理成功後のみ delete_message
import json
import random
import signal
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3
from botocore.exceptions import BotoCoreError, ClientError
from . import config
MOTO_ENDPOINT = config.MOTO_ENDPOINT
AWS_REGION = config.AWS_REGION
QUEUE_NAME = "sample-queue"
# 並列ワーカー数
MAX_WORKERS = 5
running = True
def handle_signal(signum, frame):
"""
Ctrl+C や terminate を受けたら、
次のループで安全に停止する
"""
global running
running = False
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
def create_sqs_client():
"""
ローカルの moto server に接続する SQS クライアントを作成する
本番AWSで動かす場合は endpoint_url を外す
"""
return boto3.client(
"sqs",
region_name=AWS_REGION,
aws_access_key_id="dummy",
aws_secret_access_key="dummy",
endpoint_url=MOTO_ENDPOINT,
)
def get_queue_url(sqs) -> str:
response = sqs.get_queue_url(QueueName=QUEUE_NAME)
return response["QueueUrl"]
def process_message(message: dict) -> None:
"""
ここに実際の業務処理を書く
今回はメッセージをコンソールに表示するだけ
"""
body = message.get("Body", "")
try:
parsed_body = json.loads(body)
except json.JSONDecodeError:
parsed_body = body
message_id = message.get("MessageId")
# 処理に時間がかかることを想定して、あえてランダムに1〜3秒待つ
sleep_seconds = random.randint(1, 3)
print(
f"[process] message_id={message_id} sleeping {sleep_seconds} seconds...",
flush=True,
)
time.sleep(sleep_seconds)
log_message = (
"\n"
"===== received message =====\n"
f"MessageId: {message_id}\n"
f"Body: {parsed_body}\n"
"============================\n"
)
print(log_message, flush=True)
def handle_message(message: dict, queue_url: str) -> None:
"""
1件分のメッセージ処理を行う
処理に成功した場合のみ delete_message する。
失敗した場合は削除しないため、VisibilityTimeout 後に再取得される。
"""
sqs = create_sqs_client()
message_id = message.get("MessageId")
try:
process_message(message)
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message["ReceiptHandle"],
)
print(
f"[deleted] message_id={message_id}",
flush=True,
)
except Exception as e:
print(
f"[error] failed to process message_id={message_id}: {e}",
flush=True,
)
def poll_messages(sqs, queue_url: str, executor: ThreadPoolExecutor) -> None:
"""
SQSをロングポーリングしてメッセージを取得し、
取得したメッセージを複数ワーカーで並列処理する
"""
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20,
VisibilityTimeout=30,
)
messages = response.get("Messages", [])
if not messages:
print("[poll] no messages", flush=True)
return
print(f"[poll] received {len(messages)} messages", flush=True)
futures = [
executor.submit(handle_message, message, queue_url)
for message in messages
]
for future in as_completed(futures):
try:
future.result()
except Exception as e:
print(f"[error] worker failed: {e}", flush=True)
def main():
print(
f"SQS worker started. max_workers={MAX_WORKERS}",
flush=True,
)
sqs = create_sqs_client()
queue_url = get_queue_url(sqs)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
while running:
try:
poll_messages(sqs, queue_url, executor)
except (ClientError, BotoCoreError) as e:
print(f"[warn] SQS error: {e}", flush=True)
time.sleep(5)
except Exception as e:
print(f"[warn] unexpected error: {e}", flush=True)
time.sleep(5)
print("SQS worker stopped cleanly.", flush=True)
if __name__ == "__main__":
main()
ロングポーリング、複数メッセージ取得
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20,
)
SQS ではWaitTimeSeconds=20を指定しています。
これにより、1回のポーリングで最大20秒待機し
- メッセージがない時に高速ループしない
- CPU負荷を下げる
- 無駄なポーリングを減らす
ことができます。
またMaxNumberOfMessages=10を設定しているため、最大10個を一括で取得できます。
並列処理
取得したメッセージは ThreadPoolExecutor で並列処理しています。
executor.submit(
handle_message,
message,
queue_url,
)
今回の処理は
- API通信
- SQS待ち
- sleep
などI/O 待ち中心を想定し、並列処理を入れてます。
delete_message のタイミング
SQS では非常に重要です。
process_message() が成功後のみ
delete_message()しています。
もし処理失敗時にも delete すると、
メッセージが失われます。
逆に delete しなければ、
VisibilityTimeout 後に再取得
されてしまいます。
今回のサンプル処理用のスリープ
今回は動作確認のため、time.sleep(random.randint(1, 3))を入れています。
これにより、
- 処理時間ばらつき
- 並列動作
- キュー滞留
を確認しやすくしています。
テスト実行
まずは、runnerを起動
python -m moto_server.runner
次にメッセージ投入
python -m moto_server.samples.send_sample_sqs_message
ここでlogを確認すると投入したメッセージが溜まっているのが分かります。
├── moto_logs #設定した出力先
│ └── sqs_monitor.log
最後にWorker 起動:
python -m moto_server.sqs_worker
実行結果
[poll] received 10 messages
[process] message_id=efa0dd76-4c89-4310-9342-30665a241a86 sleeping 2 seconds...
[process] message_id=64ed8308-d690-4676-a9e8-7d41bba1d203 sleeping 2 seconds...
[process] message_id=5d88367e-342b-4104-95a3-2292c6d955c6 sleeping 2 seconds...
[process] message_id=70c02e1c-223c-485a-acc3-266b6494b2c9 sleeping 1 seconds...
[process] message_id=e86a2691-2eba-4b8d-b6b4-674eef58db39 sleeping 1 seconds...
===== received message =====
MessageId: 70c02e1c-223c-485a-acc3-266b6494b2c9
Body: {'event_type': 'sample_event', 'message_no': 4, 'message': 'hello moto sqs 4', 'created_at': '2026-05-31T12:26:01.272195+00:00'}
============================
===== received message =====
MessageId: e86a2691-2eba-4b8d-b6b4-674eef58db39
Body: {'event_type': 'sample_event', 'message_no': 5, 'message': 'hello moto sqs 5', 'created_at': '2026-05-31T12:26:01.275648+00:00'}
============================
[deleted] message_id=70c02e1c-223c-485a-acc3-266b6494b2c9
[process] message_id=86e39bc8-64dc-4594-a305-1afdf97edb95 sleeping 2 seconds...
[deleted] message_id=e86a2691-2eba-4b8d-b6b4-674eef58db39
[process] message_id=a4ffc1c4-d1a6-40a5-a434-79f0a235754c sleeping 2 seconds...
===== received message =====
MessageId: 5d88367e-342b-4104-95a3-2292c6d955c6
Body: {'event_type': 'sample_event', 'message_no': 2, 'message': 'hello moto sqs 2', 'created_at': '2026-05-31T12:26:01.264599+00:00'}
============================
===== received message =====
MessageId: efa0dd76-4c89-4310-9342-30665a241a86
Body: {'event_type': 'sample_event', 'message_no': 1, 'message': 'hello moto sqs 1', 'created_at': '2026-05-31T12:26:01.259607+00:00'}
============================
並列で処理されていることが確認できます。
今回追加した Worker は非常にシンプルですが、
- ロングポーリング
- 並列処理
- delete_message の考え方
など、SQS Worker の基本構成を含んでいます。
おわりに
SQS は内部状態が見えづらく、ローカル検証しにくいサービスですが、
Moto + sqs_monitor を組み合わせることで、
簡易的に状態を見ながら開発できる環境を作ることができました。