0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【超便利!!】ConfigカスタムルールでVPCフローログを自動設定

Posted at

はじめに

様々な環境のVPCフローログを1つのS3バケットに集約することで、管理が容易になり、VPCフローログの分析も容易になります。
VPCフローログの設定がないVPCにVPCフローログを設定したい人向けの記事となります。

AWS公式ブログで、AWS Configルールを利用して、VPCフローログを自動設定するブログがあります。この記事はこちらのブログのアレンジです。

単純にVPCフローログが設定されていないVPCに対してVPCフローログを設定したい場合は、こちらのブログを参照してください。
本記事では、既にVPCフローログが設定されているVPCもあるが、それぞれの環境ごとにバラバラのため、集約できていないという場合のソリューションです。

つまり 「特定のS3バケットを宛先としているVPCフローログがない場合はVPCフローログを設定する」 仕組みをご紹介します!

構成

以下の図のような構成とワークフローで実行します。

image.png

リソース

リソース一式を作成するCloudFormationテンプレートは以下の通りです。

CloudFormationテンプレート
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Detect VPCs whose Flow Logs are not sent to the specified S3 bucket, and optionally auto-remediate.'

Parameters:
  CustomConfigRuleName:
    Description: Name of the AWS Config Rule.
    Type: String
    Default: ConfigRuleForVpcFlowLogDestination
  ExpectedS3BucketName:
    Description: Expected S3 bucket name for VPC Flow Logs.
    Type: String
    Default: sample-vpc-flow-log-bucket
  TrafficType:
    Type: String
    AllowedValues:
      - ACCEPT
      - REJECT
      - ALL
    Default: ALL
  MaxExecutionFrequency:
    Type: String
    AllowedValues:
      - One_Hour
      - Three_Hours
      - Six_Hours
      - Twelve_Hours
      - TwentyFour_Hours
    Default: One_Hour

Resources:

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Path: '/'
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: AllowDescribeFlowLogs
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - ec2:DescribeFlowLogs
                  - ec2:DescribeVpcs
                  - config:PutEvaluations
                Resource: "*"

  VpcFlowLogCheckerFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Runtime: python3.11
      Timeout: 300
      Code:
        ZipFile: |
          import logging
          import json
          import boto3

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)

          def evaluation_resource(event):
              invoking_event = json.loads(event['invokingEvent'])
              vpc_id = invoking_event['configurationItem']['resourceId']
              expected_bucket = json.loads(event['ruleParameters'])['ExpectedS3BucketName']
              expected_arn_prefix = f"arn:aws:s3:::{expected_bucket}"

              ec2 = boto3.client('ec2')
              response = ec2.describe_flow_logs(Filters=[{'Name': 'resource-id', 'Values': [vpc_id]}])

              for log in response.get('FlowLogs', []):
                  destination = log.get('LogDestination', '')
                  if destination.startswith(expected_arn_prefix):
                      return {
                          'ComplianceType': 'COMPLIANT',
                          'Annotation': 'VPC has flow logs configured to the expected S3 bucket'
                      }

              logger.info(f"Checking flow logs for VPC: {vpc_id}")
              logger.info(f"Expected S3 bucket: {expected_bucket}")
              logger.info(f"Flow logs found: {response.get('FlowLogs', [])}")
              return {
                  'ComplianceType': 'NON_COMPLIANT',
                  'Annotation': 'VPC has not flow logs configured to the expected S3 bucket'
              }

          def lambda_handler(event, context):
              logger.info(f"Received event: {json.dumps(event)}")
              evaluation = evaluation_resource(event)

              invoking_event = json.loads(event["invokingEvent"])
              configuration_item = invoking_event["configurationItem"]

              result_token = "No token found."
              if "resultToken" in event:
                  result_token = event["resultToken"]

              # boto3を使用して AWS Config に結果を put
              config = boto3.client("config")
              config.put_evaluations(
                  Evaluations=[
                      {
                          # 通常はinputのリソースタイプ/リソースIDを使用する
                          "ComplianceResourceType":
                              configuration_item["resourceType"],
                          "ComplianceResourceId":
                              configuration_item["resourceId"],
                              
                          # 以下2つが評価結果
                          # COMPLIANT, NON_COMPLIANT, NOT_APPLICABLE
                          "ComplianceType":
                              evaluation['ComplianceType'],
                          # 評価結果の文字列
                          "Annotation":
                              evaluation['Annotation'],
                          
                          # 並べ替えに使うタイムスタンプ
                          # キャプチャ時のタイムスタンプを用いれば良い
                          "OrderingTimestamp":
                              configuration_item["configurationItemCaptureTime"]
                      },
                  ],
                  ResultToken=result_token
              )

  VpcFlowLogCheckerFunctionPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref VpcFlowLogCheckerFunction
      Principal: config.amazonaws.com

  ConfigRuleForVpcFlowLogDestination:
    Type: AWS::Config::ConfigRule
    Properties:
      ConfigRuleName: !Ref CustomConfigRuleName
      Description: Ensure VPC Flow Logs are sent to the specified S3 bucket.
      InputParameters:
        ExpectedS3BucketName: !Ref ExpectedS3BucketName
      MaximumExecutionFrequency: !Ref MaxExecutionFrequency
      Scope:
        ComplianceResourceTypes:
          - AWS::EC2::VPC
      Source:
        Owner: CUSTOM_LAMBDA
        SourceIdentifier: !GetAtt VpcFlowLogCheckerFunction.Arn
        SourceDetails:
          - EventSource: aws.config
            MessageType: ConfigurationItemChangeNotification

  ConfigRemediationRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: ssm.amazonaws.com
            Action: sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: RemediateVpcFlowLogsPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogDelivery
                  - logs:DeleteLogDelivery
                  - logs:CreateLogGroup
                  - logs:PutResourcePolicy
                  - ec2:CreateFlowLogs
                  - ec2:DescribeFlowLogs
                Resource: "*"

  VpcFlowLogsRemediationConfiguration:
    DependsOn: ConfigRuleForVpcFlowLogDestination
    Type: AWS::Config::RemediationConfiguration
    Properties:
      ConfigRuleName: !Ref CustomConfigRuleName
      Automatic: true
      MaximumAutomaticAttempts: 5
      RetryAttemptSeconds: 50
      ResourceType: AWS::EC2::VPC
      Parameters:
        VPCIds:
          ResourceValue:
            Value: RESOURCE_ID
        LogDestinationType:
          StaticValue:
            Values:
              - s3
        LogDestinationArn:
          StaticValue:
            Values:
              - !Sub arn:aws:s3:::${ExpectedS3BucketName}
        TrafficType:
          StaticValue:
            Values:
              - !Ref TrafficType
        AutomationAssumeRole:
          StaticValue:
            Values:
              - !GetAtt ConfigRemediationRole.Arn
      TargetId: AWS-EnableVPCFlowLogs
      TargetType: SSM_DOCUMENT
      TargetVersion: 1

