2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DynamoDB Streamsでイベントソースマッピングのフィルタ機能を使ってファンアウトイベントを作る

Last updated at Posted at 2022-03-06

はじめに

DynamoDB Streamsは便利だが、接続した機能を必ず呼んでしまうため、呼ばれたLambda側等でフィルタを作る必要があるのが不便だった。
だが、それも昔の話。2021年11月26日にフィルタ機能が使えるようになったので、必要な処理だけをLambda側で記載すれば良くなって、よりビジネスロジックだけに注力可能になった。

今回は、このフィルタを使って、ファンアウトなイベントを定義してみよう。

なお、前提知識としては以下があれば良い。

  • DynamoDB Streamsの基礎を理解している
  • Terraformの基礎を理解している

また、イベントフィルタリングに関してどんなイベントをどのようにフィルタするかという基本的な部分は、デベロッパーガイドに書かれているので、最低限これはおさえておこう。

構成図

以下のような構成とする。
1個目のLambdaは必ず実行され、2,3個目のLambdaはフィルタの条件を満たしたときのみ実行されるようにしてみる。

構成図.drawio.png

共通のTerraformリソース

まずは、DynamoDBとLambdaにアタッチするIAMを作っておこう。
Lambdaは、CloudWatch LogsとDynamoDBアクセスに必要な権限のみアタッチするので、実際に使用する場合は適宜必要なものを付け足そう。

dynamodb.tf
resource "aws_dynamodb_table" "example" {
  name         = local.dynamodb_table_name
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "id"

  stream_enabled   = true
  stream_view_type = "NEW_AND_OLD_IMAGES"

  attribute {
    name = "id"
    type = "S"
  }
}
iam.tf
resource "aws_iam_role" "for_lambda" {
  name               = local.lambda_role_name
  assume_role_policy = data.aws_iam_policy_document.lambda_assume.json
}

data "aws_iam_policy_document" "lambda_assume" {
  statement {
    effect = "Allow"

    actions = [
      "sts:AssumeRole",
    ]

    principals {
      type = "Service"
      identifiers = [
        "lambda.amazonaws.com",
      ]
    }
  }
}

resource "aws_iam_role_policy" "lambda_custom" {
  role   = aws_iam_role.for_lambda.id
  name   = local.lambda_policy_name
  policy = data.aws_iam_policy_document.lambda_custom.json
}

data "aws_iam_policy_document" "lambda_custom" {
  statement {
    effect = "Allow"

    actions = [
      "dynamodb:GetRecords",
      "dynamodb:GetShardIterator",
      "dynamodb:DescribeStream",
      "dynamodb:ListStreams",
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents",
    ]

    resources = [
      "*",
    ]
  }
}

Lambdaの定義

Lambdaは以下のように定義する。

実行用コード

実行用コードは、ひとまず動けば何でもよいので、何の関数が呼ばれたか分かるように以下のようにしておく。
※今回は、1つの同じコードを、3つのLambda関数で実行して、その中で何の関数で動かしたか分かるようにしている。

example.py
import os

def lambda_handler(event, context):
    print(os.environ['AWS_LAMBDA_FUNCTION_NAME'])

また、上記コードをLambdaに渡せるように、archive_fileでZIPを作るようにしておく。

archive.tf
data "archive_file" "example" {
  type        = "zip"
  source_dir  = "../scripts"
  output_path = "../outputs/example.zip"
}

Lambdaの定義

Lambdaは以下のように3つ定義しておく。
これで、実行されたログを各ロググループから参照できるので見やすくなる。

lambda.tf
################################################################################
# Lambda-1                                                                     #
################################################################################
resource "aws_lambda_function" "example1" {
  depends_on = [
    aws_cloudwatch_log_group.lambda1,
  ]

  function_name    = local.lambda_function1_name
  filename         = data.archive_file.example.output_path
  role             = aws_iam_role.for_lambda.arn
  handler          = "example.lambda_handler"
  source_code_hash = data.archive_file.example.output_base64sha256
  runtime          = "python3.6"

  memory_size = 128
  timeout     = 30
}

resource "aws_cloudwatch_log_group" "lambda1" {
  name              = "/aws/lambda/${local.lambda_function1_name}"
  retention_in_days = 3
}

