Edited at

SAM CLI + Golang でlambda s3 to s3を実現する

このエントリはただの集団 Advent Calendar 2018の16日目の記事です。


概要

S3にgzipがputされたのを感知し、lambdaで編集&gzip化、S3にuploadする処理です。

AWS Design.png

環境


  • Golang 1.11.2

  • SAN CLI 0.6.2

  • Docker for Mac

  • LocalStack

  • Goland

  • MacOS


開発


1. 環境構築

まずは開発環境を用意します。下記を実行するとsampleが生成されます。

$ brew tap aws/tap

$ brew install aws-sam-cli
$ sam init --runtime go

その他の使い方は下記のリンクを見ていただくと良いと思います

https://github.com/awslabs/aws-sam-cli/blob/develop/docs/usage.rst#generate-sample-event-payloads


2. パッケージ構成

ざっくりとしたパッケージ構成とその説明です。

.

├── src
│ ├── main.go <-- Lambda function code
│ └── main_test.go <-- Unit tests
├── template
│ ├── local.yaml <-- sam cliの定義ファイル(local)
│ └── staging.yaml <-- sam cliの定義ファイル(staging)
├── testdata
│ └── example.json.gz <-- test data
├── docker-compose.yaml <-- localでS3を再現するためのlocalstack
├── event_file.json <-- localでevent起動するときのrequestファイル(sam local generate-event s3 put で生成し、編集)
├── Makefile <-- command実行ファイル
├── packaged.yaml <-- sam package時生成される。deploy時に必要
└── README.md


3. 実行ファイル

開発はTDDで行いましたので、まずはtestの解説を...

TestMainで前処理としてbucketの作成、及び初期データを片方のbucketに突っ込んでいます。

その後は関数ごとにtestを書いています。

testの実行方法はcd src go testです(事前にenv TMPDIR=/private$TMPDIR docker-compose up -dでdockerを起動しておいてください)


main_test.go

package main

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"io/ioutil"
"os"
"testing"
"time"
)

func TestMain(m *testing.M) {
// test前処理
println("before all...")

os.Setenv("REGION", "ap-northeast-1")
os.Setenv("S3_ENDPOINT", "http://localhost:4572")
os.Setenv("TARGET_S3", "bucket-example-convert")

var sess = session.Must(session.NewSession(&aws.Config{
S3ForcePathStyle: aws.Bool(true),
Region: aws.String(os.Getenv("REGION")),
Endpoint: aws.String(os.Getenv("S3_ENDPOINT")),
}))
var creater = s3.New(sess)
var uploader = s3manager.NewUploader(sess)

_, err := creater.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String("bucket-example"),
})

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeBucketAlreadyExists:
fmt.Println(s3.ErrCodeBucketAlreadyExists, aerr.Error())
case s3.ErrCodeBucketAlreadyOwnedByYou:
fmt.Println(s3.ErrCodeBucketAlreadyOwnedByYou, aerr.Error())
default:
fmt.Println(aerr.Error())
}
} else {
fmt.Println(err.Error())
}
}
_, err = creater.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String("bucket-example-convert"),
})

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeBucketAlreadyExists:
fmt.Println(s3.ErrCodeBucketAlreadyExists, aerr.Error())
case s3.ErrCodeBucketAlreadyOwnedByYou:
fmt.Println(s3.ErrCodeBucketAlreadyOwnedByYou, aerr.Error())
default:
fmt.Println(aerr.Error())
}
} else {
fmt.Println(err.Error())
}
}

up, err := os.Open("./testdata/example.json.gz")
if err != nil {
fmt.Println("failed to open file")
return
}

gzip.NewWriter(up).Flush()

_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String("bucket-example"),
Key: aws.String("example.json.gz"),
Body: up,
})
if err != nil {
fmt.Println("failed to upload file")
return
}

// test実行
code := m.Run()
// test後実行
println("after all...")
os.Exit(code)
}

func TestS3Upload(t *testing.T) {
t.Run("upload", func(t *testing.T) {
var buf bytes.Buffer
result, err := s3Upload(buf)
if err != nil {
t.Fatal("Error failed to s3upload")
}
if result.Location == "" {
t.Errorf("got: %v\nwant: %v", result.UploadID, "")
}
fmt.Println("Test s3upload...")
})
}

