1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS SSM Change CalendarでEventBridgeルールの一元起動管理

Last updated at Posted at 2024-06-21

はじめに

複数のシステムの開発を行っていると、気が付くと様々なEventBridgeルールができてしまっていることがあります。今回はこれらルールを一元的に管理でき、それでいて運用も楽なソリューションができないかを検討し、実際に実装をしてみました。

image.png

目指す姿

今回一元的な管理をする方法としてSystems Manager Change Calendarを採用してみました。Change Calendarはカレンダー上に設定するカレンダーイベントのタイミングでカレンダーのステータス(OPEN/CLOSE)が変更され、その際にAWSのイベントを発するサービスです。発せられるAWSイベントはEventBridgeでキャッチすることができ、後続の処理へと続けることができます。

カレンダーはGUIで確認することができるので、一目でどのジョブがいつ実行されるのかを確認することができるようになります。

SSM Change Calendarの詳細については以下をご参照ください。

image.png

実装方針

処理フロー概要

# 概要 要件
事前準備 ・Default:CLOSEのSSM Change Calendarを作成
・EventBridgeルール(Cron設定済み)を作成
・ルールにはタグkey:contlol,value:ChangeCalendaを付与する
対象のEventBridgeのルールの情報を取得 EventBridgeのルールで以下を満たしていること。
・ステータスがActive
・タグkey:contlol,value:ChangeCalendarが付与されていること
・cronが設定されていること
対象のルールのcron値をDynamoDBに格納 ・ルールに設定されているcron値をDynamoDBに格納する
・DynamoDBはパーティションキーにアカウントID, ソートキーにルール名を設定する
・AWSのcron形式で格納する
DynamoDB登録情報からcronのICS形式化 ・DynamoDBを全件取得
・カレンダーイベント名はルール名とする
ChangeCalendarへ反映 ・作成したICS形式データを使用してChangeCalendarのカレンダーイベントに反映する
EventBridgeルールの修正 ・ルールに付与されるスケジュールを削除
・ChangeCalendarに#4で設定したカレンダーイベントで発せられるAWSイベントをトリガにできるようにイベントパターンを設定

⓪事前準備
image.png

①~② EventBridgeルールの取得~DynamoDB格納まで
image.png

③ cronのICS形式化
image.png

④~⑤ ChangeCalendarへの反映、EventBridgeルールの修正

image.png

再掲になりますが、Change Calendarの操作については以下をご参照ください。

本設計のメリット

  • 一元的にジョブの起動時間を確認することが可能
  • 利用者はEventBridgeにルールを設定するのみでよく、登録オペレーションに大きな変更が生じない
  • EventBridgeを介しているため、別アカウントのジョブ起動管理にも用途を広げることができる

実装

環境

項目 設定値 備考
実行環境 Lambda
ランタイム python 3.12
アーキテクチャ x86_64
Lambda レイヤー pyawscron EventBridgeのCronはAWS特有のため、AWS cron用のライブラリを使用する。
https://pypi.org/project/pyawscron/
icalendar ChangeCalendar反映のためICS形式(icalendar形式)に変換するために使用する。
https://pypi.org/project/icalendar/

Lambdaに設定する実行ロール

サービス  許可アクション 備考
Systems Manager ssm:UpdateDocument
ssm:UpdateDocumentDefaultVersion
ssm:GetDocument デバッグ用
ssm:GetCalendar デバッグ用
ssm:GetCalendarState デバッグ用
DynamoDB dynamodb:PutItem
dynamodb:DeleteItem
dynamodb:Scan
dynamodb:GetItem デバッグ用
EventBridge events:ListRules
events:ListTagsForResource ListRulesではタグが取得できないため必要。
events:PutRule
events:TagResource PutRuleでタグを一緒に登録するときに必要。

※デフォルトのロールは省略

その他Lambdaの設定修正点

  • タイムアウト値のデフォルトが3秒になっているため値を大きくする
    • (検証では5秒あれば十分でした。要件に合わせて調整してください)

