3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Step Functionsで作成する、人による処理が介在するワークフロー(その1)

Posted at
1 / 2

#はじめに
Step Functionsで(以下SF)、人による処理が介在するワークフローをどう実現するのか。
免許証画像による年令確認フローをネタに検証したので要点をメモ。
(検証したのは2017年末ぐらいなのですが。笑)

#重要なポイント

  1. 非同期処理はアクティビティで実現する。アクティビティは人による処理をアクティビティとして定義する。このアクティビティは特定のSFに属さない独立したものである。
  2. Stateの定義で、TypeはTask、ResourceはアクティビティのARNにする。この時、非同期処理に渡したいインプットをInputPathを用いて整える。
  3. アクティビティのARNを指定して、処理が必要なアクティビティをポーリングする。ロングポーリングなので要件次第ではコストがかさむので、必要に応じてコールバックパターン等を用いた処理を検討する。
  4. ポーリングで処理対象のアクティビティがあると、"input"と"taskToken"が返ってくる。
  5. アクティビティで渡すinputに渡していなくても、後続のStateに前のStateから流れてきた情報を渡すことができるのが割と重要。つまりアクティビティの処理には不要だが後続に必要なデータを隠すことができる。
  6. taskTokenを用いて結果を伝える。この時にアクティビティのARNを指定しなくていい。この時にデータを渡すと、次のStateに渡せる。

#ユースケース: ソーシャル系アプリとかの年令確認フロー

  1. ユーザーはスマホアプリで免許証を撮影し保存
  2. スマホアプリはアプリサーバーに画像を送る
  3. 運営者が画像をみて年令を確認する
  4. OKならばユーザーの年令確認ステータスを「済」にする

###システム化の方針

  • 免許証画像はS3に保存される。その際ユーザーIDのフォルダの下に置かれる
  • 「シンプルなシステム」を「コスト最小化」より優先する。ここでは確認する運用者は1人から数人程度とし、それに耐えうるシステムとする。(SQSを用いたコールバックパターンはここでは採用しない)
  • S3のイベントから直接SFを起動せず、その前にデータを整形している。(SFは親子関係が持てるようになったので、今ならS3イベントで親SFを開始するやり方も取れる)

###システム概要図

#SPの作成ステップ
###アクティビティの作成
要は、アクティビティにIDを振るということ。IDはARN。確かに2年前ですね。。。

###S3 Putイベントを受けてのLambda
2年前のコード(nodejs6.10)を修正したもの。
SFに、以下を渡すためにデータ整形

  • identityId (ユーザーの年令確認ステータスを更新するため。今回は触れない)
  • s3bucket
  • s3key
  • priorityとtryCountは、もともとのSFはpriorityに応じで処理を分けたり、再度SFを開始していたため使っていた。今回不要
exports.handler = (event, context, callback) => {
    
    var inputJson = createInputJsonFromS3Event(event);
    if( ! inputJson ) {
        inputJson = createInputJsonFromStepFunctionsError(event);
    }
    if( ! inputJson ) {
        context.done(null, { "message" : "unexpected input format." } );
        return;
    }
    
    const stateMachineArn = process.env.stateMachineArn;

    var params = {
        stateMachineArn: stateMachineArn,
        input:           JSON.stringify(inputJson)
    };
    
    stepfunctions.startExecution(params, (err, data) => {
        if (err) { 
            callback(err, 'step function start error!');
        } else {
            callback(null, 'step function started!');
        }
    });
};

const createInputJsonFromStepFunctionsError = (event) => {
    const error = event.error;
    if( ! error ) {
        return null;
    }
    var priority   = "high";
    const identityId = event.identityId;
    const s3bucket   = event.s3bucket;
    const s3key = event.s3key;
    var count = event.tryCount;
    if( count ) {
        count = count + 1;
    } else {
        count = 2;
    }
    var inputJson = {};
    inputJson["priority"]   = priority;
    inputJson["identityId"] = identityId;
    inputJson["s3bucket"]   = s3bucket;
    inputJson["s3key"]      = s3key;
    inputJson["tryCount"]   = count;
    return inputJson;
};

