4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Amazon Aurora(PostgreSQL互換)でLambdaをトリガする際のプラクティス

Last updated at Posted at 2023-08-06

はじめに

AWSを使っていると、業務用のデータはリレーショナルデータベースに入れて、バックエンドのシステムに転送するような履歴情報は扱いが容易なKVSに入れたくなるような業務要件がよくある。
履歴情報の処理はリレーショナルデータベースでも実現できるが、1つ1つの処理のシンプルさを考慮すると、非同期でバックエンド処理をした方が良い。
Auroraには、Lambdaをイベントトリガで起動する機能があるので、まさにうってつけの機能と言える。これと、PostgreSQLのトリガプロシージャの機能を用いて非同期処理を実現できるだろう。
まずは、昨日の概要は公式のユーザーガイドを確認してみていただきたい。

ただし、本当に冪等性高く、漏れなく処理ができるのかが気になったので、本記事で検証をしてみた。
なお、AuroraはPostgreSQL互換を使用している。MySQL互換は異なる仕様の可能性があるため注意していただきたい。

検証ポイント

処理が非同期に分割されるので、考慮すべきポイントを洗い出していく。
この中には、AWSの仕様もあればPostgresSQLの仕様もある。

  • Commit時にトリガを引くことができるか(PostgreSQLの仕様)
    PostgreSQLではデフォルトがAUTOCOMMIT=ONになっているが、これをOFFにして正しいタイミングでトリガプロシージャが発動するかを確認する。
  • イベント起動に失敗した場合にエラー検知できるか(PostgreSQLの仕様)
    Lambdaの起動までもっていければエラー検知はアプリケーションで実装できるが、それよりも前の段階でエラー検知ができるか?特に、Auroraの中でエラーになった場合の検知ができるかを確認する。
  • イベント起動後は他のマネージドサービスのイベントトリガと同等の動作をするか(AWSの仕様)
    これは大丈夫と思いつつ念のため確認をする。

トリガプロシージャの作成

今回トリガを通知するテーブル

以下のように作成している。

CREATE TABLE IF NOT EXISTS employee (
  id char(5) PRIMARY KEY,
  name CHAR(20) NOT NULL,
  age integer
);

Lambdaの準備

今回のLambdaは、イベント通知の内容を確認が最低限できれば良いので、以下のように定義する。

example.py
import pprint

def lambda_handler(event, context):
  pprint.pprint(event)

  return {}

TerraformのIaCは以下のように定義する。
後で、3つ目の検証ポイントを確認するためにSQSを作成してdead_letter_configを設定しておこう。

data "archive_file" "example" {
  type        = "zip"
  source_dir  = "../scripts/example"
  output_path = "../outputs/example.zip"
}

resource "aws_lambda_function" "example" {
  depends_on = [
    aws_cloudwatch_log_group.lambda,
  ]

  function_name    = local.lambda_function_name
  filename         = data.archive_file.example.output_path
  role             = aws_iam_role.lambda.arn
  handler          = "example.lambda_handler"
  source_code_hash = data.archive_file.example.output_base64sha256
  runtime          = "python3.10"

  memory_size = 128
  timeout     = 30

  dead_letter_config {
    target_arn = aws_sqs_queue.lambda_dlq.arn
  }
}

resource "aws_cloudwatch_log_group" "lambda" {
  name              = "/aws/lambda/${local.lambda_function_name}"
  retention_in_days = 3
}

resource "aws_sqs_queue" "lambda_dlq" {
  name = local.lambda_dlq_name

  kms_master_key_id                 = "alias/aws/sqs"
  kms_data_key_reuse_period_seconds = 300
}

resource "aws_iam_role" "lambda" {
  name               = local.iam_lambda_role_name
  assume_role_policy = data.aws_iam_policy_document.lambda_assume.json
}

data "aws_iam_policy_document" "lambda_assume" {
  statement {
    effect = "Allow"

    actions = [
      "sts:AssumeRole",
    ]

    principals {
      type = "Service"
      identifiers = [
        "lambda.amazonaws.com",
      ]
    }
  }
}

resource "aws_iam_role_policy" "lambda_custom" {
  name   = local.iam_lambda_policy_name
  role   = aws_iam_role.lambda.id
  policy = data.aws_iam_policy_document.lambda_custom.json
}

