はじめに
最近Cloud Questにはまっています。
その中で表題の処理を体験できるものがありましたので、さっそくやってみました。
今回の処理は以下のようになっています。
①Cloudwatch alermで不正アクセスを感知
➁Cloudwatch alermからSNSトピックに連携
③SNSトピックがトリガーなり、Lambda関数を起動
④Lambda関数の処理で、該当インスタンスのIAMロールを解除
では順番に確認していきます。
CloudWatchAgentのインストール
EC2インスタンスがSSMで確認可能ということを前提に進めます。
以下のリンクを参考にSSMのrun commandを使用して、当該インスタンスにCloudWatchAgentのインストールします。
run commandの後に以下のような画面が表示されていれば成功です。
CloudWatchAgentの設定
EC2インスタンスに接続し、以下のようなコマンドを打ちます。
なお、OSはLinuxの使用を想定しております。
[ec2-user@ip-10-10-0-10 ~]$ sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl -m ec2 -a status
{
"status": "stopped",
"starttime": "",
"configstatus": "not configured",
"version": "1.247357.0b252275"
}
こちらでCloudWatchAgentのステータス確認をします。
コマンド意味ですが、以下のようになります。
・「/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl」:Amazon CloudWatch Agentのコマンド実行スクリプトへのパス
・「-m ec2」:Amazon EC2インスタンス上でAmazon CloudWatch Agentを実行する
・「-a status」:Amazon CloudWatch Agentの状態を確認するオプション
では状態確認が終わったので、さっそくCloudWatchAgentの設定を実施していきます。
以下のように対話式で、コンフィグ設定することになります。
こちらのコマンドを打つと、設定が開始されます。
sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-config-wizard
以下が実際の設定画面です。
[ec2-user@ip-10-10-0-10 ~]$ sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-config-wizard
================================================================
= Welcome to the Amazon CloudWatch Agent Configuration Manager =
= =
= CloudWatch Agent allows you to collect metrics and logs from =
= your host and send them to CloudWatch. Additional CloudWatch =
= charges may apply. =
================================================================
On which OS are you planning to use the agent?
1. linux
2. windows
3. darwin
default choice: [1]:
1
Trying to fetch the default region based on ec2 metadata...
Are you using EC2 or On-Premises hosts?
1. EC2
2. On-Premises
default choice: [1]:
1
Which user are you planning to run the agent?
1. root
2. cwagent
3. others
default choice: [1]:
1
Do you want to turn on StatsD daemon?
1. yes
2. no
default choice: [1]:
2
Do you want to monitor metrics from CollectD? WARNING: CollectD must be installed or the Agent will fail to start
1. yes
2. no
default choice: [1]:
2
Do you want to monitor any host metrics? e.g. CPU, memory, etc.
1. yes
2. no
default choice: [1]:
2
Do you have any existing CloudWatch Log Agent (http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AgentReference.html) configuration file to import for migration?
1. yes
2. no
default choice: [2]:
2
Do you want to monitor any log files?
1. yes
2. no
default choice: [1]:
1
Log file path:
/home/ec2-user/record.log
Log group name:
default choice: [record.log]
Log stream name:
default choice: [{instance_id}]
Log Group Retention in days
1. -1
2. 1
3. 3
4. 5
5. 7
6. 14
7. 30
8. 60
9. 90
10. 120
11. 150
12. 180
13. 365
14. 400
15. 545
16. 731
17. 1827
18. 2192
19. 2557
20. 2922
21. 3288
22. 3653
default choice: [1]:
7
Do you want to specify any additional log files to monitor?
1. yes
2. no
default choice: [1]:
2
Saved config file to /opt/aws/amazon-cloudwatch-agent/bin/config.json successfully.
Current config as follows:
{
"agent": {
"run_as_user": "root"
},
"logs": {
"logs_collected": {
"files": {
"collect_list": [
{
"file_path": "/home/ec2-user/record.log",
"log_group_name": "record.log",
"log_stream_name": "{instance_id}",
"retention_in_days": 30
}
]
}
}
}
}
Please check the above content of the config.
The config file is also located at /opt/aws/amazon-cloudwatch-agent/bin/config.json.
Edit it manually if needed.
Do you want to store the config in the SSM parameter store?
1. yes
2. no
default choice: [1]:
1
What parameter store name do you want to use to store your config? (Use 'AmazonCloudWatch-' prefix if you use our managed AWS policy)
default choice: [AmazonCloudWatch-linux]
2
Trying to fetch the default region based on ec2 metadata...
Which region do you want to store the config in the parameter store?
default choice: [us-east-1]
Which AWS credential should be used to send json config to parameter store?
1. ASIA45DCSBDIS7CGT7HF(From SDK)
2. Other
default choice: [1]:
1
Successfully put config to parameter store 2.
Program exits now.
はい、こんな感じで設定が終わりました。
上記で回答した内容はSSM parameter storeに保存されています。
設定が終わったら、以下のコマンドを実行します。
sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl -a fetch-config -s -m ec2 -c file:/opt/aws/amazon-cloudwatch-agent/bin/config.json
コマンドの意味について説明します。
「-a fetch-config」:Amazon CloudWatch Agentの設定情報を取得するオプション
「-s」:設定情報を取得するときに、Amazon CloudWatch Agentを停止する
「-m ec2」:Amazon EC2インスタンス上でAmazon CloudWatch Agentを実行する
「-c file:/opt/aws/amazon-cloudwatch-agent/bin/config.json」:取得する設定情報を格納するJSONファイルのパスを指定するオプション
このコマンドを実行することで、Amazon CloudWatch Agentの設定情報を取得し、指定されたJSONファイルに保存することができます。
保存されたJSONファイルは、新にCloudWatchAgentを設定する際に利用することができます。
以上でCloudWatchAgentのインストールおよび設定は終了です。
CloudWatch Alermの設定
では次にCloudWatch Alermの設定をしていきます。
流れとしては以下のようになります。
①先ほど作成したロググループに基づいて、メトリクスフィルターを作成
➁アラームの作成画面で、①で作成したメトリクスフィルターを適用
③SNSトピックに連携するように設定
先ほどのCloudWatchAgentの設定が正しくできていれば、CloudWatchLogsから以下のようなロググループが確認できるはずです。
ロググループをクリックすると、次はログストリームが表示されます。
こちらの設定はinstance_idで指定されているため、対象のインスタンスidをクリックすると以下の画面に遷移します。
こちらは実際のログのイベントですね。
このログを使用して、不正アクセス(401)のフィルタリングをしていきます。
では次に、イベントをフィルター欄に401と入力し、メトリクスフィルターを作成を実施していきます。
その前に用語説明をします。
メトリックス名前空間
メトリックスをグループ化するための任意の名前を付けたものです。これにより、同じ名前空間内のメトリックスは同一のグループに分類されます。
メトリックス名
メトリックスを識別するための名前です。今回は分かりやすくinstance_idとしています。
メトリックス値
今回は401を検知した場合に、アラートに渡される値です。
今回は特定の文字列でフィルターをかけていますが、以下のようにCPUやメモリでもフィルターをかけることが可能です。
CPU使用率: 0.65
メモリ使用量: 512MB
ディスク使用量: 40GB
ネットワークトラフィック: 100 Mbps
HTTPリクエスト数: 500 req/sec
エラー数: 10 errors/minute
データベースのクエリ数: 200 queries/sec
では実際の設定画面がこちらです。
ここまでで、メトリクスフィルターの作成は完了です。
では次に作成したメトリクスフィルターとアラームを紐づけていきましょう。
まずCloudWatch Alermコンソール上から”アラームの作成”をクリックします。
すると、メトリクス名前空間(メトリクスをグループ化したもの)を選択できる画面に遷移します。
ここで先ほど作成したLabApplicationsというメトリクス名前空間を選択します。
メトリクス名前空間を選択すると、次はメトリクス名を選択できますので、ここで先ほど設定したinstance_idを選択します。
ではここからアラームの詳細設定に移ります。
下の画像では、統計の方法や期間を選択可能です。
次は条件の設定です。
アラームの作成が終わったので、次はアラーム状態になった際のアクションを設定していきます。
つまりCloudwatch alarmとSNSトピックの紐づけですね。
SNSトピックはあらかじめUnauthorizedExceptionNotificationという名前で作成しておきました。
設定の際に気を付けることは特にありませんが、タイプをFIFOではなくスタンダートで選択する必要があります。
※FIFOタイプに対応しているプロトコルはSQSのみとなります。
以上が、アラーム作成時に必要な設定時の必要項目です。
ちなみにSNSトピックを作成した際には、以下のような画面でした
ここまでで、以下の設定を実施したことになります。
①Cloudwatch agent
➁Cloudwatch alarm
③メトリクスフィルター
④Cloud alarmとSNSトピック
Lambdaの設定
では次にLambdaを設定していきます。
このLambda関数が、SNSトピックをトリガーに所定のインスタンスのIAMロールを解除する処理となります。
以下が実際のコードです。
"""
This lambda function isolates the instance
for further forensic analysis.
"""
import os, json
import boto3
import logging
# It is a good practice to use proper logging.
# Here we are using the logging module of python.
# https://docs.python.org/3/library/logging.html
logger = logging.getLogger()
logger.setLevel(logging.INFO)
ec2_client = boto3.client('ec2')
ec2_resource = boto3.resource('ec2')
instance_list = []
def lambda_handler(event, context):
print(json.dumps(event))
# Find the instance ID from MetricName defined
# in the Cloudwatch Alarm configuration. MetricName should be set to the instance-id.
message = json.loads(event['Records'][0]["Sns"].get('Message'))
instance_id = message['Trigger'].get('MetricName')
logger.info(instance_id)
# Get the VPC of the instance
vpcId = get_instace_vpc_id(instance_id)
# Call the get_instance function to generate list of all the available instances.
get_instances()
# If the instance is in the list, remove role
# and attach the Isolated_SG
if instance_id in instance_list:
try:
remove_role(instance_id)
except Exception as e:
print(e)
# Attach the isolated Security group
try:
sg_response = ec2_client.describe_security_groups(
Filters=[
{
'Name': 'group-name',
'Values': [
'Isolated_SG',
]
},
]
)
logger.info(sg_response)
if sg_response.get('SecurityGroups'):
security_group_id = sg_response.get('SecurityGroups')[0].get("GroupId")
logger(security_group_id)
attach_isolated_sg(instance_id, security_group_id)
else:
security_group_id = create_sg(vpcId)
attach_isolated_sg(instance_id, security_group_id)
except Exception as e:
print(e)
def get_instances():
"""
This function gets the list of all the EC2 instances in the region.
"""
get_instances = ec2_client.describe_instances()
instances = get_instances['Reservations'][0].get('Instances')
print(instances)
for instance in instances:
print(instance['InstanceId'])
# print(instance['VpcId'])
instance_id = instance['InstanceId']
instance_list.append(instance_id)
print(f"instance-List:{instance_list}")
def remove_role(instance_id):
"""
This function removed the instance proflie attached to the instance.
"""
describe_instance_profile_association_response = ec2_client.describe_iam_instance_profile_associations(
Filters=[
{
'Name': 'instance-id',
'Values': [
instance_id,
]
},
]
)
print(describe_instance_profile_association_response)
association_id = describe_instance_profile_association_response['IamInstanceProfileAssociations'][0].get('AssociationId')
print(association_id)
response = ec2_client.disassociate_iam_instance_profile(
AssociationId=association_id
)
logger.info(response)
def get_instace_vpc_id(instanceId):
"""
This function gets the VPC Id of the instance.
"""
instanceReservations = ec2_client.describe_instances(InstanceIds=[instanceId])['Reservations']
for instanceReservation in instanceReservations:
instancesDescription = instanceReservation['Instances']
for instance in instancesDescription:
return instance['VpcId']
def create_sg(vpcId):
"""
This function creates the isolated security group with no egress access.
"""
security_group_id = ec2_resource.create_security_group(GroupName="Isolated_SG",
Description="Isolated SG for forensic analysis",
VpcId=vpcId)
security_group_id.revoke_egress(IpPermissions= [{'IpProtocol': '-1','IpRanges': [{'CidrIp': '0.0.0.0/0'}],'Ipv6Ranges': [],'PrefixListIds': [],'UserIdGroupPairs': []}])
return security_group_id
def attach_isolated_sg(instance_id, security_group_id):
"""
This function attach the isolated security group to the instance.
"""
logger.info("Inside attach_sg")
logger.info(security_group_id.id)
response = ec2_client.modify_instance_attribute(
Groups=[security_group_id.id],
InstanceId=instance_id)
では細かく解説していきます。
まずは必要ライブラリのインポート及びログの設定から始めます。
import os, json
import boto3
import logging
# It is a good practice to use proper logging.
# Here we are using the logging module of python.
# https://docs.python.org/3/library/logging.html
logger = logging.getLogger()
logger.setLevel(logging.INFO)
ec2_client = boto3.client('ec2')
ec2_resource = boto3.resource('ec2')
instance_list = []
boto3.clientとboto3.resourceの違いは以下を参考するすると良いかと思います。
端的に説明すると
①boto3.clientの場合
import boto3
client = boto3.client('ec2')
response = client.describe_instances(InstanceIds=['i-025e8bfafc8937f02'])
instance = response['Reservations'][0]['Instances'][0]
instance_type = instance.get('InstanceType')
print(instance_type)
実行結果:
t3.micro
このように、Client APIでは、情報を得たい対象のリソースIDを引数に与えてメソッドを実行します。 実行結果は「辞書型」で得られるため、そこから階層を辿って必要な情報を取り出すことになります。
➁boto3.resourceの場合
import boto3
ec2 = boto3.resource('ec2')
instance = ec2.Instance('i-025e8bfafc8937f02')
instance_type = instance.instance_type
print(instance_type)
実行結果:
t3.micro
このように、Resource APIでは、まず対象のリソースを「オブジェクト」として取得してから、オブジェクトの持つ「属性」を参照して情報を取り出すという手順になります
その後で、instance_list[]という空のリストを定義します。
こちらのリストはこの後使用していきます。
では次はlambda_handler関数の中身について確認していきます。
def lambda_handler(event, context):
print(json.dumps(event))
# Find the instance ID from MetricName defined
# in the Cloudwatch Alarm configuration. MetricName should be set to the instance-id.
message = json.loads(event['Records'][0]["Sns"].get('Message'))
instance_id = message['Trigger'].get('MetricName')
logger.info(instance_id)
# Get the VPC of the instance
vpcId = get_instace_vpc_id(instance_id)
# Call the get_instance function to generate list of all the available instances.
get_instances()
今回Lambdaが受け取るeventは、SNSトピックから受け取るイベントオブジェクトとなります。
以下のコードの出力結果は以下のようになります。
print(json.dumps(event))
出力結果
{
"Records": [
{
"EventSource": "aws:sns",
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:ap-northeast-1:123456789012:mytopic:0884-5d81c0db-4e13-829f-596f7ea9f8ad",
"Sns": {
"Type": "Notification",
...
"Subject": "Message Title",
"Message": "Message Body",
...
}
}
]
}
この後、最終的にSNSから渡されるMetricNameを取得するためのコードを書いていきます。
本記事の前半で説明したように、MetricName=渡されるインスタンスIDでしたね。
以下参考記事です。
上記記事にあるように、今回取得したい情報は、"Records"の中の"Sns"の"message"に格納されています。そのため以下のようなコードが必要になるわけです。
message = json.loads(event['Records'][0]["Sns"].get('Message'))
またRecords→Snsと階層を下げて確認していくと、Snsの中身は以下のようになっています。
"Sns": {
"Type": "Notification",
"MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
"TopicArn": "arn:aws:sns:us-west-2:123456789012:example-topic",
"Subject": "example subject",
"Message": "省略",
"Timestamp": "1970-01-01T00:00:00.000Z",
"SignatureVersion": "1",
"Signature": "EXAMPLE",
"SigningCertUrl": "EXAMPLE",
"UnsubscribeUrl": "EXAMPLE",
"MessageAttributes": {
"Test": {
"Type": "String",
"Value": "TestString"
},
"TestBinary": {
"Type": "Binary",
"Value": "TestBinary"
}
}
}
つまり、ここではmessageという変数に、上記のmessageの中身のオブジェクトを格納したことになります。
次にinstance_idという変数に格納する値を取得します。
instance_id = message['Trigger'].get('MetricName')
messageオブジェクトには以下のような情報が格納されています。
この中で実際にinstance_idに格納するのが、Triggerの中にあるMetricNameです。
{
"AlarmName": "Example Alarm",
"AlarmDescription": "This is an example alarm",
"AWSAccountId": "123456789012",
"NewStateValue": "ALARM",
"NewStateReason": "Threshold Crossed: 1 datapoint (10.0) was greater than or equal to the threshold (10.0).",
"StateChangeTime": "2019-02-28T21:08:32.453+0000",
"Region": "us-west-2",
"OldStateValue": "INSUFFICIENT_DATA",
"Trigger": {
"MetricName": "ExampleMetric",
"Namespace": "ExampleNamespace",
"StatisticType": "SampleCount",
"Statistic": "Sum",
"Unit": null,
"Dimensions": [
{
"name": "ExampleDimensionName",
"value": "ExampleDimensionValue"
}
],
"Period": 300,
"EvaluationPeriods": 1,
"ComparisonOperator": "GreaterThanOrEqualToThreshold",
"Threshold": 10.0,
"TreatMissingData": "",
"EvaluateLowSampleCountPercentile": ""
}
}
本記事の冒頭で、CloudWatch Agentを設定した際、メトリクス名はそのインスタンスidでした。
そのためここでようやくSNSトピックから渡されたEC2 instanceIdを取得することが出来ました。
次にvpcId = get_instace_vpc_id(instance_id)で、そのインスタンスが存在するVPCidを取得しています。
この関数は後半でしれっと定義されていますので、今から確認していきます。
def get_instace_vpc_id(instanceId):
"""
This function gets the VPC Id of the instance.
"""
instanceReservations = ec2_client.describe_instances(InstanceIds=[instanceId])['Reservations']
for instanceReservation in instanceReservations:
instancesDescription = instanceReservation['Instances']
for instance in instancesDescription:
return instance['VpcId']
まずinstanceReservationsという変数に、describe_instancesの結果をinstanceIdでフィルタリングした値を格納しています。
ちなみにReservationsの中身はこんな感じです。
→長いので省略しています。全容はこちらより確認可能です。
ここまでで、instanceReservationsの中にSNSから渡されたMetricName=InstanceIDが所属するVpcIdを取得することができました。
この中のVpcIdを取得するのが、本関数の目的になります。
{
"Reservations": [
{
"OwnerId": "xxxxxxxxxxxx",
"ReservationId": "r-xxxxxxxxxxxxxxxxx",
"Groups": [],
"Instances": [
{
"Monitoring": {
"State": "disabled"
},
"PublicDnsName": "ec2-xxx-xxx-xxx-xxx.ap-northeast-1.compute.amazonaws.com",
"Platform": "xxxxxxx",
"State": {
"Code": 80,
"Name": "stopped"
},
"EbsOptimized": false,
"LaunchTime": "xxxx-xx-xxxxx:xx:xx.xxxx",
"PublicIpAddress": "xxx.xxx.xxx.xxx",
"PrivateIpAddress": "xxx.xxx.xxx.xxx",
"ProductCodes": [],
"VpcId": "vpc-xxxxxxx",
"StateTransitionReason": "",
"InstanceId": "i-xxxxxxxxxxxxxxxxx",
"EnaSupport": true,
"ImageId": "ami-xxxxxxxx",
この後はforでループさせて返り値で、VpcIdを返します。
次にget_instances()についても解説していきます。
def get_instances():
"""
This function gets the list of all the EC2 instances in the region.
"""
get_instances = ec2_client.describe_instances()
instances = get_instances['Reservations'][0].get('Instances')
print(instances)
for instance in instances:
print(instance['InstanceId'])
# print(instance['VpcId'])
instance_id = instance['InstanceId']
instance_list.append(instance_id)
print(f"instance-List:{instance_list}")
まずは、get_instances という変数に、describe_instancesで取得された値を格納します。
その後、['Reservations'][0]で階層をおります。Reservationsフィールドは、複数のインスタンス情報をリストとして格納するので、そのリストの中の最初の一つから、Instancesキーを用いて情報を取得しています。
そのため、instancesには、Reservationsに確報されている以下のような情報が格納されます。
{
'AmiLaunchIndex': 0,
'ImageId': 'ami-0c55b159cbfafe1f0',
'InstanceId': 'i-0b8f3a3d3c251e3e3',
'InstanceType': 't2.micro',
'KeyName': 'my-key-pair',
'LaunchTime': datetime.datetime(2022, 2, 8, 7, 54, 40, tzinfo=tzutc()),
'Monitoring': {'State': 'disabled'},
'Placement': {'AvailabilityZone': 'us-west-2b',
'GroupName': '',
'Tenancy': 'default'},
'PrivateDnsName': 'ip-172-31-26-78.us-west-2.compute.internal',
'PrivateIpAddress': '172.31.26.78',
'ProductCodes': [],
'PublicDnsName': '',
'State': {'Code': 80, 'Name': 'stopped'},
'StateTransitionReason': 'User initiated (2022-02-08 07:58:15 GMT)',
'SubnetId': 'subnet-0feda679',
'VpcId': 'vpc-0d7f982c'
}
次に以下のコードについても確認します。
instance_id = instance['InstanceId']
instance_list.append(instance_id)
print(f"instance-List:{instance_list}")
まず先ほどのinstanesの中にあるInstanceId'(上の図だとi-0b8f3a3d3c251e3e3)をinstance_idという変数に格納します。
その後しれっと、このinstance_idをinstance_listというリストに加えます。
はい、ここまでで、以下二つの関数についての解説を終わります。
vpcId = get_instace_vpc_id(instance_id)
get_instances()
では次はifの中身について深堀していきます。
ざっくりの説明ですが、先ほど取得したinstance_idがinstance_listに含まれていた場合に、そのインスタンスのIAMロールをデタッチするといった作業になります。
if instance_id in instance_list:
try:
remove_role(instance_id)
except Exception as e:
print(e)
# Attach the isolated Security group
try:
sg_response = ec2_client.describe_security_groups(
Filters=[
{
'Name': 'group-name',
'Values': [
'Isolated_SG',
]
},
]
)
logger.info(sg_response)
if sg_response.get('SecurityGroups'):
security_group_id = sg_response.get('SecurityGroups')[0].get("GroupId")
logger(security_group_id)
attach_isolated_sg(instance_id, security_group_id)
else:
security_group_id = create_sg(vpcId)
attach_isolated_sg(instance_id, security_group_id)
except Exception as e:
print(e)
SNSより渡されたeventに含まれているinstance_idがget_instances()関数でリスト化されたinstance_listに含まれているか確認します。
仮に含まれている場合は、remove_roleメソッドで、ロールを解除します。
その後describe_security_groupsメソッドで、Isolated_SGでフィルタリングした値sg_response に格納します。具体的にはこんか感じのものがsg_responseに格納されます。
"SecurityGroups": [
{
"Description": "examplename",
"GroupName": "examplename",
"IpPermissions": [
{
"IpProtocol": "-1",
"IpRanges": [
{
"CidrIp": "0.0.0.0/0"
}
],
"Ipv6Ranges": [],
"PrefixListIds": [],
"UserIdGroupPairs": []
}
],
"OwnerId": "example",
"GroupId": "example",
"IpPermissionsEgress": [
{
"IpProtocol": "-1",
"IpRanges": [
{
"CidrIp": "0.0.0.0/0"
}
],
"Ipv6Ranges": [],
"PrefixListIds": [],
"UserIdGroupPairs": []
}
],
"VpcId": "example"
}
]
では次のifの処理について確認していきます。
if sg_response.get('SecurityGroups'):
security_group_id = sg_response.get('SecurityGroups')[0].get("GroupId")
logger(security_group_id)
attach_isolated_sg(instance_id, security_group_id)
else:
security_group_id = create_sg(vpcId)
attach_isolated_sg(instance_id, security_group_id)
except Exception as e:
print(e)
一行目は、実際に指定された引数が存在するか確認し、存在した場合はその情報を返します。その後sg_responseに格納されている情報から、group_idを取得し、security_group_idに格納します。
ここで、格納されている値はIsolated_SGというSG名と紐づくsecuritygroupのidとなります。
つまりここでの処理はSNSから渡されたinstanceIdにIsolated_SGをアタッチするという処理になります。
仮にここでIsolated_SGが存在しなかった場合は、以下のcreate_sgで作成されることになります。
def create_sg(vpcId):
"""
This function creates the isolated security group with no egress access.
"""
security_group_id = ec2_resource.create_security_group(GroupName="Isolated_SG",
Description="Isolated SG for forensic analysis",
VpcId=vpcId)
security_group_id.revoke_egress(IpPermissions= [{'IpProtocol': '-1','IpRanges': [{'CidrIp': '0.0.0.0/0'}],'Ipv6Ranges': [],'PrefixListIds': [],'UserIdGroupPairs': []}])
return security_group_id
この関数は、与えられたVPC IDを使用して「Isolated_SG」という名前のセキュリティグループを作成し、すべての出口トラフィックを拒否する設定をしています。
その後、ec2_resource.create_security_groupメソッドを使用してセキュリティグループを作成し、セキュリティグループIDを返します。作成されたセキュリティグループには、許可された入力トラフィックがなく、すべてのインバウンドトラフィックがブロックされています。
次に、reboke_engressメソッドでアウトバウンド通信も無効化します。
以下のIpProtocal: "-1"は、すべてのトラフィックを許可するアウトバウンドルールを削除する。という挙動です。
[{'IpProtocol': '-1', 'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
'Ipv6Ranges': [],
'PrefixListIds': [],
'UserIdGroupPairs': []}]
はい、これで前半部分のifの解説は終了になります。
あとは、まだ解説しきれていなかったコードについて説明します。
remove_role(instance_id):
"""
This function removed the instance proflie attached to the instance.
"""
describe_instance_profile_association_response = ec2_client.describe_iam_instance_profile_associations(
Filters=[
{
'Name': 'instance-id',
'Values': [
instance_id,
]
},
]
)
print(describe_instance_profile_association_response)
association_id = describe_instance_profile_association_response['IamInstanceProfileAssociations'][0].get('AssociationId')
print(association_id)
response = ec2_client.disassociate_iam_instance_profile(
AssociationId=association_id
)
logger.info(response)
変数describe_instance_profile_association_response には、instance_idで絞った以下のような情報が格納されます。
詳細は公式ドキュメントをご確認ください
{
"IamInstanceProfileAssociations": [
{
"InstanceId": "i-09eb09efa73ec1dee",
"State": "associated",
"AssociationId": "iip-assoc-0db249b1f25fa24b8",
"IamInstanceProfile": {
"Id": "AIPAJVQN4F5WVLGCJDRGM",
"Arn": "arn:aws:iam::123456789012:instance-profile/admin-role"
}
},
{
"InstanceId": "i-0402909a2f4dffd14",
"State": "associating",
"AssociationId": "iip-assoc-0d1ec06278d29f44a",
"IamInstanceProfile": {
"Id": "AGJAJVQN4F5WVLGCJABCM",
"Arn": "arn:aws:iam::123456789012:instance-profile/user1-role"
}
}
]
}
その後、以下のコードでAsossiationIdをassociation_id に格納します。
ここで格納したassociation_id は後続のdisassociate_iam_instance_profile メソッドで使用します。
association_id = describe_instance_profile_association_response['IamInstanceProfileAssociations'][0].get('AssociationId')
では実際にIAMロールをデタッチしていきます。
response = ec2_client.disassociate_iam_instance_profile(
AssociationId=association_id
)
disassociate_iam_instance_profileメソッドで、AssociationIdを指定する必要があります。
ここで、先ほど取得したassociation_idを指定します。
以上でコードの解説を終わります。
この後Lambdaのコンソール上から、以下のようにトリガーのSNSトピックを設定します。
動作確認
最後に意図的に401エラーを出すことにより、IAMロールが解除され、かつIsolated_SGがアタッチされたことを確認します。
終わりに
以上です。
想像上のボリュームになってしまい、一部コードの解説が雑になってしまった箇所があったかと思います。
このCloudwatch alarm→SNS→Lambdaの流れは、業務でも使用するフローですので、引き続き学習を続けたいです。