5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

EC2への不正アクセスを検知すると、そのEC2のIAMロールを解除する処理を作成してみた

Posted at

はじめに

最近Cloud Questにはまっています。
その中で表題の処理を体験できるものがありましたので、さっそくやってみました。

今回の処理は以下のようになっています。

①Cloudwatch alermで不正アクセスを感知
➁Cloudwatch alermからSNSトピックに連携
③SNSトピックがトリガーなり、Lambda関数を起動
④Lambda関数の処理で、該当インスタンスのIAMロールを解除

では順番に確認していきます。

CloudWatchAgentのインストール

EC2インスタンスがSSMで確認可能ということを前提に進めます。
以下のリンクを参考にSSMのrun commandを使用して、当該インスタンスにCloudWatchAgentのインストールします。

run commandの後に以下のような画面が表示されていれば成功です。

image.png

CloudWatchAgentの設定

EC2インスタンスに接続し、以下のようなコマンドを打ちます。
なお、OSはLinuxの使用を想定しております。

[ec2-user@ip-10-10-0-10 ~]$ sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl -m ec2 -a status
{
  "status": "stopped",
  "starttime": "",
  "configstatus": "not configured",
  "version": "1.247357.0b252275"
}

こちらでCloudWatchAgentのステータス確認をします。
コマンド意味ですが、以下のようになります。
・「/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl」:Amazon CloudWatch Agentのコマンド実行スクリプトへのパス
・「-m ec2」:Amazon EC2インスタンス上でAmazon CloudWatch Agentを実行する
・「-a status」:Amazon CloudWatch Agentの状態を確認するオプション

では状態確認が終わったので、さっそくCloudWatchAgentの設定を実施していきます。
以下のように対話式で、コンフィグ設定することになります。
こちらのコマンドを打つと、設定が開始されます。

sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-config-wizard 

以下が実際の設定画面です。

[ec2-user@ip-10-10-0-10 ~]$ sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-config-wizard 
================================================================
= Welcome to the Amazon CloudWatch Agent Configuration Manager =
=                                                              =
= CloudWatch Agent allows you to collect metrics and logs from =
= your host and send them to CloudWatch. Additional CloudWatch =
= charges may apply.                                           =
================================================================
On which OS are you planning to use the agent?
1. linux
2. windows
3. darwin
default choice: [1]:
1
Trying to fetch the default region based on ec2 metadata...
Are you using EC2 or On-Premises hosts?
1. EC2
2. On-Premises
default choice: [1]:
1
Which user are you planning to run the agent?
1. root
2. cwagent
3. others
default choice: [1]:
1
Do you want to turn on StatsD daemon?
1. yes
2. no
default choice: [1]:
2
Do you want to monitor metrics from CollectD? WARNING: CollectD must be installed or the Agent will fail to start

1. yes
2. no
default choice: [1]:
2
Do you want to monitor any host metrics? e.g. CPU, memory, etc.
1. yes
2. no
default choice: [1]:
2
Do you have any existing CloudWatch Log Agent (http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AgentReference.html) configuration file to import for migration?
1. yes
2. no
default choice: [2]:
2
Do you want to monitor any log files?
1. yes
2. no
default choice: [1]:
1
Log file path:
/home/ec2-user/record.log
Log group name:
default choice: [record.log]

Log stream name:
default choice: [{instance_id}]

