はじめに
DynamoDB Streamsは便利だが、接続した機能を必ず呼んでしまうため、呼ばれたLambda側等でフィルタを作る必要があるのが不便だった。
だが、それも昔の話。2021年11月26日にフィルタ機能が使えるようになったので、必要な処理だけをLambda側で記載すれば良くなって、よりビジネスロジックだけに注力可能になった。
今回は、このフィルタを使って、ファンアウトなイベントを定義してみよう。
なお、前提知識としては以下があれば良い。
- DynamoDB Streamsの基礎を理解している
- Terraformの基礎を理解している
また、イベントフィルタリングに関してどんなイベントをどのようにフィルタするかという基本的な部分は、デベロッパーガイドに書かれているので、最低限これはおさえておこう。
構成図
以下のような構成とする。
1個目のLambdaは必ず実行され、2,3個目のLambdaはフィルタの条件を満たしたときのみ実行されるようにしてみる。
共通のTerraformリソース
まずは、DynamoDBとLambdaにアタッチするIAMを作っておこう。
Lambdaは、CloudWatch LogsとDynamoDBアクセスに必要な権限のみアタッチするので、実際に使用する場合は適宜必要なものを付け足そう。
resource "aws_dynamodb_table" "example" {
name = local.dynamodb_table_name
billing_mode = "PAY_PER_REQUEST"
hash_key = "id"
stream_enabled = true
stream_view_type = "NEW_AND_OLD_IMAGES"
attribute {
name = "id"
type = "S"
}
}
resource "aws_iam_role" "for_lambda" {
name = local.lambda_role_name
assume_role_policy = data.aws_iam_policy_document.lambda_assume.json
}
data "aws_iam_policy_document" "lambda_assume" {
statement {
effect = "Allow"
actions = [
"sts:AssumeRole",
]
principals {
type = "Service"
identifiers = [
"lambda.amazonaws.com",
]
}
}
}
resource "aws_iam_role_policy" "lambda_custom" {
role = aws_iam_role.for_lambda.id
name = local.lambda_policy_name
policy = data.aws_iam_policy_document.lambda_custom.json
}
data "aws_iam_policy_document" "lambda_custom" {
statement {
effect = "Allow"
actions = [
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:ListStreams",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
]
resources = [
"*",
]
}
}
Lambdaの定義
Lambdaは以下のように定義する。
実行用コード
実行用コードは、ひとまず動けば何でもよいので、何の関数が呼ばれたか分かるように以下のようにしておく。
※今回は、1つの同じコードを、3つのLambda関数で実行して、その中で何の関数で動かしたか分かるようにしている。
import os
def lambda_handler(event, context):
print(os.environ['AWS_LAMBDA_FUNCTION_NAME'])
また、上記コードをLambdaに渡せるように、archive_file
でZIPを作るようにしておく。
data "archive_file" "example" {
type = "zip"
source_dir = "../scripts"
output_path = "../outputs/example.zip"
}
Lambdaの定義
Lambdaは以下のように3つ定義しておく。
これで、実行されたログを各ロググループから参照できるので見やすくなる。
################################################################################
# Lambda-1 #
################################################################################
resource "aws_lambda_function" "example1" {
depends_on = [
aws_cloudwatch_log_group.lambda1,
]
function_name = local.lambda_function1_name
filename = data.archive_file.example.output_path
role = aws_iam_role.for_lambda.arn
handler = "example.lambda_handler"
source_code_hash = data.archive_file.example.output_base64sha256
runtime = "python3.6"
memory_size = 128
timeout = 30
}
resource "aws_cloudwatch_log_group" "lambda1" {
name = "/aws/lambda/${local.lambda_function1_name}"
retention_in_days = 3
}
################################################################################
# Lambda-2 #
################################################################################
resource "aws_lambda_function" "example2" {
depends_on = [
aws_cloudwatch_log_group.lambda2,
]
function_name = local.lambda_function2_name
filename = data.archive_file.example.output_path
role = aws_iam_role.for_lambda.arn
handler = "example.lambda_handler"
source_code_hash = data.archive_file.example.output_base64sha256
runtime = "python3.6"
memory_size = 128
timeout = 30
}
resource "aws_cloudwatch_log_group" "lambda2" {
name = "/aws/lambda/${local.lambda_function2_name}"
retention_in_days = 3
}
################################################################################
# Lambda-3 #
################################################################################
resource "aws_lambda_function" "example3" {
depends_on = [
aws_cloudwatch_log_group.lambda3,
]
function_name = local.lambda_function3_name
filename = data.archive_file.example.output_path
role = aws_iam_role.for_lambda.arn
handler = "example.lambda_handler"
source_code_hash = data.archive_file.example.output_base64sha256
runtime = "python3.6"
memory_size = 128
timeout = 30
}
resource "aws_cloudwatch_log_group" "lambda3" {
name = "/aws/lambda/${local.lambda_function3_name}"
retention_in_days = 3
}
イベントソースマッピングの定義
イベントソースマッピングは以下のように定義する。
フィルタなし
フィルタなしはあまり考えることはないのでサクッと定義しておこう。
1つめのLambdaに紐付けて、無条件にコールされるようにする。
resource "aws_lambda_event_source_mapping" "lambda1" {
event_source_arn = aws_dynamodb_table.example.stream_arn
function_name = aws_lambda_function.example1.arn
starting_position = "TRIM_HORIZON"
}
HCLで直接フィルタを記述する
さて、今回はIDが00001と00002のの場合で呼び出す関数を分岐してみよう。
ただし、同じ呼び出し方で書いても面白くないので、2通りの書き方をしてみる。
2つめのLambdaに紐付けるのは、以下のようにHCLで直接記述する方法だ。
別に難しいことはないが、CloudWatch Logsのフィルタ定義のJSONとの互換性がないのがたまにキズだ。
resource "aws_lambda_event_source_mapping" "lambda2" {
event_source_arn = aws_dynamodb_table.example.stream_arn
function_name = aws_lambda_function.example2.arn
starting_position = "TRIM_HORIZON"
filter_criteria {
filter {
pattern = jsonencode({
dynamodb = {
Keys: {
id: {
S: ["00001"]
}
}
}
})
}
}
}
これでイベントソースを作ると、以下のようにコンソールから確認できるようになる。
外部ファイルでJSON定義する
せっかくだから、外部ファイルでJSON定義してバリデーションチェックをエディタに任せたいという人も多いだろう。そういった場合は、以下のように定義を行う。
file("./eventfilter.json")
のみで実行する場合、Terraformはjsonencode
もついでにやってくれるのだが、どうやら、AWSのAPIが改行コードを受け付けてくれないらしく、バリデーションエラーになる。回避するためにreplace
関数を噛ませている。terraform apply
の見た目上では改行コードが見えなくて何故エラーになっているか分からないという罠があるので気を付けよう。
{
"dynamodb": {
"Keys": {
"id": {
"S": [ "00002" ]
}
}
}
}
resource "aws_lambda_event_source_mapping" "lambda3" {
event_source_arn = aws_dynamodb_table.example.stream_arn
function_name = aws_lambda_function.example3.arn
starting_position = "TRIM_HORIZON"
filter_criteria {
filter {
pattern = replace(file("./eventfilter.json"), "\n", "")
}
}
}
これで定義すると、マネージメントコンソールからは以下のように見える。
余計なスペースが入っているが、それ以外はHCLに直接書いたものと書式は同じだ(中身は違うけど)。
動かしてみる
さて、この状態で、
$ aws dynamodb put-item --table-name テーブル名 --item '{ "id": { "S": "00001" }, "name": { "S": "Taro" }, "age": { "S": "40" } }'
$ aws dynamodb put-item --table-name テーブル名 --item '{ "id": { "S": "00002" }, "name": { "S": "Jiro" }, "age": { "S": "25" } }'
とすると、CloudWatch Logsで、1つ目のLambdaが2回実行され、2,3個目のLambdaが1回ずつ実行されるのが分かる。
しっかりフィルタリングしつつファンアウトのイベントが作れたことが確認できた!