Python
AWS
aws-sqs
AWS-StepFunctions
AWS-Lambda

AWS Step Functionsを用いた低信頼性処理の提案

概要

  • 時々エラーが発生したりメンテナンスで停止させる必要のある逐次処理に対して、運用負荷の低いリカバリ処理方法を提案する。

課題

動画処理のように処理時間が長くコストの大きな処理フローで、エラー復旧時の再処理のエンジニアオペレーションを簡易にしたい。

課題の背景事情

  • A処理→B処理→C処理 のような一連の処理フローがあり、数分から数時間と処理時間が長い。これを毎日数百件程度処理を行っている。
  • フローの特定のプロセスで外部のAPIを叩くのだが、その外部APIは時々エラーになる可能性があり、数時間メンテナンスに入ることがある。また、ベストエフォート運用で深夜休日になんらかの異常で停止した場合、翌営業日の復旧の可能性がある。
  • このシステムを運用するチームにシステムエンジニアは1名しかいない(つまり自分だ)。隣接システムの障害に24時間対応はしたくない。幸い、隣接システムが復旧するまで、このシステムが処理遅延しても問題はないので、隣接システム復旧後に誰でも簡単に再処理ができる方法があればよい。

従来の手法

  • 処理が失敗したらリトライをするような実装だと、隣接システムが落ちている間に何度もリトライするが、停止原因によってはリトライしてほしくない場合がある。例えば、メンテナンスで動作チェックをしているような場合である。
  • 処理が一度でも失敗したら停止するような実装の場合、上記の問題は解決できるが、都度復旧対応が必要であり復旧時にログをさかのぼって失敗した処理を特定してやり直す必要があった。頻度が多いと運用上大変になる。

課題解決の方針

  • 従来シェルスクリプトで書いていたようなフロー処理を AWS Step Functions に置き換える。Step Functions ならリトライ処理を定義できて、自分で書かなくてよい。
  • エラーになった処理を任意のタイミングで再処理するために、エラーデータをSQSキューに保管する。具体的にはメイン処理の入力パラメータをキューに保存し、再処理したいときには再度キューからパラメータを取り出してメイン処理に流せばよい
    SQSを選択した理由: Step Function 単体で任意のタイミングまで特定処理を待たせることが実現できないか検討したが、現時点の仕様では特定のタスクの実行を外部から安全にペンディングにする方法が思いつかず、 AWS Step Functionのアクティビティで疑似的な待機処理ができそうであるが、ハートビートを投げる処理が面倒。一定期間プールしてもらえるSQSのほうが運用が簡単に思えたため。

構築手順

  • AWSで 実用的なLambda関数をつくったことがあるレベルの人を前提に以降で手順を紹介する。

メイン処理の作成

実行に数秒かかりエラーで終了するテスト用Lambda関数を作成する。

  • マネジメントコンソールでLambdaを開き「関数の作成」を開く
  • 「一から作成」で
    名前=function_uncertain
    ランタイム=Python3.7
    実行ロールはAWSサービスタイプでLambda関数を指定して作成したもの。
  • デフォルトのHelloコードを以下に置き換える
lambda_function.py
import json
import logging
import os
import time

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    logger.info(json.dumps(event))
    sleeptime= int(os.environ.get("SLEEP","2")) 
    time.sleep(sleeptime)
    if (os.environ.get("BEHAVIOR") == "NG") :
        logger.error('failed function')
        raise Exception('failed!')
    else :
        return (event)
  • 環境変数に BEHAVIOR=NG を指定して保存する。
  • 適当なJSONメッセージを指定してテストしエラーが起きる動作を確認
  • コードを読めばわかると思うが、このLambda関数は環境文字列をBEHAVIOR=NG以外に設定すると成功し1、入力したJSONを返す。環境文字列にSLEEP=(スリープする秒数)を指定することでスリープ時間を指定できるので、タイムアウトの場合のデバッグもできる。
  • 後で必要になるので、右上にあるこの関数のARNをコピペしておく。

SQS エラースプールキューの作成

エラーになった処理を再処理用に保存するためのキューを作成する。

  • マネジメントコンソールのSQS画面で
    キュー名=queue_workflow_error(任意)
    キューのタイプは標準キュー
    画面下の「キューの設定」をクリック
    メッセージ保持期間=7日間
    「キューの作成」を実行
  • 後で必要になるので、キューURLとARNをコピペしておく。

