はじめに
ECSタスクからStep Functionsへタスク完了を通知し、ECSタスクの実行結果を受け取る実装について確認します。
参考記事
AWS CDKを使用したインフラストラクチャのコード化(IaC)の例も含めて説明していきます。
システム概要
以下のようなアーキテクチャを実装します:
- Step Functionsがタスクトークンを含むECSタスクを起動
- ECSタスク内のPythonアプリケーションが処理を実行
- 処理完了後、タスクトークンを使用してStep Functionsに完了通知を送信
実装のポイント
1. タスクトークンの受け渡し
Step Functions側の設定(CDK)
const runTask = new stepfunctions_tasks.EcsRunTask(this, 'Run ECS Task', {
integrationPattern: stepfunctions.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
cluster: cluster,
taskDefinition: taskDefinition,
launchTarget: new stepfunctions_tasks.EcsFargateLaunchTarget(),
containerOverrides: [
{
containerDefinition: container,
environment: [
{ name: 'TASK_TOKEN', value: stepfunctions.JsonPath.taskToken }
],
},
],
});
ポイント:
-
IntegrationPattern.WAIT_FOR_TASK_TOKEN
を指定 - コンテナの環境変数として
TASK_TOKEN
を渡す
2. ECSタスクへの権限付与
// Add StepFunctions permissions to the task role
taskDefinition.addToTaskRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'states:SendTaskSuccess',
'states:SendTaskFailure'
],
resources: ['*']
})
);
ポイント:
- ECSタスクロールにStep Functionsへの権限を追加
-
SendTaskSuccess
とSendTaskFailure
の両方の権限を付与
3. タスク完了通知の実装(Python)
import os
import boto3
import json
def main():
# 処理の実行
result = {"message": "Task completed successfully", "data": 42}
# Step Functionsにタスク完了を通知
sfn_client = boto3.client('stepfunctions')
sfn_client.send_task_success(
taskToken=os.environ['TASK_TOKEN'],
output=json.dumps(result)
)
if __name__ == '__main__':
main()
ポイント:
- 環境変数から
TASK_TOKEN
を取得 -
boto3
を使用してStep Functionsクライアントを初期化 -
send_task_success
で完了通知と結果を送信
エラーハンドリング
エラーが発生した場合はsend_task_failure
を使用して失敗を通知することができます:
try:
# 処理の実行
result = process_task()
sfn_client.send_task_success(
taskToken=os.environ['TASK_TOKEN'],
output=json.dumps(result)
)
except Exception as e:
sfn_client.send_task_failure(
taskToken=os.environ['TASK_TOKEN'],
error='TaskProcessingError',
cause=str(e)
)
実行結果の受け取り
Step FunctionsでECSのタスクの実行結果は以下のように状態の出力
欄で確認できます。
まとめ
この実装により、以下のメリットが得られます:
- 非同期処理:Step Functionsは処理完了を待機できる
- 柔軟な結果通知:成功/失敗の両方のケースに対応
- IaCによる管理:CDKで権限やインフラを一元管理
Step FunctionsでECSのタスクの実行結果を受け取ることで、ECSの実行結果を後続処理で利用したい場合があると思いますが、
上記のような実装を行うことで、実現できると思います。