実装コード

コード
lumbda_function.py
import os
import boto3
import datetime
from pyawscron import AWSCron

import eventbridge
import dynamo
import utils
import ssm

# メイン処理
def lambda_handler(event, context):
    # EventBridgeルールの抽出
    rules = eventbridge.get_rules()
    print(rules)
    
    # ルールから保存するcron情報を取得
    cron_expressions = utils.extract_cron_from_rules(rules)
    
    # DynamoDBに保管
    dynamo.put_cron_expressions(cron_expressions)
    
    # DynamoDBに保管されている情報を全て取得
    all_cron_expressions = dynamo.get_cron_expressions()
    print(all_cron_expressions)
    
    # cronからicsファイルを作成
    calendar = utils.create_ics_calendar(all_cron_expressions)

    # ChangeCalendarに反映
    ssm.register_to_change_calendar(calendar)
    
    # 対象のルールをイベントタイプに編集
    eventbridge.update_rule(rules)
    
    
    return {
        'statusCode': 200,
        'body': 'Success'
    }
eventbridge.py
import boto3
import os
import json

# EventBridge用クライアント
client = boto3.client('events', region_name='ap-northeast-1')

def get_rules():
    # Eventbridgeのルールを全件取得
    all_rules = client.list_rules()["Rules"]
    target_rutes = []
    
    for rule in all_rules:
        # タグの取得とターゲットのタグが存在するか確認
        tags, is_target_tag = get_tags(rule['Arn'])
        
        # ステータスがENABLEかつスケジュールが設定されているかつ指定のタグが付いているか確認
        if rule["State"] == "ENABLED" and 'ScheduleExpression' in rule.keys() and is_target_tag:
            rule["Tags"] = tags
            target_rutes.append(rule)
    
    return target_rutes

def get_tags(rule_arn):
    # Arnに紐づくタグを取得
    tags = client.list_tags_for_resource(ResourceARN=rule_arn)["Tags"]
    print("tag : " + str(tags))
    
    # 以下に一致するタグが含まれているかを確認
    key = os.getenv("TAG_KEY")
    value = os.getenv("TAG_VALUE")

    # 各タグをチェックして、指定されたキーと値が存在するか確認
    for tag in tags:
        if tag.get('Key') == key and tag.get('Value') == value:
            return tags, True
    
    # 指定されたキーと値が見つからなかった場合、Falseを返す
    return tags, False

# ルール名をキーにルールがあるかどうか確認用メソッド
def is_rule_exsist(name):
    rules = client.list_rules(NamePrefix=name)['Rules']
    if len(rules) > 0 and rules[0]['State'] == "ENABLED":
        return True
    
    return False

# ルールの更新
def update_rule(rules):
    for rule in rules:
        # イベントパターンを定義
        event_pattern = {
            "source": ["aws.ssm"],
            "detail-type": ["Calendar State Change"],
            "resources": [f"arn:aws:ssm:ap-northeast-1:{os.getenv("ACCOUNT_ID")}:document/{os.getenv("CALENDAR_NAME")}"],
            "detail": {
                "state": ["OPEN"],
                "event": [rule["Name"]]
            }
        }
        
         # ルールの更新
        client.put_rule(
            Name=rule["Name"],
            EventPattern=json.dumps(event_pattern, ensure_ascii=False, indent=2),
            State=rule['State'],
            Description=rule['Description'] if "Description" in rule else "",
            Tags=rule['Tags'],
            EventBusName=rule['EventBusName']
        )
utils.py
import os
import re
import time
import uuid
import datetime
from pyawscron import AWSCron
from icalendar import Calendar, Event


# 基準日を設定
today = datetime.datetime.today()
start_date = datetime.datetime(today.year, today.month, today.day, 0, 0, tzinfo=datetime.timezone.utc)
end_date = datetime.datetime(today.year, today.month + int(os.getenv("MONTH")), today.day + 1, 0, 0, tzinfo=datetime.timezone.utc)