Log Group Retention in days
1. -1
2. 1
3. 3
4. 5
5. 7
6. 14
7. 30
8. 60
9. 90
10. 120
11. 150
12. 180
13. 365
14. 400
15. 545
16. 731
17. 1827
18. 2192
19. 2557
20. 2922
21. 3288
22. 3653
default choice: [1]:
7
Do you want to specify any additional log files to monitor?
1. yes
2. no
default choice: [1]:
2
Saved config file to /opt/aws/amazon-cloudwatch-agent/bin/config.json successfully.
Current config as follows:
{
        "agent": {
                "run_as_user": "root"
        },
        "logs": {
                "logs_collected": {
                        "files": {
                                "collect_list": [
                                        {
                                                "file_path": "/home/ec2-user/record.log",
                                                "log_group_name": "record.log",
                                                "log_stream_name": "{instance_id}",
                                                "retention_in_days": 30
                                        }
                                ]
                        }
                }
        }
}
Please check the above content of the config.
The config file is also located at /opt/aws/amazon-cloudwatch-agent/bin/config.json.
Edit it manually if needed.
Do you want to store the config in the SSM parameter store?
1. yes
2. no
default choice: [1]:
1
What parameter store name do you want to use to store your config? (Use 'AmazonCloudWatch-' prefix if you use our managed AWS policy)
default choice: [AmazonCloudWatch-linux]
2
Trying to fetch the default region based on ec2 metadata...
Which region do you want to store the config in the parameter store?
default choice: [us-east-1]

Which AWS credential should be used to send json config to parameter store?
1. ASIA45DCSBDIS7CGT7HF(From SDK)
2. Other
default choice: [1]:
1
Successfully put config to parameter store 2.
Program exits now.

はい、こんな感じで設定が終わりました。
上記で回答した内容はSSM parameter storeに保存されています。

設定が終わったら、以下のコマンドを実行します。

sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl -a fetch-config -s -m ec2 -c file:/opt/aws/amazon-cloudwatch-agent/bin/config.json

コマンドの意味について説明します。
「-a fetch-config」:Amazon CloudWatch Agentの設定情報を取得するオプション
「-s」:設定情報を取得するときに、Amazon CloudWatch Agentを停止する
「-m ec2」:Amazon EC2インスタンス上でAmazon CloudWatch Agentを実行する
「-c file:/opt/aws/amazon-cloudwatch-agent/bin/config.json」:取得する設定情報を格納するJSONファイルのパスを指定するオプション

このコマンドを実行することで、Amazon CloudWatch Agentの設定情報を取得し、指定されたJSONファイルに保存することができます。
保存されたJSONファイルは、新にCloudWatchAgentを設定する際に利用することができます。

以上でCloudWatchAgentのインストールおよび設定は終了です。

CloudWatch Alermの設定

では次にCloudWatch Alermの設定をしていきます。
流れとしては以下のようになります。

①先ほど作成したロググループに基づいて、メトリクスフィルターを作成
➁アラームの作成画面で、①で作成したメトリクスフィルターを適用
③SNSトピックに連携するように設定

先ほどのCloudWatchAgentの設定が正しくできていれば、CloudWatchLogsから以下のようなロググループが確認できるはずです。
image.png
ロググループをクリックすると、次はログストリームが表示されます。
こちらの設定はinstance_idで指定されているため、対象のインスタンスidをクリックすると以下の画面に遷移します。
こちらは実際のログのイベントですね。
このログを使用して、不正アクセス(401)のフィルタリングをしていきます。
image.png

では次に、イベントをフィルター欄に401と入力し、メトリクスフィルターを作成を実施していきます。
その前に用語説明をします。

メトリックス名前空間
メトリックスをグループ化するための任意の名前を付けたものです。これにより、同じ名前空間内のメトリックスは同一のグループに分類されます。

メトリックス名
メトリックスを識別するための名前です。今回は分かりやすくinstance_idとしています。

メトリックス値
今回は401を検知した場合に、アラートに渡される値です。
今回は特定の文字列でフィルターをかけていますが、以下のようにCPUやメモリでもフィルターをかけることが可能です。

CPU使用率: 0.65
メモリ使用量: 512MB
ディスク使用量: 40GB
ネットワークトラフィック: 100 Mbps
HTTPリクエスト数: 500 req/sec
エラー数: 10 errors/minute
データベースのクエリ数: 200 queries/sec

