株式会社エアークローゼットにてエンジニアをやらせていただいてる大西です。
この記事はエアークローゼット アドベントカレンダー2日目の記事になります。
はじめに
先日、業務の中でStepFunctionの中でEC2インスタンスを起動して、ステータスチェックが終わるまで待つという処理を書く場面がありました。
パッと思いつく実装はLambdaの中で待機処理を書くことですが、諸々の事情からそれはやらない方針となりました。
じゃあ、どうすればええねんと思いながらStepFunctionの機能を調べていると、色々な機能を駆使すればタスクだけで待ち処理を作れることがわかりました。
StepFunctionの機能面で多くの学びがあったので、今回は備忘録も兼ねて学びを書き残そうと思います。
環境
実行環境
- macOS Monterey v12.5.1
- node v16.3.0
package
- typescript@3.9.10
- aws-cdk@2.43.1
StepFunctionのタスクだけでリトライ処理つくりたい
リトライ処理の構成要素とは
初めにリトライ処理を作るために必要な機能が何かについて考えます。
必要な機能には下記のものがあります。
- 処理の本体
- 対象の状態確認
- 状態による分岐処理
- 待機処理
- 無限ループ防止機能(リトライ上限回数、タイムアウトなど)
処理本体と状態確認はLambdaや、StepFunction用タスク、AWS CDK統合などで実装することになります。
状態による分岐処理はChoiceタスク、待機処理はWaitタスクで書くので、割と簡単に実装できそうです。
曲者は無限ループ防止機能です。
2022/12時点ではStepFunctionにループ制御に関する処理がないため、既存の機能をうまく組み合わせて作るしかないです。
リトライ回数の上限を設けるのも、タイムアウトを実装するのも一見簡単そうですが、ちょうどこれらを実装するために必要なタスクがなく、頭を悩ませていました。
余談ですが、AWS CDK統合はStepFunctionのタスクとして、AWSのAPIを実行できる機能です。
AWS SDKでの書き方は下記の記事で解説しているので、興味があればご覧ください。
https://qiita.com/ItsukiOnishi/items/cd517910fecb588d0467
StepFunctionのタスクだけで無限ループ処理を作りたい
色々調べる中で見つけたのが、ステートメント言語の組み込み関数です。
この組み込み関数には算術演算が含まれています。
StepFunctionの中で算術演算ができるとわかったことで、
- 算術演算があれば、カウンターの実装ができる
- カウンターがあればリトライ回数を計算できる
- リトライ回数が分かれ、リトライ上限回数による無限ループ防止機能が作れる
という流れでリトライ処理を作れると思い至りました。
組み込み関数について、詳しくは下記の公式ドキュメントをご覧ください。
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/amazon-states-language-intrinsic-functions.html#asl-intrsc-func-math-operation
初めは、タイムアウトを実装しようと思ったのですが、Lambda使わないと現在時刻を取得できないんですよね。。。
わざわざそれ用のLambdaを作るのは面倒だったので今回は見送りました。
実際にリトライ処理を作る
ここからは実際にリトライ回数の上限値を決めることによる無限ループ防止機能を有したリトライ処理を作っていきます。
リトライ機能を作るために必要なタスクを列挙すると下記の通りになります。
- カウンターの定義
- 処理
- リトライ回数の確認
- カウンターを増やす
- 待機
無限ループ防止機能をどのように作るかの説明が目的ですので、前述の処理本体と状態の取得は「処理」としてひとまとめにしています。
特に無限ループ防止機能を作る上で重要な
- カウンターの定義
- カウンターを増やす
について解説します。
カウンターの定義
StepFunctionに慣れている方はご存知だと思いますが、数字ひとつ定義するにもPassタスクを一個用意しないといけません。
自分は今回の件で初めてStepFunctionを触ったので、この感覚がパッと思いつかなかったのと、SDKでの書き方で微妙に詰まりました。。。
後続の方々のためにもSDKでのタスク定義方法を残します。
やっていることはcounterを定義して、0
を突っ込んでいるだけです。
const initializeRetryCount = new sfn.Pass(
this,
"InitializeRetryCountForCheckRun",
{
inputPath: "$",
parameters: {
checkRunCount: 0,
},
resultPath: "$.retry",
}
);
カウンターを増やす
前述のステートメント言語の組み込み関数にある算術演算関数を使ってカウンターを増やします。
算術演算関数について詳しくは下記の記事をご覧ください。
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/amazon-states-language-intrinsic-functions.html#asl-intrsc-func-math-operation
const increaseCheckRunRetryCount = new sfn.Pass(
this,
"IncreaseCheckRunRetryCount",
{
inputPath: "$",
parameters: {
"checkRunCount.$": "States.MathAdd($.retry.checkRunCount, 1)",
},
resultPath: "$.retry",
}
);
ソースコード
最後にEC2インスタンスを起動して、ステータスがOKになるまで待つ処理のソースコードをベタ張しておきます。
長いのでおりたたみにしています。
ソースコード
import { Construct } from "constructs";
import { Stack, StackProps, Duration } from "aws-cdk-lib";
import * as sfnTasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as iam from "aws-cdk-lib/aws-iam";
export class RetryStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
// ---------- カウンターの定義 ----------
const initializeRetryCount = new sfn.Pass(
this,
"InitializeRetryCountForCheckRun",
{
inputPath: "$",
parameters: {
checkRunCount: 0,
},
resultPath: "$.retry",
}
);
// ---------- 処理 ----------
const startInstanceFunction = new lambda.Function(
this,
"StartInstanceFunction",
{
functionName: "StartInstanceFunction",
handler: "startInstance.handler", // Lambdaのソースコードのファイルを指定
code: lambda.Code.fromAsset("src"), // ソースコードのディレクトリをしてい
runtime: lambda.Runtime.NODEJS_16_X, // ランタイムの指定
}
);
// EC2インスタンスの起動
const startInstanceTask = new sfnTasks.LambdaInvoke(
this,
"StartInstanceTask",
{
lambdaFunction: startInstanceFunction,
payloadResponseOnly: true,
inputPath: "$",
resultPath: "$",
}
);
const describeInstanceStatusTaskRole = new iam.Role(
this,
"DescribeInstanceStatusTaskRole",
{
assumedBy: new iam.CompositePrincipal(
new iam.ServicePrincipal("states.amazonaws.com")
),
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonEC2FullAccess"),
],
}
);
// EC2インスタンスのステータスを取得
const describeInstanceStatusTask = new sfnTasks.CallAwsService(
this,
"DescribeInstanceStatusTask",
{
service: "ec2",
action: "describeInstanceStatus",
// state machineのinputで単一のinstanceIdが渡ってくる想定
parameters: {
"InstanceIds.$": "States.Array($.instanceId)",
},
iamResources: [describeInstanceStatusTaskRole.roleArn],
resultPath: "$.describeInstanceResult",
}
);
// EC2インスタンスのステータスを確認
const checkRunInstance = new sfn.Choice(this, "checkRunInstance", {
inputPath: "$",
});
// ---------- 無限ループ防止処理にかかるタスク ----------
// リトライ回数の確認
const checkRetryCount = new sfn.Choice(this, "CheckRunInstanceRetryCount", {
inputPath: "$",
});
// カウンターを増やす
const increaseCheckRunRetryCount = new sfn.Pass(
this,
"IncreaseCheckRunRetryCount",
{
inputPath: "$",
parameters: {
"checkRunCount.$": "States.MathAdd($.retry.checkRunCount, 1)",
},
resultPath: "$.retry",
}
);
// 待機
const interval = new sfn.Wait(this, "IntervalForCheckRunInstance", {
time: sfn.WaitTime.duration(
Duration.seconds(30) // 待機時間の設定
),
});
// ---------- 次の処理 ----------
const nextTask = new sfn.Succeed(this, "nextTask");
const failTask = new sfn.Fail(this, "failTask");
// ---------- タスク関係を定義 ----------
initializeRetryCount
.next(startInstanceTask)
.next(describeInstanceStatusTask)
.next(checkRunInstance);
// ステータスがOKを確認
checkRunInstance
.when(
sfn.Condition.or(
sfn.Condition.and(
sfn.Condition.stringEquals(
"$.describeInstanceResult.InstanceStatuses[0].InstanceState.Name",
"running"
),
sfn.Condition.stringEquals(
"$.describeInstanceResult.InstanceStatuses[0].InstanceStatus.Status",
"ok"
),
sfn.Condition.stringEquals(
"$.describeInstanceResult.InstanceStatuses[0].SystemStatus.Status",
"ok"
)
)
),
nextTask // 次の処理のタスクを入れる
)
.otherwise(checkRetryCount);
checkRetryCount
.when(
sfn.Condition.numberLessThan(
"$.retry.checkRunCount",
3 // リトライ回数の上限値を設置
),
increaseCheckRunRetryCount
)
.otherwise(failTask);
increaseCheckRunRetryCount.next(interval).next(describeInstanceStatusTask);
// ---------- ステートマシンの定義 ----------
const role = new iam.Role(this, "StartInstanceStepFunctionRole", {
assumedBy: new iam.CompositePrincipal(
new iam.ServicePrincipal("lambda.amazonaws.com"),
new iam.ServicePrincipal("states.amazonaws.com")
),
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName(
"service-role/AWSLambdaBasicExecutionRole"
),
iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonEC2FullAccess"),
],
});
new sfn.StateMachine(this, "StartInstanceStepFunction", {
role,
definition: initializeRetryCount,
timeout: Duration.minutes(30),
});
}
}
おわりに
StepFunctionにfor文的なタスクを追加されれば、ここまでややこしいことしなくて済むのですが。。。
(現在時刻を取得できるタスクとかも)
StepFunction自体がまだまだ発展途上にあるので、これからもっと使いやすくなっていくことに期待しましょう!
次はデザイナーの土井さんです!
よろしくお願いします!