AWS Lambda(Python)でSQS, ParameterStore, DynamoDBを処理してみた
最近AWS上のサービスを使ったアプリ開発が増え、バッチなどの業務にLambda、SQSなどを使用したので備忘録として残そうと思います。
本番を想定した設定にはなっておりませんので、ご容赦ください。
Lambda関数で各サービスを使うチュートリアル的なものと思ってください。
今回は仕様的な部分に関してはほぼ記述しておりません、機会があれば別途投稿しようと思います。
実施したこと
- Lambda:関数の作成、下記のサービスとの統合
- SQS:キューの作成からメッセージを受信する設定
- ParameterStore:値のセット、Lambda関数で取得
- SDKで取得する方法
- Extensionsで取得する方法(+カスタムレイヤーの作り方)
- DyanamoDBの作成、操作
Lambda:関数の作成、下記のサービスとの統合
SQS:キューの作成からメッセージを受信する設定
・入力する項目ですが、基本的に名前のみで後はデフォルトのままで大丈夫です
デッドレターキュとは、通常に設定しているキューの処理が失敗した時に退避されるキューです。これによりエラーとなるキューをそのままにせず、原因解消後処理するようなことが可能になります。
・そして、【Lambdaトリガー】のタブから先ほどのLambda関数を設定します
そのまま設定すると以下のエラーになるので、Lambda関数にIAMロールを追加します。
Lambda関数の【設定】から【ロール名】をクリックして、IAMへ移動します。
許可ポリシーから追加をします。今回はポリシーのアタッチで大丈夫です。
検索で[SQS]と入力し、【AmazonSQSFullAccess】にチェックを入れ、追加します。
SQSに戻り再度、Lambda関数のトリガーを設定します。
次は正常に設定されました。ステータスがCreating → EnabledなればSQSメッセージを送信すると、Lambda関数が自動で受信するようになります。
・上記までできれば一旦確認してみましょう
まずはLambda側の設定です。
Lambda関数の[コード]を以下のように書き換えてください。
import json
def lambda_handler(event, context):
for record in event["Records"]:
# SQSのメッセージをevent["Records"]で受け取る
# ["body"]にメッセージ本文が格納されている
body = record["body"]
print("Received message:", body)
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
そして【Deploy】します。
少し経ったら、画面上記にダイアログが表示されます。
次にSQSにメッセージを送信します。
メッセージを送受信から、メッセージ本文に任意の値を入力し、【メッセージを送信】します。
送信後、処理されたかログを確認してみましょう。
CloudWatchを確認します。
ParameterStore:値のセット、Lambda関数で取得
次にParameter storeから値を取得してみましょう。
実は可視性のために改行を入れたりすると少し厄介になります。
また取得方法も2つあって、pythonコードからboto3を使って取得する方法、Extensionsのレイヤーを使用して直接取得する方法(http経由)があります。
アップデートによって、Extensionsが使用できるようになったのですが、取得した値をキャッシュできるようになったので、一定の時間2回目の呼び出しが爆速になり、低レイテンシを実現できます。
なので私はExtensionsの使い方をお勧めいたします。しかしEC2などからParameter storeの値を取得する際は使用できないので、SDKの取得方法も紹介させていただきます。
まずはParameter storeの値のセットを行います。
パラメータの作成から、名前と値を入力して、作成をしておきます。
ちなみに名前はスラッシュ(/)で階層化することが可能です。
Parameter storeを使用する際(というか他サービスとの連携)もIAMロールによってアクセス許可しないとエラーになるので追加します。
今回はインラインポリシーの作成で許可してみましょう。
設定画面の【JSON】タブに切り替え、以下のJSONデータを入力します。
Resourceには自分のアカウントIDを設定してください。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ssm:GetParameter",
"ssm:GetParameters",
"ssm:GetParametersByPath"
],
"Resource": "arn:aws:ssm:ap-northeast-1:自分のアカウントID:parameter/sample-func-param"
}
]
}
次はLambda関数から呼び出します。
SDKで取得する方法
SDKの場合はboto3を使って以下のように呼び出すことができます。
import json
import boto3
ssm = boto3.client("ssm") # クライアント初期化
PARAM_NAME = "sample-func-param" # ParameterStoreのパラメータ名
def lambda_handler(event, context):
# SDKでParameterStoreから値
# WithDecryption=Trueにすると暗号化されているパラメータを複合化して取得できる
res = ssm.get_parameter(Name=PARAM_NAME, WithDecryption=True)
param = res['Parameter']['Value']
print(param)
for record in event["Records"]:
body = record["body"]
print("Received message:", body)
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
上記を設定して、【Deploy】した後に、上記で行ったSQSからメッセージを投げて、Lambda関数を起動してみましょう。
実行後、CloudWatchでログを確認します。
ParameterStoreで設定した値が取得できていればOK。
Extensionsで取得する方法(+カスタムレイヤーの作り方)
詳しいこと設定は以下の記事がとても参考になります。
AWS Parameters and Secrets Lambda Extensionを使用してみた
ではまずLambdaからAWSレイヤーを追加します。設定は以下を選択します。
次に実際取得するコードを書きます。
import os
import json
import requests
SECRETS_EXTENSION_PORT = os.environ.get('PARAMETERS_SECRETS_EXTENSION_HTTP_PORT')
end_point = f'http://localhost:{SECRETS_EXTENSION_PORT}'
PARAM_NAME = "sample-func-param" # ParameterStoreのパラメータ名
path = f'/systemsmanager/parameters/get/?name={PARAM_NAME}'
def lambda_handler(event, context):
# SDKでParameterStoreから値
url = '{}{}'.format(end_point, path)
headers = {
"content-type": "application/json",
'X-Aws-Parameters-Secrets-Token': os.environ.get('AWS_SESSION_TOKEN')
}
res = requests.get(url, headers=headers)
res.encoding = 'utf-8'
param = res.json()['Parameter']['Value']
print(param)
for record in event["Records"]:
body = record["body"]
print("Received message:", body)
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
そして、コード内では、requests
の外部ライブラリを使用しています。
Lambdaではインストールされていないため、外部ライブラリをパッケージ化して、カスタムレイヤーとして追加する必要があります。
パッケージ化は割愛させていただきます。下記の記事をご参考ください。
【AWS】LambdaにrequestsライブラリをLayerで追加する
DyanamoDBの作成、操作
DyanamoDBからテーブルの作成を選択して、テーブル名とパーティションキー、ソートキーをして作成します。
作成後↓
次は、Lambda関数でテーブルに対してデータ投入をします。
Lambda関数のコードを書き換えましょう。
import os
import json
import uuid
import boto3
from datetime import datetime
import requests
SECRETS_EXTENSION_PORT = os.environ.get('PARAMETERS_SECRETS_EXTENSION_HTTP_PORT')
end_point = f'http://localhost:{SECRETS_EXTENSION_PORT}'
PARAM_NAME = "sample-func-param" # ParameterStoreのパラメータ名
path = f'/systemsmanager/parameters/get/?name={PARAM_NAME}'
# DynamoDBクライアント初期化
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('sample')
def lambda_handler(event, context):
# SDKでParameterStoreから値
url = '{}{}'.format(end_point, path)
headers = {
"content-type": "application/json",
'X-Aws-Parameters-Secrets-Token': os.environ.get('AWS_SESSION_TOKEN')
}
res = requests.get(url, headers=headers)
res.encoding = 'utf-8'
param = res.json()['Parameter']['Value']
print(param)
for record in event["Records"]:
body = record["body"]
print("Received message:", body)
# DynamoDBにデータ投入
id = str(uuid.uuid4()) # サロゲートキー
dt = datetime.now().strftime('%Y/%m/%d %H:%M:%S')
name = 'Test User'
message = 'Hello from Lambda!'
# データ挿入
res = table.put_item(
Item = {
'id': id,
'dt': dt,
'name': name,
'message': message
}
)
print(f'DB登録結果: {res['ResponseMetadata']}')
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
忘れずに【depoly】し、そして、IAMロールも忘れずにします。
今回はポリシーのアタッチで下記を追加します。
後はいつも通り、SQSからメッセージを送信して、ログを確認します。
直接DynamoDBの値も確認します。
追加されていました。(何回か実行したので複数データあります)
まとめ
最近AWSサービスをよく使うけど、この設定どうするんだっけ?となることがあるので、備忘録としてまとめました。特にIAMロールの設定忘れとか初めのころはよくしてました(笑)
他にもVPC、RDS、ECSなど色んなサービスも使うので、まとめようと思っています。