LoginSignup
15
11

More than 3 years have passed since last update.

s3 → sqs → lambda 連携をおこなってみる

Posted at

このページについて

s3 にファイルが置かれたイベントをsqsに書き込み、その情報をlambdaで購読、
ファイルを読み込むまでが書いてあります(2019/05時点)

検証目的なので、適切なロールについては割愛します

開発環境

$ go version
go version go1.12.5 darwin/amd64

手順

S3でバケット作成

よしなに設定する

S3でのイベント設定

バケットのプロパティタブの下部から設定
s3が発行するeventはputを選択しておけば大丈夫です

スクリーンショット 2019-05-17 16.30.44.png

しかし。送信先のSQSを設定していないのでSQSの作成をします

SQSの設定

今回はlambdaと連携させるのでスタンダードキューを使います(FIFOはlambdaと連携できません)
queue作成後、2つやることがあります

  1. queueへのパーミッション設定

スクリーンショット 2019-05-17 11.56.20.png

  1. lambdaをトリガとして設定する

前回の記事で作ったlambdaを指定しています

スクリーンショット 2019-05-17 11.56.31.png

ここまで設定したら、S3のイベント設定で通知先にSQSを選択して
作ったSQSのキュー名を指定しましょう

lambdaで受け取ってみる

これでs3にファイルをアップしたときにsqsに通知が行われ、lambdaが動くようになりました

実際にlambdaでメッセージを受け取ってみようと思います

lambdaで受け取れるイベントの構造体はこちらから参照できます
今回は、SQSのイベントを拾うのでSQSのイベント定義を使います
そして、その中にS3で発生したイベント情報が入ってるのですがどこに入っているかというと
構造体のBodyプロパティにstringで埋め込まれています
なのでS3のイベント定義にdecodeしてあげればいいでしょう
こんな感じでcloudwatchのログに表示されるようにしました

hello.go
package main

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

var (
    appName  = ""
    version  = ""
    revision = ""
)

func HandleRequest(ctx context.Context, evt events.SQSEvent) (string, error) {
    s := fmt.Sprintf("%s-%s-%s, Hello", appName, version, revision)
    fmt.Printf("sqsevent **** %#v\n", s)
    s3 := events.S3Event{}
    for _, item := range evt.Records {
        if err := json.Unmarshal([]byte(item.Body), &s3); err != nil {
            fmt.Printf("***error*** %#v\n", err)
            return s, nil
        }
    }
    fmt.Printf("s3 *** %#v\n", s3.Records[0])

    return s, nil
}

func main() {
    lambda.Start(HandleRequest)
}

これでS3にファイルアップロードをしたら、S3の情報がlambda内で取得できるはずです

lambdaからアップロードしたS3の中身を取得する

アップロードされたS3の情報はわかったので、今度は中身を取得してみます
こちらのサンプル前回記事を参考にlambdaを書き換えました

アップロードされたcsvの中身を出力するサンプルです

hello.go
package main

import (
    "context"
    "encoding/csv"
    "encoding/json"
    "fmt"
    "io"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/awserr"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
)

var (
    appName  = ""
    version  = ""
    revision = ""
)

func HandleRequest(ctx context.Context, evt events.SQSEvent) (string, error) {
    s := fmt.Sprintf("%s-%s-%s, Hello", appName, version, revision)
    fmt.Printf("sqsevent **** %#v\n", s)
    s3Evt := events.S3Event{}
    for _, item := range evt.Records {
        if err := json.Unmarshal([]byte(item.Body), &s3Evt); err != nil {
            fmt.Printf("***error*** %#v\n", err)
            return s, nil
        }
    }
    fmt.Printf("s3 *** %#v\n", s3Evt.Records[0])

    sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("ap-northeast-1")))
    svc := s3.New(sess)

    for _, item := range s3Evt.Records {
        // イベントの内容を使って、bucketとkeyを設定
        input := &s3.GetObjectInput{
            Bucket: &item.S3.Bucket.Name,
            Key:    &item.S3.Object.Key,
        }

        //
        // get file
        //
        result, err := svc.GetObject(input)
        if err != nil {
            if aerr, ok := err.(awserr.Error); ok {
                switch aerr.Code() {
                case s3.ErrCodeNoSuchKey:
                    fmt.Println(s3.ErrCodeNoSuchKey, aerr.Error())
                default:
                    fmt.Println(aerr.Error())
                }
            } else {
                // Print the error, cast err to awserr.Error to get the Code and
                // Message from an error.
                fmt.Println(err.Error())
            }
            return "", nil
        }

        fmt.Printf("get succeeded.obj key %v\n", item.S3.Object.Key)

        //
        // output csv
        //
        r := csv.NewReader(result.Body)
        for {
            record, err := r.Read()
            if err == io.EOF {
                break
            }
            if err != nil {
                fmt.Println(err)
            }

            fmt.Printf("csv row %v\n", record)
        }
    }

    return s, nil
}

func main() {
    lambda.Start(HandleRequest)
}

GetObjectで返ってくる構造体のGetObjectOutputのBodyがio.ReadCloserなのは大変使いやすいですね
api設計のときに参考にしたいと思います。特に内部でデータやり取りするときなど。

参考

15
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
11