# cronを抽出する
def extract_cron_from_rules(rules):
    cron_expressions = []
    for rule in rules:
        name = rule
        cron_expression = rule["ScheduleExpression"].replace("cron(", "").replace(")", "")
        # ScheduleExpressionは"cron(....)"の文字列が含まれるためreplaceを行う
        cron_expression_dict = {
            "account" : rule['Arn'].split(":")[4],
            "name" : rule['Name'],
            "aws_cron" : rule["ScheduleExpression"].replace("cron(", "").replace(")", "")
        }
        
        cron_expressions.append(cron_expression_dict)
    
    return cron_expressions
        

def create_ics_calendar(cron_expressions):
    # aws_cronから実行日時を取得。ディクショナリにルール名と実行日時を格納。
    event_list = get_event_list(cron_expressions)
    
    # ICS形式のカレンダーデータを作成
    calendar = create_ics_calendar_with(event_list)
    print(calendar)
    
    return calendar
    
    
def get_event_list(cron_expressions):
    event_list = []
    for cron_expression in cron_expressions:
        name = cron_expression['name']
        aws_cron = cron_expression['aws_cron']
        
        # AWSのCron形式からdatetimeのリストを取得
        # 繰り返しがあるためリストで受け取る
        event_datetime_list = convert_awscron_to_datetime_list(aws_cron)
        print(event_datetime_list)
        
        for event_datetime in event_datetime_list:
            # 終了時刻は開始時刻から5分後
            end_datetime = event_datetime + datetime.timedelta(minutes=5)
            # UIDを生成
            uid = get_uid()
            
            event_list.append({"summary": name, "dtstart": event_datetime, "dtend": end_datetime, "uid": uuid})
    
    return event_list
            

def convert_awscron_to_datetime_list(aws_cron_str):
    datetime_list = []
    
    aws_cron = AWSCron(aws_cron_str)
    dt = start_date
    while True:
        # occurrence関数、next関数の組み合わせで、指定した最終日までの実行日を抽出する
        dt = aws_cron.occurrence(dt).next()
        if dt > end_date:
            break
        datetime_list.append(dt)
    
    return datetime_list
    
def get_uid():
    # タイムスタンプを取得
    timestamp = time.time()
    
    # UUID4を生成
    random_uuid = uuid.uuid4()
    
    # タイムスタンプとUUID4を組み合わせてUIDを生成
    uid = f"{int(timestamp * 1e9)}-{random_uuid}"
    
    return uid

# ICS形式のファイルを作成
def create_ics_calendar_with(event_list):
    cal = Calendar()
    cal.add('prodid', '-//AWS//Change Calendar 1.0//EN')
    cal.add('version', '2.0')
    
    for event_dict in event_list:
        event = Event()
        event.add('summary', event_dict["summary"])
        event.add('dtstart', event_dict["dtstart"])
        event.add('dtend', event_dict["dtend"])
        event.add('uid', event_dict["uid"])
        
        cal.add_component(event)
    
    return cal

dynamo.py
import os
import boto3

import eventbridge

# DynamoDB用クライアント
client = boto3.client('dynamodb', region_name='ap-northeast-1')
table_name = os.getenv("TABLE")

# 各要素をDynamoDBに反映
def put_cron_expressions(cron_expressions):
    for cron_expression in cron_expressions:
        item = {
            'account': {'S': cron_expression['account']},
            'name': {'S': cron_expression['name']},
            'aws_cron': {'S': cron_expression["aws_cron"]},
        }
        
        try:
            # アイテムをテーブルに書き込む
            client.put_item(
                TableName = table_name,
                Item = item
            )
            print("Item written successfully. : " + str(item))
        except Exception as e:
            print(f"An error occurred: {e}")

