1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Slack から AWS CLI を実行し ECS or EC2 をスケーリングする

Last updated at Posted at 2024-11-09

はじめに

何かしらサービスに障害が発生した際、大体が EC2 or ECS をスケーリングすることで問題が解消することが多くあった。
これは、アクセスが集中する想定外のイベントが発生することが原因。
当然 AutoScaling するような構成にはなっているが、それでも追いつかないほどの負荷がかかってしまうことがあり、そうなった場合は基本的には SRE の方で一気にスケーリングすることで対応していた。

しかし先日、勤務時間外に障害が発生し、初動対応が遅れてしまったことをきっかけに運用体制を見直すことになった。
そこで、休日でも外出先でも、スマホで Slack から簡単に、でも安全にスケーリングできたらいいね。という話になり作った構成のお話。

目標

  • Slack から AWS リソースを操作し EC2 or ECS をスケーリングできる
  • AWS リソース操作は最小権限で
  • 承認フロー付き
  • 実行できる Slack チャンネルは一つのみ
  • 複数 AWS アカウントのリソースを操作できる

先に構成図

初見ではなんのこっちゃ状態だと思うのでそれぞれ詳細は後述。

scaling_ec2_and_ecs_from_slack.drawio のコピー.drawio (1).png

Slack から AWS CLI を実行する

まず、Slack チャンネルに AWS Chatbot というアプリ(以後 Slack Chatbot)を入れる。

スクリーンショット 2024-10-12 10.20.13.png
 
続いて、AWS サービスの AWS Chatbot と該当チャンネルを連携してやるのみ。(連携法はここでは触れないが簡単)
すると、先ほど Slack Chatbot を入れたチャンネルから、メッセージの頭に@aws をつけると AWS CLI が実行可能になる。
どのようにして AWS API を実行しているのかというと、Slack Chatbot が AWS Chatbot に CLI コマンドを渡し、AWS Chatbot が API を実行しているイメージ。

例えば @aws lambda list-functions を実行すると以下のように返ってくる。
(マスキングしている部分は自分宛のメンションやら AWS アカウント ID、リソース名など)

スクリーンショット 2024-10-12 11.10.33.png

今回の構成では Slack から Step Functions を実行するのだが、実行系のコマンドの場合は以下のように確認が入り、[Run command] を押下すると実行される。

スクリーンショット 2024-10-12 10.52.08.png

尚、ここで実行できる API は AWS Chatbot の権限に依存するため、適切に制限すれば事故が起きることはない。
更に AWS Chatbot のガードレールのポリシーで制限しておくとより安全。
ガードレールのポリシーとは、AWS Chatbot の権限の上限のようなもので、例えばガードレールのポリシーに ReadOnlyAccess を設定すると、AWS Chatbot に AdministratorAccess を付与しても、実際に許可されるのはガードレールのポリシーで設定した ReadOnlyAccess の範囲内となる。

Step Functions を使用するメリット

AWS Chatbot の権限を制御したからといって安全というわけではなく、自由にスケーリングできてしまうと、誤ってインスタンスやタスクを0台にされてしまう恐れがある。
ただこの辺りは、例えば Lambda で『現在の台数より減らすことはできない』というような制限をかけるなど、やりようはたくさんあると思う。
今回は実際に減らしたい場合(障害が落ち着いた、DB がボトルネックになっており Web サーバが増えると障害を助長する場合など)も想定し、現在の台数より減らすような動きがある場合は制限をかけるのではなく警告文を表示するようにしている。

更に承認フローを挟むことでより安全なフローにすることができる。(現状ではセルフ承認ができる構成だが、そこを制御するほどまで厳格化していない)

Step Functions を使用するメリットとしては以下が考えられるか。
(というかそもそも Step Functions を使用しない場合、『Lambda の処理を途中で止め、承認結果を受け取ってから再開させる』ということができるのか不明)

  1. 『承認可否の結果を受け取って次の処理を走らせる』 ということが容易にできる
    1. 一意である task token というものを受け取るまで待機するということができる(後述)
    2. 承認依頼から承認可否まで、必ずしもノータイムとは限らないため、単一の Lambda だけで実装しようとするとタイムアウトする可能性がある
    3. タイムアウトを回避するために Lambda を複数に分割するのであれば、Step Functions でオーケストレーションすべき
    4. Step Functions は明示的に指定しない限りタイムアウトしない
  2. パラメータのバリデーション、処理の順番制御、エラーハンドリングなど、コードを書かなくてもある程度実装できる

最終的な Step Functions の構成は以下。
多分これだけ見ても意味がわからないので一つずつ後述する。(もっとスマートな構成があれば是非教えてください、、、)

スクリーンショット 2024-11-03 8.37.52.png

ざっと説明すると、必要なパラメータを持たせて Slack から Step Functions を実行することで、以下のフローを経て Lambda を実行しスケーリングしている。

  1. パラメータのバリデーション
    1. 必須パラメータである AWS アカウント ID の存在チェック
    2. パラメータの組み合わせの有効性チェック
      1. ECS と EC2 のパラメータが混ざっていないか
        1. ECS クラスタ名と EC2 AutoScaling グループ名が混ざっていないかなど
      2. 絶対値を指定したスケーリング用のパラメータと相対値を指定したスケーリング用のパラメータが混ざっていないか
  2. パラメータの内容によって分岐
    1. ECS or EC2
    2. 絶対値を指定したスケーリング or 相対値を指定したスケーリング
  3. 承認結果によって分岐
    1. 承認ならスケーリングを実行、拒否なら終了

パラメータのバリデーション・承認可否の結果を待機する

ここから実際に Step Functions の中身を見ていく。
今回は CloudFormation(SAM) で構築したため、template.yaml を元に説明する。
まず Step Functions 全体の記述は以下。(結構なボリュームになってしまった)

