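Introduction
Hi, I'm ほうき星 (@H0ukiStar).
Did you know that in November of last year (2025), CloudFormation was updated with drift-aware change sets, which can be used to correct drift?
Before this feature, correcting drift meant making a dummy change, running a stack update, and then reverting the dummy change.
Now, by specifying --deployment-mode REVERT_DRIFT when creating a change set, CloudFormation recognizes the differences between the IaC template and the actual infrastructure and creates a change set that repairs the drift.
In this article, I introduce a Configuration Healing mechanism, implemented with Durable Functions, that checks a CloudFormation stack for drift and automatically corrects it with a drift-aware change set.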
Trying Out Drift-Aware Change Sets
Deploying the Sample Stack
First, let's confirm how a drift-aware change set behaves.
I deployed the following CloudFormation template as a stack named test.
AWSTemplateFormatVersion: 2010-09-09
Description: Stack for testing drift-aware change sets
Resources:
  TestParameter:
    Type: AWS::SSM::Parameter
    Properties:
      Name: /drift-test/sample
      Type: String
      Value: initial-value
      Description: Parameter for drift testing
Immediately after deployment, the stack shows no drift.
The stack was deployed using a role created from the following template.
IAM role template
AWSTemplateFormatVersion: 2010-09-09
Description: IAM Role for deploying sample.yaml CloudFormation stack
Resources:
  CloudFormationExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: sample-cfn-execution-role
      Description: Execution role for deploying sample.yaml stack
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: cloudformation.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - !Ref CloudFormationExecutionPolicy
      Tags:
        - Key: Purpose
          Value: CloudFormationExecution
        - Key: Stack
          Value: sample-drift-test
  CloudFormationExecutionPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      ManagedPolicyName: sample-cfn-execution-policy
      Description: Policy for CloudFormation to manage resources in sample.yaml
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: SSMParameterAccess
            Effect: Allow
            Action:
              - ssm:PutParameter
              - ssm:DeleteParameter
              - ssm:GetParameter
              - ssm:GetParameters
              - ssm:DescribeParameters
              - ssm:AddTagsToResource
              - ssm:RemoveTagsFromResource
              - ssm:ListTagsForResource
            Resource:
              - !Sub arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/drift-test/*
          - Sid: CloudFormationReadAccess
            Effect: Allow
            Action:
              - cloudformation:DescribeStacks
              - cloudformation:DescribeStackResources
              - cloudformation:DescribeStackEvents
              - cloudformation:GetTemplate
              - cloudformation:ListStackResources
            Resource:
              - !Sub arn:aws:cloudformation:${AWS::Region}:${AWS::AccountId}:stack/sample-drift-test/*
Introducing Drift
The following command changes the value of the SSM parameter to introduce drift.
aws ssm put-parameter \
    --name /drift-test/sample \
    --value updated-value \
    --overwrite
The drift is also visible on the CloudFormation drift screen.
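The same information can be read through the API instead of the console. The following is a minimal sketch, assuming drift detection has already been run on the stack: it formats the entries returned in StackResourceDrifts by describe_stack_resource_drifts into one line per differing property. The helper name summarize_resource_drifts is my own and is not part of boto3.

```python
from typing import Any, Iterable


def summarize_resource_drifts(drifts: Iterable[dict[str, Any]]) -> list[str]:
    """Return one readable line per property difference of MODIFIED resources."""
    lines: list[str] = []
    for drift in drifts:
        # Resources that are IN_SYNC, DELETED, or NOT_CHECKED are skipped here.
        if drift.get("StackResourceDriftStatus") != "MODIFIED":
            continue
        for diff in drift.get("PropertyDifferences", []):
            lines.append(
                f"{drift['LogicalResourceId']} {diff['PropertyPath']}: "
                f"expected {diff['ExpectedValue']!r}, actual {diff['ActualValue']!r}"
            )
    return lines


# Usage against a real stack (requires boto3 and AWS credentials):
#   import boto3
#   client = boto3.client("cloudformation")
#   response = client.describe_stack_resource_drifts(StackName="test")
#   for line in summarize_resource_drifts(response["StackResourceDrifts"]):
#       print(line)
```

For the sample stack, the only differing property should be the Value of TestParameter.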
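Repairing the Drift with a Drift-Aware Change Set
The CloudFormation drift screen shows a notice that the drift can be fixed with a drift-aware change set, so I proceed from "Create change set".
Confirm that the change set type is set to drift-aware change set, then follow the on-screen guidance to create it.
The resource changes in the created change set show that CloudFormation has recognized the drifted property and will change it back.
After the change set is executed, the drift is resolved.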
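For reference, the console steps above can be approximated in code. This is only a sketch under a few assumptions: client is a boto3 CloudFormation client (or anything with the same three methods), the helper name revert_drift and its polling defaults are mine, and error handling is reduced to two exceptions.

```python
import time
from typing import Any, Callable


def revert_drift(
    client: Any,
    stack_name: str,
    change_set_name: str,
    sleep: Callable[[float], None] = time.sleep,
    interval_seconds: float = 5.0,
    max_attempts: int = 20,
) -> str:
    """Create a REVERT_DRIFT change set, wait until it is ready, execute it, and return its ID."""
    created = client.create_change_set(
        StackName=stack_name,
        ChangeSetName=change_set_name,
        UsePreviousTemplate=True,  # keep the template; only the drift is reverted
        DeploymentMode="REVERT_DRIFT",
    )
    change_set_id = created["Id"]
    for _ in range(max_attempts):
        status = client.describe_change_set(ChangeSetName=change_set_id)["Status"]
        if status == "CREATE_COMPLETE":
            client.execute_change_set(ChangeSetName=change_set_id)
            return change_set_id
        if status == "FAILED":
            raise RuntimeError(f"Change set creation failed: {change_set_id}")
        sleep(interval_seconds)
    raise TimeoutError(f"Change set was not ready after {max_attempts} attempts")


# Usage against a real stack (requires boto3 and AWS credentials):
#   import boto3
#   revert_drift(boto3.client("cloudformation"), "test", "revert-drift-manual")
```

Note that this sketch returns as soon as execution has been requested; it does not wait for the stack update itself to finish.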
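Configuration Healing with Durable Functions
CloudFormation drift detection and change set creation are asynchronous operations, so the caller has to poll and wait until they complete.
Implementing this with ordinary Lambda functions alone tends to make wait control, retries, and state management complicated.
Here I used Durable Functions to keep the waiting and the state transitions simple.
Durable Functions are designed for long-running workflows, which makes them a very good fit for use cases built on asynchronous APIs such as CloudFormation.
In particular, orchestration of the form "start → wait for completion → check state → next step", as in this case, can be written in a way that is easy to follow.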
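To make the "wait for completion → check state" part concrete, here is a plain-Python sketch of the loop you would otherwise write by hand. It is not how the durable execution SDK is implemented; the name poll_until_done and the convention that decide returns None to stop are my own assumptions, chosen to resemble the check-function-plus-wait-strategy shape used later in this article.

```python
from typing import Callable, Optional, TypeVar

S = TypeVar("S")


def poll_until_done(
    initial_state: S,
    check: Callable[[S], S],
    decide: Callable[[S, int], Optional[float]],
    sleep: Callable[[float], None],
) -> S:
    """Repeat check -> decide until decide returns None, then return the last state.

    decide receives the latest state and the 1-based attempt number and returns
    the number of seconds to wait before the next check, or None to stop.
    """
    state = initial_state
    attempt = 1
    while True:
        state = check(state)
        delay = decide(state, attempt)
        if delay is None:
            return state
        sleep(delay)  # in an ordinary Lambda function this time counts against the timeout
        attempt += 1
```

In an ordinary Lambda function the sleeping happens inside the invocation, so the total wait is bounded by the function timeout and the loop state is lost if the invocation fails. Those are the parts that a durable execution takes over.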
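Flow
The implementation takes a CloudFormation stack from drift detection through to repair: start drift detection, wait for it to complete, create a drift-aware change set, wait for its creation, execute it, and wait for the execution to finish.
Using Durable Functions keeps the waits for drift detection and change set creation simple to implement.
In addition, setting CreateChangeSetOnly: true in the event passed to the Lambda function stops the process once the change set has been created.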
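The branching in this flow can be condensed into two small decision functions. This is my own reading of the handler shown later in the article, written as a sketch; the function names and the returned action labels are hypothetical and do not appear in the actual implementation.

```python
def next_action_after_detection(detection_status: str, stack_drift_status: str) -> str:
    """Decide what to do once drift detection has stopped being in progress."""
    if detection_status == "DETECTION_FAILED":
        return "notify_and_fail"
    if stack_drift_status == "UNKNOWN":
        return "notify_and_fail"
    if stack_drift_status == "IN_SYNC":
        return "finish"  # nothing to heal
    return "create_change_set"


def next_action_after_creation(change_set_status: str, create_change_set_only: bool) -> str:
    """Decide what to do once change set creation has finished."""
    if change_set_status == "FAILED":
        return "notify_and_fail"
    if create_change_set_only:
        return "notify_and_finish"  # leave execution to a human reviewer
    return "execute_change_set"
```

Keeping the decisions free of AWS calls like this also makes them easy to unit test.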
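Sample Implementation with Durable Functions
The Durable Functions implementation follows the flow described above.
The full source, including the SAM template, is published in a repository; please refer to it as well.
Lambda function code for Configuration Healing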
from datetime import datetime
from urllib.parse import quote
from typing import Any, Callable, Optional, Literal, TypedDict, Mapping

import boto3
from botocore.config import Config
from mypy_boto3_sns import SNSClient
from pydantic_settings import BaseSettings
from pydantic import BaseModel, ValidationError
from mypy_boto3_cloudformation.type_defs import (
    CreateChangeSetOutputTypeDef,
    DescribeChangeSetOutputTypeDef,
    DetectStackDriftOutputTypeDef,
    DescribeStackDriftDetectionStatusOutputTypeDef,
)
from mypy_boto3_cloudformation import CloudFormationClient
from aws_durable_execution_sdk_python.config import Duration
from aws_durable_execution_sdk_python.types import WaitForConditionCheckContext
from aws_durable_execution_sdk_python.waits import WaitForConditionConfig, WaitForConditionDecision
from aws_durable_execution_sdk_python import StepContext, DurableContext, durable_step, durable_execution

class DescribeStackDriftDetectionStatusState(TypedDict):
    """
    State information for stack drift detection operations.

    Attributes
    ----------
    StackId : str
        The unique identifier of the CloudFormation stack.
    StackDriftDetectionId : str
        The unique identifier of the drift detection operation.
    StackDriftStatus : Literal["DRIFTED", "IN_SYNC", "UNKNOWN", "NOT_CHECKED"]
        Current drift status of the stack.
    DetectionStatus : Literal["DETECTION_IN_PROGRESS", "DETECTION_FAILED", "DETECTION_COMPLETE"]
        Current status of the drift detection operation.
    """
    StackId: str
    StackDriftDetectionId: str
    StackDriftStatus: Literal["DRIFTED", "IN_SYNC", "UNKNOWN", "NOT_CHECKED"]
    DetectionStatus: Literal["DETECTION_IN_PROGRESS", "DETECTION_FAILED", "DETECTION_COMPLETE"]


class DescribeChangeSetState(TypedDict):
    """
    State information for change set operations.

    Attributes
    ----------
    StackId : str
        The unique identifier of the CloudFormation stack associated with the change set.
    ChangeSetId : str
        The unique identifier of the change set.
    ExecutionStatus : Literal["UNAVAILABLE", "AVAILABLE", "EXECUTE_IN_PROGRESS", "EXECUTE_COMPLETE", "EXECUTE_FAILED", "OBSOLETE"]
        Current execution status of the change set.
    Status : Literal["CREATE_PENDING", "CREATE_IN_PROGRESS", "CREATE_COMPLETE", "DELETE_PENDING", "DELETE_IN_PROGRESS", "DELETE_COMPLETE", "DELETE_FAILED", "FAILED"]
        Current creation/deletion status of the change set.
    """
    StackId: str
    ChangeSetId: str
    ExecutionStatus: Literal["UNAVAILABLE", "AVAILABLE", "EXECUTE_IN_PROGRESS", "EXECUTE_COMPLETE", "EXECUTE_FAILED", "OBSOLETE"]
    Status: Literal["CREATE_PENDING", "CREATE_IN_PROGRESS", "CREATE_COMPLETE", "DELETE_PENDING", "DELETE_IN_PROGRESS", "DELETE_COMPLETE", "DELETE_FAILED", "FAILED"]

class DetectionStatusFailedError(Exception):
    """Raised when DetectionStatus is 'DETECTION_FAILED'."""
    def __init__(self, stack_id: str, stack_drift_detection_id: str):
        super().__init__(f"DetectionStatus is 'DETECTION_FAILED'.\nStack ID: {stack_id}\nStack Drift Detection ID: {stack_drift_detection_id}")


class StackDriftStatusUnknownError(Exception):
    """Raised when StackDriftStatus is 'UNKNOWN'."""
    def __init__(self, stack_id: str, stack_drift_detection_id: str):
        super().__init__(f"StackDriftStatus is 'UNKNOWN'.\nStack ID: {stack_id}\nStack Drift Detection ID: {stack_drift_detection_id}")


class ChangeSetCreationStatusFailedError(Exception):
    """Raised when change set Status is 'FAILED'."""
    def __init__(self, stack_id: str, change_set_id: str):
        super().__init__(f"Change set Status is 'FAILED'.\nStack ID: {stack_id}\nChange Set ID: {change_set_id}")


class ChangeSetExecutionStatusFailedError(Exception):
    """Raised when change set ExecutionStatus is 'EXECUTE_FAILED'."""
    def __init__(self, stack_id: str, change_set_id: str):
        super().__init__(f"Change set ExecutionStatus is 'EXECUTE_FAILED'.\nStack ID: {stack_id}\nChange Set ID: {change_set_id}")


class WaitConditionTimeoutError(Exception):
    """Raised when a wait condition times out after maximum attempts."""
    def __init__(self, operation: str, max_attempts: int, state: Optional[Mapping[str, Any]] = None):
        self.operation = operation
        self.max_attempts = max_attempts
        self.state = dict(state) if state else {}
        message = f"Wait condition timed out.\nOperation: {operation}\nMax Attempts: {max_attempts}"
        if state:
            message += f"\nState: {state}"
        super().__init__(message)

class Settings(BaseSettings):
    """
    Application settings loaded from environment variables.

    Attributes
    ----------
    SNS_TOPIC_ARN : str
        The ARN of the SNS topic for notifications.
    WAIT_FOR_STACK_DRIFT_DETECTION_COMPLETION_INTERVAL_SECONDS : int
        Interval in seconds between drift check status polls.
    WAIT_FOR_CHANGE_SET_CREATION_INTERVAL_SECONDS : int
        Interval in seconds between change set creation status polls.
    WAIT_FOR_CHANGE_SET_EXECUTION_INTERVAL_SECONDS : int
        Interval in seconds between change set execution status polls.
    MAX_WAIT_ATTEMPTS : int
        Maximum number of attempts for waiting operations before timing out.
    """
    SNS_TOPIC_ARN: str = "arn:aws:sns:us-east-1:123456789012:example-topic"
    WAIT_FOR_STACK_DRIFT_DETECTION_COMPLETION_INTERVAL_SECONDS: int = 30
    WAIT_FOR_CHANGE_SET_CREATION_INTERVAL_SECONDS: int = 30
    WAIT_FOR_CHANGE_SET_EXECUTION_INTERVAL_SECONDS: int = 60
    MAX_WAIT_ATTEMPTS: int = 10


class EventParameter(BaseModel):
    """
    Event parameter model for Lambda function input.

    Attributes
    ----------
    StackName : str
        The name of the CloudFormation stack to process.
    CreateChangeSetOnly : bool, default=False
        If True, only creates a change set without executing it.
        Useful for reviewing changes before applying them.
    """
    StackName: str
    CreateChangeSetOnly: bool = False

def get_settings() -> Settings:
    """
    Get application settings.

    This function can be easily mocked in tests.

    Returns
    -------
    Settings
        Application settings instance.
    """
    return Settings()


def create_cloudformation_client() -> CloudFormationClient:
    """
    Create and return a CloudFormation client.

    Returns
    -------
    CloudFormationClient
        A boto3 CloudFormation client instance.
    """
    return boto3.client(
        "cloudformation",
        config=Config(retries={"max_attempts": 3, "mode": "standard"})
    )


def create_sns_client() -> SNSClient:
    """
    Create and return an SNS client.

    Returns
    -------
    SNSClient
        A boto3 SNS client instance.
    """
    return boto3.client(
        "sns",
        config=Config(retries={"max_attempts": 3, "mode": "standard"})
    )

def parse_region_from_stack_id(stack_id: str) -> str:
    """
    Parse the AWS region from a CloudFormation stack ARN.

    Parameters
    ----------
    stack_id : str
        The CloudFormation stack ARN (e.g., arn:aws:cloudformation:ap-northeast-1:123456789012:stack/test/abc123).

    Returns
    -------
    str
        The AWS region extracted from the ARN.

    Raises
    ------
    ValueError
        If the Stack ID format is invalid.
    """
    # ARN format: arn:aws:cloudformation:{region}:{account}:stack/{stack_name}/{stack_id}
    parts = stack_id.split(":")
    if len(parts) >= 4:
        return parts[3]
    raise ValueError(f"Invalid Stack ID format: {stack_id}")


def build_changeset_console_url(stack_id: str, changeset_id: str) -> str:
    """
    Build AWS Console URL for viewing a change set.

    Parameters
    ----------
    stack_id : str
        The CloudFormation stack ARN.
    changeset_id : str
        The CloudFormation change set ARN.

    Returns
    -------
    str
        The AWS Console URL for the change set.
    """
    region = parse_region_from_stack_id(stack_id)
    encoded_stack_id = quote(stack_id, safe="")
    encoded_changeset_id = quote(changeset_id, safe="")
    return f"https://{region}.console.aws.amazon.com/cloudformation/home?region={region}#/stacks/changesets/changes?stackId={encoded_stack_id}&changeSetId={encoded_changeset_id}"

@durable_step
def detect_stack_drift(_: StepContext, stack_name: str, client_factory: Optional[Callable[[], CloudFormationClient]]=None) -> str:
    """
    Detect drift in a CloudFormation stack.

    Parameters
    ----------
    _ : StepContext
        The step context for the durable execution.
    stack_name : str
        The name of the CloudFormation stack to check for drift.
    client_factory : Optional[Callable[[], CloudFormationClient]], default=None
        Optional factory function to create a CloudFormation client.

    Returns
    -------
    str
        The stack drift detection ID.
    """
    client: CloudFormationClient = client_factory() if client_factory else create_cloudformation_client()
    response: DetectStackDriftOutputTypeDef = client.detect_stack_drift(StackName=stack_name)
    return response["StackDriftDetectionId"]


def describe_stack_drift_detection_status(state: DescribeStackDriftDetectionStatusState, context: WaitForConditionCheckContext, client_factory: Optional[Callable[[], CloudFormationClient]]=None) -> DescribeStackDriftDetectionStatusState:
    """
    Check the status of a stack drift detection operation.

    Parameters
    ----------
    state : DescribeStackDriftDetectionStatusState
        Current state containing the stack drift detection ID.
    context : WaitForConditionCheckContext
        Context object for logging and wait condition checks.
    client_factory : Optional[Callable[[], CloudFormationClient]], default=None
        Optional factory function to create a CloudFormation client.

    Returns
    -------
    DescribeStackDriftDetectionStatusState
        Updated state with current drift detection status.
    """
    client: CloudFormationClient = client_factory() if client_factory else create_cloudformation_client()
    response: DescribeStackDriftDetectionStatusOutputTypeDef = client.describe_stack_drift_detection_status(StackDriftDetectionId=state["StackDriftDetectionId"])
    return {
        "StackId": response["StackId"],
        "StackDriftDetectionId": response["StackDriftDetectionId"],
        "StackDriftStatus": response.get("StackDriftStatus", "UNKNOWN"),
        "DetectionStatus": response["DetectionStatus"]
    }

def wait_for_stack_drift_detection_completion(state: DescribeStackDriftDetectionStatusState, attempt: int, settings_factory: Optional[Callable[[], Settings]]=None) -> WaitForConditionDecision:
    """
    Determine whether to continue waiting for drift detection completion.

    Parameters
    ----------
    state : DescribeStackDriftDetectionStatusState
        Current drift detection state.
    attempt : int
        Current attempt number, used to check against MAX_WAIT_ATTEMPTS.
    settings_factory : Optional[Callable[[], Settings]], default=None
        Optional factory function to create a Settings instance.

    Returns
    -------
    WaitForConditionDecision
        Decision to stop polling or continue waiting.
    """
    settings: Settings = settings_factory() if settings_factory else get_settings()
    if state["DetectionStatus"] != "DETECTION_IN_PROGRESS":
        return WaitForConditionDecision.stop_polling()
    elif attempt >= settings.MAX_WAIT_ATTEMPTS:
        raise WaitConditionTimeoutError(
            operation="Wait for stack drift detection completion",
            max_attempts=settings.MAX_WAIT_ATTEMPTS,
            state=state
        )
    else:
        return WaitForConditionDecision.continue_waiting(Duration.from_seconds(settings.WAIT_FOR_STACK_DRIFT_DETECTION_COMPLETION_INTERVAL_SECONDS))

@durable_step
def create_change_set(_: StepContext, stack_name: str, client_factory: Optional[Callable[[], CloudFormationClient]]=None) -> CreateChangeSetOutputTypeDef:
    """
    Create a change set to revert drift in a CloudFormation stack.

    Parameters
    ----------
    _ : StepContext
        The step context for the durable execution.
    stack_name : str
        The name of the CloudFormation stack.
    client_factory : Optional[Callable[[], CloudFormationClient]], default=None
        Optional factory function to create a CloudFormation client.

    Returns
    -------
    CreateChangeSetOutputTypeDef
        Response containing the change set ID and other metadata.
    """
    client: CloudFormationClient = client_factory() if client_factory else create_cloudformation_client()
    now: datetime = datetime.now()
    return client.create_change_set(
        StackName=stack_name,
        ChangeSetName=f"healing-changeset-{stack_name}-{now.strftime('%Y%m%dT%H%M%S')}",
        UsePreviousTemplate=True,
        Description="Change set for configuration healing",
        DeploymentMode="REVERT_DRIFT"
    )


def describe_change_set(state: DescribeChangeSetState, context: WaitForConditionCheckContext, client_factory: Optional[Callable[[], CloudFormationClient]]=None) -> DescribeChangeSetState:
    """
    Retrieve the current status of a change set.

    Parameters
    ----------
    state : DescribeChangeSetState
        Current state containing the change set ID.
    context : WaitForConditionCheckContext
        Context object for logging and wait condition checks.
    client_factory : Optional[Callable[[], CloudFormationClient]], default=None
        Optional factory function to create a CloudFormation client.

    Returns
    -------
    DescribeChangeSetState
        Updated state with current change set status.
    """
    client: CloudFormationClient = client_factory() if client_factory else create_cloudformation_client()
    response: DescribeChangeSetOutputTypeDef = client.describe_change_set(ChangeSetName=state["ChangeSetId"])
    return {
        "StackId": response["StackId"],
        "ChangeSetId": response["ChangeSetId"],
        "ExecutionStatus": response["ExecutionStatus"],
        "Status": response["Status"]
    }

def wait_for_change_set_creation_completion(state: DescribeChangeSetState, attempt: int, settings_factory: Optional[Callable[[], Settings]]=None) -> WaitForConditionDecision:
    """
    Determine whether to continue waiting for change set creation.

    Parameters
    ----------
    state : DescribeChangeSetState
        Current change set state.
    attempt : int
        Current attempt number, used to check against MAX_WAIT_ATTEMPTS.
    settings_factory : Optional[Callable[[], Settings]], default=None
        Optional factory function to create a Settings instance.

    Returns
    -------
    WaitForConditionDecision
        Decision to stop polling or continue waiting.
    """
    settings: Settings = settings_factory() if settings_factory else get_settings()
    if state["Status"] in ["CREATE_COMPLETE", "FAILED"]:
        return WaitForConditionDecision.stop_polling()
    elif attempt >= settings.MAX_WAIT_ATTEMPTS:
        raise WaitConditionTimeoutError(
            operation="Wait for change set creation completion",
            max_attempts=settings.MAX_WAIT_ATTEMPTS,
            state=state
        )
    else:
        return WaitForConditionDecision.continue_waiting(Duration.from_seconds(settings.WAIT_FOR_CHANGE_SET_CREATION_INTERVAL_SECONDS))

@durable_step
def execute_change_set(_: StepContext, change_set_id: str, client_factory: Optional[Callable[[], CloudFormationClient]]=None) -> None:
    """
    Execute a change set to apply changes to a CloudFormation stack.

    Parameters
    ----------
    _ : StepContext
        The step context for the durable execution.
    change_set_id : str
        The ID of the change set to execute.
    client_factory : Optional[Callable[[], CloudFormationClient]], default=None
        Optional factory function to create a CloudFormation client.

    Returns
    -------
    None
    """
    client: CloudFormationClient = client_factory() if client_factory else create_cloudformation_client()
    client.execute_change_set(ChangeSetName=change_set_id)


def wait_for_change_set_execution_completion(state: DescribeChangeSetState, attempt: int, settings_factory: Optional[Callable[[], Settings]]=None) -> WaitForConditionDecision:
    """
    Determine whether to continue waiting for change set execution.

    Parameters
    ----------
    state : DescribeChangeSetState
        Current change set state.
    attempt : int
        Current attempt number, used to check against MAX_WAIT_ATTEMPTS.
    settings_factory : Optional[Callable[[], Settings]], default=None
        Optional factory function to create a Settings instance.

    Returns
    -------
    WaitForConditionDecision
        Decision to stop polling or continue waiting.
    """
    settings: Settings = settings_factory() if settings_factory else get_settings()
    if state["ExecutionStatus"] in ["EXECUTE_COMPLETE", "EXECUTE_FAILED", "OBSOLETE"]:
        return WaitForConditionDecision.stop_polling()
    elif attempt >= settings.MAX_WAIT_ATTEMPTS:
        raise WaitConditionTimeoutError(
            operation="Wait for change set execution completion",
            max_attempts=settings.MAX_WAIT_ATTEMPTS,
            state=state
        )
    else:
        return WaitForConditionDecision.continue_waiting(Duration.from_seconds(settings.WAIT_FOR_CHANGE_SET_EXECUTION_INTERVAL_SECONDS))

@durable_step
def send_notification(_: StepContext, message: str, client_factory: Optional[Callable[[], SNSClient]]=None, settings_factory: Optional[Callable[[], Settings]]=None) -> None:
    """
    Send a notification message to the configured SNS topic.

    Parameters
    ----------
    _ : StepContext
        The step context for the durable execution.
    message : str
        The message to send in the notification.
    client_factory : Optional[Callable[[], SNSClient]], default=None
        Optional factory function to create an SNS client.
    settings_factory : Optional[Callable[[], Settings]], default=None
        Optional factory function to retrieve application settings.

    Returns
    -------
    None
    """
    client: SNSClient = client_factory() if client_factory else create_sns_client()
    settings: Settings = settings_factory() if settings_factory else get_settings()
    client.publish(
        TopicArn=settings.SNS_TOPIC_ARN,
        Message=message,
        Subject="CloudFormation Stack Drift Detection and Healing Notification"
    )

@durable_execution
def lambda_handler(event: dict, context: DurableContext):
    """
    Main Lambda handler for CloudFormation stack drift detection and healing.

    This function orchestrates the process of detecting drift in a CloudFormation
    stack and automatically healing it by creating and executing a change set.

    Parameters
    ----------
    event : dict
        Lambda event containing the stack name.
    context : DurableContext
        Durable execution context for managing long-running workflows.

    Returns
    -------
    None
    """
    # Validate and parse input event
    try:
        event_parameter = EventParameter(**event)
        context.logger.debug(f"{event_parameter=}")
    except ValidationError as e:
        context.logger.exception(f"Input event validation failed for event: {event}")
        raise e

    # Step 1: Detect stack drift
    stack_drift_detection_id: str = context.step(detect_stack_drift(event_parameter.StackName))
    context.logger.debug(f"{stack_drift_detection_id=}")

    # Step 2: Wait for stack drift detection to complete and check results
    describe_stack_drift_detection_status_state: DescribeStackDriftDetectionStatusState = context.wait_for_condition(
        check=describe_stack_drift_detection_status,
        config=WaitForConditionConfig(
            initial_state={
                "StackId": "",
                "StackDriftDetectionId": stack_drift_detection_id,
                "StackDriftStatus": "UNKNOWN",
                "DetectionStatus": "DETECTION_IN_PROGRESS"
            },
            wait_strategy=wait_for_stack_drift_detection_completion
        ),
        name="wait_for_stack_drift_detection_completion"
    )
    context.logger.debug(f"{describe_stack_drift_detection_status_state=}")
    if describe_stack_drift_detection_status_state["DetectionStatus"] == "DETECTION_FAILED":
        detection_status_failed_error: DetectionStatusFailedError = DetectionStatusFailedError(
            stack_id=describe_stack_drift_detection_status_state["StackId"],
            stack_drift_detection_id=describe_stack_drift_detection_status_state["StackDriftDetectionId"]
        )
        context.logger.error(f"{detection_status_failed_error}")
        context.step(send_notification(f"{detection_status_failed_error}"))
        raise detection_status_failed_error
    if describe_stack_drift_detection_status_state["StackDriftStatus"] == "UNKNOWN":
        stack_drift_status_unknown_error: StackDriftStatusUnknownError = StackDriftStatusUnknownError(
            stack_id=describe_stack_drift_detection_status_state["StackId"],
            stack_drift_detection_id=describe_stack_drift_detection_status_state["StackDriftDetectionId"]
        )
        context.logger.error(f"{stack_drift_status_unknown_error}")
        context.step(send_notification(f"{stack_drift_status_unknown_error}"))
        raise stack_drift_status_unknown_error
    elif describe_stack_drift_detection_status_state["StackDriftStatus"] == "IN_SYNC":
        context.logger.info(f"Stack is in sync. No drift detected.\nStack ID: {describe_stack_drift_detection_status_state['StackId']}")
        return
    context.logger.warning(f"Stack is drifted.\nStack ID: {describe_stack_drift_detection_status_state['StackId']}")

    # Step 3: Create a change set to heal the drift
    healing_change_set_info: CreateChangeSetOutputTypeDef = context.step(create_change_set(event_parameter.StackName))
    context.logger.debug(f"{healing_change_set_info=}")

    # Step 4: Wait for change set creation to complete
    describe_change_set_state: DescribeChangeSetState = context.wait_for_condition(
        check=describe_change_set,
        config=WaitForConditionConfig(
            initial_state={
                "StackId": "",
                "ChangeSetId": healing_change_set_info["Id"],
                "ExecutionStatus": "UNAVAILABLE",
                "Status": "CREATE_PENDING"
            },
            wait_strategy=wait_for_change_set_creation_completion
        ),
        name="wait_for_change_set_creation"
    )
    context.logger.debug(f"{describe_change_set_state=}")
    if describe_change_set_state["Status"] == "FAILED":
        change_set_creation_status_failed_error: ChangeSetCreationStatusFailedError = ChangeSetCreationStatusFailedError(
            stack_id=describe_change_set_state["StackId"],
            change_set_id=describe_change_set_state["ChangeSetId"]
        )
        context.logger.error(f"{change_set_creation_status_failed_error}")
        context.step(send_notification(f"{change_set_creation_status_failed_error}"))
        raise change_set_creation_status_failed_error
    change_set_url: str = build_changeset_console_url(describe_change_set_state["StackId"], describe_change_set_state["ChangeSetId"])

    # If CreateChangeSetOnly flag is set, send notification and exit
    if event_parameter.CreateChangeSetOnly:
        change_set_created_message = (
            f"Change set created successfully.\n"
            f"Stack ID: {describe_change_set_state['StackId']}\n"
            f"Change Set ID: {describe_change_set_state['ChangeSetId']}\n"
            f"\n"
            f"Please review and execute the change set manually if needed.\n"
            f"URL: {change_set_url}"
        )
        context.logger.info(change_set_created_message)
        context.step(send_notification(change_set_created_message))
        return

    # Step 5: Execute the change set to heal the drift
    context.step(execute_change_set(describe_change_set_state["ChangeSetId"]))
    context.logger.info(f"Change set executed to heal the stack drift.\nStack ID: {describe_change_set_state['StackId']}\nChange Set ID: {describe_change_set_state['ChangeSetId']}")

    # Step 6: Wait for change set execution to complete and check results
    describe_change_set_state: DescribeChangeSetState = context.wait_for_condition(
        check=describe_change_set,
        config=WaitForConditionConfig(
            initial_state={
                "StackId": "",
                "ChangeSetId": healing_change_set_info["Id"],
                "ExecutionStatus": "AVAILABLE",
                "Status": "CREATE_COMPLETE"
            },
            wait_strategy=wait_for_change_set_execution_completion
        ),
        name="wait_for_change_set_execution"
    )
    context.logger.debug(f"{describe_change_set_state=}")
    if describe_change_set_state["ExecutionStatus"] == "EXECUTE_FAILED":
        change_set_execution_status_failed_error: ChangeSetExecutionStatusFailedError = ChangeSetExecutionStatusFailedError(
            stack_id=describe_change_set_state["StackId"],
            change_set_id=describe_change_set_state["ChangeSetId"]
        )
        context.logger.error(f"{change_set_execution_status_failed_error}")
        context.step(send_notification(f"{change_set_execution_status_failed_error}"))
        raise change_set_execution_status_failed_error
    elif describe_change_set_state["ExecutionStatus"] == "OBSOLETE":
        obsolete_message = (
            f"Change set is obsolete.\n"
            f"Stack ID: {describe_change_set_state['StackId']}\n"
            f"Change Set ID: {describe_change_set_state['ChangeSetId']}\n"
            f"\n"
            f"Please check the stack status and consider re-running the healing process if necessary.\n"
            f"URL: {change_set_url}"
        )
        context.logger.warning(obsolete_message)
        context.step(send_notification(obsolete_message))
    elif describe_change_set_state["ExecutionStatus"] == "EXECUTE_COMPLETE":
        success_message = (
            f"Change set execution completed successfully.\n"
            f"Stack ID: {describe_change_set_state['StackId']}\n"
            f"Change Set ID: {describe_change_set_state['ChangeSetId']}\n"
            f"\n"
            f"Stack drift has been healed."
        )
        context.logger.info(success_message)
        context.step(send_notification(success_message))
    return
Verification
To try it out, I change the value of the SSM parameter created by the test stack to introduce drift, then invoke the deployed Lambda function.
aws lambda invoke \
    --function-name arn:aws:lambda:<region>:<account-id>:function:cfn-drift-healing:Alias \
    --invocation-type Event \
    --cli-binary-format raw-in-base64-out \
    --payload '{"StackName": "test"}' \
    response.json
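The logs show that the automatic repair ran through the following steps.
- Drift detection started
- Drift detected
- Drift-aware change set created
- Change set executed
- Drift resolved
Completion of the repair can also be confirmed through the SNS notification.
In addition, passing CreateChangeSetOnly: true in the event when invoking the Durable Functions, as shown below, stops the process after the change set is created and leaves its execution to a person.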
aws lambda invoke \
    --function-name arn:aws:lambda:<region>:<account-id>:function:cfn-drift-healing:Alias \
    --invocation-type Event \
    --cli-binary-format raw-in-base64-out \
    --payload '{"StackName": "test", "CreateChangeSetOnly": true}' \
    response.json
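If you would rather trigger the function from Python than from the CLI, the same asynchronous invocation can be expressed with boto3. The sketch below only builds the keyword arguments for the Lambda client's invoke call; the helper name build_invoke_request is hypothetical, and the function ARN is whatever qualified ARN you deployed.

```python
import json
from typing import Any


def build_invoke_request(
    function_arn: str,
    stack_name: str,
    create_change_set_only: bool = False,
) -> dict[str, Any]:
    """Build keyword arguments for an asynchronous invoke of the healing function."""
    payload = {
        "StackName": stack_name,
        "CreateChangeSetOnly": create_change_set_only,
    }
    return {
        "FunctionName": function_arn,
        "InvocationType": "Event",  # asynchronous, like --invocation-type Event
        "Payload": json.dumps(payload).encode("utf-8"),
    }


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   request = build_invoke_request(
#       "arn:aws:lambda:<region>:<account-id>:function:cfn-drift-healing:Alias",
#       "test",
#       create_change_set_only=True,
#   )
#   boto3.client("lambda").invoke(**request)
```

The payload keys match the fields of the EventParameter model in the handler code, so a typo in either key is rejected by its validation.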
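Conclusion
With CloudFormation's drift-aware change sets, drift repair that used to be tedious can now be carried out safely and easily.
In this article, I used drift-aware change sets to implement, with Durable Functions, a Configuration Healing mechanism that corrects drift automatically.
Even when IaC is in use, drift caused by unintended changes is unavoidable in environments that are operated over a long period.
Running this periodically, for example with EventBridge Scheduler, lets you mechanically correct drift caused by such unintended changes, or by changes that were left in place with the intention of fixing them later, which makes it easier to keep configuration management through IaC going.
If forcing automatic repair in production or similar environments feels too risky, stop at creating the change set, as described earlier, and put a human review step before execution.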
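As one possible way to set up such a periodic run, the sketch below builds the arguments for EventBridge Scheduler's create_schedule call so that the function is invoked once a day in review-only mode. The schedule name, the rate expression, and the helper name build_schedule_request are assumptions for illustration, and the IAM role that Scheduler assumes to invoke the function has to be prepared separately.

```python
import json
from typing import Any


def build_schedule_request(
    function_arn: str,
    scheduler_role_arn: str,
    stack_name: str,
    create_change_set_only: bool = True,
) -> dict[str, Any]:
    """Build keyword arguments for a daily EventBridge Scheduler schedule."""
    return {
        "Name": f"cfn-drift-healing-{stack_name}",  # hypothetical naming scheme
        "ScheduleExpression": "rate(1 day)",
        "FlexibleTimeWindow": {"Mode": "OFF"},
        "Target": {
            "Arn": function_arn,
            "RoleArn": scheduler_role_arn,
            # Scheduler passes this JSON string to the function as its event.
            "Input": json.dumps(
                {"StackName": stack_name, "CreateChangeSetOnly": create_change_set_only}
            ),
        },
    }


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   boto3.client("scheduler").create_schedule(**build_schedule_request(
#       "arn:aws:lambda:<region>:<account-id>:function:cfn-drift-healing:Alias",
#       "arn:aws:iam::<account-id>:role/<scheduler-role>",
#       "test",
#   ))
```

Defaulting create_change_set_only to True here is a deliberate choice for the sketch: the schedule only produces change sets and notifications until you decide to let it execute them.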
I hope this article serves as a useful example of implementing Configuration Healing with CloudFormation.