data "aws_iam_policy_document" "lambda_custom" {
  statement {
    effect = "Allow"

    actions = [
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents",
    ]

    resources = [
      aws_cloudwatch_log_group.lambda.arn,
      "${aws_cloudwatch_log_group.lambda.arn}:log-stream:*",
    ]
  }
  statement {
    effect = "Allow"

    actions = [
      "sqs:SendMessage",
    ]

    resources = [
      aws_sqs_queue.lambda_dlq.arn,
    ]
  }
}

Auroraの準備

Aurora側でLambdaを実行できるように権限を付与しておく必要がある。
権限の付与には、aws_rds_cluster_role_associationのリソースを使用する。feature_nameは何でもよい。

必要に応じてセキュリティグループの設定も見直す
LambdaはHTTPS経由で実行されるため、Auroraにアタッチしているセキュリティグループのアウトバウンドポートを解放していない場合は443ポートを解放しておくこと。

resource "aws_rds_cluster_role_association" "example" {
  db_cluster_identifier = aws_rds_cluster.example.id
  feature_name          = "Lambda"
  role_arn              = aws_iam_role.aurora.arn
}

resource "aws_iam_role" "aurora" {
  name               = local.iam_aurora_role_name
  assume_role_policy = data.aws_iam_policy_document.aurora_assume.json
}

data "aws_iam_policy_document" "aurora_assume" {
  statement {
    effect = "Allow"

    actions = [
      "sts:AssumeRole",
    ]

    principals {
      type = "Service"
      identifiers = [
        "rds.amazonaws.com",
      ]
    }
  }
}

resource "aws_iam_role_policy" "aurora" {
  role   = aws_iam_role.aurora.name
  policy = data.aws_iam_policy_document.aurora_custom.json
}

data "aws_iam_policy_document" "aurora_custom" {
  statement {
    effect = "Allow"

    actions = [
      "lambda:InvokeFunction",
    ]

    resources = [
      aws_lambda_function.example.arn,
    ]
  }
}

トリガプロシージャの作成

まずはLambdaの実行をSQLで実行できるようにする拡張パックをアクティベーションする。

CREATE EXTENSION IF NOT EXISTS aws_lambda CASCADE;

その後、まずはLambdaを実行する関数を作成しておく。
環境ごとの冪等性を担保するために、LambdaのARNは変数化して、Terraformのtemplatefile()で置換しよう。
aws_lambda.invoke()の第2引数がLambdaに渡ってくるイベントのペイロードになる。JSONで書きたいが、SQLで書く都合どうしても変な書式になってしまって扱いにくい……。

CHAR型カラムの扱い
今回のテーブル定義にあるNameのようなCHAR型の項目は、デフォルトでは後方空白トリムされないため、RTRIM関数等を使って期待したペイロードになるよう調整しよう。

create_procedure.sql
CREATE OR REPLACE FUNCTION invoke_lambda()
  RETURNS TRIGGER
  LANGUAGE PLPGSQL
AS $$
BEGIN
  PERFORM
    aws_lambda.invoke(
      '${lambda_function_arn}',
      CONCAT(
        '{"id": "', NEW.id, '", "name": "', RTRIM(NEW.name), '", "age": "', NEW.age, '"}'
      )::json,
      'ap-northeast-1',
      'Event'
    );
  RETURN NEW;
END
$$;
resource "local_file" "create_procedure_sql" {
  filename = "../sql/create_procedure.sql"
  content  = templatefile(
    "../sql/create_procedure_tmpl.sql",
    {
      lambda_function_arn = aws_lambda_function.example.arn
    }
  )
}

関数を作成したら、次はトリガーを作成する。

CREATE CONSTRAINT TRIGGER invoke_lambda
  AFTER INSERT ON employee
  INITIALLY DEFERRED
  FOR EACH ROW
EXECUTE FUNCTION
  invoke_lambda();

これで、INSERT時にLambdaが実行されることが確認できるはずだ。
CloudWatch Logsでイベントが想定通りに渡されているか確認してみよう。以下の通りの結果となっているはずだ。

実行したINSET文
INSERT INTO employee VALUES ( '00001', 'Taro', 35, CURRENT_TIMESTAMP );
CloudWatch Logsの出力
{'age': '35', 'id': '00001', 'name': 'Taro'}

検証:Commit時にトリガを引くことができるか
結論としては引くことは可能である。