では実際の設定画面がこちらです。

image.png

ここまでで、メトリクスフィルターの作成は完了です。
では次に作成したメトリクスフィルターとアラームを紐づけていきましょう。

まずCloudWatch Alermコンソール上から”アラームの作成”をクリックします。

image.png
すると、メトリクス名前空間(メトリクスをグループ化したもの)を選択できる画面に遷移します。
ここで先ほど作成したLabApplicationsというメトリクス名前空間を選択します。

image.png

メトリクス名前空間を選択すると、次はメトリクス名を選択できますので、ここで先ほど設定したinstance_idを選択します。

image.png

ではここからアラームの詳細設定に移ります。
下の画像では、統計の方法や期間を選択可能です。

image.png

次は条件の設定です。

image.png

アラームの作成が終わったので、次はアラーム状態になった際のアクションを設定していきます。
つまりCloudwatch alarmとSNSトピックの紐づけですね。

SNSトピックはあらかじめUnauthorizedExceptionNotificationという名前で作成しておきました。
設定の際に気を付けることは特にありませんが、タイプをFIFOではなくスタンダートで選択する必要があります。
※FIFOタイプに対応しているプロトコルはSQSのみとなります。

image.png

以上が、アラーム作成時に必要な設定時の必要項目です。
ちなみにSNSトピックを作成した際には、以下のような画面でした

image.png

ここまでで、以下の設定を実施したことになります。
①Cloudwatch agent
➁Cloudwatch alarm
③メトリクスフィルター
④Cloud alarmとSNSトピック

Lambdaの設定

では次にLambdaを設定していきます。
このLambda関数が、SNSトピックをトリガーに所定のインスタンスのIAMロールを解除する処理となります。
以下が実際のコードです。

"""
This lambda function isolates the instance 
for further forensic analysis.
"""
import os, json
import boto3
import logging

# It is a good practice to use proper logging.
# Here we are using the logging module of python.
# https://docs.python.org/3/library/logging.html

logger = logging.getLogger()
logger.setLevel(logging.INFO)
ec2_client = boto3.client('ec2')
ec2_resource = boto3.resource('ec2')
instance_list = []

def lambda_handler(event, context):
    print(json.dumps(event))
    
    # Find the instance ID from MetricName defined 
    # in the Cloudwatch Alarm configuration. MetricName should be set to the instance-id.
    
    message = json.loads(event['Records'][0]["Sns"].get('Message'))
    instance_id = message['Trigger'].get('MetricName')
    logger.info(instance_id)
    
    # Get the VPC of the instance
    vpcId = get_instace_vpc_id(instance_id)
    
    # Call the get_instance function to generate list of all the available instances.
    get_instances()
    
    # If the instance is in the list, remove role 
    # and attach the Isolated_SG
    if instance_id in instance_list:
        try:
            remove_role(instance_id)
        except Exception as e:
            print(e)
    
        # Attach the isolated Security group
        try:
            sg_response = ec2_client.describe_security_groups(
            Filters=[
                    {
                        'Name': 'group-name',
                        'Values': [
                            'Isolated_SG',
                        ]
                    },
                ]
                )
            logger.info(sg_response)    
            if sg_response.get('SecurityGroups'):
                security_group_id = sg_response.get('SecurityGroups')[0].get("GroupId")
                logger(security_group_id)
                attach_isolated_sg(instance_id, security_group_id)
            else:
                security_group_id = create_sg(vpcId)
                attach_isolated_sg(instance_id, security_group_id)
        except Exception as e:
            print(e)
    

def get_instances():
    """
    This function gets the list of all the EC2 instances in the region.
    """
    get_instances = ec2_client.describe_instances()
    instances = get_instances['Reservations'][0].get('Instances')
    print(instances)
    for instance in instances:
        print(instance['InstanceId'])
        # print(instance['VpcId'])
        instance_id = instance['InstanceId']
        instance_list.append(instance_id)
    print(f"instance-List:{instance_list}")
    