template.yaml
template.yaml
ScalingEC2AndECSFromSlackStepFunctions: 
  Type: AWS::Serverless::StateMachine
  Properties: 
    Name: !Sub ScalingEC2AndECSFromSlack${CamelShortEnv}
    Role: !GetAtt ScalingEC2AndECSFromSlackStepFunctionsRole.Arn
    Definition: 
      StartAt: CheckParameters
      States: 
        # 渡されたパラメーターが以下条件に当てはまるかチェック
        # 1. aws_account_id が存在する
        # 2. ecs_cluster_name, ecs_service_name が両方存在し、ec2_autoscaling_group_name が存在しない
        # 3. もしくは ec2_autoscaling_group_name が存在し、ecs_cluster_name, ecs_service_name の両方が存在しない
        # 4. min, max が両方存在し、scaling_unit が存在しない
        # 5. もしくは scaling_unit が存在し、min, max の両方が存在しない
        CheckParameters: 
          Type: Choice
          Choices: 
            # ECS, 絶対値スケーリング
            - And: 
                - Variable: "$.aws_account_id"
                  IsPresent: true
                - Variable: "$.ecs_cluster_name"
                  IsPresent: true
                - Variable: "$.ecs_service_name"
                  IsPresent: true
                - Variable: "$.ec2_autoscaling_group_name"
                  IsPresent: false
                - Variable: "$.min"
                  IsPresent: true
                - Variable: "$.max"
                  IsPresent: true
                - Variable: "$.scaling_unit"
                  IsPresent: false
              Next: WaitForPermissionScalingAbsoluteValueECS
            # ECS, 相対値スケーリング
            - And: 
                - Variable: "$.aws_account_id"
                  IsPresent: true
                - Variable: "$.ecs_cluster_name"
                  IsPresent: true
                - Variable: "$.ecs_service_name"
                  IsPresent: true
                - Variable: "$.ec2_autoscaling_group_name"
                  IsPresent: false
                - Variable: "$.min"
                  IsPresent: false
                - Variable: "$.max"
                  IsPresent: false
                - Variable: "$.scaling_unit"
                  IsPresent: true
              Next: WaitForPermissionScalingRelativeValueECS
            # EC2, 絶対値スケーリング
            - And: 
                - Variable: "$.aws_account_id"
                  IsPresent: true
                - Variable: "$.ecs_cluster_name"
                  IsPresent: false
                - Variable: "$.ecs_service_name"
                  IsPresent: false
                - Variable: "$.ec2_autoscaling_group_name"
                  IsPresent: true
                - Variable: "$.min"
                  IsPresent: true
                - Variable: "$.max"
                  IsPresent: true
                - Variable: "$.scaling_unit"
                  IsPresent: false
              Next: WaitForPermissionScalingAbsoluteValueEC2
            # EC2, 相対値スケーリング
            - And: 
                - Variable: "$.aws_account_id"
                  IsPresent: true
                - Variable: "$.ecs_cluster_name"
                  IsPresent: false
                - Variable: "$.ecs_service_name"
                  IsPresent: false
                - Variable: "$.ec2_autoscaling_group_name"
                  IsPresent: true
                - Variable: "$.min"
                  IsPresent: false
                - Variable: "$.max"
                  IsPresent: false
                - Variable: "$.scaling_unit"
                  IsPresent: true
              Next: WaitForPermissionScalingRelativeValueEC2
          Default: InputInvalidParameters
        WaitForPermissionScalingAbsoluteValueECS: 
          Type: Task
          Resource: arn:aws:states:::lambda:invoke.waitForTaskToken # task token が送られるまで待機するコールバックタスク
          Parameters: 
            # 待機中にこの Lambda を実行し task token を返させる(この Lambda 自身は task token を返さない)
            # PostScalingRequestToSlackLambda -> Slack -> API Gateway -> ReturnTaskTokenToStepFunctionsLambda(task token) -> Step Functions
            FunctionName: !Ref PostScalingRequestToSlackLambda
            Payload: # Slack から渡されたパラメータ + task token を PostScalingRequestToSlackLambda に渡す
              TaskToken.$: $$.Task.Token
              AWSAccountID.$: $.aws_account_id
              ECSClusterName.$: $.ecs_cluster_name
              ECSServiceName.$: $.ecs_service_name
              Min.$: $.min
              Max.$: $.max
          ResultPath: $.payload # Slack から渡されたパラメータに ReturnTaskTokenToStepFunctionsLambda の出力結果(slack_bot_token, desired_count, max_capacity)を付加
          TimeoutSeconds: 600 # 承認待ちの最大時間
          Catch: 
            - ErrorEquals: 
              - RejectedScalingRequest # ReturnTaskTokenToStepFunctionsLambda からこのエラー(カスタム例外クラス)が返された場合に該当
              Next: Rejected
            - ErrorEquals: 
              - States.ALL # それ以外のエラーが返された場合に該当
              Next: Error
          Next: ExecScalingEC2OrECS
        WaitForPermissionScalingRelativeValueECS: 
          Type: Task
          Resource: arn:aws:states:::lambda:invoke.waitForTaskToken # task token が送られるまで待機するコールバックタスク
          Parameters: 
            # 待機中にこの Lambda を実行し task token を返させる(この Lambda 自身は task token を返さない)
            # PostScalingRequestToSlackLambda -> Slack -> API Gateway -> ReturnTaskTokenToStepFunctionsLambda(task token) -> Step Functions
            FunctionName: !Ref PostScalingRequestToSlackLambda
            Payload: # Slack から渡されたパラメータ + task token を PostScalingRequestToSlackLambda に渡す
              TaskToken.$: $$.Task.Token
              AWSAccountID.$: $.aws_account_id
              ECSClusterName.$: $.ecs_cluster_name
              ECSServiceName.$: $.ecs_service_name
              ScalingUnit.$: $.scaling_unit
          ResultPath: $.payload # Slack から渡されたパラメータに ReturnTaskTokenToStepFunctionsLambda の出力結果(slack_bot_token, desired_count, max_capacity)を付加
          TimeoutSeconds: 600 # 承認待ちの最大時間
          Catch: 
            - ErrorEquals: 
              - RejectedScalingRequest # ReturnTaskTokenToStepFunctionsLambda からこのエラー(カスタム例外クラス)が返された場合に該当
              Next: Rejected
            - ErrorEquals: 
              - States.ALL # それ以外のエラーが返された場合に該当
              Next: Error
          Next: ExecScalingEC2OrECS
        WaitForPermissionScalingAbsoluteValueEC2: 
          Type: Task
          Resource: arn:aws:states:::lambda:invoke.waitForTaskToken # task token をが送られるまで待機するコールバックタスク
          Parameters: 
            # 待機中にこの Lambda を実行し task token を返させる(この Lambda 自身は task token を返さない)
            # PostScalingRequestToSlackLambda -> Slack -> API Gateway -> ReturnTaskTokenToStepFunctionsLambda(task token) -> Step Functions
            FunctionName: !Ref PostScalingRequestToSlackLambda
            Payload: # Slack から渡されたパラメータ + task token を PostScalingRequestToSlackLambda に渡す
              TaskToken.$: $$.Task.Token
              AWSAccountID.$: $.aws_account_id
              EC2AutoScalingGroupName.$: $.ec2_autoscaling_group_name
              Min.$: $.min
              Max.$: $.max
          ResultPath: $.payload # Slack から渡されたパラメータに ReturnTaskTokenToStepFunctionsLambda の出力結果(slack_bot_token, desired_count, max_capacity)を付加
          TimeoutSeconds: 600 # 承認待ちの最大時間
          Catch: 
            - ErrorEquals: 
              - RejectedScalingRequest # ReturnTaskTokenToStepFunctionsLambda からこのエラー(カスタム例外クラス)が返された場合に該当
              Next: Rejected
            - ErrorEquals: 
              - States.ALL # それ以外のエラーが返された場合に該当
              Next: Error
          Next: ExecScalingEC2OrECS
        WaitForPermissionScalingRelativeValueEC2: 
          Type: Task
          Resource: arn:aws:states:::lambda:invoke.waitForTaskToken # task token が送られるまで待機するコールバックタスク
          Parameters: 
            # 待機中にこの Lambda を実行し task token を返させる(この Lambda 自身は task token を返さない)
            # PostScalingRequestToSlackLambda -> Slack -> API Gateway -> ReturnTaskTokenToStepFunctionsLambda(task token) -> Step Functions
            FunctionName: !Ref PostScalingRequestToSlackLambda
            Payload: # Slack から渡されたパラメータ + task token を PostScalingRequestToSlackLambda に渡す
              TaskToken.$: $$.Task.Token
              AWSAccountID.$: $.aws_account_id
              ECSClusterName.$: $.ecs_cluster_name
              ECSServiceName.$: $.ecs_service_name
              ScalingUnit.$: $.scaling_unit
          ResultPath: $.payload # Slack から渡されたパラメータに ReturnTaskTokenToStepFunctionsLambda の出力結果(slack_bot_token, desired_count, max_capacity)を付加
          TimeoutSeconds: 600 # 承認待ちの最大時間
          Catch: 
            - ErrorEquals: 
              - RejectedScalingRequest # ReturnTaskTokenToStepFunctionsLambda からこのエラー(カスタム例外クラス)が返された場合に該当
              Next: Rejected
            - ErrorEquals: 
              - States.ALL # それ以外のエラーが返された場合に該当
              Next: Error
          Next: ExecScalingEC2OrECS
        ExecScalingEC2OrECS: 
          Type: Task
          Resource: !GetAtt ExecScalingEC2AndECSLambda.Arn
          End: true
        InputInvalidParameters: 
          Type: Fail
          Cause: "Required parameters are missing or invalid parameter combination."
          Error: InputInvalidParametersError
          Comment: 必須パラメータが存在しないか、パラメータの組み合わせに異常があります。
        Rejected:
          Type: Pass
          Comment: スケーリング申請が拒否されました。
          End: true
        Error: 
          Type: Fail
          Cause: Error occured. Exec SendTaskFailure API.
          Comment: エラーが発生したため、ステートマシンを停止します。
    Tags: 
      Name: !Sub ScalingEC2AndECSFromSlack${CamelShortEnv}

