1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

TerraformでAWS SNSのファンアウトイベントを作ってみる

Posted at

はじめに

AWS SNSは複数の宛先に配信をするような仕組みを簡単に作れるマネージドサービスだ。
その中でも、ファンアウトはよく使われるデザインパターンの例なので、使い方を覚えておいて損はない。
※「ファンアウト」というSNSの設定があるわけではない。

前提知識として以下がある。

  • Terraformをある程度書いたことがある
  • SNSに関して基本的な用語(トピック、サブスクライブ、パブリッシュ程度で良い)を理解している
  • SQSのイベントソースマッピングを触ったことがある(参考記事はこちら)

構成図

今回は以下の構成図とする。
ファンアウトイベントの通知先はSQSにして、SQSからイベントトリガでLambdaを起動してみてどのようなイベント内容になるかまで確認してみよう。
また、今回はイベント通知の成否をCloudWatch Logsに出力して確認できるようにもしてみる。

構成図2.drawio.png

下準備

SNSにCloudWatch Logsの出力するための権限を付与しておく。
また、SNSのログ出力は、以下のロググループに出力される。自動で作ってくれるが、ログ保存期間がデフォルトで1ヶ月だったりするため、自分でチューニングしたい場合は予め作っておこう。

  • 成功ケース: sns/リージョン名/AWSアカウントID/SNSトピック名>
  • 失敗ケース: sns/リージョン名/AWSアカウントID/SNSトピック名>/Failure
IAM
resource "aws_iam_role" "for_sns" {
  name               = local.sns_role_name
  assume_role_policy = data.aws_iam_policy_document.sns_assume.json
}

data "aws_iam_policy_document" "sns_assume" {
  statement {
    effect = "Allow"

    actions = [
      "sts:AssumeRole",
    ]

    principals {
      type = "Service"
      identifiers = [
        "sns.amazonaws.com",
      ]
    }
  }
}

resource "aws_iam_role_policy" "sns_custom" {
  role   = aws_iam_role.for_sns.id
  name   = local.sns_policy_name
  policy = data.aws_iam_policy_document.sns_custom.json
}

data "aws_iam_policy_document" "sns_custom" {
  statement {
    effect = "Allow"

    actions = [
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents",
      "logs:PutMetricFilter",
      "logs:PutRetentionPolicy",
    ]

    resources = [
      "*",
    ]
  }
}
CloudWatchLogs
resource "aws_cloudwatch_log_group" "sns_example_success" {
  name              = "sns/${data.aws_region.current.name}/${data.aws_caller_identity.self.account_id}/${local.sns_topic_name}"
  retention_in_days = 3
}

resource "aws_cloudwatch_log_group" "sns_example_failure" {
  name              = "sns/${data.aws_region.current.name}/${data.aws_caller_identity.self.account_id}/${local.sns_topic_name}/Failure"
  retention_in_days = 3
}

通知先のSQSとLambda

通知先のSQSとLambdaは普通にイベントソースマッピングで作っておけば良い。
SQSのキューポリシーで、SNSからの通知を許容するようにしておいてあげよう。
Lambdaの中身は最後に説明する。

SQS(1つめのファンアウトイベント通知先)
resource "aws_sqs_queue" "sns_subscriber_1" {
  name = local.sqs_queue_name1
}

resource "aws_lambda_event_source_mapping" "sns_subscriber_1" {
  event_source_arn = aws_sqs_queue.sns_subscriber_1.arn
  function_name    = aws_lambda_function.example.arn
}

resource "aws_sqs_queue_policy" "sns_subscriber_1" {
  queue_url = aws_sqs_queue.sns_subscriber_1.id
  policy    = data.aws_iam_policy_document.sqs_sns_subscriber_1.json
}

