8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

分散処理の設計見直し:Redis StreamsからSNS/SQSへの移行記録

8
Last updated at Posted at 2025-12-19

この記事は博報堂テクノロジーズアドベントカレンダー シリーズ2 20日目の記事です。

はじめに

はじめまして。博報堂テクノロジーズ 開発第3センター AdOps開発一部の河窪です。新卒2年目です。プロダクトチームに配属されており、主にフロントエンド・バックエンドエンジニアとして開発・運用に携わっています。ある日、多忙なインフラ担当の先輩から「プロダクトのインフラも触ってみないか」と提案を受けたため、挑戦することにしました。

本記事では、私がインフラを触るきっかけとなったチーム内の大きなタスク「Amazon Simple Notification Service/Amazon Simple Queue Service(SNS/SQS)移行」についてまとめました。

背景

チームのプロダクトはAWSサービスを用いたインフラ環境で構築されています。プロダクトには処理の仕組みが2種類あり、実現したい内容に応じて使い分けられています。

1つ目は「ユーザーが画面で操作した内容をAPI経由で処理する仕組み」です。ユーザーが特定の画面操作を行った際に発生する処理で、イメージしやすいと思います。

2つ目は「スケジュールされた時間にAmazon EventBridgeが発火し、非同期で分散処理する仕組み」です。例えば毎日0時に実行したい処理がある場合、EventBridgeでそのスケジュールを指定すれば、自動的に処理を実行できます。また分散処理は、処理負荷に応じてワーカーを自動的に増減させるオートスケーリングによって実現しています。

今回移行するのは2つ目の処理のインフラ部分です。移行前はRedis Streams(ElastiCache)をベースに実装していました。簡単に説明すると、Redis Streamsはデータをメモリ上に保存する高速なデータストアであるRedisが持つ機能の1つで、メッセージを時系列順に蓄積し、複数のアプリケーション間で受け渡すことができます。Amazon ElastiCache for Redisを利用することで、サーバー構築や運用管理なしにRedis Streamsをマネージドで扱うことができます。処理の流れに関する詳細は後述するSNS/SQSを用いた分散処理の流れに記載しています。


なぜ移行したか。そのきっかけはいくつかの課題があったからです。

課題

メッセージ状況の把握が困難

ElastiCacheの標準メトリクス(AWSリソースのログやメトリクスを収集・監視するAmazon CloudWatchが出力)はCPUやメモリが中心であり、メッセージの状況をうまく把握できませんでした。そのため、滞留の原因がメッセージ数なのか処理負荷なのかを特定しづらい状況でした。Redis Streamsのコマンドを用いて独自にメトリクスを作成・運用すれば対応可能ですが、どうしても労力がかかってしまいます。

オートスケーリングの信頼性不足

これまでは独自に作成したオートスケーリングの仕組みを利用していました。しかしスケジュールされた時間に多くの処理を一気に処理する仕組みに、独自のオートスケーリングの実装が沿っていない状況でした。そのため本当にうまくスケールできているのかという信頼性の低さがありました。

アイドル時間のコスト負担

ElastiCacheは稼働しているだけで課金される料金体系です。しかし今回のケースではスケジュールされた時間帯にしか処理を行わず、アイドル時間も長いという点があるため、ElastiCacheを逐次削除するというような強引な運用を行わない限りは無駄なコストがかかってしまいます。


ではこれらの課題をどうしたら改善できるかを考えた結果、タイトルにもあるとおりSNS/SQSへの移行が良いのではないかという結論にたどり着きました。

なぜSNSを選んだか

SNSはフルマネージドのメッセージングサービスです。メッセージとは、システム間を連携する通知やデータのことです。

なぜ今回、SNSを導入したのか。その理由は以下の3点です。

責務の分離

1点目は、Producerが各Consumerの詳細を意識する必要がなくなるからです。

例えば、メッセージXをConsumer AとConsumer Bがそれぞれ異なる処理で扱いたい場合、SQSだけではProducerがAとBそれぞれにメッセージを送る必要があります。しかし、SNSを使えばProducerは1回メッセージを発行するだけで、Subscriptionの仕組みによって自動的に各Consumerに配信されます。また、新たにConsumer Cを追加する場合や、Consumer Bを廃止する場合でも、Producer側の変更は必要ありません。これにより「スケジュールされた時間にEventBridgeが発火し、非同期で分散処理する仕組み」の非同期の部分も満たせます。

他プロダクトとの連携

2点目は、事前にSNS-SQSのフレームワークとして構築しておくことで、他プロダクトとの連携方法を変更する際の負担を減らせるからです。これは今回のケースに固有の理由です。

現在運用しているプロダクトはいくつかの別プロダクトと連携しており、それらのプロダクトに配置されているSNSからデータを受け取っています。