各ステートごとに見てみる。

CheckParameters

template.yaml
CheckParameters: 
  Type: Choice
  Choices: 
    # ECS, 絶対値スケーリング
    - And: 
        - Variable: "$.aws_account_id"
          IsPresent: true
        - Variable: "$.ecs_cluster_name"
          IsPresent: true
        - Variable: "$.ecs_service_name"
          IsPresent: true
        - Variable: "$.ec2_autoscaling_group_name"
          IsPresent: false
        - Variable: "$.min"
          IsPresent: true
        - Variable: "$.max"
          IsPresent: true
        - Variable: "$.scaling_unit"
          IsPresent: false
      Next: WaitForPermissionScalingAbsoluteValueECS
    # ECS, 相対値スケーリング
    - And: 
        - Variable: "$.aws_account_id"
          IsPresent: true
        - Variable: "$.ecs_cluster_name"
          IsPresent: true
        - Variable: "$.ecs_service_name"
          IsPresent: true
        - Variable: "$.ec2_autoscaling_group_name"
          IsPresent: false
        - Variable: "$.min"
          IsPresent: false
        - Variable: "$.max"
          IsPresent: false
        - Variable: "$.scaling_unit"
          IsPresent: true
      Next: WaitForPermissionScalingRelativeValueECS
    # EC2, 絶対値スケーリング
    - And: 
        - Variable: "$.aws_account_id"
          IsPresent: true
        - Variable: "$.ecs_cluster_name"
          IsPresent: false
        - Variable: "$.ecs_service_name"
          IsPresent: false
        - Variable: "$.ec2_autoscaling_group_name"
          IsPresent: true
        - Variable: "$.min"
          IsPresent: true
        - Variable: "$.max"
          IsPresent: true
        - Variable: "$.scaling_unit"
          IsPresent: false
      Next: WaitForPermissionScalingAbsoluteValueEC2
    # EC2, 相対値スケーリング
    - And: 
        - Variable: "$.aws_account_id"
          IsPresent: true
        - Variable: "$.ecs_cluster_name"
          IsPresent: false
        - Variable: "$.ecs_service_name"
          IsPresent: false
        - Variable: "$.ec2_autoscaling_group_name"
          IsPresent: true
        - Variable: "$.min"
          IsPresent: false
        - Variable: "$.max"
          IsPresent: false
        - Variable: "$.scaling_unit"
          IsPresent: true
      Next: WaitForPermissionScalingRelativeValueEC2
  Default: InputInvalidParameters

