このページについて
s3 にファイルが置かれたイベントをsqsに書き込み、その情報をlambdaで購読、
ファイルを読み込むまでが書いてあります(2019/05時点)
検証目的なので、適切なロールについては割愛します
開発環境
$ go version
go version go1.12.5 darwin/amd64
手順
S3でバケット作成
よしなに設定する
S3でのイベント設定
バケットのプロパティタブの下部から設定
s3が発行するeventはputを選択しておけば大丈夫です

しかし。送信先のSQSを設定していないのでSQSの作成をします
SQSの設定
今回はlambdaと連携させるのでスタンダードキューを使います(FIFOはlambdaと連携できません)
queue作成後、2つやることがあります
- queueへのパーミッション設定

- lambdaをトリガとして設定する
前回の記事で作ったlambdaを指定しています

ここまで設定したら、S3のイベント設定で通知先にSQSを選択して
作ったSQSのキュー名を指定しましょう
lambdaで受け取ってみる
これでs3にファイルをアップしたときにsqsに通知が行われ、lambdaが動くようになりました
実際にlambdaでメッセージを受け取ってみようと思います
lambdaで受け取れるイベントの構造体はこちらから参照できます
今回は、SQSのイベントを拾うのでSQSのイベント定義を使います
そして、その中にS3で発生したイベント情報が入ってるのですがどこに入っているかというと
構造体のBodyプロパティにstringで埋め込まれています
なのでS3のイベント定義にdecodeしてあげればいいでしょう
こんな感じでcloudwatchのログに表示されるようにしました
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
var (
appName = ""
version = ""
revision = ""
)
func HandleRequest(ctx context.Context, evt events.SQSEvent) (string, error) {
s := fmt.Sprintf("%s-%s-%s, Hello", appName, version, revision)
fmt.Printf("sqsevent **** %#v\n", s)
s3 := events.S3Event{}
for _, item := range evt.Records {
if err := json.Unmarshal([]byte(item.Body), &s3); err != nil {
fmt.Printf("***error*** %#v\n", err)
return s, nil
}
}
fmt.Printf("s3 *** %#v\n", s3.Records[0])
return s, nil
}
func main() {
lambda.Start(HandleRequest)
}
これでS3にファイルアップロードをしたら、S3の情報がlambda内で取得できるはずです
lambdaからアップロードしたS3の中身を取得する
アップロードされたS3の情報はわかったので、今度は中身を取得してみます
こちらのサンプルと前回記事を参考にlambdaを書き換えました
アップロードされたcsvの中身を出力するサンプルです
package main
import (
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
var (
appName = ""
version = ""
revision = ""
)
func HandleRequest(ctx context.Context, evt events.SQSEvent) (string, error) {
s := fmt.Sprintf("%s-%s-%s, Hello", appName, version, revision)
fmt.Printf("sqsevent **** %#v\n", s)
s3Evt := events.S3Event{}
for _, item := range evt.Records {
if err := json.Unmarshal([]byte(item.Body), &s3Evt); err != nil {
fmt.Printf("***error*** %#v\n", err)
return s, nil
}
}
fmt.Printf("s3 *** %#v\n", s3Evt.Records[0])
sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("ap-northeast-1")))
svc := s3.New(sess)
for _, item := range s3Evt.Records {
// イベントの内容を使って、bucketとkeyを設定
input := &s3.GetObjectInput{
Bucket: &item.S3.Bucket.Name,
Key: &item.S3.Object.Key,
}
//
// get file
//
result, err := svc.GetObject(input)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeNoSuchKey:
fmt.Println(s3.ErrCodeNoSuchKey, aerr.Error())
default:
fmt.Println(aerr.Error())
}
} else {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
fmt.Println(err.Error())
}
return "", nil
}
fmt.Printf("get succeeded.obj key %v\n", item.S3.Object.Key)
//
// output csv
//
r := csv.NewReader(result.Body)
for {
record, err := r.Read()
if err == io.EOF {
break
}
if err != nil {
fmt.Println(err)
}
fmt.Printf("csv row %v\n", record)
}
}
return s, nil
}
func main() {
lambda.Start(HandleRequest)
}
GetObjectで返ってくる構造体のGetObjectOutputのBodyがio.ReadCloserなのは大変使いやすいですね
api設計のときに参考にしたいと思います。特に内部でデータやり取りするときなど。