0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

API GatewayからPubSubバックエンドLambaに接続する。

Posted at

はじめに

前回までは以下の記事の内容で、Publisher Lambda⇒SNS⇒SQS⇒Subscriber Lambda⇒DynamoDBまでの流れを構築しました。

今回はこちらの構成に手を加えて、以下のようにUpdateしていきたいと思います。
image.png
処理の流れを簡単に説明すると以下のようになります。

  1. Postmanを使ってAPIリクエストを送信し、API Gatewayを介してバックエンドLambdaが起動する。
  2. LambdaからSNSトピックが配信され、SQSキューに渡った後、Subscriber Lambdaが起動してDynamoDBに新規Itemを追加(Put)する。
  3. DynamoDB Streamに新規Itemの追加情報が流れ、次のLambdaがトリガーされる。このLambdaで更にSNSトピックを配信する。
  4. SNSにトピックが追加されたら、Userに処理完了通知をEmailで送信する。

前提条件

  • Administrator権限を持つAWSアカウント
  • LambdaはPython3.8で記述
  • 上記別記事の部分は構築済

以下の順序で構築していきたいと思います。

  1. API Gatewayの設定
  2. メールを配信するSNSの設定
  3. DynamoDB Streamにトリガーされ、SNSトピックを配信する追加Lambdaの構築
  4. DynamoDB Streamの設定

API Gatewayの設定

API作成

今回はREST APIを作成していきます。
image.png

Resource作成

APIが作成されたら、Resourceを作成していきます。
image.png

Method作成

作成したResourceに対してMethodを作成していきます。下図の/test/publishtopicに読み替えてください。
image.png
今回はLambdaを呼び出すのでIntegration typeにLambdaを選択し、リクエストボディに後続処理で利用するメッセージを格納したいのでUse Lambda Proxy IntegrationONにします。Lambda Functionでは作成済のSNSトピックを配信するLambdaを選択します。Lambda側の画面を見てみると、トリガーにAPI Gatewayが反映されていました。
image.png

Use Lambda Proxy IntegrationONにすると、設定項目を大幅に減らすことが可能です。今回のようにリクエスト、レスポンスの変換を必要としない場合に有効です。
また、上記をONにした場合、LambdaにInputされるデータ(Lambda関数のパラメータeventに入る値)が決まった形式になり、LambdaからOutputするデータも決まった形式する必要があります。

今回は以下のような最低限のレスポンスをセットするようにLambda関数を変更しました。

return {
        'statusCode': 200,
        'body': json.dumps('A new message has been published')
}

最後に作成したAPIをデプロイします。この手順は忘れがちです。
image.png
デプロイが完了すると、以下のようにAPIのエンドポイントが作成されます。
image.png

メール配信を行うSNSトピックの作成

以下のようなSNSトピックとサブスクリプションを作成します。サブスクリプションにEmailを選択しており、指定したEmail宛にサブスクリプションの確認メールが届くのでAcceptします。
image.png

追加Lambdaの作成

上記で作成したトピックを配信するLambdaを実装していきます。


import json
import boto3

client = boto3.client('sns')

def lambda_handler(event, context):
    # Logging
    print('Lambda triggered by DynamoDB Strem.')
    
    # Get new item data
    item_data = event['Records'][0]['dynamodb']['NewImage']
    message = item_data['message']['S']
    print(item_data)
    
    # Set SNS Topic parameter
    params = {
        'TopicArn': 'arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:newItemOnDynamodb',
        'Subject' : 'Published From: getDataFromDynamodbStream',
        'Message' : message
    }
    
    # Send to SNS topic
    try:
        response = client.publish(
            TopicArn = params['TopicArn'],
            Subject = params['Subject'],
            Message = params['Message']
        )
    
        print(response)
        return {
            'statusCode': 200,
            'body': json.dumps(item_data)    
        }
    
    except Exception as e:
        print(e)
        raise e

DynamoDBストリームから渡される中身から、メッセージを取得しています。取得したメッセージをTopicのMessageにセットし、Emailで確認できるようにしています。また、メールの件名もSubjectでセットしています。

ちなみにDynamoDBストリームから渡ってくる中身は、以下のようなイメージです。

{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291",
      "eventSource": "aws:dynamodb"
    }
  ]}

Records[].dynamodbの中に入っているNewImageからMessageを取り出せばよいカタチです。

DynamoDB Streamの設定

DynamoDBに新規Itemが追加された際に、上記で実装したLambdaが呼び出されるようにDynamoDB Streamを設定していきます。
Streamを有効化し、Streamにデータが流れるタイミングは新規Itemが追加された場合で、New Imageが渡されればその内容(メッセージ)が取得できます。
image.png
また、Triggerとして先ほど作成したLambdaを設定します。これで一通り構築できました。

動作確認テスト

動作確認をしてみます。まずはAPI GatewayからTest実行してみます。
image.png
リクエストボディに以下のように文言をセットしてTest実行します。
image.png
最初のLambdaから200のレスポンスが返却されました。
image.png
メールも届いています。動作していそうです。
image.png

続いてAPIのエンドポイントを実際に叩いてテストしてみます。APIエンドポイントはAPI Gateway作成手順でAPIをDeployした際に確認しました。
こちらのエンドポイントをブラウザで叩いてみます。
image.png
エラーとなりました。最初は「え??」となったのですが、ブラウザからのAPIコールはGETとなるので、今回はPOSTリクエストを送信しないといけません。
PostmanからPOSTリクエストを送信します。Bodyに任意の文字列を入れています。
image.png
ステータス200が返ってきています。
メールも届きました。
image.png
先ほどの実行と合わせて、DynamoDBにも新規Itemが追加されていることも確認できました。
image.png

おわりに

無事にAPI GatewayとDynamoDB Streamを追加したPubSubの仕組みを構築できました。ユーザーからのリクエスト処理と、バックエンドのロジックを切り離しできるこのような構成は、AWSのサービスを組み合わせて実装可能なことを確認できました。ユーザーからのAPIリクエストを受け取る際に、ユーザー側には処理時間を待たせることなく、処理完了したら通知したいような構成としたい場合は有効だと思います。

参考記事

以下参考にさせて頂いた記事です。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?