0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【AWS】VPCのリソース一覧を取得しよう!

Posted at

はじめに

AWSで基盤を構築しているとリソースがごちゃごちゃしてきて収集つかなくなることありませんか:interrobang:

私はあります。
このEC2はどこのVPCに入っているんだ?
このVPCの中に入っているリソースはどれだ?

やりたいこと

VPCに含まれるリソースの一覧を取得したい:bangbang:

以下を想定しています。

  • VPCを指定する。
  • 指定したVPCの中にEC2、RDSが含まれている。
  • EC2の関連リソース、RDSの関連リソースも取得したい。

VPC

VPCでは以下のリソースを取得します。

  • サブネット
  • ルートテーブル
  • インターネットゲートウェイ
  • Egress-only インターネットゲートウェイ
  • キャリアゲートウェイ
  • Elastic IP
  • NAT ゲートウェイ
  • ピアリング接続
  • ネットワーク ACL

EC2

EC2では以下のリソースを取得します。

  • セキュリティグループ
  • EBSボリューム
  • スナップショット
  • キーペア
  • ネットワークインターフェイス

RDS

RDSでは以下のリソースを取得します。

  • スナップショット
  • プロキシ
  • サブネットグループ

さっそく、やってみよう!

Lambdaを新規に作成します。言語はPythonとします。

image.png

リソース情報を取得するため、Lambdaの実行ロールに以下のポリシーを付与します。

image.png

Lambdaに関数URLを使うと簡単に実行できます。

image.png

それぞれ以下のようにboto3を使ってリソースの情報を取得します。

