2020-10-23 追記
StepFuntionにAthenaが統合され、ステートマシーンから直接AthenaのAPIを呼べるようになった。
これでLambdaを実装しなくてもクエリーを実行したりできる。
Call Athena with Step Functions | AWS Step Functions Developer Guide
AWS SAMを見ていたら AWS::Serverless::StateMachine
というStepFunctionのコンポーネントがあったので、それを使ってAthenaの集計処理を実装してみた。
実際は集計期間に応じて処理を分岐してリトライ間隔など調整できるようにしたが、ここではシンプルにした。
余談:VSCode + AWS Toolkit
*.asl.json
ファイルを編集するとlintやグラフの描画をリアルタイムに行ってくれるので便利だった。
できれば Fail
タスクはもっと軽く扱って欲しい。
やりたいこと
下記の5タスクをステートマシーンにした。
この処理を週次/月次で実行する。
- 集計期間の計算
- Athenaへのクエリー
- クエリー進捗の確認/待機
- クエリー結果の加工
- S3への保存
StepFunctionでやるメリット
-
各タスクがシンプルになる
タスクはLambda(node.js)で実装したが、1つのファンクションで1つのことしかしないので各ファンクションがシンプルになる。 -
コードの分岐がなくなる
Choise
など使うとステートマシーン上で分岐できるので、コード上ではその分岐がなくなる。 -
コードのリトライ処理がなくなる
エラー時のリトライやクエリー完了まで待機するのにエクスポネンシャルバックオフを使ったりできる。こういった処理を自分で実装する必要がなくなる。 -
テストがしやすい
各Lambdaは処理が小さくなるのでローカルでのテストもしやすくなる。
StepFunction自体が入出力の管理をしっかりしないといけないサービスなので、当然各Lambdaの入出力も分かりやすくせざるを得なくなる。
また、AWS上でステートマシーンを実行すると各タスクの入出力と例外が表示されるので、どこでどういう結果になっているのか把握しやすい。
初期化
AWS SAMにはStepFunctionを利用したテンプレートが用意されているので、それを元に開発した。
sam init
で下記のように選択していく。
$ sam init
Which template source would you like to use?
1 - AWS Quick Start Templates
2 - Custom Template Location
Choice: 1
Which runtime would you like to use?
1 - nodejs12.x
2 - python3.8
3 - ruby2.7
4 - go1.x
5 - java11
6 - dotnetcore3.1
7 - nodejs10.x
8 - python3.7
9 - python3.6
10 - python2.7
11 - ruby2.5
12 - java8
13 - dotnetcore2.1
Runtime: 1
Project name [sam-app]:
Cloning app templates from https://github.com/awslabs/aws-sam-cli-app-templates.git
AWS quick start application templates:
1 - Hello World Example
2 - Step Functions Sample App (Stock Trader)
3 - Quick Start: From Scratch
4 - Quick Start: Scheduled Events
5 - Quick Start: S3
6 - Quick Start: SNS
7 - Quick Start: SQS
8 - Quick Start: Web Backend
Template selection: 2
-----------------------
Generating application:
-----------------------
Name: sam-app
Runtime: nodejs12.x
Dependency Manager: npm
Application Template: step-functions-sample-app
Output Directory: .
Next steps can be found in the README file at ./sam-app/README.md
この時点でこのような構成になっている。
functions
以下のLambdaを適当に追加・削除し、stock_trader.asl.json
と template.yaml
を編集していく。
.
└── sam-app/
├── functions/
│ ├── stock-buyer/
: : :
│ │ ├── app.js
│ │ └── package.json
│ ├── stock-checker/
: : :
│ │ ├── app.js
│ │ └── package.json
│ └── stock-seller/
: :
│ ├── app.js
│ └── package.json
├── statemachine/
│ └── stock_trader.asl.json
:
└── template.yaml
最終的にはこうなった。
.
└── sam-app/
├── functions/
│ ├── aggregation/
: : :
│ │ ├── app.js
│ │ └── package.json
│ ├── period-calculation/
: : :
│ │ ├── app.js
│ │ └── package.json
│ ├── store-result/
: : :
│ │ ├── app.js
│ │ └── package.json
│ ├── transform/
: : :
│ │ ├── app.js
│ │ └── package.json
│ └── wait-aggregation/
: :
│ ├── app.js
│ └── package.json
├── statemachine/
│ └── athenaquery.asl.json
:
├── samconfig.toml
└── template.yaml
ステートマシーン
全体は省略するが、例えばAthenaのクエリーが完了するのを待つタスクはこのような定義になった。
Parameters
の QueryExecutionId.$
が対象クエリーを特定するIDで、前工程のタスクの出力を入力値として受け取っている。
Retry
のところで InProgressError
を受け取ってエクスポネンシャルバックオフでリトライするよう定義している。Lambdaのコードではステータスを確認して実行中であれば InProgressError
をスローするだけ。長時間実行してクエリー完了を待つ必要がないのでタイムアウトの心配もない。
"WaitAggregation": {
"Type": "Task",
"Resource": "${WaitAggregationArn}",
"Parameters": {
"Source.$": "$$.Execution.Input",
"QueryExecutionId.$": "$.QueryExecutionId"
},
"Next": "Transform",
"Retry": [
{
"ErrorEquals": [
"States.TaskFailed"
],
"IntervalSeconds": 3,
"MaxAttempts": 5,
"BackoffRate": 1
},
{
"ErrorEquals": [
"InProgressError"
],
"IntervalSeconds": 5,
"MaxAttempts": 10,
"BackoffRate": 1.2
}
],
"Catch": [
{
"ErrorEquals": [
"InvalidRequestException"
],
"Next": "Fail"
}
]
}
テンプレート
CloudFormationテンプレートの抜粋。
デフォルトだとStepFunctionのStandardWorkflowで構築される。
DefinitionSubstitutions
に設定した値は、ステートマシーンの定義で ${WaitAggregationArn}
のようにプレースホルダーとして利用できる。
Events
のところでEventBridgeのスケジュールも同時に定義している。
UTCで毎週土曜日15:05(JSTで毎週日曜00:05)と毎月月末15:05(JSTで毎月1日00:05)。
また Input
のところでステートマシーンの初期入力値を渡している。
さきほどのステートマシーンの定義にある $$.Execution.Input
にこの値が含まれる。
AthenaQueryStateMachine:
Type: AWS::Serverless::StateMachine
Properties:
DefinitionUri: statemachine/athenaquery.asl.json
DefinitionSubstitutions:
PeriodCalculationArn: !GetAtt PeriodCalculation.Arn
AggregationArn: !GetAtt Aggregation.Arn
WaitAggregationArn: !GetAtt WaitAggregation.Arn
TransformArn: !GetAtt Transform.Arn
StoreResultArn: !GetAtt StoreResult.Arn
Events:
Weekly:
Type: Schedule
Properties:
Description: Schedule to run the user ranking state machine every week
Enabled: True
Input: !Sub '{"IntervalUnit":"Weekly","Env":"${Environment}"}'
Schedule: "cron(5 15 ? * SAT *)"
Monthly:
Type: Schedule
Properties:
Description: Schedule to run the user ranking state machine every month
Enabled: True
Input: !Sub '{"IntervalUnit":"Monthly","Env":"${Environment}"}'
Schedule: "cron(5 15 L * ? *)"
Policies:
- LambdaInvokePolicy:
FunctionName: !Ref PeriodCalculation
- LambdaInvokePolicy:
FunctionName: !Ref Aggregation
- LambdaInvokePolicy:
FunctionName: !Ref WaitAggregation
- LambdaInvokePolicy:
FunctionName: !Ref Transform
- LambdaInvokePolicy:
FunctionName: !Ref StoreResult
Lambda
Athena関連のタスクだけ抜粋。
Aggregation
ここでAthenaのクエリーを実行している。
startQueryExecution
を実行すると QueryExecutionId
が返ってくるので、それを次タスクに渡す。
const Athena = require("aws-sdk/clients/athena");
const athena = new Athena();
exports.lambdaHandler = async (event, context) => {
const env = event.Source.Env;
const query = "select ...";
const params = {
QueryString: query,
QueryExecutionContext: {
Database: `database_${env}`,
},
ResultConfiguration: {
OutputLocation: `s3://athena-query-result-${env}/`,
},
WorkGroup: env,
};
const result = await athena.startQueryExecution(params).promise();
const qid = result.QueryExecutionId;
const ret = {
QueryExecutionId: qid,
};
return ret;
}
WaitAggregation
QueryExecutionId
を受け取りクエリーが完了するまで監視する。
完了したら結果を取得して次のタスクに渡している。
前述の通り InProgressError
が出ている間は何度も実行されるタスク。
今回は結果セットが小さかったので直接次のタスクに渡した。サイズが大きい場合は QueryExecutionId
だけ渡すなり、別のDBを経由するなりしないといけないかもしれない。
getQueryExecution(QueryExecutionId)
で実行情報を取得し、ステータスを見て完了していれば getQueryResults(QueryExecutionId)
で結果を取得する。
const Athena = require("aws-sdk/clients/athena");
const athena = new Athena();
class InProgressError extends Error {
constructor(...params) {
super(...params);
this.name = this.constructor.name;
}
}
exports.lambdaHandler = async (event, context) => {
const qid = event.QueryExecutionId;
const params = { QueryExecutionId: qid };
const exec = await athena.getQueryExecution(params).promise();
const list = [];
const status = exec.QueryExecution.Status.State;
switch (status) {
case "QUEUED":
case "RUNNING":
throw new InProgressError("InProgress");
break;
case "FAILED":
throw new Error("Failed");
break;
case "CANCELLED":
throw new Error("Canceled");
break;
default:
const result = await athena.getQueryResults(params).promise();
const rs = result.ResultSet;
const fields = rs.Rows.shift();
for (const row of rs.Rows) {
const processed = processRow(fields, row);
list.push(processed);
}
break;
}
const ret = {
QueryResult: list,
};
return ret;
}
デプロイ
初回デプロイは sam deploy -g
で行う。
設定を保存すると samconfig.toml
が生成され、次回からは sam deploy
だけでデプロイできる。
sam build
sam deploy -g