Choices にて、ステートマシンが次にどのステートに遷移するかを決定するための条件式の配列を記載している。(1行で簡潔に説明しようとすると難しい、、、)

尚、頭に $ がついた $.aws_account_id などはステートマシン実行時に与える JSON パラメータのキーを表す。
先ほどスクショをチラッと載せているが、Slack から CLI 実行時は以下のようになり、このパラメータの値に対し条件式に該当するかどうかを判定している。

@aws stepfunctions start-execution --input {"min": 1, "max": 3, "aws_account_id": "123456789012", "ecs_cluster_name": "hoge", "ecs_service_name": "huga"} --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:ScalingEC2AndECSFromSlackStg --region ap-northeast-1

Choices の使い方としては、例えば以下のようにパラメータを特定の数値または文字列と比較し、true である場合に Next で指定したステートに遷移させるという感じだ。

template.yaml
{
  "Variable": "$.foo",
  "NumericEquals": 1,
  "Next": "FirstMatchState"
}
template.yaml
{
  "Variable": "$.foo",
  "StringEquals": "MyString",
  "Next": "FirstMatchState"
}

今回はまず And で複数条件との比較を可能にしている。(条件式の評価は上から順)
そして、コメントとして記載している通り、それぞれの条件式の中で以下の有効性をチェックする。

  1. aws_account_id が存在する
  2. ecs_cluster_name, ecs_service_name が両方存在し、ec2_autoscaling_group_name が存在しない
  3. もしくは ec2_autoscaling_group_name が存在し、ecs_cluster_name, ecs_service_name の両方が存在しない
  4. min, max が両方存在し、scaling_unit が存在しない
  5. もしくは scaling_unit が存在し、min, max の両方が存在しない

尚、min, max は現在の稼働台数に関係なく 絶対値 で AutoScaling の最小・最大値を決め打ちし、scaling_unit は現在の稼働台数にプラス何台追加という 相対値 による指定をする。

いずれかの条件式に該当した場合、それぞれの Next で指定されたステートに進み、いずれにも該当しない場合、つまり無効なパラメータである場合は Default で指定された InputInvalidParameters に進む。

InputInvalidParameters

ボリュームが少ないので先に。

template.yaml
InputInvalidParameters: 
  Type: Fail
  Cause: "Required parameters are missing or invalid parameter combination."
  Error: InputInvalidParametersError
  Comment: 必須パラメータが存在しないか、パラメータの組み合わせに異常があります。

Catch ブロックでキャッチされない限り、ステートマシンの失敗として処理し実行を停止する。(Catch ブロックでキャッチされると次のステートに進める)
ここで定義した内容は以下のように表示される。

スクリーンショット 2024-11-09 8.10.00.png

WaitForPermissionScalingAbsoluteValueECS(Absolute / Relative, ECS / EC2)

template.yaml
WaitForPermissionScalingAbsoluteValueECS: 
  Type: Task
  Resource: arn:aws:states:::lambda:invoke.waitForTaskToken # task token が送られるまで待機するコールバックタスク
  Parameters: 
    # 待機中にこの Lambda を実行し task token を返させる(この Lambda 自身は task token を返さない)
    # PostScalingRequestToSlackLambda -> Slack -> API Gateway -> ReturnTaskTokenToStepFunctionsLambda(task token) -> Step Functions
    FunctionName: !Ref PostScalingRequestToSlackLambda
    Payload: # Slack から渡されたパラメータ + task token を PostScalingRequestToSlackLambda に渡す
      TaskToken.$: $$.Task.Token
      AWSAccountID.$: $.aws_account_id
      ECSClusterName.$: $.ecs_cluster_name
      ECSServiceName.$: $.ecs_service_name
      Min.$: $.min
      Max.$: $.max
  ResultPath: $.payload # Slack から渡されたパラメータに ReturnTaskTokenToStepFunctionsLambda の出力結果(slack_bot_token, desired_count, max_capacity)を付加
  TimeoutSeconds: 600 # 承認待ちの最大時間
  Catch: 
    - ErrorEquals: 
      - RejectedScalingRequest # ReturnTaskTokenToStepFunctionsLambda からこのエラー(カスタム例外クラス)が返された場合に該当
      Next: Rejected
    - ErrorEquals: 
      - States.ALL # それ以外のエラーが返された場合に該当
      Next: Error
  Next: ExecScalingEC2OrECS

Type

Task ステートはアクティビティや Lambda を使用したり、サポートされている他の AWS サービスと統合したり、Stripe のようなサードパーティ API を呼び出したりすることができる。

Resource

ここで Resource に指定しているのは arn:aws:states:::lambda:invoke.waitForTaskToken というもの。
これは特殊なリソースで、コールバックタスクと言われる。
詳細は以下を参照してほしいが、要は今回の例でいうと、Slack 上で承認可否が確定されるまでステートマシンを待機させるためのもの。
そして SendTaskSuccess または SendTaskFailure API を実行してステートマシンに task token を返すことで再開させることができる。
尚、SendTaskFailure API が実行されるとステートマシンは停止する。

※ DeepL 翻訳
コールバックタスクは、タスクトークンが返されるまでワークフローを一時停止する方法を提供します。 タスクは人間の承認を待ったり、サードパーティと統合したり、レガシーシステムを呼び出したりする必要があるかもしれません。 このようなタスクの場合、ワークフローの実行が1年間のサービスクォータに達するまでStep Functionsを一時停止し(状態スロットリングに関連するクォータを参照)、外部プロセスやワークフローが完了するのを待つことができます。 このような状況のために、Step Functions では、AWS SDK サービス統合や、いくつかの Optimized サービス統合にタスクトークンを渡すことができます。 タスクは SendTaskSuccess または SendTaskFailure 呼び出しでそのタスクトークンを返すまで一時停止します。

