0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

既存のSecurityGroupをCloudFormationテンプレートにする

Posted at

既存のSecurityGroupをCloudFormationテンプレートにする

Usage: script.py --profile <aws profile> > cfn_template.json

これでJSONテンプレートが出来上がる。

** ImportStackするには、別途resources-to-import用のJSONが必要
** プロファイルは、適宜設定が必要

#!/usr/bin/env python
import boto3
import json
import ipaddress
import argparse
import sys
import re

# Command-line interface: the AWS profile name is mandatory.
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--profile", required=True)
args = parser.parse_args()

# argparse already rejects a missing --profile; this guard catches an
# explicitly empty value such as `--profile ""`.
if not args.profile:
    sys.exit("ERROR: Argument is missing.\nFor example: ./" + parser.prog + " --profile default")

profile = args.profile
session = boto3.session.Session(region_name='ap-northeast-1', profile_name=profile)
ec2 = session.client('ec2')

# List every security group and keep the group records.
security_groups = ec2.describe_security_groups()
security_groups_result = security_groups["SecurityGroups"]
security_groups_number_of = len(security_groups_result)

# List every rule of every security group, following pagination.
page_iter = ec2.get_paginator("describe_security_group_rules")
security_group_rules_list = []
# A dict literal is used here so this line does not depend on running
# before the built-in `dict` is shadowed below.
for page in page_iter.paginate(PaginationConfig={"PageSize": 1000}):
    security_group_rules_list.extend(page["SecurityGroupRules"])
security_group_rules_number_of = len(security_group_rules_list)

security_groups_list = []
# NOTE: this name shadows the built-in `dict`. It is kept because
# convert_sg_to_template() reads this module-level name to collect resources.
dict = {}

# CloudFormation logical IDs must be alphanumeric, so every other character
# is stripped from the security group name.
_NON_ALNUM = re.compile(r"[^A-Za-z0-9]")


def _rule_to_cfn_entry(security_group_rule):
    """Convert one security group rule record into a CloudFormation rule dict.

    The source/destination property is chosen from the field present on the
    record (CidrIpv4, CidrIpv6, PrefixListId or ReferencedGroupInfo) and from
    the rule direction (IsEgress).

    Returns:
        The rule dict, or None when the record has none of those fields.
    """
    is_egress = bool(security_group_rule["IsEgress"])
    if "CidrIpv4" in security_group_rule:
        target_key = "CidrIp"
        target = security_group_rule["CidrIpv4"]
    elif "CidrIpv6" in security_group_rule:
        target_key = "CidrIpv6"
        target = security_group_rule["CidrIpv6"]
    elif "PrefixListId" in security_group_rule:
        target_key = "DestinationPrefixListId" if is_egress else "SourcePrefixListId"
        target = security_group_rule["PrefixListId"]
    elif "ReferencedGroupInfo" in security_group_rule:
        target_key = "DestinationSecurityGroupId" if is_egress else "SourceSecurityGroupId"
        target = security_group_rule["ReferencedGroupInfo"]["GroupId"]
    else:
        return None
    return {
        # Rules without a description get "-" as a placeholder.
        "Description": security_group_rule.get("Description", "-"),
        "IpProtocol": security_group_rule["IpProtocol"],
        "FromPort": str(security_group_rule["FromPort"]),
        "ToPort": str(security_group_rule["ToPort"]),
        target_key: str(target),
    }


def convert_sg_to_template(security_groups=None, security_group_rules=None, resources=None):
    """Build a CloudFormation template for the given security groups and print it.

    Args:
        security_groups: Security group records (dicts with GroupId,
            GroupName, Description, VpcId and optionally Tags). Defaults to
            the module-level ``security_groups_result``.
        security_group_rules: Rule records (dicts with GroupId, IsEgress,
            IpProtocol, FromPort, ToPort and one source/destination field).
            Defaults to the module-level ``security_group_rules_list``.
        resources: Mapping that receives one entry per security group.
            Defaults to the module-level ``dict`` accumulator.

    Returns:
        The template as ``{"Resources": resources}``. The same template is
        also printed to stdout as JSON.
    """
    if security_groups is None:
        security_groups = security_groups_result
    if security_group_rules is None:
        security_group_rules = security_group_rules_list
    if resources is None:
        resources = dict  # module-level accumulator defined by the script

    # Index the rules by owning group once instead of rescanning per group.
    rules_by_group = {}
    for security_group_rule in security_group_rules:
        rules_by_group.setdefault(security_group_rule["GroupId"], []).append(security_group_rule)

    used_names = set()
    for security_group in security_groups:
        group_id = security_group["GroupId"]
        group_name = security_group["GroupName"]

        # Name and description come from the group itself, so a group
        # without any rules is still emitted with its own values.
        group_properties = {
            "GroupName": group_name,
            "GroupDescription": security_group["Description"],
        }

        for security_group_rule in rules_by_group.get(group_id, ()):
            entry = _rule_to_cfn_entry(security_group_rule)
            if entry is None:
                print(
                    "WARNING: skipped rule "
                    + str(security_group_rule.get("SecurityGroupRuleId", "?"))
                    + " of " + group_id + ": no source/destination field found",
                    file=sys.stderr,
                )
                continue
            if security_group_rule["IsEgress"]:
                rule_list_key = "SecurityGroupEgress"
            else:
                rule_list_key = "SecurityGroupIngress"
            group_properties.setdefault(rule_list_key, []).append(entry)

        if "Tags" in security_group:
            group_properties["Tags"] = security_group["Tags"]
        group_properties["VpcId"] = security_group["VpcId"]

        # Derive the logical ID from the group name. Names can repeat
        # across groups, so a repeated name gets the group ID appended
        # rather than overwriting the earlier resource.
        logical_name = _NON_ALNUM.sub("", group_name)
        if logical_name in used_names:
            logical_name += _NON_ALNUM.sub("", group_id)
        used_names.add(logical_name)

        resources[logical_name] = {
            "Type": "AWS::EC2::SecurityGroup",
            "Properties": group_properties,
        }

    template = {"Resources": resources}
    print(json.dumps(template))
    return template

# Build the template from the module-level security group data and print it
# to stdout as JSON.
convert_sg_to_template()

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?