4
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

posted at

AWS CDK & Step FunctionsでInputPathを加工する

結論

色々試した内容を整理しながら順に書いているとあまりに長くなってしまったので結論だけ書きます。
いわゆるTL;DR。

AWS CDK組み込みのEvaluateExpressionタスクを利用することで、InputPathを文字列変換してresultPathに格納し、次のタスクに渡すことができます。

evaluate-expression-task.ts
const createMessage = new tasks.EvaluateExpression(this, 'Create message', {
  expression: '`Now waiting ${$.waitSeconds} seconds...`',
  resultPath: '$.message',
});

以降の内容はほぼこの結果にたどり着くまでの備忘録です。

前置き

Step Functionsでは、タスクの入出力にJSON Pathを利用します。

ステートマシン実行時のパラメータに

input.json
{
    "arg1": "hoge",
    "arg2": "fuga"
}

のようにJSON形式で値を渡してあげると

$.arg1

等のようにJSON Path形式でタスク内で値を参照することが出来ます。

これがタスク定義の中で、実行時の入力が1つ目のタスクのState Inputとなり、1つ目のタスクのState Outputが2つ目のタスクのState Inputとなり、と繋がっていきます。

参考: Step Functionsの入出力処理

今回やりたかったこと

Step FunctionsでECSのタスク実行をするケースについて考えます。

今回やりたかったことは、2つのバッチ処理的なタスクが直列されていて、一部がタスクごとに異なる固定値、一部が実行時パラメータで構成されるコマンドでContainerOverrideにパラメータを渡したいという内容です。

Step Functions.png

1つ目と2つ目のタスクでは引数が異なっているが、最後の引数となる2020-12-01は共通の実行時パラメータとして渡したいというイメージです。

CDKで定義したStepFunctionsでデータを渡す処理のおさらい

順を追って見ていきます。

Step FunctionsでECSへデータを渡す

参考: Step Functions で Amazon ECS または Fargate タスクを管理する

上記参考リンクにあるとおり、ECSタスクにデータを渡すにはContainerOverridesパラメータを使用します。

"ContainerOverrides": [
    {
        "Name": "container-name",
        "Command.$": "$.commands" 
    }
]

"Command.$": "$.commands"という形式で指定すると、InputPathから実行時に値を解決してコマンドを上書きしてくれます。

すべて固定値で上書きしたい場合は"Command": ["arg1", "arg2", "arg3"]という配列形式で指定することも可能です。

CDKでContainerOverridesを指定する

CDKでECSの実行タスクを定義するにはEcsRunTaskクラスを利用します。
ContainerOverridesはそのクラス内のフィールドとしてContainerOverrideインターフェースの配列として定義されています。

さらにContainerOverride内ではcommandはStringの配列として定義されています。
つまり、commandを上書きするには素直に書くと以下のような感じになります。

ecs-run-task.ts
const ecsRunTask = new sfnTask.EcsRunTask(this, 'EcsRunTask', {
  cluster: cluster,
  launchTarget: new sfnTask.EcsFargateLaunchTarget,
  taskDefinition: taskDefinition,
  integrationPattern: sfn.IntegrationPattern.RUN_JOB,
  containerOverrides: [
    {
      containerDefinition: container,
      command: ['aggregate.sh', 'user_log', '2020-12-01'],
    }
  ],
})

ここからCfnをエクスポートすると、コマンド部分は"Command":["aggregate.sh","user_log","2020-12-01"]というような固定値タイプの定義が出力されます。

CDKのContainerOverridesにPathの値を使用する

では、実行時の入力を使用してOverrideするにはどうすればよいでしょうか。
最もシンプルな形としては、InputPathに配列を定義して展開します。

input.json
{
    "command": ["aggregate.sh", "user_log", "2020-12-01"]
}

InputPathを配列に展開するにはJsonPathクラスのlistAtメソッドを使用します。

ecs-run-task.ts
const ecsRunTask = new sfnTask.EcsRunTask(this, 'EcsRunTask', {
  cluster: cluster,
  launchTarget: new sfnTask.EcsFargateLaunchTarget,
  taskDefinition: taskDefinition,
  integrationPattern: sfn.IntegrationPattern.RUN_JOB,
  containerOverrides: [
    {
      containerDefinition: container,
      command: sfn.JsonPath.listAt('$.command'),
    }
  ],
})

