AWS で簡易的なジョブ管理をしてみよう、という試みです。
概要
AWS 上でジョブ管理するにはどうしたら良い?という話がありましたので、自分なりに少し考えてみました。普通に考えれば、例えば以下のようになるかとおもいます。
- Cloud Watch Events、場合によっては Event Bridge を使ってジョブを起動する
- ジョブの流れは Step Functions で様々なフロー制御を組み込んで作成
- Step Functions から Lambda 関数、ECS RunTask、AWS Batch などを順次呼び出す
- ログは Cloud Watch Logs に格納
- 問題発生時には SNS から SES や Slack 連携などでアラートを発報する
ただですね、今回は以下のような制約をつけて考えてみました。
- 数人の小さなチームで、スケジュールがかなり厳しい中で開発する必要がある
- AWS に詳しいのは自分だけで、他のメンバーはこれから学ぶ必要がある
こういった状態ですと、いまから AWS を学んでもらうのは効率が悪いです。期間も短いことですし、他のメンバーには使い慣れた言語で、業務ロジックをゴリゴリとコードに落として欲しい。
なので今回は、NodeJS や Java の知識だけでバッチ処理を書いてもらうためには、どうやって AWS の数多くのサービスを抽象化して隠すか、がアイデアの骨子となります。特に Step Functions は癖が強く、他の開発者からは隠したい。SNS を経由したアラート発報なども隠したい。
今回の構成
今回は Step Functions を使って、簡易的ではありますが、汎用的なジョブ管理ステートマシンを作成してみます。
まず各開発者には、NoteJS や Java など得意な言語で、タスク単位で業務ロジックを実装してもらいます。ただし今回はジョブ管理の抽象化が主目的なので、S3、RDS、DynamoDB などの AWS API 利用は抽象化せず、そのまま使ってもらう想定です。そして作成した業務ロジックは、Lambda 関数だったり、ECS Task としてデプロイします。
そして「タスクリスト」を用意して、Step Functions で作成したジョブ管理システムから読み込み、この業務ロジックを実装したタスクを順に実行していきます。
この「タスクリスト」も、開発者のみなさんにNoteJS や Java など得意な言語で作成してもらいます。単に条件を並べたファイルではなく、規約に沿って作成した Control 機能をもったロジックとしての実装になります。
AWS 側の担当者、つまり私は、上記のジョブ管理システムを Step Functions で作成します。タスクリストごとに個別に開発するのではなく、1つ汎用的なものを開発して、すべてのタスクリスト(を実装した Control ロジック) に対応します。そしてログ出力や、SES 経由でのアラート発報も、Control ロジックの指示に従って Step Function 側で対応します。
これにより AWS を知らない開発者でも、業務ロジックをタスクに分割して得意な言語で開発し、更にその呼び出し順も得意な言語で記述したロジックで制御することができます。すべて同じ開発言語、開発環境で実装できるので、短い期間でも素早く開発作業を開始することができます。
Lambda ジョブを作成する
まずは業務ロジックを模したタスク(ジョブ)を作成してみます。
今回は実行するジョブを Lambda + NodeJS を用いて開発します。いちばん慣れているもので手早く。他の言語や ECS RunTask などは後で試すこととします。
ランダム数値を生成する (randValue)
本当は RDB にアクセスするとか、何か業務ロジック的なものを用意したかったのですが。。まずは以下のように、0~99までのランダムな数値を生成するだけの非常に簡易なロジックを用意します。エラーも吐かないです。
exports.handler = async (event) => {
event.data = event.data ? event.data : {};
event.data.value = Math.floor(Math.random() * 100);
event.ret = 0;
return event;
};
以下のように Lambda を新規に作成します。名前は rtksb-sample01
キーワードに、ジョブの名前を連結したものとします。
上記のコードを貼り付けるだけ。
テストを実行すると、入力されたJSONデータに対し、以下の赤枠部分が追加されるのが確認できます。実行するたびに異なるランダム値が生成されることを確認します。
数値が偶数であるか検査する (testEvenValue)
もうひとつ、エラーを吐く可能性のあるジョブを用意しましょう。以下のように、与えられた値が偶数かどうか検査するロジックにしてみました。
exports.handler = async (event) => {
event.data = event.data ? event.data : {};
if (event.data.value == undefined || typeof event.data.value != "number") {
event.ret = -1;
} else if (event.data.value % 2 == 1) {
event.ret = -2;
} else {
event.ret = 0;
}
return event;
};
Lambda を新規作成して、上記のコードを貼り付け、そのまま Lambda のテスト実行をすると value が定義されていないのでエラーになります。
入力用の JSON をちゃんと設定して、value に偶数が指定されると 0 の正常終了値を返します。
value に奇数が指定されると -2 のエラー値を返します。
さてこれで、最低限、2つのジョブ用Lambda処理が用意できました。
想定するフロー
全体の流れは想像つきますよね。今回の一連のジョブを実行すると、以下のどちらかの流れになると想定しています。
-
randValue
で偶数のランダム値が生成される。testEvenValue
では特にエラーとならないので、処理が正常終了する。 -
randValue
で奇数のランダム値が生成される。testEvenValue
でエラーとなり、処理が異常終了する。
Lambda コントローラを作成する
さて、ジョブ全体をコントロールするロジックも、各ジョブと同じ方法で定義しましょう。
NodeJS で記述したコントローラのコードが以下になります。このコードを rtksb-sample01-control
という名前の Lambda として新規作成します。control
という名前は確定です。
exports.handler = async (event) => {
if (event.version != 'rtksb:2022-09-16' || event.id != 'rtksb-sample01') {
event.error_msg = 'Incorrect flow object: ' + JSON.stringify(event);
event.error_no = 400;
event.status = 'abend';
} else if (event.next == undefined || event.next == '') { // first step
event.next_type = 'lambda';
event.next = 'randValue';
event.fname = 'function:rtksb-sample01-' + event.next + ':$LATEST'; // 暫定対応
event.status = 'task';
} else if (event.next == 'randValue') {
event.next_type = 'lambda';
event.next = 'testEvenValue';
event.fname = 'function:rtksb-sample01-' + event.next + ':$LATEST'; // 暫定対応
event.status = 'task';
} else if (event.next == 'testEvenValue') {
if (event.ret == 0) {
event.status = 'end';
} else if (event.ret == -1) {
event.status = 'error';
} else {
event.error_msg = 'Incorrect input: ' + JSON.stringify(event);
event.error_no = 401;
event.status = 'abend';
}
} else {
event.error_msg = 'Invalid next: ' + JSON.stringify(event);
event.error_no = 500;
event.status = 'abend';
}
return event;
};
少し長いですが、ロジックとしては単純です。ジョブをどの順番で呼び出すのか、そして戻り値によってフローをどう制御するのか、を if 文で判断しつつ各ケースを記述しているだけ。
event.status が重要で、この後に出てくる Step Functions で作成したイベント実行エンジンへの指示になります。あとはエラー情報ですね。その他の値は、基本的にはこの control で再度利用するために設定しています。
さて、最初の呼び出し時には以下のような JSON が与えられることを想定しています。
{
"version":"rtksb:2022-09-16",
"id":"rtksb-sample01",
"control_fname":"function:rtksb-sample01-control:$LATEST"
}
なお、これらの入出力において fname
および control_fname
は本来は不要な値なのですが、今回は実装を簡単にするため利用しています。これは今後、改善したいとおもっています。
この「最初に実行すべきジョブ」を実行後、またこの control ロジックに戻ってくるわけです。ループしつつ、処理を実行していくイメージ。このあたりは、Step Functions の実行エンジンのほうを見たほうがはやいでしょう。
Step Functions によるジョブ実行エンジン
準備が終わりました。いま、以下の3つの Lambda 関数が利用可能になっています。
いよいよ、Step Functions を利用してジョブ実行エンジンを作成していきましょう。RTK Simple Batch
という名前で、Step Functions を新規に作成します。
以下は作成直後の状態で、開始と終了の Pass state のみ追加してあります。この部分は本当は Cloud Watch のログ出力部品になるはずでしたが、今回はテストのため何もしない Pass state を仮置きしています。
とりあえず作成したフローは以下になります。
WorkFlow Studio だと以下の表示になります。
各タスクの終了後、Control ロジックでエラーコードなど判別して、フロー制御に利用できる値を返却するようにしています。現時点では、以下を想定しています。
返却値 | フロー側の対応 |
---|---|
task | 問題なし。次のタスクを実行する |
error | 問題発生。ただし致命的ではないためログ出力した後、次のタスクを実行する |
abend | 異常発生。フローをただちに中断する |
end | 処理が完了。フローを終了する |
Step Functions の JSON 表現をそのまま貼りますので、参考にしてみてください。
{
"Comment": "A description of my state machine",
"StartAt": "ジョブフローの開始 (ログ出力)",
"States": {
"ジョブフローの開始 (ログ出力)": {
"Type": "Pass",
"Next": "idが指定されていること"
},
"idが指定されていること": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.id",
"IsString": true,
"Next": "control 呼出"
}
],
"Default": "フロー異常終了 (SNSアラート)"
},
"control 呼出": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName.$": "$.control_fname"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Next": "Choice"
},
"Choice": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "task",
"Next": "ジョブ実行"
},
{
"Variable": "$.status",
"StringEquals": "error",
"Next": "ジョブエラー終了 (ログ出力)"
},
{
"Variable": "$.status",
"StringEquals": "abend",
"Next": "フロー異常終了 (SNSアラート)"
}
],
"Default": "ジョブフローの終了 (ログ出力)"
},
"ジョブ実行": {
"Type": "Pass",
"Next": "ジョブ形式選択"
},
"ジョブ形式選択": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.next_type",
"StringEquals": "lambda",
"Next": "Lambdaジョブ実行"
}
],
"Default": "ECS RunTaskジョブ実行"
},
"ECS RunTaskジョブ実行": {
"Type": "Task",
"Resource": "arn:aws:states:::ecs:runTask",
"Parameters": {
"LaunchType": "FARGATE",
"Cluster": "arn:aws:ecs:REGION:ACCOUNT_ID:cluster/MyECSCluster",
"TaskDefinition": "arn:aws:ecs:REGION:ACCOUNT_ID:task-definition/MyTaskDefinition:1"
},
"Next": "control 呼出"
},
"Lambdaジョブ実行": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName.$": "$.fname"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Next": "control 呼出"
},
"ジョブフローの終了 (ログ出力)": {
"Type": "Pass",
"End": true
},
"ジョブエラー終了 (ログ出力)": {
"Type": "Pass",
"Next": "control 呼出"
},
"フロー異常終了 (SNSアラート)": {
"Type": "Pass",
"End": true
}
}
}
※ 「ECS RunTaskジョブ実行」は部品を貼っただけで、設定していません
実行
以下の入力を用いて、作成した Step Functions を何度か実行してみます。
{
"version":"rtksb:2022-09-16",
"id":"rtksb-sample01",
"control_fname":""function:rtksb-sample01-control:$LATEST""
}
ランダム値で偶数が出た場合は、ジョブが正常終了します。
ランダム値で偶数が出た場合は、ジョブが異常終了します。
というわけで
今回は、アイデアを実現できるか確認するため、非常に簡易的な仕様で実装してみました。動作確認だけの最低限なので、まだまだ実装したい機能はたくさんあります。例えば…
- ログ出力やアラート発報など、AWS の各サービスと連携させ動作するようにしたい
- 入出力オブジェクトに fname など不要な中間値が含まれているのを隠したい (タスクの事前処理、事後処理の共通ロジックを追加するなど)
- フローの動作がわかりやすくログに出力される実装にする
- Control ロジックの実装規約を決め、もう少し読みやすいコードにする
- (現在でも入力JSONの設定で途中からの開始はできるが) ジョブの途中からの実施、一部実施など手動で柔軟に起動できるようタスク管理を改善する
- 乱数と偶数というサンプルはあまりに簡単すぎるので、もう少しまともなフローを、複数用意する
- NodeJS だけでなく Java で記述したタスクを用意する。また ECS RunTask の起動も試す
- DynamoDB などに実施ステータスを保持、参照するなどフローだけでないステート管理を検討する
などなど。。
というわけで、まだ荒いアイデアですし、実際に使えるかは不明ですが。面白いな、とか。部分的に使えるな、とか。何かしらお役にたつようであればうれしいです。
それではまた!