現状の構成では、そのSNSからプロダクト内のLambdaが受け取り、Step Functions経由でタスクを実行しています。将来的にはこの構成を見直し、SNSからSQSが受け取る形に変更してSNS-SQSのフレームワークに統一したいと考えています。そのため、今回の移行でSNSとSQSの組み合わせで実装しておくことで、将来の修正を軽微に抑えられると判断しました。

コスト削減

3点目は、課題でも述べた「アイドル時間のコスト負担」を減らせるからです。

ElastiCacheは常時課金のため、アイドル時間が長い今回のケースでは無駄が生じていました。一方、SNSはメッセージ処理に応じた課金であり、スケジュールされた時間にのみ処理を行う今回の仕組みでは低コストで実現できるという試算になりました。

ただし実装過程で、当初の試算より小幅にコストが増加する見込みとなりました。これは重複対策の実装コストを下げるためにFIFOトピックを採用したことや、セキュリティ対策としてサーバー側暗号化(AWS側でメッセージを自動的に暗号化・復号する機能)やPrivateLinkを設定したことによるものです。いずれも必要な設定であり、これらを加味しても移行前よりコスト削減できるため、問題なしと判断しました。


詳細な設定は本稿末尾の付録(SNS/SQSの詳細設定)に記載しています。

なぜSQSを選んだか

SQSはフルマネージドのメッセージキューイングサービスです。メッセージキューイングとは、メッセージを使ってサービスやシステムを連携する仕組みで、サービス間の橋渡しを担います。メッセージはSQS内のキューに保管され、受信側は任意のタイミングでポーリング(一定間隔でデータ取得を要求する問い合わせ)してメッセージを取得します。

なぜSQSを導入したのか。その理由は背景の内容と一部重複しますが以下の3点です。

責務の分離

1点目は、SQSをProducerとConsumerの間に挟むことで、両者の処理タイミングを分離できるからです。

Producerはメッセージをキューに送信した時点で役割を終え、Consumerの処理完了を待つ必要がありません。一方、Consumerは自身の処理能力に応じたペースでメッセージを取得できます。また、一時的にメッセージ数が増加した場合もキューがバッファとして機能するため、Consumerが処理しきれずにメッセージを取りこぼすリスクを軽減できます。

これにより「スケジュールされた時間にEventBridgeが発火し、非同期で分散処理する仕組み」の非同期の部分も満たせます。

標準メトリクスのわかりやすさ

2点目は、標準メトリクスでメッセージの状況を直接把握できるため、課題でも述べた「メッセージ状況の把握が困難」や「オートスケーリングの信頼性不足」を解消できるからです。

可観測性の面では、チームが調査する際に「メッセージ数」で状況を把握できるためわかりやすく、保守運用コストを抑えられると考えました。またオートスケーリングの面では、メッセージの滞留状況を軸にしたシンプルなオートスケールを実装でき、スケールの根拠を明確にできると考えました。

コスト削減

3点目は、SQSもメッセージ処理に応じた課金であるため、SNSと同様に課題の「アイドル時間のコスト負担」を減らせるからです。詳細はなぜSNSを選んだかのコスト削減セクションで述べたとおりです。

SQSでも、FIFOキューの採用やセキュリティ対策としてのPrivateLink設定により若干のコスト増がありますが、SNS側で述べたとおり移行前よりコスト削減できることに変わりはないため問題なしと判断しました。

今回のケースはスケジュール実行を前提としているため、アイドル時のAPIコールは抑制しても問題ないと判断しました。そこで、ロングポーリングは上限の20秒に設定し、ポーリング頻度を下げてコストを抑えることにしました。ロングポーリングとは、キューにメッセージがない場合に指定した時間だけ待機してからレスポンスを返す仕組みで、これにより無駄なAPI呼び出しを抑制できます。


詳細な設定は本稿末尾の付録(SNS/SQSの詳細設定)に記載しています。

SNS/SQSを用いた分散処理の流れ

では、「スケジュールされた時間にEventBridgeが発火し、非同期で分散処理する仕組み」はどのような流れなのか、それは下記のとおりです。

本番で利用するSNSSQS.png

  1. スケジュールされた時間にEventBridgeがStep Functionsを起動
  2. Step FunctionsがECS Task Producerを実行
  3. ECS Task Producerがリスト内のアイテムを1件ずつSNSにメッセージとして送信
  4. SNSがSQSにメッセージを転送
  5. ECS Service配下のECS Task ConsumerがSQSをポーリングしてメッセージを処理
  6. Application Auto ScalingがSQSのメッセージ数に基づいてECS Task Consumerのタスク数をスケーリング

※ Step Functionsは複数のAWSサービスを組み合わせてワークフローを構築・実行できるサービスです。
※ ECS Taskは処理を実行する単位です。Producer/Consumerは役割を示すための呼び名で、Producerがメッセージを生成し、Consumerがメッセージを消費します。
※ Application Auto ScalingはECSなどのリソースを自動でスケーリングするAWSサービスです。

