はじめに
AWS Lambdaの気になる機能調べてみた記事2つ目です。
今回はevent source mappings(イベントソースマッピング)
を調べてみます。
ここには何が表示されるのかな?
イベント駆動型の処理を実装する方法の一つ
早速結論ですが、event source mappingsは、AWS Lambdaでイベント駆動型の処理を実装する方法の一つでした。
イベント駆動型の処理を実装する方法として、「event source mappings」以外にも「direct triggers」という方法もあります。
それぞれの特徴は以下の通りです。
event source mappings
- ストリームやキューからのデータをポーリング
- Lambda内で管理される
- バッチ処理が可能
Lambda内で管理するイベントポーラーと呼ばれるリソースが新しいメッセージをポーリングし、関数を呼び出します。
詳細は後述しますが、個々のイベントを個別処理するのではなく、ある程度まとめて処理を行う、バッチ処理機能があります。
direct triggers
- イベント発生時、即座にLambdaを呼び出し
- イベント元のサービスで管理
- 個別のイベント処理に適している
こちらは、S3にファイルが格納されたら、みたいなイベント個別毎に直接Lambdaを各サービスがコールする仕組みです。
対応サービス
event source mappingsは以下のサービスで利用可能みたいです。
- Amazon DynamoDB
- Amazon Kinesis
- Amazon MSK
- Amazon MQ
- Apache Kafka
- Amazon SQS
- Amazon DocumentDB
一方、direct triggers側は、S3やSNS、APIGatewayなどが該当します。
バッチ処理の仕組み
以下の2つでバッチ処理を制御可能です。
- バッチ処理ウィンドウの設定(0秒〜300秒)
- バッチサイズの制御
また、設定変更はできませんが、6MBのペイロード制限もあります。
バッチ処理イメージは以下のような感じです。
バッチ処理ウィンドウ到達
バッチ処理ウインドウは関数を呼び出すまでにイベントを収集する最大時間 (秒)です。
下記例でいうと、バッチ処理ウィンドウの40秒に到達した時点で、バッチサイズに関わりなく、この時点で関数がコールされます。
バッチサイズ到達
バッチサイズは関数に送信する各バッチのイベント数です。
下記例でいうと、バッチサイズである10までイベント受信した時点で、バッチ処理ウインドウまで待つことなく、この時点で関数がコールされます。
ペイロード制限も同様な動きをします。
プロビジョニングモードもあるよ。
イベントをポーリングする、イベントポーラー1つの性能は5MBps(メガバイト/秒)です。
これを超えると自動スケーリングするのですが、スパイクに対応するためのプロビジョニングモードもあります。
Amazon MSKとApache Kafkaで利用可能とのことですが、別料金がかかる模様。
試してみた!
DynamoDB Streamsで動作確認してみます。
Lambda準備
受け取ったイベント情報をそのままログに出すだけのプログラムを準備します。
// DynamoDB Streamsを処理するLambda関数
export const handler = async (event) => {
try {
// 各レコードを処理
for (const record of event.Records) {
// DynamoDBの変更内容を取得
const dynamodb = record.dynamodb;
// イベントタイプに応じた処理
switch (record.eventName) {
case 'INSERT':
console.log('新規レコード:');
console.log(JSON.stringify(dynamodb.NewImage, null, 2));
break;
case 'MODIFY':
console.log('変更前のレコード:');
console.log(JSON.stringify(dynamodb.OldImage, null, 2));
console.log('変更後のレコード:');
console.log(JSON.stringify(dynamodb.NewImage, null, 2));
break;
case 'REMOVE':
console.log('削除されたレコード:');
console.log(JSON.stringify(dynamodb.OldImage, null, 2));
break;
}
console.log('----------------------------------------');
}
return {
statusCode: 200,
body: JSON.stringify({
message: 'Successfully processed DynamoDB Stream records',
recordCount: event.Records.length
})
};
} catch (error) {
console.error('Error processing DynamoDB Stream:', error);
throw error;
}
};
DynamoDB側の設定
適用なテーブルを作り、DynamoDB ストリームの詳細
からストリームを有効化します。
次にトリガーの設定をします。
バッチ処理ウィンドウの設定はここではできないみたい。
この時点でLambdaの実行ロールが不足しているとエラーが出るので、以下のポリシーを追加しておきます。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:ListStreams"
],
"Resource": "*"
}
]
}
イベントソースマッピングに出てきた!
ここまで作成すると、Lambdaのイベントソースマッピングのところに項目が追加されました!
この画面はインフォパネルに説明がある通り、削除するためだけの画面です。
イベントソースマッピングは、イベントソースからの読み込みを行って Lambda 関数を呼び出す Lambda リソースです。関数を削除しても、Lambda は関連するイベントソースマッピングを削除しません。ここでイベントソースマッピングを削除できます。
バッチ処理ウィンドウとバッチサイズの変更
バッチ処理ウィンドウとバッチサイズの変更は、Lambda側のトリガーの設定から変更可能です。
バッチサイズ2、バッチ処理ウインドウ30秒で動作確認
CLIを使って、30秒以内に連続してアイテム追加してみます。
aws dynamodb put-item \
--table-name stream_test \
--item '{
"id": {"S": "007”},
"name": {"S": "test user"}
}'
aws dynamodb put-item \
--table-name stream_test \
--item '{
"id": {"S": "008”},
"name": {"S": "test user"}
}'
一度の処理(24cca4e6-11f2-4fe1-a9d2-b0ab0a2afd51
)で2つのイベントを処理していることがわかります。
2025-01-03T00:03:49.444Z 24cca4e6-11f2-4fe1-a9d2-b0ab0a2afd51 INFO 新規レコード:
2025-01-03T00:03:49.444Z 24cca4e6-11f2-4fe1-a9d2-b0ab0a2afd51 INFO
{
"name": {
"S": "test user"
},
"id": {
"S": "007"
}
}
2025-01-03T00:03:49.444Z 24cca4e6-11f2-4fe1-a9d2-b0ab0a2afd51 INFO ----------------------------------------
2025-01-03T00:03:49.444Z 24cca4e6-11f2-4fe1-a9d2-b0ab0a2afd51 INFO 新規レコード:
2025-01-03T00:03:49.444Z 24cca4e6-11f2-4fe1-a9d2-b0ab0a2afd51 INFO
{
"name": {
"S": "test user"
},
"id": {
"S": "008"
}
}
2025-01-03T00:03:49.444Z 24cca4e6-11f2-4fe1-a9d2-b0ab0a2afd51 INFO ----------------------------------------
START RequestId: 24cca4e6-11f2-4fe1-a9d2-b0ab0a2afd51 Version: $LATEST
END RequestId: 24cca4e6-11f2-4fe1-a9d2-b0ab0a2afd51
REPORT RequestId: 24cca4e6-11f2-4fe1-a9d2-b0ab0a2afd51 Duration: 1.84 ms Billed Duration: 2 ms Memory Size: 128 MB Max Memory Used: 70 MB
バッチ処理は複数のイベントをまとめて処理することで、Lambdaの起動処理などのオーバヘッドを軽減してくれます。
まとめ
「event source mappings」と「direct triggers」の違いを学ぶことができました!
イベント駆動型の実装方法は知ってましたが、「event source mappings」と「direct triggers」の違いや、バッチ処理について知れて、勉強になりました〜!