コールバックに対応しているタスクは以下表を参照。

そして、ParametersFunctionName で指定した Lambda を実行し task token を返す。(Payload で Lambda に対し Slack から渡されたパラメータ + task token を渡す)
ただしその Lambda 自身が返す必要はなく、誰からだろうと task token さえ返せば問題ない。

実際今回の場合、ParametersFunctionName で指定した Lambda はスケーリングの承認リクエストを Slack に送信するのみ。
冒頭の構成図にも記載しているが、task token は別の Lambda が返している。

ResultPath

そのステートの Lambda の出力結果を、ステートへの入力 JSON(ここでは Slack から渡されたパラメータ) のどこに埋め込むかを決めるプロパティである。
個人的に以下記事が分かりやすかったので詳しくは以下参考。

しかし、コールバックタスクが使用されたステート内では、ResultPath の扱いが少し異なっており、task token を返したリソースの出力結果をステートの出力結果として扱う
試しにコールバックタスクによって実行される Lambda(ParametersFunctionName で指定した Lambda) の中で適当に return してみても、ResultPath に含まれることはなく、あくまで task token を返す Lambda の出力結果を対象としている様子。

Catch

エラー発生時に Catch の中で拾い、次のステートに遷移させることでエラーハンドリングができる。
ここでは、発生したエラーによって遷移させるステートを分岐させている。

少し整理すると、ステートマシンが Next で指定したステートに遷移できず停止するのは、前述の通り SendTaskFailure API が実行された場合と、本当にエラーが発生した場合である。
ただし、SendTaskFailure API が実行されるのは、あくまで Slack 上で「Reject」ボタンが押下されたのであって、正確にはエラーが発生しているわけではない。
そのため、エラー終了扱いにしてしまうのはよろしくない。
加えて、Step Functions の ExecutionsFailed メトリクスを検知して Slack に通知しているため、「Reject」 ボタンが押下されるたびにエラー発生通知が来てしまうのは非常に紛らわしいことになる。
そこで、ステートマシンが停止した理由によって分岐させているというわけである。

ここでは、SendTaskFailure API が実行された場合は、Lambda で RejectedScalingRequest というカスタム例外クラスを発生させ、ステート側でキャッチしている。
コンソール上で Step Functions を触っていると、あらかじめ定義された例外クラスがあるように思えるのだが、以下記事の通り好きな文字列でエラー名を定義し、そのエラーが発生したらキャッチする、ということが可能。

Lambda では例えば以下のようにすればカスタム例外クラスを発生させられる。

class RejectedScalingRequest(Exception):
    pass

### 略 ###

    if action == "Reject":
        raise RejectedScalingRequest("Rejected scaling request.")

では、Slack にメッセージを送信してからどのような流れで task token を Step Functions に返しているのか。
まずは Slack へのメッセージ送信法や送信内容について触れていく。

Slack へのメッセージ送信

実際に Slack に送信されるメッセージは以下。
誤操作を防ぐため、最終確認画面を挟んでいる。

スクリーンショット 2024-11-03 8.26.21.png

スクリーンショット 2024-11-03 8.29.09.png

Webhook URL と OAuth Tokens

Slack に対しメッセージを送信するには、Webhook URLOAuth Tokens を使用する2通りの方法がある。
簡単に説明すると、Webhook URL は各チャンネルごとに一意の URL が紐づき、それに対し必要なパラメータを持たせて POST することでメッセージの送信が可能となるもの。

一方 OAuth Tokens は、token とチャンネルが一意に紐づくのではなく、送信処理の中でチャンネルを指定することができるため、(非推奨だとは思うが)一つの token で複数チャンネルへのメッセージの送信も可能。
また、Webhook URL と違ってメッセージの送信以外の処理も可能。そのため、以下のように token の scope(権限)を制御できる。
OAuth Tokens を使用するメリットはこの辺りになるのではないかと思う。

スクリーンショット 2024-10-13 13.39.24.png

送信後のメッセージを修正する

今回は OAuth Tokens を使用する方法を採用した。
理由は、Slack に送信されたメッセージを後から修正する必要があるため。

先ほど添付したスクショの、「Approve」「Reject」 ボタンがついたメッセージは、どちらが押下されたとしても、一度結果が確定しても尚ボタンが押下できる状態は好ましくない。
とは言っても実際には、一度確定した後は仮にメッセージが残っていたとして、再度押下しても何も起こらず、task token を返す Lambda では以下のようなエラーが発生する。

[ERROR] TaskTimedOut: An error occurred (TaskTimedOut) when calling the SendTaskFailure operation: Task Timed Out: 'Provided task does not exist anymore'

「Approve」「Reject」 ボタンが押下されるとステートマシンは再開し、どちらにせよ最終的には停止する。
そして task token は一意のものであるので、『そんな task token に対応するステートマシンはどこにもないよ』と怒られている。
それでもやはり紛らわしいし、Lambda のエラーが発生し得る状態を残しておくのは美しくない。

そこで OAuth Tokens を使用すると、メッセージの修正が可能となる。(必要な権限は chat:write のみ)
以下は OAuth Tokens を使用して Slack にメッセージを送信するスクリプトの一部抜粋。(スクショで載せたメッセージを構成するものとはちょっと違うが)
ひとまず Slack への送信法や送信内容の構成について触れるために載せておく。
尚、メッセージの修正は別の Lambda が実施しているため後述。

### 略 ###

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

### 略 ###