def remove_role(instance_id):
    """
    This function removed the instance proflie attached to the instance.
    """
    describe_instance_profile_association_response = ec2_client.describe_iam_instance_profile_associations(
        Filters=[
                {
                    'Name': 'instance-id',
                    'Values': [
                        instance_id,
                    ]
                },
            ]
        )
    print(describe_instance_profile_association_response)
    association_id = describe_instance_profile_association_response['IamInstanceProfileAssociations'][0].get('AssociationId')
    
    print(association_id)
    response = ec2_client.disassociate_iam_instance_profile(
            AssociationId=association_id
            )
    
    logger.info(response)
    
    
def get_instace_vpc_id(instanceId):
    """
    This function gets the VPC Id of the instance.
    """
    instanceReservations = ec2_client.describe_instances(InstanceIds=[instanceId])['Reservations']
    for instanceReservation in instanceReservations:
        instancesDescription = instanceReservation['Instances']
        for instance in instancesDescription:
            return instance['VpcId']
            
def create_sg(vpcId):
    """
    This function creates the isolated security group with no egress access.
    """
    security_group_id = ec2_resource.create_security_group(GroupName="Isolated_SG", 
                                                     Description="Isolated SG for forensic analysis", 
                                                     VpcId=vpcId)
    security_group_id.revoke_egress(IpPermissions= [{'IpProtocol': '-1','IpRanges': [{'CidrIp': '0.0.0.0/0'}],'Ipv6Ranges': [],'PrefixListIds': [],'UserIdGroupPairs': []}])
    return security_group_id

def attach_isolated_sg(instance_id, security_group_id):
    """
    This function attach the isolated security group to the instance.
    """

    logger.info("Inside attach_sg")
    logger.info(security_group_id.id)
    
    
    response = ec2_client.modify_instance_attribute(
        Groups=[security_group_id.id],
        InstanceId=instance_id)

では細かく解説していきます。
まずは必要ライブラリのインポート及びログの設定から始めます。

import os, json
import boto3
import logging

# It is a good practice to use proper logging.
# Here we are using the logging module of python.
# https://docs.python.org/3/library/logging.html

logger = logging.getLogger()
logger.setLevel(logging.INFO)
ec2_client = boto3.client('ec2')
ec2_resource = boto3.resource('ec2')
instance_list = []

boto3.clientとboto3.resourceの違いは以下を参考するすると良いかと思います。

端的に説明すると

①boto3.clientの場合

import boto3
 
client = boto3.client('ec2')
response = client.describe_instances(InstanceIds=['i-025e8bfafc8937f02'])

instance = response['Reservations'][0]['Instances'][0]
instance_type = instance.get('InstanceType')
print(instance_type)

実行結果:

t3.micro

このように、Client APIでは、情報を得たい対象のリソースIDを引数に与えてメソッドを実行します。 実行結果は「辞書型」で得られるため、そこから階層を辿って必要な情報を取り出すことになります。

➁boto3.resourceの場合

import boto3

ec2 = boto3.resource('ec2')
instance = ec2.Instance('i-025e8bfafc8937f02')

instance_type = instance.instance_type
print(instance_type)

実行結果:

t3.micro

このように、Resource APIでは、まず対象のリソースを「オブジェクト」として取得してから、オブジェクトの持つ「属性」を参照して情報を取り出すという手順になります

その後で、instance_list[]という空のリストを定義します。
こちらのリストはこの後使用していきます。

では次はlambda_handler関数の中身について確認していきます。

def lambda_handler(event, context):
    print(json.dumps(event))
    
    # Find the instance ID from MetricName defined 
    # in the Cloudwatch Alarm configuration. MetricName should be set to the instance-id.
    
    message = json.loads(event['Records'][0]["Sns"].get('Message'))
    instance_id = message['Trigger'].get('MetricName')
    logger.info(instance_id)
    
    # Get the VPC of the instance
    vpcId = get_instace_vpc_id(instance_id)
    
    # Call the get_instance function to generate list of all the available instances.
    get_instances()

