3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Amazon SNSとAmazon SQSでPubSubの仕組みを実装する

Posted at

構成図

以下のような仕組みをつくっていきたいと思います。
スクリーンショット 2023-06-26 121400.png
PublisherのLambdaはコンソールから「Test」実行し、SNSトピックをPublishし、SQSでキューに追加された後に、SubscriberのLambdaが起動します。SubscriberのLambdaではメッセージに含まれる内容をDynamoDBに追加します。

前提条件

  • Administrator権限を持つAWSアカウント
  • LambdaはPython3.8で記述

以下の順番で内容を記載していきたいと思います。

  1. SNSトピックの設定
  2. SQSキューの設定
  3. DynamoDBの設定
  4. Publisher Lambdaの中身とSubscriber Lambdaの中身
  5. DynamoDBとCloudWatch Logsで実行確認

SNSトピックの設定

今回はStandardタイプでトピックを作成します。
スクリーンショット 2023-06-26 122254.png
トピックがうまく配信されない場合に、細かいログも確認したいのでLoggingの設定もします。今回はSQSにトピックを配信するので、ロギングの対象をAmazon SQSとします。既存のIAMロールが存在しない場合は、「Create New Service Role」から画面遷移に従ってロールを作成していきます。今回は既に作成済だったので、対象のロールをセットしています。
image.png
その他の項目はデフォルトで「作成」します。一旦ここまでの設定です。

Amazon SQSの設定

次にSQSのキューを作成していきます。こちらもStandardタイプで作成します。
(※キューの名前を打ち間違いしました。。。)
スクリーンショット 2023-06-26 131607.png
後はデフォルトで進めていき、「作成」をします。
以下の2点は追加で設定をした部分です。デッドレターキューはオプションですが、アクセス権限の部分はうまく設定されてないとSNSからメッセージを受信できないエラーとなりました。

デッドレターキューの設定

image.png

アクセスポリシーの設定

スクリーンショット 2023-06-26 132026.png

SNSトピックのサブスクライブの設定

一つ前の手順で作成したSNSトピックをサブスクライブする設定をします。SQSの画面から実施できます。
image.png

再びAmazon SNSの設定

SQS側でSNSのトピックのサブスクリプションの設定をすると、SNS側のトピックにも反映されます。少し追加の設定をします。
スクリーンショット 2023-06-26 132841.png
まずはEnable raw message deliveryのチェックボックスをONにします。この部分をONにすることで、Publisher LambdaでSNSにパブリッシュしたメッセージをそのまま受信できるようになります。(最初はOFFにしていましたが、そうするとSNS側で付与される属性情報なども含めたメッセージを受信するカタチになります。)
スクリーンショット 2023-06-26 133007.png
配信に失敗したトピックを格納しておくように、先ほどのデッドレターキューを指定します。オプション設定です。
image.png

DynamoDBの設定

DynamoDBはシンプルに設定していきます。今回はPartition Keyをmessage_id、Sort Keyをtimestampとしてみます。
image.png

Publisher用のLambdaを作成

Lambdaが起動したときのログ出力と、SNSにトピックをパブリッシュしているシンプルなLambdaです。Messageパラメータには、Test実行時に指定した文字列を渡すようにしています。

import boto3
from datetime import datetime
import json

client = boto3.client('sns')

def logging(errorLv, LambdaName, errorMsg):
    loggingDateStr=(datetime.now()).strftime('%Y%m%d %H:%M:%S')
    print(loggingDateStr + " " + LambdaName + " " + "[" + errorLv + "] " + errorMsg)
    return

def lambda_handler(event, context):
    
    logging("info", context.function_name, "lambda started")
    
    params = {
        'TopicArn': 'arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:myFirstTopic_std',
        'Subject' : 'Published From: deliverTopicToSNS_Test01',
        'Message' : event['message']
    }
    
    response = client.publish(
        TopicArn = params['TopicArn'],
        Subject = params['Subject'],
        Message = params['Message']
    )
    
    print(json.dumps(response))
    return response

以下がTest実行時に渡される文字列です。
image.png

Subscriber用のLambdaを作成

SQSから渡されるメッセージの内容をDynamoDBにputしているLambdaです。本来はメッセージを受信して処理が完了したら、メッセージを削除する流れがよいと思います。

import json
import boto3
from datetime import datetime

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('sns_sqs_lambda_table')
sqs_client = boto3.client("sqs")
queue_url = "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxxxx/SnsToSnsQueue"

def lambda_handler(event, context):
    
    message = event['Records'][0]['body']
    message_id = event['Records'][0]['messageId']
   
    try:
        result = table.put_item(Item={
            'message_id':message_id,
            'subject': 'fixed subject',
            'message': message,
            'timestamp': datetime.now().isoformat(timespec='seconds')
        })
        
        # sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=event['Records'][0]['ReceiptHandle'])
        
    except Exception as e:
        print(e)
        raise e
   
    print(result)
    return result

Lambdaを起動するようにSQSで追加設定

SQSのキューにメッセージが格納された後に、作成したSubscriberのLambdaが起動されるように設定をします。この設定はSQS側で実施します。
Configure Lambda FUnction Triggerから対象のLambdaを選択するだけです。
image.png
image.png
この設定が完了すると、Lambda画面にSQSからのトリガーが追加されます。
image.png
ここまでで一通り設定が完了しました。

テスト実行と結果確認

Publisher用のLambdaから「Test」を実行します。全てうまく起動していれば、DynamoDBに新規ItemがPutされているはずです。
DynamoDBのテーブルを確認すると、無事に1件新規のItemが追加されていました!
image.png
念のためPublisher用Lambdaも確認します。
image.png
Subscriber用のLambdaも確認します。
image.png
Lambdaも無事に動作していることが確認できました。
SNSトピックがちゃんと配信されたかどうかはsnsのログから確認できます。
image.png
image.png
こちらも無事に動いていることが確認できました。

終わりに

ここまでLambda⇒SNS⇒SQS⇒Lambda⇒DynamoDBの構成を構築してきました。次回はこの構成に手を加えてAPI GatewayやEmailで処理完了通知などを受け取る仕組みを実装していきたいと思います。

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?