def create_slack_payload(task_token, aws_account_id, ecs_cluster_name, ecs_service_name, ec2_autoscaling_group_name, min, max, ec2_or_ecs):
    if ec2_or_ecs == "ECS":
        text = f'''
AWS アカウント ID: *{aws_account_id}*
ECS クラスター名: *{ecs_cluster_name}*
ECS サービス名: *{ecs_service_name}*
スケーリング予定数 *min: {min}, max: {max}*
'''
    elif ec2_or_ecs == "EC2":
        text = f'''
AWS アカウント ID: *{aws_account_id}*
AutoScaling グループ名: *{ec2_autoscaling_group_name}*
スケーリング予定数 *min: {min}, max: {max}*
'''
    logger.info(f"text: {text}")
    
    payload = {
        "blocks": [
            {
		    	"type": "section",
		    	"text": {
		    		"type": "mrkdwn",
		    		"text": text
		    	}
		    },
            {
		        "type": "actions",
		        "elements": [
		            {
		  	            "type": "button",
		  	            "text": {
		  	                "type": "plain_text",
		  	                "text": "Approve"
		  	            },
                        "confirm": {
						    "title": {
						    	"type": "plain_text",
						    	"text": "承認してよろしいですか?"
						    },
						    "text": {
						    	"type": "mrkdwn",
						    	"text": text
						    },
						    "confirm": {
						    	"type": "plain_text",
						    	"text": "OK"
						    },
						    "deny": {
						    	"type": "plain_text",
						    	"text": "Cancel"
						    }
					    },
		  	            "style": "primary",
		  	            "value": task_token
		            },
		            {
		  	            "type": "button",
		  	            "text": {
		  	                "type": "plain_text",
		  	                "text": "Reject"
		  	            },
                        "confirm": {
						    "title": {
						    	"type": "plain_text",
						    	"text": "却下してよろしいですか?"
						    },
						    "text": {
						    	"type": "mrkdwn",
						    	"text": text
						    },
						    "confirm": {
						    	"type": "plain_text",
						    	"text": "OK"
						    },
						    "deny": {
						    	"type": "plain_text",
						    	"text": "Cancel"
						    }
					    },
		  	            "style": "danger",
		  	            "value": task_token
		            }
		        ]
	        }
	    ]
    }
    return payload

### 略 ###

def post_message_to_slack(slack_bot_token, slack_channel, payload):
    try:
        client = WebClient(token=slack_bot_token)
        response = client.chat_postMessage(
            channel=slack_channel,
            text="fallback",
            blocks =payload["blocks"]
        )
        if response["ok"]:
            logger.info(f"Succeeded to post message to Slack!!! Response: {json.dumps(response.data, indent=2)}")
    except SlackApiError as e:
        logger.error(f"Failed to post message to Slack.\nslack_channel: {slack_channel}, slack_bot_token: {slack_bot_token}")
        raise
    except Exception as e:
        raise

### 略 ###

Slack 送信用 payload の作成時は以下の使用がオススメ。

このようにしてメッセージを送信している。
"value": task_token とあるように、Slack に送信されたメッセージのボタンを押下すると task_token が返されるようになっている。(他の値も送信したい場合は "value" にまとめてあげれば OK)

では続いて、task_token を返している Lambda の説明に移る。

task token を Step Functions に返却する

task token を返却するフロー

Slack 上のボタンを押下した際のリクエスト先は、Slack app の 「Interactivity & Shortcuts」→「Interactivity」→「Request URL」 で設定できる。
ここでは API Gateway の URL を設定しており、Slack → API Gateway → Lambda の流れで実行している。

スクリーンショット 2024-10-14 11.51.39.png

そして API Gateway によってキックされる Lambda が "value": task_token を Step Functions に返している。

個人的に思うこの構成の難しいところは、この API Gateway や Lambda は Step Functions のオーケストレーション範囲外に位置するところ。
WaitForPermissionScalingAbsoluteValueECS などが実行されてから次のステートに遷移するまでの間に、以下の2ステップが Step Funtions の外側で実行されている。

  1. Slack からリクエストされる API Gateway
  2. task token を返す Lambda

task token の返却法

task token をパラメータとして持たせ、次のステートに進める場合はSendTaskSuccess API を、エラー終了させたい場合は SendTaskFailure API を実行する。
詳細は以下参考。

以下のようなイメージ。
payload で次のステートに渡したいパラメータを定義している。

def return_task_token_to_stepfunctions(action, taskToken, slack_bot_token):
    sfn = boto3.client("stepfunctions", region_name=region)
    try:
        if action == "Approve":
            payload = {
                "slack_bot_token": slack_bot_token
            }
            sfn.send_task_success(
                taskToken = taskToken,
                output    = json.dumps(payload) # 次の state(ExecScalingEC2OrECS) に渡される
            )
            logger.info("Got approval scaling. Exec 'SendTaskSuccess' api for Step Functions")
        elif action == "Reject":
            payload = {
                "cause": "Rejected scaling. This is not error."
            }
            sfn.send_task_failure(
                taskToken = taskToken,
                cause     = json.dumps(payload) # 次の state(ExecSendTaskFailureAPI) に渡される
            )
            logger.info("Rejected scaling. Exec 'SendTaskFailure' api for Step Functions")

### 略 ###

Approve された場合は SendTaskSuccess API を実行し次のステートに進め、Reject された場合は SendTaskFailure API を実行し、ExecSendTaskFailureAPI ステートに遷移させ終了させている。

Slack 上での承認結果の取得法

ここで気になるのは Slack で実行された action の取得法。
ボタンが押下され API Gateway が発火すると、以下のようなイベントが Lambda に渡される。(user 関連の値はボタンを押下したユーザーの情報)

