# Lambda function for deleting the default VPCs of an account created with AWS Organizations.
import boto3
import os
import logging
import time
from datetime import datetime
# Region used for the CloudWatch Logs client: read from the AWS_REGION
# environment variable, falling back to "ap-northeast-1" when it is not set.
region = os.environ.get("AWS_REGION", "ap-northeast-1")
logs_client = boto3.client('logs', region_name=region)
# Configure the root logger: INFO level, a single stream handler, and a
# "[LEVEL] - message" record format.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('[%(levelname)s] - %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
# Remove any handlers already attached to the root logger so that only the
# handler configured above emits records (avoids duplicated/differently
# formatted lines).
logger.handlers.clear()
logger.addHandler(handler)
# Check whether a specific keyword has been written to CloudWatch Logs
def confirm_log_written(log_group_name, log_stream_name, keyword="CloudWatch Logs Output Check"):
    """Log ``keyword`` and poll CloudWatch Logs until that record is visible.

    The keyword is emitted through the module logger, then the given log
    stream is read up to five times, two seconds apart, looking for an event
    whose message contains the keyword.

    Args:
        log_group_name: Name of the CloudWatch Logs log group to read.
        log_stream_name: Name of the log stream inside that group.
        keyword: Text to emit and then search for.

    Returns:
        True if an event containing ``keyword`` was found; False if it was not
        found within the retry budget or an unexpected error occurred.
    """
    # Only look at events written from now on. A log stream can already hold
    # the keyword from an earlier invocation, which would make the check pass
    # without proving that the current write succeeded, and a long stream read
    # from the head may not reach the newest events in the first page.
    # NOTE(review): assumes CloudWatch event timestamps are never earlier than
    # this local clock reading - confirm for the target runtime.
    started_at_ms = int(time.time() * 1000)
    logger.info(f"{keyword}")
    for _ in range(5):
        try:
            events = logs_client.get_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                startTime=started_at_ms,
                startFromHead=True
            )['events']
            if any(keyword in event['message'] for event in events):
                logger.info("CloudWatch Logs Output OK")
                return True
            # Not visible yet: log delivery is asynchronous, so wait and retry.
            time.sleep(2)
        except logs_client.exceptions.ResourceNotFoundException:
            # The group/stream may not have been created yet; wait and retry.
            time.sleep(2)
        except Exception:
            logger.error("Log Write Failed", exc_info=True)
            return False
    return False
# Assume the IAM role in the given account and obtain temporary credentials
def assume_role(account_id):
    """Assume ``OrganizationAccountAccessRole`` in ``account_id`` via STS.

    Args:
        account_id: AWS account ID whose role is assumed.

    Returns:
        The ``Credentials`` entry of the STS ``assume_role`` response.
    """
    response = boto3.client('sts').assume_role(
        RoleArn=f"arn:aws:iam::{account_id}:role/OrganizationAccountAccessRole",
        RoleSessionName='delete-vpc-session',
    )
    return response['Credentials']
# Get the EC2 regions available to the given account
def get_regions(account_id):
    """Return the region names reported by EC2 ``describe_regions``.

    The call is made with temporary credentials obtained by assuming the
    role in ``account_id``.

    Args:
        account_id: AWS account ID to query.

    Returns:
        A list of region name strings.
    """
    credentials = assume_role(account_id)
    ec2_client = boto3.client(
        "ec2",
        aws_access_key_id=credentials["AccessKeyId"],
        aws_secret_access_key=credentials["SecretAccessKey"],
        aws_session_token=credentials["SessionToken"],
    )
    described = ec2_client.describe_regions()
    return [entry["RegionName"] for entry in described["Regions"]]
# Delete the subnets associated with the given VPC
def delete_subnets(ec2, vpc_id, region_name):
    """Delete every subnet returned for ``vpc_id``.

    Args:
        ec2: EC2 client used for the describe/delete calls.
        vpc_id: ID of the VPC whose subnets are removed.
        region_name: Region name, used only in log messages.

    Raises:
        Exception: Any error from the EC2 calls is logged and re-raised.
    """
    try:
        response = ec2.describe_subnets(
            Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
        )
        for entry in response['Subnets']:
            subnet_id = entry['SubnetId']
            ec2.delete_subnet(SubnetId=subnet_id)
            logger.info(f"Deleted Subnet: {subnet_id} in {region_name}")
    except Exception as e:
        logger.error(f"Failed to delete subnets for VPC {vpc_id} in {region_name}: {e}")
        raise
