23
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

SAM CLI + Golang でlambda s3 to s3を実現する

Last updated at Posted at 2018-12-15

このエントリはただの集団 Advent Calendar 2018の16日目の記事です。

#概要
S3にgzipがputされたのを感知し、lambdaで編集&gzip化、S3にuploadする処理です。
AWS Design.png

環境

  • Golang 1.11.2
  • SAN CLI 0.6.2
  • Docker for Mac
  • LocalStack
  • Goland
  • MacOS

#開発
##1. 環境構築
まずは開発環境を用意します。下記を実行するとsampleが生成されます。

$ brew tap aws/tap
$ brew install aws-sam-cli
$ sam init --runtime go

その他の使い方は下記のリンクを見ていただくと良いと思います
https://github.com/awslabs/aws-sam-cli/blob/develop/docs/usage.rst#generate-sample-event-payloads

##2. パッケージ構成
ざっくりとしたパッケージ構成とその説明です。

.
├── src                         
│   ├── main.go                 <-- Lambda function code
│   └── main_test.go            <-- Unit tests
├── template                    
│   ├── local.yaml              <-- sam cliの定義ファイル(local)
│   └── staging.yaml            <-- sam cliの定義ファイル(staging)
├── testdata                    
│   └── example.json.gz         <-- test data      
├── docker-compose.yaml         <-- localでS3を再現するためのlocalstack
├── event_file.json             <-- localでevent起動するときのrequestファイル(sam local generate-event s3 put で生成し、編集)
├── Makefile                    <-- command実行ファイル
├── packaged.yaml               <-- sam package時生成される。deploy時に必要
└── README.md

##3. 実行ファイル
開発はTDDで行いましたので、まずはtestの解説を...
TestMainで前処理としてbucketの作成、及び初期データを片方のbucketに突っ込んでいます。
その後は関数ごとにtestを書いています。
testの実行方法はcd src go testです(事前にenv TMPDIR=/private$TMPDIR docker-compose up -dでdockerを起動しておいてください)

main_test.go
package main

import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"io/ioutil"
	"os"
	"testing"
	"time"
)

func TestMain(m *testing.M) {
	// test前処理
	println("before all...")

	os.Setenv("REGION", "ap-northeast-1")
	os.Setenv("S3_ENDPOINT", "http://localhost:4572")
	os.Setenv("TARGET_S3", "bucket-example-convert")

	var sess = session.Must(session.NewSession(&aws.Config{
		S3ForcePathStyle: aws.Bool(true),
		Region:           aws.String(os.Getenv("REGION")),
		Endpoint:         aws.String(os.Getenv("S3_ENDPOINT")),
	}))
	var creater = s3.New(sess)
	var uploader = s3manager.NewUploader(sess)

	_, err := creater.CreateBucket(&s3.CreateBucketInput{
		Bucket: aws.String("bucket-example"),
	})

	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			switch aerr.Code() {
			case s3.ErrCodeBucketAlreadyExists:
				fmt.Println(s3.ErrCodeBucketAlreadyExists, aerr.Error())
			case s3.ErrCodeBucketAlreadyOwnedByYou:
				fmt.Println(s3.ErrCodeBucketAlreadyOwnedByYou, aerr.Error())
			default:
				fmt.Println(aerr.Error())
			}
		} else {
			fmt.Println(err.Error())
		}
	}
	_, err = creater.CreateBucket(&s3.CreateBucketInput{
		Bucket: aws.String("bucket-example-convert"),
	})

	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			switch aerr.Code() {
			case s3.ErrCodeBucketAlreadyExists:
				fmt.Println(s3.ErrCodeBucketAlreadyExists, aerr.Error())
			case s3.ErrCodeBucketAlreadyOwnedByYou:
				fmt.Println(s3.ErrCodeBucketAlreadyOwnedByYou, aerr.Error())
			default:
				fmt.Println(aerr.Error())
			}
		} else {
			fmt.Println(err.Error())
		}
	}

	up, err := os.Open("./testdata/example.json.gz")
	if err != nil {
		fmt.Println("failed to open file")
		return
	}

	gzip.NewWriter(up).Flush()

	_, err = uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String("bucket-example"),
		Key:    aws.String("example.json.gz"),
		Body:   up,
	})
	if err != nil {
		fmt.Println("failed to upload file")
		return
	}

	// test実行
	code := m.Run()
	// test後実行
	println("after all...")
	os.Exit(code)
}

func TestS3Upload(t *testing.T) {
	t.Run("upload", func(t *testing.T) {
		var buf bytes.Buffer
		result, err := s3Upload(buf)
		if err != nil {
			t.Fatal("Error failed to s3upload")
		}
		if result.Location == "" {
			t.Errorf("got: %v\nwant: %v", result.UploadID, "")
		}
		fmt.Println("Test s3upload...")
	})
}

