概要
リアルタイムデータの蓄積と加工は、ログ分析、モニタリング、機械学習前処理など、多くのユースケースで重要な要素です。
本記事では、Amazon Kinesis Data Firehose と AWS Lambda を用いて、次のような構成を実現する方法を紹介します:
- リアルタイムでストリーミングデータを取り込み
- 必要なデータ加工処理をLambdaで自動化
- 処理済みデータをS3に永続化
このアーキテクチャは、完全サーバレスでスケーラブルな設計が可能であり、初期構築や運用の手間を最小限に抑えることができます。
システム構成イメージ
データ処理の流れ
- データ送信元 は、Firehose の Delivery Stream に対して HTTP API などを用いてデータを送信します。
- Firehose は、受信したデータをリアルタイムで Lambda 関数 に渡します。
- Lambda 関数 内では、データ整形・バリデーション・フィールドの追加や変換など、自由に加工処理を実装可能です。
- 加工が完了したデータは、Amazon S3 に自動で格納され、後続のバッチ処理やAthenaなどによるクエリ分析に活用できます。
Lambda 変換関数のサンプル(Python)
import json
import base64
def lambda_handler(event, context):
output = []
for record in event['records']:
# Base64デコードしてJSONとしてパース
payload = json.loads(base64.b64decode(record['data']))
# 加工処理:例としてタイムスタンプの追加
payload['processed_at'] = context.aws_request_id
# 加工後のデータを再エンコード
encoded_data = base64.b64encode(json.dumps(payload).encode('utf-8')).decode('utf-8')
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': encoded_data
}
output.append(output_record)
return {'records': output}
Firehose の主な設定項目
設定項目 | 内容 |
---|---|
Delivery Stream Type | Direct PUT または Kinesis Stream 経由を選択可能 |
Lambda 関数指定 | 加工処理用のLambda関数を指定(リアルタイム変換を有効化) |
S3 設定 | 保存先バケット、キーPrefix、圧縮(GZIPなど)の指定が可能 |
ログ設定 | CloudWatch Logs にFirehose処理結果を出力してデバッグや監視が可能 |
なぜこの構成が有用なのか?
特徴 | 説明 |
---|---|
完全サーバレス | EC2やECSを使用せず、スケーラブルで保守不要なデータパイプラインが構築可能 |
拡張性 | Lambdaのロジック変更で柔軟にデータ加工処理をアップデート可能 |
コスト効率 | バッチ転送&圧縮でストレージコストや転送コストを最小限に |
保守性 | マネージドサービス活用により、スケーリング・パッチ・稼働監視が不要 |
まとめ
ユースケース | 説明 |
---|---|
Webログの整形と分析保存 | JSON形式に変換し、AthenaやQuickSightでのクエリ分析が可能に |
IoTセンサーのデータ拡張と蓄積 | 不要フィールド除去やメタ情報追加などをLambdaで前処理 |
ユーザー操作ログのフィルタ保存 | 特定イベントや操作のみを抽出して保存し、後続分析を効率化 |
Lambdaによるデータ前処理をFirehose内で実現することで、後続のバッチ処理やBI分析を意識したデータ構造をS3に直接格納できます。拡張性・保守性・運用効率の三拍子が揃ったサーバレスアーキテクチャです。