func TestCompress(t *testing.T) {
t.Run("compress", func(t *testing.T) {
data := []SampleConvertData{
{12345678, "abcdefgh", time.Now().String()},
{23456781, "bcdefgha", time.Now().String()}}

var buf bytes.Buffer
err := compress(&buf, data)
if err != nil {
t.Fatal("Error failed to compress")
}
if len(buf.Bytes()) == 0 {
t.Fatal("Error failed to compress")
t.Errorf("got: %v\nwant: %v", buf.Bytes(), 0)
}
fmt.Println("Test compress...")
})
}

func TestConvert(t *testing.T) {
t.Run("convert", func(t *testing.T) {
data := []SampleData{{12345678, "abcdefgh"}}
expected := time.Now().String()

convertData, err := convert(data, expected)
if err != nil {
t.Fatal("Error failed to convert")
}
if convertData[0].Time != expected {
t.Errorf("got: %v\nwant: %v", convertData[0].Time, expected)
}
fmt.Println("Test convert...")
})
}

func TestExtract(t *testing.T) {
t.Run("extract", func(t *testing.T) {
file, _ := os.Open("./testdata/example.json.gz")
defer file.Close()
actual, err := extract(file)
if err != nil {
t.Fatal("Error failed to extract")
}
expected := "abcdefgh"
if actual[0].Value != expected {
t.Errorf("got: %v\nwant: %v", actual[0].Value, expected)
}
fmt.Println("Test extract...")
})
}

func TestS3Download(t *testing.T) {
t.Run("s3 download test", func(t *testing.T) {
tmpFile, err := s3Download("bucket-example", "example.json.gz")
if err != nil {
t.Fatal("Error failed to s3 download")
}
if tmpFile.Name() == "" {
t.Errorf("got: %v\nwant: %v", "", "/tmp/srctmp_*********")
}
fmt.Println("Test s3Download...")
})
}

func TestHandler(t *testing.T) {
t.Run("handler input test", func(t *testing.T) {
raw, err := ioutil.ReadFile("../event_file.json")
if err != nil {
t.Fatal("Error failed to event file load")
}
var event events.S3Event
json.Unmarshal(raw, &event)
err = handler(context.Background(), event)
if err != nil {
t.Fatal("Error failed to s3 event")
}
fmt.Println("Test handler...")
})
}


mainです。

苦労したところはgolang特有tempFileを用意してのdownload処理、gzipへの圧縮処理です。

参考資料(gzip): http://text.baldanders.info/golang/gzip-operation/


main.go

package main

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/pkg/errors"
"io"
"io/ioutil"
"os"
"time"
)

type SampleData struct {
Id int `json:"id"`
Value string `json:"value"`
}

type SampleConvertData struct {
Id int `json:"id"`
Value string `json:"value"`
Time string `json:"time"`
}

func createSession() *session.Session {
var sess = session.Must(session.NewSession(&aws.Config{
S3ForcePathStyle: aws.Bool(true),
Region: aws.String(os.Getenv("REGION")),
Endpoint: aws.String(os.Getenv("S3_ENDPOINT")),
}))
return sess
}

func s3Upload(buf bytes.Buffer) (*s3manager.UploadOutput, error) {
sess := createSession()

var uploader = s3manager.NewUploader(sess)

result, err := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(os.Getenv("TARGET_S3")),
Key: aws.String("example-convert.json.gz"),
Body: bytes.NewReader(buf.Bytes()),
})
if err != nil {
return nil, errors.Wrap(err, "failed to upload file")
}

return result, err
}

func compress(w io.Writer, convertData []SampleConvertData) error {
b, _ := json.Marshal(convertData)
gw, err := gzip.NewWriterLevel(w, gzip.BestCompression)
gw.Write(b)
defer gw.Close()
return err
}

func convert(data []SampleData, time string) ([]SampleConvertData, error) {
var dataConvert []SampleConvertData
for _, d := range data {
dataConvert = append(dataConvert, SampleConvertData{
Id: d.Id,
Value: d.Value,
Time: time,
})
}
return dataConvert, nil
}

func extract(file *os.File) ([]SampleData, error) {
gzipReader, _ := gzip.NewReader(file)
defer gzipReader.Close()

raw, err := ioutil.ReadAll(gzipReader)
if err != nil {
fmt.Println(err.Error())
}

var data []SampleData
err = json.Unmarshal(raw, &data)
if err != nil {
fmt.Println(err.Error())
}

return data, err
}

func s3Download(bucket string, key string) (f *os.File, err error) {
sess := createSession()

tmpFile, _ := ioutil.TempFile("/tmp", "srctmp_")
defer os.Remove(tmpFile.Name())

var downloader = s3manager.NewDownloader(sess)

_, err = downloader.Download(
tmpFile,
&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
return nil, errors.Wrap(err, "file download error")
}

return tmpFile, err
}

