Purpose
Execute the flow RDS -> Lambda -> AWS Batch shown below.
What we want: when a row is inserted into a table, e.g.
INSERT INTO batch_order_data VALUES (NULL, 'item_001');
the result is that a Python script runs (here, one that prints "Hello world").
Prerequisites
(Not strictly required, since the additional setup is covered in the steps below.)
An ECR repository URL already exists (in the diagram it was created by pipeline1).
With the CI/CD approach, pipeline1 and pipeline2 must also exist beforehand.
However, all of this can be done without CI/CD, so these are not mandatory.
Steps
1. Prepare a container image in advance.
If you deploy via CI/CD with a Dockerfile, the following Dockerfile can serve as a reference.
FROM centos:latest
# install pip, the AWS CLI, boto3, and git
RUN curl -kL https://bootstrap.pypa.io/get-pip.py | python
RUN pip install awscli
RUN pip install boto3
RUN yum install -y git
# init.sh clones the CodeCommit repo and kicks run.sh (contents shown below)
ADD init.sh /usr/local/
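If you take the Dockerfile route, a throwaway local build is handy for checking the image before a CI/CD pipeline (e.g. CodeBuild) does the same thing. This is only a sketch; the image name test-container is an example:
// build the image from the Dockerfile (init.sh must sit next to it; see the script below)
$ docker build -t test-container .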
If you build the image manually in a local environment (e.g. on a Mac), the following steps may help:
// pull the base container image
$ docker pull centos
// list the container images
$ docker images
REPOSITORY   TAG      IMAGE ID       CREATED       SIZE
centos       latest   xxxxxxxxxxxx   xx days ago   193MB
// enter the container
$ docker run -it centos /bin/bash
// install pip
[root@xxx]# curl -kL https://bootstrap.pypa.io/get-pip.py | python
// install the AWS CLI
[root@xxx]# pip install awscli
// install boto3
[root@xxx]# pip install boto3
// install git
[root@xxx]# yum install -y git
// create a script that clones the repo from CodeCommit and kicks the run.sh inside it
[root@xxx]# vi /usr/local/init.sh
---------
#!/bin/bash
CODE_REPO=https://git-codecommit.ap-northeast-1.amazonaws.com/v1/repos/<your repo>
CODE_DIR=/usr/local/test-batch
git clone --config credential.helper='!aws --region ap-northeast-1 codecommit credential-helper $@' --config credential.UseHttpPath=true $CODE_REPO $CODE_DIR
sh /usr/local/test-batch/run.sh
----------
[root@xxx]# exit
// check the ID of the container you just worked in
$ docker ps -a -n=5
CONTAINER ID   IMAGE    COMMAND       CREATED         STATUS                    PORTS   NAMES
XXXXxxxxxxxx   centos   "/bin/bash"   9 seconds ago   Exited (0) 1 second ago           elated_hypatia
・
・
・
// commit the container as an image named "test-container"
$ docker commit XXXXxxxxxxxx test-container
// confirm the image was created
$ docker images
// confirm the commands are available and the files are in place
$ docker run -it test-container /bin/bash
Create an ECR repository in AWS using the following ecr.yml.
Parameters:
lifecyclePolicyText:
Type: String
repositoryName:
Type: String
Default: test-repo
registryId:
Type: String
Resources:
MyRepository:
Type: AWS::ECR::Repository
Properties:
LifecyclePolicy:
LifecyclePolicyText: !Ref lifecyclePolicyText
RegistryId: !Ref registryId
RepositoryName: !Ref repositoryName
Outputs:
Arn:
Value: !GetAtt MyRepository.Arn
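A sketch of deploying ecr.yml with the AWS CLI; the stack name, the lifecycle policy JSON, and <your account id> are placeholders to adjust (lifecyclePolicyText and registryId have no defaults, so they must be supplied):
$ POLICY='{"rules":[{"rulePriority":1,"description":"expire untagged images","selection":{"tagStatus":"untagged","countType":"imageCountMoreThan","countNumber":10},"action":{"type":"expire"}}]}'
$ aws cloudformation deploy \
    --template-file ecr.yml \
    --stack-name test-ecr \
    --parameter-overrides repositoryName=test-repo registryId=<your account id> "lifecyclePolicyText=$POLICY"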
Authenticate Docker with the ECR registry you created.
$ aws ecr get-login --no-include-email --region ap-northeast-1
docker login ・・・・・・・・・
A long docker login command is printed.
// run the printed command
$ docker login -u AWS -p <出力された値> https://xxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com
Login Succeeded
// tag the container image "test-container" you created
$ docker tag test-container:latest xxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/test-repo:latest
// push it to the repository
$ docker push xxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/test-repo:latest
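Note: on newer AWS CLI versions (v2, or v1.17.10+) get-login has been removed; an equivalent login looks like this (the account ID is a placeholder):
$ aws ecr get-login-password --region ap-northeast-1 \
    | docker login --username AWS --password-stdin xxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com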
2. Create a CodeCommit repo
AWSTemplateFormatVersion: 2010-09-09
Description: CodeCommit repo.
Parameters:
RepositoryName:
Description:
Type: String
Default: test-commit
Resources:
MyRepo:
Type: AWS::CodeCommit::Repository
Properties:
RepositoryName: !Ref RepositoryName
RepositoryDescription: a CodeCommit repo for batch
Triggers:
- Name: MasterTrigger
CustomData: Project ID 12345
Branches:
- Master
Events:
- all
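This template can be deployed the same way as ecr.yml (the file name codecommit.yml below is just an example); once the repo exists, clone it over HTTPS with the CodeCommit credential helper, the same way init.sh does:
$ aws cloudformation deploy --template-file codecommit.yml --stack-name test-commit-repo
$ git clone \
    --config credential.helper='!aws --region ap-northeast-1 codecommit credential-helper $@' \
    --config credential.UseHttpPath=true \
    https://git-codecommit.ap-northeast-1.amazonaws.com/v1/repos/test-commit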
3. Push to CodeCommit
Push two files to the CodeCommit repo you created: a Python script test.py that prints "Hello world" every 3 seconds, and a shell script run.sh that executes test.py. The commands are sketched after the scripts.
# -*- coding: utf-8 -*-
import boto3
from time import sleep
# S3
BUCKET_NAME = "test-bucket-batch"
TARGET_DIR = "batch"
FILE_CONTENTS = ""
# Timer
MAX_ITER = 2
SEC = 3
def PutS3(i):
s3 = boto3.resource('s3')
bucket = s3.Bucket(BUCKET_NAME)
filename = str(i+1)
obj = bucket.put_object(ACL='private', Body=FILE_CONTENTS, Key=TARGET_DIR + "/" + filename, ContentType='text/plain')
return str(obj)
def count(i):
print("{}Hello world".format((i+1)*SEC))
if __name__ == '__main__':
for i in range(MAX_ITER):
sleep(SEC)
count(i)
#PutS3(i)
#!/bin/bash
python /usr/local/test-batch/test.py
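Pushing the two files might look like this, assuming the working copy cloned above and master as the repo's default branch:
$ cd test-commit
$ git add test.py run.sh
$ git commit -m "Add batch test script"
$ git push origin master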
4. Create the AWS Batch environment.
AWSTemplateFormatVersion: 2010-09-09
Description: AWS Batch environment.
Parameters:
SubnetIds:
Description: Subnets For ComputeEnvironment
Type: List<AWS::EC2::Subnet::Id>
Default: subnet-xxxxxx
SecurityGroupIds:
Description: SecurityGroups For ComputeEnvironment
Type: List<AWS::EC2::SecurityGroup::Id>
Default: sg-xxxxxx
ContainerImage:
Description: Url of container image to use in ECS
Type: String
Default: xxxxxx.dkr.ecr.us-west-2.amazonaws.com/xxxxxx-repo:xxxxxx
Resources:
ecsInstanceRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- ec2.amazonaws.com
Action:
- sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role
- arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
- arn:aws:iam::aws:policy/AWSCodeCommitFullAccess
Path: "/"
ecsInstanceProfile:
Type: "AWS::IAM::InstanceProfile"
Properties:
Roles:
- !Ref ecsInstanceRole
AWSBatchServiceRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- batch.amazonaws.com
- ecs-tasks.amazonaws.com
Action:
- sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole
Path: "/service-role/"
ComputeEnv:
Type: "AWS::Batch::ComputeEnvironment"
Properties:
Type: MANAGED
ServiceRole: !GetAtt AWSBatchServiceRole.Arn
ComputeEnvironmentName: test-env
ComputeResources:
MaxvCpus: 256
MinvCpus: 0
DesiredvCpus: 0
SecurityGroupIds: !Ref SecurityGroupIds
Type: EC2
Subnets: !Ref SubnetIds
InstanceRole: !GetAtt ecsInstanceProfile.Arn
InstanceTypes:
- optimal
Tags: {"Name": "Batch Instance - test"}
State: ENABLED
JobQueue:
Type: AWS::Batch::JobQueue
Properties:
ComputeEnvironmentOrder:
- Order: 1
ComputeEnvironment: !Ref ComputeEnv
State: ENABLED
Priority: 1
JobQueueName: test-job-queues
JobDefinition:
Type: AWS::Batch::JobDefinition
Properties:
Type: container
JobDefinitionName: test-job-def
ContainerProperties:
Command:
- sh
- /usr/local/init.sh
Memory: 4048
Vcpus: 2
Image: !Ref ContainerImage
RetryStrategy:
Attempts: 1
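A deployment sketch for this template (the file name batch.yml is an example): because it creates IAM roles it needs CAPABILITY_IAM, and the ContainerImage/SubnetIds/SecurityGroupIds defaults are placeholders that should be overridden with real values. Once the stack is up, you can also submit a job by hand to check the queue and job definition before wiring up Lambda:
$ aws cloudformation deploy \
    --template-file batch.yml \
    --stack-name test-batch-env \
    --capabilities CAPABILITY_IAM \
    --parameter-overrides ContainerImage=xxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/test-repo:latest
// manual smoke test of the job queue / job definition
$ aws batch submit-job \
    --job-name manual-test \
    --job-queue test-job-queues \
    --job-definition test-job-def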
5. Create the Lambda function.
AWSTemplateFormatVersion: 2010-09-09
Description: Lambda Function.
Parameters:
NameofFunction:
Description: not use
Type: String
Default: test
Resources:
LambdaFunction:
Type: 'AWS::Lambda::Function'
DeletionPolicy: Delete
DependsOn:
- LambdaExecutionRole
Properties:
Code:
ZipFile: >
'use strict';
const AWS = require('aws-sdk');
const BATCH = new AWS.Batch({apiVersion: '2016-08-10'});
const JOBDEFINITION = 'test-job-def';
const JOBQUEUE = 'test-job-queues';
const JOBNAME = 'lambdajob';
console.log('Loading function');
var response = require('cfn-response');
exports.handler = (event, context, callback) => {
let params = {
jobDefinition: JOBDEFINITION,
jobName: JOBNAME,
jobQueue: JOBQUEUE
};
BATCH.submitJob(params, function (err, data) {
if (err) console.log(err, err.stack);
else console.log(data);
if (err) {
console.error(err);
const message = `Error calling SubmitJob for: ${event.jobName}`;
response.send(event, context, response.FAILED, message);
console.error(message);
callback(message);
} else {
const jobId = data.jobId;
response.send(event, context, response.SUCCESS);
console.log('jobId:', jobId);
callback(null, jobId);
}
});
};
Handler: index.handler
MemorySize: 128
Role: !GetAtt
- LambdaExecutionRole
- Arn
Runtime: nodejs4.3
Timeout: 10
LambdaExecutionRole:
Type: AWS::IAM::Role
Properties:
ManagedPolicyArns:
- "arn:aws:iam::aws:policy/AWSBatchFullAccess"
- "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess"
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: "/"
Outputs:
LambdaExecutionRole:
Description: LambdaExecutionRole name
Value:
Ref: LambdaExecutionRole
LambdaFunction:
Description: LambdaFunction name
Value:
Ref: LambdaFunction
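Once this stack is created, the function can be tested on its own; the payload shape mirrors what the stored procedure sends later (the function name comes from the stack outputs; on AWS CLI v2 also add --cli-binary-format raw-in-base64-out):
$ aws lambda invoke \
    --function-name <LambdaFunction name from the stack outputs> \
    --payload '{"id": "1", "item": "item_001"}' \
    out.json
$ cat out.json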
6. Create the RDS (Aurora MySQL) server.
AWSTemplateFormatVersion: 2010-09-09
Description: Aurora mysql.
Metadata:
Version: 0.2.0
Parameters:
MasterUsername:
Description: MasterUsername on the RDS instance.
Type: String
Default: sa
RDSPassword:
Description: Password for root user on the RDS instance.
Type: String
NoEcho: 'true'
RDSDatabaseName:
Description: DB Identifier for RDS instance.
Type: String
Default: mydbname
RDSClass:
Description: RDS Instance Class
Type: String
Default: db.t2.small
DBIdentifier:
Description: Database Instance Identifier
Type: String
RDSSecurityGroupId:
Description: Existing internal SG for RDS instance access
Type: 'AWS::EC2::SecurityGroup::Id'
RDSRetention:
Description: How long to retain RDS snapshots 1~35
Type: String
Default: 20
RDSVpcId:
Description: VpcId for RDS instance
Type: 'AWS::EC2::VPC::Id'
PubliclyAccessible:
Description: Set the RDS to be publically available
Type: String
AllowedValues:
- 'true'
- 'false'
Default: 'true'
DBClusterIdentifier:
Description: The name of the DBCluster
Type: String
RDSRoleTag:
Description: sets if the tag for dev/prod use
Type: String
Default: dev
Resources:
LambdaRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- 'sts:AssumeRole'
LambdaPolicy:
Type: 'AWS::IAM::Policy'
Properties:
PolicyName: LambdaPolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'iam:*'
- 'ec2:*'
- 'rds:*'
- 'logs:*'
- 'batch:*'
Resource: '*'
Roles:
- !Ref LambdaRole
LambdaFunction:
Type: 'AWS::Lambda::Function'
DeletionPolicy: Delete
DependsOn:
- LambdaRole
Properties:
Code:
ZipFile: !Join
- |+
- - ' var AWS = require(''aws-sdk'');'
- ' var rds = new AWS.RDS();'
- ' var response = require(''cfn-response'');'
- ' exports.handler = (event, context, callback) => {'
- ' var rolearn = event.ResourceProperties.RDSRole;'
- ' var dbclusteridentifier = event.ResourceProperties.DBClusterIdentifier;'
- ' var responseData = {};'
- ' console.log(''Role ARN: '' + rolearn);'
- ' console.log(''DBClusterIdentifier: '' + dbclusteridentifier);'
- ' var addroleparams = {'
- ' RoleArn: rolearn,'
- ' DBClusterIdentifier: dbclusteridentifier'
- ' };'
- ' if (event.RequestType == ''Delete'') {'
- ' response.send(event, context, response.SUCCESS);'
- ' return;'
- ' }'
- ' rds.addRoleToDBCluster(addroleparams, function(err, data) {'
- ' if (err) {'
- ' console.log(err, err.stack); // an error occurred'
- ' responseData = {Error: ''Create call failed''};'
- ' response.send(event, context, response.FAILED, responseData);'
- ' }'
- ' else {'
- ' response.send(event, context, response.SUCCESS, responseData);'
- ' console.log(data); // successful response'
- ' }'
- ' });'
- ' };'
Handler: index.handler
MemorySize: 128
Role: !GetAtt
- LambdaRole
- Arn
Runtime: nodejs4.3
Timeout: 10
RDSRole:
Type: 'AWS::IAM::Role'
Properties:
ManagedPolicyArns:
- "arn:aws:iam::aws:policy/AWSLambdaFullAccess"
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- rds.amazonaws.com
Action:
- 'sts:AssumeRole'
Path: /
RDSDBClusterParameterGroup:
Type: 'AWS::RDS::DBClusterParameterGroup'
Properties:
Parameters:
aws_default_lambda_role: !GetAtt
- RDSRole
- Arn
Family: aurora5.6
Description: A sample parameter group
RDSDBCluster:
Type: 'AWS::RDS::DBCluster'
DeletionPolicy: Retain
Properties:
BackupRetentionPeriod: !Ref RDSRetention
DatabaseName: !Ref RDSDatabaseName
DBSubnetGroupName: !Ref DBSubnetGroup
DBClusterParameterGroupName: !Ref RDSDBClusterParameterGroup
Engine: aurora
StorageEncrypted: true
MasterUsername: !Ref MasterUsername
MasterUserPassword: !Ref RDSPassword
Port: 3306
Tags:
- Key: Role
Value: !Ref RDSRoleTag
VpcSecurityGroupIds:
- !Ref RDSSecurityGroupId
DBSubnetGroup:
Type: "AWS::RDS::DBSubnetGroup"
Properties:
DBSubnetGroupDescription: "description"
SubnetIds:
- "subnet-xxxxxxxx"
- "subnet-xxxxxxxx"
- "subnet-xxxxxxxx"
RDSInstance:
Type: 'AWS::RDS::DBInstance'
DeletionPolicy: Retain
Properties:
AllowMajorVersionUpgrade: false
AutoMinorVersionUpgrade: true
DBClusterIdentifier: !Ref RDSDBCluster
DBInstanceIdentifier: !Ref DBIdentifier
DBInstanceClass: !Ref RDSClass
Engine: aurora
PubliclyAccessible: !Ref PubliclyAccessible
Tags:
- Key: Role
Value: !Ref RDSRoleTag
RDSInstanceSecurityGroup:
Type: 'AWS::EC2::SecurityGroup'
DeletionPolicy: Retain
Properties:
GroupDescription: Security group for the RDSInstance resource
SecurityGroupEgress:
- IpProtocol: tcp
CidrIp: 127.0.0.1/32
FromPort: '1'
ToPort: '1'
SecurityGroupIngress:
- IpProtocol: tcp
SourceSecurityGroupId: !Ref RDSSecurityGroupId
FromPort: '3306'
ToPort: '3306'
VpcId: !Ref RDSVpcId
Tags:
- Key: Role
Value: !Ref RDSRoleTag
AddRoleToDBCluster:
DependsOn:
- RDSDBCluster
- RDSInstance
Type: 'Custom::AddRoleToDBCluster'
Properties:
ServiceToken: !GetAtt
- LambdaFunction
- Arn
RDSRole: !GetAtt
- RDSRole
- Arn
DBClusterIdentifier: !Ref RDSDBCluster
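After the stack is up, connect to the cluster's writer endpoint with the mysql client; sa and mydbname are the parameter defaults used above, and the endpoint is a placeholder taken from the RDS console:
$ mysql -h <cluster writer endpoint> -P 3306 -u sa -p mydbname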
Verification
Log in to the database and set up the sample objects below.
In the CALL mysql.lambda_async statement, put the ARN of your Lambda function.
#1 Stored Procedure
DROP PROCEDURE IF EXISTS batch;
DELIMITER ;;
CREATE PROCEDURE batch (IN id INT(11),
IN item VARCHAR(50)) LANGUAGE SQL
BEGIN
CALL mysql.lambda_async('arn:aws:lambda:us-west-2:xxxxxxxxx:function:testlambda1-LambdaFunction-xxxxxxx',
CONCAT('{ "id" : "', id,
'", "item" : "', item, '" }')
);
END
;;
DELIMITER ;
#2 Create Table
CREATE TABLE `batch_order_data` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`item` varchar(50) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
#3 Create Trigger
DELIMITER ;;
CREATE TRIGGER trigger_batch
AFTER INSERT ON batch_order_data
FOR EACH ROW
BEGIN
CALL batch(NEW.id, NEW.item);
END
;;
DELIMITER ;
Now insert a row and see what happens:
INSERT INTO batch_order_data VALUES (NULL, 'item_001');
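If the trigger fired, a job named lambdajob should show up on the queue, and the container's stdout (the "Hello world" lines) ends up in CloudWatch Logs under the default AWS Batch log group. One way to check from the CLI:
$ aws batch list-jobs --job-queue test-job-queues --job-status SUCCEEDED
// container stdout goes to the /aws/batch/job log group
$ aws logs describe-log-streams \
    --log-group-name /aws/batch/job \
    --order-by LastEventTime --descending --max-items 1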