概要
Step Functionの理解を深める。
Lambda を使用してループを反復するをCDKで書いてみた。
callbackとreturnの書き換え
例では戻り値をcallbackに渡しているが、冗長なのでreturnで返す形に書き換えた。
(Node.js の AWS Lambda 関数ハンドラー)
以下の2つのソースは等価となる。
exports.handler = (event, context, callback) => {
console.log(event);
const Payload = { message: "hello", firstEvent: event }
callback(null, { Payload });
};
exports.handler = async (event) => {
console.log(event);
const Payload = { message: "hello", firstEvent: event }
return { Payload }
};
なお、以下のソースは等価ではない(asyncがない)。この場合、出力のPayloadがnull
となる。
exports.handler = (event) => {
console.log(event);
const Payload = { message: "hello", firstEvent: event }
return { Payload }
};
Lambda を使用してループを反復する を cdkで書いてみる
公式のシンプルなループを試す
LambdaInvoke
でオプションを設定しないとシンプルな定義JSONにならないので注意。*
import * as cdk from 'aws-cdk-lib';
import * as stepfunc from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { aws_events, aws_events_targets } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { Choice, Condition, DefinitionBody, Pass, Result } from 'aws-cdk-lib/aws-stepfunctions';
interface StepFunctionSampleStackProps extends cdk.StackProps { }
export class StepFunctionSampleStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: StepFunctionSampleStackProps) {
super(scope, id, props);
const lambdaParamsDefault = {
runtime: lambda.Runtime.NODEJS_20_X,
handler: 'index.handler',
timeout: cdk.Duration.seconds(10),
}
const iteratorFunction = new lambda.Function(this, 'iteratorFunction', {
...lambdaParamsDefault,
code: lambda.Code.fromInline(`
exports.handler = async (event) => {
let index = event.iterator.index
const step = event.iterator.step
const count = event.iterator.count
index = index + step
return { index, step, count, continue: index < count };
};
`),
});
const configureCount = new Pass(this, 'ConfigureCount', {
result: Result.fromObject({ count: 10, index: 0, step: 1 }),
resultPath: '$.iterator',
});
const iteratorJob = new tasks.LambdaInvoke(this, 'Iterator', {
lambdaFunction: iteratorFunction,
resultPath: '$.iterator',
payloadResponseOnly: true, // デフォルトfalse。falseではParamtersが自動設定される
retryOnServiceExceptions: false // デフォルトtrue。trueではリトライ2回が自動設定される
});
const exampleWork = new Pass(this, 'ExampleWork', {
comment: "Your application logic, to run a specific number of times",
result: Result.fromObject({ success: true }),
resultPath: '$.result',
});
const isCountReached = new Choice(this, 'IsCountReached');
const condition1 = Condition.booleanEquals('$.iterator.continue', true);
// StateMachine
const definition = configureCount.next(iteratorJob).next(
isCountReached.when(condition1, exampleWork.next(iteratorJob)).otherwise(new Pass(this, 'Done'))
);
const stateMachine = new stepfunc.StateMachine(this, 'StepFunctionSampleStateMachine', {
comment: "Iterator State Machine Example",
definitionBody: DefinitionBody.fromChainable(definition),
});
// EventBridge Rule1(毎時5分に起動)
new aws_events.Rule(this, 'rule1', {
ruleName: 'sampleRule1',
schedule: aws_events.Schedule.cron({
minute: '5',
}),
targets: [new aws_events_targets.SfnStateMachine(stateMachine)],
});
}
}
入力のtimeを書き換えてみる
{"time": "2000-01-01T00:00:00Z"}
をinputに渡すとする。
これをループごとに+1日してみる。
const iteratorFunction = new lambda.Function(this, 'iteratorFunction', {
...lambdaParamsDefault,
code: lambda.Code.fromInline(`
exports.handler = async (event) => {
let index = event.iterator.index
const step = event.iterator.step
const count = event.iterator.count
index = index + step
- return { index, step, count, continue: index < count };
+ const iterator = { index, step, count, continue: index < count }
+ const dt = new Date(event.time);
+ dt.setDate(dt.getDate() + 1);
+ return { iterator, time: dt.toISOString() };
};
`),
});
const iteratorJob = new tasks.LambdaInvoke(this, 'Iterator', {
lambdaFunction: iteratorFunction,
- resultPath: '$.iterator',
payloadResponseOnly: true,
retryOnServiceExceptions: false
});
出力の引き回し
ソースコード
resultPath
を設定することで、各関数の実行結果を引き回すことができる。
returnしない場合はnull
となる。
const firstFunction = new lambda.Function(this, 'FirstFunction', {
...lambdaParamsDefault,
code: lambda.Code.fromInline(`
exports.handler = async (event) => {
console.log(event);
return 'first';
};
`),
});
const secondFunction = new lambda.Function(this, 'SecondFunction', {
...lambdaParamsDefault,
code: lambda.Code.fromInline(`
exports.handler = async (event) => {
console.log(event);
};
`),
});
// definite state machine
const firstJonb = new tasks.LambdaInvoke(this, 'UpstreamTask', {
lambdaFunction: firstFunction,
resultPath: '$.hoge',
payloadResponseOnly: true,
});
const secondJob = new tasks.LambdaInvoke(this, 'MainFunctionTask', {
lambdaFunction: secondFunction,
resultPath: '$.huga',
payloadResponseOnly: true,
});
なお、今のソースだとループごとに結果はリセットされる。(iteratorのLambdaのreturnが引き回しを想定していない)
並列
const parallel = new Parallel(this, 'parallel');
parallel.branch(firstJonb.next(secondJob));
parallel.branch(thirdJob);
const loopJob = exampleWork
.next(parallel)
.next(
new Pass(this, 'filter', {
inputPath: '$[0]',
}),
)
.next(iteratorJob);
参考
cdk - state machine
cdk -choice
cdkを使ったStepFunctionsでのループ込実装手続きを振り返りつつ書いてみた
[AWS CDK] Step FunctionsステートマシンでLambdaInvokeでの”payloadResponseOnly”の指定による動作の違いを確認してみた