はじめに
何かしらサービスに障害が発生した際、大体が EC2 or ECS をスケーリングすることで問題が解消することが多くあった。
これは、アクセスが集中する想定外のイベントが発生することが原因。
当然 AutoScaling するような構成にはなっているが、それでも追いつかないほどの負荷がかかってしまうことがあり、そうなった場合は基本的には SRE の方で一気にスケーリングすることで対応していた。
しかし先日、勤務時間外に障害が発生し、初動対応が遅れてしまったことをきっかけに運用体制を見直すことになった。
そこで、休日でも外出先でも、スマホで Slack から簡単に、でも安全にスケーリングできたらいいね。という話になり作った構成のお話。
目標
- Slack から AWS リソースを操作し EC2 or ECS をスケーリングできる
- AWS リソース操作は最小権限で
- 承認フロー付き
- 実行できる Slack チャンネルは一つのみ
- 複数 AWS アカウントのリソースを操作できる
先に構成図
初見ではなんのこっちゃ状態だと思うのでそれぞれ詳細は後述。
Slack から AWS CLI を実行する
まず、Slack チャンネルに AWS Chatbot というアプリ(以後 Slack Chatbot)を入れる。
続いて、AWS サービスの AWS Chatbot と該当チャンネルを連携してやるのみ。(連携法はここでは触れないが簡単)
すると、先ほど Slack Chatbot を入れたチャンネルから、メッセージの頭に@aws
をつけると AWS CLI が実行可能になる。
どのようにして AWS API を実行しているのかというと、Slack Chatbot が AWS Chatbot に CLI コマンドを渡し、AWS Chatbot が API を実行しているイメージ。
例えば @aws lambda list-functions
を実行すると以下のように返ってくる。
(マスキングしている部分は自分宛のメンションやら AWS アカウント ID、リソース名など)
今回の構成では Slack から Step Functions を実行するのだが、実行系のコマンドの場合は以下のように確認が入り、[Run command] を押下すると実行される。
尚、ここで実行できる API は AWS Chatbot の権限に依存するため、適切に制限すれば事故が起きることはない。
更に AWS Chatbot のガードレールのポリシーで制限しておくとより安全。
ガードレールのポリシーとは、AWS Chatbot の権限の上限のようなもので、例えばガードレールのポリシーに ReadOnlyAccess
を設定すると、AWS Chatbot に AdministratorAccess
を付与しても、実際に許可されるのはガードレールのポリシーで設定した ReadOnlyAccess
の範囲内となる。
Step Functions を使用するメリット
AWS Chatbot の権限を制御したからといって安全というわけではなく、自由にスケーリングできてしまうと、誤ってインスタンスやタスクを0台にされてしまう恐れがある。
ただこの辺りは、例えば Lambda で『現在の台数より減らすことはできない』というような制限をかけるなど、やりようはたくさんあると思う。
今回は実際に減らしたい場合(障害が落ち着いた、DB がボトルネックになっており Web サーバが増えると障害を助長する場合など)も想定し、現在の台数より減らすような動きがある場合は制限をかけるのではなく警告文を表示するようにしている。
更に承認フローを挟むことでより安全なフローにすることができる。(現状ではセルフ承認ができる構成だが、そこを制御するほどまで厳格化していない)
Step Functions を使用するメリットとしては以下が考えられるか。
(というかそもそも Step Functions を使用しない場合、『Lambda の処理を途中で止め、承認結果を受け取ってから再開させる』ということができるのか不明)
-
『承認可否の結果を受け取って次の処理を走らせる』 ということが容易にできる
- 一意である task token というものを受け取るまで待機するということができる(後述)
- 承認依頼から承認可否まで、必ずしもノータイムとは限らないため、単一の Lambda だけで実装しようとするとタイムアウトする可能性がある
- タイムアウトを回避するために Lambda を複数に分割するのであれば、Step Functions でオーケストレーションすべき
- Step Functions は明示的に指定しない限りタイムアウトしない
- パラメータのバリデーション、処理の順番制御、エラーハンドリングなど、コードを書かなくてもある程度実装できる
最終的な Step Functions の構成は以下。
多分これだけ見ても意味がわからないので一つずつ後述する。(もっとスマートな構成があれば是非教えてください、、、)
ざっと説明すると、必要なパラメータを持たせて Slack から Step Functions を実行することで、以下のフローを経て Lambda を実行しスケーリングしている。
- パラメータのバリデーション
- 必須パラメータである AWS アカウント ID の存在チェック
- パラメータの組み合わせの有効性チェック
- ECS と EC2 のパラメータが混ざっていないか
- ECS クラスタ名と EC2 AutoScaling グループ名が混ざっていないかなど
- 絶対値を指定したスケーリング用のパラメータと相対値を指定したスケーリング用のパラメータが混ざっていないか
- ECS と EC2 のパラメータが混ざっていないか
- パラメータの内容によって分岐
- ECS or EC2
- 絶対値を指定したスケーリング or 相対値を指定したスケーリング
- 承認結果によって分岐
- 承認ならスケーリングを実行、拒否なら終了
パラメータのバリデーション・承認可否の結果を待機する
ここから実際に Step Functions の中身を見ていく。
今回は CloudFormation(SAM) で構築したため、template.yaml
を元に説明する。
まず Step Functions 全体の記述は以下。(結構なボリュームになってしまった)
template.yaml
ScalingEC2AndECSFromSlackStepFunctions:
Type: AWS::Serverless::StateMachine
Properties:
Name: !Sub ScalingEC2AndECSFromSlack${CamelShortEnv}
Role: !GetAtt ScalingEC2AndECSFromSlackStepFunctionsRole.Arn
Definition:
StartAt: CheckParameters
States:
# 渡されたパラメーターが以下条件に当てはまるかチェック
# 1. aws_account_id が存在する
# 2. ecs_cluster_name, ecs_service_name が両方存在し、ec2_autoscaling_group_name が存在しない
# 3. もしくは ec2_autoscaling_group_name が存在し、ecs_cluster_name, ecs_service_name の両方が存在しない
# 4. min, max が両方存在し、scaling_unit が存在しない
# 5. もしくは scaling_unit が存在し、min, max の両方が存在しない
CheckParameters:
Type: Choice
Choices:
# ECS, 絶対値スケーリング
- And:
- Variable: "$.aws_account_id"
IsPresent: true
- Variable: "$.ecs_cluster_name"
IsPresent: true
- Variable: "$.ecs_service_name"
IsPresent: true
- Variable: "$.ec2_autoscaling_group_name"
IsPresent: false
- Variable: "$.min"
IsPresent: true
- Variable: "$.max"
IsPresent: true
- Variable: "$.scaling_unit"
IsPresent: false
Next: WaitForPermissionScalingAbsoluteValueECS
# ECS, 相対値スケーリング
- And:
- Variable: "$.aws_account_id"
IsPresent: true
- Variable: "$.ecs_cluster_name"
IsPresent: true
- Variable: "$.ecs_service_name"
IsPresent: true
- Variable: "$.ec2_autoscaling_group_name"
IsPresent: false
- Variable: "$.min"
IsPresent: false
- Variable: "$.max"
IsPresent: false
- Variable: "$.scaling_unit"
IsPresent: true
Next: WaitForPermissionScalingRelativeValueECS
# EC2, 絶対値スケーリング
- And:
- Variable: "$.aws_account_id"
IsPresent: true
- Variable: "$.ecs_cluster_name"
IsPresent: false
- Variable: "$.ecs_service_name"
IsPresent: false
- Variable: "$.ec2_autoscaling_group_name"
IsPresent: true
- Variable: "$.min"
IsPresent: true
- Variable: "$.max"
IsPresent: true
- Variable: "$.scaling_unit"
IsPresent: false
Next: WaitForPermissionScalingAbsoluteValueEC2
# EC2, 相対値スケーリング
- And:
- Variable: "$.aws_account_id"
IsPresent: true
- Variable: "$.ecs_cluster_name"
IsPresent: false
- Variable: "$.ecs_service_name"
IsPresent: false
- Variable: "$.ec2_autoscaling_group_name"
IsPresent: true
- Variable: "$.min"
IsPresent: false
- Variable: "$.max"
IsPresent: false
- Variable: "$.scaling_unit"
IsPresent: true
Next: WaitForPermissionScalingRelativeValueEC2
Default: InputInvalidParameters
WaitForPermissionScalingAbsoluteValueECS:
Type: Task
Resource: arn:aws:states:::lambda:invoke.waitForTaskToken # task token が送られるまで待機するコールバックタスク
Parameters:
# 待機中にこの Lambda を実行し task token を返させる(この Lambda 自身は task token を返さない)
# PostScalingRequestToSlackLambda -> Slack -> API Gateway -> ReturnTaskTokenToStepFunctionsLambda(task token) -> Step Functions
FunctionName: !Ref PostScalingRequestToSlackLambda
Payload: # Slack から渡されたパラメータ + task token を PostScalingRequestToSlackLambda に渡す
TaskToken.$: $$.Task.Token
AWSAccountID.$: $.aws_account_id
ECSClusterName.$: $.ecs_cluster_name
ECSServiceName.$: $.ecs_service_name
Min.$: $.min
Max.$: $.max
ResultPath: $.payload # Slack から渡されたパラメータに ReturnTaskTokenToStepFunctionsLambda の出力結果(slack_bot_token, desired_count, max_capacity)を付加
TimeoutSeconds: 600 # 承認待ちの最大時間
Catch:
- ErrorEquals:
- RejectedScalingRequest # ReturnTaskTokenToStepFunctionsLambda からこのエラー(カスタム例外クラス)が返された場合に該当
Next: Rejected
- ErrorEquals:
- States.ALL # それ以外のエラーが返された場合に該当
Next: Error
Next: ExecScalingEC2OrECS
WaitForPermissionScalingRelativeValueECS:
Type: Task
Resource: arn:aws:states:::lambda:invoke.waitForTaskToken # task token が送られるまで待機するコールバックタスク
Parameters:
# 待機中にこの Lambda を実行し task token を返させる(この Lambda 自身は task token を返さない)
# PostScalingRequestToSlackLambda -> Slack -> API Gateway -> ReturnTaskTokenToStepFunctionsLambda(task token) -> Step Functions
FunctionName: !Ref PostScalingRequestToSlackLambda
Payload: # Slack から渡されたパラメータ + task token を PostScalingRequestToSlackLambda に渡す
TaskToken.$: $$.Task.Token
AWSAccountID.$: $.aws_account_id
ECSClusterName.$: $.ecs_cluster_name
ECSServiceName.$: $.ecs_service_name
ScalingUnit.$: $.scaling_unit
ResultPath: $.payload # Slack から渡されたパラメータに ReturnTaskTokenToStepFunctionsLambda の出力結果(slack_bot_token, desired_count, max_capacity)を付加
TimeoutSeconds: 600 # 承認待ちの最大時間
Catch:
- ErrorEquals:
- RejectedScalingRequest # ReturnTaskTokenToStepFunctionsLambda からこのエラー(カスタム例外クラス)が返された場合に該当
Next: Rejected
- ErrorEquals:
- States.ALL # それ以外のエラーが返された場合に該当
Next: Error
Next: ExecScalingEC2OrECS
WaitForPermissionScalingAbsoluteValueEC2:
Type: Task
Resource: arn:aws:states:::lambda:invoke.waitForTaskToken # task token をが送られるまで待機するコールバックタスク
Parameters:
# 待機中にこの Lambda を実行し task token を返させる(この Lambda 自身は task token を返さない)
# PostScalingRequestToSlackLambda -> Slack -> API Gateway -> ReturnTaskTokenToStepFunctionsLambda(task token) -> Step Functions
FunctionName: !Ref PostScalingRequestToSlackLambda
Payload: # Slack から渡されたパラメータ + task token を PostScalingRequestToSlackLambda に渡す
TaskToken.$: $$.Task.Token
AWSAccountID.$: $.aws_account_id
EC2AutoScalingGroupName.$: $.ec2_autoscaling_group_name
Min.$: $.min
Max.$: $.max
ResultPath: $.payload # Slack から渡されたパラメータに ReturnTaskTokenToStepFunctionsLambda の出力結果(slack_bot_token, desired_count, max_capacity)を付加
TimeoutSeconds: 600 # 承認待ちの最大時間
Catch:
- ErrorEquals:
- RejectedScalingRequest # ReturnTaskTokenToStepFunctionsLambda からこのエラー(カスタム例外クラス)が返された場合に該当
Next: Rejected
- ErrorEquals:
- States.ALL # それ以外のエラーが返された場合に該当
Next: Error
Next: ExecScalingEC2OrECS
WaitForPermissionScalingRelativeValueEC2:
Type: Task
Resource: arn:aws:states:::lambda:invoke.waitForTaskToken # task token が送られるまで待機するコールバックタスク
Parameters:
# 待機中にこの Lambda を実行し task token を返させる(この Lambda 自身は task token を返さない)
# PostScalingRequestToSlackLambda -> Slack -> API Gateway -> ReturnTaskTokenToStepFunctionsLambda(task token) -> Step Functions
FunctionName: !Ref PostScalingRequestToSlackLambda
Payload: # Slack から渡されたパラメータ + task token を PostScalingRequestToSlackLambda に渡す
TaskToken.$: $$.Task.Token
AWSAccountID.$: $.aws_account_id
ECSClusterName.$: $.ecs_cluster_name
ECSServiceName.$: $.ecs_service_name
ScalingUnit.$: $.scaling_unit
ResultPath: $.payload # Slack から渡されたパラメータに ReturnTaskTokenToStepFunctionsLambda の出力結果(slack_bot_token, desired_count, max_capacity)を付加
TimeoutSeconds: 600 # 承認待ちの最大時間
Catch:
- ErrorEquals:
- RejectedScalingRequest # ReturnTaskTokenToStepFunctionsLambda からこのエラー(カスタム例外クラス)が返された場合に該当
Next: Rejected
- ErrorEquals:
- States.ALL # それ以外のエラーが返された場合に該当
Next: Error
Next: ExecScalingEC2OrECS
ExecScalingEC2OrECS:
Type: Task
Resource: !GetAtt ExecScalingEC2AndECSLambda.Arn
End: true
InputInvalidParameters:
Type: Fail
Cause: "Required parameters are missing or invalid parameter combination."
Error: InputInvalidParametersError
Comment: 必須パラメータが存在しないか、パラメータの組み合わせに異常があります。
Rejected:
Type: Pass
Comment: スケーリング申請が拒否されました。
End: true
Error:
Type: Fail
Cause: Error occured. Exec SendTaskFailure API.
Comment: エラーが発生したため、ステートマシンを停止します。
Tags:
Name: !Sub ScalingEC2AndECSFromSlack${CamelShortEnv}
各ステートごとに見てみる。
CheckParameters
CheckParameters:
Type: Choice
Choices:
# ECS, 絶対値スケーリング
- And:
- Variable: "$.aws_account_id"
IsPresent: true
- Variable: "$.ecs_cluster_name"
IsPresent: true
- Variable: "$.ecs_service_name"
IsPresent: true
- Variable: "$.ec2_autoscaling_group_name"
IsPresent: false
- Variable: "$.min"
IsPresent: true
- Variable: "$.max"
IsPresent: true
- Variable: "$.scaling_unit"
IsPresent: false
Next: WaitForPermissionScalingAbsoluteValueECS
# ECS, 相対値スケーリング
- And:
- Variable: "$.aws_account_id"
IsPresent: true
- Variable: "$.ecs_cluster_name"
IsPresent: true
- Variable: "$.ecs_service_name"
IsPresent: true
- Variable: "$.ec2_autoscaling_group_name"
IsPresent: false
- Variable: "$.min"
IsPresent: false
- Variable: "$.max"
IsPresent: false
- Variable: "$.scaling_unit"
IsPresent: true
Next: WaitForPermissionScalingRelativeValueECS
# EC2, 絶対値スケーリング
- And:
- Variable: "$.aws_account_id"
IsPresent: true
- Variable: "$.ecs_cluster_name"
IsPresent: false
- Variable: "$.ecs_service_name"
IsPresent: false
- Variable: "$.ec2_autoscaling_group_name"
IsPresent: true
- Variable: "$.min"
IsPresent: true
- Variable: "$.max"
IsPresent: true
- Variable: "$.scaling_unit"
IsPresent: false
Next: WaitForPermissionScalingAbsoluteValueEC2
# EC2, 相対値スケーリング
- And:
- Variable: "$.aws_account_id"
IsPresent: true
- Variable: "$.ecs_cluster_name"
IsPresent: false
- Variable: "$.ecs_service_name"
IsPresent: false
- Variable: "$.ec2_autoscaling_group_name"
IsPresent: true
- Variable: "$.min"
IsPresent: false
- Variable: "$.max"
IsPresent: false
- Variable: "$.scaling_unit"
IsPresent: true
Next: WaitForPermissionScalingRelativeValueEC2
Default: InputInvalidParameters
Choices
にて、ステートマシンが次にどのステートに遷移するかを決定するための条件式の配列を記載している。(1行で簡潔に説明しようとすると難しい、、、)
尚、頭に $
がついた $.aws_account_id
などはステートマシン実行時に与える JSON パラメータのキーを表す。
先ほどスクショをチラッと載せているが、Slack から CLI 実行時は以下のようになり、このパラメータの値に対し条件式に該当するかどうかを判定している。
@aws stepfunctions start-execution --input {"min": 1, "max": 3, "aws_account_id": "123456789012", "ecs_cluster_name": "hoge", "ecs_service_name": "huga"} --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:ScalingEC2AndECSFromSlackStg --region ap-northeast-1
Choices
の使い方としては、例えば以下のようにパラメータを特定の数値または文字列と比較し、true
である場合に Next
で指定したステートに遷移させるという感じだ。
{
"Variable": "$.foo",
"NumericEquals": 1,
"Next": "FirstMatchState"
}
{
"Variable": "$.foo",
"StringEquals": "MyString",
"Next": "FirstMatchState"
}
今回はまず And
で複数条件との比較を可能にしている。(条件式の評価は上から順)
そして、コメントとして記載している通り、それぞれの条件式の中で以下の有効性をチェックする。
-
aws_account_id
が存在する -
ecs_cluster_name
,ecs_service_name
が両方存在し、ec2_autoscaling_group_name
が存在しない - もしくは
ec2_autoscaling_group_name
が存在し、ecs_cluster_name
,ecs_service_name
の両方が存在しない -
min
,max
が両方存在し、scaling_unit
が存在しない - もしくは
scaling_unit
が存在し、min
,max
の両方が存在しない
尚、min
, max
は現在の稼働台数に関係なく 絶対値 で AutoScaling の最小・最大値を決め打ちし、scaling_unit
は現在の稼働台数にプラス何台追加という 相対値 による指定をする。
いずれかの条件式に該当した場合、それぞれの Next
で指定されたステートに進み、いずれにも該当しない場合、つまり無効なパラメータである場合は Default
で指定された InputInvalidParameters
に進む。
InputInvalidParameters
ボリュームが少ないので先に。
InputInvalidParameters:
Type: Fail
Cause: "Required parameters are missing or invalid parameter combination."
Error: InputInvalidParametersError
Comment: 必須パラメータが存在しないか、パラメータの組み合わせに異常があります。
Catch
ブロックでキャッチされない限り、ステートマシンの失敗として処理し実行を停止する。(Catch
ブロックでキャッチされると次のステートに進める)
ここで定義した内容は以下のように表示される。
WaitForPermissionScalingAbsoluteValueECS(Absolute / Relative, ECS / EC2)
WaitForPermissionScalingAbsoluteValueECS:
Type: Task
Resource: arn:aws:states:::lambda:invoke.waitForTaskToken # task token が送られるまで待機するコールバックタスク
Parameters:
# 待機中にこの Lambda を実行し task token を返させる(この Lambda 自身は task token を返さない)
# PostScalingRequestToSlackLambda -> Slack -> API Gateway -> ReturnTaskTokenToStepFunctionsLambda(task token) -> Step Functions
FunctionName: !Ref PostScalingRequestToSlackLambda
Payload: # Slack から渡されたパラメータ + task token を PostScalingRequestToSlackLambda に渡す
TaskToken.$: $$.Task.Token
AWSAccountID.$: $.aws_account_id
ECSClusterName.$: $.ecs_cluster_name
ECSServiceName.$: $.ecs_service_name
Min.$: $.min
Max.$: $.max
ResultPath: $.payload # Slack から渡されたパラメータに ReturnTaskTokenToStepFunctionsLambda の出力結果(slack_bot_token, desired_count, max_capacity)を付加
TimeoutSeconds: 600 # 承認待ちの最大時間
Catch:
- ErrorEquals:
- RejectedScalingRequest # ReturnTaskTokenToStepFunctionsLambda からこのエラー(カスタム例外クラス)が返された場合に該当
Next: Rejected
- ErrorEquals:
- States.ALL # それ以外のエラーが返された場合に該当
Next: Error
Next: ExecScalingEC2OrECS
Type
Task
ステートはアクティビティや Lambda を使用したり、サポートされている他の AWS サービスと統合したり、Stripe のようなサードパーティ API を呼び出したりすることができる。
Resource
ここで Resource
に指定しているのは arn:aws:states:::lambda:invoke.waitForTaskToken
というもの。
これは特殊なリソースで、コールバックタスクと言われる。
詳細は以下を参照してほしいが、要は今回の例でいうと、Slack 上で承認可否が確定されるまでステートマシンを待機させるためのもの。
そして SendTaskSuccess
または SendTaskFailure
API を実行してステートマシンに task token を返すことで再開させることができる。
尚、SendTaskFailure
API が実行されるとステートマシンは停止する。
※ DeepL 翻訳
コールバックタスクは、タスクトークンが返されるまでワークフローを一時停止する方法を提供します。 タスクは人間の承認を待ったり、サードパーティと統合したり、レガシーシステムを呼び出したりする必要があるかもしれません。 このようなタスクの場合、ワークフローの実行が1年間のサービスクォータに達するまでStep Functionsを一時停止し(状態スロットリングに関連するクォータを参照)、外部プロセスやワークフローが完了するのを待つことができます。 このような状況のために、Step Functions では、AWS SDK サービス統合や、いくつかの Optimized サービス統合にタスクトークンを渡すことができます。 タスクは SendTaskSuccess または SendTaskFailure 呼び出しでそのタスクトークンを返すまで一時停止します。
コールバックに対応しているタスクは以下表を参照。
そして、Parameters
の FunctionName
で指定した Lambda を実行し task token を返す。(Payload
で Lambda に対し Slack から渡されたパラメータ + task token を渡す)
ただしその Lambda 自身が返す必要はなく、誰からだろうと task token さえ返せば問題ない。
実際今回の場合、Parameters
の FunctionName
で指定した Lambda はスケーリングの承認リクエストを Slack に送信するのみ。
冒頭の構成図にも記載しているが、task token は別の Lambda が返している。
ResultPath
そのステートの Lambda の出力結果を、ステートへの入力 JSON(ここでは Slack から渡されたパラメータ) のどこに埋め込むかを決めるプロパティである。
個人的に以下記事が分かりやすかったので詳しくは以下参考。
しかし、コールバックタスクが使用されたステート内では、ResultPath
の扱いが少し異なっており、task token を返したリソースの出力結果をステートの出力結果として扱う。
試しにコールバックタスクによって実行される Lambda(Parameters
の FunctionName
で指定した Lambda) の中で適当に return
してみても、ResultPath
に含まれることはなく、あくまで task token を返す Lambda の出力結果を対象としている様子。
Catch
エラー発生時に Catch
の中で拾い、次のステートに遷移させることでエラーハンドリングができる。
ここでは、発生したエラーによって遷移させるステートを分岐させている。
少し整理すると、ステートマシンが Next
で指定したステートに遷移できず停止するのは、前述の通り SendTaskFailure
API が実行された場合と、本当にエラーが発生した場合である。
ただし、SendTaskFailure
API が実行されるのは、あくまで Slack 上で「Reject」ボタンが押下されたのであって、正確にはエラーが発生しているわけではない。
そのため、エラー終了扱いにしてしまうのはよろしくない。
加えて、Step Functions の ExecutionsFailed
メトリクスを検知して Slack に通知しているため、「Reject」 ボタンが押下されるたびにエラー発生通知が来てしまうのは非常に紛らわしいことになる。
そこで、ステートマシンが停止した理由によって分岐させているというわけである。
ここでは、SendTaskFailure
API が実行された場合は、Lambda で RejectedScalingRequest
というカスタム例外クラスを発生させ、ステート側でキャッチしている。
コンソール上で Step Functions を触っていると、あらかじめ定義された例外クラスがあるように思えるのだが、以下記事の通り好きな文字列でエラー名を定義し、そのエラーが発生したらキャッチする、ということが可能。
Lambda では例えば以下のようにすればカスタム例外クラスを発生させられる。
class RejectedScalingRequest(Exception):
pass
### 略 ###
if action == "Reject":
raise RejectedScalingRequest("Rejected scaling request.")
では、Slack にメッセージを送信してからどのような流れで task token を Step Functions に返しているのか。
まずは Slack へのメッセージ送信法や送信内容について触れていく。
Slack へのメッセージ送信
実際に Slack に送信されるメッセージは以下。
誤操作を防ぐため、最終確認画面を挟んでいる。
Webhook URL と OAuth Tokens
Slack に対しメッセージを送信するには、Webhook URL と OAuth Tokens を使用する2通りの方法がある。
簡単に説明すると、Webhook URL は各チャンネルごとに一意の URL が紐づき、それに対し必要なパラメータを持たせて POST することでメッセージの送信が可能となるもの。
一方 OAuth Tokens は、token とチャンネルが一意に紐づくのではなく、送信処理の中でチャンネルを指定することができるため、(非推奨だとは思うが)一つの token で複数チャンネルへのメッセージの送信も可能。
また、Webhook URL と違ってメッセージの送信以外の処理も可能。そのため、以下のように token の scope(権限)を制御できる。
OAuth Tokens を使用するメリットはこの辺りになるのではないかと思う。
送信後のメッセージを修正する
今回は OAuth Tokens を使用する方法を採用した。
理由は、Slack に送信されたメッセージを後から修正する必要があるため。
先ほど添付したスクショの、「Approve」「Reject」 ボタンがついたメッセージは、どちらが押下されたとしても、一度結果が確定しても尚ボタンが押下できる状態は好ましくない。
とは言っても実際には、一度確定した後は仮にメッセージが残っていたとして、再度押下しても何も起こらず、task token を返す Lambda では以下のようなエラーが発生する。
[ERROR] TaskTimedOut: An error occurred (TaskTimedOut) when calling the SendTaskFailure operation: Task Timed Out: 'Provided task does not exist anymore'
「Approve」「Reject」 ボタンが押下されるとステートマシンは再開し、どちらにせよ最終的には停止する。
そして task token は一意のものであるので、『そんな task token に対応するステートマシンはどこにもないよ』と怒られている。
それでもやはり紛らわしいし、Lambda のエラーが発生し得る状態を残しておくのは美しくない。
そこで OAuth Tokens を使用すると、メッセージの修正が可能となる。(必要な権限は chat:write
のみ)
以下は OAuth Tokens を使用して Slack にメッセージを送信するスクリプトの一部抜粋。(スクショで載せたメッセージを構成するものとはちょっと違うが)
ひとまず Slack への送信法や送信内容の構成について触れるために載せておく。
尚、メッセージの修正は別の Lambda が実施しているため後述。
### 略 ###
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
### 略 ###
def create_slack_payload(task_token, aws_account_id, ecs_cluster_name, ecs_service_name, ec2_autoscaling_group_name, min, max, ec2_or_ecs):
if ec2_or_ecs == "ECS":
text = f'''
AWS アカウント ID: *{aws_account_id}*
ECS クラスター名: *{ecs_cluster_name}*
ECS サービス名: *{ecs_service_name}*
スケーリング予定数 *min: {min}, max: {max}*
'''
elif ec2_or_ecs == "EC2":
text = f'''
AWS アカウント ID: *{aws_account_id}*
AutoScaling グループ名: *{ec2_autoscaling_group_name}*
スケーリング予定数 *min: {min}, max: {max}*
'''
logger.info(f"text: {text}")
payload = {
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": text
}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {
"type": "plain_text",
"text": "Approve"
},
"confirm": {
"title": {
"type": "plain_text",
"text": "承認してよろしいですか?"
},
"text": {
"type": "mrkdwn",
"text": text
},
"confirm": {
"type": "plain_text",
"text": "OK"
},
"deny": {
"type": "plain_text",
"text": "Cancel"
}
},
"style": "primary",
"value": task_token
},
{
"type": "button",
"text": {
"type": "plain_text",
"text": "Reject"
},
"confirm": {
"title": {
"type": "plain_text",
"text": "却下してよろしいですか?"
},
"text": {
"type": "mrkdwn",
"text": text
},
"confirm": {
"type": "plain_text",
"text": "OK"
},
"deny": {
"type": "plain_text",
"text": "Cancel"
}
},
"style": "danger",
"value": task_token
}
]
}
]
}
return payload
### 略 ###
def post_message_to_slack(slack_bot_token, slack_channel, payload):
try:
client = WebClient(token=slack_bot_token)
response = client.chat_postMessage(
channel=slack_channel,
text="fallback",
blocks =payload["blocks"]
)
if response["ok"]:
logger.info(f"Succeeded to post message to Slack!!! Response: {json.dumps(response.data, indent=2)}")
except SlackApiError as e:
logger.error(f"Failed to post message to Slack.\nslack_channel: {slack_channel}, slack_bot_token: {slack_bot_token}")
raise
except Exception as e:
raise
### 略 ###
Slack 送信用 payload の作成時は以下の使用がオススメ。
このようにしてメッセージを送信している。
"value": task_token
とあるように、Slack に送信されたメッセージのボタンを押下すると task_token
が返されるようになっている。(他の値も送信したい場合は "value"
にまとめてあげれば OK)
では続いて、task_token
を返している Lambda の説明に移る。
task token を Step Functions に返却する
task token を返却するフロー
Slack 上のボタンを押下した際のリクエスト先は、Slack app の 「Interactivity & Shortcuts」→「Interactivity」→「Request URL」 で設定できる。
ここでは API Gateway の URL を設定しており、Slack → API Gateway → Lambda の流れで実行している。
そして API Gateway によってキックされる Lambda が "value": task_token
を Step Functions に返している。
個人的に思うこの構成の難しいところは、この API Gateway や Lambda は Step Functions のオーケストレーション範囲外に位置するところ。
WaitForPermissionScalingAbsoluteValueECS
などが実行されてから次のステートに遷移するまでの間に、以下の2ステップが Step Funtions の外側で実行されている。
- Slack からリクエストされる API Gateway
- task token を返す Lambda
task token の返却法
task token をパラメータとして持たせ、次のステートに進める場合はSendTaskSuccess
API を、エラー終了させたい場合は SendTaskFailure
API を実行する。
詳細は以下参考。
以下のようなイメージ。
payload
で次のステートに渡したいパラメータを定義している。
def return_task_token_to_stepfunctions(action, taskToken, slack_bot_token):
sfn = boto3.client("stepfunctions", region_name=region)
try:
if action == "Approve":
payload = {
"slack_bot_token": slack_bot_token
}
sfn.send_task_success(
taskToken = taskToken,
output = json.dumps(payload) # 次の state(ExecScalingEC2OrECS) に渡される
)
logger.info("Got approval scaling. Exec 'SendTaskSuccess' api for Step Functions")
elif action == "Reject":
payload = {
"cause": "Rejected scaling. This is not error."
}
sfn.send_task_failure(
taskToken = taskToken,
cause = json.dumps(payload) # 次の state(ExecSendTaskFailureAPI) に渡される
)
logger.info("Rejected scaling. Exec 'SendTaskFailure' api for Step Functions")
### 略 ###
Approve された場合は SendTaskSuccess
API を実行し次のステートに進め、Reject された場合は SendTaskFailure
API を実行し、ExecSendTaskFailureAPI
ステートに遷移させ終了させている。
Slack 上での承認結果の取得法
ここで気になるのは Slack で実行された action
の取得法。
ボタンが押下され API Gateway が発火すると、以下のようなイベントが Lambda に渡される。(user 関連の値はボタンを押下したユーザーの情報)
event
{
"type": "block_actions",
"user": {
"id": "<slack_user_id>",
"username": "<slack_user_name>",
"name": "<slack_user_name>",
"team_id": "<slack_workspace_id>"
},
"api_app_id": "<slack_app_id>",
"token": "xxxxxxxxxxxx",
"container": {
"type": "message",
"message_ts": "1728867701.411119",
"channel_id": "<slack_channel_id>",
"is_ephemeral": false
},
"trigger_id": "xxxxxxxxxxxxx.xxxxxxxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"team": {
"id": "<slack_workspace_id>",
"domain": "<slack_workspace_name>"
},
"enterprise": null,
"is_enterprise_install": false,
"channel": {
"id": "<slack_channel_id>",
"name": "<slack_channel_name>"
},
"message": {
"user": "<slack_user_id>",
"type": "message",
"ts": "1728867701.411119",
"bot_id": "<slack_bot_id>",
"app_id": "<slack_app_id>",
"text": "fallback",
"team": "<slack_workspace_id>",
"blocks": [
{
"type": "section",
"block_id": "0Y1/u",
"text": {
"type": "mrkdwn",
"text": "hogehoge",
"verbatim": false
}
},
{
"type": "actions",
"block_id": "xWtcU",
"elements": [
{
"type": "button",
"action_id": "6AJqd",
"text": {
"type": "plain_text",
"text": "Approve",
"emoji": true
},
"style": "primary",
"value": "<stepfunctions_task_token>",
"confirm": {
"title": {
"type": "plain_text",
"text": "承認してよろしいですか?",
"emoji": true
},
"text": {
"type": "mrkdwn",
"text": "hogehoge",
"verbatim": false
},
"confirm": {
"type": "plain_text",
"text": "OK",
"emoji": true
},
"deny": {
"type": "plain_text",
"text": "Cancel",
"emoji": true
}
}
},
{
"type": "button",
"action_id": "MEO4X",
"text": {
"type": "plain_text",
"text": "Reject",
"emoji": true
},
"style": "danger",
"value": "<stepfunctions_task_token>",
"confirm": {
"title": {
"type": "plain_text",
"text": "却下してよろしいですか?",
"emoji": true
},
"text": {
"type": "mrkdwn",
"text": "hogehoge",
"verbatim": false
},
"confirm": {
"type": "plain_text",
"text": "OK",
"emoji": true
},
"deny": {
"type": "plain_text",
"text": "Cancel",
"emoji": true
}
}
}
]
}
]
},
"state": {
"values": {}
},
"response_url": "https://hooks.slack.com/actions/<slack_workspace_id>/xxxxxxxxxxxx/xxxxxxxxxxxx",
"actions": [
{
"confirm": {
"title": {
"type": "plain_text",
"text": "却下してよろしいですか?",
"emoji": true
},
"text": {
"type": "mrkdwn",
"text": "hogehoge",
"verbatim": false
},
"confirm": {
"type": "plain_text",
"text": "OK",
"emoji": true
},
"deny": {
"type": "plain_text",
"text": "Cancel",
"emoji": true
}
},
"action_id": "MEO4X",
"block_id": "xWtcU",
"text": {
"type": "plain_text",
"text": "Reject",
"emoji": true
},
"value": "<stepfunctions_task_token>",
"style": "danger",
"type": "button",
"action_ts": "1728867761.265754"
}
]
}
ここから必要な情報を取得し、action
によって処理を分岐させたり、メンション付きで新たにメッセージを送信したりすることが可能。
送信されたメッセージの修正も、この JSON から情報を取得することで可能となる。
メッセージの特定に必要な情報は、チャンネル ID と送信されたタイムスタンプである。
チャンネル ID は良いとして、タイムスタンプをどのようにして取得するかというと、上記 API Gateway から渡されるイベントのうち、message_ts
が該当。
この二つをもって以下のように記述することでメッセージの修正が可能となる。
### 略 ###
def lambda_handler(event, context):
decoded_body = b64decode(event["body"])
parsed_body = parse.parse_qs(decoded_body.decode("utf-8"))
api_gateway_request_body = json.loads(parsed_body["payload"][0])
user = api_gateway_request_body.get("user", {}).get("name")
action = api_gateway_request_body.get("actions", [{}])[0].get("text", {}).get("text", None)
taskToken = api_gateway_request_body.get("actions", [{}])[0].get("value", None)
timestamp = api_gateway_request_body.get("container", {}).get("message_ts", None)
### 略 ###
def update_posted_message_to_slack(user, action, timestamp, slack_bot_token):
payload = create_slack_payload(user, action)
try:
client = WebClient(token=slack_bot_token)
response = client.chat_update(
token=slack_bot_token,
channel=slack_channel,
text="fallback",
ts=timestamp, # timestamp で変更対象のメッセージを特定
blocks=payload["blocks"]
)
if response["ok"]:
logger.info(f"Succeeded to update posted message to Slack!!! Response: {json.dumps(response.data, indent=2)}")
except SlackApiError as e:
logger.error(f"Failed to update message to Slack.\nslack_channel: {slack_channel}, slack_bot_token: {slack_bot_token}")
raise
except Exception as e:
raise
### 略 ###
今回は、誰が承認 or 拒否したのかという内容でボタン付きメッセージを上書きすることで、重複してボタンを押下できない、かつ誰がボタンを押下したのかを表示することでセルフ承認を回避している。
※ 厳密に言えば、Step Functions やら Lambda やらの処理中に間髪空けずにボタンを連打すれば重複して押下することもできるし、セルフ承認に関しては何も制御はしていない(心理的にしづらいだけ)
ここまでで、『Slack に承認可否を問うメッセージを送信』、『承認結果を受けて処理を分岐』、『承認結果でメッセージを上書き』までできたので、最後に実際にリソースをスケーリングするところ。
他 AWS アカウントのリソースをスケーリングする
ExecScalingEC2OrECS
ExecScalingEC2OrECS:
Type: Task
Resource: !GetAtt ExecScalingEC2AndECSLambda.Arn
End: true
これはただ対象リソースをスケーリングする Lambda を実行しているのみ。
触れておくべき事項としては、①パラメータの受け渡し ②他 AWS アカウントのリソース操作法の2点だろうか。
パラメータの受け渡し
ここまでで、コールバックタスクやら Lambda やら API Gateway やら複数リソースが登場しているが、Step Functions 内で次のステートへのパラメータの受け渡しは実は初めて。(コールバックタスクのところで Lambda に渡しているがちょっと例外的)
リソースのスケーリングにあたり必要なパラメータは、以下の通り一番初めにSlack から渡されたもののみ。
- 操作対象の AWS アカウント ID
- 対象 ECS or EC2 の情報
- (最小台数 and 最大台数) or 追加台数
また、スケーリングの実行結果を Slack に通知したいので、OAuth Tokens も必要となる。
OAuth Tokens は前段までの Lambda で既に使用しているので、ステートを通じて Lambda から Lambda へと渡してやる。
ここで先ほど以下のように説明した ResultPath が生きてくる。
そのステートの Lambda の出力結果を、ステートへの入力 JSON のどこに埋め込むかを決めるプロパティである。
うまく表現できないので詳しくは以下参考。
https://dev.classmethod.jp/articles/stepfunctions-parameters-inter-states/#ResultPath
しかし、コールバックタスクが使用されたステート内では、ResultPath の扱いが少し異なるように思える。
おそらく、コールバックタスクが使用されたステート内では、task token を返したリソースの出力結果をステートの出力結果として扱う。
試しに、コールバックタスクによって実行される Lambda(Slack に承認を問うメッセージを送信する) の中で適当に return してみても、ResultPath に含まれることはなく、あくまで task token を返す Lambda の出力結果を対象としている様子。
まず、ExecScalingEC2OrECS
ステートの前に位置するステートから OAuth Tokens は渡される。
前に位置するステートというのは、WaitForPermissionScalingAbsoluteValueECS
などのコールバックタスク。
つまり、ResultPath はこのステートの Lambda の出力結果を、ステートへの入力 JSON、つまりは Slack から渡されたパラメータにどのように付加するのかを決めるということになる。
そして、このステートの Lambda の出力結果というのは、『task token を返したリソースの出力結果』 を指す。
task token を Step Functions に返却する の項でソースを載せているが、output
の部分が出力結果となり、slack_bot_token
を出力している。(以下該当部抜粋)
if action == "Approve":
payload = {
"slack_bot_token": slack_bot_token
}
sfn.send_task_success(
taskToken = taskToken,
output = json.dumps(payload) # 次の state(ExecScalingEC2OrECS) に渡される
)
ここでは output
という変数に格納して出力している。
Step Functions 側では ResultPath: $.output
とすることで、ステートへの入力 JSON に output
というキーで Lambda の出力結果である payload
を埋め込んでいる。(以下該当部抜粋)
WaitForPermissionScalingAbsoluteValueECS:
Type: Task
Resource: arn:aws:states:::lambda:invoke.waitForTaskToken # task token が送られるまで待機するコールバックタスク
Parameters:
# 待機中にこの Lambda を実行し task token を返させる(この Lambda 自身は task token を返さない)
# PostScalingRequestToSlackLambda -> Slack -> API Gateway -> ReturnTaskTokenToStepFunctionsLambda(task token) -> Step Functions
FunctionName: !Ref PostScalingRequestToSlackLambda
Payload: # Slack から渡されたパラメータ + task token を PostScalingRequestToSlackLambda に渡す
TaskToken.$: $$.Task.Token
AWSAccountID.$: $.aws_account_id
ECSClusterName.$: $.ecs_cluster_name
ECSServiceName.$: $.ecs_service_name
Min.$: $.min
Max.$: $.max
ResultPath: $.payload # Slack から渡されたパラメータに ReturnTaskTokenToStepFunctionsLambda の出力結果(slack_bot_token, desired_count, max_capacity)を付加
すると ExecScalingEC2OrECS
ステートの Lambda では、 Slack から渡されたパラメータに加えて OAuth Tokens が受け取れるようになる。
他 AWS アカウントのリソース操作法
今回は一つのアカウントのリソースから複数アカウントのリソースをスケーリングする。
以下のように assume role して対象リソースの操作権限を取得し、スケーリングするための API を実行する。
この辺りは boto3 のドキュメント参照。
def assume_role(aws_account_id, ec2_or_ecs):
role_arn = f"arn:aws:iam::{aws_account_id}:role/Scaling{ec2_or_ecs}FromInfraAccountRole{camel_short_env}"
role_session_name = f"{ec2_or_ecs}ScalingFromInfraAccountSession"
try:
sts_client = boto3.client("sts")
response = sts_client.assume_role(
RoleArn=role_arn,
RoleSessionName=role_session_name,
DurationSeconds=900 # 指定できる最小値
)
credentials = response["Credentials"]
client_params = {
"aws_access_key_id": credentials["AccessKeyId"],
"aws_secret_access_key": credentials["SecretAccessKey"],
"aws_session_token": credentials["SessionToken"],
"region_name": region
}
return client_params
except ClientError as e:
logger.error(f"Failed to assume role: {str(e)}\n{traceback.format_exc()}")
raise
except Exception as e:
logger.error(f"Unexpected error: {str(e)}\n{traceback.format_exc()}")
raise
def ecs_scaling(aws_account_id, ecs_cluster_name, ecs_service_name, slack_bot_token, ec2_or_ecs, min, max):
ecs_service_console_url = f"https://{region}.console.aws.amazon.com/ecs/v2/clusters/{ecs_cluster_name}/services/{ecs_service_name}/tasks?region={region}"
client_params = assume_role(aws_account_id, ec2_or_ecs)
try:
app_autoscaling_client = boto3.client("application-autoscaling", **client_params)
app_autoscaling_client.register_scalable_target(
ServiceNamespace="ecs",
ScalableDimension="ecs:service:DesiredCount",
ResourceId=f"service/{ecs_cluster_name}/{ecs_service_name}",
MinCapacity=min,
MaxCapacity=max
)
logger.info(f"Scaling {ec2_or_ecs} succeeded!!!")
### 略 ###
以上で、Slack から AWS CLI を実行し EC2 or ECS をスケーリングすることができるようになった。
これでいつ障害が起きても、Slack が見られる環境さえあればスケーリングし放題となった。
ただしもちろん障害が起きないに越したことはない。。。