はじめに
システムの中で、1日1回シェルスクリプトを動かしてバッチ処理を行うときがあります。1台の EC2 上で完結するくらいの処理量であれば、それほど問題になることはありませんが、処理量が多くなってきたときに分散処理をしたくなります。1台の EC2 で完結していたときは、バッチファイルの中でエラーハンドリングを行っていましたが、分散処理をするときは他の仕組みが必要です。こういった時に、便利に活用できるのが、AWS Step Functions です。これは、ワークフローを組み立てて、実行が出来るため、分散されたバッチ処理環境の中でエラーのハンドリングなど、適切に管理ができます。
今回の記事では、バッチ処理のワークフロー管理として、Step Functions を動かしてみる記事になります。Step Functions から ECS Task を呼びだし、エラーハンドリングを行ってみましょう。
環境について
以前の記事で作成した Step Functions と ECS の環境を利用します。
Step Functions の State Machine でエラーハンドリング
編集対象の State Machine を選択して、Edit を押します
Workflow Studio を選択して、グラフィカルな編集画面を開きます。
Workflow Studio の画面が開きました
まず、正常終了時には Success のアクションにたどり着きたいので、アイコンを置きます
次に、ECS の RunTask でエラーになったときの動作を指定していきます。エラーハンドリングを行いたいアクションを選択して、Error handling の画面を開きます。
この画面には、「Retry on errors」と「Catch errors」の2種類のハンドリングのタイプがあります。
- Retry on errors : エラーが発生したときに、そのアクションをリトライする。リトライを繰り返すと、指定した時間分の時間間隔が伸びていく。
- Catch errors : エラーが発生したときに、特定のアクションでリカバリーを行う。Retry on errors と Catch errors を両方指定した場合は、Retry on errors を優先で処理する。リトライしてもエラーが出続ける場合は、Catch errors の処理に進む。
次の URL に詳細が記載されてあります : https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/concepts-error-handling.html#error-handling-fallback-states
今回は、Catch errors の設定を進めていきましょう。Catch errors の個所で、Add new catcher を選択します。
設定画面が表示されました。主な3つの設定項目があります。
- Errors : キャッチするエラーのタイプを指定。指定可能なエラーのタイプはドキュメントに記載
- Fallback state : エラーをキャッチしたあとに、どのアクションに遷移するか指定
- ResultPath : 次のアクションに遷移する際に、エラーの詳細を次のアクションに渡すときの ResultPath を指定
Errors には、States.TaskFailed
を選択して、実行時のエラーを拾います。
Fallback state で Add new state
を選択して、新たなアクションを指定できるようにします。
新たな枠が現れました
今回は、エラー時に Lambda を動かしてエラーの詳細情報を CloudWatch Logs に出力をしていきます。
実行する Lambda Function の ARN を指定します。Payload は、state input をそのまま渡すことにします。
なお、この Lambda は、受け取った Event をそのまま logger.info(json.dumps(event))
としており、CloudWatch Logs に出力する実装になっています。
import json
from logging import getLogger, INFO
logger = getLogger(__name__)
logger.setLevel(INFO)
def lambda_handler(event, context):
print("============ logger.info の出力 ============")
logger.info(json.dumps(event))
return {
"statusCode": 200,
"body": json.dumps({
"message": "hello",
}),
}
右上の Apply and exit を選択します。
Save を押します
Start execution を選択します
State Machine が実行されます。
ECS Task を実行の結果、エラーとなり Lambda が起動していることがわかります。
Step Functions から Lambda を起動するときの Payload を確認すると、Cause にエラーの詳細情報がわたっていますが、JSON を文字列として扱っていることがわかります。「CloudWatch Logs」のリンクをクリックして、CloudWatch 側で受け取ったログを確認します。
CloudWatch Logs 上でも、1個の文字列としてやってきました。
エラーメッセージを整形
上記のように、一個の文字列でもエラーの詳細を確認できますが、検索など活用がしにくいため文字列から JSON に変換をしていきます。
State Machine の Workflow Studio を開きます。
Lambda の Payload に渡す前に、JSON に変換を行いたいです。Lambda の前に Pass アクションを追加します。
Pass の Input から、次のように設定をいれて、JSON に変換を加えます。
{
"Cause.$": "States.StringToJson($.Cause)"
}
なお、この指定方法については 次の Document に記載があります。
Apply and exit を押します。
Save を押します
Start Execution を選択して、実行確認してみましょう。
実行が完了しました
Lambda を実行しているときの Payload が JSON に変換されていることがわかります。
CloudWatch Logs 上でも、整形されていますね。
この Log の中には、StoppedReason
の値もあり、停止した理由がわかります。
また、ExitCode も取得できています。
わかったこと
-
ECS Task の ExitCode が 0 以外の場合は、Step Functions 上でエラーとして扱ってくれる
-
ECS Task の実行がエラーになったときに、Cause にエラーの詳細メッセージが表示されるが、String 型となっており、後続の処理がやりにくい。Pass アクションを中継して、String を JSON に変換することで、後続の処理をやりやすくできる
-
{ "Cause.$": "States.StringToJson($.Cause)" }
-
参考URL