はじめに
本記事では、メッセージングサービスとストリーミングデータ処理サービスについて解説します。
Amazon SQS (Simple Queue Service)
メッセージを一時的に保存し、コンピュータ間でやり取りするためのマネージド型のメッセージキューイングサービス。
SQSキューの種類
- 標準キュー
高いスループットを提供するが、メッセージの順序と一意性は保証されない - FIFOキュー
メッセージの順序を保証し、少なくとも1回のメッセージ配信をサポート
SQS可視性タイムアウト
あるEC2がメッセージを取得してから処理している間、他のEC2がそのメッセージを見えなくする時間
設定方法としては、ChangeMessageVisibility APIを使用
SQS -> EC2 -> RDS -> EC2(別のインスタンス)
SQSと他サービスの連携
- EC2との連携
SQSキューにリクエストメッセージを保持して、EC2インスタンスにポーリング処理させることで、トランザクションを破棄せずに処理を継続できる - Auto Scalingとの連携
CloudWatchでSQSキューサイズを監視し、EC2インスタンスのAuto Scalingグループを設定することでスケーリングが可能 - Lambdaとの連携
Lambda関数がSQSキューメッセージを取得して処理する。デッドレターキューを追加することで、Lambda関数の実行に失敗したメッセージを別のSQSキューに蓄積できる
書き込み操作の信頼性向上
- 「保留中のデータベースへの書き込みリクエスト」をAmazon SQSキューに格納
- Lambda関数がSQSキューメッセージを取得して、書き込み処理を実行
- Lambda関数にデッドレターキューを追加して、失敗したメッセージを蓄積
Amazon SNS (Simple Notification Service)
プッシュ型のメッセージングサービスで、アプリケーション間やユーザーへの通知に使用される。
SNSトピックとフィルタリング
発注アプリケーション開発の例
- SNSトピックを1つ作成
- SQSキューをこのSNSトピックにサブスクライブ
- SNSトピックへのフィルター設定で、注文内容に応じて適切なSQSキューにメッセージを分割
- 各バックエンドサーバーは、対応するSQSキューからメッセージを受信
[注文メッセージ] → SNSトピック → フィルター → 適切なSQSキュー → バックエンドサーバー
SNS FIFO トピック
- 複数のサブスクライバーに厳密なメッセージの順序付けと重複を排除したメッセージ配信を提供する
- アプリケーションにおいて、メッセージの順序が保持され、重複するメッセージが送信されない仕組みが必要な場合に適している
CloudWatchとの連携
アプリケーション処理の異常を監視するためにCloudWatchと連携し、SNSを通じてSMS送信などの通知が可能
Amazon Kinesis Data Streams
ストリーミングデータをリアルタイムで収集、処理、分析するためのサービス。1秒以下のデータロードが達成できる性能があり、リアルタイム処理に適している。
ストリーミングデータとは
連続的に生成される大量のデータをリアルタイムで処理する方式
クリックストリームデータを取得してリアルタイムに処理するアプリケーションなどが該当する
シャード管理
- シャード
データ処理単位。適切なシャード数の設定が重要 - パフォーマンス調整
割り当てられたシャードが十分に活用されていない場合はコストが高くなるため、最適なシャード数の設定が必要
Amazon Kinesis Data Firehose
ストリーミングデータを変換したり、ストレージやDBに配信するサービス。データロードまで60秒必要なため、ミリ秒単位の処理には向かないが、データ変換・配信に適している。
配信先
- Amazon S3
- Amazon Redshift
- Amazon OpenSearch Service
- Splunk
- HTTPエンドポイント
ただし、DynamoDBには直接配信できないことに注意が必要。
Kinesisを使ったデータパイプラインの例
Amazon Kinesis Data Streams → Amazon Kinesis Data Firehose → Amazon S3データレイク → Amazon Redshift Spectrum
Amazon Managed Service for Apache Flink
以前はAmazon Kinesis Data Analyticsと呼ばれていたリアルタイムデータ分析用のサービス。Apache Flinkを利用してデータ処理をリアルタイムに実施し、ストリーミングデータをクエリ処理によって分割してから、S3バケットなどに保存できる。
工場データの処理例
工場のデバイスから送信されるストリームデータを取得して保存する場合
- リアルタイム性が不要(数分の遅延が許容される)なら、Kinesis Data Firehoseを使用
- 10日以内のデータは標準ストレージに、10日経過したデータはアーカイブに自動移行するライフサイクルポリシーを設定
複雑なクエリ処理が必要な場合
Kinesis Data Stream → Firehose → S3の構成だけでは、リアルタイムでの複雑なクエリ処理や時系列での柔軟な分割が難しい
そのため、Apache Flinkを使って高度な処理を行い、結果をS3に保存する構成が選択される
AWS Lambda (サーバーレス)と連携
Lambdaとストリーミングサービスの連携
- S3イベント処理
S3バケットに新しいファイルがアップロードされたときに処理を実行 - DynamoDBストリーム処理
DynamoDBテーブルのデータが変更されたときに処理を実行 - Kinesis Streamsとの連携
ストリーミングデータをリアルタイムで処理
Lambda関数へのアクセス方法
HTTP経由でLambda関数を呼び出す方法は主に下記の2つgある。
- API GatewayをLambda関数に統合 - 一般的な方法
- Lambda関数URL - APIゲートウェイを使わずに、直接Lambda関数にアクセスできる新機能