はじめに
以下の記事で試したEventBridge、StepFunctions、ECS(Fargate)で構成されるバッチ処理と同等のものを、CDK(TypeScript)で記述・構築してみました。
なお、アプリのDockerイメージをビルドしてECR登録する部分については、CI/CDパイプラインとして別途構築することになる想定のため、本記事の対象外とし、ECRのリポジトリ作成とDockerイメージ登録は既に済んでいる前提で記載しています。
準備
ちなみにCDK実行環境としてはdockerコンテナを使用しており、詳細は割愛しますが、node.jsの公式ベースイメージにTypeScriptとAWS CLIだけ追加インストールしたものになってます。
CDKプロジェクトを作成します。
$ node --version
v16.13.2
$ tsc --version
4.5.5
$ cdk --version
2.9.0 (build a69725c)
$ mkdir ecs-test
$ cd ecs-test
$ cdk init sample-app --language typescript
久しぶりにCDKを触ったら、知らぬ間にCDK v2がリリースされていることに気づきました。v1と比べて微妙に書き方が変わっているようで、リソースごとにインストールが必要だったモジュールがaws-cdk-lib
にまとまっているのが大きな変更点に見えます。
API Referenceもググってすぐ見つかるのはv1のものだったりするので、ちゃんとv2の方を見るように要注意でした。CDKは相変わらず更新が早く、ちんたら書いてると途中ですぐにバージョンが上がって置いていかれますね。
ECS
まずはECSのクラスターとタスク定義のスタックを作成します。
コード
import { Stack, StackProps } from 'aws-cdk-lib';
import {
aws_ec2 as ec2,
aws_ecs as ecs,
aws_ecr as ecr,
aws_iam as iam,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
interface EcsStackProps extends StackProps {
vpcId: string;
clusterName: string;
repositoryName: string;
containerName: string;
}
export class EcsStack extends Stack {
// これらをpublicで公開してる理由は後述
public readonly cluster: ecs.Cluster;
public readonly fargateTaskDefinition: ecs.TaskDefinition;
public readonly containerDefinition: ecs.ContainerDefinition;
constructor(scope: Construct, id: string, props: EcsTestStackProps) {
super(scope, id, props);
// クラスター作成時にVPCを指定しないと自動で新規VPCが作成される
const vpc = ec2.Vpc.fromLookup(this, 'Vpc', {
region: 'ap-northeast-1',
vpcId: props.vpcId
});
// クラスター
this.cluster = new ecs.Cluster(this, 'EcsCluster', {
vpc,
clusterName: props.clusterName,
enableFargateCapacityProviders: true, // Fargateタイプにするために必要
});
// タスクロール
const taskRole = new iam.Role(this, 'TaskRole', {
assumedBy: new iam.ServicePrincipal('ecs-tasks.amazonaws.com'),
});
// アプリからS3入出力するため、タスクロールにポリシー付与
taskRole.addManagedPolicy(
iam.ManagedPolicy.fromAwsManagedPolicyName('Amazons3FullAccess')
);
// タスク実行ロール
const taskExecutionRole = new iam.Role(this, 'TaskExecutionRole', {
assumedBy: new iam.ServicePrincipal('ecs-tasks.amazonaws.com'),
});
// 明示的に指定しなくてもAmazonECSTaskExecutionRolePolicy相当のポリシーが自動で付与された
// タスク定義
this.fargateTaskDefinition = new ecs.FargateTaskDefinition(this, 'TaskDef', {
memoryLimitMiB: 512,
cpu: 256,
taskRole: taskRole,
executionRole: taskExecutionRole,
});
// コンテナ定義で指定するコンテナイメージリポジトリ
const repo = ecr.Repository.fromRepositoryName(this, 'Repository', props.repositoryName);
// タスク定義にコンテナ定義を追加
this.containerDefinition = this.fargateTaskDefinition.addContainer('Container', {
containerName: props.containerName,
image: ecs.ContainerImage.fromEcrRepository(repo),
// ログ設定は、マネコンではデフォルトで設定されたが、CDKでは省略すると無しになる
logging: ecs.LogDrivers.awsLogs({ streamPrefix: 'ecs' }),
});
}
}
# !/usr/bin/env node
import * as cdk from 'aws-cdk-lib';
import { EcsStack } from '../lib/ecs-stack';
const app = new cdk.App();
const env = {
account: process.env.CDK_DEFAULT_ACCOUNT,
region: process.env.CDK_DEFAULT_REGION
};
const ecsStack = new EcsStack(app, 'EcsStack', {
vpcId: '<VPC ID>',
clusterName: 'test',
repositoryName: 'javatest',
containerName: 'javatest-container',
env,
});
引っかかったところ
VPC指定を省略すると新規VPC作成される
マネジメントコンソールでクラスター作成した際は、Fargateタイプの場合、VPCの指定は不要で、タスク実行時にサブネットIDを指定していました。
一方でCDKでクラスター作成した場合、VPCの指定を省略したら、わざわざVPCを新規作成してくれました。そして、ちょっと困ったことに、NATゲートウェイやEIPまで用意してくれたので、料金発生しないと思い込んで削除せずに数日放置していたら数千円とられてしまいました。ただの不注意で自己責任ですが、ちょっと凹みました。
If you omit the property
vpc
, the construct will create a new VPC with two AZs.
そこで、既存VPCをVpc.fromLookup
で指定するように修正したのですが、以下のエラーが発生したので、EcsStack
のプロパティにenv
を追加しています。
$ cdk synth
/root/ecs-test/node_modules/aws-cdk-lib/core/lib/context-provider.ts:63
throw new Error(`Cannot retrieve value from context provider ${options.provider} since account/region ` +
^
Error: Cannot retrieve value from context provider vpc-provider since account/region are not specified at the stack level. Configure "env" with an account and region when you define your stack.See https://docs.aws.amazon.com/cdk/latest/guide/environments.html for more details.
at Function.getValue (/root/ecs-test/node_modules/aws-cdk-lib/core/lib/context-provider.ts:63:13)
at Function.fromLookup (/root/ecs-test/node_modules/aws-cdk-lib/aws-ec2/lib/vpc.ts:658:66)
at new EcsStack (/root/ecs-test/lib/ecs-stack.ts:21:25)
at Object.<anonymous> (/root/ecs-test/bin/ecs-test.ts:6:1)
at Module._compile (node:internal/modules/cjs/loader:1101:14)
at Module.m._compile (/root/ecs-test/node_modules/ts-node/src/index.ts:1056:23)
at Module._extensions..js (node:internal/modules/cjs/loader:1153:10)
at Object.require.extensions.<computed> [as .ts] (/root/ecs-test/node_modules/ts-node/src/index.ts:1059:12)
at Module.load (node:internal/modules/cjs/loader:981:32)
at Function.Module._load (node:internal/modules/cjs/loader:822:12)
Subprocess exited with error 1
ただ、既存VPCを指定したうえでcdk synth
で生成されたCloudFormationのYAMLを見ても、VPCに関する記述は何もなく、VPC指定が必要なのはCDK側の都合のように見えます。結局要らんのかいって思ってしまいました。
Step Functions(ECSタスク実行)
次に、ECSタスクを実行するStepFunctionsワークフローのスタックを作成します。
StepFunctionsは、ステートマシンのASL(JSON)は前回記事で作成済なので、JSONファイルを入力にして多少設定するだけで簡単にできるかな、と思っていたのですが、そうでもなかったです。
StateMachine.fromJson
みたいなメソッドがあるかと思いきや見あたらず、ドキュメント記載のサンプルコードは、ASLで記述したのと同様の定義をCDKコード上で記述するものでした。
ステートマシンのグラフプレビューも見ながら、試行錯誤しつつASLを記述・検証したうえで、完成版をIaCに落とす、みたいなプロセスは需要はありそうな気はするのですが、実際はそうでもないのでしょうか。
CloudFormationの場合はDefinitionString
プロパティでASLのJSONを指定できるようですし、CDKでも低レベルのCfnStateMachine
を使用すれば同様にできそうではあります。
ただ折角なので、CDKコードでの記述方法も学ぶために改めて書いてみました。
コード
import { Stack, StackProps } from 'aws-cdk-lib';
import {
aws_ec2 as ec2,
aws_ecs as ecs,
aws_stepfunctions as sfn,
aws_stepfunctions_tasks as tasks,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
interface StepEcsStackProps extends StackProps {
fargateCluster: ecs.Cluster;
taskDefinition: ecs.TaskDefinition;
containerDefinition: ecs.ContainerDefinition;
subnetId: string;
securityGroupId: string;
}
export class StepEcsStack extends Stack {
public readonly stateMachine: sfn.StateMachine;
constructor(scope: Construct, id: string, props: StepEcsStackProps) {
super(scope, id, props);
// Cluster.fromClusterArnで生成すると実行時にエラーになるので
// EcsStack内のインスタンスを参照させる
const cluster = props.fargateCluster;
const runTask = new tasks.EcsRunTask(this, 'RunFargateTask', {
// integrationPatternを省略するとデフォルトで非同期実行となる
// 同期実行したい場合はRUN_JOBを指定
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
launchTarget: new tasks.EcsFargateLaunchTarget(),
cluster,
// taskDefinitionをfromTaskDefinitionArnで生成するとエラーとなるので
// EcsStack内のインスタンスを参照させる
taskDefinition: props.taskDefinition,
subnets: {
subnets: [
ec2.Subnet.fromSubnetId(this, 'Subnet', props.subnetId)
]
},
securityGroups: [
ec2.SecurityGroup.fromLookupById(this, 'SecurityGroup', props.securityGroupId)
],
assignPublicIp: true,
containerOverrides: [{
// containerDefinitionもtaskDefinitionと同様にEcsStackから参照
containerDefinition: props.containerDefinition,
command: sfn.JsonPath.listAt('$.commands'),
}],
});
const notifySuccess = new sfn.Pass(this, 'NotifySuccess', {
result: sfn.Result.fromString('Success'),
});
const notifyFailure = new sfn.Pass(this, 'NotifyFailure', {
result: sfn.Result.fromString('Failure'),
});
const definition = runTask
// addCatchはデフォルトで"ErrorEquals":["States.ALL"]相当になる
.addCatch(notifyFailure)
.next(notifySuccess);
this.stateMachine = new sfn.StateMachine(this, 'StateMachine', {
definition,
});
}
}
// 以下を追加
const stepEcsStack = new StepEcsStack(app, 'StepEcsStack', {
// EcsStack内のインスタンスを直接参照させている
fargateCluster: ecsStack.cluster,
taskDefinition: ecsStack.fargateTaskDefinition,
containerDefinition: ecsStack.containerDefinition,
subnetId: '<サブネットID>',
securityGroupId: '<セキュリティグループID>',
env,
});
引っかかったところ
ECSリソースがARN指定できない
ECSのクラスターとタスク定義は、ARNを指定するだけで済むかと思っていましたが、それではうまくいきませんでした。
Cluster
をCluster.fromClusterArn
で生成したところ、実行時に以下のエラーとなりました。
$ cdk synth StepStack
/root/ecs-test/node_modules/aws-cdk-lib/aws-ecs/lib/cluster.ts:80
throw new Error(`vpc ${errorSuffix}`);
^
Error: vpc is not available for a Cluster imported using fromClusterArn(), please use fromClusterAttributes() instead.
at Import.get vpc [as vpc] (/root/ecs-test/node_modules/aws-cdk-lib/aws-ecs/lib/cluster.ts:80:15)
at EcsRunTask.configureAwsVpcNetworking (/root/ecs-test/node_modules/aws-cdk-lib/aws-stepfunctions-tasks/lib/ecs/run-task.ts:211:37)
at new EcsRunTask (/root/ecs-test/node_modules/aws-cdk-lib/aws-stepfunctions-tasks/lib/ecs/run-task.ts:169:12)
at new StepStack (/root/ecs-test/lib/step-stack.ts:26:21)
at Object.<anonymous> (/root/ecs-test/bin/ecstest.ts:23:19)
at Module._compile (node:internal/modules/cjs/loader:1101:14)
at Module.m._compile (/root/ecs-test/node_modules/ts-node/src/index.ts:1056:23)
at Module._extensions..js (node:internal/modules/cjs/loader:1153:10)
at Object.require.extensions.<computed> [as .ts] (/root/ecs-test/node_modules/ts-node/src/index.ts:1059:12)
at Module.load (node:internal/modules/cjs/loader:981:32)
Subprocess exited with error 1
fromClusterArn
でなくfromClusterAttributes
を使えと言われていますが、ClusterAttributes
はVPCやセキュリティグループのインスタンスが必須プロパティとなっており、ここでまた定義するのも面倒なので、とりあえずEcsStack内のインスタンスを直接参照させてしまいました。
タスク定義についても、TaskDefinition.fromTaskDefinitionArn
というメソッドはあるのですが、この戻り値の型がインターフェースのITaskDefinition
であるのに対し、EcsRunTask
はインターフェースでなくTaskDefinition
の実体を求めているためコンパイルエラーとなってしまいます。
このスタック内で改めてタスク定義を組み立てるのは避けたかったので、こちらもEcsStack内のインスタンスを参照させることにしました。
また、ContainerOverrides
で必要となるContainerDefinition
についても、タスク定義に紐づいて生成されるインスタンスなので、TaskDefinition
と同様にEcsStack
から参照させています。
ただ、上記のようにしてしまうと、EcsStackへの依存が強くなってしまい、スタックを分ける意味もあまりなくなってしまうという問題があります。
今回はECSまわりも合わせてCDKで構築しているので上記対応でも可能ですが、もし別途構築済のクラスターやタスク定義をCDKで構築するStepFunctionsで使用したい場合は問題になりそうです。すみませんが、このあたりをどうするのがよいのかは、まだ調査・検討が十分にできていません。
ECSタスクを同期実行したい場合はintegrationPattern指定が必要
tasks.EcsRunTask
でintegrationPattern
を指定しなかったら、該当タスクが"Resource": "arn:aws:states:::ecs:runTask"
(syncが付いてない)となり、ECSタスク完了を待たずにワークフローが完了する状態となってしまいました。
プロパティに以下を追加することで、同期実行されるようになりました。デフォルトだとRUN_JOB
でなくREQUEST_RESPONSE
になるようです。
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
生成されたASL
上記コードで構築し、生成されたASLは以下となります。
{
"StartAt": "RunFargateTask",
"States": {
"RunFargateTask": {
"Next": "NotifySuccess",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "NotifyFailure"
}
],
"Type": "Task",
"Resource": "arn:aws:states:::ecs:runTask.sync",
"Parameters": {
"Cluster": "arn:aws:ecs:ap-northeast-1:<アカウントID>:cluster/test",
"TaskDefinition": "EcsStackTaskDefF4279AC8",
"NetworkConfiguration": {
"AwsvpcConfiguration": {
"AssignPublicIp": "ENABLED",
"Subnets": [
"<サブネットID>"
],
"SecurityGroups": [
"<セキュリティグループID>"
]
}
},
"Overrides": {
"ContainerOverrides": [
{
"Name": "javatest-container",
"Command.$": "$.commands"
}
]
},
"LaunchType": "FARGATE"
}
},
"NotifySuccess": {
"Type": "Pass",
"Result": "Success",
"End": true
},
"NotifyFailure": {
"Type": "Pass",
"Result": "Failure",
"End": true
}
}
}
また、マネジメントコンソールで作成した際は、自動生成されたロールにPassRoleのポリシーを自分で追加する必要がありましたが、CDKでは以下のようなポリシーを持つロールをよしなに生成してくれました。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "ecs:RunTask",
"Resource": "arn:aws:ecs:ap-northeast-1:<アカウントID>:task-definition/EcsStackTaskDefF4279AC8",
"Effect": "Allow"
},
{
"Action": [
"ecs:StopTask",
"ecs:DescribeTasks"
],
"Resource": "*",
"Effect": "Allow"
},
{
"Action": "iam:PassRole",
"Resource": [
"arn:aws:iam::<アカウントID>:role/EcsStack-TaskRole30FC0FBB-3YSMB2AFOZL7",
"arn:aws:iam::<アカウントID>:role/EcsStack-TaskExecutionRole250D2532-10W3JOGB0O1CF"
],
"Effect": "Allow"
},
{
"Action": [
"events:PutTargets",
"events:PutRule",
"events:DescribeRule"
],
"Resource": "arn:aws:events:ap-northeast-1:<アカウントID>:rule/StepFunctionsGetEventsForECSTaskRule",
"Effect": "Allow"
}
]
}
Step Functions(並列実行)
次に前述のECSタスク実行のワークフローを並列で呼び出す親ワークフローのスタックを記述します。
コード
import { Stack, StackProps } from 'aws-cdk-lib';
import {
aws_lambda as lambda,
aws_stepfunctions as sfn,
aws_stepfunctions_tasks as tasks,
aws_iam as iam,
aws_events as events,
aws_events_targets as targets,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
interface StepParallelStackProps extends StackProps {
fargateTaskStateMachineArn: string;
}
export class StepParallelStack extends Stack {
constructor(scope: Construct, id: string, props: StepParallelStackProps) {
super(scope, id, props);
const assignFn = new lambda.Function(this, 'AssignFunction', {
runtime: lambda.Runtime.NODEJS_14_X,
handler: 'index.handler',
code: new lambda.AssetCode('lambda'),
});
// LambdaからS3アクセスするため、ロールにポリシー付与
assignFn.role?.addManagedPolicy(
iam.ManagedPolicy.fromAwsManagedPolicyName('Amazons3FullAccess')
);
const callLambda = new tasks.LambdaInvoke(this, 'CallLambda', {
lambdaFunction: assignFn,
// payloadResponseOnlyを指定しないとLambdaレスポンスがPayload配下となり
// 後続タスクのItemsPath定義とずれてしまう
payloadResponseOnly: true,
// retryOnServiceExceptionsを指定しないとデフォルトでリトライありとなる。
retryOnServiceExceptions: false,
});
const runContainerTask = new tasks.StepFunctionsStartExecution(this, 'RunContainerTask', {
// EcsRunTaskと違ってStateMachineはARN指定も可能
stateMachine: sfn.StateMachine.fromStateMachineArn(this, 'RunContainerStateMachine', props.fargateTaskStateMachineArn),
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
input: sfn.TaskInput.fromObject({
"AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id",
"commands.$": "$"
}),
});
const runContainerTaskMap = new sfn.Map(this, 'RunContainerTaskMap', {
maxConcurrency: 2,
itemsPath: sfn.JsonPath.stringAt('$.commandsList'),
});
runContainerTaskMap.iterator(runContainerTask);
const notifySuccess = new sfn.Pass(this, 'NotifySuccess', {
result: sfn.Result.fromString('Success'),
});
const notifyFailure = new sfn.Pass(this, 'NotifyFailure', {
result: sfn.Result.fromString('Failure'),
});
const definition = callLambda
.addCatch(notifyFailure)
.next(runContainerTaskMap.addCatch(notifyFailure))
.next(notifySuccess);
const stateMachine = new sfn.StateMachine(this, 'StateMachine', {
definition,
});
}
}
// 以下を追加
const stepParallelStack = new StepParallelStack(app, 'StepParallelStack', {
fargateTaskStateMachineArn: stepEcsStack.stateMachine.stateMachineArn,
env
});
引っかかったところ
LambdaInvoke
の必須プロパティはlambdaFunction
のみですが、前回記事でマネジメントコンソールで作成した定義と揃えるためには以下のプロパティの追加が必要でした。
payloadResponseOnly
前回記事では以下の記事に記載されている方法1を使用していましたが、本プロパティのデフォルトはfalse
で方法2となるので、方法1とするため明示的にtrue
を設定しました。
後続タスクに渡す出力のJSON構造が方法1と方法2で異なるので、それに合わせて本プロパティ値または後続タスクの入力JsonPathを設定する必要があります。
retryOnServiceExceptions
本プロパティはデフォルトがtrue
で、true
だと生成されたASLに以下のリトライ設定が追加されました。このままでも問題はありませんが、前回記事ではリトライ設定を入れてなかったので、それと揃うようにfalse
を設定しました。
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
生成されたASL
{
"StartAt": "CallLambda",
"States": {
"CallLambda": {
"Next": "RunContainerTaskMap",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "NotifyFailure"
}
],
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:<アカウントID>:function:StepParallelStack-AssignFunction3E1F6BF7-VWGFAiPdkBov"
},
"RunContainerTaskMap": {
"Type": "Map",
"Next": "NotifySuccess",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "NotifyFailure"
}
],
"Iterator": {
"StartAt": "RunContainerTask",
"States": {
"RunContainerTask": {
"End": true,
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution.sync:2",
"Parameters": {
"Input": {
"AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id",
"commands.$": "$"
},
"StateMachineArn": "arn:aws:states:ap-northeast-1:<アカウントID>:stateMachine:StateMachine2E01A3A5-vnyg48p039x6"
}
}
}
},
"ItemsPath": "$.commandsList",
"MaxConcurrency": 2
},
"NotifySuccess": {
"Type": "Pass",
"Result": "Success",
"End": true
},
"NotifyFailure": {
"Type": "Pass",
"Result": "Failure",
"End": true
}
}
}
EventBridge
最後に定期実行の設定ですが、新たにスタックを作るほどでもないので、StepParallelStackにワークフロー入力値用のプロパティと下記コードを追加したのみです。
// 以下を追加
const scheduleRule = new events.Rule(this, 'ScheduleRule', {
schedule: events.Schedule.cron({ minute: '24', hour: '6' }),
targets: [new targets.SfnStateMachine(stateMachine, {
input: events.RuleTargetInput.fromObject(props.stateMachineInput),
})],
});
const stepParallelStack = new StepParallelStack(app, 'StepParallelStack', {
fargateTaskStateMachineArn: stepEcsStack.stateMachine.stateMachineArn,
// 以下を追加
stateMachineInput: {
"bucket": "<バケット名>",
"inputPrefix": "input/",
"outputPrefix": "output/"
},
env
});
まとめ
前回マネジメントコンソール上で作成したのと同等の構成をCDKで記述・構築することができました。
StepFunctionsについては、ASLがCDKで使いまわしにくそうなので、はじめからCDKで書くなどプロセスについても要検討ですが、実際に書いてみると、JSONよりもTypeScriptの方が型チェックが効いたり読み書きしやすかったりして良い気もしました。デメリットはグラフがすぐに見れない点ぐらいかも。
また、StepFunctionとECSリソースとの結合が強くなってしまった点は問題になりそうなので要検討です。