event
{
    "type": "block_actions",
    "user": {
        "id": "<slack_user_id>",
        "username": "<slack_user_name>",
        "name": "<slack_user_name>",
        "team_id": "<slack_workspace_id>"
    },
    "api_app_id": "<slack_app_id>",
    "token": "xxxxxxxxxxxx",
    "container": {
        "type": "message",
        "message_ts": "1728867701.411119",
        "channel_id": "<slack_channel_id>",
        "is_ephemeral": false
    },
    "trigger_id": "xxxxxxxxxxxxx.xxxxxxxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "team": {
        "id": "<slack_workspace_id>",
        "domain": "<slack_workspace_name>"
    },
    "enterprise": null,
    "is_enterprise_install": false,
    "channel": {
        "id": "<slack_channel_id>",
        "name": "<slack_channel_name>"
    },
    "message": {
        "user": "<slack_user_id>",
        "type": "message",
        "ts": "1728867701.411119",
        "bot_id": "<slack_bot_id>",
        "app_id": "<slack_app_id>",
        "text": "fallback",
        "team": "<slack_workspace_id>",
        "blocks": [
            {
                "type": "section",
                "block_id": "0Y1/u",
                "text": {
                    "type": "mrkdwn",
                    "text": "hogehoge",
                    "verbatim": false
                }
            },
            {
                "type": "actions",
                "block_id": "xWtcU",
                "elements": [
                    {
                        "type": "button",
                        "action_id": "6AJqd",
                        "text": {
                            "type": "plain_text",
                            "text": "Approve",
                            "emoji": true
                        },
                        "style": "primary",
                        "value": "<stepfunctions_task_token>",
                        "confirm": {
                            "title": {
                                "type": "plain_text",
                                "text": "承認してよろしいですか?",
                                "emoji": true
                            },
                            "text": {
                                "type": "mrkdwn",
                                "text": "hogehoge",
                                "verbatim": false
                            },
                            "confirm": {
                                "type": "plain_text",
                                "text": "OK",
                                "emoji": true
                            },
                            "deny": {
                                "type": "plain_text",
                                "text": "Cancel",
                                "emoji": true
                            }
                        }
                    },
                    {
                        "type": "button",
                        "action_id": "MEO4X",
                        "text": {
                            "type": "plain_text",
                            "text": "Reject",
                            "emoji": true
                        },
                        "style": "danger",
                        "value": "<stepfunctions_task_token>",
                        "confirm": {
                            "title": {
                                "type": "plain_text",
                                "text": "却下してよろしいですか?",
                                "emoji": true
                            },
                            "text": {
                                "type": "mrkdwn",
                                "text": "hogehoge",
                                "verbatim": false
                            },
                            "confirm": {
                                "type": "plain_text",
                                "text": "OK",
                                "emoji": true
                            },
                            "deny": {
                                "type": "plain_text",
                                "text": "Cancel",
                                "emoji": true
                            }
                        }
                    }
                ]
            }
        ]
    },
    "state": {
        "values": {}
    },
    "response_url": "https://hooks.slack.com/actions/<slack_workspace_id>/xxxxxxxxxxxx/xxxxxxxxxxxx",
    "actions": [
        {
            "confirm": {
                "title": {
                    "type": "plain_text",
                    "text": "却下してよろしいですか?",
                    "emoji": true
                },
                "text": {
                    "type": "mrkdwn",
                    "text": "hogehoge",
                    "verbatim": false
                },
                "confirm": {
                    "type": "plain_text",
                    "text": "OK",
                    "emoji": true
                },
                "deny": {
                    "type": "plain_text",
                    "text": "Cancel",
                    "emoji": true
                }
            },
            "action_id": "MEO4X",
            "block_id": "xWtcU",
            "text": {
                "type": "plain_text",
                "text": "Reject",
                "emoji": true
            },
            "value": "<stepfunctions_task_token>",
            "style": "danger",
            "type": "button",
            "action_ts": "1728867761.265754"
        }
    ]
}

ここから必要な情報を取得し、action によって処理を分岐させたり、メンション付きで新たにメッセージを送信したりすることが可能。

送信されたメッセージの修正も、この JSON から情報を取得することで可能となる。
メッセージの特定に必要な情報は、チャンネル ID と送信されたタイムスタンプである。
チャンネル ID は良いとして、タイムスタンプをどのようにして取得するかというと、上記 API Gateway から渡されるイベントのうち、message_ts が該当。
この二つをもって以下のように記述することでメッセージの修正が可能となる。

### 略 ###

def lambda_handler(event, context):
    decoded_body = b64decode(event["body"])
    parsed_body = parse.parse_qs(decoded_body.decode("utf-8"))
    api_gateway_request_body = json.loads(parsed_body["payload"][0])

    user      = api_gateway_request_body.get("user", {}).get("name")
    action    = api_gateway_request_body.get("actions", [{}])[0].get("text", {}).get("text", None)
    taskToken = api_gateway_request_body.get("actions", [{}])[0].get("value", None)
    timestamp = api_gateway_request_body.get("container", {}).get("message_ts", None)

### 略 ###

def update_posted_message_to_slack(user, action, timestamp, slack_bot_token):
    payload = create_slack_payload(user, action)
    try:
        client = WebClient(token=slack_bot_token)
        response = client.chat_update(
            token=slack_bot_token,
            channel=slack_channel,
            text="fallback",
            ts=timestamp, # timestamp で変更対象のメッセージを特定
            blocks=payload["blocks"]
        )
        if response["ok"]:
            logger.info(f"Succeeded to update posted message to Slack!!! Response: {json.dumps(response.data, indent=2)}")
    except SlackApiError as e:
        logger.error(f"Failed to update message to Slack.\nslack_channel: {slack_channel}, slack_bot_token: {slack_bot_token}")
        raise
    except Exception as e:
        raise

### 略 ###

今回は、誰が承認 or 拒否したのかという内容でボタン付きメッセージを上書きすることで、重複してボタンを押下できない、かつ誰がボタンを押下したのかを表示することでセルフ承認を回避している。
※ 厳密に言えば、Step Functions やら Lambda やらの処理中に間髪空けずにボタンを連打すれば重複して押下することもできるし、セルフ承認に関しては何も制御はしていない(心理的にしづらいだけ)

ここまでで、『Slack に承認可否を問うメッセージを送信』、『承認結果を受けて処理を分岐』、『承認結果でメッセージを上書き』までできたので、最後に実際にリソースをスケーリングするところ。

他 AWS アカウントのリソースをスケーリングする

ExecScalingEC2OrECS

template.yaml
ExecScalingEC2OrECS: 
  Type: Task
  Resource: !GetAtt ExecScalingEC2AndECSLambda.Arn
  End: true

これはただ対象リソースをスケーリングする Lambda を実行しているのみ。
触れておくべき事項としては、①パラメータの受け渡し ②他 AWS アカウントのリソース操作法の2点だろうか。

パラメータの受け渡し

ここまでで、コールバックタスクやら Lambda やら API Gateway やら複数リソースが登場しているが、Step Functions 内で次のステートへのパラメータの受け渡しは実は初めて。(コールバックタスクのところで Lambda に渡しているがちょっと例外的)

