このエントリはただの集団 Advent Calendar 2018の16日目の記事です。
#概要
S3にgzipがputされたのを感知し、lambdaで編集&gzip化、S3にuploadする処理です。
環境
- Golang 1.11.2
- SAN CLI 0.6.2
- Docker for Mac
- LocalStack
- Goland
- MacOS
#開発
##1. 環境構築
まずは開発環境を用意します。下記を実行するとsampleが生成されます。
$ brew tap aws/tap
$ brew install aws-sam-cli
$ sam init --runtime go
その他の使い方は下記のリンクを見ていただくと良いと思います
https://github.com/awslabs/aws-sam-cli/blob/develop/docs/usage.rst#generate-sample-event-payloads
##2. パッケージ構成
ざっくりとしたパッケージ構成とその説明です。
.
├── src
│ ├── main.go <-- Lambda function code
│ └── main_test.go <-- Unit tests
├── template
│ ├── local.yaml <-- sam cliの定義ファイル(local)
│ └── staging.yaml <-- sam cliの定義ファイル(staging)
├── testdata
│ └── example.json.gz <-- test data
├── docker-compose.yaml <-- localでS3を再現するためのlocalstack
├── event_file.json <-- localでevent起動するときのrequestファイル(sam local generate-event s3 put で生成し、編集)
├── Makefile <-- command実行ファイル
├── packaged.yaml <-- sam package時生成される。deploy時に必要
└── README.md
##3. 実行ファイル
開発はTDDで行いましたので、まずはtestの解説を...
TestMainで前処理としてbucketの作成、及び初期データを片方のbucketに突っ込んでいます。
その後は関数ごとにtestを書いています。
testの実行方法はcd src
go test
です(事前にenv TMPDIR=/private$TMPDIR docker-compose up -d
でdockerを起動しておいてください)
package main
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"io/ioutil"
"os"
"testing"
"time"
)
func TestMain(m *testing.M) {
// test前処理
println("before all...")
os.Setenv("REGION", "ap-northeast-1")
os.Setenv("S3_ENDPOINT", "http://localhost:4572")
os.Setenv("TARGET_S3", "bucket-example-convert")
var sess = session.Must(session.NewSession(&aws.Config{
S3ForcePathStyle: aws.Bool(true),
Region: aws.String(os.Getenv("REGION")),
Endpoint: aws.String(os.Getenv("S3_ENDPOINT")),
}))
var creater = s3.New(sess)
var uploader = s3manager.NewUploader(sess)
_, err := creater.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String("bucket-example"),
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeBucketAlreadyExists:
fmt.Println(s3.ErrCodeBucketAlreadyExists, aerr.Error())
case s3.ErrCodeBucketAlreadyOwnedByYou:
fmt.Println(s3.ErrCodeBucketAlreadyOwnedByYou, aerr.Error())
default:
fmt.Println(aerr.Error())
}
} else {
fmt.Println(err.Error())
}
}
_, err = creater.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String("bucket-example-convert"),
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeBucketAlreadyExists:
fmt.Println(s3.ErrCodeBucketAlreadyExists, aerr.Error())
case s3.ErrCodeBucketAlreadyOwnedByYou:
fmt.Println(s3.ErrCodeBucketAlreadyOwnedByYou, aerr.Error())
default:
fmt.Println(aerr.Error())
}
} else {
fmt.Println(err.Error())
}
}
up, err := os.Open("./testdata/example.json.gz")
if err != nil {
fmt.Println("failed to open file")
return
}
gzip.NewWriter(up).Flush()
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String("bucket-example"),
Key: aws.String("example.json.gz"),
Body: up,
})
if err != nil {
fmt.Println("failed to upload file")
return
}
// test実行
code := m.Run()
// test後実行
println("after all...")
os.Exit(code)
}
func TestS3Upload(t *testing.T) {
t.Run("upload", func(t *testing.T) {
var buf bytes.Buffer
result, err := s3Upload(buf)
if err != nil {
t.Fatal("Error failed to s3upload")
}
if result.Location == "" {
t.Errorf("got: %v\nwant: %v", result.UploadID, "")
}
fmt.Println("Test s3upload...")
})
}
func TestCompress(t *testing.T) {
t.Run("compress", func(t *testing.T) {
data := []SampleConvertData{
{12345678, "abcdefgh", time.Now().String()},
{23456781, "bcdefgha", time.Now().String()}}
var buf bytes.Buffer
err := compress(&buf, data)
if err != nil {
t.Fatal("Error failed to compress")
}
if len(buf.Bytes()) == 0 {
t.Fatal("Error failed to compress")
t.Errorf("got: %v\nwant: %v", buf.Bytes(), 0)
}
fmt.Println("Test compress...")
})
}
func TestConvert(t *testing.T) {
t.Run("convert", func(t *testing.T) {
data := []SampleData{{12345678, "abcdefgh"}}
expected := time.Now().String()
convertData, err := convert(data, expected)
if err != nil {
t.Fatal("Error failed to convert")
}
if convertData[0].Time != expected {
t.Errorf("got: %v\nwant: %v", convertData[0].Time, expected)
}
fmt.Println("Test convert...")
})
}
func TestExtract(t *testing.T) {
t.Run("extract", func(t *testing.T) {
file, _ := os.Open("./testdata/example.json.gz")
defer file.Close()
actual, err := extract(file)
if err != nil {
t.Fatal("Error failed to extract")
}
expected := "abcdefgh"
if actual[0].Value != expected {
t.Errorf("got: %v\nwant: %v", actual[0].Value, expected)
}
fmt.Println("Test extract...")
})
}
func TestS3Download(t *testing.T) {
t.Run("s3 download test", func(t *testing.T) {
tmpFile, err := s3Download("bucket-example", "example.json.gz")
if err != nil {
t.Fatal("Error failed to s3 download")
}
if tmpFile.Name() == "" {
t.Errorf("got: %v\nwant: %v", "", "/tmp/srctmp_*********")
}
fmt.Println("Test s3Download...")
})
}
func TestHandler(t *testing.T) {
t.Run("handler input test", func(t *testing.T) {
raw, err := ioutil.ReadFile("../event_file.json")
if err != nil {
t.Fatal("Error failed to event file load")
}
var event events.S3Event
json.Unmarshal(raw, &event)
err = handler(context.Background(), event)
if err != nil {
t.Fatal("Error failed to s3 event")
}
fmt.Println("Test handler...")
})
}
mainです。
苦労したところはgolang特有tempFileを用意してのdownload処理、gzipへの圧縮処理です。
参考資料(gzip): http://text.baldanders.info/golang/gzip-operation/
package main
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/pkg/errors"
"io"
"io/ioutil"
"os"
"time"
)
type SampleData struct {
Id int `json:"id"`
Value string `json:"value"`
}
type SampleConvertData struct {
Id int `json:"id"`
Value string `json:"value"`
Time string `json:"time"`
}
func createSession() *session.Session {
var sess = session.Must(session.NewSession(&aws.Config{
S3ForcePathStyle: aws.Bool(true),
Region: aws.String(os.Getenv("REGION")),
Endpoint: aws.String(os.Getenv("S3_ENDPOINT")),
}))
return sess
}
func s3Upload(buf bytes.Buffer) (*s3manager.UploadOutput, error) {
sess := createSession()
var uploader = s3manager.NewUploader(sess)
result, err := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(os.Getenv("TARGET_S3")),
Key: aws.String("example-convert.json.gz"),
Body: bytes.NewReader(buf.Bytes()),
})
if err != nil {
return nil, errors.Wrap(err, "failed to upload file")
}
return result, err
}
func compress(w io.Writer, convertData []SampleConvertData) error {
b, _ := json.Marshal(convertData)
gw, err := gzip.NewWriterLevel(w, gzip.BestCompression)
gw.Write(b)
defer gw.Close()
return err
}
func convert(data []SampleData, time string) ([]SampleConvertData, error) {
var dataConvert []SampleConvertData
for _, d := range data {
dataConvert = append(dataConvert, SampleConvertData{
Id: d.Id,
Value: d.Value,
Time: time,
})
}
return dataConvert, nil
}
func extract(file *os.File) ([]SampleData, error) {
gzipReader, _ := gzip.NewReader(file)
defer gzipReader.Close()
raw, err := ioutil.ReadAll(gzipReader)
if err != nil {
fmt.Println(err.Error())
}
var data []SampleData
err = json.Unmarshal(raw, &data)
if err != nil {
fmt.Println(err.Error())
}
return data, err
}
func s3Download(bucket string, key string) (f *os.File, err error) {
sess := createSession()
tmpFile, _ := ioutil.TempFile("/tmp", "srctmp_")
defer os.Remove(tmpFile.Name())
var downloader = s3manager.NewDownloader(sess)
_, err = downloader.Download(
tmpFile,
&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
return nil, errors.Wrap(err, "file download error")
}
return tmpFile, err
}
func handler(ctx context.Context, req events.S3Event) error {
bucketName := req.Records[0].S3.Bucket.Name
key := req.Records[0].S3.Object.Key
file, err := s3Download(bucketName, key)
if err != nil {
return errors.Wrap(err, "Error failed to s3 download")
}
data, err := extract(file)
if err != nil {
return errors.Wrap(err, "Error failed to extract")
}
timeNow := time.Now().String()
convertData, err := convert(data, timeNow)
if err != nil {
return errors.Wrap(err, "Error failed to convert")
}
var buf bytes.Buffer
err = compress(&buf, convertData)
if err != nil {
return errors.Wrap(err, "Error failed compress")
}
_, err = s3Upload(buf)
if err != nil {
return errors.Wrap(err, "Error failed to s3 upload")
}
return nil
}
func main() {
lambda.Start(handler)
}
#統合テスト,ビルド,デプロイ
##1. template.yaml
SAMのYAML定義になります。
ローカルでのテスト用に、local.yamlでは環境変数を設定しています。
AWS上で動く用のstaging.yamlではPoliciesを定義すると、Roleが自動生成されて、lambdaに反映されます。
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
sam-app
Sample SAM Template for sam-app
# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
Function:
Timeout: 5
Resources:
MainFunction:
Type: AWS::Serverless::Function
Properties:
Handler: main
Runtime: go1.x
CodeUri: ../src
FunctionName: main
Description: >-
An Amazon S3 trigger that retrieves metadata for the object that has
been updated.
MemorySize: 128
Policies:
- Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 's3:GetObject'
- 's3:PutObject'
Resource: "arn:aws:s3:::bucket-example-*"
Events:
S3Event:
Type: S3
Properties:
Bucket: !Ref TestBucket
Events:
- 's3:ObjectCreated:Put'
Environment:
Variables:
TARGET_S3: "bucket-example-convert-staging"
TestBucket:
Type: 'AWS::S3::Bucket'
Properties:
BucketName: "bucket-example-staging"
TestBucketConvert:
Type: 'AWS::S3::Bucket'
Properties:
BucketName: "bucket-example-convert-staging"
##2. 統合テスト
Makefileのintegration-testing
で統合テストをしています。
コマンドはmake integration-testing
です。
ここではdockerを一度落として起動し、その後bucketを作成し、sample dataをPUTした後にsam cliのイベントが動くようになっています。
PROJECT_NAME:= "localstack-example"
.PHONY: deps clean build integration-testing deploy
deps:
go get -u ./...
clean:
rm -rf ./src/main
build:
GOOS=linux GOARCH=amd64 go build -o src/main ./src
integration-testing: build
docker-compose -p $(PROJECT_NAME) down
env TMPDIR=/private$TMPDIR docker-compose -p $(PROJECT_NAME) up -d
sleep 5s
aws --endpoint-url=http://localhost:4572 s3 mb s3://bucket-example
aws --endpoint-url=http://localhost:4572 s3 mb s3://bucket-example-convert
aws --endpoint-url=http://localhost:4572 s3 cp ./testdata/example.json.gz s3://bucket-example/example.json.gz
sam local invoke MainFunction --event event_file.json --template ./template/local.yaml \
--docker-network $$(docker network ls -q -f name=$(PROJECT_NAME))
deploy: build
sam package --template-file ./template/staging.yaml --s3-bucket package-bucket-example --output-template-file packaged.yaml
sam deploy --template-file packaged.yaml --stack-name sam-cli-example --capabilities CAPABILITY_IAM
##3. ビルド
Makefileのbuild
でビルドをしています。
コマンドはmake build
です。
##3. デプロイ
Makefileのdeploy
でビルドをしています。
コマンドはmake deploy
です。
ここではソースをS3にあげて(S3は事前にbucketを作成)、その後AWSにデプロイをしています。
#参考
- S3にアップしたZIPを別S3バケットに解凍するアプリでAWS SAM+Golang開発の流れを確認
- [aws-sam-cliの使用方法]
(https://github.com/awslabs/aws-sam-cli/blob/develop/docs/usage.rst#generate-sample-event-payloads)
#まとめ
SAM CLI + Golang でlambda s3 to s3を実現してみました。
gzip圧縮したりなど、なるべく本番で使う想定で実装してみました。
本当はdeploy周りはCodeBuildを使いたかったのですが、時間が足らず...あとAWS Toolkits for IntelliJもやりたかった。
次回にしたいと思います。
今回使ったプロジェクトは下記にありますので、ご参照ください。
https://github.com/yoshihir/samcli-s3-to-s3-example