このソースをエクスポートするとコマンド部分が"Command.$":"$.command"となる定義が出力され、パス参照タイプになっていることが分かります。
string[]を指定したときはCommandだったキー項目がCommand.$に変わっているあたりはちょっとした黒魔術感。

ここまでのことから、InputPathを実行時に以下のように指定することで今回やりたいことは実現出来ます。

input.json
{
    "commandA": ["aggregate.sh", "user_log", "2020-12-01"],
    "commandB": ["aggregate.sh", "event_log", "2020-12-01"]
}

タスク定義は以下のようなイメージ。

ecs-run-task.ts
const ecsRunTaskA = new sfnTask.EcsRunTask(this, 'EcsRunTaskA', {
  cluster: cluster,
  launchTarget: new sfnTask.EcsFargateLaunchTarget,
  taskDefinition: taskDefinition,
  resultPath: '$.result',
  integrationPattern: sfn.IntegrationPattern.RUN_JOB,
  containerOverrides: [
    {
      containerDefinition: container,
      command: sfn.JsonPath.listAt('$.commandA'),
    }
  ],
})
const ecsRunTaskB = new sfnTask.EcsRunTask(this, 'EcsRunTaskB', {
  cluster: cluster,
  launchTarget: new sfnTask.EcsFargateLaunchTarget,
  taskDefinition: taskDefinition,
  resultPath: '$.result',
  integrationPattern: sfn.IntegrationPattern.RUN_JOB,
  containerOverrides: [
    {
      containerDefinition: container,
      command: sfn.JsonPath.listAt('$.commandB'),
    }
  ],
})

とはいえ、実行時に与えるInputPathが冗長に見えますね。
出来れば

input.json
{
    "target": "2020-12-01"
}

だけ与えて、各タスク定義でInputPathを加工してコマンドを組み立てたい。

InputPathを加工する

ようやく本題です。

失敗例

試したもののうまくいかなかったパターンです

JsonPath.stringAt()で値を取り出してstring配列を直接定義する

JsonPathにはstringAtというメソッドもあり、単一の値を取り出せます。

ecs-run-task.ts
// targetの値を取り出す
const target = sfn.JsonPath.stringAt('$.target')
const ecsRunTask = new sfnTask.EcsRunTask(this, 'EcsRunTask', {
  cluster: cluster,
  launchTarget: new sfnTask.EcsFargateLaunchTarget,
  taskDefinition: taskDefinition,
  containerOverrides: [
    {
      containerDefinition: container,
      // ここで取り出したtargetをarrayのメンバとして渡す
      command: ['aggregate.sh', 'user_log', target],
    }
  ],
})

この記述はエクスポート時に以下のようなエラーになります。

Error: Cannot use JsonPath fields in an array, they must be used in objects

"JsonPathで取り出した値は配列の内部では使えない"というエラーメッセージなので、仕様ですというお話ですね。

固定値タイプで直接配列を定義し、JsonPath形式の記述を埋め込む

ecs-run-task.ts
const ecsRunTask = new sfnTask.EcsRunTask(this, 'EcsRunTask', {
  cluster: cluster,
  launchTarget: new sfnTask.EcsFargateLaunchTarget,
  taskDefinition: taskDefinition,
  containerOverrides: [
    {
      containerDefinition: container,
      // '$.target'というJsonPath定義を直接書く
      command: ['aggregate.sh', 'user_log', '$.target'],
    }
  ],
})