func handler(ctx context.Context, req events.S3Event) error {
bucketName := req.Records[0].S3.Bucket.Name
key := req.Records[0].S3.Object.Key
file, err := s3Download(bucketName, key)
if err != nil {
return errors.Wrap(err, "Error failed to s3 download")
}
data, err := extract(file)
if err != nil {
return errors.Wrap(err, "Error failed to extract")
}
timeNow := time.Now().String()
convertData, err := convert(data, timeNow)
if err != nil {
return errors.Wrap(err, "Error failed to convert")
}
var buf bytes.Buffer
err = compress(&buf, convertData)
if err != nil {
return errors.Wrap(err, "Error failed compress")
}
_, err = s3Upload(buf)
if err != nil {
return errors.Wrap(err, "Error failed to s3 upload")
}

return nil
}

func main() {
lambda.Start(handler)
}



統合テスト,ビルド,デプロイ


1. template.yaml

SAMのYAML定義になります。

ローカルでのテスト用に、local.yamlでは環境変数を設定しています。

AWS上で動く用のstaging.yamlではPoliciesを定義すると、Roleが自動生成されて、lambdaに反映されます。


staging.yaml

AWSTemplateFormatVersion: '2010-09-09'

Transform: AWS::Serverless-2016-10-31
Description: >
sam-app

Sample SAM Template for sam-app

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
Function:
Timeout: 5

Resources:
MainFunction:
Type: AWS::Serverless::Function
Properties:
Handler: main
Runtime: go1.x
CodeUri: ../src
FunctionName: main
Description: >-
An Amazon S3 trigger that retrieves metadata for the object that has
been updated.
MemorySize: 128
Policies:
- Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 's3:GetObject'
- 's3:PutObject'
Resource: "arn:aws:s3:::bucket-example-*"
Events:
S3Event:
Type: S3
Properties:
Bucket: !Ref TestBucket
Events:
- 's3:ObjectCreated:Put'
Environment:
Variables:
TARGET_S3: "bucket-example-convert-staging"

TestBucket:
Type: 'AWS::S3::Bucket'
Properties:
BucketName: "bucket-example-staging"
TestBucketConvert:
Type: 'AWS::S3::Bucket'
Properties:
BucketName: "bucket-example-convert-staging"



2. 統合テスト

Makefileのintegration-testingで統合テストをしています。

コマンドはmake integration-testingです。

ここではdockerを一度落として起動し、その後bucketを作成し、sample dataをPUTした後にsam cliのイベントが動くようになっています。

PROJECT_NAME:= "localstack-example"

.PHONY: deps clean build integration-testing deploy

deps:
go get -u ./...

clean:
rm -rf ./src/main

build:
GOOS=linux GOARCH=amd64 go build -o src/main ./src

integration-testing: build
docker-compose -p $(PROJECT_NAME) down
env TMPDIR=/private$TMPDIR docker-compose -p $(PROJECT_NAME) up -d
sleep 5s
aws --endpoint-url=http://localhost:4572 s3 mb s3://bucket-example
aws --endpoint-url=http://localhost:4572 s3 mb s3://bucket-example-convert
aws --endpoint-url=http://localhost:4572 s3 cp ./testdata/example.json.gz s3://bucket-example/example.json.gz
sam local invoke MainFunction --event event_file.json --template ./template/local.yaml \
--docker-network $$(docker network ls -q -f name=$(PROJECT_NAME))

deploy: build
sam package --template-file ./template/staging.yaml --s3-bucket package-bucket-example --output-template-file packaged.yaml
sam deploy --template-file packaged.yaml --stack-name sam-cli-example --capabilities CAPABILITY_IAM


3. ビルド

Makefileのbuildでビルドをしています。

コマンドはmake buildです。


3. デプロイ

Makefileのdeployでビルドをしています。

コマンドはmake deployです。

ここではソースをS3にあげて(S3は事前にbucketを作成)、その後AWSにデプロイをしています。


参考


まとめ

SAM CLI + Golang でlambda s3 to s3を実現してみました。

gzip圧縮したりなど、なるべく本番で使う想定で実装してみました。

本当はdeploy周りはCodeBuildを使いたかったのですが、時間が足らず...あとAWS Toolkits for IntelliJもやりたかった。

次回にしたいと思います。

今回使ったプロジェクトは下記にありますので、ご参照ください。

https://github.com/yoshihir/samcli-s3-to-s3-example