3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Lambdaで処理できないバッチ処理をAWS Batchで便利に実行しよう

Posted at

概要

この記事ではSQS + Lambda構成で処理できない長時間のバッチ処理のためにAWS Batchを利用する方法について解説します。AWS Batchの概要、設定方法や実際にジョブを登録して実行する方法について説明していきます。

このユースケースでのAWS Batchの利点は以下の通りです。

  • ✅ サーバーレスでバッチ処理を行うことができる
  • ✅ Lambdaの15分制限を超える長時間の処理を行うことができる
  • ✅ ジョブのスケジューリングやリトライを自動で行ってくれる
  • ✅ ジョブの並行実行数を制御することができる
  • ✅ ジョブの失敗時の通知を設定することができる

またこの記事ではAWS CDKを利用しながらAWS Batchの設定方法を例示するので、お手元の環境でサンプルコードそのまま実行して試してみることができます。

目次

背景

バッチ処理を行う際にSQS + Lambdaの構成は非常に便利です。SQSによるスケジューリング、流量制御、リトライなどの機能を利用しつつ、Lambdaによるサーバーレスな処理を行うことができます。しかし、Lambdaの実行時間には15分までという制約があるため長時間の処理を行う場合は別の方法を考える必要があります。

同様にサーバーレスな基盤で処理を行うことを考えると、まず代替候補に挙げられるのがECSです。しかしながら、SQSから直接ECSタスクをトリガーすることができません。ECSサービスとして常時起動してSQSキューをポーリングすることで同様の処理を実現することも可能ですが、この場合はポーリングする間に常にコストが発生することになります。

AWS Batchの概要

AWS BatchはECSにタスクのスケジューリング機能を追加したものというイメージのサービスです。コンテナベースのタスクをキューに登録してスケジューリングして自動的に実行、リトライを行うことができます。またLambdaと違って最大実行時間の制約がなく15分以上の長時間の処理を行うことができます。

AWS公式の説明では以下のようになっており、大規模なバッチ処理向けのサービスという印象を受けるかもしれません。

AWS Batch は、Amazon ECS、Amazon EKS、AWS Fargate、および Spot またはオンデマンドインスタンスなどの AWS コンピューティングサービスの全範囲にわたって、お客様のコンテナ化したバッチの機械学習、シミュレーションおよび分析ワークロードを計画、スケジュール、実行するフルマネージドバッチコンピューティングサービスです。

しかしながら、実際には一つのECSタスクに最低で0.25vCPUと512MBのメモリから割り当てることができため、比較的小規模なバッチ処理にも利用することができます。

またAWS Batch自体の利用に追加でコストは発生しません。タスクで利用した計算リソースに対してのみ課金されます。

以上から、長時間のバッチ処理を行う際にはSQS + Lambda構成の代替としてAWS Batchがフィットしていると言えます。

AWS Batchの使い方

AWS BatchではECSタスクはジョブと呼ばれます。あらかじめジョブの内容を定義しておき、また別途ジョブキューを作成しておきます。この二つを指定してジョブを登録することで自動的にECSタスクが起動して処理が実行されます。

AWS Batchは以下の要素から構成されています。それぞれ順番に見ていきましょう。

  • ジョブ定義
  • コンピューティング環境
  • ジョブキュー

各リソースの作成にはAWS CDKを利用します。まず以下のコマンドでCDKプロジェクトを作成しておきます。

cdk init app --language typescript

ジョブ定義

ジョブ定義はECSのタスク定義に相当するものです。まずジョブで利用するコンテナを定義します。ここでコンテナイメージ、割り当てる計算リソース(CPU、メモリ)、コンテナ環境変数、ジョブ実行用のIAMロールなどを指定します。

ここではFargate環境でAmazon Linuxイメージを利用してecho Hello, World!コマンドを実行するだけのコンテナを定義する例を示します。

