3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

FusicAdvent Calendar 2023

Day 20

SQS+DynamoDBでスケーラブルでフルサーバーレスなポイント管理システムを作る

Last updated at Posted at 2023-12-19

こんにちは。
株式会社Fusicで技術共創部門の部門長/プリンシパルエンジニアをしている吉野です。

Fusic Advent Calendar 2023 の20日目です! 昨日は、ゆっきーさんゆるっとスタディ:【マーケ】ひとりマーケターがまずやることとは!?でした!

ひとりマーケターの悩みややるべきことについて、詳細に書いてあってとっても勉強になる記事ですね。是非読んでみてください!

今回つくるもの

お仕事でスケーラブルなサーバーレスシステムを担当することになりそうだったので、勉強がてらつくってみました。

構成図は以下。

Untitled (1).png

とってもシンプルですね。

ポイント管理システムですので、同時にたくさんのユーザーに対してポイントが追加されるケースが想定されます。
AWS LambdaとSQSによって、ある程度スケーラブルになっているのではないかと。
この構成図を実装していきます。

ディレクトリを作成

今回はServerless Frameworkを用いて実装を進めます。

とりあえずテンプレートファイルを作成

$ serverless

Creating a new serverless project
? What do you want to make?
  AWS - Node.js - Starter
  AWS - Node.js - HTTP API
  AWS - Node.js - Scheduled Task
  AWS - Node.js - SQS Worker
  AWS - Node.js - Express API
  AWS - Node.js - Express API with DynamoDB
❯ AWS - Python - Starter // Python Starterを選択
  AWS - Python - HTTP API
  AWS - Python - Scheduled Task
  AWS - Python - SQS Worker
  AWS - Python - Flask API
  AWS - Python - Flask API with DynamoDB
  Other

? What do you want to call this project? point-system // 任意のプロジェクト名を入力

⠦ Downloading "aws-python" template
✔ Project successfully created in point-system folder

? Do you want to login/register to Serverless Dashboard? (Y/n) n // 今回はNoを選択 

? Do you want to deploy now? (Y/n) n // 今の状態でデプロイしても何もないので、Noを選択

以下のファイルが作成されます。

  • handler.py
  • README.md
  • serverless.yml

DynamoDBとSQSを作成

serverless.ymlにリソースを追加していきます。

serverless.yml
service: point-system

frameworkVersion: '3'

provider:
  name: aws
  runtime: python3.9
  region: ap-northeast-1 # 東京Regionを設定

resources:
  Resources:
    PointsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: points
        AttributeDefinitions:
          - AttributeName: userId
            AttributeType: S
        KeySchema:
          - AttributeName: userId
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 5
          WriteCapacityUnits: 5
    PointUpdateQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: pointUpdateQueue.fifo
        FifoQueue: true
        ContentBasedDeduplication: true 

今回、DynamoDBのKeyとしてはString型のUserIdを設定しました。

SQSの設定では、FIFOキューを設定します。

通常の標準設定のSQSでは、

  • 実行の順番が保証されない
  • メッセージが複数回実行される可能性がある
  • 呼び出し側が複数回呼ぶ場合、複数回実行される

というものになります。
ポイントシステムという特性上、複数回実行されることは避ける必要があります。
FIFOキューにすることで、上記の問題が解決するため、FIFO設定で作成をします。

参考: 【新機能】Amazon SQSにFIFOが追加されました!(重複削除/単一実行/順序取得に対応)

Lambda関数を作成する

次に、SQSを呼び出すLambda関数、SQSによって実行されDynamoDBへデータ登録を行うLambda関数の二つの関数を作成します。

serverless.yml

service: point-system

frameworkVersion: '3'

provider:
  name: aws
  runtime: python3.9
  region: ap-northeast-1
  iam: ## 追加
    role:
      statements:
        - Effect: Allow
          Action: 'dynamodb:*'
          Resource: 'arn:aws:dynamodb:ap-northeast-1:*:table/points'
        - Effect: Allow
          Action: 'sqs:*'
          Resource: 'arn:aws:sqs:ap-northeast-1:*:pointUpdateQueue.fifo'

resources:
  Resources:
    PointsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: points
        AttributeDefinitions:
          - AttributeName: userId
            AttributeType: S
        KeySchema:
          - AttributeName: userId
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 5
          WriteCapacityUnits: 5
    PointUpdateQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: pointUpdateQueue.fifo
        FifoQueue: true
        ContentBasedDeduplication: true 

## 追加
functions:
  addPoints:
    handler: handler.addPoints
    runtime: python3.9
    events:
      - http:
          path: addPoints
          method: POST

  processPointsUpdate:
    handler: handler.processPointsUpdate
    runtime: python3.9
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - PointUpdateQueue
              - Arn

追加されたコードについて解説をします。

serverless.yml
iam:
    role:
      statements:
        - Effect: Allow
          Action: 'dynamodb:*'
          Resource: 'arn:aws:dynamodb:ap-northeast-1:*:table/points'
        - Effect: Allow
          Action: 'sqs:*'
          Resource: 'arn:aws:sqs:ap-northeast-1:*:pointUpdateQueue.fifo'

