search
LoginSignup
0
Help us understand the problem. What are the problem?

posted at

updated at

Organization

S3をトリガーにしてLambdaでSQSにメッセージを送信する(Python)

はじめに

ひとまず、S3に新しいファイルを保存した際にLambdaを起動したり、SQSにメッセージをキューイングしたりしたときの手順。初歩的な部分から書いています。ところどころぼかしあります。
SQSにキューイングして処理する部分についてはこちら

目標

図の通り、S3にcsvファイルが入ったらLambda経由からSQSにメッセージを投げる仕組みを作成する。
s3tosqs.jpg

1. SQSでキューを作成する

Amazon SQSでキューを作成

「キューを作成」を選択。
image.png

作成するキューのタイプを指定

今回は順番に処理してほしかったので、タイプで「FIFO」を選択した。
image.png
標準タイプだと、トランザクション(TPS)に制限はないが、順番が保証されなかったり、複数メッセージのコピーが送信されることがある。
FIFOタイプだと、トランザクションに制限はあるが、順番が保証され、重複がなくなる。
FIFOの場合は名称の末尾を.fifoにする。

詳細は以下を参照。
【新機能】Amazon SQSにFIFOが追加されました!
【AWS】楽々SQS解説〜5分で理解〜

標準タイプのキューを作成する際の注意点などはこちら参照。
Amazon SQSを使う前に知っておきたい基本的なこと

キューの設定

タイムアウト時間などを設定する。
メッセージ受信待機時間を20秒にしてロングポーリングにする。
Amazon SQS ロングポーリングを理解する
AWS SQSの性能を調査しました。 (ロングポーリングが良いよ & FIFOスループット注意 & 並行で受けよう)

FIFOの場合、「コンテンツに基づく重複排除」をONにしておく。
AmazonCloudWatchイベントがSQS FIFOに対応したようなのでただただツッコんでみただけの話

image.png

その他の設定項目はAWSのサイト参照。
キューパラメータの設定 (コンソール)

2. Lambda関数を作成し、トリガーにS3を設定する

Lambda関数を作成

関数名と使用する言語を指定(今回はPython)して、関数を作成する。
image.png

S3をLambda起動のトリガーにする

トリガーを追加
image.png
トリガーのサービスとしてS3を選択
image.png
バケット、監視するイベントタイプを指定する。
プレフィックスで監視したいファイルパス(キー)の先頭文字(フォルダパスなど)、サフィックスで監視したいファイルパスの末尾(拡張子など)を指定する。
s3setting.png
このようにつながればOK。
image.png

3. LambdaにSQSにへのアクセス許可を付与する。

SQSにアクセスを許可するポリシーを作成する

IAMから、ポリシーの作成を選択(IAM権限が必要)
image.png

サービス(SQS)と、許可するアクションを指定する。
image.png

リソースは、「ARNの追加」から作成したキューを指定する。
image.png

「ARNの指定」の欄に、キューの詳細画面からARNの値をコピーする。あとは名称、タグなどを設定してポリシーを作成。
image.png

Lambdaの実行ロールに作成したポリシーを付与する

作成したLambdaの「アクセス権限」から、実行ロールを選択する。
lambda_role.png

「アクセス許可を追加」→「ポリシーをアタッチ」で、作成したポリシーを選択し、アタッチする。
image.png

4. Lambda関数からSQSにメッセージを送信

環境変数を設定

「設定」→「環境変数」で設定可能。
キー:「SQS」、値:送信先のキューの名前 で設定。
image.png

SQSを呼び出すコードを作成

Lambdaが起動すると、lambda_function.pyの関数lambda_handlerが呼び出される。
引数として、eventcontextのオブジェクトがトリガーのイベントから投げられる。
eventはJSON形式で、トリガーのS3の情報などが入っている。

今回は、S3のbucket名と、新規ファイルのパス(キー)をSQSに送信する。

lambda_function.py

import json
import urllib3
import os
import boto3

def lambda_handler(event, context):

    # eventオブジェクトからトリガーのbucket_nameを取得
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    # eventオブジェクトからトリガーになった新規ファイルのkey(ファイルパス)を取得
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    
    # boto3でSQSへアクセスするオブジェクトを取得
    sqs = boto3.resource('sqs')

    # メッセージを投げるキューの名前を環境変数から取得
    name = os.environ['SQS']
    try:
       # キューの名前を指定してインスタンスを取得
       queue = sqs.get_queue_by_name(QueueName=name)
    # キューがない場合はexceptionを返却
    except:
       return {
          'statusCode': 500,
          'body': json.dumps('SQS_No_Exist')
       }

    # キューに送信するメッセージをdict形式で作成
    queue_msg = {
        "bucket_name": bucket_name,
        "key": key,
    }
    
    id = "0001" # MessageGroupIdを指定(指定しないとエラーになる)
    
    # メッセージをキューに送信
    response = queue.send_message(MessageBody = json.dumps(queue_msg), MessageGroupId = id)
    print(response)
   
    # 送信完了のメッセージをログに返す
    return {
        'statusCode': 200,
        'body': json.dumps('Request is sent.')
    }

eventオブジェクトの例

eventオブジェクトの構造は以下のようになっている。
(Lambdaのテスト画面から確認可能)

{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-1",
      "eventTime": "1970-01-01T00:00:00.000Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "EXAMPLE"
      },
      "requestParameters": {
        "sourceIPAddress": "127.0.0.1"
      },
      "responseElements": {
        "x-amz-request-id": "EXAMPLE123456789",
        "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "testConfigRule",
        "bucket": {
          "name": "example-bucket",
          "ownerIdentity": {
            "principalId": "EXAMPLE"
          },
          "arn": "arn:aws:s3:::example-bucket"
        },
        "object": {
          "key": "test%2Fkey",
          "size": 1024,
          "eTag": "0123456789abcdef0123456789abcdef",
          "sequencer": "0A1B2C3D4E5F678901"
        }
      }
    }
  ]
}

テスト

テストイベントを作成する。
image.png

テスト内容の詳細を設定する。テンプレートを「s3-put」にする。
image.png

テストを選択し、「Test」ボタンでテスト実行
image.png

「Execution result」からログを確認
image.png

作成したキューの「メッセージを送受信」から、メッセージの増加を確認可能
image.png
image.png

その他参考

JavaScriptの場合はこちらの記事が参考になります。

【 AWS 】S3 → Lambda → SQS にメッセージを投げるまでの構築手順!

Register as a new user and use Qiita more conveniently

  1. You can follow users and tags
  2. you can stock useful information
  3. You can make editorial suggestions for articles
What you can do with signing up
0
Help us understand the problem. What are the problem?