# 全件取得
def get_cron_expressions():
    try:
        items = client.scan(TableName=table_name)['Items']
    except Exception as e:
        print(f"An error occurred: {e}")
    
    cron_expressions = []
    for item in items:
        # 取得した情報から現在もEventbridgeのデータがあるのか確認    
        if eventbridge.is_rule_exsist(item['name']['S']):
            cron_expression_dict = {
                "account" : item['account']['S'],
                "name" : item['name']['S'],
                "aws_cron" : item['aws_cron']['S']
            }
            cron_expressions.append(cron_expression_dict)
        else:
            # 存在しない場合は削除されたものとみなしてDynamoDB上からも削除する
            delete_cron_expression(item)
    
    return cron_expressions
        
def delete_cron_expression(item):
    key = {
        "account" : {
            "S": item['account']['S']
        },
        "name" : {
            "S": item['name']['S']
        }
    }
    try:
        # アイテムをテーブルから削除
        response = dynamodb_client.delete_item(
            TableName=table_name,
            Key=key
        )
        print("Item deleted successfully")
    except Exception as e:
        print(f"An error occurred: {e}")
ssm.py
import boto3
import os

# SSM用クライアント
client = boto3.client("ssm", region_name='ap-northeast-1')

# ICS形式のカレンダーをChangeCalendarに登録
def register_to_change_calendar(calendar):
    calendar_name = os.getenv("CALENDAR_NAME")
    
    try:
        # calendar.to_ical()のみではバイナリ形式のため、decode()関数でstringに変換する
        response = client.update_document(
            Name = calendar_name,
            DocumentVersion = "$LATEST",
            Content = calendar.to_ical().decode()
        )

        # UpdateDocumentのレスポンスにLatesVersionが含まれている
        # それを取得してデフォルトバージョンとして登録
        document_version = response['DocumentDescription']['LatestVersion']
        
        response = client.update_document_default_version(
            Name = calendar_name,
            DocumentVersion = document_version
        )

    except client.exceptions.DuplicateDocumentContent as e:
        # 更新がない場合にはDuplicateDocumentContentの例外が発生する。
        print("No Change")
    except Exception as e:
        print(f"An error occurred: {e}")

環境変数
環境変数名 用途・設定値
ACCOUNT_ID ChangeCalendar作成アカウントのID
CALENDAR_NAME 作成したカレンダー名
MONTH イベント作成範囲(単位:月)
TABLE DynamoDBのテーブル
TAG_KEY EventBridgeに付与されるタグのキー名
controlを設定
TAG_VALUE EventBridgeに付与されるタグのバリュー名
ChangeCalendarを設定

動作確認

準備

Change Calendar
image.png

EventBridge
image.png

日本時間で毎週日曜日9:00に実行するように設定しました。
image.png

実行されたかどうかを確認するため、今回はイベントをCloudWatchに連携してみます。
image.png

指定のタグもしっかりつけました。
image.png

実行確認

実行日が2024/6/21なので1か月後(7/21)までの毎週日曜日9:00に設定されていれば成功です。

ChangeClaendar
指定日までの日程を正しく設定されています。
image.png

カレンダーイベントの詳細を見てみると、タイトルがEventBridgeルール名と一致していることが確認できます。
image.png

EventBridge
スケジュールが削除され、イベントパターンが自動で設定されました。こちらの設定でカレンダーイベントが実行されるとイベントをキャッチすることができます。
image.png

DynamoDB
DynamoDBにも格納されていることがわかりました。
image.png

おまけ

EventBridgeルールを2つ用意して、日曜の9:00にTestRuleForChangeCalendar、水曜の9:00にTestRuleForChangeCalendar02が実行されるようにスケジュールをそれぞれ設定しました。

Lambdaを実行するとChangeCalendarには正しく登録が行われることを確認しました。
このあとにEventBridgeルールを消すと正しくChangeCalendarに反映されることも確認できました。
image.png

おわりに

様々なジョブがEventBridgeで管理されている場合には有効な策かと思います。
ぜひご参考ください。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?