Lambda関数によって、DynamoDBに対する変更とSQSに対するキューイングが実行されるため、Lambdaに対象のDynamoDBテーブルとキューへの権限のあるロールを渡します。

serverless.yml
functions:
  addPoints:
    handler: handler.addPoints
    runtime: python3.9
    events:
      - http:
          path: addPoints
          method: POST

  processPointsUpdate:
    handler: handler.processPointsUpdate
    runtime: python3.9
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - PointUpdateQueue
              - Arn

addPoints関数、processPointsUpdate関数の二つの関数を作成します。
addPoints関数は/addPointsというPATHに対するPOSTの実行で呼び出されます。
processPointsUpdate関数は、SQSのPointUpdateQueueによって実行されます。

Lambda関数内のPythonのコードを実装する

  • SQSにキューイングを実行する処理
  • DynamoDBの更新を行う処理

それぞれをPythonのコードで記述します。

handler.py
import json
import boto3
from boto3.dynamodb.conditions import Key

dynamoDB = boto3.resource('dynamodb').Table('points')
sqs = boto3.resource('sqs').Queue('SQSのURL')

def addPoints(event, context):
    body = json.loads(event['body'])
    user_id = body['userId']
    points_to_add = body['points']
    message_id = body['message_id']
    # SQSにポイント更新イベントを送信
    send_to_queue(user_id, points_to_add, message_id)

    return {
        "statusCode": 200,
        "body": json.dumps({"message": "Points added successfully"})
    }

def send_to_queue(user_id, points, message_id):
    # ポイント更新イベントをキューに送信
    sqs.send_message(
        MessageBody=json.dumps({'userId': user_id, 'points': points}),
        MessageDeduplicationId=message_id,
        MessageGroupId="1"
    )

# SQSによって呼び出されるメソッド
def processPointsUpdate(event, context):
    for record in event['Records']:
        body = json.loads(record['body'])
        user_id = body['userId']
        points = body['points']
        ## DynamoDBからデータを取得
        queryData = dynamoDB.query(
            KeyConditionExpression = Key("userId").eq(str(user_id)), # 取得するKey情報
            Limit = 1
        )
        if queryData['Count'] == 0:
            # Dynamo上に該当ユーザーのデータがない場合は新たにレコードを作成
            dynamoDB.put_item(
                Item = {
                    "userId": user_id,
                    "points": points
                }
            )
        else:
            # Dynamo上に該当ユーザーのデータが存在する場合は既存のレコードを更新
            # DynamoDBにポイントを追加
            response = dynamoDB.update_item(
                Key={'userId': str(user_id)},
                UpdateExpression='ADD points :val',
                ExpressionAttributeValues={':val': points},
                ReturnValues='UPDATED_NEW'
            )

    return {
        "statusCode": 200,
        "body": json.dumps({"message": "Points updated successfully"})
    }

handler.py内にSQSを呼び出す処理、SQSから呼び出される処理、両方を記述しています。

serverless deploy

記述が必要なコードは以上です。
最後にデプロイコマンドを実行します。

$ serverless deploy

Deploying point-system to stage dev (ap-northeast-1)

✔ Service deployed to stack point-system-dev (36s)

endpoint: POST - Lambda関数のURL
functions:
  addPoints: point-system-dev-addPoints (2.5 kB)
  processPointsUpdate: point-system-dev-processPointsUpdate (2.5 kB)

無事に各リソースがデプロイされました。

Pythonコード内にSQSのURLを記述する必要があるため、マネジメントコンソールにログインし、SQSのURLを取得します。

Screenshot 2023-12-19 at 21.52.58.png

コピーしたURLをLambdaのコードに反映し、再度デプロイを行います。

handler.py
sqs = boto3.resource('sqs').Queue('SQSのURL') # ここにURLを追加
$ serverless deploy

動作確認

Lambda関数に対してPOSTでポイント追加操作を実行してみます。

Screenshot 2023-12-19 at 21.56.26.png

DynamoDBをマネジメントコンソールで確認します。

Screenshot 2023-12-19 at 21.57.23.png

無事にポイントが追加されました。

同じユーザーに対してポイントを追加してみます。
ここでメッセージIDを変更せずに実行すると

Screenshot 2023-12-19 at 21.56.26.png

Screenshot 2023-12-19 at 21.57.23.png

ポイントに変化はありません。
SQSをFIFOキューにしており、同じメッセージだと判別しているため、5分以内は再実行がされないためです。

メッセージIDを変更して実行してみます。

Screenshot 2023-12-19 at 21.59.53.png

Screenshot 2023-12-19 at 22.00.33.png

無事にポイントが更新されました。

これで同じメッセージでポイントが重複で加算されることを防がれていることが確認できました!

最後に

ServerlessFrameworkを用いて、とてもシンプルに簡単なポイント管理システムを作ることができました。
AWSのフルマネージドなサービスに乗っかって、スケーラブルなシステムの構築ができるのが、サーバーレスの大きなメリットですね。
SQSのFIFOについても、実際に使ってみて大きなメリットがあることがわかりました。

明日は岸田さんのbrefについての記事です!
明日もサーバーレスのお話ですね、お楽しみに!

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?