今回Lambdaが受け取るeventは、SNSトピックから受け取るイベントオブジェクトとなります。
以下のコードの出力結果は以下のようになります。

print(json.dumps(event))

出力結果

    {
  "Records": [
    {
      "EventSource": "aws:sns",
      "EventVersion": "1.0",
      "EventSubscriptionArn": "arn:aws:sns:ap-northeast-1:123456789012:mytopic:0884-5d81c0db-4e13-829f-596f7ea9f8ad",
      "Sns": {
        "Type": "Notification",
        ...
        "Subject": "Message Title",
        "Message": "Message Body",
        ...
      }
    }
  ]
}

この後、最終的にSNSから渡されるMetricNameを取得するためのコードを書いていきます。
本記事の前半で説明したように、MetricName=渡されるインスタンスIDでしたね。
以下参考記事です。

上記記事にあるように、今回取得したい情報は、"Records"の中の"Sns"の"message"に格納されています。そのため以下のようなコードが必要になるわけです。

message = json.loads(event['Records'][0]["Sns"].get('Message'))

またRecords→Snsと階層を下げて確認していくと、Snsの中身は以下のようになっています。

   "Sns": {
        "Type": "Notification",
        "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
        "TopicArn": "arn:aws:sns:us-west-2:123456789012:example-topic",
        "Subject": "example subject",
        "Message": "省略",
        "Timestamp": "1970-01-01T00:00:00.000Z",
        "SignatureVersion": "1",
        "Signature": "EXAMPLE",
        "SigningCertUrl": "EXAMPLE",
        "UnsubscribeUrl": "EXAMPLE",
        "MessageAttributes": {
          "Test": {
            "Type": "String",
            "Value": "TestString"
          },
          "TestBinary": {
            "Type": "Binary",
            "Value": "TestBinary"
          }
        }
      }

つまり、ここではmessageという変数に、上記のmessageの中身のオブジェクトを格納したことになります。

次にinstance_idという変数に格納する値を取得します。

instance_id = message['Trigger'].get('MetricName')

messageオブジェクトには以下のような情報が格納されています。
この中で実際にinstance_idに格納するのが、Triggerの中にあるMetricNameです。

{
  "AlarmName": "Example Alarm",
  "AlarmDescription": "This is an example alarm",
  "AWSAccountId": "123456789012",
  "NewStateValue": "ALARM",
  "NewStateReason": "Threshold Crossed: 1 datapoint (10.0) was greater than or equal to the threshold (10.0).",
  "StateChangeTime": "2019-02-28T21:08:32.453+0000",
  "Region": "us-west-2",
  "OldStateValue": "INSUFFICIENT_DATA",
  "Trigger": {
    "MetricName": "ExampleMetric",
    "Namespace": "ExampleNamespace",
    "StatisticType": "SampleCount",
    "Statistic": "Sum",
    "Unit": null,
    "Dimensions": [
      {
        "name": "ExampleDimensionName",
        "value": "ExampleDimensionValue"
      }
    ],
    "Period": 300,
    "EvaluationPeriods": 1,
    "ComparisonOperator": "GreaterThanOrEqualToThreshold",
    "Threshold": 10.0,
    "TreatMissingData": "",
    "EvaluateLowSampleCountPercentile": ""
  }
}

本記事の冒頭で、CloudWatch Agentを設定した際、メトリクス名はそのインスタンスidでした。
そのためここでようやくSNSトピックから渡されたEC2 instanceIdを取得することが出来ました。

次にvpcId = get_instace_vpc_id(instance_id)で、そのインスタンスが存在するVPCidを取得しています。
この関数は後半でしれっと定義されていますので、今から確認していきます。

