

Run a batch job when data is inserted into a table (cfn)

Posted at 2018-08-22

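Goal

I want to run processing in the following flow: RDS -> Lambda -> AWS Batch.
Concretely, when a row is inserted into a table, for example
INSERT INTO batch_order_data VALUES (NULL, 'item_001');
Result: a Python script (one that prints hello world, for instance) is executed.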

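Prerequisites

(Not strictly required, because manual steps are also provided below.)
An ECR URL already exists. In the diagram below it was created by pipeline1.
If you take the CI/CD approach, pipeline1 and pipeline2 must also exist in advance.
However, the same result can be achieved without CI/CD, so they are not required.
[Figure: Untitled Diagram (3).jpg]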

Steps

1. Prepare a container image in advance.
If you deploy through CI/CD with a Dockerfile, you can use the following Dockerfile as a reference:

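# Base image (the get-pip.py step below needs a python command in the image)
FROM centos:latest
# Install pip, then the AWS CLI and boto3
RUN curl -kl https://bootstrap.pypa.io/get-pip.py | python
RUN pip install awscli
RUN pip install boto3
# git is used by init.sh to clone the CodeCommit repo
RUN yum install -y git
# Entry script that the Batch job definition runs
ADD init.sh /usr/local/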

If you build the image manually in a local environment (Mac), use the following steps:

// Pull the base container image
$ docker pull centos

// Check the container image
$ docker images
REPOSITORY            TAG                 IMAGE ID            CREATED             SIZE
centos                latest              xxxxxxxxxxxx        xx days ago         193MB

// Enter the container
$ docker run -it centos /bin/bash

// Install the pip command
[root@xxx]# curl -kl https://bootstrap.pypa.io/get-pip.py | python

// Install aws-cli
[root@xxx]# pip install awscli

// Install boto3
[root@xxx]# pip install boto3

// Install git
[root@xxx]# yum install -y git

// Script that clones the repo from CodeCommit and kicks off the run.sh inside it
[root@xxx]# vi /usr/local/init.sh
---------
#!/bin/bash

CODE_REPO=https://git-codecommit.ap-northeast-1.amazonaws.com/v1/repos/<your repo>
CODE_DIR=/usr/local/test-batch

git clone --config credential.helper='!aws --region ap-northeast-1 codecommit credential-helper $@' --config credential.UseHttpPath=true $CODE_REPO $CODE_DIR

sh /usr/local/test-batch/run.sh
----------

[root@xxx]# exit
// Check the ID of the container you were working in
$ docker ps -a -n=5
CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS                     PORTS               NAMES
XXXXxxxxxxxx        centos              "/bin/bash"         9 seconds ago       Exited (0) 1 second ago                        elated_hypatia
・
・
・
// Create a container image named "test-container"
$ docker commit XXXXxxxxxxxx test-container

// Confirm that the image was created
$ docker images

// Confirm that the commands are available and the file exists
$ docker run -it test-container /bin/bash

Create the ECR repository on AWS using the following ecr.yml.

ecr.yml
Parameters:    
  lifecyclePolicyText:
    Type: String
  repositoryName:
    Type: String
    Default: test-repo
  registryId:
    Type: String
Resources:
  MyRepository:
    Type: AWS::ECR::Repository
    Properties:
      LifecyclePolicy:
        LifecyclePolicyText: !Ref lifecyclePolicyText
        RegistryId: !Ref registryId
      RepositoryName: !Ref repositoryName
Outputs:    
  Arn:
    Value: !GetAtt MyRepository.Arn
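
The lifecyclePolicyText parameter in ecr.yml has no default, so a JSON policy string has to be supplied when the stack is created. As a minimal sketch, assuming you want to expire untagged images beyond a fixed count, the text could be generated as follows. The function name and the default count of 10 are illustrative and not part of the original setup.

```python
import json


def build_lifecycle_policy_text(max_untagged_images=10):
    """Return an ECR lifecycle policy as a JSON string.

    Hypothetical helper: keeps at most `max_untagged_images` untagged
    images and expires the rest.
    """
    policy = {
        "rules": [
            {
                "rulePriority": 1,
                "description": "Expire old untagged images",
                "selection": {
                    "tagStatus": "untagged",
                    "countType": "imageCountMoreThan",
                    "countNumber": max_untagged_images,
                },
                "action": {"type": "expire"},
            }
        ]
    }
    return json.dumps(policy)


if __name__ == "__main__":
    # Pass the printed string as the lifecyclePolicyText parameter value.
    print(build_lifecycle_policy_text())
```

The returned string can be passed as the lifecyclePolicyText parameter value.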

Register the image in the ECR repository you created

// AWS CLI v1 syntax; in AWS CLI v2, get-login was replaced by get-login-password
$ aws ecr get-login --no-include-email --region ap-northeast-1
docker login ・・・・・・・・・
(A long login command is printed.)

// Run the printed command.
$ docker login -u AWS -p <printed value> https://xxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com
Login Succeeded

// Tag the container image "test-container" you created
$ docker tag test-container:latest xxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/test-repo:latest

// Push it to the repository
$ docker push xxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/test-repo:latest
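
The login command printed by the AWS CLI wraps an ECR authorization token, which is a base64 string of the form `AWS:<password>`. The sketch below shows how the same values might be obtained from Python through boto3's `get_authorization_token`. The helper names (`decode_ecr_token`, `image_uri`, `fetch_login`) are assumptions made for illustration, and the ECR client is passed in so the decoding logic can be tried without AWS access.

```python
import base64


def decode_ecr_token(authorization_token):
    """Split a base64-encoded ECR token into (username, password)."""
    decoded = base64.b64decode(authorization_token).decode("utf-8")
    username, password = decoded.split(":", 1)
    return username, password


def image_uri(account_id, region, repository, tag="latest"):
    """Build the registry URI used by `docker tag` and `docker push`."""
    return "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(
        account_id, region, repository, tag
    )


def fetch_login(ecr_client):
    """Return (username, password, endpoint) from a boto3 ECR client."""
    data = ecr_client.get_authorization_token()["authorizationData"][0]
    username, password = decode_ecr_token(data["authorizationToken"])
    return username, password, data["proxyEndpoint"]
```

With real credentials this would be called as `fetch_login(boto3.client("ecr", region_name="ap-northeast-1"))`, and the three returned values correspond to the `-u`, `-p`, and URL arguments of `docker login`.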

2. Create the CodeCommit repo

codecommit.yml
AWSTemplateFormatVersion: 2010-09-09
Description: CodeCommit repo.
Parameters:
  RepositoryName:
    Description: Name of the CodeCommit repository.
    Type: String
    Default: test-commit
Resources:
  MyRepo:
    Type: AWS::CodeCommit::Repository
    Properties:
      RepositoryName: !Ref RepositoryName
      RepositoryDescription: a CodeCommit repo for batch
      Triggers:
      - Name: MasterTrigger
        CustomData: Project ID 12345
        Branches:
        - Master
        Events:
        - all
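
The article does not show how the templates are deployed. One possible approach, sketched here under the assumption that boto3 is used, is to call CloudFormation's `create_stack` with the template body and a parameter list. The helper names and the stack name in the usage note are hypothetical.

```python
def to_cfn_parameters(values):
    """Convert {"Name": "value"} into the list shape CloudFormation expects."""
    return [
        {"ParameterKey": key, "ParameterValue": str(value)}
        for key, value in sorted(values.items())
    ]


def create_stack(cfn_client, stack_name, template_body, values=None):
    """Create a stack from a template string and return its StackId."""
    response = cfn_client.create_stack(
        StackName=stack_name,
        TemplateBody=template_body,
        Parameters=to_cfn_parameters(values or {}),
        # Acknowledges IAM resource creation; needed only by templates
        # that define IAM roles or policies.
        Capabilities=["CAPABILITY_IAM"],
    )
    return response["StackId"]
```

For example, `create_stack(boto3.client("cloudformation"), "test-commit-stack", open("codecommit.yml").read(), {"RepositoryName": "test-commit"})` would submit the template above.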

3. Push to CodeCommit
Push two files to the CodeCommit repo you created: test.py, a Python script that prints Hello world every 3 seconds, and run.sh, a shell script that runs test.py.

test.py
# -*- coding: utf-8 -*-
import boto3
from time import sleep
# S3
BUCKET_NAME = "test-bucket-batch"
TARGET_DIR = "batch"
FILE_CONTENTS = ""
# Timer
MAX_ITER = 2
SEC = 3
def PutS3(i):
  s3 = boto3.resource('s3')
  bucket = s3.Bucket(BUCKET_NAME)
  filename = str(i+1)
  obj = bucket.put_object(ACL='private', Body=FILE_CONTENTS, Key=TARGET_DIR + "/" + filename, ContentType='text/plain')
  return str(obj)
def count(i):
  print("{}Hello world".format((i+1)*SEC))
if __name__ == '__main__':
  for i in range(MAX_ITER):
    sleep(SEC)
    count(i)
    #PutS3(i)

run.sh
#!/bin/bash
python /usr/local/test-batch/test.py

4. Create the Batch environment

Batch.yml
AWSTemplateFormatVersion: 2010-09-09
Description: AWS Batch environment.
Parameters:
  SubnetIds:
    Description: Subnets For ComputeEnvironment
    Type: List<AWS::EC2::Subnet::Id>
    Default: subnet-xxxxxx
  SecurityGroupIds:
    Description: SecurityGroups For ComputeEnvironment
    Type: List<AWS::EC2::SecurityGroup::Id>
    Default: sg-xxxxxx
  ContainerImage:
    Description: Url of container image to use in ECS
    Type: String
    Default: xxxxxx.dkr.ecr.us-west-2.amazonaws.com/xxxxxx-repo:xxxxxx
Resources:
  ecsInstanceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - ec2.amazonaws.com
          Action:
          - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
        - arn:aws:iam::aws:policy/AWSCodeCommitFullAccess
      Path: "/"
  ecsInstanceProfile:
    Type: "AWS::IAM::InstanceProfile"
    Properties:
      Roles:
        - !Ref ecsInstanceRole
  AWSBatchServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - batch.amazonaws.com
            - ecs-tasks.amazonaws.com
          Action:
          - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole
      Path: "/service-role/"
  ComputeEnv:
    Type: "AWS::Batch::ComputeEnvironment"
    Properties:
      Type: MANAGED
      ServiceRole: !GetAtt AWSBatchServiceRole.Arn
      ComputeEnvironmentName: test-env
      ComputeResources:
        MaxvCpus: 256
        MinvCpus: 0
        DesiredvCpus: 0
        SecurityGroupIds: !Ref SecurityGroupIds
        Type: EC2
        Subnets: !Ref SubnetIds
        InstanceRole: !GetAtt ecsInstanceProfile.Arn
        InstanceTypes:
          - optimal
        Tags: {"Name": "Batch Instance - test"}
      State: ENABLED
  JobQueue:
    Type: AWS::Batch::JobQueue
    Properties:
      ComputeEnvironmentOrder:
        - Order: 1
          ComputeEnvironment: !Ref ComputeEnv
      State: ENABLED
      Priority: 1
      JobQueueName: test-job-queues
  JobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      Type: container
      JobDefinitionName: test-job-def
      ContainerProperties:
        Command:
          - sh
          - /usr/local/init.sh
        Memory: 4048
        Vcpus: 2
        Image: !Ref ContainerImage
      RetryStrategy:
        Attempts: 1
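
Before wiring up Lambda and RDS, it can help to confirm that the queue and job definition work on their own. The following is a minimal sketch, assuming boto3 and the names from Batch.yml (`test-job-queues`, `test-job-def`). The function name, the `manual-test` job name, and the polling defaults are illustrative choices.

```python
import time

# Job states after which AWS Batch no longer changes the status
TERMINAL_STATES = ("SUCCEEDED", "FAILED")


def submit_and_wait(batch_client, job_name="manual-test",
                    job_queue="test-job-queues",
                    job_definition="test-job-def",
                    poll_seconds=10, max_polls=60, sleep=time.sleep):
    """Submit one job and poll describe_jobs until it finishes."""
    job_id = batch_client.submit_job(
        jobName=job_name, jobQueue=job_queue, jobDefinition=job_definition
    )["jobId"]
    status = "SUBMITTED"
    for _ in range(max_polls):
        status = batch_client.describe_jobs(jobs=[job_id])["jobs"][0]["status"]
        if status in TERMINAL_STATES:
            break
        sleep(poll_seconds)
    return job_id, status
```

Calling `submit_and_wait(boto3.client("batch"))` returns the job ID together with the last observed status. Because the compute environment starts with zero vCPUs, the job may stay in RUNNABLE for a while until an instance launches.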

5. Create the Lambda function

Lambda.yml
AWSTemplateFormatVersion: 2010-09-09
Description: Lambda Function.
Parameters:
    NameofFunction:
      Description: not use
      Type: String
      Default: test
Resources:
  LambdaFunction:
    Type: 'AWS::Lambda::Function'
    DeletionPolicy: Delete
    DependsOn:
      - LambdaExecutionRole
    Properties:
      Code:
        ZipFile: |
          'use strict';
          const AWS = require('aws-sdk');
          const BATCH         = new AWS.Batch({apiVersion: '2016-08-10'});
          // Must match JobDefinitionName and JobQueueName in Batch.yml
          const JOBDEFINITION = 'test-job-def';
          const JOBQUEUE      = 'test-job-queues';
          const JOBNAME       = 'lambdajob';
          console.log('Loading function');
          // This function is invoked from Aurora via mysql.lambda_async, not as a
          // CloudFormation custom resource, so cfn-response is not used here.
          exports.handler = (event, context, callback) => {
              const params = {
                  jobDefinition: JOBDEFINITION,
                  jobName: JOBNAME,
                  jobQueue: JOBQUEUE
              };
              BATCH.submitJob(params, (err, data) => {
                  if (err) {
                      console.error(err, err.stack);
                      const message = `Error calling SubmitJob for: ${JOBNAME}`;
                      console.error(message);
                      callback(message);
                  } else {
                      console.log(data);
                      console.log('jobId:', data.jobId);
                      callback(null, data.jobId);
                  }
              });
          };
      Handler: index.handler
      MemorySize: 128
      Role: !GetAtt
        - LambdaExecutionRole
        - Arn
      Runtime: nodejs4.3
      Timeout: 10
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/AWSBatchFullAccess"
        - "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess"
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - lambda.amazonaws.com
          Action:
          - sts:AssumeRole
      Path: "/"
Outputs:
  LambdaExecutionRole:
    Description: LambdaExecutionRole name
    Value:
      Ref: LambdaExecutionRole
  LambdaFunction:
    Description: LambdaFunction name
    Value:
      Ref: LambdaFunction
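
The inline function submits every job under the same fixed name and ignores the payload it receives. As a sketch of one possible variation, written in Python rather than the Node.js used in the template, the job name could be derived from the `id` and `item` fields that the stored procedure sends. The function names and the naming scheme are assumptions. The sanitizing step reflects the AWS Batch rule that job names may contain only letters, numbers, hyphens, and underscores, up to 128 characters.

```python
import re


def job_name_for(event, prefix="lambdajob"):
    """Derive a Batch-safe job name from the trigger payload."""
    raw = "{}-{}-{}".format(
        prefix, event.get("id", "unknown"), event.get("item", "unknown")
    )
    # Replace anything outside letters, digits, hyphen, and underscore
    cleaned = re.sub(r"[^A-Za-z0-9_-]", "_", raw)
    return cleaned[:128]


def handler(event, context, batch_client=None):
    """Lambda-style entry point; batch_client is injectable for testing."""
    if batch_client is None:
        import boto3  # imported lazily so job_name_for works offline
        batch_client = boto3.client("batch")
    response = batch_client.submit_job(
        jobName=job_name_for(event),
        jobQueue="test-job-queues",
        jobDefinition="test-job-def",
    )
    return response["jobId"]
```

Distinct names make it easier to tell in the Batch console which inserted row produced which job.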

6. Create the RDS MySQL server

rds.yml
AWSTemplateFormatVersion: 2010-09-09
Description: Aurora mysql.
Metadata:
  Version: 0.2.0
Parameters:
  MasterUsername:
    Description: MasterUsername on the RDS instance.
    Type: String
    Default: sa
  RDSPassword:
    Description: Password for root user on the RDS instance.
    Type: String
    NoEcho: 'true'
  RDSDatabaseName:
    Description: Name of the initial database created in the cluster.
    Type: String
    Default: mydbname
  RDSClass:
    Description: RDS Instance Class
    Type: String
    Default: db.t2.small
  DBIdentifier:
    Description: Database Instance Identifier
    Type: String
  RDSSecurityGroupId:
    Description: Existing internal SG for RDS instance access
    Type: 'AWS::EC2::SecurityGroup::Id'
  RDSRetention:
    Description: Number of days to retain automated RDS backups (1 to 35)
    Type: String
    Default: 20
  RDSVpcId:
    Description: VpcId for RDS instance
    Type: 'AWS::EC2::VPC::Id'
  PubliclyAccessible:
    Description: Set the RDS instance to be publicly accessible
    Type: String
    AllowedValues:
      - 'true'
      - 'false'
    Default: 'true'
  DBClusterIdentifier:
    Description: The name of the DBCluster
    Type: String
  RDSRoleTag:
    Description: Value of the Role tag (dev or prod)
    Type: String
    Default: dev
Resources:
  LambdaRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
  LambdaPolicy:
    Type: 'AWS::IAM::Policy'
    Properties:
      PolicyName: LambdaPolicy
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - 'iam:*'
              - 'ec2:*'
              - 'rds:*'
              - 'logs:*'
              - 'batch:*'
            Resource: '*'
      Roles:
        - !Ref LambdaRole
  LambdaFunction:
    Type: 'AWS::Lambda::Function'
    DeletionPolicy: Delete
    DependsOn:
      - LambdaRole
    Properties:
      Code:
        ZipFile: !Join
          - |+

          - - '          var AWS = require(''aws-sdk'');'
            - '          var rds = new AWS.RDS();'
            - '          var response = require(''cfn-response'');'
            - '          exports.handler = (event, context, callback) => {'
            - '              var rolearn = event.ResourceProperties.RDSRole;'
            - '              var dbclusteridentifier = event.ResourceProperties.DBClusterIdentifier;'
            - '              var responseData = {};'
            - '              console.log(''Role ARN: '' + rolearn);'
            - '              console.log(''DBClusterIdentifier: '' + dbclusteridentifier);'
            - '              var addroleparams = {'
            - '                  RoleArn: rolearn,'
            - '                  DBClusterIdentifier: dbclusteridentifier'
            - '                };'
            - '                if (event.RequestType == ''Delete'') {'
            - '                  response.send(event, context, response.SUCCESS);'
            - '                  return;'
            - '                }'
            - '                rds.addRoleToDBCluster(addroleparams, function(err, data) {'
            - '                  if (err) {'
            - '                  console.log(err, err.stack); // an error occurred'
            - '                  responseData = {Error: ''Create call failed''};'
            - '                  response.send(event, context, response.FAILED, responseData);'
            - '                  }'
            - '                  else {'
            - '                  response.send(event, context, response.SUCCESS, responseData);'
            - '                  console.log(data);  // successful response'
            - '                  }'
            - '                });'
            - '          };'
      Handler: index.handler
      MemorySize: 128
      Role: !GetAtt
        - LambdaRole
        - Arn
      Runtime: nodejs4.3
      Timeout: 10
  RDSRole:
    Type: 'AWS::IAM::Role'
    Properties:
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/AWSLambdaFullAccess"
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - rds.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
  RDSDBClusterParameterGroup:
    Type: 'AWS::RDS::DBClusterParameterGroup'
    Properties:
      Parameters:
        aws_default_lambda_role: !GetAtt
          - RDSRole
          - Arn
      Family: aurora5.6
      Description: A sample parameter group
  RDSDBCluster:
    Type: 'AWS::RDS::DBCluster'
    DeletionPolicy: Retain
    Properties:
      BackupRetentionPeriod: !Ref RDSRetention
      DatabaseName: !Ref RDSDatabaseName
      DBSubnetGroupName: !Ref DBSubnetGroup
      DBClusterParameterGroupName: !Ref RDSDBClusterParameterGroup
      Engine: aurora
      StorageEncrypted: true
      MasterUsername: !Ref MasterUsername
      MasterUserPassword: !Ref RDSPassword
      Port: 3306
      Tags:
        - Key: Role
          Value: !Ref RDSRoleTag
      VpcSecurityGroupIds:
        - !Ref RDSSecurityGroupId
  DBSubnetGroup:
    Type: "AWS::RDS::DBSubnetGroup"
    Properties:
      DBSubnetGroupDescription: "description"
      SubnetIds:
        - "subnet-xxxxxxxx"
        - "subnet-xxxxxxxx"
        - "subnet-xxxxxxxx"
  RDSInstance:
    Type: 'AWS::RDS::DBInstance'
    DeletionPolicy: Retain
    Properties:
      AllowMajorVersionUpgrade: false
      AutoMinorVersionUpgrade: true
      DBClusterIdentifier: !Ref RDSDBCluster
      DBInstanceIdentifier: !Ref DBIdentifier
      DBInstanceClass: !Ref RDSClass
      Engine: aurora
      PubliclyAccessible: !Ref PubliclyAccessible
      Tags:
        - Key: Role
          Value: !Ref RDSRoleTag
  RDSInstanceSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    DeletionPolicy: Retain
    Properties:
      GroupDescription: Security group for the RDSInstance resource
      SecurityGroupEgress:
        - IpProtocol: tcp
          CidrIp: 127.0.0.1/32
          FromPort: '1'
          ToPort: '1'
      SecurityGroupIngress:
        - IpProtocol: tcp
          SourceSecurityGroupId: !Ref RDSSecurityGroupId
          FromPort: '3306'
          ToPort: '3306'
      VpcId: !Ref RDSVpcId
      Tags:
        - Key: Role
          Value: !Ref RDSRoleTag
  AddRoleToDBCluster:
    DependsOn:
      - RDSDBCluster
      - RDSInstance
    Type: 'Custom::AddRoleToDBCluster'
    Properties:
      ServiceToken: !GetAtt
        - LambdaFunction
        - Arn
      RDSRole: !GetAtt
        - RDSRole
        - Arn
      DBClusterIdentifier: !Ref RDSDBCluster
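
The custom resource in rds.yml exists to attach the IAM role to the Aurora cluster, which the inline Node.js code does by calling `addRoleToDBCluster`. The same logic can be sketched in Python with boto3's `add_role_to_db_cluster`. Here `send_response` is a hypothetical callback standing in for the cfn-response module, and the RDS client is injected so the flow can be exercised without AWS.

```python
def add_role_to_cluster(event, rds_client, send_response):
    """Attach the IAM role to the cluster, mirroring the inline handler."""
    # Nothing to undo on stack deletion, so report success immediately
    if event["RequestType"] == "Delete":
        send_response("SUCCESS", {})
        return
    props = event["ResourceProperties"]
    try:
        rds_client.add_role_to_db_cluster(
            DBClusterIdentifier=props["DBClusterIdentifier"],
            RoleArn=props["RDSRole"],
        )
    except Exception as error:  # report any failure back to CloudFormation
        print(error)
        send_response("FAILED", {"Error": "Create call failed"})
        return
    send_response("SUCCESS", {})
```

Reporting a status on every path matters: if the handler never responds, CloudFormation keeps waiting for the custom resource until it times out.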

Verification

Log in to the database and set up the sample table.
In CALL mysql.lambda_async, specify the ARN of your Lambda function.

#1 Stored Procedure

DROP PROCEDURE IF EXISTS batch;
DELIMITER ;;
CREATE PROCEDURE batch (IN id INT(11),
                                      IN item VARCHAR(50)) LANGUAGE SQL
BEGIN
  CALL mysql.lambda_async('arn:aws:lambda:us-west-2:xxxxxxxxx:function:testlambda1-LambdaFunction-xxxxxxx',
    CONCAT('{ "id" : "', id,
            '", "item" : "', item, '" }')
     );
END
;;
DELIMITER ;
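
The stored procedure builds its JSON payload by plain string concatenation, so the payload is only valid JSON when `item` contains no characters that need escaping. The sketch below reproduces the CONCAT expression in Python to illustrate this. Both function names are hypothetical, and `safe_payload` shows what an escaped equivalent would look like on the Python side.

```python
import json


def concat_payload(row_id, item):
    """Reproduce the CONCAT(...) expression from the stored procedure."""
    return '{ "id" : "' + str(row_id) + '", "item" : "' + item + '" }'


def safe_payload(row_id, item):
    """Build the same payload with proper JSON escaping."""
    return json.dumps({"id": str(row_id), "item": item})


def is_valid_json(text):
    """Return True when text parses as JSON."""
    try:
        json.loads(text)
    except ValueError:
        return False
    return True
```

For the sample row, `concat_payload(1, "item_001")` parses cleanly, while an item containing a double quote makes the concatenated payload invalid. Keeping `item` values to simple identifiers avoids the problem for this setup.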


#2 create Table

CREATE TABLE `batch_order_data` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `item` varchar(50) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;



#3 create Trigger

DELIMITER ;;
CREATE TRIGGER trigger_batch
  AFTER INSERT ON batch_order_data
  FOR EACH ROW
BEGIN
  CALL batch(NEW.id, NEW.item);
END
;;
DELIMITER ;

Now insert a row:

INSERT INTO batch_order_data VALUES (NULL, 'item_001');

Result: you can see hello world printed in CloudWatch.
[Figure: Screen Shot 2018-08-22 at 14.00.50.png]
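
The same output can also be read programmatically instead of through the console. This is a minimal sketch, assuming boto3 and the default `/aws/batch/job` log group that AWS Batch writes container output to. The function name is illustrative, and both clients are passed in as arguments.

```python
def job_log_messages(batch_client, logs_client, job_id,
                     log_group="/aws/batch/job"):
    """Return the log lines written by one Batch job."""
    # describe_jobs reports which CloudWatch log stream the container used
    job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0]
    stream = job["container"]["logStreamName"]
    events = logs_client.get_log_events(
        logGroupName=log_group,
        logStreamName=stream,
        startFromHead=True,
    )["events"]
    return [event["message"] for event in events]
```

It would be called as `job_log_messages(boto3.client("batch"), boto3.client("logs"), job_id)`, where `job_id` is the value logged by the Lambda function. For test.py the returned list should include the Hello world lines alongside any output from the git clone step.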
