概要
AWS Step Functionsでバッチ処理っぽいことをする。
内容: LambdaからS3に置いてあるファイルを読み込んで、最終更新が今日より古かったら、ファイルのデータをデータベースへ放り込む。
↓の続きです
AWSでバッチ処理をするときの方法を考える
実践
##1. バケットを作成し、サンプルjsonファイルを入れておく。
jsonファイルの内容は下記のようにしました。
{
"datetime": "20170821",
"id": "0001",
"type": "donut",
"name": "Cake"
}
##2. Lambdaを作る
ベタで書いてあるS3のファイルを開いて、jsonで記録されている日時と今日の日時を比較し、
True,Falseを返します。
var aws = require('aws-sdk');
aws.config.region = 'us-east-2';
var bucket = 'xxxxxxxxxx'; //自前のS3バケット名に書き換え
var s3 = new aws.S3();
var dt = new Date();
exports.handler = function(event, context, callback) {
var params = {
Bucket: bucket,
Key: 'sample.json' //ファイル名に書き換え
};
s3.getObject(params, function(err, data) {
if (err) {
console.log(err, err.stack);
} else {
//ファイル更新時刻確認
var object = JSON.parse(data.Body.toString());
console.log("lastupdate = " + object.datetime);
var Last_update = object.datetime;
//現在時刻確認
var year = dt.getFullYear();
var month = ("0"+(dt.getMonth() + 1)).slice(-2);
var date = ("0"+dt.getDate()).slice(-2);
var Date_now = (year + month + date);
console.log("Date_now = " + Date_now);
//最終更新が過去日か確認
if (Date_now >= Last_update){
callback(null, {"update" : "True"});
}else{
callback(null, {"update" : "False"});
}
}
});
}
※Lambdaで使用するロールはS3オブジェクトの読み取り権限があるものを指定、または作成してください。
##3. Step Functinosの作成
ワークフローは下記で作成しました。
{
"Comment": "",
"StartAt": "FirstState",
"States": {
"FirstState": {
"Type": "Task",
"Resource": "arn:aws:xxxxxxxxxxxxxxxxxxxxxxx",
"Next": "ChoiceState"
},
"ChoiceState":{
"Comment": "FirstStateで呼ばれたLambdaが返した文字列によって処理を分岐させている。",
"Type" : "Choice",
"Choices":[
{
"Variable": "$.update",
"StringEquals": "True",
"Next": "TrueState"
},
{
"Variable": "$.update",
"StringEquals": "False",
"Next": "FalseState"
}
]
},
"TrueState": {
"Comment": "ChoiceStateでTrueであればこちらに流れる。",
"Type": "Pass",
"Result": "True",
"Next": "DBWriteState"
},
"FalseState": {
"Comment": "ChoiceStateでFalseであればこちらに流れる。",
"Type": "Pass",
"Result": "False",
"Next": "FinalState"
},
"DBWriteState": {
"Comment": "Trueにだった場合、ここでサンプルjsonの内容をデータベースへ書き込む処理を書く。",
"Type": "Pass",
"Result": "End",
"Next": "CheckState"
},
"CheckState": {
"Comment": "書き込み処理の後に、正しく書き込まれたか確認の処理を書く。",
"Type": "Pass",
"Result": "End",
"Next": "FinalState"
},
"FinalState": {
"Type": "Pass",
"Result": "End",
"End": true
}
}
}
作成したステートマシンはLambdaを呼び出しているので、Lambdaが使えるロールを与えます。
Lambdaが呼び出すS3やDynamoDBの読み書きロールは、Lambda側に持っているので不要です。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": "*"
}
]
}
##4. 作成したStep Functinosの動作確認
起動のjsonはデフォルトのもので動きます。
{
"Comment": "Insert your JSON here"
}
もしFirstStateで処理が失敗した場合は、LambdaのTimeOut時間を伸ばしてみてください。
##5. Step Functionsを定期実行をさせるには。
Step Functionsには定時実行の機能がないようです。
実行には下記のいづれかでStep Functionsを実行する必要があります。
・API Gateway
・Lambda(Step Functionsを呼ぶLambdaを起動する各トリガーから)
##6. 気づいたこと
・LambdaからStep Functionsへ渡すデータがあまりに大きいとエラーになります。
Lambda単体で動いている時は発生しないので注意。
・待ち合わせ処理はパラレルを使うと簡単に作れそうです。
分岐した先の処理が全て完了するまでは次のステップに進みません。
・同じ処理を作るにも、部品の使い方でいくつものパターンがありそうです。
事前にステートマシンのテンプレートを動かして、部品の機能をよく確認しておくとスムーズに構築できます。
#Step Functionsをもっと簡単に作ってみる
AWS SAMでStep Functionsを作ってみる←検証中
ServerlessFrameworkでStep Functionsを作ってみる←検証中