はじめに
AWS SNSは複数の宛先に配信をするような仕組みを簡単に作れるマネージドサービスだ。
その中でも、ファンアウトはよく使われるデザインパターンの例なので、使い方を覚えておいて損はない。
※「ファンアウト」というSNSの設定があるわけではない。
前提知識として以下がある。
- Terraformをある程度書いたことがある
- SNSに関して基本的な用語(トピック、サブスクライブ、パブリッシュ程度で良い)を理解している
- SQSのイベントソースマッピングを触ったことがある(参考記事はこちら)
構成図
今回は以下の構成図とする。
ファンアウトイベントの通知先はSQSにして、SQSからイベントトリガでLambdaを起動してみてどのようなイベント内容になるかまで確認してみよう。
また、今回はイベント通知の成否をCloudWatch Logsに出力して確認できるようにもしてみる。
下準備
SNSにCloudWatch Logsの出力するための権限を付与しておく。
また、SNSのログ出力は、以下のロググループに出力される。自動で作ってくれるが、ログ保存期間がデフォルトで1ヶ月だったりするため、自分でチューニングしたい場合は予め作っておこう。
- 成功ケース: sns/リージョン名/AWSアカウントID/SNSトピック名>
- 失敗ケース: sns/リージョン名/AWSアカウントID/SNSトピック名>/Failure
resource "aws_iam_role" "for_sns" {
name = local.sns_role_name
assume_role_policy = data.aws_iam_policy_document.sns_assume.json
}
data "aws_iam_policy_document" "sns_assume" {
statement {
effect = "Allow"
actions = [
"sts:AssumeRole",
]
principals {
type = "Service"
identifiers = [
"sns.amazonaws.com",
]
}
}
}
resource "aws_iam_role_policy" "sns_custom" {
role = aws_iam_role.for_sns.id
name = local.sns_policy_name
policy = data.aws_iam_policy_document.sns_custom.json
}
data "aws_iam_policy_document" "sns_custom" {
statement {
effect = "Allow"
actions = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:PutMetricFilter",
"logs:PutRetentionPolicy",
]
resources = [
"*",
]
}
}
resource "aws_cloudwatch_log_group" "sns_example_success" {
name = "sns/${data.aws_region.current.name}/${data.aws_caller_identity.self.account_id}/${local.sns_topic_name}"
retention_in_days = 3
}
resource "aws_cloudwatch_log_group" "sns_example_failure" {
name = "sns/${data.aws_region.current.name}/${data.aws_caller_identity.self.account_id}/${local.sns_topic_name}/Failure"
retention_in_days = 3
}
通知先のSQSとLambda
通知先のSQSとLambdaは普通にイベントソースマッピングで作っておけば良い。
SQSのキューポリシーで、SNSからの通知を許容するようにしておいてあげよう。
Lambdaの中身は最後に説明する。
resource "aws_sqs_queue" "sns_subscriber_1" {
name = local.sqs_queue_name1
}
resource "aws_lambda_event_source_mapping" "sns_subscriber_1" {
event_source_arn = aws_sqs_queue.sns_subscriber_1.arn
function_name = aws_lambda_function.example.arn
}
resource "aws_sqs_queue_policy" "sns_subscriber_1" {
queue_url = aws_sqs_queue.sns_subscriber_1.id
policy = data.aws_iam_policy_document.sqs_sns_subscriber_1.json
}
data "aws_iam_policy_document" "sqs_sns_subscriber_1" {
statement {
effect = "Allow"
principals {
type = "Service"
identifiers = [
"sns.amazonaws.com"
]
}
actions = [
"sqs:SendMessage",
]
resources = [
aws_sqs_queue.sns_subscriber_1.arn,
]
condition {
test = "ArnEquals"
variable = "aws:SourceArn"
values = [
aws_sns_topic.example.arn,
]
}
}
}
resource "aws_sqs_queue" "sns_subscriber_2" {
name = local.sqs_queue_name2
}
resource "aws_lambda_event_source_mapping" "sns_subscriber_2" {
event_source_arn = aws_sqs_queue.sns_subscriber_2.arn
function_name = aws_lambda_function.example.arn
}
resource "aws_sqs_queue_policy" "sns_subscriber_2" {
queue_url = aws_sqs_queue.sns_subscriber_2.id
policy = data.aws_iam_policy_document.sqs_sns_subscriber_2.json
}
data "aws_iam_policy_document" "sqs_sns_subscriber_2" {
statement {
effect = "Allow"
principals {
type = "Service"
identifiers = [
"sns.amazonaws.com"
]
}
actions = [
"sqs:SendMessage",
]
resources = [
aws_sqs_queue.sns_subscriber_2.arn,
]
condition {
test = "ArnEquals"
variable = "aws:SourceArn"
values = [
aws_sns_topic.example.arn,
]
}
}
}
data "archive_file" "example" {
type = "zip"
source_dir = "../scripts"
output_path = "../outputs/example.zip"
}
resource "aws_lambda_function" "example" {
depends_on = [
aws_cloudwatch_log_group.example,
]
function_name = local.lambda_function_name
filename = data.archive_file.example.output_path
role = aws_iam_role.for_lambda.arn
handler = "example.lambda_handler"
source_code_hash = data.archive_file.example.output_base64sha256
runtime = "python3.6"
memory_size = 128
timeout = 30
}
resource "aws_cloudwatch_log_group" "example" {
name = "/aws/lambda/${local.lambda_function_name}"
retention_in_days = 3
}
resource "aws_iam_role" "for_lambda" {
name = local.lambda_role_name
assume_role_policy = data.aws_iam_policy_document.lambda_assume.json
}
data "aws_iam_policy_document" "lambda_assume" {
statement {
effect = "Allow"
actions = [
"sts:AssumeRole",
]
principals {
type = "Service"
identifiers = [
"lambda.amazonaws.com",
]
}
}
}
resource "aws_iam_role_policy" "lambda_custom" {
role = aws_iam_role.for_lambda.id
name = local.lambda_policy_name
policy = data.aws_iam_policy_document.lambda_custom.json
}
data "aws_iam_policy_document" "lambda_custom" {
statement {
effect = "Allow"
actions = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage",
]
resources = [
"*",
]
}
}
import json
def lambda_handler(event, context):
print(event)
print(event['Records'][0])
body = json.loads(event['Records'][0]['body'])
print(body)
message = json.loads(body['Message'])
print(message)
SNS
SNSのトピックは、aws_sns_topic
で定義する。
ロググループはログを吐き出すタイミングで作るはずではあるが、念のため、自分でロググループを作った場合は、depends_on
で先に作られるようにしておこう。
sqs_success_feedback_sample_rate
は正常系のログをどれくらいの割合でサンプリングして出力するかを示す。今回は練習なので、全部だしておこう。
sqs_success/failure_feedback_role_arn
は、↑の方で作ったログ出力用のIAMロールを設定すれば良い。
また、aws_sns_topic_subscription
でイベント通知先との紐づけを行える。
今回は、SQSなので、protocol = "sqs"
を設定しよう。
resource "aws_sns_topic" "example" {
depends_on = [
aws_cloudwatch_log_group.sns_example_success,
aws_cloudwatch_log_group.sns_example_failure,
]
name = local.sns_topic_name
sqs_success_feedback_sample_rate = 100
sqs_success_feedback_role_arn = aws_iam_role.for_sns.arn
sqs_failure_feedback_role_arn = aws_iam_role.for_sns.arn
}
resource "aws_sns_topic_subscription" "subscriber_1" {
topic_arn = aws_sns_topic.example.arn
protocol = "sqs"
endpoint = aws_sqs_queue.sns_subscriber_1.arn
}
resource "aws_sns_topic_subscription" "subscriber_2" {
topic_arn = aws_sns_topic.example.arn
protocol = "sqs"
endpoint = aws_sqs_queue.sns_subscriber_2.arn
}
トピックにPublishしてみる
さて、ここまで実施すると完成だ。マネージメントコンソール画面から見てみると、トピックが作られていてキューが2つ通知先に設定されているかと思う。
さて、ここからPublishをしてみよう。
マネージメントコンソールの「メッセージの発行」ボタンからでも良いし、CLIで以下のように実行しても良い。
$ cat ../data/message.json
{
"message": "test"
}
$ aws sns publish --topic-arn arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:sns-fanout-example-topic --message file://../data/message.json
ここで、CloudWatch Logsを見てみる。
SQSからLambdaへのイベント通知の内容は、公式のSQSのデベロッパーガイドより、
{
"Records": [
{
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "test",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185"
},
"messageAttributes": {},
"md5OfBody": "098f6bcd4621d373cade4e832627b4f6",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2"
}
]
}
の通りに通知されている。
前段のSNSの通知がどこに入っているかというと、body
の部分だ。
このbody
に
'body': '{\n "Type" : "Notification",\n "MessageId" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",\n "TopicArn" : "arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:sns-fanout-example-topic",\n "Message" : "{\\n \\"message\\": \\"test\\"\\n}\\n",\n "Timestamp" : "2021-09-24T12:01:17.765Z",\n "SignatureVersion" : "1",\n "Signature" : "シグニチャの中身をBase64エンコードしたもの",\n "SigningCertURL" : "https://sns.ap-northeast-1.amazonaws.com/SimpleNotificationService-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX.pem",\n "UnsubscribeURL" : "https://sns.ap-northeast-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:sns-fanout-example-topic:XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"\n}'
と、文字列の状態でグチャっと入っているので、これをjson.loads(event['Records'][0]['body'])
としてdict型に変換してあげよう。すると、公式のSQSのデベロッパーガイドにある通り
{
"Type" : "SubscriptionConfirmation",
"MessageId" : "165545c9-2a5c-472c-8df2-7ff2be2b3b1b",
"Token" : "2336412f37...",
"TopicArn" : "arn:aws:sns:us-west-2:123456789012:MyTopic",
"Message" : "You have chosen to subscribe to the topic arn:aws:sns:us-west-2:123456789012:MyTopic.\nTo confirm the subscription, visit the SubscribeURL included in this message.",
"SubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=ConfirmSubscription&TopicArn=arn:aws:sns:us-west-2:123456789012:MyTopic&Token=2336412f37...",
"Timestamp" : "2012-04-26T20:45:04.751Z",
"SignatureVersion" : "1",
"Signature" : "EXAMPLEpH+DcEwjAPg8O9mY8dReBSwksfg2S7WKQcikcNKWLQjwu6A4VbeS0QHVCkhRS7fUQvi2egU3N858fiTDN6bkkOxYDVrY0Ad8L10Hs3zH81mtnPk5uvvolIC1CXGu43obcgFxeL3khZl8IKvO61GWB6jI9b5+gLPoBc1Q=",
"SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem"
}
の形式で出力される。
やっとこのMessage
に、Publish時に渡した情報が入っているが、この部分も実際は文字列になっているので、さらに
body = json.loads(event['Records'][0]['body'])
message = json.loads(body['Message'])
print(message)
してあげることで、
{'message': 'test'}
とメッセージを受信することができる。
これで、ファンアウトイベントで通知されたメッセージをLambdaまで持ち回ることができるようになった!