lib/cdk-stack.ts
import { EcsFargateContainerDefinition, FargateComputeEnvironment } from 'aws-cdk-lib/aws-batch';
import { ContainerImage } from 'aws-cdk-lib/aws-ecs';
import { Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';

export class CdkStack extends cdk.Stack {
    constructor(scope: Construct, id: string, props?: cdk.StackProps) {
        super(scope, id, props);
        const taskExecutionRole = new Role(this, 'TaskExecutionRole', {
            assumedBy: new ServicePrincipal('ecs-tasks.amazonaws.com'),
        });
        // ECSタスク実行のため`AmazonECSTaskExecutionRolePolicy`を割り当てる必要がある
        taskExecutionRole.addManagedPolicy({
            managedPolicyArn: 'arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy',
        });
        const taskRole = new Role(this, 'TaskRole', {
            assumedBy: new ServicePrincipal('ecs-tasks.amazonaws.com'),
        });
        const container = new EcsFargateContainerDefinition(this, 'Container', {
            image: ContainerImage.fromRegistry('public.ecr.aws/amazonlinux/amazonlinux:latest'),
            memory: cdk.Size.mebibytes(512),
            cpu: 0.25,
            environment: {
                'ENVIRONMENT': 'dev'
            },
            // ECSタスクロール
            jobRole: taskRole,
            // ECSタスク実行ロール
            executionRole: taskExecutionRole,
            command: ['echo', 'Hello, World!'], 
            assignPublicIp: true,
        });
    }
}

実行するジョブから何らかのAWSサービスにアクセスする場合はtaskRoleにも適切な権限を割り当てます。

今回は簡単のため後述するコンピューティング環境をVPCのパブリックサブネットに配置します。インターネットにアクセスしてコンテナイメージを取得するためにパブリックIPアドレスを割り当てています。実用上はパブリックIPアドレスを割り当てず、プライベートサブネットに配置してNATゲートウェイを利用する方がセキュアな構成になるでしょう。

また実際には実行バイナリを含めたイメージをECRにプッシュして利用することになるでしょう。その場合は以下のようにContainerImage.fromEcrRepositoryを利用してイメージを指定します。

lib/cdk-stack.ts
import { ContainerImage } from 'aws-cdk-lib/aws-ecs';
import { Repository } from 'aws-cdk-lib/aws-ecr';

const container = new EcsFargateContainerDefinition(this, 'Container', {
    image: ContainerImage.fromEcrRepository(
        Repository.fromRepositoryName(this, 'Repository', ...),
        'latest',
    ),
    // ...
});

次に上で定義したコンテナを利用してジョブ定義を作成します。ジョブ定義ではジョブのリトライ回数やタイムアウト時間などを指定することができます。

lib/cdk-stack.ts
import { EcsJobDefinition } from 'aws-cdk-lib/aws-batch';

export class CdkStack extends cdk.Stack {
    constructor(scope: Construct, id: string, props?: cdk.StackProps) {
        // ...
        const jobDefinition = new EcsJobDefinition(this, 'JobDefinition', {
            container,
            retryAttempts: 10,
            timeout: cdk.Duration.minutes(100),
        });
    }
}

コンピューティング環境

コンピューティング環境はジョブを実行するための環境です。ここではFargate環境を利用する例を示します。

コンピューティング環境を作成する際には最大vCPU数を指定することができます。これは後述するジョブキュー内でジョブが並行で実行された場合に各ジョブが利用するvCPU数の合計の上限を指定するものです。つまりここで2vCPUを指定し、各ジョブが0.25vCPU利用する場合は最大8つのジョブが同時に実行されることになります。この最大vCPU数を設定することで間接的にジョブの並行実行数を制御することができます(逆にLambdaのSQSトリガーのように直接同時実行数を指定することはできません)。

lib/cdk-stack.ts
import { IpAddresses, Vpc } from 'aws-cdk-lib/aws-ec2';
import { FargateComputeEnvironment } from 'aws-cdk-lib/aws-batch';

export class CdkStack extends cdk.Stack {
    constructor(scope: Construct, id: string, props?: cdk.StackProps) {
        super(scope, id, props);
        // サンプルのためパブリックサブネットのみ作成
        const vpc = new Vpc(this, 'Vpc', {
            subnetConfiguration: [
                {
                    name: 'Public',
                    subnetType: SubnetType.PUBLIC,
                }
            ],
            natGateways: 0
        });
        const computeEnvironment = new FargateComputeEnvironment(this, 'ComputeEnvironment', {
            vpc,
            maxvCpus: 2,
            replaceComputeEnvironment: true,
        });
    }
}

ジョブキュー

ジョブキューはジョブのスケジューリングを行うためのキューです。ジョブキューを作成する際にはどのコンピューティング環境を利用するかを指定する必要があります。ここでは先ほど作成したコンピューティング環境を指定します。複数のコンピューティング環境を優先度順位を付けて指定することもできます。

lib/cdk-stack.ts
import { JobQueue } from 'aws-cdk-lib/aws-batch';

export class CdkStack extends cdk.Stack {
    constructor(scope: Construct, id: string, props?: cdk.StackProps) {
        // ...
        const jobQueue = new JobQueue(this, 'JobQueue', {
            computeEnvironments: [
                {
                    computeEnvironment,
                    order: 1,
                },
            ],
        });
    }
}

その他の設定

以上でAWS Batchのジョブ定義とジョブキューの設定を行いました。その他にも以下のような設定を行うと便利です。

  • ジョブ失敗時の通知
  • パラメータストアへのジョブ定義/ジョブキューのARNの保存

ジョブ失敗時の通知

ジョブが失敗した際にCloudWatch Eventsを経由してSNSトピックへと通知を送ることができます。アラート用のSNSトピックを作成し、ジョブの状態変更をトリガーとしてSNSトピックに通知を送るEventBridgeルールを作成します。

lib/cdk-stack.ts
import { Rule } from 'aws-cdk-lib/aws-events';
import { SnsTopic } from 'aws-cdk-lib/aws-events-targets';
import { Topic } from 'aws-cdk-lib/aws-sns';

export class CdkStack extends cdk.Stack {
    constructor(scope: Construct, id: string, props?: cdk.StackProps) {
        // ...
        const alartSnsTopic = new Topic(this, 'AlartSnsTopic');
        new Rule(this, 'BatchJobFailureNotificationRule', {
            eventPattern: {
                source: ['aws.batch'],
                detailType: ['Batch Job State Change'],
                detail: {
                    status: ['FAILED'],
                    jobQueue: [
                        jobQueue.jobQueueArn,
                    ],
                },
            },
            targets: [new SnsTopic(alartSnsTopic)],
        });
    }
}

パラメータストアへのジョブ定義/ジョブキューのARNの保存

他のサービスなどからジョブを登録する際にジョブ定義やジョブキューのARNを参照する必要がある場合があります。その際にはSSMパラメータストアにARNの値を保存しておくと便利に参照することができます。

lib/cdk-stack.ts
import { StringParameter } from 'aws-cdk-lib/aws-ssm';

export class CdkStack extends cdk.Stack {
    constructor(scope: Construct, id: string, props?: cdk.StackProps) {
        // ...
        const batchArnParameter = new cdk.aws_ssm.StringParameter(this, 'BatchArnParameter', {
            stringValue: JSON.stringify({
                JOB_DEFINITION_ARN:
                    jobDefinition.jobDefinitionArn,
                JOB_QUEUE_ARN: jobQueue.jobQueueArn,
            }),
        });
    }
}

リソースの作成

ここまでで作成したCDKスタックの全体は以下のようになります。

lib/cdk-stack.ts
import * as cdk from 'aws-cdk-lib';
import { EcsFargateContainerDefinition, EcsJobDefinition, FargateComputeEnvironment, JobQueue } from 'aws-cdk-lib/aws-batch';
import { IpAddresses, SecurityGroup, SubnetType, Vpc } from 'aws-cdk-lib/aws-ec2';
import { ContainerImage } from 'aws-cdk-lib/aws-ecs';
import { Rule } from 'aws-cdk-lib/aws-events';
import { SnsTopic } from 'aws-cdk-lib/aws-events-targets';
import { Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
import { Topic } from 'aws-cdk-lib/aws-sns';
import { StringParameter } from 'aws-cdk-lib/aws-ssm';
import { Construct } from 'constructs';

export class CdkStack extends cdk.Stack {
    constructor(scope: Construct, id: string, props?: cdk.StackProps) {
        super(scope, id, props);
        const taskExecutionRole = new Role(this, 'TaskExecutionRole', {
            assumedBy: new ServicePrincipal('ecs-tasks.amazonaws.com'),
        });
        taskExecutionRole.addManagedPolicy({
            managedPolicyArn: 'arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy',
        });
        const taskRole = new Role(this, 'TaskRole', {
            assumedBy: new ServicePrincipal('ecs-tasks.amazonaws.com'),
        });
        const container = new EcsFargateContainerDefinition(this, 'Container', {
            image: ContainerImage.fromRegistry('public.ecr.aws/amazonlinux/amazonlinux:latest'),
            memory: cdk.Size.mebibytes(512),
            cpu: 0.25,
            environment: {
                'ENVIRONMENT': 'dev'
            },
            jobRole: taskRole,
            executionRole: taskExecutionRole,
            command: ['echo', 'Hello, World!'],
            assignPublicIp: true,
        });
        const jobDefinition = new EcsJobDefinition(this, 'JobDefinition', {
            container,
            retryAttempts: 10,
            timeout: cdk.Duration.minutes(100),
        });
        const vpc = new Vpc(this, 'Vpc', {
            subnetConfiguration: [
                {
                    name: 'Public',
                    subnetType: SubnetType.PUBLIC,
                }
            ],
            natGateways: 0
        });
        const computeEnvironment = new FargateComputeEnvironment(this, 'ComputeEnvironment', {
            vpc,
            maxvCpus: 2,
            replaceComputeEnvironment: true,
        });
        const jobQueue = new JobQueue(this, 'JobQueue', {
            computeEnvironments: [
                {
                    computeEnvironment,
                    order: 1,
                },
            ],
        });
        const alartSnsTopic = new Topic(this, 'AlartSnsTopic');
        new Rule(this, 'BatchJobFailureNotificationRule', {
            eventPattern: {
                source: ['aws.batch'],
                detailType: ['Batch Job State Change'],
                detail: {
                    status: ['FAILED'],
                    jobQueue: [
                        jobQueue.jobQueueArn,
                    ],
                },
            },
            targets: [new SnsTopic(alartSnsTopic)],
        });
        const batchArnParameter = new StringParameter(this, 'BatchArnParameter', {
            stringValue: JSON.stringify({
                JOB_DEFINITION_ARN:
                    jobDefinition.jobDefinitionArn,
                JOB_QUEUE_ARN: jobQueue.jobQueueArn,
            }),
        });
    }
}

このスタックに以下のようにBatchSampleStackと名前をつけてデプロイします。

bin/cdk.ts
#!/usr/bin/env node
import 'source-map-support/register';
import * as cdk from 'aws-cdk-lib';
import { CdkStack } from '../lib/cdk-stack';

const app = new cdk.App();
new CdkStack(app, 'BatchSampleStack', {});

npx cdk deploy

作成したジョブ定義とジョブキューのARNを、パラメータストアのパラメータを取得して確認してみましょう。以下のコマンドでSSMパラメータを一覧してパラメータ名を取得します。

aws ssm describe-parameters
{
    "Parameters": [
        // ...
        {
            "Name": "CFN-BatchArnParameterD7C46A89-...",
            "Type": "String",
            // ...
        }
    ]
}

このパラメータ名を指定してパラメータの内容を取得します。

aws ssm get-parameter --name <ParameterName>
{
    "Parameter": {
        "Name": "CFN-BatchArnParameterD7C46A89-...",
        "Type": "String",
        "Value": "{\"JOB_DEFINITION_ARN\":\"arn:aws:batch:ap-northeast-1:...:job-definition/JobDefinition24FFE3ED-...:1\",\"JOB_QUEUE_ARN\":\"arn:aws:batch:ap-northeast-1:...:job-queue/JobQueueEE3AD499-...\"}",
        "Version": 1,
        "LastModifiedDate": "...",
        "ARN": "arn:aws:ssm:ap-northeast-1:...:parameter/CFN-BatchArnParameterD7C46A89-...",
        "DataType": "text"
    }
}

ジョブの登録/実行

では実際にジョブを登録してみましょう。以下のコマンドでジョブ名とジョブ定義、ジョブキューを指定してジョブを登録します。

aws batch submit-job \
    --job-name HelloWorld \
    --job-queue <JOB_QUEUE_ARN> \
    --job-definition <JOB_DEFINITION_ARN>

作成されたジョブのARNなどが返されます。

{
    "jobArn": "arn:aws:batch:ap-northeast-1:...:job/...",
    "jobName": "HelloWorld",
    "jobId": "..."
}

ジョブの実行結果を確認するためにジョブのARNを指定してジョブの詳細を取得します。

aws batch describe-jobs --jobs <jobArn>

ジョブが正常に終了していれば以下のように表示されます。

{
    "jobs": [
        {
            // ...
            "status": "SUCCEEDED",
            "attempts": [
                {
                    "container": {
                        "taskArn": "arn:aws:ecs:ap-northeast-1:...:task/AWSBatch-ComputeEnvironmentC57099-.../...",
                        "exitCode": 0,
                        "logStreamName": "JobDefinition24FFE3ED-.../default/...",
                        "networkInterfaces": [
                            {
                                "attachmentId": "...",
                                "privateIpv4Address": "..."
                            }
                        ]
                    },
                    "startedAt": 0,
                    "stoppedAt": 0,
                    "statusReason": "Essential container in task exited"
                }
            ],
        // ...
        }
    ]
}

このジョブは標準出力に"Hello, World!"と出力するだけのジョブでした。正しく実行されていることを確認するためにジョブのログを取得してみましょう。

上記のジョブ詳細に書かれているCloudWatch Logsのログストリーム名を指定してログイベントを取得します。

aws logs get-log-events --log-group-name /aws/batch/job --log-stream-name <logStreamArn>

以下のように"Hello, World!"と出力されていることが確認できます。

{
    "events": [
        {
            "timestamp": 0,
            "message": "Hello, World!",
            "ingestionTime": 0
        }
    ],
    // ...
}

ジョブへのパラメータの渡し方

ジョブに個別のパラメータを渡して実行したい場合は、コンテナの環境変数を利用するかジョブパラメータを指定します。前者の場合はジョブを登録する際にコンテナ定義を上書きすることで環境変数を追加することができます(ジョブ定義にもともとあった環境変数もそのまま利用できます)。

aws batch submit-job \
    --job-name HelloWorld \
    --job-queue <JOB_QUEUE_ARN> \
    --job-definition <JOB_DEFINITION_ARN> \
    --container-overrides '{
        "environment": [
            {"name": "PARAM1", "value": "VALUE1"},
            {"name": "PARAM2", "value": "VALUE2"}
        ]
    }'

この環境変数はコンテナの環境変数として処理中に利用することができます。

後者の場合は以下のようにジョブの登録時にパラメータを指定することができます。

aws batch submit-job \
    --job-name HelloWorld \
    --job-queue <JOB_QUEUE_ARN> \
    --job-definition <JOB_DEFINITION_ARN> \
    --parameters PARAM1=VALUE1,PARAM2=VALUE2

このパラメータはコンテナコマンドの引数として渡されます。利用するためにはコンテナを定義する際にコマンドの引数にプレースホルダを指定しておく必要があります。

const container = new EcsFargateContainerDefinition(this, 'Container', {
    // ...
    command: ['echo', 'Ref::PARAM1'],
    // ...
});

まとめ

この記事ではSQS + Lambda構成の代替としてAWS Batchを利用してバッチ処理を行う方法について解説しました。これによってSQS + Lambdaでは処理できない長時間の処理を、元の構成と同様にタスクのキューイングを行いながらサーバーレスに実行することができます。

We Are Hiring!

VALUESでは「組織の提案力と生産性を最大化する提案ナレッジシェアクラウド」Pitchcraftなどの開発メンバーとしてエンジニアを積極採用中です。

この記事で解説したAWS Batchを利用したバッチ処理の開発などに興味がある方はぜひご連絡ください!

参考

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?