はじめに
複数のシステムの開発を行っていると、気が付くと様々なEventBridgeルールができてしまっていることがあります。今回はこれらルールを一元的に管理でき、それでいて運用も楽なソリューションができないかを検討し、実際に実装をしてみました。
目指す姿
今回一元的な管理をする方法としてSystems Manager Change Calendarを採用してみました。Change Calendarはカレンダー上に設定するカレンダーイベントのタイミングでカレンダーのステータス(OPEN/CLOSE)が変更され、その際にAWSのイベントを発するサービスです。発せられるAWSイベントはEventBridgeでキャッチすることができ、後続の処理へと続けることができます。
カレンダーはGUIで確認することができるので、一目でどのジョブがいつ実行されるのかを確認することができるようになります。
SSM Change Calendarの詳細については以下をご参照ください。
実装方針
処理フロー概要
# | 概要 | 要件 |
---|---|---|
⓪ | 事前準備 | ・Default:CLOSEのSSM Change Calendarを作成 ・EventBridgeルール(Cron設定済み)を作成 ・ルールにはタグ key:contlol ,value:ChangeCalenda を付与する |
① | 対象のEventBridgeのルールの情報を取得 | EventBridgeのルールで以下を満たしていること。 ・ステータスがActive ・タグ key:contlol ,value:ChangeCalendar が付与されていること・cronが設定されていること |
② | 対象のルールのcron値をDynamoDBに格納 | ・ルールに設定されているcron値をDynamoDBに格納する ・DynamoDBはパーティションキーにアカウントID, ソートキーにルール名を設定する ・AWSのcron形式で格納する |
③ | DynamoDB登録情報からcronのICS形式化 | ・DynamoDBを全件取得 ・カレンダーイベント名はルール名とする |
④ | ChangeCalendarへ反映 | ・作成したICS形式データを使用してChangeCalendarのカレンダーイベントに反映する |
⑤ | EventBridgeルールの修正 | ・ルールに付与されるスケジュールを削除 ・ChangeCalendarに#4で設定したカレンダーイベントで発せられるAWSイベントをトリガにできるようにイベントパターンを設定 |
①~② EventBridgeルールの取得~DynamoDB格納まで
④~⑤ ChangeCalendarへの反映、EventBridgeルールの修正
再掲になりますが、Change Calendarの操作については以下をご参照ください。
本設計のメリット
- 一元的にジョブの起動時間を確認することが可能
- 利用者はEventBridgeにルールを設定するのみでよく、登録オペレーションに大きな変更が生じない
- EventBridgeを介しているため、別アカウントのジョブ起動管理にも用途を広げることができる
実装
環境
項目 | 設定値 | 備考 |
---|---|---|
実行環境 | Lambda | |
ランタイム | python 3.12 | |
アーキテクチャ | x86_64 | |
Lambda レイヤー | pyawscron | EventBridgeのCronはAWS特有のため、AWS cron用のライブラリを使用する。 https://pypi.org/project/pyawscron/ |
icalendar | ChangeCalendar反映のためICS形式(icalendar形式)に変換するために使用する。 https://pypi.org/project/icalendar/ |
Lambdaに設定する実行ロール
サービス | 許可アクション | 備考 |
---|---|---|
Systems Manager | ssm:UpdateDocument | |
ssm:UpdateDocumentDefaultVersion | ||
ssm:GetDocument | デバッグ用 | |
ssm:GetCalendar | デバッグ用 | |
ssm:GetCalendarState | デバッグ用 | |
DynamoDB | dynamodb:PutItem | |
dynamodb:DeleteItem | ||
dynamodb:Scan | ||
dynamodb:GetItem | デバッグ用 | |
EventBridge | events:ListRules | |
events:ListTagsForResource | ListRulesではタグが取得できないため必要。 | |
events:PutRule | ||
events:TagResource | PutRuleでタグを一緒に登録するときに必要。 |
※デフォルトのロールは省略
その他Lambdaの設定修正点
- タイムアウト値のデフォルトが3秒になっているため値を大きくする
- (検証では5秒あれば十分でした。要件に合わせて調整してください)
実装コード
コード
import os
import boto3
import datetime
from pyawscron import AWSCron
import eventbridge
import dynamo
import utils
import ssm
# メイン処理
def lambda_handler(event, context):
# EventBridgeルールの抽出
rules = eventbridge.get_rules()
print(rules)
# ルールから保存するcron情報を取得
cron_expressions = utils.extract_cron_from_rules(rules)
# DynamoDBに保管
dynamo.put_cron_expressions(cron_expressions)
# DynamoDBに保管されている情報を全て取得
all_cron_expressions = dynamo.get_cron_expressions()
print(all_cron_expressions)
# cronからicsファイルを作成
calendar = utils.create_ics_calendar(all_cron_expressions)
# ChangeCalendarに反映
ssm.register_to_change_calendar(calendar)
# 対象のルールをイベントタイプに編集
eventbridge.update_rule(rules)
return {
'statusCode': 200,
'body': 'Success'
}
import boto3
import os
import json
# EventBridge用クライアント
client = boto3.client('events', region_name='ap-northeast-1')
def get_rules():
# Eventbridgeのルールを全件取得
all_rules = client.list_rules()["Rules"]
target_rutes = []
for rule in all_rules:
# タグの取得とターゲットのタグが存在するか確認
tags, is_target_tag = get_tags(rule['Arn'])
# ステータスがENABLEかつスケジュールが設定されているかつ指定のタグが付いているか確認
if rule["State"] == "ENABLED" and 'ScheduleExpression' in rule.keys() and is_target_tag:
rule["Tags"] = tags
target_rutes.append(rule)
return target_rutes
def get_tags(rule_arn):
# Arnに紐づくタグを取得
tags = client.list_tags_for_resource(ResourceARN=rule_arn)["Tags"]
print("tag : " + str(tags))
# 以下に一致するタグが含まれているかを確認
key = os.getenv("TAG_KEY")
value = os.getenv("TAG_VALUE")
# 各タグをチェックして、指定されたキーと値が存在するか確認
for tag in tags:
if tag.get('Key') == key and tag.get('Value') == value:
return tags, True
# 指定されたキーと値が見つからなかった場合、Falseを返す
return tags, False
# ルール名をキーにルールがあるかどうか確認用メソッド
def is_rule_exsist(name):
rules = client.list_rules(NamePrefix=name)['Rules']
if len(rules) > 0 and rules[0]['State'] == "ENABLED":
return True
return False
# ルールの更新
def update_rule(rules):
for rule in rules:
# イベントパターンを定義
event_pattern = {
"source": ["aws.ssm"],
"detail-type": ["Calendar State Change"],
"resources": [f"arn:aws:ssm:ap-northeast-1:{os.getenv("ACCOUNT_ID")}:document/{os.getenv("CALENDAR_NAME")}"],
"detail": {
"state": ["OPEN"],
"event": [rule["Name"]]
}
}
# ルールの更新
client.put_rule(
Name=rule["Name"],
EventPattern=json.dumps(event_pattern, ensure_ascii=False, indent=2),
State=rule['State'],
Description=rule['Description'] if "Description" in rule else "",
Tags=rule['Tags'],
EventBusName=rule['EventBusName']
)
import os
import re
import time
import uuid
import datetime
from pyawscron import AWSCron
from icalendar import Calendar, Event
# 基準日を設定
today = datetime.datetime.today()
start_date = datetime.datetime(today.year, today.month, today.day, 0, 0, tzinfo=datetime.timezone.utc)
end_date = datetime.datetime(today.year, today.month + int(os.getenv("MONTH")), today.day + 1, 0, 0, tzinfo=datetime.timezone.utc)
# cronを抽出する
def extract_cron_from_rules(rules):
cron_expressions = []
for rule in rules:
name = rule
cron_expression = rule["ScheduleExpression"].replace("cron(", "").replace(")", "")
# ScheduleExpressionは"cron(....)"の文字列が含まれるためreplaceを行う
cron_expression_dict = {
"account" : rule['Arn'].split(":")[4],
"name" : rule['Name'],
"aws_cron" : rule["ScheduleExpression"].replace("cron(", "").replace(")", "")
}
cron_expressions.append(cron_expression_dict)
return cron_expressions
def create_ics_calendar(cron_expressions):
# aws_cronから実行日時を取得。ディクショナリにルール名と実行日時を格納。
event_list = get_event_list(cron_expressions)
# ICS形式のカレンダーデータを作成
calendar = create_ics_calendar_with(event_list)
print(calendar)
return calendar
def get_event_list(cron_expressions):
event_list = []
for cron_expression in cron_expressions:
name = cron_expression['name']
aws_cron = cron_expression['aws_cron']
# AWSのCron形式からdatetimeのリストを取得
# 繰り返しがあるためリストで受け取る
event_datetime_list = convert_awscron_to_datetime_list(aws_cron)
print(event_datetime_list)
for event_datetime in event_datetime_list:
# 終了時刻は開始時刻から5分後
end_datetime = event_datetime + datetime.timedelta(minutes=5)
# UIDを生成
uid = get_uid()
event_list.append({"summary": name, "dtstart": event_datetime, "dtend": end_datetime, "uid": uuid})
return event_list
def convert_awscron_to_datetime_list(aws_cron_str):
datetime_list = []
aws_cron = AWSCron(aws_cron_str)
dt = start_date
while True:
# occurrence関数、next関数の組み合わせで、指定した最終日までの実行日を抽出する
dt = aws_cron.occurrence(dt).next()
if dt > end_date:
break
datetime_list.append(dt)
return datetime_list
def get_uid():
# タイムスタンプを取得
timestamp = time.time()
# UUID4を生成
random_uuid = uuid.uuid4()
# タイムスタンプとUUID4を組み合わせてUIDを生成
uid = f"{int(timestamp * 1e9)}-{random_uuid}"
return uid
# ICS形式のファイルを作成
def create_ics_calendar_with(event_list):
cal = Calendar()
cal.add('prodid', '-//AWS//Change Calendar 1.0//EN')
cal.add('version', '2.0')
for event_dict in event_list:
event = Event()
event.add('summary', event_dict["summary"])
event.add('dtstart', event_dict["dtstart"])
event.add('dtend', event_dict["dtend"])
event.add('uid', event_dict["uid"])
cal.add_component(event)
return cal
import os
import boto3
import eventbridge
# DynamoDB用クライアント
client = boto3.client('dynamodb', region_name='ap-northeast-1')
table_name = os.getenv("TABLE")
# 各要素をDynamoDBに反映
def put_cron_expressions(cron_expressions):
for cron_expression in cron_expressions:
item = {
'account': {'S': cron_expression['account']},
'name': {'S': cron_expression['name']},
'aws_cron': {'S': cron_expression["aws_cron"]},
}
try:
# アイテムをテーブルに書き込む
client.put_item(
TableName = table_name,
Item = item
)
print("Item written successfully. : " + str(item))
except Exception as e:
print(f"An error occurred: {e}")
# 全件取得
def get_cron_expressions():
try:
items = client.scan(TableName=table_name)['Items']
except Exception as e:
print(f"An error occurred: {e}")
cron_expressions = []
for item in items:
# 取得した情報から現在もEventbridgeのデータがあるのか確認
if eventbridge.is_rule_exsist(item['name']['S']):
cron_expression_dict = {
"account" : item['account']['S'],
"name" : item['name']['S'],
"aws_cron" : item['aws_cron']['S']
}
cron_expressions.append(cron_expression_dict)
else:
# 存在しない場合は削除されたものとみなしてDynamoDB上からも削除する
delete_cron_expression(item)
return cron_expressions
def delete_cron_expression(item):
key = {
"account" : {
"S": item['account']['S']
},
"name" : {
"S": item['name']['S']
}
}
try:
# アイテムをテーブルから削除
response = dynamodb_client.delete_item(
TableName=table_name,
Key=key
)
print("Item deleted successfully")
except Exception as e:
print(f"An error occurred: {e}")
import boto3
import os
# SSM用クライアント
client = boto3.client("ssm", region_name='ap-northeast-1')
# ICS形式のカレンダーをChangeCalendarに登録
def register_to_change_calendar(calendar):
calendar_name = os.getenv("CALENDAR_NAME")
try:
# calendar.to_ical()のみではバイナリ形式のため、decode()関数でstringに変換する
response = client.update_document(
Name = calendar_name,
DocumentVersion = "$LATEST",
Content = calendar.to_ical().decode()
)
# UpdateDocumentのレスポンスにLatesVersionが含まれている
# それを取得してデフォルトバージョンとして登録
document_version = response['DocumentDescription']['LatestVersion']
response = client.update_document_default_version(
Name = calendar_name,
DocumentVersion = document_version
)
except client.exceptions.DuplicateDocumentContent as e:
# 更新がない場合にはDuplicateDocumentContentの例外が発生する。
print("No Change")
except Exception as e:
print(f"An error occurred: {e}")
環境変数
環境変数名 | 用途・設定値 |
---|---|
ACCOUNT_ID | ChangeCalendar作成アカウントのID |
CALENDAR_NAME | 作成したカレンダー名 |
MONTH | イベント作成範囲(単位:月) |
TABLE | DynamoDBのテーブル |
TAG_KEY | EventBridgeに付与されるタグのキー名control を設定 |
TAG_VALUE | EventBridgeに付与されるタグのバリュー名ChangeCalendar を設定 |
動作確認
準備
実行されたかどうかを確認するため、今回はイベントをCloudWatchに連携してみます。
実行確認
実行日が2024/6/21なので1か月後(7/21)までの毎週日曜日9:00に設定されていれば成功です。
ChangeClaendar
指定日までの日程を正しく設定されています。
カレンダーイベントの詳細を見てみると、タイトルがEventBridgeルール名と一致していることが確認できます。
EventBridge
スケジュールが削除され、イベントパターンが自動で設定されました。こちらの設定でカレンダーイベントが実行されるとイベントをキャッチすることができます。
DynamoDB
DynamoDBにも格納されていることがわかりました。
おまけ
EventBridgeルールを2つ用意して、日曜の9:00にTestRuleForChangeCalendar、水曜の9:00にTestRuleForChangeCalendar02が実行されるようにスケジュールをそれぞれ設定しました。
Lambdaを実行するとChangeCalendarには正しく登録が行われることを確認しました。
このあとにEventBridgeルールを消すと正しくChangeCalendarに反映されることも確認できました。
おわりに
様々なジョブがEventBridgeで管理されている場合には有効な策かと思います。
ぜひご参考ください。