エラースプール処理を行う Lambda 関数の作成

エラーになったデータをSQSに送る関数を作成する。
この部分は将来的に Step Functionで内部的に実現できそうであるが、現時点ではLambda関数で実装する2

  • マネジメントコンソールでLambdaを開き「関数の作成」を開く
  • 「一から作成」で
    名前=function_errorspool(任意)<
    ランタイム=Python3.6
    実行ロールはAWSサービスタイプでLambda関数を指定して作成したものにあとで、SQSへのSendMessageの許可を追記する
ポリシーの例
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "sqs:SendMessage",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:*:*:*",
                "arn:aws:sqs:us-west-2:xxxxxxxxxxx(SQSキューのARNを指定 )"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}
  • デフォルトのHelloコードを以下に置き換える
lambda_function.py
import json
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

sqsurl = "https://sqs.us-west-2.*******(SQSのURLを指定)"

def lambda_handler(event, context):
    try:
        logger.info(event)
        message = {}
        message["key"] = event["key"]
        response = boto3.client("sqs").send_message(
            QueueUrl = sqsurl,
            MessageBody = json.dumps(message)
        )
        logger.debug(response)
        return response
    except Exception as e:
        logger.error(e)
  • Lambda関数をテスト実行する場合、以下がテストデータである。
テストデータ
 {
    "key": "12345",
    "error-info": {
      "Error": "Exception",
      "Cause": "{\"errorMessage\": \"failed!\", \"errorType\": \"Exception\", \"stackTrace\": [\"  File \\\"/var/task/lambda_function.py\\\", line 15, in lambda_handler\\n    raise Exception('failed!')\\n\"]}"
    }
 }

Step Functions の設定

画面や手順は2019年1月のものである。

  • マネジメントコンソールで Step Functions を開き、「ステートマシンの作成」を行う。(初回だと「いますぐ始める」というボタン)
  • [一から作成] を選択し、
    名前=StateMachine_Uncertain(任意)
    ステートマシンの定義は以下
ステートマシン定義
{
  "Comment": "test of low quority task",
  "StartAt": "UncertainTask",
  "States": {
    "UncertainTask": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-west-2:xxxxxxxxxxxxxx(メイン関数のARNを指定)",
      "End": true,
      "Retry": [ {
          "ErrorEquals": [ "States.Timeout" ],
          "MaxAttempts": 0
        }, {
          "ErrorEquals": [ "States.ALL" ],
          "IntervalSeconds": 5,
          "MaxAttempts": 1
      } ],
      "Catch": [ {
        "ErrorEquals": [ "States.ALL" ],
        "ResultPath": "$.error-info",
        "Next": "ErrorSpool"
      } ]
    },
    "ErrorSpool": {
     "Type": "Task",
     "Resource": "arn:aws:lambda:us-west-2:xxxxxxxxxxxxxxx(エラースプール関数のARNを指定)",
       "End": true
    }
  }
}

image.png
簡単に動きを説明すると、最初にUncertainTask を実行し、タイムアウト以外のエラーでは5秒後に1度だけリトライし、それでも失敗する場合は ErrorSpool を実行する。

  • ロールの作成画面で
    「自分用のIAMロールを作成する」を選択3
    名前(ロール名のこと)=exec_myStepFunction(任意)
    AWSのドキュメントによると、ここで生成したロールを削除したら後で復元できないとのことなので、自動作成におまかせする。
    参考: Lambda ステートマシンを作成する
    stepfunctions_role.png

ステートマシンの実行

  • 先ほど作成したステートマシンで「新しい実行」を選択。
    実行名=(空白だと自動的にIDがセットされる)
    入力に以下JSONを指定
{
  "key":"12345"
}

メイン関数が必ず失敗する設定であれば、1回リトライして、元の入力値"key"をSQSに入力値を書き込んで終了していれば成功である。
マネジメントコンソールでSQSを開き、該当のキューで右クリックして「メッセージの表示/削除」を実施すると確認できる。
image.png

再処理関数の作成

  • マネジメントコンソールでLambdaを開き「関数の作成」を開く
  • 「一から作成」で
    名前=function_reprocess(任意)
    ランタイム=Python3.6
    実行ロールはAWSサービスタイプでLambda関数を指定して作成したものにあとで、SQSのメッセージ読み取り・削除、およびステートマシンの実行の許可を追記する。
