この記事は、ケーシーエスキャロット Advent Calendar 2021の24日目の記事です。
Step Functionsとは
本当にざっくり紹介しますと、
各処理(Lambda関数/Fargateタスク/その他AWSサービス等々)を
指定した順番通りに、
処理実行フローを作成できるAWSのサービス(サーバーレス)です。
処理結果での分岐や他にも色々フローに追加できます。
処理の流れも分かりやすく可視化してくれて、
どのイベントまで処理が走っているのか?も確認出来ます。
AWS公式サイト
定義の例
本当に基礎的な例ですが、例えば、
- 開始
- Lambda関数(test1) 実効
- parameterに渡されるid毎に処理実行
- Lambda関数(test1)の結果判定
- OK
- Lambda関数(test2) 実行
- test1同様id毎に処理実行
- Lambda関数(test2) 実行
- NG
- Lambda関数(test3) 実行
- test1同様id毎に処理実行
- Lambda関数(test3) 実行
- OK
- 終了
という流れの処理を実行したい場合、
StepFunctionsの定義のレイアウトを見るとこのように見えます。
定義はjsonで作成しています。
"StartAt": "test1",
"States": {
"test1": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:test1",
"Parameters": {
"id.$": "$.id"
},
"Next": "choice-status",
"TimeoutSeconds": 900,
"ResultPath": "$.result-status"
},
"choice-status": {
"Type": "Choice",
"Choices": [
{
"Not": {
"Variable": "$.result-status.result",
"StringEquals": "ok"
},
"Next": "test3"
}
],
"Default": "test2"
},
"test2": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:test2",
"Parameters": {
"id.$": "$.id"
},
"End": true,
"TimeoutSeconds": 900,
"ResultPath": null
},
"test3": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:test3",
"Parameters": {
"id.$": "$.id"
},
"End": true,
"TimeoutSeconds": 900,
"ResultPath": null
}
}
}
Typeにそれぞれの実行関数を指定する事が出来ます。
Type | 概要 |
---|---|
Task | ステートマシンの実行される単一の作業単位(Lambda 関数実行はこれ) |
Choice | ステートマシンに分岐ロジックを追加 |
Fail | ステートマシンの実行を停止し、失敗としてマーク |
Succeed | ステートマシンの実行を正常に停止 |
Pass | 何も処理せずに入力値を出力に渡す(デバッグで使う) |
Wait | ステートマシンの処理続行を指定した時間遅延させる |
Parallel | ステートマシンの並列処理で使用 |
Map | ステートマシンの動的な並列処理で使用 |
公式ドキュメント参照 |
それ以外の内容については今回は省略します
今回やりたいこと
lambdaバッチに問題があり急ぎ修正したので、
今までの実行分(成功分のみ)を再実行できれば、、、
という事で、pythonでスクリプトを作ってみました。
※ 基本的に、parameterにあるidで重複した実行は無い前提です。
import boto3
state_machine_arn = 'arn:aws:states:ap-northeast-1:xxxxxxxxxxx:stateMachine:TestStateMachine'
client = boto3.client('stepfunctions', aws_access_key_id='xxxxxxxxxxxxxxxxxx', aws_secret_access_key='xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', region_name='ap-northeast-1')
response = client.list_executions(
maxResults=1000,
stateMachineArn=state_machine_arn,
statusFilter='SUCCEEDED')
executions = response.get('executions')
for item in executions:
execution_arn = item.get('executionArn')
describe = client.describe_execution(
executionArn=execution_arn
)
input =describe.get('input')
response = client.start_execution(
stateMachineArn=state_machine_arn,
input=input
)
処理の説明ですが、
response = client.list_executions(
maxResults=1000,
stateMachineArn=state_machine_arn,
statusFilter='SUCCEEDED')
executions = response.get('executions')
まずはstepfunctionsのステートマシン(TestStateMachine)の成功分のみの情報(executions)を、
list_executionsメソッドを利用して取得しています。
list_executions()
for item in executions:
execution_arn = item.get('executionArn')
describe = client.describe_execution(
executionArn=execution_arn
)
input =describe.get('input')
ここから実行毎の処理に入ります。
実行ARNを取得し、describe_executionメソッドに実行ARNを指定して、
実効入力値(input)を取得してます。
describe_execution()
response = client.start_execution(
stateMachineArn=state_machine_arn,
input=input
)
最後にstart_executionメソッドに入力値(input)を指定して、
正常終了している実行分と同じ入力値で再実行する。
start_execution()
という流れになります。
同じ実行内容を再実行する為に、
実行済みの入力値をすべて取得して、
同じ入力値を利用して再実行する流れになります。
きっと、もっと良い方法がある気がしますが、
取り合えずこんな感じです