こんにちは。
株式会社Fusicで技術共創部門の部門長/プリンシパルエンジニアをしている吉野です。
Fusic Advent Calendar 2023 の20日目です! 昨日は、ゆっきーさんのゆるっとスタディ:【マーケ】ひとりマーケターがまずやることとは!?でした!
ひとりマーケターの悩みややるべきことについて、詳細に書いてあってとっても勉強になる記事ですね。是非読んでみてください!
今回つくるもの
お仕事でスケーラブルなサーバーレスシステムを担当することになりそうだったので、勉強がてらつくってみました。
構成図は以下。
とってもシンプルですね。
ポイント管理システムですので、同時にたくさんのユーザーに対してポイントが追加されるケースが想定されます。
AWS LambdaとSQSによって、ある程度スケーラブルになっているのではないかと。
この構成図を実装していきます。
ディレクトリを作成
今回はServerless Frameworkを用いて実装を進めます。
とりあえずテンプレートファイルを作成
$ serverless
Creating a new serverless project
? What do you want to make?
AWS - Node.js - Starter
AWS - Node.js - HTTP API
AWS - Node.js - Scheduled Task
AWS - Node.js - SQS Worker
AWS - Node.js - Express API
AWS - Node.js - Express API with DynamoDB
❯ AWS - Python - Starter // Python Starterを選択
AWS - Python - HTTP API
AWS - Python - Scheduled Task
AWS - Python - SQS Worker
AWS - Python - Flask API
AWS - Python - Flask API with DynamoDB
Other
? What do you want to call this project? point-system // 任意のプロジェクト名を入力
⠦ Downloading "aws-python" template
✔ Project successfully created in point-system folder
? Do you want to login/register to Serverless Dashboard? (Y/n) n // 今回はNoを選択
? Do you want to deploy now? (Y/n) n // 今の状態でデプロイしても何もないので、Noを選択
以下のファイルが作成されます。
- handler.py
- README.md
- serverless.yml
DynamoDBとSQSを作成
serverless.ymlにリソースを追加していきます。
service: point-system
frameworkVersion: '3'
provider:
name: aws
runtime: python3.9
region: ap-northeast-1 # 東京Regionを設定
resources:
Resources:
PointsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: points
AttributeDefinitions:
- AttributeName: userId
AttributeType: S
KeySchema:
- AttributeName: userId
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
PointUpdateQueue:
Type: "AWS::SQS::Queue"
Properties:
QueueName: pointUpdateQueue.fifo
FifoQueue: true
ContentBasedDeduplication: true
今回、DynamoDBのKeyとしてはString型のUserIdを設定しました。
SQSの設定では、FIFOキューを設定します。
通常の標準設定のSQSでは、
- 実行の順番が保証されない
- メッセージが複数回実行される可能性がある
- 呼び出し側が複数回呼ぶ場合、複数回実行される
というものになります。
ポイントシステムという特性上、複数回実行されることは避ける必要があります。
FIFOキューにすることで、上記の問題が解決するため、FIFO設定で作成をします。
参考: 【新機能】Amazon SQSにFIFOが追加されました!(重複削除/単一実行/順序取得に対応)
Lambda関数を作成する
次に、SQSを呼び出すLambda関数、SQSによって実行されDynamoDBへデータ登録を行うLambda関数の二つの関数を作成します。
service: point-system
frameworkVersion: '3'
provider:
name: aws
runtime: python3.9
region: ap-northeast-1
iam: ## 追加
role:
statements:
- Effect: Allow
Action: 'dynamodb:*'
Resource: 'arn:aws:dynamodb:ap-northeast-1:*:table/points'
- Effect: Allow
Action: 'sqs:*'
Resource: 'arn:aws:sqs:ap-northeast-1:*:pointUpdateQueue.fifo'
resources:
Resources:
PointsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: points
AttributeDefinitions:
- AttributeName: userId
AttributeType: S
KeySchema:
- AttributeName: userId
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
PointUpdateQueue:
Type: "AWS::SQS::Queue"
Properties:
QueueName: pointUpdateQueue.fifo
FifoQueue: true
ContentBasedDeduplication: true
## 追加
functions:
addPoints:
handler: handler.addPoints
runtime: python3.9
events:
- http:
path: addPoints
method: POST
processPointsUpdate:
handler: handler.processPointsUpdate
runtime: python3.9
events:
- sqs:
arn:
Fn::GetAtt:
- PointUpdateQueue
- Arn
追加されたコードについて解説をします。
iam:
role:
statements:
- Effect: Allow
Action: 'dynamodb:*'
Resource: 'arn:aws:dynamodb:ap-northeast-1:*:table/points'
- Effect: Allow
Action: 'sqs:*'
Resource: 'arn:aws:sqs:ap-northeast-1:*:pointUpdateQueue.fifo'
Lambda関数によって、DynamoDBに対する変更とSQSに対するキューイングが実行されるため、Lambdaに対象のDynamoDBテーブルとキューへの権限のあるロールを渡します。
functions:
addPoints:
handler: handler.addPoints
runtime: python3.9
events:
- http:
path: addPoints
method: POST
processPointsUpdate:
handler: handler.processPointsUpdate
runtime: python3.9
events:
- sqs:
arn:
Fn::GetAtt:
- PointUpdateQueue
- Arn
addPoints関数、processPointsUpdate関数の二つの関数を作成します。
addPoints関数は/addPointsというPATHに対するPOSTの実行で呼び出されます。
processPointsUpdate関数は、SQSのPointUpdateQueueによって実行されます。
Lambda関数内のPythonのコードを実装する
- SQSにキューイングを実行する処理
- DynamoDBの更新を行う処理
それぞれをPythonのコードで記述します。
import json
import boto3
from boto3.dynamodb.conditions import Key
dynamoDB = boto3.resource('dynamodb').Table('points')
sqs = boto3.resource('sqs').Queue('SQSのURL')
def addPoints(event, context):
body = json.loads(event['body'])
user_id = body['userId']
points_to_add = body['points']
message_id = body['message_id']
# SQSにポイント更新イベントを送信
send_to_queue(user_id, points_to_add, message_id)
return {
"statusCode": 200,
"body": json.dumps({"message": "Points added successfully"})
}
def send_to_queue(user_id, points, message_id):
# ポイント更新イベントをキューに送信
sqs.send_message(
MessageBody=json.dumps({'userId': user_id, 'points': points}),
MessageDeduplicationId=message_id,
MessageGroupId="1"
)
# SQSによって呼び出されるメソッド
def processPointsUpdate(event, context):
for record in event['Records']:
body = json.loads(record['body'])
user_id = body['userId']
points = body['points']
## DynamoDBからデータを取得
queryData = dynamoDB.query(
KeyConditionExpression = Key("userId").eq(str(user_id)), # 取得するKey情報
Limit = 1
)
if queryData['Count'] == 0:
# Dynamo上に該当ユーザーのデータがない場合は新たにレコードを作成
dynamoDB.put_item(
Item = {
"userId": user_id,
"points": points
}
)
else:
# Dynamo上に該当ユーザーのデータが存在する場合は既存のレコードを更新
# DynamoDBにポイントを追加
response = dynamoDB.update_item(
Key={'userId': str(user_id)},
UpdateExpression='ADD points :val',
ExpressionAttributeValues={':val': points},
ReturnValues='UPDATED_NEW'
)
return {
"statusCode": 200,
"body": json.dumps({"message": "Points updated successfully"})
}
handler.py内にSQSを呼び出す処理、SQSから呼び出される処理、両方を記述しています。
serverless deploy
記述が必要なコードは以上です。
最後にデプロイコマンドを実行します。
$ serverless deploy
Deploying point-system to stage dev (ap-northeast-1)
✔ Service deployed to stack point-system-dev (36s)
endpoint: POST - Lambda関数のURL
functions:
addPoints: point-system-dev-addPoints (2.5 kB)
processPointsUpdate: point-system-dev-processPointsUpdate (2.5 kB)
無事に各リソースがデプロイされました。
Pythonコード内にSQSのURLを記述する必要があるため、マネジメントコンソールにログインし、SQSのURLを取得します。
コピーしたURLをLambdaのコードに反映し、再度デプロイを行います。
sqs = boto3.resource('sqs').Queue('SQSのURL') # ここにURLを追加
$ serverless deploy
動作確認
Lambda関数に対してPOSTでポイント追加操作を実行してみます。
DynamoDBをマネジメントコンソールで確認します。
無事にポイントが追加されました。
同じユーザーに対してポイントを追加してみます。
ここでメッセージIDを変更せずに実行すると
ポイントに変化はありません。
SQSをFIFOキューにしており、同じメッセージだと判別しているため、5分以内は再実行がされないためです。
メッセージIDを変更して実行してみます。
無事にポイントが更新されました。
これで同じメッセージでポイントが重複で加算されることを防がれていることが確認できました!
最後に
ServerlessFrameworkを用いて、とてもシンプルに簡単なポイント管理システムを作ることができました。
AWSのフルマネージドなサービスに乗っかって、スケーラブルなシステムの構築ができるのが、サーバーレスの大きなメリットですね。
SQSのFIFOについても、実際に使ってみて大きなメリットがあることがわかりました。
明日は岸田さんのbrefについての記事です!
明日もサーバーレスのお話ですね、お楽しみに!