0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Durable Functions を用いて CloudFormation のドリフトを自動修復する

0
Posted at

はじめに

こんにちは、ほうき星 @H0ukiStar です。

皆さんは昨年(2025年)の11月に CloudFormation がアップデートされ、ドリフト状態の修正に利用可能なドリフト認識変更セットが追加されたことをご存じでしょうか?

本機能の登場以前は、ドリフト修正を行うためには一度ダミーの変更を加えてスタック更新を実施し、その後元に戻す必要がありました。
しかし現在は、変更セット作成時に --deployment-mode REVERT_DRIFT を指定することで、IaC と実際のインフラストラクチャとの差分を認識し、ドリフト修復用の変更セットを作成できるようになりました。

本記事では CloudFormation スタックのドリフトをチェックし、このドリフト認識変更セットを用いて自動的にドリフトの修正を行う Configuration Healing の仕組みを Durable Functions で実装してみましたのでご紹介します。

ドリフト認識変更セットを試してみる

サンプルスタックの展開

まずはドリフト認識変更セットがどのように動作するのかを確認します。
以下の CloudFormation テンプレートをスタック名:test としてデプロイしました。

sample.yaml
AWSTemplateFormatVersion: 2010-09-09
Description: Stack for testing drift-aware change sets

Resources:
  TestParameter:
    Type: AWS::SSM::Parameter
    Properties:
      Name: /drift-test/sample
      Type: String
      Value: initial-value
      Description: Parameter for drift testing

スタック展開直後は以下の通りドリフトは発生していません。

スタック展開直後のドリフトステータス

スタック展開時に以下のテンプレートで作成したロールを使用しています。

IAM ロールテンプレート
sample-execution-role.yaml
AWSTemplateFormatVersion: 2010-09-09
Description: IAM Role for deploying sample.yaml CloudFormation stack

