0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

[自分用]Lambda関数で管理アカウントからメンバーアカウントのデフォルトVPCを削除

Posted at

Organizationsでアカウント作成し、デフォルトVPCを削除するためのLambda関数を記載

import boto3
import os
import logging
import time
from datetime import datetime

# Region used for the CloudWatch Logs client: read from the AWS_REGION
# environment variable, falling back to ap-northeast-1 when it is not set.
region = os.environ.get("AWS_REGION", "ap-northeast-1")
logs_client = boto3.client('logs', region_name=region)
# Use the root logger and emit INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Replace whatever handlers are already attached to the root logger with a
# single stream handler that formats records as "[LEVEL] - message".
formatter = logging.Formatter('[%(levelname)s] - %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.handlers.clear()
logger.addHandler(handler)

# Check that a specific keyword has been written to CloudWatch Logs
def confirm_log_written(log_group_name, log_stream_name, keyword="CloudWatch Logs Output Check",
                        max_attempts=5, retry_interval=2):
    """Log *keyword* and poll the log stream until the keyword shows up.

    Args:
        log_group_name: Name of the CloudWatch Logs log group to read.
        log_stream_name: Name of the log stream inside that group.
        keyword: Marker text that is logged and then searched for.
        max_attempts: Maximum number of ``get_log_events`` calls.
        retry_interval: Seconds to wait between two attempts.

    Returns:
        True as soon as an event containing *keyword* is found. False when
        the keyword was not found within *max_attempts* attempts or when
        reading the log events failed with an unexpected error.
    """
    # Only look at events written from (roughly) now on. Without a lower
    # bound, a marker left in the same stream by an earlier run would satisfy
    # the check even if the current write never arrived. One second of slack
    # tolerates small differences between this clock reading and the
    # timestamp recorded for the event.
    search_from_ms = int(time.time() * 1000) - 1000
    logger.info(keyword)

    for attempt in range(1, max_attempts + 1):
        try:
            events = logs_client.get_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                startTime=search_from_ms,
                startFromHead=True,
            )['events']
        except logs_client.exceptions.ResourceNotFoundException:
            # The group/stream is not visible yet; treat it as "nothing found"
            # and retry.
            events = []
        except Exception:
            logger.error("Log Write Failed", exc_info=True)
            return False

        if any(keyword in event['message'] for event in events):
            logger.info("CloudWatch Logs Output OK")
            return True

        # No point in waiting after the final attempt.
        if attempt < max_attempts:
            time.sleep(retry_interval)

    return False

# Assume the IAM role in the given account and obtain temporary credentials
def assume_role(account_id):
    """Assume ``OrganizationAccountAccessRole`` in *account_id* via STS.

    Args:
        account_id: AWS account ID used to build the role ARN.

    Returns:
        The ``Credentials`` entry of the STS ``assume_role`` response.
    """
    sts_client = boto3.client('sts')
    target_role_arn = f"arn:aws:iam::{account_id}:role/OrganizationAccountAccessRole"
    response = sts_client.assume_role(
        RoleArn=target_role_arn,
        RoleSessionName='delete-vpc-session',
    )
    return response['Credentials']

# Get the EC2 regions available to the given account
def get_regions(account_id):
    """Return the region names reported by ``describe_regions`` for an account.

    The call is made with temporary credentials obtained through
    ``assume_role(account_id)``.

    Args:
        account_id: AWS account ID whose role is assumed for the query.

    Returns:
        A list with the ``RegionName`` of every entry in the
        ``describe_regions`` response.
    """
    creds = assume_role(account_id)
    ec2 = boto3.client(
        "ec2",
        # Pin the client to the module-level region (AWS_REGION with a
        # fallback), like the logs client, instead of relying on an
        # implicitly configured default region.
        region_name=region,
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )
    return [entry["RegionName"] for entry in ec2.describe_regions()["Regions"]]

# Delete the subnets associated with the given VPC
def delete_subnets(ec2, vpc_id, region_name):
    """Delete every subnet that ``describe_subnets`` returns for *vpc_id*.

    Args:
        ec2: EC2 client used for the describe/delete calls.
        vpc_id: ID of the VPC whose subnets are deleted.
        region_name: Region name, used only in log messages.

    Raises:
        Exception: Any error from the describe/delete calls is logged and
            re-raised unchanged.
    """
    vpc_filter = [{'Name': 'vpc-id', 'Values': [vpc_id]}]
    try:
        response = ec2.describe_subnets(Filters=vpc_filter)
        for subnet_id in (entry['SubnetId'] for entry in response['Subnets']):
            ec2.delete_subnet(SubnetId=subnet_id)
            logger.info(f"Deleted Subnet: {subnet_id} in {region_name}")
    except Exception as e:
        logger.error(f"Failed to delete subnets for VPC {vpc_id} in {region_name}: {e}")
        raise

