Introduction
This time I try out AWS Step Functions.
I build a page that automatically converts and resizes uploaded images.
[Lambda functions and SAM template](https://github.com/tanaka-takurou/serverless-application-step-functions-page-go)
Preparation
[AWS Step Functions documentation]
AWS Step Functions
Create a Step Functions State Machine Using AWS SAM
Creating the AWS SAM template
The AWS SAM template configures API Gateway, Lambda, and Step Functions.
[Reference]
Creating an AWS SAM template
template.yml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Serverless Application Template for Step Functions
Parameters:
  ApplicationName:
    Type: String
    Default: 'ServerlessApplicationCreateThumbnail'
  FrontApiStageName:
    Type: String
    Default: 'ProdStage'
Metadata:
  AWS::ServerlessRepo::Application:
    Name: Serverless-Application-Step-Functions
    Description: 'This application convert image and create icon, thumbnail.'
    Author: tanaka-takurou
    SpdxLicenseId: MIT
    LicenseUrl: LICENSE.txt
    ReadmeUrl: README.md
    Labels: ['ServerlessRepo']
    HomePageUrl: https://github.com/tanaka-takurou/serverless-application-step-functions-page-go
    SemanticVersion: 0.0.2
    SourceCodeUrl: https://github.com/tanaka-takurou/serverless-application-step-functions-page-go
Resources:
  FrontApi:
    Type: AWS::Serverless::Api
    Properties:
      EndpointConfiguration: REGIONAL
      StageName: !Ref FrontApiStageName
  ImgBucket:
    Type: AWS::S3::Bucket
    Properties:
      CorsConfiguration:
        CorsRules:
          - AllowedHeaders: ['*']
            AllowedMethods: [GET, HEAD]
            AllowedOrigins: ['*']
            Id: CORSRuleId1
            MaxAge: '3600'
  FrontFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: bin/
      Handler: main
      MemorySize: 256
      Runtime: go1.x
      Description: 'Front Function'
      Policies:
        - Statement:
            - Effect: 'Allow'
              Action:
                - 'logs:CreateLogGroup'
                - 'logs:CreateLogStream'
                - 'logs:PutLogEvents'
              Resource: '*'
      Events:
        FrontApi:
          Type: Api
          Properties:
            Path: '/'
            Method: get
            RestApiId: !Ref FrontApi
      Environment:
        Variables:
          REGION: !Ref AWS::Region
          BUCKET_NAME: !Ref 'ImgBucket'
          API_PATH: !Join [ '', [ '/', !Ref 'FrontApiStageName', '/api'] ]
  MainFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: api/bin/
      Handler: main
      MemorySize: 256
      Runtime: go1.x
      Description: 'API Function'
      Policies:
        - S3CrudPolicy:
            BucketName: !Ref ImgBucket
        - Statement:
            - Effect: 'Allow'
              Action:
                - 'logs:CreateLogGroup'
                - 'logs:CreateLogStream'
                - 'logs:PutLogEvents'
              Resource: '*'
            - Effect: 'Allow'
              Action:
                - 'states:StartExecution'
                - 'states:ListExecutions'
              Resource: '*'
      Events:
        FrontApi:
          Type: Api
          Properties:
            Path: '/api'
            Method: post
            RestApiId: !Ref FrontApi
      Environment:
        Variables:
          REGION: !Ref AWS::Region
          BUCKET_NAME: !Ref 'ImgBucket'
          STATE_MACHINE_ARN: !Ref MainStateMachine
  StepFunctionsMain:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: step/bin/
      Handler: main
      MemorySize: 256
      Runtime: go1.x
      Description: 'Step Functions Main'
      Policies:
        - S3CrudPolicy:
            BucketName: !Ref ImgBucket
        - Statement:
            - Effect: 'Allow'
              Action:
                - 'logs:CreateLogGroup'
                - 'logs:CreateLogStream'
                - 'logs:PutLogEvents'
              Resource: '*'
      Environment:
        Variables:
          REGION: !Ref AWS::Region
          BUCKET_NAME: !Ref 'ImgBucket'
  FrontApiPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref FrontFunction
      Principal: apigateway.amazonaws.com
  MainStateMachine:
    Type: AWS::Serverless::StateMachine
    Properties:
      DefinitionUri: step/statemachine.json
      DefinitionSubstitutions:
        LambdaFunction: !GetAtt StepFunctionsMain.Arn
      Role: !GetAtt StatesExecutionRole.Arn
  StatesExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - !Sub states.${AWS::Region}.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: StatesExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "lambda:InvokeFunction"
                Resource: "*"
Outputs:
  APIURI:
    Value: !Join [ '', [ 'https://', !Ref FrontApi, '.execute-api.',!Ref 'AWS::Region','.amazonaws.com/',!Ref 'FrontApiStageName','/'] ]
The Step Functions configuration is the following part of the template:
  MainStateMachine:
    Type: AWS::Serverless::StateMachine
    Properties:
      DefinitionUri: step/statemachine.json
      DefinitionSubstitutions:
        LambdaFunction: !GetAtt StepFunctionsMain.Arn
      Role: !GetAtt StatesExecutionRole.Arn
  StatesExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - !Sub states.${AWS::Region}.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: StatesExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "lambda:InvokeFunction"
                Resource: "*"
Creating the Lambda functions
Note: the Lambda functions use aws-lambda-go, and the calls to Step Functions use aws-sdk-go-v2.
AWS SDK for Go API Reference V2
main.go
package main

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/aws/external"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
	"github.com/aws/aws-sdk-go-v2/service/sfn"
)

type APIResponse struct {
	Message string `json:"message"`
}

type Response events.APIGatewayProxyResponse

var cfg aws.Config
var s3Client *s3.Client
var sfnClient *sfn.Client

const layout string = "2006-01-02-15-04"
const layout2 string = "20060102150405.000"

// HandleRequest dispatches an API Gateway request on the "action" field of its JSON body.
func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (Response, error) {
	var jsonBytes []byte
	var err error
	d := make(map[string]string)
	// Reject bodies that are not a flat JSON object of strings instead of silently ignoring them.
	if e := json.Unmarshal([]byte(request.Body), &d); e != nil {
		log.Print(e)
		return Response{StatusCode: http.StatusBadRequest}, nil
	}
	if v, ok := d["action"]; ok {
		switch v {
		case "upload":
			// Store the image in S3, then start a state machine execution for it.
			if v, ok := d["filename"]; ok {
				if w, ok := d["filedata"]; ok {
					if name, key, e := uploadImage(v, w); e == nil {
						err = startExecution(ctx, name, key)
						if err == nil {
							jsonBytes, _ = json.Marshal(APIResponse{Message: name})
						}
					} else {
						err = e
					}
				}
			}
		case "checkstatus":
			// Report the status of the execution whose name equals the given id.
			if id, ok := d["id"]; ok {
				res, e := checkStatus(ctx, id)
				if e != nil {
					err = e
				} else {
					jsonBytes, _ = json.Marshal(APIResponse{Message: res})
				}
			}
		}
	}
	if err != nil {
		return Response{
			StatusCode: http.StatusInternalServerError,
		}, err
	}
	log.Print(request.RequestContext.Identity.SourceIP)
	responseBody := ""
	if len(jsonBytes) > 0 {
		responseBody = string(jsonBytes)
	}
	return Response{
		StatusCode: http.StatusOK,
		Body:       responseBody,
	}, nil
}

// uploadImage decodes a base64 data URL and stores it in S3.
// It returns the execution name and the S3 object key.
func uploadImage(filename string, filedata string) (string, string, error) {
	t := time.Now()
	// Drop the "data:...;base64," prefix if present; IndexByte returns -1 when there is no comma.
	b64data := filedata[strings.IndexByte(filedata, ',')+1:]
	data, err := base64.StdEncoding.DecodeString(b64data)
	if err != nil {
		log.Print(err)
		return "", "", err
	}
	extension := filepath.Ext(filename)
	var contentType string
	switch extension {
	case ".jpg", ".jpeg":
		contentType = "image/jpeg"
	case ".gif":
		contentType = "image/gif"
	case ".png":
		contentType = "image/png"
	default:
		return "", "", errors.New("this extension is invalid")
	}
	// name is a millisecond timestamp with the dot removed; it is also used as the execution name.
	name := strings.Replace(t.Format(layout2), ".", "", 1)
	// layout contains no dot, so it can be used as the key prefix directly.
	key := t.Format(layout) + "/" + name + extension
	uploader := s3manager.NewUploader(cfg)
	_, err = uploader.Upload(&s3manager.UploadInput{
		ACL:         s3.ObjectCannedACLPublicRead,
		Bucket:      aws.String(os.Getenv("BUCKET_NAME")),
		Key:         aws.String(key),
		Body:        bytes.NewReader(data),
		ContentType: aws.String(contentType),
	})
	if err != nil {
		log.Print(err)
		return "", "", err
	}
	return name, key, nil
}

func startExecution(ctx context.Context, name string, key string) error {
	if sfnClient == nil {
		sfnClient = sfn.New(cfg)
	}
	input := &sfn.StartExecutionInput{
		Input:           aws.String("{\"Key\" : \"" + key + "\"}"),
		Name:            aws.String(name),
		StateMachineArn: aws.String(os.Getenv("STATE_MACHINE_ARN")),
	}
	req := sfnClient.StartExecutionRequest(input)
	_, err := req.Send(ctx)
	if err != nil {
		log.Print(err)
		return err
	}
	return nil
}

// checkStatus looks for an execution named id among running and succeeded executions.
// Only the first page of each ListExecutions result is inspected.
func checkStatus(ctx context.Context, id string) (string, error) {
	if sfnClient == nil {
		sfnClient = sfn.New(cfg)
	}
	statusList := []sfn.ExecutionStatus{sfn.ExecutionStatusRunning, sfn.ExecutionStatusSucceeded}
	for _, v := range statusList {
		input := &sfn.ListExecutionsInput{
			StateMachineArn: aws.String(os.Getenv("STATE_MACHINE_ARN")),
			StatusFilter:    v,
		}
		req := sfnClient.ListExecutionsRequest(input)
		res, err := req.Send(ctx)
		if err != nil {
			log.Print(err)
			return "", err
		}
		for _, w := range res.ListExecutionsOutput.Executions {
			if id == aws.StringValue(w.Name) {
				return string(v), nil
			}
		}
	}
	// Not found as RUNNING or SUCCEEDED.
	return "Error", nil
}

func init() {
	var err error
	cfg, err = external.LoadDefaultAWSConfig()
	// Check the load error before touching the returned config.
	if err != nil {
		log.Print(err)
	}
	cfg.Region = os.Getenv("REGION")
}

func main() {
	lambda.Start(HandleRequest)
}
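The naming scheme inside `uploadImage` is easy to miss in the full listing, so here is a minimal standalone sketch of it. The layouts are copied from `main.go`; the helper name `objectNames`, the variable `sampleTime`, and the sample file name are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
	"time"
)

const (
	layout  = "2006-01-02-15-04"   // per-minute key prefix, as in main.go
	layout2 = "20060102150405.000" // millisecond timestamp, as in main.go
)

// sampleTime is a fixed, hypothetical upload time used only for this example.
var sampleTime = time.Date(2020, time.July, 1, 9, 30, 15, 120*int(time.Millisecond), time.UTC)

// objectNames (hypothetical helper) derives the execution name and the S3 key
// the same way uploadImage does: the name is the timestamp with its dot removed,
// and the key is "<minute prefix>/<name><extension>".
func objectNames(t time.Time, filename string) (name, key string) {
	name = strings.Replace(t.Format(layout2), ".", "", 1)
	key = t.Format(layout) + "/" + name + filepath.Ext(filename)
	return name, key
}

func main() {
	name, key := objectNames(sampleTime, "photo.png")
	fmt.Println(name) // 20200701093015120
	fmt.Println(key)  // 2020-07-01-09-30/20200701093015120.png
}
```

Because the same `name` is passed to Step Functions as the execution name, two uploads within the same millisecond would presumably collide; that trade-off is inherited from the original naming scheme.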
To start a state machine execution, use StartExecutionRequest:
func startExecution(ctx context.Context, name string, key string) error {
	if sfnClient == nil {
		sfnClient = sfn.New(cfg)
	}
	input := &sfn.StartExecutionInput{
		Input:           aws.String("{\"Key\" : \"" + key + "\"}"),
		Name:            aws.String(name),
		StateMachineArn: aws.String(os.Getenv("STATE_MACHINE_ARN")),
	}
	req := sfnClient.StartExecutionRequest(input)
	_, err := req.Send(ctx)
	if err != nil {
		log.Print(err)
		return err
	}
	return nil
}
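`startExecution` builds the execution input by concatenating strings. That works here because the key is generated from a timestamp and a whitelisted extension, but one possible alternative, sketched below, is to let `encoding/json` produce the document so that any special characters are escaped. The type `executionInput` and the function `buildExecutionInput` are hypothetical names and are not part of the original code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// executionInput (hypothetical) mirrors the {"Key": "..."} document
// that startExecution passes as the state machine input.
type executionInput struct {
	Key string `json:"Key"`
}

// buildExecutionInput (hypothetical) returns the JSON input for an S3 key.
// json.Marshal escapes quotes and backslashes that plain concatenation would not.
func buildExecutionInput(key string) (string, error) {
	b, err := json.Marshal(executionInput{Key: key})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := buildExecutionInput("2020-07-01-09-30/20200701093015120.png")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s) // {"Key":"2020-07-01-09-30/20200701093015120.png"}
}
```

The returned string could then be passed as `Input: aws.String(s)` in `sfn.StartExecutionInput`.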
Creating the state machine definition file
statemachine.json
{
  "StartAt": "Convert jpg image",
  "States": {
    "Convert jpg image": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "convert",
        "key.$": "$.Key",
        "type": "jpg"
      },
      "Next": "Convert png image"
    },
    "Convert png image": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "convert",
        "key.$": "$.Key",
        "type": "png"
      },
      "Next": "Create medium icon"
    },
    "Create medium icon": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "icon",
        "key.$": "$.Key",
        "icon": {
          "diameter": "200",
          "bgcolor": "f0ff"
        }
      },
      "Next": "Create large icon"
    },
    "Create large icon": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "icon",
        "key.$": "$.Key",
        "icon": {
          "diameter": "300",
          "bgcolor": "f0ff"
        }
      },
      "Next": "Create medium thumbnail"
    },
    "Create medium thumbnail": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "thumbnail",
        "key.$": "$.Key",
        "thumbnail": {
          "width": "960",
          "height": "540",
          "bgcolor": "f0ff"
        }
      },
      "Next": "Create large thumbnail"
    },
    "Create large thumbnail": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "thumbnail",
        "key.$": "$.Key",
        "thumbnail": {
          "width": "1440",
          "height": "810",
          "bgcolor": "f0ff"
        }
      },
      "Next": "Create small thumbnail"
    },
    "Create small thumbnail": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "thumbnail",
        "key.$": "$.Key",
        "thumbnail": {
          "width": "480",
          "height": "270",
          "bgcolor": "f0ff"
        }
      },
      "End": true
    }
  }
}
Once an execution has started, you can follow the state machine's progress in the AWS Management Console.
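The code of the step Lambda (`StepFunctionsMain`) is not shown in this article, so the following is only a rough sketch of how a handler might consume the payloads built by the `Parameters` blocks in `statemachine.json`. The types `StepEvent` and `StepResult` and the function `handle` are hypothetical, and the actual image processing is replaced by a log line. Note that a Task state's result replaces the state input by default, so every state that reads `$.Key` after the first one is reading the previous Lambda's output; the sketch therefore returns an object that contains `Key`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StepEvent (hypothetical) matches the payload shapes in statemachine.json.
type StepEvent struct {
	Action    string            `json:"action"`
	Key       string            `json:"key"`
	Type      string            `json:"type"`
	Icon      map[string]string `json:"icon"`
	Thumbnail map[string]string `json:"thumbnail"`
}

// StepResult (hypothetical) carries Key forward so the next state can read $.Key.
type StepResult struct {
	Key string `json:"Key"`
}

// handle (hypothetical) decodes one payload, dispatches on "action",
// and returns the JSON result that the next state would receive as input.
func handle(payload string) (string, error) {
	var ev StepEvent
	if err := json.Unmarshal([]byte(payload), &ev); err != nil {
		return "", err
	}
	switch ev.Action {
	case "convert":
		fmt.Printf("convert %s to %s\n", ev.Key, ev.Type)
	case "icon":
		fmt.Printf("icon for %s, diameter %s\n", ev.Key, ev.Icon["diameter"])
	case "thumbnail":
		fmt.Printf("thumbnail for %s, %sx%s\n", ev.Key, ev.Thumbnail["width"], ev.Thumbnail["height"])
	default:
		return "", fmt.Errorf("unknown action %q", ev.Action)
	}
	out, err := json.Marshal(StepResult{Key: ev.Key})
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	out, err := handle(`{"action":"thumbnail","key":"2020-07-01-09-30/a.png","thumbnail":{"width":"480","height":"270","bgcolor":"f0ff"}}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```

In a real Lambda, a function with the signature `func(ctx context.Context, ev StepEvent) (StepResult, error)` could be passed to `lambda.Start` instead of decoding the JSON by hand.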
Conclusion
I plan to use Step Functions whenever I need to run a sequence of processing steps.
It also supports branching, which I would like to try next.
References
[AWS Step Functions で作る Serverless バッチシステム](https://qiita.com/ketancho/items/147a141c9f8a6de86c97)
[AWS SAMがStep Functionsをサポートしたのでやってみる](https://qiita.com/hayao_k/items/c69b0f0a33f1b07498bb)
[StepFunctionsの入力方法サンプルメモ集](https://qiita.com/kure/items/6d09e4550814c3a17ed0)
[AWS Step Functionsを実行するためのIAM Policy](https://qiita.com/nakano348/items/b169792d0ec356a352ef)