移行前はSNS/SQS部分がRedis Streams(ElastiCache)であり、Application Auto ScalingもRedis Streams(ElastiCache)からの値を参照していました。

シンプルなオートスケール戦略

今回の移行ではシンプルな仕組みでオートスケールしたいという動機がありました。そのためSQSの標準メトリクスで得られるメッセージの状況を用いて予測処理時間を算出し、その値に基づいてスケーリングを行う仕組みにしました。下の式のとおりです。

予測処理時間(分)=(待機メッセージ数 + 処理中メッセージ数)÷ 1分間に削除されたメッセージ数

この式は「処理が必要なメッセージをすべて処理しきるまでに何分かかるか」を表しています。この値に基づき、以下の3つのスケーリングポリシー(オートスケールのルール)でスケーリングを行います。

  • [START] キュー空 → メッセージ到着した場合
    • 条件: 待機メッセージ数 > 0 かつ 処理中メッセージ数 == 0
    • 動作: タスクを1つ追加
  • [SCALE-OUT] 処理が追いつかない場合
    • 条件: 予測処理時間 ≥ 10分(しきい値の10分はタスクに応じて指定することも可能)
    • 動作: 予測処理時間に応じて50%〜600%増加(最低1台は増加)
  • [SCALE-IN] 処理に余裕がある場合
    • 条件: 予測処理時間 < 10分 の状態が10分以上継続
    • 動作: 50%減少

理解しやすいシンプルな仕組みになったと思います。ちなみにゼロ除算の場合(1分間メッセージが削除されなかった場合)は、明示的に分岐処理を設けています。スケールアウト時は処理をスキップし、スケールイン時はキューが空であればタスクを減少させます。

落とし穴とその対策

SNS/SQS移行作業中、検討しきれていなかったいくつかの課題に直面しました。

SNSのサーバー側暗号化

1つ目はセキュリティの観点で、SQSはサーバー側暗号化がデフォルトで設定されているが、SNSはデフォルトでは設定されていない点です。公式がSNSのサーバー側暗号化を推奨していたため、今回はAWS Key Management Service(KMS: 暗号化キーの作成・管理を行うAWSのマネージドサービス)を用いて設定しました。

ここで実際にTerraformで実装した結果、はまったことがありました。SNSからSQSへメッセージを送る部分は特に変更不要でしたが、ECS Task ProducerからSNSへメッセージを送る際には、ECS Task Producer側にKMSの暗号化アクション権限を付与する必要があったのです。KMSを利用する場合は気をつけてください。

SNSからSQSへの送信失敗時のメッセージ特定

2つ目はSNSからSQSへのメッセージ送信が失敗した場合、SNSはエラーを出力するため失敗自体は検知できるが、どのメッセージで失敗したかは特定できないことです。滅多に起きないと思われますが、SNSの送信先SQSの設定ミスや、何らかの障害でSQSに送信できなくなるケースが考えられます。そのため、CloudWatchやDatadog(AWSサービスではなくログの可視化や監視を行うSaaSサービス)を組み合わせてメッセージを特定する仕組みを用意しました。

実際に課題は改善できたのか

現在は1つずつタスクを置き換えている最中です。リスクが小さいタスクや、失敗しても許容できるタスクから移行を進めています。

では、課題は実際に改善できたのでしょうか?

「メッセージ状況の把握が困難」だったのは…

標準メトリクスでメッセージの状況を把握できるのは非常にわかりやすいです。標準メトリクスの情報をCloudWatch経由でDatadogに渡し、ダッシュボードで可視化することで、処理の成否を簡単に確認できるようになりました。
また、各タスクが何件程度のメッセージを処理しているかについても、これまでは感覚や概算に頼っていましたが、具体的な数値を簡単に取得できるようになりました。今後の議論にも活かせそうです。

「オートスケーリングの信頼性不足」だったのは…

メッセージ数基準のオートスケールで、現在のところ問題は起きていないので信頼性が低いということはなさそうです。スケールの理由が明確なため、チーム内での説明コストも下がりました。今回のスケーリングポリシーが最適かどうかはまだ検証できていないため、今後も継続的に見直していきたいと考えています。

「アイドル時間のコスト負担」だったのは…

まだ移行中であり、全タスクが同じElastiCacheで運用されていた関係でElastiCacheを削除できていないためまだ確認できていません。むしろ現時点ではトータルのインフラコストは移行前より少し増加しています(SNS/SQSでのメッセージ処理分が上乗せされている状態)。ただし、当初の想定どおり許容範囲内に収まっています。すべての移行が完了したとき、チーム全員で喜びながらElastiCacheを削除できることを祈っているところです。