def get_instace_vpc_id(instanceId):
    """
    This function gets the VPC Id of the instance.
    """
    instanceReservations = ec2_client.describe_instances(InstanceIds=[instanceId])['Reservations']
    for instanceReservation in instanceReservations:
        instancesDescription = instanceReservation['Instances']
        for instance in instancesDescription:
            return instance['VpcId']

まずinstanceReservationsという変数に、describe_instancesの結果をinstanceIdでフィルタリングした値を格納しています。
ちなみにReservationsの中身はこんな感じです。
→長いので省略しています。全容はこちらより確認可能です。

ここまでで、instanceReservationsの中にSNSから渡されたMetricName=InstanceIDが所属するVpcIdを取得することができました。
この中のVpcIdを取得するのが、本関数の目的になります。

{
"Reservations": [
    {
        "OwnerId": "xxxxxxxxxxxx", 
        "ReservationId": "r-xxxxxxxxxxxxxxxxx", 
        "Groups": [], 
        "Instances": [
            {
                "Monitoring": {
                    "State": "disabled"
                }, 
                "PublicDnsName": "ec2-xxx-xxx-xxx-xxx.ap-northeast-1.compute.amazonaws.com", 
                "Platform": "xxxxxxx", 
                "State": {
                    "Code": 80, 
                    "Name": "stopped"
                }, 
                "EbsOptimized": false, 
                "LaunchTime": "xxxx-xx-xxxxx:xx:xx.xxxx", 
                "PublicIpAddress": "xxx.xxx.xxx.xxx", 
                "PrivateIpAddress": "xxx.xxx.xxx.xxx", 
                "ProductCodes": [], 
                "VpcId": "vpc-xxxxxxx", 
                "StateTransitionReason": "", 
                "InstanceId": "i-xxxxxxxxxxxxxxxxx", 
                "EnaSupport": true, 
                "ImageId": "ami-xxxxxxxx", 

この後はforでループさせて返り値で、VpcIdを返します。

次にget_instances()についても解説していきます。

def get_instances():
    """
    This function gets the list of all the EC2 instances in the region.
    """
    get_instances = ec2_client.describe_instances()
    instances = get_instances['Reservations'][0].get('Instances')
    print(instances)
    for instance in instances:
        print(instance['InstanceId'])
        # print(instance['VpcId'])
        instance_id = instance['InstanceId']
        instance_list.append(instance_id)
    print(f"instance-List:{instance_list}")

まずは、get_instances という変数に、describe_instancesで取得された値を格納します。
その後、['Reservations'][0]で階層をおります。Reservationsフィールドは、複数のインスタンス情報をリストとして格納するので、そのリストの中の最初の一つから、Instancesキーを用いて情報を取得しています。

そのため、instancesには、Reservationsに確報されている以下のような情報が格納されます。

{
    'AmiLaunchIndex': 0,
    'ImageId': 'ami-0c55b159cbfafe1f0',
    'InstanceId': 'i-0b8f3a3d3c251e3e3',
    'InstanceType': 't2.micro',
    'KeyName': 'my-key-pair',
    'LaunchTime': datetime.datetime(2022, 2, 8, 7, 54, 40, tzinfo=tzutc()),
    'Monitoring': {'State': 'disabled'},
    'Placement': {'AvailabilityZone': 'us-west-2b',
                  'GroupName': '',
                  'Tenancy': 'default'},
    'PrivateDnsName': 'ip-172-31-26-78.us-west-2.compute.internal',
    'PrivateIpAddress': '172.31.26.78',
    'ProductCodes': [],
    'PublicDnsName': '',
    'State': {'Code': 80, 'Name': 'stopped'},
    'StateTransitionReason': 'User initiated (2022-02-08 07:58:15 GMT)',
    'SubnetId': 'subnet-0feda679',
    'VpcId': 'vpc-0d7f982c'
}

次に以下のコードについても確認します。

        instance_id = instance['InstanceId']
        instance_list.append(instance_id)
    print(f"instance-List:{instance_list}")

まず先ほどのinstanesの中にあるInstanceId'(上の図だとi-0b8f3a3d3c251e3e3)をinstance_idという変数に格納します。
その後しれっと、このinstance_idをinstance_listというリストに加えます。

はい、ここまでで、以下二つの関数についての解説を終わります。

vpcId = get_instace_vpc_id(instance_id)
get_instances()

では次はifの中身について深堀していきます。
ざっくりの説明ですが、先ほど取得したinstance_idがinstance_listに含まれていた場合に、そのインスタンスのIAMロールをデタッチするといった作業になります。

 if instance_id in instance_list:
        try:
            remove_role(instance_id)
        except Exception as e:
            print(e)
    
        # Attach the isolated Security group
        try:
            sg_response = ec2_client.describe_security_groups(
            Filters=[
                    {
                        'Name': 'group-name',
                        'Values': [
                            'Isolated_SG',
                        ]
                    },
                ]
                )
            logger.info(sg_response)    
            if sg_response.get('SecurityGroups'):
                security_group_id = sg_response.get('SecurityGroups')[0].get("GroupId")
                logger(security_group_id)
                attach_isolated_sg(instance_id, security_group_id)
            else:
                security_group_id = create_sg(vpcId)
                attach_isolated_sg(instance_id, security_group_id)
        except Exception as e:
            print(e)

SNSより渡されたeventに含まれているinstance_idがget_instances()関数でリスト化されたinstance_listに含まれているか確認します。
仮に含まれている場合は、remove_roleメソッドで、ロールを解除します。

その後describe_security_groupsメソッドで、Isolated_SGでフィルタリングした値sg_response に格納します。具体的にはこんか感じのものがsg_responseに格納されます。

    "SecurityGroups": [
        {
            "Description": "examplename",
            "GroupName": "examplename",
            "IpPermissions": [
                {
                    "IpProtocol": "-1",
                    "IpRanges": [
                        {
                            "CidrIp": "0.0.0.0/0"
                        }
                    ],
                    "Ipv6Ranges": [],
                    "PrefixListIds": [],
                    "UserIdGroupPairs": []
                }
            ],
            "OwnerId": "example",
            "GroupId": "example",
            "IpPermissionsEgress": [
                {
                    "IpProtocol": "-1",
                    "IpRanges": [
                        {
                            "CidrIp": "0.0.0.0/0"
                        }
                    ],
                    "Ipv6Ranges": [],
                    "PrefixListIds": [],
                    "UserIdGroupPairs": []
                }
            ],
            "VpcId": "example"
        }
    ]

では次のifの処理について確認していきます。

  if sg_response.get('SecurityGroups'):
                security_group_id = sg_response.get('SecurityGroups')[0].get("GroupId")
                logger(security_group_id)
                attach_isolated_sg(instance_id, security_group_id)
            else:
                security_group_id = create_sg(vpcId)
                attach_isolated_sg(instance_id, security_group_id)
        except Exception as e:
            print(e)

一行目は、実際に指定された引数が存在するか確認し、存在した場合はその情報を返します。その後sg_responseに格納されている情報から、group_idを取得し、security_group_idに格納します。
ここで、格納されている値はIsolated_SGというSG名と紐づくsecuritygroupのidとなります。
つまりここでの処理はSNSから渡されたinstanceIdにIsolated_SGをアタッチするという処理になります。

仮にここでIsolated_SGが存在しなかった場合は、以下のcreate_sgで作成されることになります。

def create_sg(vpcId):
    """
    This function creates the isolated security group with no egress access.
    """
    security_group_id = ec2_resource.create_security_group(GroupName="Isolated_SG", 
                                                     Description="Isolated SG for forensic analysis", 
                                                     VpcId=vpcId)
    security_group_id.revoke_egress(IpPermissions= [{'IpProtocol': '-1','IpRanges': [{'CidrIp': '0.0.0.0/0'}],'Ipv6Ranges': [],'PrefixListIds': [],'UserIdGroupPairs': []}])
    return security_group_id

この関数は、与えられたVPC IDを使用して「Isolated_SG」という名前のセキュリティグループを作成し、すべての出口トラフィックを拒否する設定をしています。

その後、ec2_resource.create_security_groupメソッドを使用してセキュリティグループを作成し、セキュリティグループIDを返します。作成されたセキュリティグループには、許可された入力トラフィックがなく、すべてのインバウンドトラフィックがブロックされています。
次に、reboke_engressメソッドでアウトバウンド通信も無効化します。

以下のIpProtocal: "-1"は、すべてのトラフィックを許可するアウトバウンドルールを削除する。という挙動です。

[{'IpProtocol': '-1',  'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
  'Ipv6Ranges': [],
  'PrefixListIds': [],
  'UserIdGroupPairs': []}]

はい、これで前半部分のifの解説は終了になります。

あとは、まだ解説しきれていなかったコードについて説明します。


remove_role(instance_id):
    """
    This function removed the instance proflie attached to the instance.
    """
    describe_instance_profile_association_response = ec2_client.describe_iam_instance_profile_associations(
        Filters=[
                {
                    'Name': 'instance-id',
                    'Values': [
                        instance_id,
                    ]
                },
            ]
        )
    print(describe_instance_profile_association_response)
    association_id = describe_instance_profile_association_response['IamInstanceProfileAssociations'][0].get('AssociationId')
    
    print(association_id)
    response = ec2_client.disassociate_iam_instance_profile(
            AssociationId=association_id
            )
    
    logger.info(response)

変数describe_instance_profile_association_response には、instance_idで絞った以下のような情報が格納されます。

詳細は公式ドキュメントをご確認ください

{
  "IamInstanceProfileAssociations": [
      {
          "InstanceId": "i-09eb09efa73ec1dee",
          "State": "associated",
          "AssociationId": "iip-assoc-0db249b1f25fa24b8",
          "IamInstanceProfile": {
              "Id": "AIPAJVQN4F5WVLGCJDRGM",
              "Arn": "arn:aws:iam::123456789012:instance-profile/admin-role"
          }
      },
      {
          "InstanceId": "i-0402909a2f4dffd14",
          "State": "associating",
          "AssociationId": "iip-assoc-0d1ec06278d29f44a",
          "IamInstanceProfile": {
              "Id": "AGJAJVQN4F5WVLGCJABCM",
              "Arn": "arn:aws:iam::123456789012:instance-profile/user1-role"
          }
      }
   ]
}

その後、以下のコードでAsossiationIdをassociation_id に格納します。
ここで格納したassociation_id は後続のdisassociate_iam_instance_profile メソッドで使用します。

association_id = describe_instance_profile_association_response['IamInstanceProfileAssociations'][0].get('AssociationId')

では実際にIAMロールをデタッチしていきます。

  response = ec2_client.disassociate_iam_instance_profile(
            AssociationId=association_id
            )

disassociate_iam_instance_profileメソッドで、AssociationIdを指定する必要があります。
ここで、先ほど取得したassociation_idを指定します。

以上でコードの解説を終わります。

この後Lambdaのコンソール上から、以下のようにトリガーのSNSトピックを設定します。

image.png

image.png

動作確認

最後に意図的に401エラーを出すことにより、IAMロールが解除され、かつIsolated_SGがアタッチされたことを確認します。
image.png

終わりに

以上です。
想像上のボリュームになってしまい、一部コードの解説が雑になってしまった箇所があったかと思います。
このCloudwatch alarm→SNS→Lambdaの流れは、業務でも使用するフローですので、引き続き学習を続けたいです。

5
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?