既存のSecurityGroupをCloudFormationテンプレートにする
Usage: script.py --profile <aws profile> > cfn_template.json
これでJSONテンプレートが出来上がる。
** ImportStackするには、別途resources-to-import用のJSONが必要
** プロファイルは、適宜設定が必要
#!/usr/bin/env python
import boto3
import json
import ipaddress
import argparse
import sys
import re
# Parse the command line: the AWS profile name is mandatory.
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--profile", required=True)
args = parser.parse_args()
# argparse already rejects a missing --profile; this guard additionally
# catches an explicitly empty value such as `--profile ""`.
if not args.profile:
    sys.exit("ERROR: Argument is missing.\nFor example: ./" + sys.argv[0] + " --profile default")
profile = args.profile
session = boto3.session.Session(region_name='ap-northeast-1', profile_name=profile)
ec2 = session.client('ec2')

# Fetch every security group visible to the profile.
security_groups = ec2.describe_security_groups()
security_groups_result = security_groups["SecurityGroups"]
security_groups_number_of = len(security_groups_result)

# Fetch every rule of every security group, following pagination.
page_iter = ec2.get_paginator("describe_security_group_rules")
security_group_rules_list = []
# A dict literal is used here (not `dict(...)`) because this module rebinds
# the name `dict` further down.
for page in page_iter.paginate(PaginationConfig={"PageSize": 1000}):
    security_group_rules_list.extend(page["SecurityGroupRules"])
security_group_rules_number_of = len(security_group_rules_list)

security_groups_list = []
# NOTE: this module-level name shadows the builtin `dict`. It is kept so that
# any code referring to the module-level `dict` accumulator keeps working.
dict = {}
def _rule_target(security_group_rule, is_egress):
    """Return the CloudFormation ``(property, value)`` pair for a rule's peer.

    The peer is the source (ingress) or destination (egress) of the rule.
    It is chosen from whichever of ``CidrIpv4``, ``CidrIpv6``,
    ``ReferencedGroupInfo`` or ``PrefixListId`` is present in the rule.
    Returns ``None`` when none of these keys is present.
    """
    if "CidrIpv4" in security_group_rule:
        return "CidrIp", str(security_group_rule["CidrIpv4"])
    if "CidrIpv6" in security_group_rule:
        return "CidrIpv6", str(security_group_rule["CidrIpv6"])
    # Group and prefix-list references use direction-specific property names.
    side = "Destination" if is_egress else "Source"
    if "ReferencedGroupInfo" in security_group_rule:
        group_id = security_group_rule["ReferencedGroupInfo"]["GroupId"]
        return side + "SecurityGroupId", str(group_id)
    if "PrefixListId" in security_group_rule:
        return side + "PrefixListId", str(security_group_rule["PrefixListId"])
    return None


def _logical_id(group_name, group_id, taken):
    """Derive a template logical ID from a security group name.

    Every non-alphanumeric character is removed. If the result is empty or
    already present in ``taken``, the alphanumeric part of ``group_id`` is
    appended so that no resource silently overwrites another.
    """
    candidate = re.sub(r"[^A-Za-z0-9]", "", group_name)
    if not candidate or candidate in taken:
        candidate += re.sub(r"[^A-Za-z0-9]", "", group_id)
    return candidate


def convert_sg_to_template(security_groups=None, security_group_rules=None):
    """Print a CloudFormation template (JSON) describing the security groups.

    Args:
        security_groups: list of security group dicts (each with ``GroupId``,
            ``GroupName``, ``Description``, ``VpcId`` and optionally
            ``Tags``). Defaults to the module-level ``security_groups_result``.
        security_group_rules: list of rule dicts (each with ``GroupId``,
            ``IsEgress``, ``IpProtocol``, ``FromPort``, ``ToPort`` and one
            peer key). Defaults to the module-level
            ``security_group_rules_list``.

    The template is written to stdout as a single JSON document. Rules whose
    peer cannot be determined are skipped with a warning on stderr.
    """
    if security_groups is None:
        security_groups = security_groups_result
    if security_group_rules is None:
        security_group_rules = security_group_rules_list

    # Index the rules by owning group once instead of rescanning the full
    # rule list for every security group.
    rules_by_group = {}
    for security_group_rule in security_group_rules:
        rules_by_group.setdefault(security_group_rule["GroupId"], []).append(security_group_rule)

    resources = {}
    for security_group in security_groups:
        group_rules = rules_by_group.get(security_group["GroupId"])
        # NOTE(review): groups without any rule are left out of the template,
        # which is how this script has behaved so far -- confirm this is wanted.
        if not group_rules:
            continue

        ingress = []
        egress = []
        for security_group_rule in group_rules:
            is_egress = bool(security_group_rule["IsEgress"])
            target = _rule_target(security_group_rule, is_egress)
            if target is None:
                print(
                    "WARNING: skipping rule with unsupported source/destination in "
                    + str(security_group["GroupId"]),
                    file=sys.stderr,
                )
                continue
            target_key, target_value = target
            entry = {
                # A rule without a description gets "-" as a placeholder.
                "Description": security_group_rule.get("Description", "-"),
                "IpProtocol": security_group_rule["IpProtocol"],
                "FromPort": str(security_group_rule["FromPort"]),
                "ToPort": str(security_group_rule["ToPort"]),
                target_key: target_value,
            }
            (egress if is_egress else ingress).append(entry)

        properties = {
            "GroupName": security_group["GroupName"],
            "GroupDescription": security_group["Description"],
        }
        if ingress:
            properties["SecurityGroupIngress"] = ingress
        if egress:
            properties["SecurityGroupEgress"] = egress
        if "Tags" in security_group:
            properties["Tags"] = security_group["Tags"]
        properties["VpcId"] = security_group["VpcId"]

        logical_name = _logical_id(
            security_group["GroupName"], security_group["GroupId"], resources
        )
        resources[logical_name] = {
            "Type": "AWS::EC2::SecurityGroup",
            "Properties": properties,
        }

    # With no security groups at all the output stays an empty JSON object.
    template = {"Resources": resources} if security_groups else {}
    print(json.dumps(template))


if __name__ == "__main__":
    convert_sg_to_template()