概要
以下の構成で実装しました。
- CloudWatchEventsをトリガーとしてAWS Batchを起動し、登録しておいたプロシージャでRDSに対してクエリを実行する
- AWS Batchのジョブ実行結果が失敗した場合は、Lambda関数によってSlackに通知する
上記をCloudFormationで実装したので紹介します。
AWS Batchは初めて触るリソースだったのと、なかなか上記構成に該当の記事が見つからず苦戦したところもあったので、備忘も兼ねて丁寧に整理して記載しました。
実施手順
CloudFormationを実行するにあたり、事前準備も含め手順は以下の通りです。
①AWS Secret ManagerでRDSのアクセス情報を登録(dbnameも追加で登録)。
②EC2でKeyPairを作成し、ダウンロードしておく。
③Slack Incoming Webhookを作成する。
④RDSデータベースにプロシージャ(SQLクエリ)を登録する。
⑤CloudFormationテンプレートを作成し、適用する(後述で詳細に記載)。
⑥ECRにDockerイメージをPushする。
バッチ構成図
※上記構成のうち、VPC/IGW/NATGateway/Subnet/RDS/踏み台サーバは、今回のCloudFormationでは作成しません
前提・留意点
・上記④によりRDSにInsertDataProcedure
というストアドプロシージャを登録していますが、以下にサンプルを記載します。以下内容の通り、今回はサンプルとしてデータ追加というクエリをAWS Batchで自動実行させます。中身はよしなに変えてください。
DELIMITER //
CREATE PROCEDURE InsertDataProcedure()
BEGIN
INSERT INTO sample_table (column1, column2, column3)
VALUES ('hoge', 'foo', '123');
END;
//
DELIMITER ;
・RDSのセキュリティグループのインバウンドルールに、Batchのセキュリティグループを追加するのを忘れない様にしましょう。
参考:【AWS】ERROR 2002 (HY000): Can't connect to MySQL server on 'xxx' (115)の原因
・実行結果をSlackに通知する際に、CloudWatchEventからSNS経由無しでLambdaに直接連携してもOKです。関数コードについては以下でまとめています。
参考:【AWS】CloudWatchEvents→(SNS→)Lambda→Slackへ通知する場合のLambda関数コード
・AWS Batchのジョブ定義にコマンドを指定するとき、少し詰まったので、以下でまとめています。ご参考まで。
参考:【AWS Batch】ジョブ定義のコマンドでDBの環境変数を使う時!
・BatchのEC2インスタンスに入るケースはあまりないと思われますが、念の為、踏み台サーバから入れるようにしています。不要であれば、SSHサーバからのアクセス許可(Batchセキュリティグループでの設定)やEC2キーペアの部分は無視してください。
Batchジョブが「RUNNABLE
」で止まってしまう場合
AWS Batchのエラーとして「RUNNABLE
」で止まってしまうという現象があり、自分も躓きました。ここは色々な記事で書かれている通り、インターネットと繋がっているか確認することが大事です(EC2がECSサービスエンドポイントと通信する際に外部ネットワークアクセスが必要なため)。特にPrivateSubnetを使う場合は、NATGatewayと繋がっているかを確認しましょう。PrivateSubnetについては作成済みのものをインポートしていますが、新規にPrivate Subnetを作成する場合のテンプレートも別で下に記載しています。
また、上記手順⑥に記載しましたが、ECRでリポジトリが作成されていても、その中にイメージが存在しない場合、当然ですがジョブは「RUNNABLE
」のままになります。ジョブがジョブ定義に設定された Docker イメージを見つけることができないため、実行されない状態になるのです。自分はCLIからPUSHしました。その場合は、ECRのマネコン画面にコマンドの説明があるので参考にしましょう。ここは別の方法でImageのプッシュをするでもOKです(むしろその方が良いケースもあり)。
CloudFormationテンプレート
AWSTemplateFormatVersion: 2010-09-09
Parameters:
########################################
# Common
########################################
ProjectKey:
Type: String
Default: "Project"
EnvKey:
Type: String
Default: "Environment"
EnvTag:
Type: String
Default: stg
AllowedValues:
- stg
- prd
SysTag:
Type: String
Default: "samplesystem"
ProjectTag:
Type: String
Default: "sampleproject"
RdsSecret:
Type: String
Default: SampleRdsSecret # RDSのSecret名を指定(手順①で作成したもの)
KeyPair:
Type: String
Default: SampleKeyPair # KeyPair名を指定(手順②で作成したもの)
########################################
# SNS
########################################
SlackWebhookURL: # WebhookURLを指定(手順③で作成したもの)
Type: String
Default: "https://hooks.slack.com/services/XXXXXXXXX/XXXXXXXXX/XXXXXXXXXXXXXXXXXXXXXXXX"
AllowedValues:
- "https://hooks.slack.com/services/XXXXXXXXX/XXXXXXXXX/XXXXXXXXXXXXXXXXXXXXXXXX"
- "https://hooks.slack.com/services/XXXXXXXXX/XXXXXXXXX/XXXXXXXXXXXXXXXXXXXXXXXXX"
Resources:
########################################
# Role
########################################
BatchServiceRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- batch.amazonaws.com
Action: 'sts:AssumeRole'
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole
Path: "/"
RoleName: BatchServiceRole
EcsInstanceRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2008-10-17'
Statement:
- Sid: ''
Effect: Allow
Principal:
Service: ec2.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role
Path: "/"
RoleName: BatchEcsInstanceRole
BatchInstanceProfile:
Type: "AWS::IAM::InstanceProfile"
Properties:
Roles:
- !Ref EcsInstanceRole
CloudWatchEventsRoleForBatch:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
-
Effect: Allow
Principal:
Service:
- events.amazonaws.com
Action: 'sts:AssumeRole'
Path: "/"
RoleName: BatchCloudWatchEventsRole
Policies:
- PolicyName: BatchJobSubmitPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: 'batch:SubmitJob'
Resource:
- !Sub ${BatchJobDefinition}
- !Sub ${BatchJobQueue}
LambdaExecutionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
-
Effect: Allow
Principal:
Service:
- sns.amazonaws.com
- lambda.amazonaws.com
- events.amazonaws.com
Action:
- sts:AssumeRole
Path: "/"
RoleName: BatchLambdaExecutionRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonSNSFullAccess
- arn:aws:iam::aws:policy/AWSLambdaExecute
SNSTopicPolicy:
Type: AWS::SNS::TopicPolicy
Properties:
PolicyDocument:
Id: SNSPolicy
Version: '2012-10-17'
Statement:
- Sid: !Sub AWSEvents_${SysTag}-${EnvTag}-Batch-JobFailureRule
Effect: Allow
Principal:
Service: events.amazonaws.com
Action:
- sns:GetTopicAttributes
- sns:SetTopicAttributes
- sns:AddPermission
- sns:RemovePermission
- sns:DeleteTopic
- sns:Subscribe
- sns:ListSubscriptionsByTopic
- sns:Publish
Resource: !Ref BatchJobFailureTopic
Topics:
- !Ref BatchJobFailureTopic
########################################
# SecurityGroup
########################################
BatchSecurityGroup:
Type: 'AWS::EC2::SecurityGroup'
Properties:
GroupName: !Sub ${SysTag}-${EnvTag}-batch-ECS
GroupDescription: !Sub ${SysTag}-${EnvTag}-batch-ECS
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 22
ToPort: 22
SourceSecurityGroupId: !ImportValue SshSecurityGroup
VpcId: !ImportValue VPC
Tags:
- Key: Name
Value: !Sub ${SysTag}-batch-ECS
- Key: !Ref ProjectKey
Value: !Ref ProjectTag
- Key: !Ref EnvKey
Value: !Ref EnvTag
########################################
# ECR
########################################
EcrBatchRepository:
Type: AWS::ECR::Repository
Properties:
RepositoryName: !Sub ${SysTag}_batch
Tags:
- Key: Name
Value: !Ref SysTag
- Key: !Ref ProjectKey
Value: !Ref ProjectTag
- Key: !Ref EnvKey
Value: !Ref EnvTag
RepositoryPolicyText:
Version: "2012-10-17"
Statement:
-
Sid: AllowPushPull
Effect: Allow
Principal:
AWS:
- !Sub arn:aws:iam::${AWS::AccountId}:role/${EcsInstanceRole}
Action:
- "ecr:GetDownloadUrlForLayer"
- "ecr:BatchGetImage"
- "ecr:BatchCheckLayerAvailability"
- "ecr:PutImage"
- "ecr:InitiateLayerUpload"
- "ecr:UploadLayerPart"
- "ecr:CompleteLayerUpload"
########################################
# Batch
########################################
BatchJobDefinition:
Type: 'AWS::Batch::JobDefinition'
Properties:
Type: container
ContainerProperties:
Image: !Join [ ":", [ !GetAtt EcrBatchRepository.RepositoryUri, "latest" ] ]
Memory: 1024
Vcpus: 1
Environment:
- Name: RDS_HOST
Value: !Sub '{{resolve:secretsmanager:${RdsSecret}:SecretString:host}}'
- Name: RDS_PORT
Value: !Sub '{{resolve:secretsmanager:${RdsSecret}:SecretString:port}}'
- Name: RDS_DB_NAME
Value: !Sub '{{resolve:secretsmanager:${RdsSecret}:SecretString:dbname}}'
- Name: RDS_USERNAME
Value: !Sub '{{resolve:secretsmanager:${RdsSecret}:SecretString:username}}'
- Name: RDS_PASSWORD
Value: !Sub '{{resolve:secretsmanager:${RdsSecret}:SecretString:password}}'
- Name: CALL_PROCEDURE
Value: 'CALL InsertDataProcedure();' # プロシージャ名を指定(手順④で作成したもの)
Command:
- "sh"
- "-c"
- "mysql -h $RDS_HOST -P $RDS_PORT -u $RDS_USERNAME -p$RDS_PASSWORD -D $RDS_DB_NAME -e \"$CALL_PROCEDURE\""
JobDefinitionName: !Sub ${SysTag}-${EnvTag}-batch-job-definition
RetryStrategy:
Attempts: 1
BatchComputeEnvironment:
Type: 'AWS::Batch::ComputeEnvironment'
Properties:
Type: MANAGED
ServiceRole: !GetAtt BatchServiceRole.Arn
ComputeEnvironmentName: !Sub ${SysTag}${EnvTag}-batch-compute-environment
ComputeResources:
Ec2KeyPair: !Ref KeyPair
Type: EC2
MinvCpus: 0
MaxvCpus: 256
DesiredvCpus: 1
InstanceTypes:
- optimal
InstanceRole: !GetAtt BatchInstanceProfile.Arn
Subnets:
- !ImportValue PrivateSubnet1 # Subnetは別で作成したものを指定。新規作成の場合は後述
- !ImportValue PrivateSubnet2 # Subnetは別で作成したものを指定。新規作成の場合は後述
SecurityGroupIds:
- !Ref BatchSecurityGroup
Tags: {"Name": "Batch-instance"}
State: ENABLED
BatchJobQueue:
Type: 'AWS::Batch::JobQueue'
Properties:
ComputeEnvironmentOrder:
- ComputeEnvironment: !Ref BatchComputeEnvironment
Order: 1
Priority: 1
State: ENABLED
JobQueueName: !Sub ${SysTag}-${EnvTag}-batch-job-queue
########################################
# CloudWatchEventRule
########################################
CloudWatchEventRule:
Type: 'AWS::Events::Rule'
Properties:
Name: !Sub ${SysTag}-${EnvTag}-Batch-CloudWatchEventRule
Description: 'Trigger the batch job of inserting the data'
ScheduleExpression: 'cron(0 0 1 * ? *)'
State: ENABLED
Targets:
- Arn: !Ref BatchJobQueue
Id: !Sub ${SysTag}-batch-job-queue-target
BatchParameters:
JobDefinition: !Ref BatchJobDefinition
JobName: !Sub ${SysTag}-${EnvTag}-Batch-Job
RoleArn: !GetAtt CloudWatchEventsRoleForBatch.Arn
BatchJobFailureRule:
Type: AWS::Events::Rule
Properties:
Name: !Sub ${SysTag}-${EnvTag}-Batch-JobFailureRule
EventPattern:
source:
- aws.batch
detail-type:
- Batch Job State Change
detail:
status:
- FAILED
Targets:
- Arn: !Ref BatchJobFailureTopic
Id: "TargetFunction"
################################################################################
# Lambda
################################################################################
BatchLambdaFunction:
Type: AWS::Lambda::Function
Properties:
Handler: index.lambda_handler
Role: !GetAtt LambdaExecutionRole.Arn
FunctionName: BatchJobFailureNotifier
Runtime: python3.7
Environment:
Variables:
SLACK_WEBHOOK_URL: !Sub ${SlackWebhookURL}
Code:
ZipFile: |
#!/usr/bin/python3.7
import json
import boto3
import os
import requests
from datetime import datetime, timezone, timedelta
def lambda_handler(event, context):
slack_webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
print (event)
sns_message = json.loads(event['Records'][0]['Sns']['Message'])
event_detail = sns_message['detail']
print (event_detail)
detail_status = event_detail['status']
print (detail_status)
event_time_utc = sns_message['time']
event_time_utc = datetime.fromisoformat(event_time_utc[:-1])
event_time_local = event_time_utc.astimezone(timezone(timedelta(hours=9)))
print("ローカル時間:", event_time_local)
if detail_status == 'FAILED':
message = f"BatchジョブがFAILEDしました。\n時間: {event_time_local}\nジョブ名: {event_detail['jobName']}\nジョブID: {event_detail['jobId']}"
print(message)
send_slack_notification(slack_webhook_url, message)
def send_slack_notification(webhook_url, message):
payload = {
"text": message
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(webhook_url, data=json.dumps(payload), headers=headers)
if response.status_code != 200:
print(f"Slack通知の送信に失敗しました。ステータスコード: {response.status_code}")
else:
print("Slack通知が正常に送信されました。")
BatchLambdaSNSPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: !GetAtt BatchLambdaFunction.Arn
Principal: sns.amazonaws.com
SourceArn: !Ref BatchJobFailureTopic
################################################################################
# SNS
################################################################################
BatchJobFailureTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: BatchJobFailureTopic
DisplayName: BatchJobFailureTopic
BatchLogLambdaSubscription:
Type: AWS::SNS::Subscription
Properties:
Endpoint: !GetAtt BatchLambdaFunction.Arn
Protocol: 'lambda'
TopicArn: !Ref BatchJobFailureTopic
新規PrivateSubnetと既存NAT Gatewayを使うとき
上記の例では、PrivateSubnetは既存のものをインポートしています。
もし新規に作成する場合は、以下のようにします(NAT Gatewayは1対多で使えるので既存リソースを使う前提)。
ポイントは、NATGatewayを使ってインターネットと繋げること。ここを失念していると、AWS Batchのジョブが正常に実行されないので気を付けましょう(Public Subnetに置く場合はInternetGateway
を使用&RouteTable
と紐づける)。
Parameters:
########################################
# Common
########################################
NATGatewayPublic1:
Type: String
Default: "nat-XXXXXXXXXXXXX"
NATGatewayPublic2:
Type: String
Default: "nat-XXXXXXXXXXXXX"
########################################
# VPC
########################################
PrivateCidr1:
Type: String
Default: "192.168.XX.X/24"
PrivateCidr2:
Type: String
Default: "192.168.XX.X/24"
AzName1:
Type: String
Default: "ap-northeast-1a"
AzName2:
Type: String
Default: "ap-northeast-1c"
Resources:
########################################
# Subnet
########################################
PrivateSubnet1:
Type: AWS::EC2::Subnet
Properties:
CidrBlock: !Ref PrivateCidr1
AvailabilityZone: !Ref AzName1
MapPublicIpOnLaunch: false
VpcId: !ImportValue VPC
Tags:
- Key: Name
Value: !Sub ${SysTag}-Batch-Private1
- Key: !Ref ProjectKey
Value: !Ref ProjectTag
- Key: !Ref EnvKey
Value: !Ref EnvTag
PrivateSubnet2:
Type: AWS::EC2::Subnet
Properties:
CidrBlock: !Ref PrivateCidr2
AvailabilityZone: !Ref AzName2
MapPublicIpOnLaunch: false
VpcId: !ImportValue VPC
Tags:
- Key: Name
Value: !Sub ${SysTag}-Batch-Private2
- Key: !Ref ProjectKey
Value: !Ref ProjectTag
- Key: !Ref EnvKey
Value: !Ref EnvTag
########################################
# RouteTable
########################################
RouteTablePrivate1:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !ImportValue VPC
Tags:
- Key: Name
Value: !Sub ${SysTag}-Batch-Private1
- Key: !Ref ProjectKey
Value: !Ref ProjectTag
- Key: !Ref EnvKey
Value: !Ref EnvTag
RouteTablePrivate2:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !ImportValue VPC
Tags:
- Key: Name
Value: !Sub ${SysTag}-Batch-Private1
- Key: !Ref ProjectKey
Value: !Ref ProjectTag
- Key: !Ref EnvKey
Value: !Ref EnvTag
########################################
# SubnetRouteTableAssociation
########################################
RouteTableAssociationPrivate1:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnet1
RouteTableId: !Ref RouteTablePrivate1
RouteTableAssociationPrivate2:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnet2
RouteTableId: !Ref RouteTablePrivate2
#*******************************************************************************
# Route
#*******************************************************************************
RouteTablePrivate1NATRoute:
Type: AWS::EC2::Route
Properties:
RouteTableId: !Ref RouteTablePrivate1
DestinationCidrBlock: 0.0.0.0/0
NatGatewayId: !Ref NATGatewayPublic1
RouteTablePrivate2NATRoute:
Type: AWS::EC2::Route
Properties:
RouteTableId: !Ref RouteTablePrivate2
DestinationCidrBlock: 0.0.0.0/0
NatGatewayId: !Ref NATGatewayPublic2