初めに
イベント駆動型アーキテクチャは、マイクロサービスの連携や非同期処理を実現する上で非常に重要な設計パターンとなっています。2022年末のreで発表されたAmazon EventBridge Pipesは、イベントソースとターゲットをシームレスに接続し、オプションでイベントのフィルタリングや変換を行うことができる強力なサービスです。
本記事では、EventBridge Pipesを活用したS3バケットトリガーのバッチ処理システムについて解説します。具体的には、S3バケットにファイルがアップロードされたイベントをトリガーに、適切な時間にバッチ処理を実行するアーキテクチャを紹介します。
アーキテクチャ概要
今回紹介するシステムは以下のような要件を満たしています:
- S3バケットにファイルがアップロードされると自動的にデータ処理を開始
- アップロード時刻に応じて処理を即時実行または指定時刻まで待機
- バッチ処理にはECSタスクを使用し、処理内容に合わせた設定を動的に適用
このようなフローを実現するために、EventBridge Pipesを中心としたアーキテクチャを構築しました。
処理フロー
処理フローは大きく2つのパターンに分かれています
1. 即時実行パターン
S3 Object Created → SQS → EventBridge Pipe (Lambda Enrichment) → EventBridge Bus → EventBridge Rule → ECS Task
2. 時間指定実行パターン
S3 Object Created → SQS → EventBridge Pipe (Lambda Enrichment) → EventBridge Bus → EventBridge Rule → Step Functions → Wait Until Target time → ECS Task
EventBridge Pipesとは
Amazon EventBridge Pipesは、イベントプロデューサーとコンシューマーを直接接続する専用サービスです。従来のEventBridgeと異なり、Point-to-Pointの接続に特化しており、イベントのフィルタリングや変換機能を組み込むことができます。
EventBridge Pipesの主な特徴
- ソースとターゲットの直接接続: イベントソースからターゲットへの直接的な接続を提供
- イベントフィルタリング: 特定条件に一致するイベントのみを処理
- イベント変換(Enrichment): Lambda関数を使用したイベントの変換や拡張
- 豊富なソースとターゲット: SQS、Kinesis、DynamoDBなど様々なソースと、Lambda、ECS、Step Functionsなど多様なターゲットをサポート
システム実装詳細
1. 即時実行
ファイルがS3バケットにアップロードされると、以下の処理が実行されます:
- S3バケットへのオブジェクト作成イベントが発生
- イベントがSQSキューに送信される
- EventBridge Pipeが以下の処理を行う
SQSからメッセージを取得
Lambda関数でイベント情報を解析(S3キー、バッチ名、バケット名の抽出) - 解析されたイベントがEventBridge Busに送信される
- EventBridge Ruleで適切なターゲットにルーティング
- ECSタスクが特定のパラメータ(S3キー、バッチ名、バケット名)で起動
2. 時間指定実行
基本的な流れは同じですが、以下の点が異なります:
- EventBridge Ruleのターゲットとして、Step Functionsが起動
- Step Functionsが指定された時間まで待機
- 指定時刻になると、ECSタスクが起動される
EventBridge Pipesの具体的な実装
EventBridge Pipesでは、以下の設定を行っています:
1.ソース設定:
- ソースタイプ: Amazon SQS
- キュー: S3イベント通知用のSQSキュー
- バッチサイズ: 1(単一メッセージ処理)
2.フィルター設定(オプション):
- S3バケット名やプレフィックスでフィルタリング
3.エンリッチメント設定:
- Lambda関数を使用
- 関数処理:
S3キーの抽出
バッチ名の設定(importJob)
バケット名の抽出
実行時間(targetTime)の決定
4.ターゲット設定:
- ターゲットタイプ: Amazon EventBridge Bus
- イベントバス: カスタムイベントバス
- 詳細タイプ: "Enriched S3 Event"
EventBridge Pipesの利点
このアーキテクチャにおいて、EventBridge Pipesを採用することで以下のメリットが得られました:
1.デカップリング: S3イベントとバッチ処理が疎結合になり、柔軟な変更が可能
2.イベント加工: Lambda関数を使ったエンリッチメントで、イベント情報を処理に最適な形に変換
3.条件分岐: 時間に応じた処理フローの分岐が容易に実現可能
4.柔軟な拡張: 新たなバッチ処理の追加が容易
実装上の工夫
時間に基づく条件分岐
アップロード時刻に応じて処理方法を変更するために、Lambda関数内で現在時刻を確認し、適切なターゲットタイムを設定しています。これにより、指定時間より前にアップロードされたファイルは即時処理、それ以降のファイルは翌日の指定時刻に処理されます。
エラーハンドリング
EventBridge Pipesでは、処理に失敗したイベントを自動的にDLQ(Dead Letter Queue)に送信する機能を設定しています。これにより、処理失敗時にも確実に問題を検知し、対応することができます。
ECSタスクへのパラメータ受け渡し
EventBridge RuleやStep Functionsから起動するECSタスクに対して、S3キーやバッチ名などの情報を動的に渡すために、タスク定義のoverrideパラメータを活用しています。
実運用での注意点
コスト最適化
EventBridge Pipesは処理されたイベント数に応じた課金となるため、大量のイベントが発生する場合はコストに注意が必要です。今回のようにSQSをバッファとして使用し、バッチサイズを適切に設定することでコスト最適化が可能です。
モニタリングとアラート
EventBridge Pipesの処理状況やエラーを監視するために、CloudWatch Metricsとアラームを設定することをお勧めします。特に以下の指標に注目しています:
- InvocationsSucceeded
- InvocationsFailed
- ExecutionThrottled
まとめ
EventBridge Pipesを活用したS3トリガーのバッチ処理アーキテクチャを紹介しました。イベント駆動型アーキテクチャの構築において、EventBridge Pipesはソースとターゲットをシームレスに接続し、イベントの変換や拡張を容易に実現できる強力なサービスです。
今回紹介したアーキテクチャは、ファイルのアップロードをトリガーにしたバッチ処理だけでなく、様々なイベントソースに応用可能です。システムの疎結合化や柔軟性向上のために、EventBridge Pipesの活用を検討してみてはいかがでしょうか。