AWSにおけるバッチ処理
「AWS バッチ処理」で検索すると、AWS Batchを用いたバッチコンピューティングの情報が多くヒットします。
しかし、実際の業務では定期的に実行されるバッチ処理の方がよく利用されると思います。
本記事ではAWSを用いて定期実行するバッチ処理を作成したいと思います。
AWSでバッチ処理を実施するときの選択肢
以下の3パターンが考えられます
No | 方式 | メリット | デメリット |
---|---|---|---|
1 | EC2上でCronを動かす | 長時間のバッチを実行可能 OSやサーバのスペックの自由度が高い |
サーバのメンテナンスが必要(セキュリティパッチなど) バッチが動いていない時間も課金が発生する |
2 | EventBridge + Lambda | サーバのメンテナンスが不要 3つの中で料金が一番安い 環境構築が容易 デフォルトで同時実行が1,000まで可能 |
15分以内にバッチを終わる必要がある 大きなメモリを割り当てられない |
3 | EventBridge + Fargate | サーバのメンテナンスが不要 長時間のバッチを実行可能 バッチが動いている時間のみの課金となる |
dockerの利用が必須になる ECSの学習コストがやや高い |
業務でバッチ処理を使用する場合、15分以上実行するケースが多いです。
また、15分以上かかるバッチが今後発生しないとも限らないので、個人的にはEventBridge + Lambdaは選択肢から外れます。
EC2 + Cronはサーバのメンテナンスが地味に大変なのと、バッチが動いていない時間も課金が発生するため、これも選択肢から外れます。
総合的にみてEventBridge + Fargateが現状ベストな方式だと思いますので、次の章からEventBridge + Fargateの構築方法について解説します。
EventBridge + Fargateによるバッチ処理
構成図
EventBridgeはCron形式で起動時間を指定することが可能です。
AZ1とAZ2がありますが、EventBridgeがどちらのAZでFargateを起動するかはランダムになります。
今回バッチ処理では、毎時30分にSecrets Managerに保存されているバケット名を取得し、そのバケットに配置されているファイル名をログに表示します。
ECRの作成
イメージタグの変更可能性をMUTABLEにしていますが、IMMUTABLEの方が安全です。
resource "aws_ecr_repository" "sample_batch" {
name = "sample-batch"
image_tag_mutability = "MUTABLE"
image_scanning_configuration {
scan_on_push = true
}
encryption_configuration {
encryption_type = "KMS"
}
}
VPCの作成
VPCエンドポイント(Interface)を利用するためには、enable_dns_hostnamesをTrueに設定する必要があります。
resource "aws_vpc" "sample_batch" {
cidr_block = "192.168.0.0/16"
instance_tenancy = "default"
enable_dns_hostnames = true
tags = {
Name = "sample-batch"
}
}
VPCエンドポイントの作成
プライベートサブネットからVPC外のAWSサービスに接続するためにはVPCエンドポイントが必要なります。
VPCエンドポイントにはGateway型とInterface型があります。
Gateway型は無料で、Interface型は有料(0.014USD/h)で使用できます。
全てGateway型を使用したいところですが、S3とDynamoDBしか対応していないので、今回はS3のみ作成します。
Interface型は計8個((ECR2個+CloudWatch1個+SecretsManager1個)×2AZ)作成します。
マルチAZ構成にするためには、各AZにVPCエンドポイントを配置する必要があります。
VPCエンドポイントは地味に高いので、個人アカウントで検証する場合は、こまめに削除しましょう。
resource "aws_vpc_endpoint" "s3_gateway" {
vpc_id = aws_vpc.sample_batch.id
service_name = "com.amazonaws.ap-northeast-1.s3"
vpc_endpoint_type = "Gateway"
route_table_ids = [aws_route_table.sample_ecs_rt.id]
tags = {
Name = "s3-gateway"
}
}
resource "aws_vpc_endpoint" "ecr_dkr" {
vpc_id = aws_vpc.sample_batch.id
service_name = "com.amazonaws.ap-northeast-1.ecr.dkr"
vpc_endpoint_type = "Interface"
security_group_ids = [aws_security_group.sample_endpoint_sg.id]
subnet_ids = [aws_subnet.sample_endpoint_private_subnet_1a.id, aws_subnet.sample_endpoint_private_subnet_1c.id]
private_dns_enabled = true
tags = {
Name = "ecr-dkr"
}
}
resource "aws_vpc_endpoint" "ecr_api" {
vpc_id = aws_vpc.sample_batch.id
service_name = "com.amazonaws.ap-northeast-1.ecr.api"
vpc_endpoint_type = "Interface"
security_group_ids = [aws_security_group.sample_endpoint_sg.id]
subnet_ids = [aws_subnet.sample_endpoint_private_subnet_1a.id, aws_subnet.sample_endpoint_private_subnet_1c.id]
private_dns_enabled = true
tags = {
Name = "ecr-api"
}
}
resource "aws_vpc_endpoint" "logs" {
vpc_id = aws_vpc.sample_batch.id
service_name = "com.amazonaws.ap-northeast-1.logs"
vpc_endpoint_type = "Interface"
security_group_ids = [aws_security_group.sample_endpoint_sg.id]
subnet_ids = [aws_subnet.sample_endpoint_private_subnet_1a.id, aws_subnet.sample_endpoint_private_subnet_1c.id]
private_dns_enabled = true
tags = {
Name = "logs"
}
}
resource "aws_vpc_endpoint" "secrets_manager" {
vpc_id = aws_vpc.sample_batch.id
service_name = "com.amazonaws.ap-northeast-1.secretsmanager"
vpc_endpoint_type = "Interface"
security_group_ids = [aws_security_group.sample_endpoint_sg.id]
subnet_ids = [aws_subnet.sample_endpoint_private_subnet_1a.id, aws_subnet.sample_endpoint_private_subnet_1c.id]
private_dns_enabled = true
tags = {
Name = "secrets-manager"
}
}
サブネットの作成
バッチ用のプライベートサブネットを2つ、VPCエンドポイント(Interface)用のプライベートサブネットを2つ用意します。
resource "aws_subnet" "sample_batch_private_subnet_1a" {
vpc_id = aws_vpc.sample_batch.id
availability_zone = "ap-northeast-1a"
cidr_block = "192.168.0.0/24"
tags = {
Name = "sample-batch-subnet-1a"
}
}
resource "aws_subnet" "sample_batch_private_subnet_1c" {
vpc_id = aws_vpc.sample_batch.id
availability_zone = "ap-northeast-1c"
cidr_block = "192.168.1.0/24"
tags = {
Name = "sample-batch-subnet-1c"
}
}
resource "aws_subnet" "sample_endpoint_private_subnet_1a" {
vpc_id = aws_vpc.sample_batch.id
availability_zone = "ap-northeast-1a"
cidr_block = "192.168.2.0/24"
tags = {
Name = "sample-endpoint-subnet-1a"
}
}
resource "aws_subnet" "sample_endpoint_private_subnet_1c" {
vpc_id = aws_vpc.sample_batch.id
availability_zone = "ap-northeast-1c"
cidr_block = "192.168.3.0/24"
tags = {
Name = "sample-endpoint-subnet-1c"
}
}
Secrets Managerの作成
Secrets ManagerにS3のバケット名を保存します。
resource "aws_secretsmanager_secret" "sample_batch" {
name = "sample-batch"
}
resource "aws_secretsmanager_secret_version" "sample_batch" {
secret_id = aws_secretsmanager_secret.sample_batch.id
secret_string = jsonencode({
S3_BUCKET_NAME = aws_s3_bucket.sample_batch.bucket
})
}
セキュリティグループの作成
VPCエンドポイントのセキュリティグループには443のインバウンドルールを許可する必要があります。
resource "aws_security_group" "sample_ecs_sg" {
name = "sample-ecs-sg"
description = "ecs security group"
vpc_id = aws_vpc.sample_batch.id
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}
resource "aws_security_group" "sample_endpoint_sg" {
name = "sample-endpoint-sg"
description = "endpoint security group"
vpc_id = aws_vpc.sample_batch.id
ingress {
protocol = "tcp"
from_port = 443
to_port = 443
security_groups = [aws_security_group.sample_ecs_sg.id]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}
S3の作成
365日のライフサイクルポリシーを設定しています。
パブリックアクセスは無効になっています。
resource "aws_s3_bucket" "sample_batch" {
bucket = "sample-batch-xxxxxx"
}
resource "aws_s3_bucket_public_access_block" "sample_batch" {
bucket = aws_s3_bucket.sample_batch.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
resource "aws_s3_bucket_versioning" "sample_batch" {
bucket = aws_s3_bucket.sample_batch.id
versioning_configuration {
status = "Disabled"
}
}
resource "aws_s3_bucket_lifecycle_configuration" "sample_batch" {
bucket = aws_s3_bucket.sample_batch.id
rule {
id = "sample-batch"
status = "Enabled"
expiration {
days = 365
}
}
}
resource "aws_s3_bucket_policy" "sample_batch" {
bucket = aws_s3_bucket.sample_batch.id
policy = data.aws_iam_policy_document.read_policy.json
}
data "aws_iam_policy_document" "read_policy" {
version = "2012-10-17"
statement {
effect = "Allow"
principals {
type = "Service"
identifiers = ["delivery.logs.amazonaws.com"]
}
actions = ["s3:GetObject"]
resources = ["arn:aws:s3:::${aws_s3_bucket.sample_batch.bucket}/*"]
}
}
resource "aws_s3_bucket_server_side_encryption_configuration" "sample_batch" {
bucket = aws_s3_bucket.sample_batch.bucket
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "AES256"
}
}
}
ルートテーブルの作成
Gateway型のVPCエンドポイントが、ルートテーブルを必要とするため作成します。
resource "aws_route_table" "sample_ecs_rt" {
vpc_id = aws_vpc.sample_batch.id
tags = {
Name = "sample-ecs-rt"
}
}
resource "aws_route_table_association" "sample_ecs_private_subnet_1a_rta" {
route_table_id = aws_route_table.sample_ecs_rt.id
subnet_id = aws_subnet.sample_batch_private_subnet_1a.id
}
resource "aws_route_table_association" "sample_ecs_private_subnet_1c_rta" {
route_table_id = aws_route_table.sample_ecs_rt.id
subnet_id = aws_subnet.sample_batch_private_subnet_1c.id
}
IAMロールの作成
ECSとEventBridgeに付与するIMAロールを作成します。
data "aws_iam_policy_document" "ecs_assume_role_policy" {
version = "2012-10-17"
statement {
effect = "Allow"
actions = ["sts:AssumeRole"]
principals {
type = "Service"
identifiers = ["ecs-tasks.amazonaws.com"]
}
}
}
data "aws_iam_policy_document" "event_bridge_assume_role_policy" {
statement {
effect = "Allow"
actions = ["sts:AssumeRole"]
principals {
identifiers = ["events.amazonaws.com"]
type = "Service"
}
}
}
data "aws_iam_policy_document" "get_secret_policy" {
version = "2012-10-17"
statement {
effect = "Allow"
actions = ["secretsmanager:GetSecretValue"]
resources = ["*"]
}
}
data "aws_iam_policy_document" "log_access_policy" {
version = "2012-10-17"
statement {
effect = "Allow"
actions = ["logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents"]
resources = ["*"]
}
}
data "aws_iam_policy_document" "ecs_task_run_policy" {
version = "2012-10-17"
statement {
effect = "Allow"
actions = ["ecs:RunTask"]
resources = ["*"]
}
statement {
effect = "Allow"
actions = ["iam:PassRole"]
resources = ["*"]
condition {
test = "StringEquals"
variable = "iam:PassedToService"
values = ["ecs-tasks.amazonaws.com"]
}
}
}
resource "aws_iam_role" "ecs_task_execution_role" {
name = "ECSTaskExcutionRole"
assume_role_policy = data.aws_iam_policy_document.ecs_assume_role_policy.json
}
resource "aws_iam_role" "ecs_task_role" {
name = "ECSTaskRole"
assume_role_policy = data.aws_iam_policy_document.ecs_assume_role_policy.json
}
resource "aws_iam_role" "event_bridge_role" {
name = "EventBridgeRole"
assume_role_policy = data.aws_iam_policy_document.event_bridge_assume_role_policy.json
}
resource "aws_iam_role_policy" "get_secret_policy" {
name = "GetSecretPolicy"
role = aws_iam_role.ecs_task_execution_role.name
policy = data.aws_iam_policy_document.get_secret_policy.json
}
resource "aws_iam_role_policy" "ecs_task_run_policy" {
name = "ECSTaskRunPolicy"
role = aws_iam_role.event_bridge_role.id
policy = data.aws_iam_policy_document.ecs_task_run_policy.json
}
resource "aws_iam_role_policy_attachment" "ecs_task_execution" {
role = aws_iam_role.ecs_task_execution_role.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
}
resource "aws_iam_role_policy_attachment" "s3_full_access" {
role = aws_iam_role.ecs_task_role.name
policy_arn = "arn:aws:iam::aws:policy/AmazonS3FullAccess"
}
EventBridgeの作成
Cronを使用して毎時30分にバッチが起動するように設定しています。
最新バージョンのタスク定義を起動するようにしています。
resource "aws_cloudwatch_event_rule" "sample_batch" {
name = "sample-batch"
description = "sample batch event rule"
schedule_expression = "cron(30 * * * ? *)"
}
resource "aws_cloudwatch_event_target" "sample_batch" {
arn = aws_ecs_cluster.sample_batch.arn
rule = aws_cloudwatch_event_rule.sample_batch.name
role_arn = aws_iam_role.event_bridge_role.arn
ecs_target {
launch_type = "FARGATE"
task_count = 1
task_definition_arn = replace(aws_ecs_task_definition.sample_batch.arn, "/:\\d+$/", "")
network_configuration {
assign_public_ip = false
subnets = [aws_subnet.sample_batch_private_subnet_1a.id, aws_subnet.sample_batch_private_subnet_1c.id]
security_groups = [aws_security_group.sample_ecs_sg.id]
}
}
input = <<DOC
{
"containerOverrides": [
{
"name": "batch",
"command": ["java","-classpath","sample-batch.jar","S3ListFiles"]
}
]
}
DOC
retry_policy {
maximum_event_age_in_seconds = 1800
maximum_retry_attempts = 10
}
}
ECS(Fargate)の作成
Secrets Managerから取得したバケット名を環境変数にセットしてます。
resource "aws_ecs_cluster" "sample_batch" {
name = "sample-batch"
setting {
name = "containerInsights"
value = "enabled"
}
}
resource "aws_ecs_task_definition" "sample_batch" {
family = "sample-batch"
requires_compatibilities = ["FARGATE"]
task_role_arn = aws_iam_role.ecs_task_role.arn
network_mode = "awsvpc"
memory = "8192"
cpu = "2048"
execution_role_arn = aws_iam_role.ecs_task_execution_role.arn
container_definitions = jsonencode([
{
name = "batch"
image = "${aws_ecr_repository.sample_batch.repository_url}:latest"
portMappings = [
{
containerPort = 80
hostPort = 80
}
],
secrets = [
{
name = "S3_BUCKET_NAME"
valueFrom = "${aws_secretsmanager_secret.sample_batch.arn}:S3_BUCKET_NAME::"
}
],
logConfiguration = {
logDriver = "awslogs",
options = {
awslogs-region = "ap-northeast-1"
awslogs-group = aws_cloudwatch_log_group.sample_batch.name
awslogs-stream-prefix = "ecs"
}
}
}
])
runtime_platform {
operating_system_family = "LINUX"
}
}
CloudWatchロググループの作成
バッチのログを保存するためにCloudWatchロググループを作成します。
resource "aws_cloudwatch_log_group" "sample_batch" {
name = "/ecs/SampleBatch"
retention_in_days = 365
}
バッチアプリケーションの作成
Javaでバッチアプリケーションを作成します。
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
public class S3ListFiles {
private static final String BUCKET_NAME = System.getenv("S3_BUCKET_NAME");
private static final String S3_REGION = "ap-northeast-1";
public static void main(String[] args) {
AmazonS3 client = AmazonS3ClientBuilder.standard().withRegion(S3_REGION).build();
ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(BUCKET_NAME);
ListObjectsV2Result result;
do {
result = client.listObjectsV2(req);
for (S3ObjectSummary summary : result.getObjectSummaries()) {
System.out.println(summary.getKey());
}
req.setContinuationToken(result.getNextContinuationToken());
} while (result.isTruncated());
}
}
ビルドはGradleで行います。
plugins {
id 'java'
id 'com.github.johnrengelman.shadow' version '7.1.2'
}
sourceCompatibility = '11'
compileJava.options.encoding = 'UTF-8'
sourceSets.main.java.srcDirs = ['src']
repositories {
mavenCentral()
}
dependencies {
implementation platform('com.amazonaws:aws-java-sdk-bom:1.11.1000')
implementation 'com.amazonaws:aws-java-sdk-s3'
}
Shadowプラグインを使用してjarファイルを作成します。
下記コマンドを実行すると、build/libs配下にjarファイルが作成されます。
$ ./gradlew shadowJar
ECRにプッシュ
今回使用するDockerfileは下記になります。
FROM openjdk:11
ENV TZ=Asia/Tokyo
COPY ./build/libs/sample-batch.jar .
ECRへプッシュする際のコマンドは、AWSマネージメントコンソールのECRのページから、「プッシュコマンドを表示」というボタンから確認することができます。
動作確認
バッチが起動する毎時30分になったら、CloudWatchロググループにある/ecs/SampleBatchのログを確認します。
ログにa.txtとb.txtが表示されていることを確認できました。
最後に
業務でバッチ処理を作成する際は、ぜひ参考にしてみてください。
バッチからDBに接続する場合は、DBの接続先をSecrets Managerに保存し、環境変数経由で取得するようにしてください。
EventBridgeは1つのイベントに応じて複数回トリガーされる仕様があります。
https://docs.aws.amazon.com/ja_jp/eventbridge/latest/userguide/eb-troubleshooting.html#eb-rule-triggered-more-than-once
実際に私も過去に1度経験したことがあります。
対策としては、アプリケーション側で複数回呼ばれても問題ない仕様にするか、Step Functionsで制御する方法を試してみてください。