AWS Diary 18 (Step Functions)

Posted at 2020-09-11

Introduction

This time I try out AWS Step Functions.
I build a page that automatically converts and resizes uploaded images.
[Lambda functions and SAM template](https://github.com/tanaka-takurou/serverless-application-step-functions-page-go)

Preparation

Set up AWS SAM.

[AWS Step Functions documentation]
AWS Step Functions
Create a Step Functions State Machine Using AWS SAM

Creating the AWS SAM template

The AWS SAM template configures API Gateway, Lambda, and Step Functions.

[References]
Creating an AWS SAM template

template.yml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Serverless Application Template for Step Functions

Parameters:
  ApplicationName:
    Type: String
    Default: 'ServerlessApplicationCreateThumbnail'
  FrontApiStageName:
    Type: String
    Default: 'ProdStage'

Metadata:
  AWS::ServerlessRepo::Application:
    Name: Serverless-Application-Step-Functions
    Description: 'This application converts images and creates icons and thumbnails.'
    Author: tanaka-takurou
    SpdxLicenseId: MIT
    LicenseUrl: LICENSE.txt
    ReadmeUrl: README.md
    Labels: ['ServerlessRepo']
    HomePageUrl: https://github.com/tanaka-takurou/serverless-application-step-functions-page-go
    SemanticVersion: 0.0.2
    SourceCodeUrl: https://github.com/tanaka-takurou/serverless-application-step-functions-page-go

Resources:
  FrontApi:
    Type: AWS::Serverless::Api
    Properties:
      EndpointConfiguration: REGIONAL
      StageName: !Ref FrontApiStageName
  ImgBucket:
    Type: AWS::S3::Bucket
    Properties:
      CorsConfiguration:
        CorsRules:
        - AllowedHeaders: ['*']
          AllowedMethods: [GET, HEAD]
          AllowedOrigins: ['*']
          Id: CORSRuleId1
          MaxAge: '3600'
  FrontFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: bin/
      Handler: main
      MemorySize: 256
      Runtime: go1.x
      Description: 'Front Function'
      Policies:
      - Statement:
        - Effect: 'Allow'
          Action:
            - 'logs:CreateLogGroup'
            - 'logs:CreateLogStream'
            - 'logs:PutLogEvents'
          Resource: '*'
      Events:
        FrontApi:
          Type: Api
          Properties:
            Path: '/'
            Method: get
            RestApiId: !Ref FrontApi
      Environment:
        Variables:
          REGION: !Ref AWS::Region
          BUCKET_NAME: !Ref 'ImgBucket'
          API_PATH: !Join [ '', [ '/', !Ref 'FrontApiStageName', '/api'] ]
  MainFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: api/bin/
      Handler: main
      MemorySize: 256
      Runtime: go1.x
      Description: 'API Function'
      Policies:
      - S3CrudPolicy:
          BucketName: !Ref ImgBucket
      - Statement:
        - Effect: 'Allow'
          Action:
            - 'logs:CreateLogGroup'
            - 'logs:CreateLogStream'
            - 'logs:PutLogEvents'
          Resource: '*'
        - Effect: 'Allow'
          Action:
            - 'states:StartExecution'
            - 'states:ListExecutions'
          Resource: '*'
      Events:
        FrontApi:
          Type: Api
          Properties:
            Path: '/api'
            Method: post
            RestApiId: !Ref FrontApi
      Environment:
        Variables:
          REGION: !Ref AWS::Region
          BUCKET_NAME: !Ref 'ImgBucket'
          STATE_MACHINE_ARN: !Ref MainStateMachine
  StepFunctionsMain:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: step/bin/
      Handler: main
      MemorySize: 256
      Runtime: go1.x
      Description: 'Step Functions Main'
      Policies:
      - S3CrudPolicy:
          BucketName: !Ref ImgBucket
      - Statement:
        - Effect: 'Allow'
          Action:
            - 'logs:CreateLogGroup'
            - 'logs:CreateLogStream'
            - 'logs:PutLogEvents'
          Resource: '*'
      Environment:
        Variables:
          REGION: !Ref AWS::Region
          BUCKET_NAME: !Ref 'ImgBucket'
  FrontApiPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref FrontFunction
      Principal: apigateway.amazonaws.com
  MainStateMachine:
    Type: AWS::Serverless::StateMachine
    Properties:
      DefinitionUri: step/statemachine.json
      DefinitionSubstitutions:
        LambdaFunction: !GetAtt StepFunctionsMain.Arn
      Role: !GetAtt StatesExecutionRole.Arn
  StatesExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - !Sub states.${AWS::Region}.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: StatesExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "lambda:InvokeFunction"
                Resource: "*"

Outputs:
  APIURI:
    Value: !Join [ '', [ 'https://', !Ref FrontApi, '.execute-api.',!Ref 'AWS::Region','.amazonaws.com/',!Ref 'FrontApiStageName','/'] ]

The Step Functions configuration is the following part of the template:

  MainStateMachine:
    Type: AWS::Serverless::StateMachine
    Properties:
      DefinitionUri: step/statemachine.json
      DefinitionSubstitutions:
        LambdaFunction: !GetAtt StepFunctionsMain.Arn
      Role: !GetAtt StatesExecutionRole.Arn
  StatesExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - !Sub states.${AWS::Region}.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: StatesExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "lambda:InvokeFunction"
                Resource: "*"
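
As a rough illustration of what `DefinitionSubstitutions` does at deploy time: each `${LambdaFunction}` placeholder in step/statemachine.json is replaced with the ARN of `StepFunctionsMain`. The Go sketch below only mimics that replacement locally. The `substitute` helper and the ARN value are made up for illustration; the real substitution is performed by AWS SAM and CloudFormation, not by application code.

```go
package main

import (
	"fmt"
	"strings"
)

// substitute mimics DefinitionSubstitutions: every ${Name} placeholder in the
// definition is replaced by the mapped value. Hypothetical helper, for
// illustration only.
func substitute(definition string, subs map[string]string) string {
	pairs := make([]string, 0, len(subs)*2)
	for name, value := range subs {
		pairs = append(pairs, "${"+name+"}", value)
	}
	return strings.NewReplacer(pairs...).Replace(definition)
}

func main() {
	definition := `{"Type": "Task", "Resource": "${LambdaFunction}", "Parameters": {"key.$": "$.Key"}}`
	// Placeholder ARN: the account ID and function name are not real.
	arn := "arn:aws:lambda:ap-northeast-1:123456789012:function:StepFunctionsMain"
	fmt.Println(substitute(definition, map[string]string{"LambdaFunction": arn}))
}
```

Only the `${...}` placeholders are touched, so JSONPath expressions such as `$.Key` pass through unchanged.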

Creating the Lambda functions

Note: The Lambda functions use aws-lambda-go, and the processing around Step Functions uses aws-sdk-go-v2.
AWS SDK for Go API Reference V2

main.go
package main

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/aws/external"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
	"github.com/aws/aws-sdk-go-v2/service/sfn"
)

type APIResponse struct {
	Message  string `json:"message"`
}

type Response events.APIGatewayProxyResponse

var cfg aws.Config
var s3Client *s3.Client
var sfnClient *sfn.Client

const layout  string = "2006-01-02-15-04"
const layout2 string = "20060102150405.000"

func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (Response, error) {
	var jsonBytes []byte
	var err error
	d := make(map[string]string)
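	// Reject bodies that are not a flat JSON object of string values.
	if e := json.Unmarshal([]byte(request.Body), &d); e != nil {
		log.Print(e)
		return Response{StatusCode: http.StatusBadRequest}, nil
	}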
	if v, ok := d["action"]; ok {
		switch v {
		case "upload" :
			if v, ok := d["filename"]; ok {
				if w, ok := d["filedata"]; ok {
					if name, key, e := uploadImage(v, w); e == nil {
						err = startExecution(ctx, name, key)
						if err == nil {
							jsonBytes, _ = json.Marshal(APIResponse{Message: name})
						}
					} else {
						err = e
					}
				}
			}
		case "checkstatus" :
			if id, ok := d["id"]; ok {
				res, e := checkStatus(ctx, id)
				if e != nil {
					err = e
				} else {
					jsonBytes, _ = json.Marshal(APIResponse{Message: res})
				}
			}
		}
	}
	if err != nil {
		return Response{
			StatusCode: http.StatusInternalServerError,
		}, err
	} else {
		log.Print(request.RequestContext.Identity.SourceIP)
	}
	responseBody := ""
	if len(jsonBytes) > 0 {
		responseBody = string(jsonBytes)
	}
	return Response {
		StatusCode: http.StatusOK,
		Body: responseBody,
	}, nil
}

func uploadImage(filename string, filedata string)(string, string, error) {
	t := time.Now()
	b64data := filedata[strings.IndexByte(filedata, ',')+1:]
	data, err := base64.StdEncoding.DecodeString(b64data)
	if err != nil {
		log.Print(err)
		return "", "", err
	}
	extension := filepath.Ext(filename)
	var contentType string

	switch extension {
	case ".jpg":
		contentType = "image/jpeg"
	case ".jpeg":
		contentType = "image/jpeg"
	case ".gif":
		contentType = "image/gif"
	case ".png":
		contentType = "image/png"
	default:
		return "", "", errors.New("this extension is invalid")
	}
	name := strings.Replace(t.Format(layout2), ".", "", 1)
	key := strings.Replace(t.Format(layout), ".", "", 1) + "/" + name + extension
	uploader := s3manager.NewUploader(cfg)
	_, err = uploader.Upload(&s3manager.UploadInput{
		ACL: s3.ObjectCannedACLPublicRead,
		Bucket: aws.String(os.Getenv("BUCKET_NAME")),
		Key: aws.String(key),
		Body: bytes.NewReader(data),
		ContentType: aws.String(contentType),
	})
	if err != nil {
		log.Print(err)
		return "", "", err
	}
	return name, key, nil
}

func startExecution(ctx context.Context, name string, key string) error {
	if sfnClient == nil {
		sfnClient = sfn.New(cfg)
	}
	input := &sfn.StartExecutionInput{
		Input: aws.String("{\"Key\" : \"" + key + "\"}"),
		Name: aws.String(name),
		StateMachineArn: aws.String(os.Getenv("STATE_MACHINE_ARN")),
	}

	req := sfnClient.StartExecutionRequest(input)
	_, err := req.Send(ctx)
	if err != nil {
		log.Print(err)
		return err
	}
	return nil
}

func checkStatus(ctx context.Context, id string)(string, error) {
	if sfnClient == nil {
		sfnClient = sfn.New(cfg)
	}

	statusList := []sfn.ExecutionStatus{sfn.ExecutionStatusRunning, sfn.ExecutionStatusSucceeded}

	for _, v := range statusList {
		input := &sfn.ListExecutionsInput{
			StateMachineArn: aws.String(os.Getenv("STATE_MACHINE_ARN")),
			StatusFilter: v,
		}

		req := sfnClient.ListExecutionsRequest(input)
		res, err := req.Send(ctx)
		if err != nil {
			log.Print(err)
			return "", err
		}
		for _, w := range res.ListExecutionsOutput.Executions {
			if id == aws.StringValue(w.Name) {
				return string(v), nil
			}
		}
	}

	return "Error", nil
}

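func init() {
	var err error
	cfg, err = external.LoadDefaultAWSConfig()
	if err != nil {
		// Check the error before touching cfg: without a valid config
		// no AWS client can be built, so fail fast.
		log.Fatal(err)
	}
	cfg.Region = os.Getenv("REGION")
}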

func main() {
	lambda.Start(HandleRequest)
}

To start a state machine execution, use StartExecutionRequest:

func startExecution(ctx context.Context, name string, key string) error {
	if sfnClient == nil {
		sfnClient = sfn.New(cfg)
	}
	input := &sfn.StartExecutionInput{
		Input: aws.String("{\"Key\" : \"" + key + "\"}"),
		Name: aws.String(name),
		StateMachineArn: aws.String(os.Getenv("STATE_MACHINE_ARN")),
	}

	req := sfnClient.StartExecutionRequest(input)
	_, err := req.Send(ctx)
	if err != nil {
		log.Print(err)
		return err
	}
	return nil
}
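
The `Input` above is assembled by string concatenation. That works for the timestamp-based keys used here, but it would produce invalid JSON if a key ever contained a quote or a backslash. A minimal alternative sketch using `encoding/json` follows; the `executionInput` type and the `buildInput` helper are hypothetical names, not part of the original code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// executionInput mirrors the {"Key": "..."} document that the state machine
// reads through "$.Key".
type executionInput struct {
	Key string `json:"Key"`
}

// buildInput serializes the execution input, escaping the key as needed.
func buildInput(key string) (string, error) {
	b, err := json.Marshal(executionInput{Key: key})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	in, err := buildInput("2020-09-11-12-34/20200911123456789.jpg")
	if err != nil {
		panic(err)
	}
	fmt.Println(in)
}
```

The returned string could then be passed as `Input: aws.String(in)` in `sfn.StartExecutionInput`.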

Creating the state machine definition file

statemachine.json
{
  "StartAt": "Convert jpg image",
  "States": {
    "Convert jpg image": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "convert",
        "key.$": "$.Key",
        "type": "jpg"
      },
      "Next": "Convert png image"
    },
    "Convert png image": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "convert",
        "key.$": "$.Key",
        "type": "png"
      },
      "Next": "Create medium icon"
    },
    "Create medium icon": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "icon",
        "key.$": "$.Key",
        "icon": {
          "diameter": "200",
          "bgcolor": "f0ff"
        }
      },
      "Next": "Create large icon"
    },
    "Create large icon": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "icon",
        "key.$": "$.Key",
        "icon": {
          "diameter": "300",
          "bgcolor": "f0ff"
        }
      },
      "Next": "Create medium thumbnail"
    },
    "Create medium thumbnail": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "thumbnail",
        "key.$": "$.Key",
        "thumbnail": {
          "width": "960",
          "height": "540",
          "bgcolor": "f0ff"
        }
      },
      "Next": "Create large thumbnail"
    },
    "Create large thumbnail": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "thumbnail",
        "key.$": "$.Key",
        "thumbnail": {
          "width": "1440",
          "height": "810",
          "bgcolor": "f0ff"
        }
      },
      "Next": "Create small thumbnail"
    },
    "Create small thumbnail": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Parameters": {
        "action": "thumbnail",
        "key.$": "$.Key",
        "thumbnail": {
          "width": "480",
          "height": "270",
          "bgcolor": "f0ff"
        }
      },
      "End": true
    }
  }
}
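
The definition above is a purely linear chain: each Task names its successor in `Next`, and the last one sets `End`. As a sketch of how that chain is read, the following Go program parses a definition and prints the states in execution order. The `executionOrder` helper is an assumption for illustration and handles only linear definitions like this one (no Choice or Parallel states); the embedded JSON is an abbreviated copy of statemachine.json.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// state holds only the fields needed to follow the chain.
type state struct {
	Next string `json:"Next"`
	End  bool   `json:"End"`
}

type definition struct {
	StartAt string           `json:"StartAt"`
	States  map[string]state `json:"States"`
}

// executionOrder follows Next from StartAt until a state with End: true.
func executionOrder(raw []byte) ([]string, error) {
	var def definition
	if err := json.Unmarshal(raw, &def); err != nil {
		return nil, err
	}
	var order []string
	seen := make(map[string]bool)
	name := def.StartAt
	for {
		st, ok := def.States[name]
		if !ok {
			return nil, fmt.Errorf("state %q is not defined", name)
		}
		if seen[name] {
			return nil, fmt.Errorf("state %q is visited twice", name)
		}
		seen[name] = true
		order = append(order, name)
		if st.End {
			return order, nil
		}
		name = st.Next
	}
}

func main() {
	// Abbreviated copy of statemachine.json: Parameters are omitted and only
	// three of the seven states are kept.
	raw := []byte(`{
	  "StartAt": "Convert jpg image",
	  "States": {
	    "Convert jpg image": {"Type": "Task", "Next": "Convert png image"},
	    "Convert png image": {"Type": "Task", "Next": "Create medium icon"},
	    "Create medium icon": {"Type": "Task", "End": true}
	  }
	}`)
	order, err := executionOrder(raw)
	if err != nil {
		panic(err)
	}
	for i, name := range order {
		fmt.Printf("%d. %s\n", i+1, name)
	}
}
```

One point about data flow: by default the result of a Task state replaces that state's input. Every later `"key.$": "$.Key"` is therefore resolved against the previous Lambda result, so the chain relies on the step function returning an object that still contains `Key`. The step function's source is not shown in this article, so that is an assumption about its return value.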

After an execution has started, you can follow the state machine's progress in the AWS Management Console.

step-functions.jpg

Conclusion

I intend to use Step Functions actively whenever I need to run a sequence of processing steps.
It also supports branching, which I plan to try next.

References
[Building a serverless batch system with AWS Step Functions](https://qiita.com/ketancho/items/147a141c9f8a6de86c97)
[AWS SAM now supports Step Functions, so I tried it](https://qiita.com/hayao_k/items/c69b0f0a33f1b07498bb)
[A collection of sample notes on Step Functions input methods](https://qiita.com/kure/items/6d09e4550814c3a17ed0)
[IAM policy for executing AWS Step Functions](https://qiita.com/nakano348/items/b169792d0ec356a352ef)