1
0

More than 1 year has passed since last update.

【AWS】Aurora MySQLのレコードが操作されたら、SNSにTopicを発行したかった

Posted at

やりたかったこと

Aurora MySQLの特定のテーブルのInsert/Update/Deleteに反応して、
該当レコードのPKをSNSのTopicにpublishしたい。

さいしょにやったこと

AuroraにはLambdaを呼び出すネイティブ関数「lambda_(a)sync」が存在します。

なんで、これで「SNSにTopic発行するLambda」を、PKをパラメータにして呼び出せばいけるじゃんヤッター!

AuroraToSNS.png

create_trigger.sql
-- TESTというテーブルにIDというPKがあると仮定
DELIMITER ;;
CREATE TRIGGER TEST_TRIGGER_1 AFTER CREATE ON TEST FOR EACH ROW
BEGIN
  SELECT
    lambda_async(
      '<LambdaのARN>',
      CONCAT(
        '{
          "subject": "TEST TABLE MODIFIED",
          "message": "{ \\"ID\\": ', NEW.ID, ' }"
        }'
      )
    )
 INTO @result; -- 単純なSELECTのみだと「トリガーは値を返せない」エラーが出るので、てきとうなローカル変数に格納します
END;;

-- UPDATEとDELETEも同様に記載します

DELIMITER ;
lambda.py
import json

import boto3

sns = boto3.client("sns")


def handler(event, context):
    return sns.publish(
        TopicArn="<SNS TopicのARN>",
        Subject=event["subject"],
        Message=event["message"],
        MessageStructure="string",
    )

だめでした

SNSTopicのサブスクライバーがいざ該当レコードを確認しに行くと、レコードがないことがある。

調べたところ、トランザクションでレコードをいれてCOMMITしたタイミングで発生していました。
どうも、 lambda_(a)syncは、トランザクションとは無関係に動く らしいです。

要するに、トリガー内のlambda_(a)syncは、トリガーが作動した瞬間に起動するので

  1. トランザクション内でlambda_asyncを呼んでレコードの内容をlambdaに渡した場合、その瞬間はまだ該当レコードはコミットされていない
  2. トランザクションがロールバックされても呼ばれている

ということのようですね。参った。
1は実際起こった症状であるからまだいいとして、2は深刻です。
なんせlambda経由でロールバックされて存在しないPKをTopicに発行してしまいますから、
サブスクライバーからしたらたまったものではありません。

こまったなあ

  • できたレコードをクロールするのが確実だけど、そんなことはやりたくない
  • レコードの存在確認をしようにも、ロールバックされたレコードだと永遠に存在しない
  • レコードを操作したアプリケーションがpublishするのが正攻法だろうが、複数のアプリケーションがレコードを操作するので、そんな工数はかけられない

最終的にどうしたのか

DMSを使いました。

なんでこんなところでDMSが出てくるねん、と識者はお思いでしょうが、まずは下の構成図をごらんください。

withDMS.png

create_audit_log.sql
-- TESTというテーブルにIDというPKがあると仮定

CREATE TABLE TEST_AUDIT_LOG(
  ID INT PRIMARY KEY
) ENGINE = InnoDB;

DELIMITER ;;
CREATE TRIGGER TEST_AUDIT_LOG_TRIGGER_1 AFTER CREATE ON TEST_AUDIT_LOG FOR EACH ROW
BEGIN
  SELECT
    lambda_async(
      '<LambdaのARN>',
      CONCAT(
        '{
          "subject": "TEST TABLE MODIFIED",
          "message": "{ \\"ID\\": ', NEW.ID, ' }"
        }'
      )
    )
 INTO @result; -- 単純なSELECTのみだと「トリガーは値を返せない」エラーが出るので、てきとうなローカル変数に格納します
END;;

-- UPDATEとDELETEも同様に記載します

DELIMITER ;

(lambda.pyはそのまま)

こうすることにより、 TEST テーブルから TEST_AUDIT_LOG にレコードがマイグレーションされ、
マイグレーション時のINSERT/UPDATE/DELETEに反応してLambdaがコールされるようになります。

確かに、lambdaがコールされた段階では、TEST_AUDIT_LOGにはレコードはないかもしれません。
でもマイグレーションが作動するということは、TESTのレコード自身は確実に最新の状態です
なので、TopicのサブスクライバーはオリジナルのTESTを見に行くことで、確実に最新の状況を得られる状態になります。

さいごに

以上、DMSの悪用例をご紹介しました。
こんな使い方、AWSの中の人も想定しているまい。

正しい攻略法があったらぜひともご教授いただきたいです。
よろしくおねがいします。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0