0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ECS TaskからStep Functionsへのタスク完了通知(sendTaskSuccess)の実装

Posted at

はじめに

ECSタスクからStep Functionsへタスク完了を通知し、ECSタスクの実行結果を受け取る実装について確認します。
参考記事
AWS CDKを使用したインフラストラクチャのコード化(IaC)の例も含めて説明していきます。

システム概要

以下のようなアーキテクチャを実装します:

  1. Step Functionsがタスクトークンを含むECSタスクを起動
  2. ECSタスク内のPythonアプリケーションが処理を実行
  3. 処理完了後、タスクトークンを使用してStep Functionsに完了通知を送信

実装のポイント

1. タスクトークンの受け渡し

Step Functions側の設定(CDK)

const runTask = new stepfunctions_tasks.EcsRunTask(this, 'Run ECS Task', {
  integrationPattern: stepfunctions.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
  cluster: cluster,
  taskDefinition: taskDefinition,
  launchTarget: new stepfunctions_tasks.EcsFargateLaunchTarget(),
  containerOverrides: [
    {
      containerDefinition: container,
      environment: [
        { name: 'TASK_TOKEN', value: stepfunctions.JsonPath.taskToken }
      ],
    },
  ],
});

ポイント:

  • IntegrationPattern.WAIT_FOR_TASK_TOKENを指定
  • コンテナの環境変数としてTASK_TOKENを渡す

2. ECSタスクへの権限付与

// Add StepFunctions permissions to the task role
taskDefinition.addToTaskRolePolicy(
  new iam.PolicyStatement({
    effect: iam.Effect.ALLOW,
    actions: [
      'states:SendTaskSuccess',
      'states:SendTaskFailure'
    ],
    resources: ['*']
  })
);

ポイント:

  • ECSタスクロールにStep Functionsへの権限を追加
  • SendTaskSuccessSendTaskFailureの両方の権限を付与

3. タスク完了通知の実装(Python)

import os
import boto3
import json

def main():
    # 処理の実行
    result = {"message": "Task completed successfully", "data": 42}
    
    # Step Functionsにタスク完了を通知
    sfn_client = boto3.client('stepfunctions')
    sfn_client.send_task_success(
        taskToken=os.environ['TASK_TOKEN'],
        output=json.dumps(result)
    )

if __name__ == '__main__':
    main()

ポイント:

  • 環境変数からTASK_TOKENを取得
  • boto3を使用してStep Functionsクライアントを初期化
  • send_task_successで完了通知と結果を送信

エラーハンドリング

エラーが発生した場合はsend_task_failureを使用して失敗を通知することができます:

try:
    # 処理の実行
    result = process_task()
    sfn_client.send_task_success(
        taskToken=os.environ['TASK_TOKEN'],
        output=json.dumps(result)
    )
except Exception as e:
    sfn_client.send_task_failure(
        taskToken=os.environ['TASK_TOKEN'],
        error='TaskProcessingError',
        cause=str(e)
    )

実行結果の受け取り

Step FunctionsでECSのタスクの実行結果は以下のように状態の出力欄で確認できます。

ecs-task-result.png

まとめ

この実装により、以下のメリットが得られます:

  1. 非同期処理:Step Functionsは処理完了を待機できる
  2. 柔軟な結果通知:成功/失敗の両方のケースに対応
  3. IaCによる管理:CDKで権限やインフラを一元管理

Step FunctionsでECSのタスクの実行結果を受け取ることで、ECSの実行結果を後続処理で利用したい場合があると思いますが、
上記のような実装を行うことで、実現できると思います。

参考文献

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?