ただし、単純に\set AUTOCOMMIT offするだけでは上手くいかない。
上記の通りINITIALLY DEFERREDを設定することで、COMMITまでトリガが実行されないようになる。もちろん、ROLLBACKをしたらトリガは実行されない。

ただし、INITIALLY DEFERREDは通常のトリガーでは使用できないため、CONSTRAINT TRIGGERにする。

また、トランザクションで処理した行の1件1件に対して関数実行するためにFOR EACH ROWを設定する。

こうすれば、Commit時にすべての行に対して非同期イベントが発行される。

検証:イベント起動失敗時の動作
実際のアプリケーションでエラーを補足できるかまでは確認できていないが、以下の通り、SQL実行時にエラーが発生することは確認できている。

今回は、エラーを疑似するために、AuroraにアタッチしているIAM権限を落としてLambda:Invokeが失敗するようにした。

以下の通り、COMMITのタイミングでエラーが発生する。この後にSELECTしても該当のレコードは取得できなかったため、非同期にしたとしても業務テーブルと履歴テーブルで一貫性は失われずに処理が継続できそうだ。

COMPANY=> INSERT INTO employee VALUES ( '00007', 'Nama', 77, CURRENT_TIMESTAMP );
INSERT 0 1
COMPANY=> commit;
ERROR:  invoke API failed
DETAIL:  AWS Lambda client returned 'User: arn:aws:sts::xxxxxxxxxxxx:assumed-role/aurora-psql-trigger-aurora-role/dbc-role-mem-id-null is not authorized to perform: lambda:InvokeFunction on resource: arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:aurora-postgres-trigger-lambda-example-function because no identity-based policy allows the lambda:InvokeFunction action'.
CONTEXT:  SQL statement "SELECT aws_lambda.invoke(
      'arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:aurora-postgres-trigger-lambda-example-function',
      CONCAT(
        '{"id": "', NEW.id, '", "name": "', RTRIM(NEW.name), '", "age": "', NEW.age, '"}'
      )::json,
      'ap-northeast-1',
      'Event'
    )"
PL/pgSQL function invoke_lambda() line 3 at PERFORM

イベント起動後の動作

aws_lambda.invokeを、Eventモードで起動した場合、起動後は通常のLambdaの「非同期処理」として動作する。

非同期処理の仕様は公式のユーザーガイドでは以下の通り書かれている。

非同期呼び出し - Lambda は、関数エラーを 2 回再試行します。関数にすべての受信リクエストを処理する十分なキャパシティがない場合、関数に送信されるまで、イベントはキューの中に数時間または数日間保持される可能性があります。正常に処理できなかったイベントを把握するために、デッドレターキューを設定できます。詳細については、「非同期呼び出し」を参照してください。

つまり、上記の定義の時に記載したとおり、DLQを設定しておけばエラーを補足することも可能そうだ。

実際にコードを以下のように書き換えてLambdaが必ず失敗するようにして検証する。

example.py
import pprint

def lambda_handler(event, context):
  pprint.pprint(event)

  raise Exception

これでINSERT文を1回実行すると、デフォルトの2回リトライが行われて、計3回の試行をすることが分かる。

image.png

また、上記エラー後にDLQを参照すると、以下のようにキューに情報が格納されている。
リクエストIDがCloudWatch Logsのものと一致することから、エラー時の動作は通常の非同期イベントの動作と同じになっていると考えられる。

image.png
image.png

注意:今回検証しきれていないこと

今回は以下の観点の検証はできていない。プロダクション利用する際は、充分検証の上で開始することを推奨する。

  • イベント通知が「ベストエフォート」なのか「At Least Once」なのか
    これは調べても分からなかった。
    EventBridgeの公式のユーザーガイドからに書かれているRDSの項目には「ベストエフォート」と書かれている。しかし、そこからリンクされているRDSの公式のユーザーガイドには、今回のイベント景気に関する記載がみあたらない。能動的にユーザが発行しているイベントなのでさすがに「At Least Once」であるとは思いたいが……
  • 性能観点
    性能観点としては、100件同時COMMITがもれなく処理されることは確認した。
    ワークロードが高負荷になるにつれてもれなく処理ができるかはよく検討しておいた方が良いだろう。
    100件同時COMMITした際は、レイテンシも秒単位で発生した(ただし、それがAurora Serverless v2のACUが小さかったことが原因であるかもしれない)。いずれにしろ、プロダクションのワークロードを模擬した負荷を投入してエラーにならないことは確認しておこう。
4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?