# Detach and delete the internet gateways associated with the given VPC
def detach_and_delete_igws(ec2, vpc_id, region_name):
    """Detach each internet gateway attached to ``vpc_id`` and delete it.

    Args:
        ec2: EC2 client used for the describe/detach/delete calls.
        vpc_id: ID of the VPC whose gateways are removed.
        region_name: Region name, used only in log messages.

    Raises:
        Exception: Any error from the EC2 calls is logged and re-raised.
    """
    try:
        response = ec2.describe_internet_gateways(
            Filters=[{'Name': 'attachment.vpc-id', 'Values': [vpc_id]}]
        )
        for gateway in response['InternetGateways']:
            igw_id = gateway['InternetGatewayId']
            # A gateway must be detached from the VPC before it is deleted.
            ec2.detach_internet_gateway(InternetGatewayId=igw_id, VpcId=vpc_id)
            ec2.delete_internet_gateway(InternetGatewayId=igw_id)
            logger.info(f"Detached and Deleted Internet Gateway: {igw_id} in {region_name}")
    except Exception as e:
        logger.error(f"Failed to delete internet gateways for VPC {vpc_id} in {region_name}: {e}")
        raise
# Find the default VPC in each region, delete its related resources, then delete the VPC
def delete_default_vpcs(account_id, regions):
    """Delete the default VPC of ``account_id`` in each of ``regions``.

    For every region an EC2 client is created with credentials from the
    assumed role. If a default VPC is found, its subnets and internet
    gateways are removed first and then the VPC itself is deleted.

    Args:
        account_id: AWS account ID whose default VPCs are removed.
        regions: Iterable of region names to process.

    Raises:
        Exception: The first failure in a region is logged and re-raised,
            which stops processing of the remaining regions.
    """
    credentials = assume_role(account_id)
    for region_name in regions:
        regional_ec2 = boto3.client(
            'ec2',
            region_name=region_name,
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken'],
        )
        try:
            default_vpcs = regional_ec2.describe_vpcs(
                Filters=[{"Name": "isDefault", "Values": ["true"]}]
            )['Vpcs']
            if not default_vpcs:
                logger.info(f"Default VPC does not exist in {region_name}")
                continue
            vpc_id = default_vpcs[0]['VpcId']
            logger.info(f"Default VPC found in {region_name}: {vpc_id}")
            # Dependent resources are removed before the VPC itself.
            delete_subnets(regional_ec2, vpc_id, region_name)
            detach_and_delete_igws(regional_ec2, vpc_id, region_name)
            regional_ec2.delete_vpc(VpcId=vpc_id)
            logger.info(f"Deleted default VPC in {region_name}")
        except Exception as e:
            logger.error(f"{account_id} - Failed to delete default VPC in {region_name}: {e}")
            raise
# Lambda function entry point
def lambda_handler(event, context):
    """Validate the request and delete the account's default VPCs.

    Steps: confirm that log output reaches CloudWatch Logs, validate
    ``event['account_id']`` (a 12-digit string), list the regions, and
    delete the default VPC in each of them.

    Args:
        event: Invocation payload; expected to provide ``account_id``.
        context: Lambda context object; ``log_group_name`` and
            ``log_stream_name`` are read from it.

    Returns:
        A dict with ``status`` ("success" or "error"), ``message`` and
        ``logname`` (the log stream name). Some error results also carry
        ``error_code``.
    """
    # Get the log stream name
    log_stream_name = context.log_stream_name
    # Confirm that log output reaches CloudWatch Logs
    if not confirm_log_written(context.log_group_name, log_stream_name):
        return {
            "status": "error",
            "message": "CloudWatch Logs output not found",
            "logname": f"{log_stream_name}"
        }
    # Get the account ID from the event and check its format.
    # Bound before the try so the handlers below can always reference it,
    # even when reading the event itself fails (e.g. event is not a dict).
    account_id = None
    try:
        logger.info("Input argument Check")
        account_id = event.get('account_id')
        if not account_id or not account_id.isdigit() or len(account_id) != 12:
            raise ValueError('Invalid input argument')
        logger.info("Input argument is valid")
    except ValueError as e:
        logger.error(f"{account_id} - {e}")
        return {
            "status": "error",
            "error_code": "401",
            "message": "Invalid input argument",
            "logname": f"{log_stream_name}"
        }
    except Exception as e:
        logger.error(f"{account_id} - Error occurred during input validation: {e}")
        return {
            "status": "error",
            "message": f"Error occurred during input validation: {e}",
            "logname": f"{log_stream_name}"
        }
    # Get all regions
    try:
        regions = get_regions(account_id)
    except Exception as e:
        logger.error(f"{account_id} - Failed to retrieve regions: {e}")
        return {
            "status": "error",
            "message": "Failed to retrieve regions",
            "logname": f"{log_stream_name}"
        }
    # Delete the default VPC in every region
    try:
        delete_default_vpcs(account_id, regions)
        logger.info(f"{account_id} - Lambda executed successfully: Default VPC deleted")
    except Exception as e:
        # Log here as well, like the other handlers: failures raised before
        # the per-region processing starts are not logged anywhere else.
        logger.error(f"{account_id} - Failed to delete default VPCs: {e}")
        return {
            "status": "error",
            "error_code": "401",
            "message": f"Failed to delete default VPCs: {e}",
            "logname": f"{log_stream_name}"
        }
    return {
        "status": "success",
        "message": "Default VPCs deleted successfully",
        "logname": f"{log_stream_name}"
    }