################################################################################
# Lambda-2                                                                     #
################################################################################
resource "aws_lambda_function" "example2" {
  depends_on = [
    aws_cloudwatch_log_group.lambda2,
  ]

  function_name    = local.lambda_function2_name
  filename         = data.archive_file.example.output_path
  role             = aws_iam_role.for_lambda.arn
  handler          = "example.lambda_handler"
  source_code_hash = data.archive_file.example.output_base64sha256
  runtime          = "python3.6"

  memory_size = 128
  timeout     = 30
}

resource "aws_cloudwatch_log_group" "lambda2" {
  name              = "/aws/lambda/${local.lambda_function2_name}"
  retention_in_days = 3
}

################################################################################
# Lambda-3                                                                     #
################################################################################
resource "aws_lambda_function" "example3" {
  depends_on = [
    aws_cloudwatch_log_group.lambda3,
  ]

  function_name    = local.lambda_function3_name
  filename         = data.archive_file.example.output_path
  role             = aws_iam_role.for_lambda.arn
  handler          = "example.lambda_handler"
  source_code_hash = data.archive_file.example.output_base64sha256
  runtime          = "python3.6"

  memory_size = 128
  timeout     = 30
}

resource "aws_cloudwatch_log_group" "lambda3" {
  name              = "/aws/lambda/${local.lambda_function3_name}"
  retention_in_days = 3
}

イベントソースマッピングの定義

イベントソースマッピングは以下のように定義する。

フィルタなし

フィルタなしはあまり考えることはないのでサクッと定義しておこう。
1つめのLambdaに紐付けて、無条件にコールされるようにする。

resource "aws_lambda_event_source_mapping" "lambda1" {
  event_source_arn  = aws_dynamodb_table.example.stream_arn
  function_name     = aws_lambda_function.example1.arn
  starting_position = "TRIM_HORIZON"
}

HCLで直接フィルタを記述する

さて、今回はIDが00001と00002のの場合で呼び出す関数を分岐してみよう。
ただし、同じ呼び出し方で書いても面白くないので、2通りの書き方をしてみる。
2つめのLambdaに紐付けるのは、以下のようにHCLで直接記述する方法だ。
別に難しいことはないが、CloudWatch Logsのフィルタ定義のJSONとの互換性がないのがたまにキズだ。

resource "aws_lambda_event_source_mapping" "lambda2" {
  event_source_arn  = aws_dynamodb_table.example.stream_arn
  function_name     = aws_lambda_function.example2.arn
  starting_position = "TRIM_HORIZON"

  filter_criteria {
    filter {
      pattern = jsonencode({
        dynamodb = {
          Keys: {
            id: {
              S: ["00001"]
            }
          }
        }
      })
    }
  }
}

これでイベントソースを作ると、以下のようにコンソールから確認できるようになる。

キャプチャ1.png

外部ファイルでJSON定義する

せっかくだから、外部ファイルでJSON定義してバリデーションチェックをエディタに任せたいという人も多いだろう。そういった場合は、以下のように定義を行う。

file("./eventfilter.json")のみで実行する場合、Terraformはjsonencodeもついでにやってくれるのだが、どうやら、AWSのAPIが改行コードを受け付けてくれないらしく、バリデーションエラーになる。回避するためにreplace関数を噛ませている。terraform applyの見た目上では改行コードが見えなくて何故エラーになっているか分からないという罠があるので気を付けよう。

eventfilter.json
{
  "dynamodb": {
    "Keys": {
      "id": {
        "S": [ "00002" ]
      }
    }
  }
}
resource "aws_lambda_event_source_mapping" "lambda3" {
  event_source_arn  = aws_dynamodb_table.example.stream_arn
  function_name     = aws_lambda_function.example3.arn
  starting_position = "TRIM_HORIZON"

  filter_criteria {
    filter {
      pattern = replace(file("./eventfilter.json"), "\n", "")
    }
  }
}

これで定義すると、マネージメントコンソールからは以下のように見える。
余計なスペースが入っているが、それ以外はHCLに直接書いたものと書式は同じだ(中身は違うけど)。

キャプチャ2.png

動かしてみる

さて、この状態で、

$ aws dynamodb put-item --table-name テーブル名 --item '{ "id": { "S": "00001" }, "name": { "S": "Taro" }, "age": { "S": "40" } }'
$ aws dynamodb put-item --table-name テーブル名 --item '{ "id": { "S": "00002" }, "name": { "S": "Jiro" }, "age": { "S": "25" } }'

とすると、CloudWatch Logsで、1つ目のLambdaが2回実行され、2,3個目のLambdaが1回ずつ実行されるのが分かる。
しっかりフィルタリングしつつファンアウトのイベントが作れたことが確認できた!

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?