func TestCompress(t *testing.T) {
	t.Run("compress", func(t *testing.T) {
		data := []SampleConvertData{
			{12345678, "abcdefgh", time.Now().String()},
			{23456781, "bcdefgha", time.Now().String()}}

		var buf bytes.Buffer
		err := compress(&buf, data)
		if err != nil {
			t.Fatal("Error failed to compress")
		}
		if len(buf.Bytes()) == 0 {
			t.Fatal("Error failed to compress")
			t.Errorf("got: %v\nwant: %v", buf.Bytes(), 0)
		}
		fmt.Println("Test compress...")
	})
}

func TestConvert(t *testing.T) {
	t.Run("convert", func(t *testing.T) {
		data := []SampleData{{12345678, "abcdefgh"}}
		expected := time.Now().String()

		convertData, err := convert(data, expected)
		if err != nil {
			t.Fatal("Error failed to convert")
		}
		if convertData[0].Time != expected {
			t.Errorf("got: %v\nwant: %v", convertData[0].Time, expected)
		}
		fmt.Println("Test convert...")
	})
}

func TestExtract(t *testing.T) {
	t.Run("extract", func(t *testing.T) {
		file, _ := os.Open("./testdata/example.json.gz")
		defer file.Close()
		actual, err := extract(file)
		if err != nil {
			t.Fatal("Error failed to extract")
		}
		expected := "abcdefgh"
		if actual[0].Value != expected {
			t.Errorf("got: %v\nwant: %v", actual[0].Value, expected)
		}
		fmt.Println("Test extract...")
	})
}

func TestS3Download(t *testing.T) {
	t.Run("s3 download test", func(t *testing.T) {
		tmpFile, err := s3Download("bucket-example", "example.json.gz")
		if err != nil {
			t.Fatal("Error failed to s3 download")
		}
		if tmpFile.Name() == "" {
			t.Errorf("got: %v\nwant: %v", "", "/tmp/srctmp_*********")
		}
		fmt.Println("Test s3Download...")
	})
}

func TestHandler(t *testing.T) {
	t.Run("handler input test", func(t *testing.T) {
		raw, err := ioutil.ReadFile("../event_file.json")
		if err != nil {
			t.Fatal("Error failed to event file load")
		}
		var event events.S3Event
		json.Unmarshal(raw, &event)
		err = handler(context.Background(), event)
		if err != nil {
			t.Fatal("Error failed to s3 event")
		}
		fmt.Println("Test handler...")
	})
}

mainです。
苦労したところはgolang特有tempFileを用意してのdownload処理、gzipへの圧縮処理です。
参考資料(gzip): http://text.baldanders.info/golang/gzip-operation/

main.go
package main

import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/pkg/errors"
	"io"
	"io/ioutil"
	"os"
	"time"
)

type SampleData struct {
	Id    int    `json:"id"`
	Value string `json:"value"`
}

type SampleConvertData struct {
	Id    int    `json:"id"`
	Value string `json:"value"`
	Time  string `json:"time"`
}

func createSession() *session.Session {
	var sess = session.Must(session.NewSession(&aws.Config{
		S3ForcePathStyle: aws.Bool(true),
		Region:           aws.String(os.Getenv("REGION")),
		Endpoint:         aws.String(os.Getenv("S3_ENDPOINT")),
	}))
	return sess
}

func s3Upload(buf bytes.Buffer) (*s3manager.UploadOutput, error) {
	sess := createSession()

	var uploader = s3manager.NewUploader(sess)

	result, err := uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String(os.Getenv("TARGET_S3")),
		Key:    aws.String("example-convert.json.gz"),
		Body:   bytes.NewReader(buf.Bytes()),
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to upload file")
	}

	return result, err
}

func compress(w io.Writer, convertData []SampleConvertData) error {
	b, _ := json.Marshal(convertData)
	gw, err := gzip.NewWriterLevel(w, gzip.BestCompression)
	gw.Write(b)
	defer gw.Close()
	return err
}

func convert(data []SampleData, time string) ([]SampleConvertData, error) {
	var dataConvert []SampleConvertData
	for _, d := range data {
		dataConvert = append(dataConvert, SampleConvertData{
			Id:    d.Id,
			Value: d.Value,
			Time:  time,
		})
	}
	return dataConvert, nil
}

func extract(file *os.File) ([]SampleData, error) {
	gzipReader, _ := gzip.NewReader(file)
	defer gzipReader.Close()

	raw, err := ioutil.ReadAll(gzipReader)
	if err != nil {
		fmt.Println(err.Error())
	}

	var data []SampleData
	err = json.Unmarshal(raw, &data)
	if err != nil {
		fmt.Println(err.Error())
	}

	return data, err
}

