#はじめに
Step Functionsで(以下SF)、人による処理が介在するワークフローをどう実現するのか。
免許証画像による年令確認フローをネタに検証したので要点をメモ。
(検証したのは2017年末ぐらいなのですが。笑)
#重要なポイント
- 非同期処理はアクティビティで実現する。アクティビティは人による処理をアクティビティとして定義する。このアクティビティは特定のSFに属さない独立したものである。
- Stateの定義で、TypeはTask、ResourceはアクティビティのARNにする。この時、非同期処理に渡したいインプットをInputPathを用いて整える。
- アクティビティのARNを指定して、処理が必要なアクティビティをポーリングする。ロングポーリングなので要件次第ではコストがかさむので、必要に応じてコールバックパターン等を用いた処理を検討する。
- ポーリングで処理対象のアクティビティがあると、"input"と"taskToken"が返ってくる。
- アクティビティで渡すinputに渡していなくても、後続のStateに前のStateから流れてきた情報を渡すことができるのが割と重要。つまりアクティビティの処理には不要だが後続に必要なデータを隠すことができる。
- taskTokenを用いて結果を伝える。この時にアクティビティのARNを指定しなくていい。この時にデータを渡すと、次のStateに渡せる。
#ユースケース: ソーシャル系アプリとかの年令確認フロー
- ユーザーはスマホアプリで免許証を撮影し保存
- スマホアプリはアプリサーバーに画像を送る
- 運営者が画像をみて年令を確認する
- OKならばユーザーの年令確認ステータスを「済」にする
###システム化の方針
- 免許証画像はS3に保存される。その際ユーザーIDのフォルダの下に置かれる
- 「シンプルなシステム」を「コスト最小化」より優先する。ここでは確認する運用者は1人から数人程度とし、それに耐えうるシステムとする。(SQSを用いたコールバックパターンはここでは採用しない)
- S3のイベントから直接SFを起動せず、その前にデータを整形している。(SFは親子関係が持てるようになったので、今ならS3イベントで親SFを開始するやり方も取れる)
#SPの作成ステップ
###アクティビティの作成
要は、アクティビティにIDを振るということ。IDはARN。確かに2年前ですね。。。
###S3 Putイベントを受けてのLambda
2年前のコード(nodejs6.10)を修正したもの。
SFに、以下を渡すためにデータ整形
- identityId (ユーザーの年令確認ステータスを更新するため。今回は触れない)
- s3bucket
- s3key
- priorityとtryCountは、もともとのSFはpriorityに応じで処理を分けたり、再度SFを開始していたため使っていた。今回不要
exports.handler = (event, context, callback) => {
var inputJson = createInputJsonFromS3Event(event);
if( ! inputJson ) {
inputJson = createInputJsonFromStepFunctionsError(event);
}
if( ! inputJson ) {
context.done(null, { "message" : "unexpected input format." } );
return;
}
const stateMachineArn = process.env.stateMachineArn;
var params = {
stateMachineArn: stateMachineArn,
input: JSON.stringify(inputJson)
};
stepfunctions.startExecution(params, (err, data) => {
if (err) {
callback(err, 'step function start error!');
} else {
callback(null, 'step function started!');
}
});
};
const createInputJsonFromStepFunctionsError = (event) => {
const error = event.error;
if( ! error ) {
return null;
}
var priority = "high";
const identityId = event.identityId;
const s3bucket = event.s3bucket;
const s3key = event.s3key;
var count = event.tryCount;
if( count ) {
count = count + 1;
} else {
count = 2;
}
var inputJson = {};
inputJson["priority"] = priority;
inputJson["identityId"] = identityId;
inputJson["s3bucket"] = s3bucket;
inputJson["s3key"] = s3key;
inputJson["tryCount"] = count;
return inputJson;
};
###Step Functionの定義
かなり簡素化したSF。もとのままだと要点が分かりにくいので記事用に再定義。
アクティビティを割り当てているStandardValidationステートの定義で、人による処理の結果をverifiedResultにマッピングしている。またidentityId, s3bucket, s3keyはそのまま後続のステートに流す設定をしている。
"ResultPath" : "$.verifiedResult",
"OutputPath" : "$",
{
"Comment" : "Id Validation Steps",
"StartAt" : "StandardValidation",
"States" : {
"StandardValidation" : {
"Type" : "Task",
"Resource" : "arn:aws:states:ap-northeast-1:xxxxxxxxxxxx:activity:StandardIdValidation",
"TimeoutSeconds" : 60,
"InputPath" : "$",
"ResultPath" : "$.verifiedResult",
"OutputPath" : "$",
"Next" : "ProcessResult",
"Catch" : [{
"ErrorEquals" : ["States.ALL"],
"ResultPath" : "$.error",
"Next" : "FailedWithError"
}]
},
"ProcessResult" : {
"Type" : "Task",
"Resource" : "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:mine-move-validated-file",
"InputPath" : "$",
"ResultPath" : "$.movedResult",
"OutputPath" : "$",
"End" : true
},
"FailedWithError" : {
"Type" : "Fail",
"Error" : "$.error",
"Cause" : "Validation end with error"
}
}
}
###アクティビティの結果を処理するLambda
ここではいきなり画像を削除せず、別のバケットに移動している。一定期間後に削除するような運用を考慮していた。
もともとのSFでは、ID検証OKとNGで別々のStateがあって、OKの場合はユーザーの検証ステータスを更新するLambdaを割り当てている。ちなみにユーザーはCognitoで管理。要望があればそこらへんも記事化。
exports.handler = (event, context, callback) => {
console.log("event: " + JSON.stringify(event));
var destinationBucket = bucketForVerificationNG;
var verifiedResult = event.verifiedResult;
if( ! verifiedResult ) {
context.done("no verifiedResult parameter!");
return;
}
if( verifiedResult.isVerified == true ) {
destinationBucket = bucketForVerificationOK;
}
var identityId = event.identityId;
var sourceBucket = event.s3bucket;
var sourceKey = decodeURIComponent(event.s3key.replace(/\+/g, ' '));
var sourceDir = identityId;
var comps = sourceKey.split("/");
var filename = comps[comps.length-1];
var params = {
CopySource: sourceBucket + "/" + sourceKey,
Bucket: destinationBucket + "/" + sourceDir,
Key: filename
};
s3.copyObject( params ).promise()
.then( function(data) {
console.log("copy!: " + data);
var deleteParams = {
Bucket: sourceBucket,
Key: sourceKey
};
return s3.deleteObject(deleteParams).promise();
})
.then( function(data) {
console.log("deleteObject data: " + JSON.stringify(data));
context.done(null, {});
})
.catch( function(err) {
console.log("err: " + err);
context.done(err);
});
};
#人による処理(アクティビティ)の実装ステップ
人による処理はWebシステムとして作成する。画像の取得と年令判定結果をPOSTするAPIをAPI Gatewayを用いて作成した。実際の処理はLambdaが行う。API Gatewayまわりはここでは割愛するが今は良記事があるのではないか?
###画像の取得
この処理で、オープンなアクティビティを取得している。
ただし、inputからs3の画像を特定し、その画像をダウンロードするためのpresigned-urlをidFileUrlとして作成している。
クライアントに返すのは、taskTokenとidFileUrl。
getActivityTaskのロングポーリング課題に多少対応するため、SFでステータスが"RUNNING"のものがあるか確認している。今回のSFでは他のStateの処理は時間がかからないので、"RUNNING"のものがある場合は、オープンなアクティビティがあると推測できる。実際にはすべてのアクティビティが払い出されていても、未完了の状態が存在する。その場合はロングポーリングの待ち状態が発生する。
makeJsonResponse()は、このLambdaはAPI Gatewayから呼ばれたものなので、フォーマットを整えている関数。ここでは割愛する。
exports.idValidationTask = (event, context, callback) => {
var stateMachineArn = process.env.stateMachineArn;
var activityArnString = process.env.activityArnString;
var listParams = {
stateMachineArn: stateMachineArn,
maxResults: 0,
statusFilter: "RUNNING"
};
stepfunctions.listExecutions( listParams ).promise()
.then( ( data ) => {
console.log("list: " + JSON.stringify(data)); // successful response
var executions = data.executions;
if( executions == undefined || executions.length == 0 ) {
var json = {};
json["message"] = "no activity";
var result = makeJsonResponse(json);
context.done(null, result);
return null;
}
var taskParams = {
activityArn: activityArnString
};
return stepfunctions.getActivityTask( taskParams ).promise();
})
.then( (data) => {
console.log("util.inspect(data..." + util.inspect(data, false, null));
if (data === null) {
console.log("data is null");
let json = {};
json["message"] = "no activity";
let result = makeJsonResponse(json);
context.done(null, result);
} else if (Object.keys(data).length === 0) {
console.log("activity is empty object!");
let json = {};
json["message"] = "no activity";
let result = makeJsonResponse(json);
console.log("result: " + result);
context.done(null, result);
return;
} else {
var taskToken = data.taskToken;
var inputString = data.input;
var input = JSON.parse(inputString);
var s3bucket = input.s3bucket;
var s3key = input.s3key;
var decodedKey = decodeURIComponent(s3key.replace(/\+/g, ' '));
let params = {
Bucket: s3bucket,
Key: decodedKey,
Expires: 3600
};
s3.getSignedUrl('getObject', params, (err, url) => {
if( err ) {
callback(null, null);
return;
}
let json = {};
json["taskToken"] = taskToken;
json["idFileUrl"] = url;
let result = makeJsonResponse(json);
callback(null, result);
});
}
})
.catch( function(err) {
console.log(err, err.stack); // an error occurred
});
};
###年令判定結果をPOST
taskTokenを渡して、sendTaskSuccess()を呼んで判定処理を完了させている。
ポイントは、taskTokenと一緒に渡すoutputに必要な情報を渡しているところ。idTypeは免許証などのidの種類、isVerifiedは年令判定結果(true/false)
var params = {
output: JSON.stringify(taskOutput),
taskToken: taskToken
};
API Gatewayのためにフォーマットを整えている関数は割愛している。
exports.idValidationResult = (event, context, callback) => {
var checkResult = parameterCheck_idValidationResult(event);
var responseJson = {};
if( checkResult.error ) {
responseJson = makeJsonResponseFromError(checkResult.error);
context.done(null, responseJson);
return;
}
var inputJson = checkResult.inputJson;
var idType = inputJson.idType;
var taskToken = inputJson.taskToken;
var isVerified = inputJson.isVerified;
var taskOutput = {
idType: idType,
isVerified: isVerified
};
if( isVerified ) {
var birthdayJson = inputJson.birthday;
taskOutput["birthday"] = birthdayJson;
var gender = inputJson.gender;
if( gender ) {
taskOutput["gender"] = gender;
}
}
var params = {
output: JSON.stringify(taskOutput),
taskToken: taskToken
};
stepfunctions.sendTaskSuccess(params).promise()
.then( function(data) {
responseJson = makeJsonResponse({message: "done!"});
context.done(null, responseJson);
})
.catch( function(err) {
console.log("err: " + err);
responseJson = makeJsonResponseFromError(err);
context.done(null, responseJson);
});
};
###残りの作業
後は、API Gatewayで定義しているAPIを用いてWebフロントを作成する。
API Gatewayでアクセスするためのログイン処理も必要。運用担当者が数人レベルであれば、Cognitoのコンソールでユーザー登録とパスワードを設定しておけばいい。
WebフロントのリソースはS3の静的ウェブサイト十分かと。自分しか担当者がいなければローカルでいい。
###ユーザーの年令確認ステータスの更新について
SFの開始の時にあった入力を、ずっと後続のStateに流している。
isVerifiedがtrueの場合は、identityIdをもとにユーザーリソースを特定し、年令確認ステータスを更新する。
謝辞
聖なる夜@マクドナルド
15日になったので投稿します!