data "aws_iam_policy_document" "sqs_sns_subscriber_1" {
  statement {
    effect = "Allow"

    principals {
      type = "Service"

      identifiers = [
        "sns.amazonaws.com"
      ]
    }

    actions = [
      "sqs:SendMessage",
    ]

    resources = [
      aws_sqs_queue.sns_subscriber_1.arn,
    ]

    condition {
      test     = "ArnEquals"
      variable = "aws:SourceArn"

      values = [
        aws_sns_topic.example.arn,
      ]
    }
  }
}
SQS(2つめのファンアウトイベント通知先)
resource "aws_sqs_queue" "sns_subscriber_2" {
  name = local.sqs_queue_name2
}

resource "aws_lambda_event_source_mapping" "sns_subscriber_2" {
  event_source_arn = aws_sqs_queue.sns_subscriber_2.arn
  function_name    = aws_lambda_function.example.arn
}

resource "aws_sqs_queue_policy" "sns_subscriber_2" {
  queue_url = aws_sqs_queue.sns_subscriber_2.id
  policy    = data.aws_iam_policy_document.sqs_sns_subscriber_2.json
}

data "aws_iam_policy_document" "sqs_sns_subscriber_2" {
  statement {
    effect = "Allow"

    principals {
      type = "Service"

      identifiers = [
        "sns.amazonaws.com"
      ]
    }

    actions = [
      "sqs:SendMessage",
    ]

    resources = [
      aws_sqs_queue.sns_subscriber_2.arn,
    ]

    condition {
      test     = "ArnEquals"
      variable = "aws:SourceArn"

      values = [
        aws_sns_topic.example.arn,
      ]
    }
  }
}
Lambda
data "archive_file" "example" {
  type        = "zip"
  source_dir  = "../scripts"
  output_path = "../outputs/example.zip"
}

resource "aws_lambda_function" "example" {
  depends_on = [
    aws_cloudwatch_log_group.example,
  ]

  function_name    = local.lambda_function_name
  filename         = data.archive_file.example.output_path
  role             = aws_iam_role.for_lambda.arn
  handler          = "example.lambda_handler"
  source_code_hash = data.archive_file.example.output_base64sha256
  runtime          = "python3.6"

  memory_size = 128
  timeout     = 30
}

resource "aws_cloudwatch_log_group" "example" {
  name              = "/aws/lambda/${local.lambda_function_name}"
  retention_in_days = 3
}

resource "aws_iam_role" "for_lambda" {
  name               = local.lambda_role_name
  assume_role_policy = data.aws_iam_policy_document.lambda_assume.json
}

data "aws_iam_policy_document" "lambda_assume" {
  statement {
    effect = "Allow"

    actions = [
      "sts:AssumeRole",
    ]

    principals {
      type = "Service"
      identifiers = [
        "lambda.amazonaws.com",
      ]
    }
  }
}

resource "aws_iam_role_policy" "lambda_custom" {
  role   = aws_iam_role.for_lambda.id
  name   = local.lambda_policy_name
  policy = data.aws_iam_policy_document.lambda_custom.json
}

data "aws_iam_policy_document" "lambda_custom" {
  statement {
    effect = "Allow"

    actions = [
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes",
      "sqs:ReceiveMessage",
    ]

    resources = [
      "*",
    ]
  }
}
example.py
import json

def lambda_handler(event, context):
    print(event)
    print(event['Records'][0])
    
    body = json.loads(event['Records'][0]['body'])
    print(body)
    
    message = json.loads(body['Message'])
    print(message)

SNS

SNSのトピックは、aws_sns_topicで定義する。
ロググループはログを吐き出すタイミングで作るはずではあるが、念のため、自分でロググループを作った場合は、depends_onで先に作られるようにしておこう。
sqs_success_feedback_sample_rateは正常系のログをどれくらいの割合でサンプリングして出力するかを示す。今回は練習なので、全部だしておこう。
sqs_success/failure_feedback_role_arnは、↑の方で作ったログ出力用のIAMロールを設定すれば良い。

また、aws_sns_topic_subscriptionでイベント通知先との紐づけを行える。
今回は、SQSなので、protocol = "sqs"を設定しよう。