より良くするために

すでに移行のメリットは得られており、今後もさらなるメリットを享受できると考えています。一方で、まだ改善の余地があるとも感じており、特に以下の2点について検討中です。

デッドレターキューの運用

1つ目はデッドレターキュー(DLQ)の運用方法です。DLQとは、正常に処理できなかったメッセージを退避させる専用のキューです。現在はDLQを、繰り返しエラーが発生するメッセージを退避させる仕組みとしてのみ運用しています。せっかく退避させているのであれば、後続の処理につなげられるのではないでしょうか。

例えば、一時的なエラーかどうかを識別して再実行する機構を作るとか、検証環境でDLQにメッセージが送られたらエラーなどを元に自動で修正を加える仕組みなんかも検討してみたいですね。

本番環境のタスク最小台数

2つ目は本番環境のタスクの最小台数を0にするか1にするかという点です。移行時はパラメータを極力変更しない方針を取っていました。移行前のオートスケールは信頼性に課題があったため、最小台数を1に設定しており、現在もその設定を維持しています。つまり、スケールイン時にタスクの起動台数が0台になることはありません。

移行後のオートスケールの安定性が確認できれば、最小台数を0に変更することも検討できます。ただし、その場合はトレードオフがあります。

  • 最小台数0のメリット:アイドル時の無駄なAPIコールとECSコストを削減できる
  • 最小台数0のデメリット:0→1へのスケールアウト時にタスク起動で数分かかり、処理開始が遅れる可能性がある

チーム内でしっかり議論し、結論を出していきたいです。

まとめ

SNS/SQSへの移行により、現時点で移行済みのタスクはメッセージ状況の可観測性やオートスケーリングの面で一定のメリットを得られています。またすべての移行が完了しElastiCacheを削除できれば、コスト面でもメリットが得られる見込みです。

本移行ではチームのインフラ担当であった先輩の発案が基となっており、チーム内で議論を何度も重ねて進めてきました。私自身、この移行を通してプロダクトの開発にはインフラの仕組みが深く関わっていることを実感しました。インフラ担当の先輩には議論だけでなく、たくさんの疑問にも答えていただきました。その結果、AWSを含むインフラの基礎知識、Terraformの書き方、メリット・デメリットを検討することの大切さ、サービスの利用にはコストへの意識も必要であることなど、インフラに関する様々なことを学ぶことができ、エンジニアとして大きく成長できたと感じています。また、同時期にAWS SAA-C03の勉強もしていたため、移行タスクと結びついて理解が深まり、無事合格することができました。インフラ担当の先輩には感謝しています。

現在、チームではフロントエンド・バックエンドに加えてインフラも担当するフルスタックエンジニアとして、日々奮闘しています。これからも積極的に知識を深め、より価値の高いエンジニアとして成長しつつ、チームに貢献していきたいと思います!

付録(SNS/SQSの詳細設定)

SNS

トピック

メッセージの配信先を管理するエンドポイントであるトピックには標準トピックとFIFOトピックの2つのトピックタイプがあります。今回はFIFOトピックを選択することで5分間はメッセージの重複を排除するようにしました。

セキュリティ対策

トピック内のメッセージはKMSを用いて暗号化することでセキュリティリスクを軽減しました。

また、AWS PrivateLink(VPCエンドポイント)を用意することで、インターネットを介さないプライベート接続としてECS Task ProducerがSNSにアクセスできるため、セキュリティリスクが軽減しました。

SQS

キュー

メッセージの保管場所であるキューには標準キューとFIFOキューの2つのキュータイプがあります。今回はFIFOキューを選択することで5分間はメッセージの重複を排除するようにしました。また保持期間はタスクごとに調整可能としました。

ロングポーリング

ロングポーリングは20秒待機に設定してキューにメッセージがない場合の無駄なAPI呼び出しを抑制しました。

可視性タイムアウト

可視性タイムアウトとは、1度メッセージが取得されると他のECS Task Consumerが取得できなくなる時間のことです。メッセージは処理が完了してからキューから削除されるため、処理中にタイムアウト時間が過ぎると重複処理になりかねません。そのため可視性タイムアウトは「最大処理時間 × 1.1倍」としてタスクごとで調整できるようにしました。(最大処理時間は「計測された最大値」ではなく「その時間を超えたら異常な為、処理を一旦手放してタイムアウトエラーを通知したいレベルの閾値」であるため十分な時間が確保されています)。

デッドレターキュー

3回処理に失敗したメッセージはDLQに送信することで、問題のあるメッセージを分離しました。

セキュリティ対策

PrivateLink(VPCエンドポイント)を用意することで、インターネットを介さないプライベート接続としてECS Task ConsumerがSQSにアクセスできるため、セキュリティリスクが軽減しました。

8
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?