構成図
以下のような仕組みをつくっていきたいと思います。
PublisherのLambdaはコンソールから「Test」実行し、SNSトピックをPublishし、SQSでキューに追加された後に、SubscriberのLambdaが起動します。SubscriberのLambdaではメッセージに含まれる内容をDynamoDBに追加します。
前提条件
- Administrator権限を持つAWSアカウント
- LambdaはPython3.8で記述
以下の順番で内容を記載していきたいと思います。
- SNSトピックの設定
- SQSキューの設定
- DynamoDBの設定
- Publisher Lambdaの中身とSubscriber Lambdaの中身
- DynamoDBとCloudWatch Logsで実行確認
SNSトピックの設定
今回はStandardタイプでトピックを作成します。
トピックがうまく配信されない場合に、細かいログも確認したいのでLoggingの設定もします。今回はSQSにトピックを配信するので、ロギングの対象をAmazon SQSとします。既存のIAMロールが存在しない場合は、「Create New Service Role」から画面遷移に従ってロールを作成していきます。今回は既に作成済だったので、対象のロールをセットしています。
その他の項目はデフォルトで「作成」します。一旦ここまでの設定です。
Amazon SQSの設定
次にSQSのキューを作成していきます。こちらもStandardタイプで作成します。
(※キューの名前を打ち間違いしました。。。)
後はデフォルトで進めていき、「作成」をします。
以下の2点は追加で設定をした部分です。デッドレターキューはオプションですが、アクセス権限の部分はうまく設定されてないとSNSからメッセージを受信できないエラーとなりました。
デッドレターキューの設定
アクセスポリシーの設定
SNSトピックのサブスクライブの設定
一つ前の手順で作成したSNSトピックをサブスクライブする設定をします。SQSの画面から実施できます。
再びAmazon SNSの設定
SQS側でSNSのトピックのサブスクリプションの設定をすると、SNS側のトピックにも反映されます。少し追加の設定をします。
まずはEnable raw message delivery
のチェックボックスをONにします。この部分をONにすることで、Publisher LambdaでSNSにパブリッシュしたメッセージをそのまま受信できるようになります。(最初はOFFにしていましたが、そうするとSNS側で付与される属性情報なども含めたメッセージを受信するカタチになります。)
配信に失敗したトピックを格納しておくように、先ほどのデッドレターキューを指定します。オプション設定です。
DynamoDBの設定
DynamoDBはシンプルに設定していきます。今回はPartition Keyをmessage_id
、Sort Keyをtimestamp
としてみます。
Publisher用のLambdaを作成
Lambdaが起動したときのログ出力と、SNSにトピックをパブリッシュしているシンプルなLambdaです。Message
パラメータには、Test実行時に指定した文字列を渡すようにしています。
import boto3
from datetime import datetime
import json
client = boto3.client('sns')
def logging(errorLv, LambdaName, errorMsg):
loggingDateStr=(datetime.now()).strftime('%Y%m%d %H:%M:%S')
print(loggingDateStr + " " + LambdaName + " " + "[" + errorLv + "] " + errorMsg)
return
def lambda_handler(event, context):
logging("info", context.function_name, "lambda started")
params = {
'TopicArn': 'arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:myFirstTopic_std',
'Subject' : 'Published From: deliverTopicToSNS_Test01',
'Message' : event['message']
}
response = client.publish(
TopicArn = params['TopicArn'],
Subject = params['Subject'],
Message = params['Message']
)
print(json.dumps(response))
return response
Subscriber用のLambdaを作成
SQSから渡されるメッセージの内容をDynamoDBにputしているLambdaです。本来はメッセージを受信して処理が完了したら、メッセージを削除する流れがよいと思います。
import json
import boto3
from datetime import datetime
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('sns_sqs_lambda_table')
sqs_client = boto3.client("sqs")
queue_url = "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxxxx/SnsToSnsQueue"
def lambda_handler(event, context):
message = event['Records'][0]['body']
message_id = event['Records'][0]['messageId']
try:
result = table.put_item(Item={
'message_id':message_id,
'subject': 'fixed subject',
'message': message,
'timestamp': datetime.now().isoformat(timespec='seconds')
})
# sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=event['Records'][0]['ReceiptHandle'])
except Exception as e:
print(e)
raise e
print(result)
return result
Lambdaを起動するようにSQSで追加設定
SQSのキューにメッセージが格納された後に、作成したSubscriberのLambdaが起動されるように設定をします。この設定はSQS側で実施します。
Configure Lambda FUnction Triggerから対象のLambdaを選択するだけです。
この設定が完了すると、Lambda画面にSQSからのトリガーが追加されます。
ここまでで一通り設定が完了しました。
テスト実行と結果確認
Publisher用のLambdaから「Test」を実行します。全てうまく起動していれば、DynamoDBに新規ItemがPutされているはずです。
DynamoDBのテーブルを確認すると、無事に1件新規のItemが追加されていました!
念のためPublisher用Lambdaも確認します。
Subscriber用のLambdaも確認します。
Lambdaも無事に動作していることが確認できました。
SNSトピックがちゃんと配信されたかどうかはsnsのログから確認できます。
こちらも無事に動いていることが確認できました。
終わりに
ここまでLambda⇒SNS⇒SQS⇒Lambda⇒DynamoDBの構成を構築してきました。次回はこの構成に手を加えてAPI GatewayやEmailで処理完了通知などを受け取る仕組みを実装していきたいと思います。