###Step Functionの定義
かなり簡素化したSF。もとのままだと要点が分かりにくいので記事用に再定義。
アクティビティを割り当てているStandardValidationステートの定義で、人による処理の結果をverifiedResultにマッピングしている。またidentityId, s3bucket, s3keyはそのまま後続のステートに流す設定をしている。

      "ResultPath" : "$.verifiedResult",
      "OutputPath" : "$",
{
  "Comment" : "Id Validation Steps",
  "StartAt" : "StandardValidation",
  "States"  : {
  	"StandardValidation" : {
      "Type" :     "Task",
      "Resource" : "arn:aws:states:ap-northeast-1:xxxxxxxxxxxx:activity:StandardIdValidation",
      "TimeoutSeconds" : 60,
      "InputPath" : "$",
      "ResultPath" : "$.verifiedResult",
      "OutputPath" : "$",
      "Next" : "ProcessResult", 
      "Catch" : [{
        "ErrorEquals" :   ["States.ALL"],
        "ResultPath" :   "$.error",
        "Next" :         "FailedWithError"
      }]
    },
    "ProcessResult" : {
      "Type" : "Task",
      "Resource" : "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:mine-move-validated-file",
      "InputPath" : "$",
      "ResultPath" : "$.movedResult",
      "OutputPath" : "$",
      "End" : true
    },
    "FailedWithError" : {
      "Type" : "Fail",
      "Error" : "$.error",
      "Cause" : "Validation end with error"
    }
  }
}

###アクティビティの結果を処理するLambda
ここではいきなり画像を削除せず、別のバケットに移動している。一定期間後に削除するような運用を考慮していた。
もともとのSFでは、ID検証OKとNGで別々のStateがあって、OKの場合はユーザーの検証ステータスを更新するLambdaを割り当てている。ちなみにユーザーはCognitoで管理。要望があればそこらへんも記事化。

exports.handler = (event, context, callback) => {
    console.log("event: " + JSON.stringify(event));
    var destinationBucket = bucketForVerificationNG;

    var verifiedResult = event.verifiedResult;
    if( ! verifiedResult ) {
        context.done("no verifiedResult parameter!");
        return;
    }
    if( verifiedResult.isVerified == true ) {
        destinationBucket = bucketForVerificationOK;
    }
    
    var identityId = event.identityId;
    var sourceBucket = event.s3bucket;
    var sourceKey = decodeURIComponent(event.s3key.replace(/\+/g, ' '));
    var sourceDir = identityId;
    
    var comps = sourceKey.split("/");
    var filename = comps[comps.length-1];
    var params = {
        CopySource: sourceBucket + "/" + sourceKey, 
        Bucket: destinationBucket + "/" + sourceDir,
        Key:  filename
    };
    
    s3.copyObject( params ).promise()
    .then( function(data) {

        console.log("copy!: " + data);
         var deleteParams = {
            Bucket: sourceBucket,
            Key: sourceKey
        };
        
        return s3.deleteObject(deleteParams).promise();
    })
    .then( function(data) {
        console.log("deleteObject data: " + JSON.stringify(data));
        context.done(null, {});
    })
    .catch( function(err) {
        console.log("err: " + err);
        context.done(err);
    });
    
};

#人による処理(アクティビティ)の実装ステップ
人による処理はWebシステムとして作成する。画像の取得と年令判定結果をPOSTするAPIをAPI Gatewayを用いて作成した。実際の処理はLambdaが行う。API Gatewayまわりはここでは割愛するが今は良記事があるのではないか?

###画像の取得
この処理で、オープンなアクティビティを取得している。
ただし、inputからs3の画像を特定し、その画像をダウンロードするためのpresigned-urlをidFileUrlとして作成している。
クライアントに返すのは、taskTokenidFileUrl

getActivityTaskのロングポーリング課題に多少対応するため、SFでステータスが"RUNNING"のものがあるか確認している。今回のSFでは他のStateの処理は時間がかからないので、"RUNNING"のものがある場合は、オープンなアクティビティがあると推測できる。実際にはすべてのアクティビティが払い出されていても、未完了の状態が存在する。その場合はロングポーリングの待ち状態が発生する。
makeJsonResponse()は、このLambdaはAPI Gatewayから呼ばれたものなので、フォーマットを整えている関数。ここでは割愛する。