# Detach and delete the internet gateways associated with the given VPC
def detach_and_delete_igws(ec2, vpc_id, region_name):
    """Detach and then delete each internet gateway attached to *vpc_id*.

    Args:
        ec2: EC2 client used for the describe/detach/delete calls.
        vpc_id: ID of the VPC whose internet gateways are removed.
        region_name: Region name, used only in log messages.

    Raises:
        Exception: Any error from the EC2 calls is logged and re-raised
            unchanged.
    """
    attachment_filter = [{'Name': 'attachment.vpc-id', 'Values': [vpc_id]}]
    try:
        response = ec2.describe_internet_gateways(Filters=attachment_filter)
        for gateway in response['InternetGateways']:
            gateway_id = gateway['InternetGatewayId']
            # A gateway has to be detached from the VPC before it is deleted.
            ec2.detach_internet_gateway(InternetGatewayId=gateway_id, VpcId=vpc_id)
            ec2.delete_internet_gateway(InternetGatewayId=gateway_id)
            logger.info(f"Detached and Deleted Internet Gateway: {gateway_id} in {region_name}")
    except Exception as e:
        logger.error(f"Failed to delete internet gateways for VPC {vpc_id} in {region_name}: {e}")
        raise

# Find the default VPC in each region, delete its related resources, then delete the VPC
def delete_default_vpcs(account_id, regions):
    """Delete the default VPC in every region of *regions* for one account.

    For each region an EC2 client is created with credentials from
    ``assume_role(account_id)``. When a default VPC is found, its subnets and
    internet gateways are removed first and the VPC is deleted afterwards.

    Args:
        account_id: AWS account ID whose role is assumed.
        regions: Iterable of region names to process in order.

    Raises:
        Exception: The first error in any region is logged and re-raised, so
            the remaining regions are not processed.
    """
    credentials = assume_role(account_id)
    default_vpc_filter = [{"Name": "isDefault", "Values": ["true"]}]

    for region_name in regions:
        ec2 = boto3.client(
            'ec2',
            region_name=region_name,
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken'],
        )
        try:
            found = ec2.describe_vpcs(Filters=default_vpc_filter)['Vpcs']
            if not found:
                logger.info(f"Default VPC does not exist in {region_name}")
                continue

            vpc_id = found[0]['VpcId']
            logger.info(f"Default VPC found in {region_name}: {vpc_id}")
            # Subnets and internet gateways are removed before the VPC itself.
            delete_subnets(ec2, vpc_id, region_name)
            detach_and_delete_igws(ec2, vpc_id, region_name)
            ec2.delete_vpc(VpcId=vpc_id)
            logger.info(f"Deleted default VPC in {region_name}")
        except Exception as e:
            logger.error(f"{account_id} - Failed to delete default VPC in {region_name}: {e}")
            raise

# Build the error payload returned by the handler
def _error_response(message, log_stream_name, error_code=None):
    """Return an error dict; ``error_code`` is included only when given."""
    response = {"status": "error"}
    if error_code is not None:
        response["error_code"] = error_code
    response["message"] = message
    response["logname"] = str(log_stream_name)
    return response


# Check that a value is a 12-character string of ASCII digits
def _is_valid_account_id(account_id):
    """Return True only for a ``str`` made of exactly 12 ASCII digits."""
    # str.isdigit() also accepts non-ASCII digit characters (for example
    # full-width digits), so compare against the ASCII digits explicitly.
    return (
        isinstance(account_id, str)
        and len(account_id) == 12
        and all(ch in "0123456789" for ch in account_id)
    )


# Lambda function entry point
def lambda_handler(event, context):
    """Delete the default VPCs of the account named in the event.

    Steps: confirm that log output reaches CloudWatch Logs, validate
    ``event['account_id']``, list the regions, then delete the default VPC in
    each region.

    Args:
        event: Invocation payload; ``account_id`` is read from it with
            ``event.get``.
        context: Lambda context object; ``log_group_name`` and
            ``log_stream_name`` are read from it.

    Returns:
        A dict with ``status`` ("success" or "error"), ``message`` and
        ``logname`` (the log stream name). Some error responses also carry
        ``error_code``.
    """
    # Get the log stream name
    log_stream_name = context.log_stream_name
    # Check that log output reached CloudWatch Logs
    if not confirm_log_written(context.log_group_name, log_stream_name):
        return _error_response("CloudWatch Logs output not found", log_stream_name)

    # Read the account ID from the event and check its format.
    # Bound before the try block so the except handlers can always log it,
    # even when reading it from the event is what failed.
    account_id = None
    try:
        logger.info("Input argument Check")
        account_id = event.get('account_id')
        if not _is_valid_account_id(account_id):
            raise ValueError('Invalid input argument')
        logger.info("Input argument is valid")
    except ValueError as e:
        logger.error(f"{account_id} - {e}")
        return _error_response("Invalid input argument", log_stream_name, error_code="401")
    except Exception as e:
        logger.error(f"{account_id} - Error occurred during input validation: {e}")
        return _error_response(
            f"Error occurred during input validation: {e}", log_stream_name
        )

    # Get all regions
    try:
        regions = get_regions(account_id)
    except Exception as e:
        logger.error(f"{account_id} - Failed to retrieve regions: {e}")
        return _error_response("Failed to retrieve regions", log_stream_name)

    # Delete the default VPC in every region
    try:
        delete_default_vpcs(account_id, regions)
        logger.info(f"{account_id} - Lambda executed successfully: Default VPC deleted")
    except Exception as e:
        # Log here as well: a failure while assuming the role happens before
        # the per-region logging inside delete_default_vpcs.
        logger.error(f"{account_id} - Failed to delete default VPCs: {e}")
        return _error_response(
            f"Failed to delete default VPCs: {e}", log_stream_name, error_code="401"
        )

    return {
        "status": "success",
        "message": "Default VPCs deleted successfully",
        "logname": str(log_stream_name),
    }

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do by signing up
0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?