SNS
resource "aws_sns_topic" "example" {
  depends_on = [
    aws_cloudwatch_log_group.sns_example_success,
    aws_cloudwatch_log_group.sns_example_failure,
  ]

  name = local.sns_topic_name

  sqs_success_feedback_sample_rate = 100
  sqs_success_feedback_role_arn    = aws_iam_role.for_sns.arn
  sqs_failure_feedback_role_arn    = aws_iam_role.for_sns.arn
}

resource "aws_sns_topic_subscription" "subscriber_1" {
  topic_arn = aws_sns_topic.example.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.sns_subscriber_1.arn
}

resource "aws_sns_topic_subscription" "subscriber_2" {
  topic_arn = aws_sns_topic.example.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.sns_subscriber_2.arn
}

トピックにPublishしてみる

さて、ここまで実施すると完成だ。マネージメントコンソール画面から見てみると、トピックが作られていてキューが2つ通知先に設定されているかと思う。

キャプチャ1.png

さて、ここからPublishをしてみよう。
マネージメントコンソールの「メッセージの発行」ボタンからでも良いし、CLIで以下のように実行しても良い。

$ cat ../data/message.json 
{
  "message": "test"
}
$ aws sns publish --topic-arn arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:sns-fanout-example-topic --message file://../data/message.json

ここで、CloudWatch Logsを見てみる。
SQSからLambdaへのイベント通知の内容は、公式のSQSのデベロッパーガイドより、

{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "test",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "098f6bcd4621d373cade4e832627b4f6",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        }
    ]
}

の通りに通知されている。
前段のSNSの通知がどこに入っているかというと、bodyの部分だ。

このbody

'body': '{\n  "Type" : "Notification",\n  "MessageId" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",\n  "TopicArn" : "arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:sns-fanout-example-topic",\n  "Message" : "{\\n  \\"message\\": \\"test\\"\\n}\\n",\n  "Timestamp" : "2021-09-24T12:01:17.765Z",\n  "SignatureVersion" : "1",\n  "Signature" : "シグニチャの中身をBase64エンコードしたもの",\n  "SigningCertURL" : "https://sns.ap-northeast-1.amazonaws.com/SimpleNotificationService-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX.pem",\n  "UnsubscribeURL" : "https://sns.ap-northeast-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:sns-fanout-example-topic:XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"\n}'

と、文字列の状態でグチャっと入っているので、これをjson.loads(event['Records'][0]['body'])としてdict型に変換してあげよう。すると、公式のSQSのデベロッパーガイドにある通り

{
  "Type" : "SubscriptionConfirmation",
  "MessageId" : "165545c9-2a5c-472c-8df2-7ff2be2b3b1b",
  "Token" : "2336412f37...",
  "TopicArn" : "arn:aws:sns:us-west-2:123456789012:MyTopic",
  "Message" : "You have chosen to subscribe to the topic arn:aws:sns:us-west-2:123456789012:MyTopic.\nTo confirm the subscription, visit the SubscribeURL included in this message.",
  "SubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=ConfirmSubscription&TopicArn=arn:aws:sns:us-west-2:123456789012:MyTopic&Token=2336412f37...",
  "Timestamp" : "2012-04-26T20:45:04.751Z",
  "SignatureVersion" : "1",
  "Signature" : "EXAMPLEpH+DcEwjAPg8O9mY8dReBSwksfg2S7WKQcikcNKWLQjwu6A4VbeS0QHVCkhRS7fUQvi2egU3N858fiTDN6bkkOxYDVrY0Ad8L10Hs3zH81mtnPk5uvvolIC1CXGu43obcgFxeL3khZl8IKvO61GWB6jI9b5+gLPoBc1Q=",
  "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem"
}

の形式で出力される。
やっとこのMessageに、Publish時に渡した情報が入っているが、この部分も実際は文字列になっているので、さらに

    body = json.loads(event['Records'][0]['body'])
    message = json.loads(body['Message'])
    print(message)

してあげることで、

{'message': 'test'}

とメッセージを受信することができる。

これで、ファンアウトイベントで通知されたメッセージをLambdaまで持ち回ることができるようになった!

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?