ポリシーの例
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:GetQueueUrl",
                "logs:CreateLogStream",
                "sqs:ListDeadLetterSourceQueues",
                "states:ListExecutions",
                "sqs:ReceiveMessage",
                "states:StartExecution",
                "sqs:GetQueueAttributes",
                "sqs:ListQueueTags",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:sqs:us-west-2:xxxxxxxxxxxxxxxx(エラースプールのSQSキューARN)",
                "arn:aws:states:us-west-2:xxxxxxxxxxxxxx(ステートマシンのARN)",
                "arn:aws:logs:*:*:*"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}
  • 関数の本体を以下で置き換える
lambda_function.py
import json
import boto3
import logging
import os

logger = logging.getLogger()
logger.setLevel(logging.INFO)

sqsurl = "https://sqs.us-west-2.amazonaws.com/xxxxxxxxxxxxx(エラースプールのSQSのURL)"
stateMachine_arn = "arn:aws:states:us-west-2:xxxxxxxxxxxxxx(ステートマシンのARN)"
max_reprocess = int(os.environ.get("MAX_REPROCESS","10"))

def lambda_handler(event, context):
    success_count = 0
    name = 'test-load-mikami'
    sqs = boto3.client('sqs')
    stepfunctions = boto3.client('stepfunctions')
    while max_reprocess > success_count:
        # メッセージを取得
        response = sqs.receive_message(
            QueueUrl=sqsurl,
            AttributeNames=['SentTimestamp'],
            MaxNumberOfMessages=1,
            MessageAttributeNames=['All'],
            VisibilityTimeout=1,
            WaitTimeSeconds=0
        )
        #logger.info(response)
        if 'Messages' in response:
            message = response['Messages'][0]
            receipt_handle = message['ReceiptHandle']
            body = json.loads(message['Body'])
            logger.info(body)
            exec_response = stepfunctions.start_execution(
                stateMachineArn= stateMachine_arn,
                input=json.dumps(body)
            )
            logger.info(exec_response)
            sqs.delete_message(QueueUrl=sqsurl,ReceiptHandle=receipt_handle)
            success_count +=1
        else:
            # メッセージがなくなったらbreak
            break
    return { "success": success_count}
  • 環境文字列で "MAX_REPROCESS"に数字を指定するとその数だけエラースプールキューから読み取り、ステートマシンに実行を依頼する。読み取ったメッセージは削除する。実際の処理において、大量に同時実行すると負荷が心配な場合、2くらいにして少しづつ投げる方法もあるだろう。

まとめ

  • AWS Step Function + SQSで不確実なタスクのエラーリカバリ処理を実装した。
  • (2019年1月時点)AWS Step Functionsは、まだ枯れていないので、実際にプロダクションで採用するにあたり、若干の開発リスクは覚悟すべき。そのうち修正されるとは思うが、AWSドキュメントの日本語訳のほうのソース記述が間違っている箇所があるので、どうしても動かない場合、英語と交互に参照することをお勧めする。さらに、英語版ドキュメントも、コピペしても動かないコードもあるので注意。

今後の課題

現時点ではやり方がわからないか、まだ機能実装されていないため今回できなかったが、そのうちできるかもしれない。

  • ある実行処理中に、まったく同一の処理を重複実行しないように、識別子を用いて排他制御する。例えば、特定のファイルを移動(コピーして元を削除)するような処理を同時に実行してしまうと結果が不定になるため、それを避けたい
  • Lambda関数経由ではなく、Step Functionsから直接サービス統合を用いて、JSONオブジェクトを構築しSQSにメッセージする(ステートメント言語で文字処理ができるとよい)2
  • Step Functionsの特定のタスクの実行をペンディングにして、実行処理をスプールし、障害要因が復旧したタイミングで処理を再開する方法
  • Step Functionの特定のタスクは有限リソースのようなケースで、同時に特定タスクを起動する数を制限するような方法

参考文献


  1. 厳密には環境要因がなければの話。 

  2. 2018年11月に、Step Functionsから直接SQSにメッセージを送信できるサービス統合機能が追加されたので、今回試してみたが、単一値のメッセージは送れるが、入力値を加工してJSON文字列のメッセージをつくることができなかった。仕方なく、Lambdaで実装に切り替えた。
    参考: AWS のサービス統合 

  3. 「自分用」というのは不思議な用語であるが、つまり新規にロールを作成するということ。