はじめに
前回までは以下の記事の内容で、Publisher Lambda⇒SNS⇒SQS⇒Subscriber Lambda⇒DynamoDBまでの流れを構築しました。
今回はこちらの構成に手を加えて、以下のようにUpdateしていきたいと思います。
処理の流れを簡単に説明すると以下のようになります。
- Postmanを使ってAPIリクエストを送信し、API Gatewayを介してバックエンドLambdaが起動する。
- LambdaからSNSトピックが配信され、SQSキューに渡った後、Subscriber Lambdaが起動してDynamoDBに新規Itemを追加(Put)する。
- DynamoDB Streamに新規Itemの追加情報が流れ、次のLambdaがトリガーされる。このLambdaで更にSNSトピックを配信する。
- SNSにトピックが追加されたら、Userに処理完了通知をEmailで送信する。
前提条件
- Administrator権限を持つAWSアカウント
- LambdaはPython3.8で記述
- 上記別記事の部分は構築済
以下の順序で構築していきたいと思います。
- API Gatewayの設定
- メールを配信するSNSの設定
- DynamoDB Streamにトリガーされ、SNSトピックを配信する追加Lambdaの構築
- DynamoDB Streamの設定
API Gatewayの設定
API作成
Resource作成
Method作成
作成したResourceに対してMethodを作成していきます。下図の/test
は/publishtopic
に読み替えてください。
今回はLambdaを呼び出すのでIntegration type
にLambdaを選択し、リクエストボディに後続処理で利用するメッセージを格納したいのでUse Lambda Proxy Integration
をON
にします。Lambda Functionでは作成済のSNSトピックを配信するLambdaを選択します。Lambda側の画面を見てみると、トリガーにAPI Gatewayが反映されていました。
Use Lambda Proxy Integration
をON
にすると、設定項目を大幅に減らすことが可能です。今回のようにリクエスト、レスポンスの変換を必要としない場合に有効です。
また、上記をONにした場合、LambdaにInputされるデータ(Lambda関数のパラメータeventに入る値)が決まった形式になり、LambdaからOutputするデータも決まった形式する必要があります。
今回は以下のような最低限のレスポンスをセットするようにLambda関数を変更しました。
return {
'statusCode': 200,
'body': json.dumps('A new message has been published')
}
最後に作成したAPIをデプロイします。この手順は忘れがちです。
デプロイが完了すると、以下のようにAPIのエンドポイントが作成されます。
メール配信を行うSNSトピックの作成
以下のようなSNSトピックとサブスクリプションを作成します。サブスクリプションにEmailを選択しており、指定したEmail宛にサブスクリプションの確認メールが届くのでAcceptします。
追加Lambdaの作成
上記で作成したトピックを配信するLambdaを実装していきます。
import json
import boto3
client = boto3.client('sns')
def lambda_handler(event, context):
# Logging
print('Lambda triggered by DynamoDB Strem.')
# Get new item data
item_data = event['Records'][0]['dynamodb']['NewImage']
message = item_data['message']['S']
print(item_data)
# Set SNS Topic parameter
params = {
'TopicArn': 'arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:newItemOnDynamodb',
'Subject' : 'Published From: getDataFromDynamodbStream',
'Message' : message
}
# Send to SNS topic
try:
response = client.publish(
TopicArn = params['TopicArn'],
Subject = params['Subject'],
Message = params['Message']
)
print(response)
return {
'statusCode': 200,
'body': json.dumps(item_data)
}
except Exception as e:
print(e)
raise e
DynamoDBストリームから渡される中身から、メッセージを取得しています。取得したメッセージをTopicのMessage
にセットし、Emailで確認できるようにしています。また、メールの件名もSubject
でセットしています。
ちなみにDynamoDBストリームから渡ってくる中身は、以下のようなイメージです。
{
"Records": [
{
"eventID": "1",
"eventVersion": "1.0",
"dynamodb": {
"Keys": {
"Id": {
"N": "101"
}
},
"NewImage": {
"Message": {
"S": "New item!"
},
"Id": {
"N": "101"
}
},
"StreamViewType": "NEW_AND_OLD_IMAGES",
"SequenceNumber": "111",
"SizeBytes": 26
},
"awsRegion": "us-west-2",
"eventName": "INSERT",
"eventSourceARN": "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291",
"eventSource": "aws:dynamodb"
}
]}
Records[].dynamodb
の中に入っているNewImage
からMessage
を取り出せばよいカタチです。
DynamoDB Streamの設定
DynamoDBに新規Itemが追加された際に、上記で実装したLambdaが呼び出されるようにDynamoDB Streamを設定していきます。
Streamを有効化し、Streamにデータが流れるタイミングは新規Itemが追加された場合で、New Image
が渡されればその内容(メッセージ)が取得できます。
また、Triggerとして先ほど作成したLambdaを設定します。これで一通り構築できました。
動作確認テスト
動作確認をしてみます。まずはAPI GatewayからTest実行してみます。
リクエストボディに以下のように文言をセットしてTest実行します。
最初のLambdaから200
のレスポンスが返却されました。
メールも届いています。動作していそうです。
続いてAPIのエンドポイントを実際に叩いてテストしてみます。APIエンドポイントはAPI Gateway作成手順でAPIをDeployした際に確認しました。
こちらのエンドポイントをブラウザで叩いてみます。
エラーとなりました。最初は「え??」となったのですが、ブラウザからのAPIコールはGETとなるので、今回はPOSTリクエストを送信しないといけません。
PostmanからPOSTリクエストを送信します。Bodyに任意の文字列を入れています。
ステータス200
が返ってきています。
メールも届きました。
先ほどの実行と合わせて、DynamoDBにも新規Itemが追加されていることも確認できました。
おわりに
無事にAPI GatewayとDynamoDB Streamを追加したPubSubの仕組みを構築できました。ユーザーからのリクエスト処理と、バックエンドのロジックを切り離しできるこのような構成は、AWSのサービスを組み合わせて実装可能なことを確認できました。ユーザーからのAPIリクエストを受け取る際に、ユーザー側には処理時間を待たせることなく、処理完了したら通知したいような構成としたい場合は有効だと思います。
参考記事
以下参考にさせて頂いた記事です。