VPC - サブネット
def get_subnets(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_subnets(
        Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
    )
    return [subnet['SubnetId'] for subnet in response['Subnets']]
VPC - ルートテーブル
def get_route_tables(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_route_tables(
        Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
    )
    return [route_table['RouteTableId'] for route_table in response['RouteTables']]
VPC - インターネットゲートウェイ
def get_internet_gateways(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_internet_gateways(
        Filters=[{'Name': 'attachment.vpc-id', 'Values': [vpc_id]}]
    )
    return [igw['InternetGatewayId'] for igw in response['InternetGateways']]
VPC - Egress-only インターネットゲートウェイ
def get_egress_only_internet_gateways(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_egress_only_internet_gateways(
        Filters=[{'Name': 'attachment.vpc-id', 'Values': [vpc_id]}]
    )
    return [eogw['EgressOnlyInternetGatewayId'] for eogw in response['EgressOnlyInternetGateways']]
VPC - キャリアゲートウェイ
def get_carrier_gateways(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_carrier_gateways(
        Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
    )
    return [cg['CarrierGatewayId'] for cg in response['CarrierGateways']]
VPC - Elastic IP
def get_elastic_ips(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_addresses(
        Filters=[{'Name': 'domain', 'Values': ['vpc']}]
    )
    return [eip['AllocationId'] for eip in response['Addresses'] if 'AssociationId' in eip and eip['AssociationId'].startswith('eipassoc')]
VPC - NAT ゲートウェイ
def get_nat_gateways(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_nat_gateways(
        Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
    )
    return [ngw['NatGatewayId'] for ngw in response['NatGateways']]
VPC - ピアリング接続
def get_vpc_peering_connections(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_vpc_peering_connections(
        Filters=[
            {'Name': 'requester-vpc-info.vpc-id', 'Values': [vpc_id]},
            {'Name': 'accepter-vpc-info.vpc-id', 'Values': [vpc_id]}
        ]
    )
    return [vpc_pc['VpcPeeringConnectionId'] for vpc_pc in response['VpcPeeringConnections']]
VPC - ネットワーク ACL
def get_network_acls(subnet_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_network_acls(
        Filters=[{'Name': 'association.subnet-id', 'Values': [subnet_id]}]
    )
    return [nacl['NetworkAclId'] for nacl in response['NetworkAcls']]
EC2
def get_ec2_instances(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_instances(
        Filters=[
            {'Name': 'vpc-id', 'Values': [vpc_id]}
        ]
    )
    instances = []
    for reservation in response['Reservations']:
        for instance in reservation['Instances']:
            instances.append(instance['InstanceId'])
    return instances
EC2 - セキュリティグループ
def get_security_groups(instance_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_instances(InstanceIds=[instance_id])
    return [sg['GroupId'] for sg in response['Reservations'][0]['Instances'][0]['SecurityGroups']]
EC2 - EBSボリューム
ddef get_ebs_volumes(instance_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_volumes(
        Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}]
    )
    return [volume['VolumeId'] for volume in response['Volumes']]
EC2 - スナップショット
def get_snapshots(volume_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_snapshots(
        Filters=[{'Name': 'volume-id', 'Values': [volume_id]}]
    )
    return [snapshot['SnapshotId'] for snapshot in response['Snapshots']]
EC2 - キーペア
def get_key_pairs(instance_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_instances(InstanceIds=[instance_id])
    key_name = response['Reservations'][0]['Instances'][0].get('KeyName', None)
    return [key_name] if key_name else []
EC2 - ネットワークインターフェイス
def get_network_interfaces(instance_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_network_interfaces(
        Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}]
    )
    return [ni['NetworkInterfaceId'] for ni in response['NetworkInterfaces']]
RDS
def get_rds_instances(vpc_id):
    rds_client = boto3.client('rds')
    response = rds_client.describe_db_instances()
    instances = []
    for db_instance in response['DBInstances']:
        if db_instance['DBSubnetGroup']['VpcId'] == vpc_id:
            instances.append(db_instance['DBInstanceIdentifier'])
    return instances
RDS - スナップショット
def get_rds_snapshots(db_instance_id):
    rds_client = boto3.client('rds')
    response = rds_client.describe_db_snapshots(
        DBInstanceIdentifier=db_instance_id
    )
    return [snapshot['DBSnapshotIdentifier'] for snapshot in response['DBSnapshots']]
RDS - プロキシ
def get_rds_proxies(db_instance_id):
    rds_client = boto3.client('rds')
    proxies = []
    # すべてのDBプロキシを取得
    response = rds_client.describe_db_proxies()
    # 各プロキシについて、ターゲットグループを調べる
    for proxy in response['DBProxies']:
        target_groups_response = rds_client.describe_db_proxy_target_groups(
            DBProxyName=proxy['DBProxyName']
        )
        # 各ターゲットグループのターゲットを調べ、指定されたDBインスタンスIDが含まれているか確認
        for target_group in target_groups_response['TargetGroups']:
            target_response = rds_client.describe_db_proxy_targets(
                DBProxyName=proxy['DBProxyName'],
                TargetGroupName=target_group['TargetGroupName']
            )
            for target in target_response['Targets']:
                if target['RdsResourceId'] == db_instance_id:
                    proxies.append(proxy['DBProxyName'])
                    break  # プロキシ名を追加したら、次のプロキシへ
    return proxies
RDS - サブネットグループ
def get_rds_subnet_groups(db_instance_id):
    rds_client = boto3.client('rds')
    response = rds_client.describe_db_instances(DBInstanceIdentifier=db_instance_id)
    db_subnet_group = response['DBInstances'][0]['DBSubnetGroup']['DBSubnetGroupName']
    return db_subnet_group

プログラム全体は以下に載せておきます。

プログラム
lambda_function.py
import boto3
import json

def get_subnets(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_subnets(
        Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
    )
    return [subnet['SubnetId'] for subnet in response['Subnets']]

def get_route_tables(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_route_tables(
        Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
    )
    return [route_table['RouteTableId'] for route_table in response['RouteTables']]

def get_internet_gateways(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_internet_gateways(
        Filters=[{'Name': 'attachment.vpc-id', 'Values': [vpc_id]}]
    )
    return [igw['InternetGatewayId'] for igw in response['InternetGateways']]

def get_egress_only_internet_gateways(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_egress_only_internet_gateways(
        Filters=[{'Name': 'attachment.vpc-id', 'Values': [vpc_id]}]
    )
    return [eogw['EgressOnlyInternetGatewayId'] for eogw in response['EgressOnlyInternetGateways']]

def get_carrier_gateways(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_carrier_gateways(
        Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
    )
    return [cg['CarrierGatewayId'] for cg in response['CarrierGateways']]

def get_elastic_ips(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_addresses(
        Filters=[{'Name': 'domain', 'Values': ['vpc']}]
    )
    return [eip['AllocationId'] for eip in response['Addresses'] if 'AssociationId' in eip and eip['AssociationId'].startswith('eipassoc')]

def get_nat_gateways(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_nat_gateways(
        Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
    )
    return [ngw['NatGatewayId'] for ngw in response['NatGateways']]

def get_vpc_peering_connections(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_vpc_peering_connections(
        Filters=[
            {'Name': 'requester-vpc-info.vpc-id', 'Values': [vpc_id]},
            {'Name': 'accepter-vpc-info.vpc-id', 'Values': [vpc_id]}
        ]
    )
    return [vpc_pc['VpcPeeringConnectionId'] for vpc_pc in response['VpcPeeringConnections']]

def get_network_acls(subnet_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_network_acls(
        Filters=[{'Name': 'association.subnet-id', 'Values': [subnet_id]}]
    )
    return [nacl['NetworkAclId'] for nacl in response['NetworkAcls']]

def get_ec2_instances(vpc_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_instances(
        Filters=[
            {'Name': 'vpc-id', 'Values': [vpc_id]}
        ]
    )
    instances = []
    for reservation in response['Reservations']:
        for instance in reservation['Instances']:
            instances.append(instance['InstanceId'])
    return instances

def get_security_groups(instance_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_instances(InstanceIds=[instance_id])
    return [sg['GroupId'] for sg in response['Reservations'][0]['Instances'][0]['SecurityGroups']]

def get_ebs_volumes(instance_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_volumes(
        Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}]
    )
    return [volume['VolumeId'] for volume in response['Volumes']]

def get_snapshots(volume_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_snapshots(
        Filters=[{'Name': 'volume-id', 'Values': [volume_id]}]
    )
    return [snapshot['SnapshotId'] for snapshot in response['Snapshots']]

def get_key_pairs(instance_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_instances(InstanceIds=[instance_id])
    key_name = response['Reservations'][0]['Instances'][0].get('KeyName', None)
    return [key_name] if key_name else []

def get_network_interfaces(instance_id):
    ec2_client = boto3.client('ec2')
    response = ec2_client.describe_network_interfaces(
        Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}]
    )
    return [ni['NetworkInterfaceId'] for ni in response['NetworkInterfaces']]

def get_rds_instances(vpc_id):
    rds_client = boto3.client('rds')
    response = rds_client.describe_db_instances()
    instances = []
    for db_instance in response['DBInstances']:
        if db_instance['DBSubnetGroup']['VpcId'] == vpc_id:
            instances.append(db_instance['DBInstanceIdentifier'])
    return instances

def get_rds_snapshots(db_instance_id):
    rds_client = boto3.client('rds')
    response = rds_client.describe_db_snapshots(
        DBInstanceIdentifier=db_instance_id
    )
    return [snapshot['DBSnapshotIdentifier'] for snapshot in response['DBSnapshots']]

def get_rds_proxies(db_instance_id):
    rds_client = boto3.client('rds')
    proxies = []
    # すべてのDBプロキシを取得
    response = rds_client.describe_db_proxies()
    # 各プロキシについて、ターゲットグループを調べる
    for proxy in response['DBProxies']:
        target_groups_response = rds_client.describe_db_proxy_target_groups(
            DBProxyName=proxy['DBProxyName']
        )
        # 各ターゲットグループのターゲットを調べ、指定されたDBインスタンスIDが含まれているか確認
        for target_group in target_groups_response['TargetGroups']:
            target_response = rds_client.describe_db_proxy_targets(
                DBProxyName=proxy['DBProxyName'],
                TargetGroupName=target_group['TargetGroupName']
            )
            for target in target_response['Targets']:
                if target['RdsResourceId'] == db_instance_id:
                    proxies.append(proxy['DBProxyName'])
                    break  # プロキシ名を追加したら、次のプロキシへ
    return proxies
    
def get_rds_subnet_groups(db_instance_id):
    rds_client = boto3.client('rds')
    response = rds_client.describe_db_instances(DBInstanceIdentifier=db_instance_id)
    db_subnet_group = response['DBInstances'][0]['DBSubnetGroup']['DBSubnetGroupName']
    return db_subnet_group

def lambda_handler(event, context):
    vpc_id = event['vpc_id']  # VPC IDをイベントから取得

    # 各リソースを取得
    subnets = get_subnets(vpc_id)
    route_tables = get_route_tables(vpc_id)
    internet_gateways = get_internet_gateways(vpc_id)
    egress_only_igs = get_egress_only_internet_gateways(vpc_id)
    carrier_gateways = get_carrier_gateways(vpc_id)
    elastic_ips = get_elastic_ips(vpc_id)
    nat_gateways = get_nat_gateways(vpc_id)
    vpc_peering_connections = get_vpc_peering_connections(vpc_id)
    ec2_instances = get_ec2_instances(vpc_id)
    rds_instances = get_rds_instances(vpc_id)

    # EC2関連リソースを取得
    ec2_resources = {}
    for instance_id in ec2_instances:
        ec2_resources[instance_id] = {
            'SecurityGroups': get_security_groups(instance_id),
            'EBSVolumes': get_ebs_volumes(instance_id),
            'Snapshots': [],
            'KeyPairs': get_key_pairs(instance_id),
            'NetworkInterfaces': get_network_interfaces(instance_id)
        }
        # 各EBSボリュームに対応するスナップショットを取得
        for volume_id in ec2_resources[instance_id]['EBSVolumes']:
            ec2_resources[instance_id]['Snapshots'].extend(get_snapshots(volume_id))

    # RDS関連リソースを取得
    rds_resources = {}
    for db_instance_id in rds_instances:
        rds_resources[db_instance_id] = {
            'Snapshots': get_rds_snapshots(db_instance_id),
            'Proxies': get_rds_proxies(db_instance_id),
            'SubnetGroups': get_rds_subnet_groups(db_instance_id)
        }

    # サブネットに紐づくネットワークACLを取得
    network_acls = {}
    for subnet_id in subnets:
        network_acls[subnet_id] = get_network_acls(subnet_id)

    resources = {
        'Subnets': subnets,
        'RouteTables': route_tables,
        'InternetGateways': internet_gateways,
        'EgressOnlyInternetGateways': egress_only_igs,
        'CarrierGateways': carrier_gateways,
        'ElasticIPs': elastic_ips,
        'NATGateways': nat_gateways,
        'VpcPeeringConnections': vpc_peering_connections,
        'NetworkAcls': network_acls,
        'EC2Resources': ec2_resources,
        'RDSResources': rds_resources
    }

    print(resources)  # コンソールに結果を出力
    return {
        'statusCode': 200,
        'body': json.dumps(resources, indent=4)
    }

ブラウザでLambdaの関数URLを実行すると以下のように出力されます。

image.png

いいね!:thumbsup:

さいごに

  • 不要になったリソースを削除するときに、どのリソースとどのリソースに紐づいているのかがわかりますね!
  • VPCに含まれるALBなど他のリソースも同様にやっていけば紐づけできます!
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?