はじめに
ひとまず、S3に新しいファイルを保存した際にLambdaを起動したり、SQSにメッセージをキューイングしたりしたときの手順。初歩的な部分から書いています。ところどころぼかしあります。
SQSにキューイングして処理する部分についてはこちら。
目標
図の通り、S3にcsvファイルが入ったらLambda経由からSQSにメッセージを投げる仕組みを作成する。
1. SQSでキューを作成する
Amazon SQSでキューを作成
作成するキューのタイプを指定
今回は順番に処理してほしかったので、タイプで「FIFO」を選択した。
標準タイプだと、トランザクション(TPS)に制限はないが、順番が保証されなかったり、複数メッセージのコピーが送信されることがある。
FIFOタイプだと、トランザクションに制限はあるが、順番が保証され、重複がなくなる。
FIFOの場合は名称の末尾を.fifo
にする。
詳細は以下を参照。
【新機能】Amazon SQSにFIFOが追加されました!
【AWS】楽々SQS解説〜5分で理解〜
標準タイプのキューを作成する際の注意点などはこちら参照。
Amazon SQSを使う前に知っておきたい基本的なこと
キューの設定
タイムアウト時間などを設定する。
メッセージ受信待機時間を20秒にしてロングポーリングにする。
Amazon SQS ロングポーリングを理解する
AWS SQSの性能を調査しました。 (ロングポーリングが良いよ & FIFOスループット注意 & 並行で受けよう)
FIFOの場合、「コンテンツに基づく重複排除」をONにしておく。
AmazonCloudWatchイベントがSQS FIFOに対応したようなのでただただツッコんでみただけの話
その他の設定項目はAWSのサイト参照。
キューパラメータの設定 (コンソール)
2. Lambda関数を作成し、トリガーにS3を設定する
Lambda関数を作成
関数名と使用する言語を指定(今回はPython)して、関数を作成する。
S3をLambda起動のトリガーにする
トリガーを追加
トリガーのサービスとしてS3を選択
バケット、監視するイベントタイプを指定する。
プレフィックスで監視したいファイルパス(キー)の先頭文字(フォルダパスなど)、サフィックスで監視したいファイルパスの末尾(拡張子など)を指定する。
このようにつながればOK。
3. LambdaにSQSにへのアクセス許可を付与する。
SQSにアクセスを許可するポリシーを作成する
「ARNの指定」の欄に、キューの詳細画面からARNの値をコピーする。あとは名称、タグなどを設定してポリシーを作成。
Lambdaの実行ロールに作成したポリシーを付与する
作成したLambdaの「アクセス権限」から、実行ロールを選択する。
「アクセス許可を追加」→「ポリシーをアタッチ」で、作成したポリシーを選択し、アタッチする。
4. Lambda関数からSQSにメッセージを送信
環境変数を設定
「設定」→「環境変数」で設定可能。
キー:「SQS」、値:送信先のキューの名前 で設定。
SQSを呼び出すコードを作成
Lambdaが起動すると、lambda_function.py
の関数lambda_handler
が呼び出される。
引数として、event
、context
のオブジェクトがトリガーのイベントから投げられる。
event
はJSON形式で、トリガーのS3の情報などが入っている。
今回は、S3のbucket名と、新規ファイルのパス(キー)をSQSに送信する。
import json
import urllib3
import os
import boto3
def lambda_handler(event, context):
# eventオブジェクトからトリガーのbucket_nameを取得
bucket_name = event['Records'][0]['s3']['bucket']['name']
# eventオブジェクトからトリガーになった新規ファイルのkey(ファイルパス)を取得
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
# boto3でSQSへアクセスするオブジェクトを取得
sqs = boto3.resource('sqs')
# メッセージを投げるキューの名前を環境変数から取得
name = os.environ['SQS']
try:
# キューの名前を指定してインスタンスを取得
queue = sqs.get_queue_by_name(QueueName=name)
# キューがない場合はexceptionを返却
except:
return {
'statusCode': 500,
'body': json.dumps('SQS_No_Exist')
}
# キューに送信するメッセージをdict形式で作成
queue_msg = {
"bucket_name": bucket_name,
"key": key,
}
id = "0001" # MessageGroupIdを指定(指定しないとエラーになる)
# メッセージをキューに送信
response = queue.send_message(MessageBody = json.dumps(queue_msg), MessageGroupId = id)
print(response)
# 送信完了のメッセージをログに返す
return {
'statusCode': 200,
'body': json.dumps('Request is sent.')
}
eventオブジェクトの例
event
オブジェクトの構造は以下のようになっている。
(Lambdaのテスト画面から確認可能)
{
"Records": [
{
"eventVersion": "2.0",
"eventSource": "aws:s3",
"awsRegion": "us-east-1",
"eventTime": "1970-01-01T00:00:00.000Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "EXAMPLE"
},
"requestParameters": {
"sourceIPAddress": "127.0.0.1"
},
"responseElements": {
"x-amz-request-id": "EXAMPLE123456789",
"x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "testConfigRule",
"bucket": {
"name": "example-bucket",
"ownerIdentity": {
"principalId": "EXAMPLE"
},
"arn": "arn:aws:s3:::example-bucket"
},
"object": {
"key": "test%2Fkey",
"size": 1024,
"eTag": "0123456789abcdef0123456789abcdef",
"sequencer": "0A1B2C3D4E5F678901"
}
}
}
]
}
テスト
テスト内容の詳細を設定する。テンプレートを「s3-put」にする。
作成したキューの「メッセージを送受信」から、メッセージの増加を確認可能
その他参考
JavaScriptの場合はこちらの記事が参考になります。