func s3Download(bucket string, key string) (f *os.File, err error) {
	sess := createSession()

	tmpFile, _ := ioutil.TempFile("/tmp", "srctmp_")
	defer os.Remove(tmpFile.Name())

	var downloader = s3manager.NewDownloader(sess)

	_, err = downloader.Download(
		tmpFile,
		&s3.GetObjectInput{
			Bucket: aws.String(bucket),
			Key:    aws.String(key),
		})
	if err != nil {
		return nil, errors.Wrap(err, "file download error")
	}

	return tmpFile, err
}

func handler(ctx context.Context, req events.S3Event) error {
	bucketName := req.Records[0].S3.Bucket.Name
	key := req.Records[0].S3.Object.Key
	file, err := s3Download(bucketName, key)
	if err != nil {
		return errors.Wrap(err, "Error failed to s3 download")
	}
	data, err := extract(file)
	if err != nil {
		return errors.Wrap(err, "Error failed to extract")
	}
	timeNow := time.Now().String()
	convertData, err := convert(data, timeNow)
	if err != nil {
		return errors.Wrap(err, "Error failed to convert")
	}
	var buf bytes.Buffer
	err = compress(&buf, convertData)
	if err != nil {
		return errors.Wrap(err, "Error failed compress")
	}
	_, err = s3Upload(buf)
	if err != nil {
		return errors.Wrap(err, "Error failed to s3 upload")
	}

	return nil
}

func main() {
	lambda.Start(handler)
}

#統合テスト,ビルド,デプロイ
##1. template.yaml
SAMのYAML定義になります。
ローカルでのテスト用に、local.yamlでは環境変数を設定しています。
AWS上で動く用のstaging.yamlではPoliciesを定義すると、Roleが自動生成されて、lambdaに反映されます。

staging.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  sam-app

  Sample SAM Template for sam-app

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 5

Resources:
  MainFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: main
      Runtime: go1.x
      CodeUri: ../src
      FunctionName: main
      Description: >-
        An Amazon S3 trigger that retrieves metadata for the object that has
        been updated.
      MemorySize: 128
      Policies:
      - Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Action:
          - 's3:GetObject'
          - 's3:PutObject'
          Resource: "arn:aws:s3:::bucket-example-*"
      Events:
        S3Event:
          Type: S3
          Properties:
            Bucket: !Ref TestBucket
            Events:
            - 's3:ObjectCreated:Put'
      Environment:
        Variables:
          TARGET_S3: "bucket-example-convert-staging"

  TestBucket:
    Type: 'AWS::S3::Bucket'
    Properties:
      BucketName: "bucket-example-staging"
  TestBucketConvert:
    Type: 'AWS::S3::Bucket'
    Properties:
      BucketName: "bucket-example-convert-staging"

##2. 統合テスト
Makefileのintegration-testingで統合テストをしています。
コマンドはmake integration-testingです。
ここではdockerを一度落として起動し、その後bucketを作成し、sample dataをPUTした後にsam cliのイベントが動くようになっています。

PROJECT_NAME:= "localstack-example"

.PHONY: deps clean build integration-testing deploy

deps:
	go get -u ./...

clean: 
	rm -rf ./src/main
	
build:
	GOOS=linux GOARCH=amd64 go build -o src/main ./src

integration-testing: build
	docker-compose -p $(PROJECT_NAME) down
	env TMPDIR=/private$TMPDIR docker-compose -p $(PROJECT_NAME) up -d
	sleep 5s
	aws --endpoint-url=http://localhost:4572 s3 mb s3://bucket-example
	aws --endpoint-url=http://localhost:4572 s3 mb s3://bucket-example-convert
	aws --endpoint-url=http://localhost:4572 s3 cp ./testdata/example.json.gz s3://bucket-example/example.json.gz
	sam local invoke MainFunction --event event_file.json --template ./template/local.yaml \
	--docker-network $$(docker network ls -q -f name=$(PROJECT_NAME))

deploy: build
	sam package --template-file ./template/staging.yaml --s3-bucket package-bucket-example --output-template-file packaged.yaml
	sam deploy --template-file packaged.yaml --stack-name sam-cli-example --capabilities CAPABILITY_IAM

##3. ビルド
Makefileのbuildでビルドをしています。
コマンドはmake buildです。

##3. デプロイ
Makefileのdeployでビルドをしています。
コマンドはmake deployです。
ここではソースをS3にあげて(S3は事前にbucketを作成)、その後AWSにデプロイをしています。

#参考

#まとめ

SAM CLI + Golang でlambda s3 to s3を実現してみました。
gzip圧縮したりなど、なるべく本番で使う想定で実装してみました。
本当はdeploy周りはCodeBuildを使いたかったのですが、時間が足らず...あとAWS Toolkits for IntelliJもやりたかった。
次回にしたいと思います。

今回使ったプロジェクトは下記にありますので、ご参照ください。
https://github.com/yoshihir/samcli-s3-to-s3-example

23
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
23
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?