Resources:
  CloudFormationExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: sample-cfn-execution-role
      Description: Execution role for deploying sample.yaml stack
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: cloudformation.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - !Ref CloudFormationExecutionPolicy
      Tags:
        - Key: Purpose
          Value: CloudFormationExecution
        - Key: Stack
          Value: sample-drift-test

  CloudFormationExecutionPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      ManagedPolicyName: sample-cfn-execution-policy
      Description: Policy for CloudFormation to manage resources in sample.yaml
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: SSMParameterAccess
            Effect: Allow
            Action:
              - ssm:PutParameter
              - ssm:DeleteParameter
              - ssm:GetParameter
              - ssm:GetParameters
              - ssm:DescribeParameters
              - ssm:AddTagsToResource
              - ssm:RemoveTagsFromResource
              - ssm:ListTagsForResource
            Resource:
              - !Sub arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/drift-test/*
          - Sid: CloudFormationReadAccess
            Effect: Allow
            Action:
              - cloudformation:DescribeStacks
              - cloudformation:DescribeStackResources
              - cloudformation:DescribeStackEvents
              - cloudformation:GetTemplate
              - cloudformation:ListStackResources
            Resource:
              - !Sub arn:aws:cloudformation:${AWS::Region}:${AWS::AccountId}:stack/sample-drift-test/*

ドリフトの発生

以下のコマンドで SSM パラメータの値を変更し、ドリフトを発生させます。

aws ssm put-parameter \
  --name /drift-test/sample \
  --value updated-value \
  --overwrite

CloudFormation のドリフト画面からもドリフトが発生していることを確認できます。

SSM パラメータ変更後のドリフトステータス

ドリフト認識変更セットで修復してみる

CloudFormation のドリフト画面上にドリフト認識変更セットを使用して修正できる旨の案内が表示されているので「変更セットを作成」から実施します。

変更セットの作成

変更セットのタイプがドリフト認識変更セットになっていることを確認し、画面の案内に従って変更セットを作成します。

ドリフト認識変更セットの作成

作成された変更セットのリソースの変更内容を確認すると、ドリフトになっている部分を認識したうえでその修正が行われる内容になっています。

変更セットの内容確認

変更セットの適用後、ドリフトが解消されていることが確認できます。

変更セット適用後のドリフトステータス

Durable Functions を用いた Configuration Healing

CloudFormation のドリフト検出や変更セット作成は非同期処理であるため、完了までポーリングによる待機が必要になります。

これを通常の Lambda 関数だけで実装すると、待機制御やリトライ、状態管理が複雑になりがちです。

今回は Durable Functions を利用することで、これらの待機処理や状態遷移をシンプルに実装しました。

また、Durable Functions は長時間実行されるワークフローを前提としているため、CloudFormation のような非同期 API を扱うユースケースと非常に相性が良いです。
特に今回のような「開始 → 完了待機 → 状態確認 → 次処理」というオーケストレーションを伴う処理では、実装を見通し良く記述することができます。

フロー

実装にあたっては、以下の流れで CloudFormation スタックのドリフト検出から修復までを行います。

Durable Functions を利用することで、ドリフト検出や変更セット作成完了までの待機をシンプルに実装できます。

また Lambda 関数に与えるイベント中で CreateChangeSetOnly:true とすることで、変更セットの作成までに留めるようにしています。

Durable Functions での実装サンプル

Durable Functions は前述のフローに則って実装しています。

SAM テンプレートを含む全ソースは以下のリポジトリに置いています。合わせて参考にしてください。

Configuration Healing を行う Lambda 関数のコード
lambda_function.py
from datetime import datetime
from urllib.parse import quote
from typing import Any, Callable, Optional, Literal, TypedDict, Mapping

import boto3
from botocore.config import Config
from mypy_boto3_sns import SNSClient
from pydantic_settings import BaseSettings
from pydantic import BaseModel, ValidationError
from mypy_boto3_cloudformation.type_defs import (
    CreateChangeSetOutputTypeDef,
    DescribeChangeSetOutputTypeDef,
    DetectStackDriftOutputTypeDef,
    DescribeStackDriftDetectionStatusOutputTypeDef,
)
from mypy_boto3_cloudformation import CloudFormationClient
from aws_durable_execution_sdk_python.config import Duration
from aws_durable_execution_sdk_python.types import WaitForConditionCheckContext
from aws_durable_execution_sdk_python.waits import WaitForConditionConfig, WaitForConditionDecision
from aws_durable_execution_sdk_python import StepContext, DurableContext, durable_step, durable_execution


class DescribeStackDriftDetectionStatusState(TypedDict):
    """
    State information for stack drift detection operations.

    Attributes
    ----------
    StackId : str
        The unique identifier of the CloudFormation stack.
    StackDriftDetectionId : str
        The unique identifier of the drift detection operation.
    StackDriftStatus : Literal["DRIFTED", "IN_SYNC", "UNKNOWN", "NOT_CHECKED"]
        Current drift status of the stack.
    DetectionStatus : Literal["DETECTION_IN_PROGRESS", "DETECTION_FAILED", "DETECTION_COMPLETE"]
        Current status of the drift detection operation.
    """
    StackId: str
    StackDriftDetectionId: str
    StackDriftStatus: Literal["DRIFTED", "IN_SYNC", "UNKNOWN", "NOT_CHECKED"]
    DetectionStatus: Literal["DETECTION_IN_PROGRESS", "DETECTION_FAILED", "DETECTION_COMPLETE"]


class DescribeChangeSetState(TypedDict):
    """
    State information for change set operations.

    Attributes
    ----------
    StackId : str
        The unique identifier of the CloudFormation stack associated with the change set.
    ChangeSetId : str
        The unique identifier of the change set.
    ExecutionStatus : Literal["UNAVAILABLE", "AVAILABLE", "EXECUTE_IN_PROGRESS", "EXECUTE_COMPLETE", "EXECUTE_FAILED", "OBSOLETE"]
        Current execution status of the change set.
    Status : Literal["CREATE_PENDING", "CREATE_IN_PROGRESS", "CREATE_COMPLETE", "DELETE_PENDING", "DELETE_IN_PROGRESS", "DELETE_COMPLETE", "DELETE_FAILED", "FAILED"]
        Current creation/deletion status of the change set.
    """
    StackId: str
    ChangeSetId: str
    ExecutionStatus: Literal["UNAVAILABLE", "AVAILABLE", "EXECUTE_IN_PROGRESS", "EXECUTE_COMPLETE", "EXECUTE_FAILED", "OBSOLETE"]
    Status: Literal["CREATE_PENDING", "CREATE_IN_PROGRESS", "CREATE_COMPLETE", "DELETE_PENDING", "DELETE_IN_PROGRESS", "DELETE_COMPLETE", "DELETE_FAILED", "FAILED"]


class DetectionStatusFailedError(Exception):
    """Raised when DetectionStatus is 'DETECTION_FAILED'."""

    def __init__(self, stack_id: str, stack_drift_detection_id: str):
        super().__init__(f"DetectionStatus is 'DETECTION_FAILED'.\nStack ID: {stack_id}\nStack Drift Detection ID: {stack_drift_detection_id}")


class StackDriftStatusUnknownError(Exception):
    """Raised when StackDriftStatus is 'UNKNOWN'."""

    def __init__(self, stack_id: str, stack_drift_detection_id: str):
        super().__init__(f"StackDriftStatus is 'UNKNOWN'.\nStack ID: {stack_id}\nStack Drift Detection ID: {stack_drift_detection_id}")


class ChangeSetCreationStatusFailedError(Exception):
    """Raised when change set Status is 'FAILED'."""

    def __init__(self, stack_id: str, change_set_id: str):
        super().__init__(f"Change set Status is 'FAILED'.\nStack ID: {stack_id}\nChange Set ID: {change_set_id}")


class ChangeSetExecutionStatusFailedError(Exception):
    """Raised when change set ExecutionStatus is 'EXECUTE_FAILED'."""

    def __init__(self, stack_id: str, change_set_id: str):
        super().__init__(f"Change set ExecutionStatus is 'EXECUTE_FAILED'.\nStack ID: {stack_id}\nChange Set ID: {change_set_id}")


class WaitConditionTimeoutError(Exception):
    """Raised when a wait condition times out after maximum attempts."""

    def __init__(self, operation: str, max_attempts: int, state: Optional[Mapping[str, Any]] = None):
        self.operation = operation
        self.max_attempts = max_attempts
        self.state = dict(state) if state else {}
        message = f"Wait condition timed out.\nOperation: {operation}\nMax Attempts: {max_attempts}"
        if state:
            message += f"\nState: {state}"
        super().__init__(message)


class Settings(BaseSettings):
    """
    Application settings loaded from environment variables.

    Attributes
    ----------
    SNS_TOPIC_ARN : str
        The ARN of the SNS topic for notifications.
    WAIT_FOR_STACK_DRIFT_DETECTION_COMPLETION_INTERVAL_SECONDS : int
        Interval in seconds between drift check status polls.
    WAIT_FOR_CHANGE_SET_CREATION_INTERVAL_SECONDS : int
        Interval in seconds between change set creation status polls.
    WAIT_FOR_CHANGE_SET_EXECUTION_INTERVAL_SECONDS : int
        Interval in seconds between change set execution status polls.
    MAX_WAIT_ATTEMPTS : int
        Maximum number of attempts for waiting operations before timing out.
    """
    SNS_TOPIC_ARN: str = "arn:aws:sns:us-east-1:123456789012:example-topic"
    WAIT_FOR_STACK_DRIFT_DETECTION_COMPLETION_INTERVAL_SECONDS: int = 30
    WAIT_FOR_CHANGE_SET_CREATION_INTERVAL_SECONDS: int = 30
    WAIT_FOR_CHANGE_SET_EXECUTION_INTERVAL_SECONDS: int = 60
    MAX_WAIT_ATTEMPTS: int = 10


class EventParameter(BaseModel):
    """
    Event parameter model for Lambda function input.

    Attributes
    ----------
    StackName : str
        The name of the CloudFormation stack to process.
    CreateChangeSetOnly : bool, default=False
        If True, only creates a change set without executing it.
        Useful for reviewing changes before applying them.
    """
    StackName: str
    CreateChangeSetOnly: bool = False


def get_settings() -> Settings:
    """
    Get application settings.

    This function can be easily mocked in tests.

    Returns
    -------
    Settings
        Application settings instance.
    """
    return Settings()


def create_cloudformation_client() -> CloudFormationClient:
    """
    Create and return a CloudFormation client.

    Returns
    -------
    CloudFormationClient
        A boto3 CloudFormation client instance.
    """
    return boto3.client(
        "cloudformation",
        config=Config(retries={"max_attempts": 3, "mode": "standard"})
    )


def create_sns_client() -> SNSClient:
    """
    Create and return an SNS client.

    Returns
    -------
    SNSClient
        A boto3 SNS client instance.
    """
    return boto3.client(
        "sns",
        config=Config(retries={"max_attempts": 3, "mode": "standard"})
    )


def parse_region_from_stack_id(stack_id: str) -> str:
    """
    Parse the AWS region from a CloudFormation stack ARN.

    Parameters
    ----------
    stack_id : str
        The CloudFormation stack ARN (e.g., arn:aws:cloudformation:ap-northeast-1:123456789012:stack/test/abc123).

    Returns
    -------
    str
        The AWS region extracted from the ARN.

    Raises
    ------
    ValueError
        If the Stack ID format is invalid.
    """
    # ARN format: arn:aws:cloudformation:{region}:{account}:stack/{stack_name}/{stack_id}
    parts = stack_id.split(":")
    if len(parts) >= 4:
        return parts[3]
    raise ValueError(f"Invalid Stack ID format: {stack_id}")


def build_changeset_console_url(stack_id: str, changeset_id: str) -> str:
    """
    Build AWS Console URL for viewing a change set.

    Parameters
    ----------
    stack_id : str
        The CloudFormation stack ARN.
    changeset_id : str
        The CloudFormation change set ARN.

    Returns
    -------
    str
        The AWS Console URL for the change set.
    """
    region = parse_region_from_stack_id(stack_id)
    encoded_stack_id = quote(stack_id, safe="")
    encoded_changeset_id = quote(changeset_id, safe="")

    return f"https://{region}.console.aws.amazon.com/cloudformation/home?region={region}#/stacks/changesets/changes?stackId={encoded_stack_id}&changeSetId={encoded_changeset_id}"


@durable_step
def detect_stack_drift(_: StepContext, stack_name: str, client_factory: Optional[Callable[[], CloudFormationClient]]=None) -> str:
    """
    Detect drift in a CloudFormation stack.

    Parameters
    ----------
    _ : StepContext
        The step context for the durable execution.
    stack_name : str
        The name of the CloudFormation stack to check for drift.
    client_factory : Optional[Callable[[], CloudFormationClient]], default=None
        Optional factory function to create a CloudFormation client.

    Returns
    -------
    str
        The stack drift detection ID.
    """
    client: CloudFormationClient = client_factory() if client_factory else create_cloudformation_client()

    response: DetectStackDriftOutputTypeDef = client.detect_stack_drift(StackName=stack_name)

    return response["StackDriftDetectionId"]


def describe_stack_drift_detection_status(state: DescribeStackDriftDetectionStatusState, context: WaitForConditionCheckContext, client_factory: Optional[Callable[[], CloudFormationClient]]=None) -> DescribeStackDriftDetectionStatusState:
    """
    Check the status of a stack drift detection operation.

    Parameters
    ----------
    state : DescribeStackDriftDetectionStatusState
        Current state containing the stack drift detection ID.
    context : WaitForConditionCheckContext
        Context object for logging and wait condition checks.
    client_factory : Optional[Callable[[], CloudFormationClient]], default=None
        Optional factory function to create a CloudFormation client.

    Returns
    -------
    DescribeStackDriftDetectionStatusState
        Updated state with current drift detection status.
    """
    client: CloudFormationClient = client_factory() if client_factory else create_cloudformation_client()

    response: DescribeStackDriftDetectionStatusOutputTypeDef = client.describe_stack_drift_detection_status(StackDriftDetectionId=state["StackDriftDetectionId"])

    return {
        "StackId": response["StackId"],
        "StackDriftDetectionId": response["StackDriftDetectionId"],
        "StackDriftStatus": response.get("StackDriftStatus", "UNKNOWN"),
        "DetectionStatus": response["DetectionStatus"]
    }


def wait_for_stack_drift_detection_completion(state: DescribeStackDriftDetectionStatusState, attempt: int, settings_factory: Optional[Callable[[], Settings]]=None) -> WaitForConditionDecision:
    """
    Determine whether to continue waiting for drift detection completion.

    Parameters
    ----------
    state : DescribeStackDriftDetectionStatusState
        Current drift detection state.
    attempt : int
        Current attempt number, used to check against MAX_WAIT_ATTEMPTS.
    settings_factory : Optional[Callable[[], Settings]], default=None
        Optional factory function to create a Settings instance.

    Returns
    -------
    WaitForConditionDecision
        Decision to stop polling or continue waiting.
    """
    settings: Settings = settings_factory() if settings_factory else get_settings()

    if state["DetectionStatus"] != "DETECTION_IN_PROGRESS":
        return WaitForConditionDecision.stop_polling()
    elif attempt >= settings.MAX_WAIT_ATTEMPTS:
        raise WaitConditionTimeoutError(
            operation="Wait for stack drift detection completion",
            max_attempts=settings.MAX_WAIT_ATTEMPTS,
            state=state
        )
    else:
        return WaitForConditionDecision.continue_waiting(Duration.from_seconds(settings.WAIT_FOR_STACK_DRIFT_DETECTION_COMPLETION_INTERVAL_SECONDS))


@durable_step
def create_change_set(_: StepContext, stack_name: str, client_factory: Optional[Callable[[], CloudFormationClient]]=None) -> CreateChangeSetOutputTypeDef:
    """
    Create a change set to revert drift in a CloudFormation stack.

    Parameters
    ----------
    _ : StepContext
        The step context for the durable execution.
    stack_name : str
        The name of the CloudFormation stack.
    client_factory : Optional[Callable[[], CloudFormationClient]], default=None
        Optional factory function to create a CloudFormation client.

    Returns
    -------
    CreateChangeSetOutputTypeDef
        Response containing the change set ID and other metadata.
    """
    client: CloudFormationClient = client_factory() if client_factory else create_cloudformation_client()
    now: datetime = datetime.now()

    return client.create_change_set(
        StackName=stack_name,
        ChangeSetName=f"healing-changeset-{stack_name}-{now.strftime('%Y%m%dT%H%M%S')}",
        UsePreviousTemplate=True,
        Description="Change set for configuration healing",
        DeploymentMode="REVERT_DRIFT"
    )


def describe_change_set(state: DescribeChangeSetState, context: WaitForConditionCheckContext, client_factory: Optional[Callable[[], CloudFormationClient]]=None) -> DescribeChangeSetState:
    """
    Retrieve the current status of a change set.

    Parameters
    ----------
    state : DescribeChangeSetState
        Current state containing the change set ID.
    context : WaitForConditionCheckContext
        Context object for logging and wait condition checks.
    client_factory : Optional[Callable[[], CloudFormationClient]], default=None
        Optional factory function to create a CloudFormation client.

    Returns
    -------
    DescribeChangeSetState
        Updated state with current change set status.
    """
    client: CloudFormationClient = client_factory() if client_factory else create_cloudformation_client()

    response: DescribeChangeSetOutputTypeDef = client.describe_change_set(ChangeSetName=state["ChangeSetId"])

    return {
        "StackId": response["StackId"],
        "ChangeSetId": response["ChangeSetId"],
        "ExecutionStatus": response["ExecutionStatus"],
        "Status": response["Status"]
    }


def wait_for_change_set_creation_completion(state: DescribeChangeSetState, attempt: int, settings_factory: Optional[Callable[[], Settings]]=None) -> WaitForConditionDecision:
    """
    Determine whether to continue waiting for change set creation.

    Parameters
    ----------
    state : DescribeChangeSetState
        Current change set state.
    attempt : int
        Current attempt number, used to check against MAX_WAIT_ATTEMPTS.
    settings_factory : Optional[Callable[[], Settings]], default=None
        Optional factory function to create a Settings instance.

    Returns
    -------
    WaitForConditionDecision
        Decision to stop polling or continue waiting.
    """
    settings: Settings = settings_factory() if settings_factory else get_settings()

    if state["Status"] in ["CREATE_COMPLETE", "FAILED"]:
        return WaitForConditionDecision.stop_polling()
    elif attempt >= settings.MAX_WAIT_ATTEMPTS:
        raise WaitConditionTimeoutError(
            operation="Wait for change set creation completion",
            max_attempts=settings.MAX_WAIT_ATTEMPTS,
            state=state
        )
    else:
        return WaitForConditionDecision.continue_waiting(Duration.from_seconds(settings.WAIT_FOR_CHANGE_SET_CREATION_INTERVAL_SECONDS))


@durable_step
def execute_change_set(_: StepContext, change_set_id: str, client_factory: Optional[Callable[[], CloudFormationClient]]=None) -> None:
    """
    Execute a change set to apply changes to a CloudFormation stack.

    Parameters
    ----------
    _ : StepContext
        The step context for the durable execution.
    change_set_id : str
        The ID of the change set to execute.
    client_factory : Optional[Callable[[], CloudFormationClient]], default=None
        Optional factory function to create a CloudFormation client.

    Returns
    -------
    None
    """
    client: CloudFormationClient = client_factory() if client_factory else create_cloudformation_client()

    client.execute_change_set(ChangeSetName=change_set_id)


def wait_for_change_set_execution_completion(state: DescribeChangeSetState, attempt: int, settings_factory: Optional[Callable[[], Settings]]=None) -> WaitForConditionDecision:
    """
    Determine whether to continue waiting for change set execution.

    Parameters
    ----------
    state : DescribeChangeSetState
        Current change set state.
    attempt : int
        Current attempt number, used to check against MAX_WAIT_ATTEMPTS.
    settings_factory : Optional[Callable[[], Settings]], default=None
        Optional factory function to create a Settings instance.

    Returns
    -------
    WaitForConditionDecision
        Decision to stop polling or continue waiting.
    """
    settings: Settings = settings_factory() if settings_factory else get_settings()

    if state["ExecutionStatus"] in ["EXECUTE_COMPLETE", "EXECUTE_FAILED", "OBSOLETE"]:
        return WaitForConditionDecision.stop_polling()
    elif attempt >= settings.MAX_WAIT_ATTEMPTS:
        raise WaitConditionTimeoutError(
            operation="Wait for change set execution completion",
            max_attempts=settings.MAX_WAIT_ATTEMPTS,
            state=state
        )
    else:
        return WaitForConditionDecision.continue_waiting(Duration.from_seconds(settings.WAIT_FOR_CHANGE_SET_EXECUTION_INTERVAL_SECONDS))


@durable_step
def send_notification(_: StepContext, message: str, client_factory: Optional[Callable[[], SNSClient]]=None, settings_factory: Optional[Callable[[], Settings]]=None) -> None:
    """
    Send a notification message to the configured SNS topic.

    Parameters
    ----------
    _ : StepContext
        The step context for the durable execution.
    message : str
        The message to send in the notification.
    client_factory : Optional[Callable[[], SNSClient]], default=None
        Optional factory function to create an SNS client.
    settings_factory : Optional[Callable[[], Settings]], default=None
        Optional factory function to retrieve application settings.

    Returns
    -------
    None
    """
    client: SNSClient = client_factory() if client_factory else create_sns_client()
    settings: Settings = settings_factory() if settings_factory else get_settings()

    client.publish(
        TopicArn=settings.SNS_TOPIC_ARN,
        Message=message,
        Subject="CloudFormation Stack Drift Detection and Healing Notification"
    )


@durable_execution
def lambda_handler(event: dict, context: DurableContext):
    """
    Main Lambda handler for CloudFormation stack drift detection and healing.

    This function orchestrates the process of detecting drift in a CloudFormation
    stack and automatically healing it by creating and executing a change set.

    Parameters
    ----------
    event : dict
        Lambda event containing the stack name.
    context : DurableContext
        Durable execution context for managing long-running workflows.

    Returns
    -------
    None
    """
    # Validate and parse input event
    try:
        event_parameter = EventParameter(**event)
        context.logger.debug(f"{event_parameter=}")
    except ValidationError as e:
        context.logger.exception(f"Input event validation failed for event: {event}")
        raise e

    # Step 1: Detect stack drift
    stack_drift_detection_id: str = context.step(detect_stack_drift(event_parameter.StackName))
    context.logger.debug(f"{stack_drift_detection_id=}")

    # Step 2: Wait for stack drift detection to complete and check results
    describe_stack_drift_detection_status_state: DescribeStackDriftDetectionStatusState = context.wait_for_condition(
        check=describe_stack_drift_detection_status,
        config=WaitForConditionConfig(
            initial_state={
                "StackId": "",
                "StackDriftDetectionId": stack_drift_detection_id,
                "StackDriftStatus": "UNKNOWN",
                "DetectionStatus": "DETECTION_IN_PROGRESS"
            },
            wait_strategy=wait_for_stack_drift_detection_completion
        ),
        name="wait_for_stack_drift_detection_completion"
    )
    context.logger.debug(f"{describe_stack_drift_detection_status_state=}")

    if describe_stack_drift_detection_status_state["DetectionStatus"] == "DETECTION_FAILED":
        detection_status_failed_error: DetectionStatusFailedError = DetectionStatusFailedError(
            stack_id=describe_stack_drift_detection_status_state["StackId"],
            stack_drift_detection_id=describe_stack_drift_detection_status_state["StackDriftDetectionId"]
        )
        context.logger.error(f"{detection_status_failed_error}")
        context.step(send_notification(f"{detection_status_failed_error}"))
        raise detection_status_failed_error

    if describe_stack_drift_detection_status_state["StackDriftStatus"] == "UNKNOWN":
        stack_drift_status_unknown_error: StackDriftStatusUnknownError = StackDriftStatusUnknownError(
            stack_id=describe_stack_drift_detection_status_state["StackId"],
            stack_drift_detection_id=describe_stack_drift_detection_status_state["StackDriftDetectionId"]
        )
        context.logger.error(f"{stack_drift_status_unknown_error}")
        context.step(send_notification(f"{stack_drift_status_unknown_error}"))
        raise stack_drift_status_unknown_error
    elif describe_stack_drift_detection_status_state["StackDriftStatus"] == "IN_SYNC":
        context.logger.info(f"Stack is in sync. No drift detected.\nStack ID: {describe_stack_drift_detection_status_state['StackId']}")
        return
    context.logger.warning(f"Stack is drifted.\nStack ID: {describe_stack_drift_detection_status_state['StackId']}")

    # Step 3: Create a change set to heal the drift
    healing_change_set_info: CreateChangeSetOutputTypeDef = context.step(create_change_set(event_parameter.StackName))
    context.logger.debug(f"{healing_change_set_info=}")

    # Step 4: Wait for change set creation to complete
    describe_change_set_state: DescribeChangeSetState = context.wait_for_condition(
        check=describe_change_set,
        config=WaitForConditionConfig(
            initial_state={
                "StackId": "",
                "ChangeSetId": healing_change_set_info["Id"],
                "ExecutionStatus": "UNAVAILABLE",
                "Status": "CREATE_PENDING"
            },
            wait_strategy=wait_for_change_set_creation_completion
        ),
        name="wait_for_change_set_creation"
    )
    context.logger.debug(f"{describe_change_set_state=}")

    if describe_change_set_state["Status"] == "FAILED":
        change_set_creation_status_failed_error: ChangeSetCreationStatusFailedError = ChangeSetCreationStatusFailedError(
            stack_id=describe_change_set_state["StackId"],
            change_set_id=describe_change_set_state["ChangeSetId"]
        )
        context.logger.error(f"{change_set_creation_status_failed_error}")
        context.step(send_notification(f"{change_set_creation_status_failed_error}"))
        raise change_set_creation_status_failed_error

    change_set_url: str = build_changeset_console_url(describe_change_set_state["StackId"], describe_change_set_state["ChangeSetId"])

    # If CreateChangeSetOnly flag is set, send notification and exit
    if event_parameter.CreateChangeSetOnly:
        change_set_created_message = (
            f"Change set created successfully.\n"
            f"Stack ID: {describe_change_set_state['StackId']}\n"
            f"Change Set ID: {describe_change_set_state['ChangeSetId']}\n"
            f"\n"
            f"Please review and execute the change set manually if needed.\n"
            f"URL: {change_set_url}"
        )
        context.logger.info(change_set_created_message)
        context.step(send_notification(change_set_created_message))
        return

    # Step 5: Execute the change set to heal the drift
    context.step(execute_change_set(describe_change_set_state["ChangeSetId"]))
    context.logger.info(f"Change set executed to heal the stack drift.\nStack ID: {describe_change_set_state['StackId']}\nChange Set ID: {describe_change_set_state['ChangeSetId']}")

    # Step 6: Wait for change set execution to complete and check results
    describe_change_set_state: DescribeChangeSetState = context.wait_for_condition(
        check=describe_change_set,
        config=WaitForConditionConfig(
            initial_state={
                "StackId": "",
                "ChangeSetId": healing_change_set_info["Id"],
                "ExecutionStatus": "AVAILABLE",
                "Status": "CREATE_COMPLETE"
            },
            wait_strategy=wait_for_change_set_execution_completion
        ),
        name="wait_for_change_set_execution"
    )
    context.logger.debug(f"{describe_change_set_state=}")

    if describe_change_set_state["ExecutionStatus"] == "EXECUTE_FAILED":
        change_set_execution_status_failed_error: ChangeSetExecutionStatusFailedError = ChangeSetExecutionStatusFailedError(
            stack_id=describe_change_set_state["StackId"],
            change_set_id=describe_change_set_state["ChangeSetId"]
        )
        context.logger.error(f"{change_set_execution_status_failed_error}")
        context.step(send_notification(f"{change_set_execution_status_failed_error}"))
        raise change_set_execution_status_failed_error
    elif describe_change_set_state["ExecutionStatus"] == "OBSOLETE":
        obsolete_message = (
            f"Change set is obsolete.\n"
            f"Stack ID: {describe_change_set_state['StackId']}\n"
            f"Change Set ID: {describe_change_set_state['ChangeSetId']}\n"
            f"\n"
            f"Please check the stack status and consider re-running the healing process if necessary.\n"
            f"URL: {change_set_url}"
        )
        context.logger.warning(obsolete_message)
        context.step(send_notification(obsolete_message))
    elif describe_change_set_state["ExecutionStatus"] == "EXECUTE_COMPLETE":
        success_message = (
            f"Change set execution completed successfully.\n"
            f"Stack ID: {describe_change_set_state['StackId']}\n"
            f"Change Set ID: {describe_change_set_state['ChangeSetId']}\n"
            f"\n"
            f"Stack drift has been healed."
        )
        context.logger.info(success_message)
        context.step(send_notification(success_message))
    return

動作確認

実際に test スタックで作成した SSM Parameter の値を変更してドリフトを発生させ、展開した Lambda 関数を実行してみます。

Lambda 関数の実行
aws lambda invoke \
  --function-name arn:aws:lambda:<region>:<account-id>:function:cfn-drift-healing:Alias \
  --invocation-type Event \
  --cli-binary-format raw-in-base64-out \
  --payload '{"StackName": "test"}' \
  response.json

ログ等を確認すると、以下の流れで自動修復が実施されていることが分かります。

  1. ドリフト検出開始
  2. ドリフト状態を検知
  3. ドリフト認識変更セットを作成
  4. 変更セットを実行
  5. ドリフト解消

Lambda 関数実行ログ
永続実行の実行結果

修復完了を SNS 通知によっても確認できます。

SNS 通知内容

また、以下のように Durable Functions 実行時にイベントで CreateChangeSetOnly: true を渡すことで、変更セットの作成までに留め変更セットの実行は人に委ねることも可能です。

Lambda 関数の実行
aws lambda invoke \
  --function-name arn:aws:lambda:<region>:<account-id>:function:cfn-drift-healing:Alias \
  --invocation-type Event \
  --cli-binary-format raw-in-base64-out \
  --payload '{"StackName": "test", "CreateChangeSetOnly": true}' \
  response.json

変更セット作成までにとどめた場合の Lambda 関数実行ログ

変更セット作成までにとどめた場合の SNS 通知内容

さいごに

CloudFormation の ドリフト認識変更セット により、これまで手間だったドリフト修復を安全かつ容易に実施できるようになりました。

本記事ではこのドリフト認識変更セットを用いて、自動的にドリフトの修正を行う Configuration Healing の仕組みを Durable Functions で実装しました。
IaC を利用していても、長期間運用される環境では意図しない変更によるドリフトは避けられません。
EventBridge Scheduler 等と組み合わせて定期的に実行することで、こういった意図しない変更や、「後で直そう」と思ったまま放置されていた変更に起因するドリフトを機械的に修正でき、IaC による構成管理を継続的に維持しやすくなります。

なお、本番環境等において強制的に自動修復を行うことに抵抗がある場合は、記事中でも触れたように変更セットの作成までに留め、人によるレビューを挟む運用にすると良いでしょう。

本記事の内容が、CloudFormation を利用した Configuration Healing の実装例として参考になれば幸いです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?