どもども、ランサーズでSREしているogimanです。
いやー暑いですねー溶けそうです。なんなら、既に一部溶けてるかもしれないです・・・
さて、今回は便利スクリプトの紹介です!
生成AIが当たり前になったこの世の中で、生成AI駆使して、さくっと作りました。
皆さん感じているとおり、生成AIはエンジニアにとって最高のパートナーとなりました。
本当に感謝感謝〜
では、やってきましょう
結論
24H365D、動いていた検証環境のECSを夜間止めることでコストを30%削減しましたよ
※24H365D動いていたのかぃ!というツッコミはお控えくださいw
誰向け
- AWSを使う方
- ECS使っている方
目指すところ
システムでは各種ツールやクラウドを駆使して最適なシステムを実現するが、
一方でコストとの戦いとも向き合わないと・・・
右肩上がりのコストをどうやって抑え込むか?そして抑え込んだ上でどう最高のシステムにするのか?
そして、ユーザーに提供する価値を最大化できるのか?
そんな日々ですね。
対応したこと
AWSコストがマジで右肩上がり・・・
そこで前からずっと対応したかった、検証環境の夜間停止。
夜間は誰も使っていないのに動き続けているシステムはマジで無駄
まずはECSのサービスから着手。
使ったツール
- Windsurf
プロンプト
当時のプロンプトが見つからない・・・汗
ただ、このくらいのコードなら以下のプロンプトで作成できると思います。
必要に応じてカスタマイズしていただければ!
EventBridgeからのイベントを受け取り、複数のECSサービスを一括で起動/停止するLambda関数をPythonで作成してください。
要件:
- actionパラメータで"start"または"stop"を指定
- targetsパラメータで複数のクラスター/サービスを指定可能
- startの場合はstart_countでタスク数を指定可能(デフォルト1)
- 適切なエラーハンドリングとログ出力
- 処理結果を成功/失敗で分けて返却
イベント形式:
{
"action": "start" | "stop",
"targets": [
{ "cluster": "cluster-name", "services": ["service-a", "service-b"], "start_count": 1 }
]
}
実際のコード
以下の感じでLambdaにコード作成すれば解決できまっすー
Eventbridgeから起動したい・停止したい時間を指定すればOK
複数のクラスターにも対応しているので便利よ
"""
EventBridgeからのイベントを受け取り、ECSサービスを起動/停止する、Lambda関数ハンドラー。
想定されるイベント形式:
{
"action": "start" | "stop",
"targets": [
{ "cluster": "cluster-name-1", "services": ["service-a", "service-b"], "start_count": 1 }, // Optional start_count
{ "cluster": "cluster-name-2", "services": ["service-c"] }
]
}
"""
import boto3
import logging
import os
import json
# ロギング設定 (Lambda環境では自動的にCloudWatch Logsに出力される)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# AWS リージョンを指定してクライアントを初期化
ecs_client = boto3.client('ecs', region_name='リージョン名入れて!例:ap-northeast-1')
def lambda_handler(event, context):
logger.info("Received event: %s", json.dumps(event))
action = event.get('action')
targets = event.get('targets')
# --- 入力バリデーション ---
if not action or action not in ['start', 'stop']:
logger.error("Invalid or missing 'action'. Must be 'start' or 'stop'.")
return {'statusCode': 400, 'body': json.dumps({"error": "Invalid or missing 'action'."})}
if not targets or not isinstance(targets, list):
logger.error("Invalid or missing 'targets'. Must be a list of cluster/service definitions.")
return {'statusCode': 400, 'body': json.dumps({"error": "Invalid or missing 'targets'."})}
results = {"success": [], "failed": []}
processed_count = 0
failed_count = 0
# --- ターゲットごとに処理 ---
for target in targets:
cluster_name = target.get('cluster')
service_names = target.get('services')
# startアクションの場合、オプションで開始時のタスク数を指定可能 (デフォルトは1)
start_count = target.get('start_count', 1) if action == 'start' else 0
if not cluster_name or not isinstance(service_names, list) or not service_names:
logger.warning("Skipping invalid target definition (missing cluster or services): %s", target)
results["failed"].append({"target": target, "reason": "Invalid format (missing cluster or services)"})
failed_count += 1 # ターゲット定義自体が不正な場合もカウント
continue
# --- サービスごとに処理 ---
for service_name in service_names:
processed_count += 1
item = {"cluster": cluster_name, "service": service_name}
desired_count = start_count if action == 'start' else 0
success = update_service_desired_count(cluster_name, service_name, desired_count)
if success:
results["success"].append(item)
else:
results["failed"].append(item)
failed_count += 1
# --- 結果の集計とレスポンス ---
status_code = 200 if failed_count == 0 else 500 # 1つでも失敗したら500を返す
total_processed = len(results["success"]) + len(results["failed"])
response_body = {
"message": "Processed %s request for %s services." % (action, total_processed),
"success_count": len(results["success"]),
"failed_count": failed_count,
"details": results
}
if failed_count > 0:
logger.error("Finished processing with %s failures. Details: %s", failed_count, json.dumps(results['failed']))
else:
logger.info("Successfully processed all %s services.", total_processed)
return {
'statusCode': status_code,
'body': json.dumps(response_body)
}
def update_service_desired_count(cluster_name, service_name, desired_count):
# 指定されたECSサービスのdesiredCountを更新する
action = "start" if desired_count > 0 else "stop"
log_prefix = "[Cluster: %s, Service: %s]" % (cluster_name, service_name)
logger.info("%s Attempting to %s service (set desiredCount to %s)...", log_prefix, action, desired_count)
try:
response = ecs_client.update_service(
cluster=cluster_name,
service=service_name,
desiredCount=desired_count
# forceNewDeployment=True # 必要に応じてデプロイ強制フラグを設定
)
logger.info("%s Successfully initiated %s for service. Response: %s", log_prefix, action, response.get('service', {}).get('serviceArn'))
return True
except ecs_client.exceptions.ServiceNotFoundException:
logger.error("%s Service not found.", log_prefix)
return False
except ecs_client.exceptions.ClusterNotFoundException:
logger.error("%s Cluster not found.", log_prefix)
# このクラスターの他のサービス処理をスキップするために例外を再raiseすることも検討可能
return False
except ecs_client.exceptions.InvalidParameterException as e:
logger.error("%s Invalid parameter for update_service: %s", log_prefix, str(e))
return False
except Exception as e:
logger.error("%s Error updating service desired count: %s", log_prefix, str(e))
return False
最後に
今回はECSですが、Aurora(RDS)も同様に対応しているので、どっかで投稿したいと思います!
記録を残すことで同じ事象にハマった人の役に立ちますように・・・
この記事が少しでも参考になったら、 お願いします!
励みになります♪
以上です!
暑いので体調にくれぐれもお気をつけてー