exports.idValidationTask = (event, context, callback) => {

    var stateMachineArn   = process.env.stateMachineArn;
    var activityArnString = process.env.activityArnString;
    
    var listParams = {
        stateMachineArn: stateMachineArn,
        maxResults:      0,
        statusFilter:    "RUNNING" 
    };
    stepfunctions.listExecutions( listParams ).promise()
    .then( ( data ) => {
        console.log("list: " + JSON.stringify(data));           // successful response
        var executions = data.executions;
        if( executions == undefined || executions.length == 0 ) {
            var json = {};
            json["message"] = "no activity";
            var result = makeJsonResponse(json);
            context.done(null, result);
            return null;    
        }
        var taskParams = {
            activityArn: activityArnString
        };
        return stepfunctions.getActivityTask( taskParams ).promise();
    })
    .then( (data) => {
        console.log("util.inspect(data..." + util.inspect(data, false, null));
        if (data === null) {
          console.log("data is null");
            let json = {};
            json["message"] = "no activity";
            let result = makeJsonResponse(json);
            context.done(null, result);
        } else if (Object.keys(data).length === 0) { 
            console.log("activity is empty object!"); 
            let json = {};
            json["message"] = "no activity";
            let result = makeJsonResponse(json);
            console.log("result: " + result);
            context.done(null, result);
            return;
        } else {
            var taskToken = data.taskToken;
            var inputString = data.input;
            var input = JSON.parse(inputString);
            
            var s3bucket = input.s3bucket;
            var s3key = input.s3key;
            var decodedKey = decodeURIComponent(s3key.replace(/\+/g, ' '));

            let params = {
                Bucket: s3bucket,
                Key: decodedKey,
                Expires: 3600
            };
            s3.getSignedUrl('getObject', params,  (err, url) => {
              if( err ) {
                callback(null, null);
                return;
              }
              let json = {};
              json["taskToken"] = taskToken;
              json["idFileUrl"] = url;
              let result = makeJsonResponse(json);
                
              callback(null, result);
            });
        }
    })
    .catch( function(err) {
        console.log(err, err.stack); // an error occurred
        
    });

};

###年令判定結果をPOST

taskTokenを渡して、sendTaskSuccess()を呼んで判定処理を完了させている。
ポイントは、taskTokenと一緒に渡すoutputに必要な情報を渡しているところ。idTypeは免許証などのidの種類、isVerifiedは年令判定結果(true/false)

  var params = {
    output: JSON.stringify(taskOutput),
    taskToken: taskToken
  };

API Gatewayのためにフォーマットを整えている関数は割愛している。

exports.idValidationResult = (event, context, callback) => {

  var checkResult = parameterCheck_idValidationResult(event);

  var responseJson = {};

  if( checkResult.error ) {
    responseJson = makeJsonResponseFromError(checkResult.error);
    context.done(null, responseJson);
    return;
  }
  var inputJson = checkResult.inputJson;

  var idType    = inputJson.idType;
  var taskToken = inputJson.taskToken;
  var isVerified = inputJson.isVerified;
  
  var taskOutput = {
    idType: idType,
    isVerified: isVerified
  };
  if( isVerified ) {
    var birthdayJson = inputJson.birthday;
    taskOutput["birthday"] = birthdayJson;

    var gender = inputJson.gender;
    if( gender ) {
      taskOutput["gender"] = gender;
    }
  }
  var params = {
    output: JSON.stringify(taskOutput),
    taskToken: taskToken
  };
  
  stepfunctions.sendTaskSuccess(params).promise()
  .then( function(data) {
    responseJson = makeJsonResponse({message: "done!"});
    context.done(null, responseJson);
  })
  .catch( function(err) {
    console.log("err: " + err);
    responseJson = makeJsonResponseFromError(err);
    context.done(null, responseJson);
  });

};

###残りの作業
後は、API Gatewayで定義しているAPIを用いてWebフロントを作成する。
API Gatewayでアクセスするためのログイン処理も必要。運用担当者が数人レベルであれば、Cognitoのコンソールでユーザー登録とパスワードを設定しておけばいい。
WebフロントのリソースはS3の静的ウェブサイト十分かと。自分しか担当者がいなければローカルでいい。

###ユーザーの年令確認ステータスの更新について
SFの開始の時にあった入力を、ずっと後続のStateに流している。
isVerifiedがtrueの場合は、identityIdをもとにユーザーリソースを特定し、年令確認ステータスを更新する。

謝辞

聖なる夜@マクドナルド
15日になったので投稿します!

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?