はじめに
AWSで基盤を構築しているとリソースがごちゃごちゃしてきて収集つかなくなることありませんか
私はあります。
このEC2はどこのVPCに入っているんだ?
このVPCの中に入っているリソースはどれだ?
やりたいこと
VPCに含まれるリソースの一覧を取得したい
以下を想定しています。
- VPCを指定する。
- 指定したVPCの中にEC2、RDSが含まれている。
- EC2の関連リソース、RDSの関連リソースも取得したい。
VPC
VPCでは以下のリソースを取得します。
- サブネット
- ルートテーブル
- インターネットゲートウェイ
- Egress-only インターネットゲートウェイ
- キャリアゲートウェイ
- Elastic IP
- NAT ゲートウェイ
- ピアリング接続
- ネットワーク ACL
EC2
EC2では以下のリソースを取得します。
- セキュリティグループ
- EBSボリューム
- スナップショット
- キーペア
- ネットワークインターフェイス
RDS
RDSでは以下のリソースを取得します。
- スナップショット
- プロキシ
- サブネットグループ
さっそく、やってみよう!
Lambdaを新規に作成します。言語はPythonとします。
リソース情報を取得するため、Lambdaの実行ロールに以下のポリシーを付与します。
Lambdaに関数URLを使うと簡単に実行できます。
それぞれ以下のようにboto3を使ってリソースの情報を取得します。
VPC - サブネット
def get_subnets(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_subnets(
Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
)
return [subnet['SubnetId'] for subnet in response['Subnets']]
VPC - ルートテーブル
def get_route_tables(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_route_tables(
Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
)
return [route_table['RouteTableId'] for route_table in response['RouteTables']]
VPC - インターネットゲートウェイ
def get_internet_gateways(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_internet_gateways(
Filters=[{'Name': 'attachment.vpc-id', 'Values': [vpc_id]}]
)
return [igw['InternetGatewayId'] for igw in response['InternetGateways']]
VPC - Egress-only インターネットゲートウェイ
def get_egress_only_internet_gateways(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_egress_only_internet_gateways(
Filters=[{'Name': 'attachment.vpc-id', 'Values': [vpc_id]}]
)
return [eogw['EgressOnlyInternetGatewayId'] for eogw in response['EgressOnlyInternetGateways']]
VPC - キャリアゲートウェイ
def get_carrier_gateways(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_carrier_gateways(
Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
)
return [cg['CarrierGatewayId'] for cg in response['CarrierGateways']]
VPC - Elastic IP
def get_elastic_ips(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_addresses(
Filters=[{'Name': 'domain', 'Values': ['vpc']}]
)
return [eip['AllocationId'] for eip in response['Addresses'] if 'AssociationId' in eip and eip['AssociationId'].startswith('eipassoc')]
VPC - NAT ゲートウェイ
def get_nat_gateways(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_nat_gateways(
Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
)
return [ngw['NatGatewayId'] for ngw in response['NatGateways']]
VPC - ピアリング接続
def get_vpc_peering_connections(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_vpc_peering_connections(
Filters=[
{'Name': 'requester-vpc-info.vpc-id', 'Values': [vpc_id]},
{'Name': 'accepter-vpc-info.vpc-id', 'Values': [vpc_id]}
]
)
return [vpc_pc['VpcPeeringConnectionId'] for vpc_pc in response['VpcPeeringConnections']]
VPC - ネットワーク ACL
def get_network_acls(subnet_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_network_acls(
Filters=[{'Name': 'association.subnet-id', 'Values': [subnet_id]}]
)
return [nacl['NetworkAclId'] for nacl in response['NetworkAcls']]
EC2
def get_ec2_instances(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_instances(
Filters=[
{'Name': 'vpc-id', 'Values': [vpc_id]}
]
)
instances = []
for reservation in response['Reservations']:
for instance in reservation['Instances']:
instances.append(instance['InstanceId'])
return instances
EC2 - セキュリティグループ
def get_security_groups(instance_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_instances(InstanceIds=[instance_id])
return [sg['GroupId'] for sg in response['Reservations'][0]['Instances'][0]['SecurityGroups']]
EC2 - EBSボリューム
ddef get_ebs_volumes(instance_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_volumes(
Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}]
)
return [volume['VolumeId'] for volume in response['Volumes']]
EC2 - スナップショット
def get_snapshots(volume_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_snapshots(
Filters=[{'Name': 'volume-id', 'Values': [volume_id]}]
)
return [snapshot['SnapshotId'] for snapshot in response['Snapshots']]
EC2 - キーペア
def get_key_pairs(instance_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_instances(InstanceIds=[instance_id])
key_name = response['Reservations'][0]['Instances'][0].get('KeyName', None)
return [key_name] if key_name else []
EC2 - ネットワークインターフェイス
def get_network_interfaces(instance_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_network_interfaces(
Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}]
)
return [ni['NetworkInterfaceId'] for ni in response['NetworkInterfaces']]
RDS
def get_rds_instances(vpc_id):
rds_client = boto3.client('rds')
response = rds_client.describe_db_instances()
instances = []
for db_instance in response['DBInstances']:
if db_instance['DBSubnetGroup']['VpcId'] == vpc_id:
instances.append(db_instance['DBInstanceIdentifier'])
return instances
RDS - スナップショット
def get_rds_snapshots(db_instance_id):
rds_client = boto3.client('rds')
response = rds_client.describe_db_snapshots(
DBInstanceIdentifier=db_instance_id
)
return [snapshot['DBSnapshotIdentifier'] for snapshot in response['DBSnapshots']]
RDS - プロキシ
def get_rds_proxies(db_instance_id):
rds_client = boto3.client('rds')
proxies = []
# すべてのDBプロキシを取得
response = rds_client.describe_db_proxies()
# 各プロキシについて、ターゲットグループを調べる
for proxy in response['DBProxies']:
target_groups_response = rds_client.describe_db_proxy_target_groups(
DBProxyName=proxy['DBProxyName']
)
# 各ターゲットグループのターゲットを調べ、指定されたDBインスタンスIDが含まれているか確認
for target_group in target_groups_response['TargetGroups']:
target_response = rds_client.describe_db_proxy_targets(
DBProxyName=proxy['DBProxyName'],
TargetGroupName=target_group['TargetGroupName']
)
for target in target_response['Targets']:
if target['RdsResourceId'] == db_instance_id:
proxies.append(proxy['DBProxyName'])
break # プロキシ名を追加したら、次のプロキシへ
return proxies
RDS - サブネットグループ
def get_rds_subnet_groups(db_instance_id):
rds_client = boto3.client('rds')
response = rds_client.describe_db_instances(DBInstanceIdentifier=db_instance_id)
db_subnet_group = response['DBInstances'][0]['DBSubnetGroup']['DBSubnetGroupName']
return db_subnet_group
プログラム全体は以下に載せておきます。
プログラム
lambda_function.py
import boto3
import json
def get_subnets(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_subnets(
Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
)
return [subnet['SubnetId'] for subnet in response['Subnets']]
def get_route_tables(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_route_tables(
Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
)
return [route_table['RouteTableId'] for route_table in response['RouteTables']]
def get_internet_gateways(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_internet_gateways(
Filters=[{'Name': 'attachment.vpc-id', 'Values': [vpc_id]}]
)
return [igw['InternetGatewayId'] for igw in response['InternetGateways']]
def get_egress_only_internet_gateways(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_egress_only_internet_gateways(
Filters=[{'Name': 'attachment.vpc-id', 'Values': [vpc_id]}]
)
return [eogw['EgressOnlyInternetGatewayId'] for eogw in response['EgressOnlyInternetGateways']]
def get_carrier_gateways(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_carrier_gateways(
Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
)
return [cg['CarrierGatewayId'] for cg in response['CarrierGateways']]
def get_elastic_ips(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_addresses(
Filters=[{'Name': 'domain', 'Values': ['vpc']}]
)
return [eip['AllocationId'] for eip in response['Addresses'] if 'AssociationId' in eip and eip['AssociationId'].startswith('eipassoc')]
def get_nat_gateways(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_nat_gateways(
Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
)
return [ngw['NatGatewayId'] for ngw in response['NatGateways']]
def get_vpc_peering_connections(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_vpc_peering_connections(
Filters=[
{'Name': 'requester-vpc-info.vpc-id', 'Values': [vpc_id]},
{'Name': 'accepter-vpc-info.vpc-id', 'Values': [vpc_id]}
]
)
return [vpc_pc['VpcPeeringConnectionId'] for vpc_pc in response['VpcPeeringConnections']]
def get_network_acls(subnet_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_network_acls(
Filters=[{'Name': 'association.subnet-id', 'Values': [subnet_id]}]
)
return [nacl['NetworkAclId'] for nacl in response['NetworkAcls']]
def get_ec2_instances(vpc_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_instances(
Filters=[
{'Name': 'vpc-id', 'Values': [vpc_id]}
]
)
instances = []
for reservation in response['Reservations']:
for instance in reservation['Instances']:
instances.append(instance['InstanceId'])
return instances
def get_security_groups(instance_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_instances(InstanceIds=[instance_id])
return [sg['GroupId'] for sg in response['Reservations'][0]['Instances'][0]['SecurityGroups']]
def get_ebs_volumes(instance_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_volumes(
Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}]
)
return [volume['VolumeId'] for volume in response['Volumes']]
def get_snapshots(volume_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_snapshots(
Filters=[{'Name': 'volume-id', 'Values': [volume_id]}]
)
return [snapshot['SnapshotId'] for snapshot in response['Snapshots']]
def get_key_pairs(instance_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_instances(InstanceIds=[instance_id])
key_name = response['Reservations'][0]['Instances'][0].get('KeyName', None)
return [key_name] if key_name else []
def get_network_interfaces(instance_id):
ec2_client = boto3.client('ec2')
response = ec2_client.describe_network_interfaces(
Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}]
)
return [ni['NetworkInterfaceId'] for ni in response['NetworkInterfaces']]
def get_rds_instances(vpc_id):
rds_client = boto3.client('rds')
response = rds_client.describe_db_instances()
instances = []
for db_instance in response['DBInstances']:
if db_instance['DBSubnetGroup']['VpcId'] == vpc_id:
instances.append(db_instance['DBInstanceIdentifier'])
return instances
def get_rds_snapshots(db_instance_id):
rds_client = boto3.client('rds')
response = rds_client.describe_db_snapshots(
DBInstanceIdentifier=db_instance_id
)
return [snapshot['DBSnapshotIdentifier'] for snapshot in response['DBSnapshots']]
def get_rds_proxies(db_instance_id):
rds_client = boto3.client('rds')
proxies = []
# すべてのDBプロキシを取得
response = rds_client.describe_db_proxies()
# 各プロキシについて、ターゲットグループを調べる
for proxy in response['DBProxies']:
target_groups_response = rds_client.describe_db_proxy_target_groups(
DBProxyName=proxy['DBProxyName']
)
# 各ターゲットグループのターゲットを調べ、指定されたDBインスタンスIDが含まれているか確認
for target_group in target_groups_response['TargetGroups']:
target_response = rds_client.describe_db_proxy_targets(
DBProxyName=proxy['DBProxyName'],
TargetGroupName=target_group['TargetGroupName']
)
for target in target_response['Targets']:
if target['RdsResourceId'] == db_instance_id:
proxies.append(proxy['DBProxyName'])
break # プロキシ名を追加したら、次のプロキシへ
return proxies
def get_rds_subnet_groups(db_instance_id):
rds_client = boto3.client('rds')
response = rds_client.describe_db_instances(DBInstanceIdentifier=db_instance_id)
db_subnet_group = response['DBInstances'][0]['DBSubnetGroup']['DBSubnetGroupName']
return db_subnet_group
def lambda_handler(event, context):
vpc_id = event['vpc_id'] # VPC IDをイベントから取得
# 各リソースを取得
subnets = get_subnets(vpc_id)
route_tables = get_route_tables(vpc_id)
internet_gateways = get_internet_gateways(vpc_id)
egress_only_igs = get_egress_only_internet_gateways(vpc_id)
carrier_gateways = get_carrier_gateways(vpc_id)
elastic_ips = get_elastic_ips(vpc_id)
nat_gateways = get_nat_gateways(vpc_id)
vpc_peering_connections = get_vpc_peering_connections(vpc_id)
ec2_instances = get_ec2_instances(vpc_id)
rds_instances = get_rds_instances(vpc_id)
# EC2関連リソースを取得
ec2_resources = {}
for instance_id in ec2_instances:
ec2_resources[instance_id] = {
'SecurityGroups': get_security_groups(instance_id),
'EBSVolumes': get_ebs_volumes(instance_id),
'Snapshots': [],
'KeyPairs': get_key_pairs(instance_id),
'NetworkInterfaces': get_network_interfaces(instance_id)
}
# 各EBSボリュームに対応するスナップショットを取得
for volume_id in ec2_resources[instance_id]['EBSVolumes']:
ec2_resources[instance_id]['Snapshots'].extend(get_snapshots(volume_id))
# RDS関連リソースを取得
rds_resources = {}
for db_instance_id in rds_instances:
rds_resources[db_instance_id] = {
'Snapshots': get_rds_snapshots(db_instance_id),
'Proxies': get_rds_proxies(db_instance_id),
'SubnetGroups': get_rds_subnet_groups(db_instance_id)
}
# サブネットに紐づくネットワークACLを取得
network_acls = {}
for subnet_id in subnets:
network_acls[subnet_id] = get_network_acls(subnet_id)
resources = {
'Subnets': subnets,
'RouteTables': route_tables,
'InternetGateways': internet_gateways,
'EgressOnlyInternetGateways': egress_only_igs,
'CarrierGateways': carrier_gateways,
'ElasticIPs': elastic_ips,
'NATGateways': nat_gateways,
'VpcPeeringConnections': vpc_peering_connections,
'NetworkAcls': network_acls,
'EC2Resources': ec2_resources,
'RDSResources': rds_resources
}
print(resources) # コンソールに結果を出力
return {
'statusCode': 200,
'body': json.dumps(resources, indent=4)
}
ブラウザでLambdaの関数URLを実行すると以下のように出力されます。
いいね!
さいごに
- 不要になったリソースを削除するときに、どのリソースとどのリソースに紐づいているのかがわかりますね!
- VPCに含まれるALBなど他のリソースも同様にやっていけば紐づけできます!