本記事は オルトプラス Advent Calendar 2023 の12/02の記事です。
はじめに
こんにちは。オルトプラスSREの高場です。
普段はAWSでのインフラ構築・運用をメインで行っています。
趣味は漫画収集です(最近のマイブームは佐藤二葉『アンナ・コムネナ』)。
今回はStep functionsの記事になります。
Step Functionsは、一般的にはGlueを用いたETLやAWSサービス間の連携でよく使われると思います。
こういった使い方が適しているかわからないのですが、一つのケースとしてご紹介させていただきます。
概要
私が担当しているプロダクトではECS Fargateを採用しており、開発環境は作業がない夜間時間帯では自動停止させ、毎朝に起動処理を行っています。
自動停止・起動自体は何度かテストを行い、問題なく運用していましたが、あるとき、サービスのの起動時にエラーが出るという問題が発生しました。
Fargateクラスター内の構成は下記のようになっています。
- 同一クラスター内にサービスA~Eがある
- サービスBはサービスAに依存しており、サービスAが起動していないとエラーが出る
現状の仕組みでは、LambdaでdesiredCountを一括指定しています。
そのため、起動順を制御することができません。
普段あまりコードを書かない自分でも実装できる方法を探したところ、Step Functionsが候補に挙がりました。
理由としては、比較的簡単に実装できることと、工数もかからなさそうだったためです。
下記は実際のStepfunctionsの定義を使いながら詳細をご紹介します。
詳細
IAMロールの作成
管理はTerraformで行います。
まず、Stepfunctionsの実行に必要なIAMロールを作成します。
templatefile()
で呼び出したいLambdaのARNを指定しています。
このLambdaは、現在自動停止・起動に使っているLambdaのARNです。
resource "aws_iam_role" "sfn" {
name = local.sfn_role_name
assume_role_policy = file("../policies/assume/sfn.json")
managed_policy_arns = [
aws_iam_policy.sfn.arn,
local.ecs_controller_policy
]
}
resource "aws_iam_policy" "sfn" {
name = local.sfn_policy_name
policy = templatefile("../policies/role/sfn_role.json", {
controller_arn = local.ecs_controller_arn
topic_arn = local.sns_topic_arn
})
}
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"${controller_arn}:*",
"${controller_arn}"
]
},
{
"Effect": "Allow",
"Action": [
"xray:PutTraceSegments",
"xray:PutTelemetryRecords",
"xray:GetSamplingRules",
"xray:GetSamplingTargets"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"sns:Publish"
],
"Resource": [
"${topic_arn}"
]
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "states.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
Statemachineの作成
次にStatemachineの記述を行います。
まずはStatemachineと呼ばれる実行単位を作成します。
resource "aws_sfn_state_machine" "this" {
name = local.statemachin_name
role_arn = aws_iam_role.sfn.arn
definition = templatefile("${path.module}/files/sfn_definition.json", {
service = var.tags.service,
env = var.tags.env
})
}
definition
で指定しているjsonファイルが定義の本体です。
service
にはプロダクト名、env
には環境名が入ります。
definitionの作成
まず全体の概要を確認します。
definitionは次のような内容になっています。
- まずAを起動し、ステータスをwaitする
- その後、Aに依存するBを起動して再度ステータスを取得
- その後、C~Eを一括で起動
- 起動結果はLambdaからSlackチャネルに通知
1つずつ見ていきます。
①サービスAの起動
"DescribeFirstServices": {
"Type": "Task",
"Parameters": {
"Services": [
"${service}-${env}-serviceA"
],
"Cluster": "${service}-${env}-cluster"
},
"Resource": "arn:aws:states:::aws-sdk:ecs:describeServices",
"Next": "IsStartedFirstService"
},
"IsStartedFirstService": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.Services[0].RunningCount",
"NumericGreaterThanEquals": 1,
"Next": "NoticeFirstServiceUpdateResult"
},
{
"Variable": "$.Services[0].RunningCount",
"NumericLessThanEquals": 0,
"Next": "UpdateFirstService"
}
],
"Default": "WaitFirstServiceStart"
},
"UpdateFirstService": {
"Type": "Task",
"Parameters": {
"Cluster": "${service}-${env}-cluster",
"Service": "${service}-${env}-serviceA",
"DesiredCount": 1
},
"Resource": "arn:aws:states:::aws-sdk:ecs:updateService",
"Retry": [
{
"ErrorEquals": [
"States.TaskFailed"
],
"BackoffRate": 1,
"IntervalSeconds": 10,
"MaxAttempts": 2
}
],
"Next": "WaitFirstServiceStart"
},
"WaitFirstServiceStart": {
"Type": "Wait",
"Seconds": 10,
"Next": "DescribeFirstServices"
},
"NoticeFirstServiceUpdateResult": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:ap-northeast-1:${account_id}:${service}-${env}-service-start-notice",
"Message.$": "$"
},
"Next": "DescribeSecondServices",
"InputPath": "$.Services[0]"
}
StepFunctionsでは各ステップ間で値の受け渡しをすることができます。
受け渡しはjsonPath
を使います。
json全体を$
で表現して、ドット記法でプロパティを取得できます。
inputPathでは、次のステップに必要なプロパティのみを抽出して送ることができます。
この例ではNoticeFirstServiceUpdateResult
が受け取る結果をServices[0]
に制限しています。
②サービスBの起動
サービスAと同様です。
"DescribeSecondServices": {
"Type": "Task",
"Parameters": {
"Services": [
"${service}-${env}-serviceB"
],
"Cluster": "${service}-${env}-cluster"
},
"Resource": "arn:aws:states:::aws-sdk:ecs:describeServices",
"Next": "IsStartedSecondService"
},
"IsStartedSecondService": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.Services[0].RunningCount",
"NumericGreaterThanEquals": 1,
"Next": "NoticeSecondServicesUpdateResult"
},
{
"Variable": "$.Services[0].RunningCount",
"NumericLessThanEquals": 0,
"Next": "UpdateSecondService"
}
],
"Default": "WaitSecondserviceStart"
},
"UpdateSecondService": {
"Type": "Task",
"Parameters": {
"Cluster": "${service}-${env}-cluster",
"Service": "${service}-${env}-serviceB",
"DesiredCount": 3
},
"Resource": "arn:aws:states:::aws-sdk:ecs:updateService",
"Retry": [
{
"ErrorEquals": [
"States.TaskFailed"
],
"BackoffRate": 1,
"IntervalSeconds": 10,
"MaxAttempts": 2
}
],
"Next": "WaitSecondserviceStart"
},
③その後、C~Eを一括で起動
"NoticeSecondServicesUpdateResult": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:ap-northeast-1:${account_id}:${service}-${env}-service-start-notice",
"Message.$": "$"
},
"Next": "DescribeOtherServices",
"InputPath": "$.Services[0]"
},
"DescribeOtherServices": {
"Type": "Task",
"Parameters": {
"Services": [
"${service}-${env}-serviceC"
],
"Cluster": "${service}-${env}-cluster"
},
"Resource": "arn:aws:states:::aws-sdk:ecs:describeServices",
"Next": "Choice"
},
"Choice": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.Services[0].RunningCount",
"NumericLessThanEquals": 0,
"Next": "RunOtherServices"
},
{
"Variable": "$.Services[0].RunningCount",
"NumericGreaterThanEquals": 1,
"Next": "Pass"
}
],
"Default": "Pass"
},
"Pass": {
"Type": "Pass",
"End": true
},
"RunOtherServices": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:${account_id}:function:${service}-ecs-controller:$LATEST",
"Payload": {
"targets": [
{
"cluster": "${service}-${env}-cluster",
"service": "${service}-${env}-serviceB",
"desiredCount": 1
},
{
"cluster": "${service}-${env}-cluster",
"service": "${service}-${env}-serviceC",
"desiredCount": 1
},
{
"cluster": "${service}-${env}-cluster",
"service": "${service}-${env}-serviceD",
"desiredCount": 7
},
{
"cluster": "${service}-${env}-cluster",
"service": "${service}-${env}-serviceE",
"desiredCount": 1
}
]
}
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"End": true
},
"WaitSecondserviceStart": {
"Type": "Wait",
"Seconds": 10,
"Next": "DescribeSecondServices"
}
},
"TimeoutSeconds": 300
上記をapplyして実際のAWS環境上へStatemachineを作成します。
④LambdaでSlackへ通知
最後にLambdaを設定します。
Slackへの通知はChatbotが簡単なのですが、柔軟にカスタムメッセージが設定できないのでLambdaでカスタムメッセージを送信します。
resource "aws_lambda_function" "slack_notice" {
architectures = [
"arm64",
]
function_name = "${var.tags.service}-slack-notice"
handler = "lambda_function.lambda_handler"
layers = []
memory_size = 128
package_type = "Zip"
reserved_concurrent_executions = -1
role = aws_iam_role.slack_notice.arn
runtime = "python3.9"
filename = data.archive_file.lambda_function.output_path
source_code_hash = data.archive_file.lambda_function.output_base64sha256
tags = var.tags
timeout = 60
tracing_config {
mode = "PassThrough"
}
}
data "archive_file" "lambda_function" {
type = "zip"
source_file = "functions/lambda_function.py"
output_path = "archive/lambda_function.zip"
}
カスタムメッセージ内では、下記の情報を表示します。
- 起動サービス名
- desiredCount
- RunningCount
- FailedTasks
import json
import urllib.request
import os
print('Loading function')
def lambda_handler(event, context):
message = event['Records'][0]['Sns']['Message']
# print("From SNS: " + message)
message = json.loads(message)
send_to_slack(message)
def send_to_slack(message):
events = message['Events'][0]
deployment = message['Deployments'][0]
serviceName = f"""
{message['ServiceName']}
"""
desiredCount = f"""
{deployment['DesiredCount']}
"""
failedTasks = f"""
{deployment['FailedTasks']}
"""
RunningCount = f"""
{deployment['RunningCount']}
"""
msg = f"""
* Id: *{events['Id']}*
* Event Created: *{events['CreatedAt']}*
* Message: *{events['Message']}*
"""
url = os.environ['WEBHOOK_URL']
send_data = {
"username": "ECS Auto Start Notification",
"icon_emoji": ":memo:",
"attachments": [
{
"title": f"{serviceName}",
"fields": [
{
"title": "desiredCount",
"value": f"{desiredCount}",
"short": "false"
},
{
"title": "failedTasks",
"value": f"{failedTasks}",
"short": "false"
},
{
"title": "RunningCount",
"value": f"{RunningCount}",
"short": "false"
}
],
"text": f"{msg}",
"mrkdwn_in": [
"text"
],
"color": "#4169e1"
}
]
}
send_text = "payload=" + json.dumps(send_data)
req = urllib.request.Request(
url,
data=send_text.encode("utf-8"),
method="POST"
)
with urllib.request.urlopen(req) as res:
res_body = res.read().decode("utf-8")
print(res_body)
こちらもterraformでapplyします。
実行
Eventbridgeで指定した時間になると、Stepfunctionsが起動してServiceが順番に起動されます。
起動に成功すると下記のようなメッセージがチャネルに投稿されます。
※2023/09のアップデートでEventbridgeの入力トランスフォーマーを経由することでカスタムメッセージの送信が可能になりました!詳細は下記をご覧ください。
まとめ
Step functionsを使って、ECS Fargateのサービス起動状況を監視しながら起動順を制御することができました!
その結果をSlackに通知してカスタムメッセージで受け取ることもできるので、失敗してもすぐに気づくことができました。
改良点は山ほどあると思うので、次のプロダクト運用に生かしていければと思います!