はじめに
AWS Batchにジョブを投入する方法はいくつかありますが、今回はVPC LambdaからAWS Batchにジョブを投入する構成をAWS CDKで作ってみました。
構成図
VPC LambdaからAWS Batchにジョブ投入のリクエストを送る流れを示した図です。
AWS Batchのコンピューティング環境にはFargateを利用しています。
VPC Lambdaは実際にはVPCの外にあり、ENI、VPC Endpointを経由してAWS Batchにジョブ投入のリクエストを送信することになります。
AWS Batchのコンピューティング環境はVPC内になりますが、オーケストレータがVPC外にあるため、VPC Lambdaからジョブを投入するにはVPC Endpointが必要になります。
動作環境
$ cdk --version
2.30.0 (build 1529743)
$ node --version
v14.19.3
やってみた
前提として、VPC、サブネットはすでに作成済であるとします。
AWS Batchを作る
まず、次のようなスタックを作成します。
import { Duration, Stack, StackProps } from 'aws-cdk-lib';
import * as batch from '@aws-cdk/aws-batch-alpha';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as ecs from 'aws-cdk-lib/aws-ecs';
import * as ecr from 'aws-cdk-lib/aws-ecr';
import * as iam from 'aws-cdk-lib/aws-iam';
import { Construct } from 'constructs';
interface AwsBatchStackProps extends StackProps {
appName: string;
envName: string;
vpc: ec2.Vpc;
batchRepository: ecr.Repository;
}
export class AwsBatchStack extends Stack {
public readonly jobQueueArn: string;
public readonly jobDefinitionArn: string;
constructor(scope: Construct, id: string, props?: AwsBatchStackProps) {
super(scope, id, props);
const computeEnvironment = new batch.ComputeEnvironment(
this,
`${props?.appName}--${props?.envName}--compute-environment`,
{
computeEnvironmentName: `${props?.appName}--${props?.envName}--compute-environment`,
computeResources: {
vpc: props!.vpc,
type: batch.ComputeResourceType.FARGATE,
vpcSubnets: {
subnetType: ec2.SubnetType.PRIVATE_ISOLATED,
},
},
}
);
const jobQueue = new batch.JobQueue(
this,
`${props?.appName}--${props?.envName}--job-queue`,
{
jobQueueName: `${props?.appName}--${props?.envName}--job-queue`,
computeEnvironments: [
{
computeEnvironment,
order: 1,
},
],
}
);
const jobDefinition = new batch.JobDefinition(
this,
`${props?.appName}--${props?.envName}--job-definition`,
{
jobDefinitionName: `${props?.appName}--${props?.envName}--job-definition`,
container: {
image: new ecs.EcrImage(props!.batchRepository, 'latest'),
command: ['python3', '/app/main.py', 'Ref::TenantId'],
vcpus: 4,
memoryLimitMiB: 8192,
executionRole: new iam.Role(
this,
`${props?.appName}--${props?.envName}--batch-job-exec-role`,
{
assumedBy: new iam.ServicePrincipal('ecs-tasks.amazonaws.com'),
managedPolicies: [
iam.ManagedPolicy.fromManagedPolicyArn(
this,
`${props?.appName}--${props?.envName}--batch-job-exec-role-ecs-managed-policy`,
'arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy'
),
],
}
),
},
platformCapabilities: [batch.PlatformCapabilities.FARGATE],
}
);
this.jobQueueArn = jobQueue.jobQueueArn;
this.jobDefinitionArn = jobDefinition.jobDefinitionArn;
}
}
リポジトリは後のスタックで使うのでpublic readonly
の属性にしておきます。
次に、cdk-ecr-deployment
を使ってECRにコンテナイメージをpushできるようにします。
本記事では省略しますが、directory
で指定したディレクトリにDockerfileを置いてください。
import { Duration, Stack, StackProps } from 'aws-cdk-lib';
import * as ecr from 'aws-cdk-lib/aws-ecr';
import { DockerImageAsset } from 'aws-cdk-lib/aws-ecr-assets';
import * as ecrdeploy from 'cdk-ecr-deployment';
import { Construct } from 'constructs';
import * as path from 'path';
interface EcrPushStackProps extends StackProps {
appName: string;
envName: string;
batchRepository: ecr.Repository;
}
export class EcrPushStack extends Stack {
public readonly batchRepository: ecr.Repository;
constructor(scope: Construct, id: string, props?: EcrPushStackProps) {
super(scope, id, props);
const image = new DockerImageAsset(
this,
`${props?.appName}--${props?.envName}--docker-image-asset`,
{
directory: path.join(__dirname, 'container'),
}
);
new ecrdeploy.ECRDeployment(
this,
`${props?.appName}--${props?.envName}--ecr-deployment`,
{
src: new ecrdeploy.DockerImageName(image.imageUri),
dest: new ecrdeploy.DockerImageName(
`${props?.env?.account}.dkr.ecr.${props?.env?.region}.amazonaws.com/${props?.batchRepository.repositoryName}`
),
}
);
}
}
VPCエンドポイントを作る
復習:VPCエンドポイントとは
VPC内からVPC外のサービスにインターネットを経由せずにセキュアに通信するための仕組みです。インターフェースエンドポイント、Gateway Load Balancerエンドポイント、ゲートウェイエンドポイントの3種類があります。
インターフェースエンドポイントは、PrivateLink
を使って各サービスと接続します。利用可能なサービスの一覧は下記です。
Gateway Load Balancerエンドポイントは、セキュリティ製品のゲートウェイとなるものです。
ゲートウェイエンドポイントはS3、DynamoDBのみが対応しています。
今回は、AWS Batch用のインターフェースエンドポイントと、Fargate用の5つのインターフェースエンドポイントと1つのゲートウェイエンドポイントを作成する必要があります。
また、AWS Batchをプライベートサブネットに配置した、Route53との通信ができないため、プライベートDNSを有効にする必要があります。
import { Duration, Stack, StackProps } from 'aws-cdk-lib';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import { Construct } from 'constructs';
interface VpcEndpointStackProps extends StackProps {
appName: string;
envName: string;
vpc: ec2.Vpc;
}
export class VpcEndpointStack extends Stack {
constructor(scope: Construct, id: string, props?: VpcEndpointStackProps) {
super(scope, id, props);
new ec2.InterfaceVpcEndpoint(
this,
`${props?.appName}--${props?.envName}--batch-vpc-endpoint`,
{
vpc: props!.vpc,
service: new ec2.InterfaceVpcEndpointService(
'com.amazonaws.ap-northeast-1.batch'
),
subnets: {
subnetType: ec2.SubnetType.PRIVATE_ISOLATED,
},
privateDnsEnabled: true, // プライベートサブネットの場合、Route53との通信ができないため、プライベートDNSを有効にする必要がある
}
);
new ec2.InterfaceVpcEndpoint(
this,
`${props?.appName}--${props?.envName}--ecr-dkr-vpc-endpoint`,
{
vpc: props!.vpc,
service: ec2.InterfaceVpcEndpointAwsService.ECR,
subnets: {
subnetType: ec2.SubnetType.PRIVATE_ISOLATED,
},
}
);
new ec2.InterfaceVpcEndpoint(
this,
`${props?.appName}--${props?.envName}--ecr-api-vpc-endpoint`,
{
vpc: props!.vpc,
service: ec2.InterfaceVpcEndpointAwsService.ECR_DOCKER,
subnets: {
subnetType: ec2.SubnetType.PRIVATE_ISOLATED,
},
}
);
new ec2.InterfaceVpcEndpoint(
this,
`${props?.appName}--${props?.envName}--logs-endpoint`,
{
vpc: props!.vpc,
service: ec2.InterfaceVpcEndpointAwsService.CLOUDWATCH_LOGS,
subnets: {
subnetType: ec2.SubnetType.PRIVATE_ISOLATED,
},
}
);
new ec2.InterfaceVpcEndpoint(
this,
`${props?.appName}--${props?.envName}--ssm-endpoint`,
{
vpc: props!.vpc,
service: ec2.InterfaceVpcEndpointAwsService.SSM,
subnets: {
subnetType: ec2.SubnetType.PRIVATE_ISOLATED,
},
}
);
new ec2.InterfaceVpcEndpoint(
this,
`${props?.appName}--${props?.envName}--secretsmanager-endpoint`,
{
vpc: props!.vpc,
service: ec2.InterfaceVpcEndpointAwsService.SECRETS_MANAGER,
subnets: {
subnetType: ec2.SubnetType.PRIVATE_ISOLATED,
},
}
);
new ec2.GatewayVpcEndpoint(
this,
`${props?.appName}--${props?.envName}--s3-vpc-endpoint`,
{
vpc: props!.vpc,
service: ec2.GatewayVpcEndpointAwsService.S3,
subnets: [
{
subnetType: ec2.SubnetType.PRIVATE_ISOLATED,
},
],
}
);
}
}
Lambdaを作る
次のようなスタックを作成します。
import { Duration, Stack, StackProps, ResourceEnvironment } from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as iam from 'aws-cdk-lib/aws-iam';
import { Construct } from 'constructs';
import * as path from 'path';
import * as batch from '@aws-cdk/aws-batch-alpha';
interface JobSubmitLambdaStackProps extends StackProps {
appName: string;
envName: string;
vpc: ec2.Vpc;
jobQueue: batch.JobQueue;
jobDefinition: batch.JobDefinition;
}
export class JobSubmitLambdaStack extends Stack {
constructor(scope: Construct, id: string, props?: JobSubmitLambdaStackProps) {
super(scope, id, props);
const lambdaFunction = new lambda.Function(
this,
`${props?.appName}--${props?.envName}--batch-func`,
{
functionName: `${props?.appName}--${props?.envName}--batch-func`,
runtime: lambda.Runtime.PYTHON_3_9,
handler: 'batch_func.lambda_handler',
code: lambda.Code.fromAsset(path.join(__dirname, './lambda-handler')),
vpc: props?.vpc,
vpcSubnets: {
subnetType: ec2.SubnetType.PRIVATE_ISOLATED,
},
memorySize: 512,
timeout: Duration.seconds(10),
environment: {
JOB_NAME: 'test-job',
JOB_QUEUE: props?.jobQueue.jobQueueArn as string,
JOB_DEFINITION: props?.jobDefinition.jobDefinitionArn as string,
},
role: new iam.Role(
this,
`${props?.appName}--${props?.envName}--batch-func-role`,
{
assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
managedPolicies: [
iam.ManagedPolicy.fromManagedPolicyArn(
this,
`${props?.appName}--${props?.envName}-----lambda-vpc-access-managed-policy`,
'arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole'
),
iam.ManagedPolicy.fromManagedPolicyArn(
this,
`${props?.appName}--${props?.envName}-----aws-batch-submit-job-policy`,
'arn:aws:iam::aws:policy/service-role/AWSBatchServiceEventTargetRole'
),
],
}
),
}
);
}
}
ENIを作成できるようにロールにAWSLambdaVPCAccessExecutionRole
ポリシーを付与しています。
また、AWS Batchにジョブを投入するためにAWSBatchServiceEventTargetRole
ポリシーも付与しています。
ハンドラ関数のコードは下記です。AWS Batchにジョブを投入して、ジョブIDを返しているだけです。AWS Batchのジョブ名などは環境変数で指定しています。
import json
import os
import boto3
batch = boto3.client("batch")
def lambda_handler(event, context):
jobName = os.environ.get("JOB_NAME")
jobQueue = os.environ.get("JOB_QUEUE")
jobDefinition = os.environ.get("JOB_DEFINITION")
try:
# Submit a Batch Job
response = batch.submit_job(
jobQueue=jobQueue,
jobName=jobName,
jobDefinition=jobDefinition,
)
# Log response from AWS Batch
print("Response: " + json.dumps(response, indent=2))
# Return the jobId
jobId = response["jobId"]
return {"jobId": jobId}
except Exception as e:
print(e)
message = "Error submitting Batch Job"
print(message)
raise Exception(message)
動作確認
Lambdaにテストイベントを発行して、レスポンスにjobIdが入っていることが確認できました。