リソースのスケーリングにあたり必要なパラメータは、以下の通り一番初めにSlack から渡されたもののみ。

  1. 操作対象の AWS アカウント ID
  2. 対象 ECS or EC2 の情報
  3. (最小台数 and 最大台数) or 追加台数

また、スケーリングの実行結果を Slack に通知したいので、OAuth Tokens も必要となる。
OAuth Tokens は前段までの Lambda で既に使用しているので、ステートを通じて Lambda から Lambda へと渡してやる。
ここで先ほど以下のように説明した ResultPath が生きてくる。

そのステートの Lambda の出力結果を、ステートへの入力 JSON のどこに埋め込むかを決めるプロパティである。
うまく表現できないので詳しくは以下参考。

https://dev.classmethod.jp/articles/stepfunctions-parameters-inter-states/#ResultPath

しかし、コールバックタスクが使用されたステート内では、ResultPath の扱いが少し異なるように思える。
おそらく、コールバックタスクが使用されたステート内では、task token を返したリソースの出力結果をステートの出力結果として扱う。
試しに、コールバックタスクによって実行される Lambda(Slack に承認を問うメッセージを送信する) の中で適当に return してみても、ResultPath に含まれることはなく、あくまで task token を返す Lambda の出力結果を対象としている様子。

まず、ExecScalingEC2OrECS ステートの前に位置するステートから OAuth Tokens は渡される。
前に位置するステートというのは、WaitForPermissionScalingAbsoluteValueECS などのコールバックタスク。
つまり、ResultPath はこのステートの Lambda の出力結果を、ステートへの入力 JSON、つまりは Slack から渡されたパラメータにどのように付加するのかを決めるということになる。

そして、このステートの Lambda の出力結果というのは、『task token を返したリソースの出力結果』 を指す。
task token を Step Functions に返却する の項でソースを載せているが、output の部分が出力結果となり、slack_bot_token を出力している。(以下該当部抜粋)

if action == "Approve":
    payload = {
        "slack_bot_token": slack_bot_token
    }
    sfn.send_task_success(
        taskToken = taskToken,
        output    = json.dumps(payload) # 次の state(ExecScalingEC2OrECS) に渡される
    )

ここでは output という変数に格納して出力している。
Step Functions 側では ResultPath: $.output とすることで、ステートへの入力 JSON に output というキーで Lambda の出力結果である payload を埋め込んでいる。(以下該当部抜粋)

template.yaml
WaitForPermissionScalingAbsoluteValueECS: 
  Type: Task
  Resource: arn:aws:states:::lambda:invoke.waitForTaskToken # task token が送られるまで待機するコールバックタスク
  Parameters: 
    # 待機中にこの Lambda を実行し task token を返させる(この Lambda 自身は task token を返さない)
    # PostScalingRequestToSlackLambda -> Slack -> API Gateway -> ReturnTaskTokenToStepFunctionsLambda(task token) -> Step Functions
    FunctionName: !Ref PostScalingRequestToSlackLambda
    Payload: # Slack から渡されたパラメータ + task token を PostScalingRequestToSlackLambda に渡す
      TaskToken.$: $$.Task.Token
      AWSAccountID.$: $.aws_account_id
      ECSClusterName.$: $.ecs_cluster_name
      ECSServiceName.$: $.ecs_service_name
      Min.$: $.min
      Max.$: $.max
  ResultPath: $.payload # Slack から渡されたパラメータに ReturnTaskTokenToStepFunctionsLambda の出力結果(slack_bot_token, desired_count, max_capacity)を付加

すると ExecScalingEC2OrECS ステートの Lambda では、 Slack から渡されたパラメータに加えて OAuth Tokens が受け取れるようになる。

他 AWS アカウントのリソース操作法

今回は一つのアカウントのリソースから複数アカウントのリソースをスケーリングする。
以下のように assume role して対象リソースの操作権限を取得し、スケーリングするための API を実行する。
この辺りは boto3 のドキュメント参照。

def assume_role(aws_account_id, ec2_or_ecs):
    role_arn = f"arn:aws:iam::{aws_account_id}:role/Scaling{ec2_or_ecs}FromInfraAccountRole{camel_short_env}"
    role_session_name = f"{ec2_or_ecs}ScalingFromInfraAccountSession"
    
    try:
        sts_client = boto3.client("sts")
        response = sts_client.assume_role(
            RoleArn=role_arn,
            RoleSessionName=role_session_name,
            DurationSeconds=900 # 指定できる最小値
        )
        credentials = response["Credentials"]
        client_params = {
            "aws_access_key_id": credentials["AccessKeyId"],
            "aws_secret_access_key": credentials["SecretAccessKey"],
            "aws_session_token": credentials["SessionToken"],
            "region_name": region
        }
        return client_params
    except ClientError as e:
        logger.error(f"Failed to assume role: {str(e)}\n{traceback.format_exc()}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}\n{traceback.format_exc()}")
        raise

def ecs_scaling(aws_account_id, ecs_cluster_name, ecs_service_name, slack_bot_token, ec2_or_ecs, min, max):
    ecs_service_console_url = f"https://{region}.console.aws.amazon.com/ecs/v2/clusters/{ecs_cluster_name}/services/{ecs_service_name}/tasks?region={region}"
    
    client_params = assume_role(aws_account_id, ec2_or_ecs)
    try:
        app_autoscaling_client = boto3.client("application-autoscaling", **client_params)
        app_autoscaling_client.register_scalable_target(
            ServiceNamespace="ecs",
            ScalableDimension="ecs:service:DesiredCount",
            ResourceId=f"service/{ecs_cluster_name}/{ecs_service_name}",
            MinCapacity=min,
            MaxCapacity=max
        )
        logger.info(f"Scaling {ec2_or_ecs} succeeded!!!")

### 略 ###

以上で、Slack から AWS CLI を実行し EC2 or ECS をスケーリングすることができるようになった。
これでいつ障害が起きても、Slack が見られる環境さえあればスケーリングし放題となった。
ただしもちろん障害が起きないに越したことはない。。。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?