Outputs:
  ConfigRuleArn:
    Description: ARN of the AWS Config Rule
    Value: !GetAtt ConfigRuleForVpcFlowLogDestination.Arn
  ConfigRemediationRoleArn:
    Description: IAM Role used for auto-remediation
    Value: !GetAtt ConfigRemediationRole.Arn

カスタムルール用Lambda関数

以下がソースコードです。
やっていることは至ってシンプルで、VPCに設定されているフローログの一覧を取得し、一覧の中に宛先が指定のS3バケットだったら 準拠 として登録、なければ 非準拠 として登録します。
今回は宛先で準拠/非準拠の判断をさせていますが、例えば、特定の名前がついているか、タグがついているか、ALLのフローログがあるかなど様々な判断基準でルールを作成することができます。

Lambdaソースコード
import logging
import json
import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def evaluation_resource(event):
    invoking_event = json.loads(event['invokingEvent'])
    vpc_id = invoking_event['configurationItem']['resourceId']
    expected_bucket = json.loads(event['ruleParameters'])['ExpectedS3BucketName']
    expected_arn_prefix = f"arn:aws:s3:::{expected_bucket}"

    ec2 = boto3.client('ec2')
    response = ec2.describe_flow_logs(Filters=[{'Name': 'resource-id', 'Values': [vpc_id]}])

    for log in response.get('FlowLogs', []):
        destination = log.get('LogDestination', '')
        if destination.startswith(expected_arn_prefix):
            return {
                'ComplianceType': 'COMPLIANT',
                'Annotation': 'VPC has flow logs configured to the expected S3 bucket'
            }

    logger.info(f"Checking flow logs for VPC: {vpc_id}")
    logger.info(f"Expected S3 bucket: {expected_bucket}")
    logger.info(f"Flow logs found: {response.get('FlowLogs', [])}")
    return {
        'ComplianceType': 'NON_COMPLIANT',
        'Annotation': 'VPC has not flow logs configured to the expected S3 bucket'
    }

def lambda_handler(event, context):
    logger.info(f"Received event: {json.dumps(event)}")
    evaluation = evaluation_resource(event)

    invoking_event = json.loads(event["invokingEvent"])
    configuration_item = invoking_event["configurationItem"]

    result_token = "No token found."
    if "resultToken" in event:
        result_token = event["resultToken"]

    # boto3を使用して AWS Config に結果を put
    config = boto3.client("config")
    config.put_evaluations(
        Evaluations=[
            {
                # 通常はinputのリソースタイプ/リソースIDを使用する
                "ComplianceResourceType":
                    configuration_item["resourceType"],
                "ComplianceResourceId":
                    configuration_item["resourceId"],
                    
                # 以下2つが評価結果
                # COMPLIANT, NON_COMPLIANT, NOT_APPLICABLE
                "ComplianceType":
                    evaluation['ComplianceType'],
                # 評価結果の文字列
                "Annotation":
                    evaluation['Annotation'],
                
                # 並べ替えに使うタイムスタンプ
                # キャプチャ時のタイムスタンプを用いれば良い
                "OrderingTimestamp":
                    configuration_item["configurationItemCaptureTime"]
            },
        ],
        ResultToken=result_token
    )

Configルール

ConfigルールはScopeをAWS::EC2::VPCに設定します。

Configの課金は意外と気づかずに上がりがちなので、Scopeは必要最低限にしましょう!

パラメータにExpectedS3BucketNameキーを作成し、宛先バケット名を設定しておきます。

image.png

修復アクションには、AWS が用意している SSM ドキュメントAWS-EnableVPCFlowLogsを利用します。

検知まで若干時間がかかるかと思いますので、設定してから10分程度待ってフローログが作成されているかご確認ください。

さいごに

今回はVPCフローログの集約管理のための自動設定ソリューションを紹介しました。
必要に応じてアレンジして是非使ってみてください!

集約したら、ぜひ以下の記事を参考に自然言語での分析を試してみてください!

弊社では一緒に働く仲間を募集中です!

現在、様々な職種を募集しております。
カジュアル面談も可能ですので、ご連絡お待ちしております!

募集内容等詳細は、是非採用サイトをご確認ください。
https://engineer.po-holdings.co.jp/

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?