これはエクスポートには成功しますが、出力されるタスク定義が"Command":["aggregate.sh","user_log","$.target"というような形式になります。

問題はCommandとなっている部分で、Command.$という出力にならないため、このステートマシンを実行してもJsonPathから展開してくれず、固定値の$.targetという文字列が引数に渡されます。

どういった条件でCommandCommand.$に変換されて出力されるのかは分かりませんでした。

EvaluateExpressionを使う

これが成功したパターンです。

CDKにはEvaluateExpressionというタスクが定義されています。

このタスクを構築するとInputPathを引数に取りevalした結果を返してくれるLambda Functionがデプロイされ、Lambda実行タスク定義が生成されます。

evaluate-task.ts
const evaluateTask = new sfnTask.EvaluateExpression(this, 'EvaluateTask', {
  expression: '[ "aggregate.sh", "user_log", $.target ]',
  resultPath: '$.command'
})

expressionがevalされる式です。
実行時にこのタスクのInputPathに$.targetが含まれていれば、LambdaFunctionによって展開、resultPathで指定したパスに評価結果が格納され、OutputPathとして次のタスクに渡されます。

このソースをエクスポートした際に出力されるタスク定義は以下のようなものになります。

task.json
{
  "Next":"EvaluateTask",
  "Type":"Task",
  "ResultPath":"$.command",
  "Resource":"arn:aws:lambda:ap-northeast-1:123456789012:function:xxx",
  "Parameters":{
    "expression":"[\"aggregate.sh\", \"user_log\", $.target ]",
    "expressionAttributeValues":{
      "$.target.$":"$.target"
    }
  }
}

展開されるLambda Functionは以下のような感じ。これはCDKが自動生成してくれます。

index.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.handler = void 0;
function escapeRegex(x) {
    return x.replace(/[-\/\\^$*+?.()|[\]{}]/g, '\\$&');
}
async function handler(event) {
    console.log('Event: %j', event);
    const expression = Object.entries(event.expressionAttributeValues)
        .reduce((exp, [k, v]) => exp.replace(new RegExp(escapeRegex(k), 'g'), JSON.stringify(v)), event.expression);
    console.log(`Expression: ${expression}`);
    return eval(expression);
}
exports.handler = handler;

ざっくりとは、expressionの文字列に対してexpressionAttributeValuesに配列で定義された実行時パラメータをreplaceしていき、最後にevalしています。
expressionAttributeValuesの抽出はCDKがやってくれているようですが、正確な抽出ルールは不明です。

具体的にハマった箇所としては、最初

expression: '["aggregate.sh", "user_log", $.target]',

と書いているとexpressionAttributeValuesが$.target]として抽出されてしまい、期待した変換が行われませんでした。
単純な抽出条件っぽいので区切り文字扱いと思われるスペースを挟むことで回避しています。

というわけで、結論としては以下のようなタスク定義で期待通りの結果を得ることができました。

input.json
{
    "target": "2020-12-01"
}
ecs-run-task.ts
const evaluateTaskA = new sfnTask.EvaluateExpression(this, 'EvaluateTaskA', {
  expression: '[ "aggregate.sh", "user_log", $.target ]',
  resultPath: '$.command'
})
const ecsRunTaskA = new sfnTask.EcsRunTask(this, 'EcsRunTaskA', {
  cluster: cluster,
  launchTarget: new sfnTask.EcsFargateLaunchTarget,
  taskDefinition: taskDefinition,
  resultPath: '$.result',
  integrationPattern: sfn.IntegrationPattern.RUN_JOB,
  containerOverrides: [
    {
      containerDefinition: container,
      command: sfn.JsonPath.listAt('$.command'),
    }
  ],
})
const evaluateTaskB = new sfnTask.EvaluateExpression(this, 'EvaluateTaskB', {
  expression: '[ "aggregate.sh", "event_log", $.target ]',
  resultPath: '$.command'
})
const ecsRunTaskB = new sfnTask.EcsRunTask(this, 'EcsRunTaskB', {
  cluster: cluster,
  launchTarget: new sfnTask.EcsFargateLaunchTarget,
  taskDefinition: taskDefinition,
  resultPath: '$.result',
  integrationPattern: sfn.IntegrationPattern.RUN_JOB,
  containerOverrides: [
    {
      containerDefinition: container,
      command: sfn.JsonPath.listAt('$.command'),
    }
  ],
})

const definition = evaluateTaskA
                     .next(ecsRunTaskA)
                     .next(evaluateTaskB)
                     .next(ecsRunTaskB)

EvaluateExpressionの他の使い方例はドキュメントにもあります。

単純にタスクが1つずつ増えるので今回やりたかったことに対してはちょっとやり過ぎ感もないではないですが、evalを簡単に挟めるのは色々便利な場面がありそうです。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
4
Help us understand the problem. What are the problem?