1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

VPC OriginではないVPC内のOrigin を検出する

Posted at

こんにちは!

CloudFront VPC Originsは皆さん触られましたか?メリットが大きいので周囲に利用を推奨していきたい一方、VPC Originが使えるのに知らなくて利用されないケースが出るかもしれません。そこでVPC OriginsではないVPC内のOriginを検出する方法を考えてみます。

CloudFront VPC Originsについて

まずは概要とメリットを整理します。

概要

CloudFront VPC Origins は、プライベートサブネット内のALB、NLB、EC2インスタンスをCloudFrontディストリビューションのOriginとして直接指定できる機能です。2024年に発表されました。

メリット

大きく2つのメリットがあります。

Originの保護強化

従来、CloudFrontを使用しつつ、パブリックサブネット上のOriginへの直接アクセスを防ぐには、追加設定が必要でした:

  • カスタムヘッダ方式: CloudFrontでカスタムヘッダを付与し、ALBのリスナールールで検証
    • ヘッダが漏洩すると直接アクセスされるリスクがある
  • Managed Prefix List方式: ALBのセキュリティグループでCloudFrontのManaged Prefix Listを指定
    • 別のアカウントのCloudFrontからOriginにアクセスされる可能性は残る

CloudFront VPC Originsを使用するとOriginに直接アクセスする経路自体がなくなるため、より安全な構成が実現できます。

コストの節約

パブリックサブネット上に2つのAZを利用するALBを配置した場合、パブリックIPの料金として最低でも月額7.2ドルが発生します。CloudFront VPC Originsを使用すれば、パブリックIPが不要になるため、このコストが完全に削減できます。

AWSアカウント内のOriginの検出

VPC Originsは大きなデメリットがないため、既存のディストリビューションや今後作成するディストリビューションでも積極的に活用していきたいところです。

AWSアカウント内のリソースに特定の設定が適用されているかはAWS Configルールを利用して検出できます。ただし残念ながら2025年3月現在、AWS公式のマネージドルールは提供されていないようです。

このことからAWS Configカスタムルールを記述して検出してみます。カスタムルールとはユーザー独自に定義することが可能なConfigルールです。Lambda関数を記述することでロジックを定義します。

同じCloudFrontのConfigルールでも「S3がOACを利用しているか」などマネージドなルールが提供されているものもあるのでそのうち提供されるかもしれません。
現在のマネージドルールの一覧は以下から確認できます

https://docs.aws.amazon.com/ja_jp/config/latest/developerguide/managed-rules-by-aws-config.html

アーキテクチャ

アーキテクチャは以下のようになります
configrule.drawio.png

  1. 定期評価などをトリガーにConfigルールが呼び出されると、ConfigがLambdaを呼び出します。LambdaはリソースポリシーでConfigからの実行が許可されています。
  2. Lambdaにはカスタムルールのロジックが記述されており、CloudFrontにget_distribution_configを発行することで、ディストリビューションの設定を取得しVPC Originかどうか評価します。設定にVpcOriginConfigという項目があればVPC Originを利用しています。
  3. Lambdaは評価結果をConfigに返します。Configは画面上に結果を表示します。

Lambda関数のコード

Lambda関数のコードをこちらに置いておきます
# lambda/index.py
import json
import boto3
import datetime
import re
from time import sleep

def is_vpc_resource_domain(domain_name):
    """
    ドメイン名がVPC内リソース(ALBやNLBなど)であるかを判定する関数
    
    Args:
        domain_name: チェックするドメイン名
        
    Returns:
        bool: VPC内リソースの場合はTrue、そうでない場合はFalse
    """
    # ALB/NLBのドメインパターン
    elb_pattern = r'.*\.elb\.amazonaws\.com$'
    
    # EC2インスタンスのパブリックDNSパターン
    ec2_pattern = r'.*\.compute\.amazonaws\.com$'
    
    # いずれかのパターンに一致する場合はVPC内リソースと判定
    return (re.match(elb_pattern, domain_name) is not None or
            re.match(ec2_pattern, domain_name) is not None)

def evaluate_compliance(configuration_item, rule_parameters):
    """
    CloudFrontのディストリビューションがVPC内リソースをオリジンとしているが
    VPC Originを使用していないかを評価する関数
    
    Args:
        configuration_item: AWS Configから提供される設定項目
        rule_parameters: ルールのパラメータ
        
    Returns:
        str: 評価結果 (COMPLIANT または NON_COMPLIANT)
    """
    if configuration_item['resourceType'] != 'AWS::CloudFront::Distribution':
        return 'NOT_APPLICABLE'
    
    # CloudFront クライアントを初期化
    cloudfront = boto3.client('cloudfront')
    
    try:
        # ディストリビューションIDを取得
        distribution_id = configuration_item['resourceId']
        
        # ディストリビューションの詳細を取得
        distribution_config = cloudfront.get_distribution_config(
            Id=distribution_id
        )
        
        # Originsを確認
        origins = distribution_config['DistributionConfig']['Origins']['Items']
        
        for origin in origins:
            domain_name = origin.get('DomainName', '')
            
            # VPC内リソースのドメイン名パターンに一致するか確認
            if is_vpc_resource_domain(domain_name):
                # VPC内リソースがオリジンとして使用されている

                # VpcOriginConfigがあれば、VPC Originを使用していないとみなす
                if 'VpcOriginConfig' not in origin:
                    # VPC内リソースをオリジンとしているが、VPC Originを使用していない
                    # これは望ましくない状態なのでNON_COMPLIANT
                    return 'NON_COMPLIANT'
        
        # VPC内リソースがオリジンとして使用されていないか、
        # または使用されていてもすべてVPC Originを正しく使用している場合
        return 'COMPLIANT'
        
    except Exception as e:
        print(f"評価中にエラーが発生しました: {str(e)}")
        return 'ERROR'

def lambda_handler(event, context):
    """
    AWS Config Ruleのためのメインハンドラー関数
    
    Args:
        event: Lambda関数のイベントオブジェクト
        context: Lambda関数のコンテキストオブジェクト
        
    Returns:
        dict: AWS Config Ruleの評価結果
    """
    print('イベント:', json.dumps(event))
    
    invoking_event = json.loads(event['invokingEvent'])
    rule_parameters = json.loads(event.get('ruleParameters', '{}'))
    
    compliance_value = 'NOT_APPLICABLE'
    
    if 'configurationItem' in invoking_event:
        configuration_item = invoking_event['configurationItem']
        compliance_value = evaluate_compliance(configuration_item, rule_parameters)
    elif 'messageType' in invoking_event and invoking_event['messageType'] == 'ScheduledNotification':
        # スケジュールされた評価の場合、すべてのCloudFrontディストリビューションを評価
        config_client = boto3.client('config')
        cloudfront_client = boto3.client('cloudfront')
        
        # すべてのディストリビューションを取得
        paginator = cloudfront_client.get_paginator('list_distributions')
        evaluation_results = []
        
        for page in paginator.paginate():
            if 'Items' not in page['DistributionList']:
                continue
                
            for distribution in page['DistributionList']['Items']:
                distribution_id = distribution['Id']
                
                # ディストリビューションの詳細を取得
                try:
                    distribution_config = cloudfront_client.get_distribution_config(
                        Id=distribution_id
                    )
                    
                    # 評価のための構成項目を作成
                    config_item = {
                        'resourceType': 'AWS::CloudFront::Distribution',
                        'resourceId': distribution_id
                    }
                    
                    compliance_result = evaluate_compliance(config_item, rule_parameters)
                    
                    # NON_COMPLIANTの場合、詳細情報を取得
                    annotation = None
                    if compliance_result == 'NON_COMPLIANT':
                        origins = distribution_config['DistributionConfig']['Origins']['Items']
                        vpc_origins = [o['DomainName'] for o in origins if is_vpc_resource_domain(o.get('DomainName', ''))]
                        annotation = f"VPC内リソース {', '.join(vpc_origins)} をオリジンとして使用していますが、VPC Originではありません"
                    
                    evaluation = {
                        'ComplianceResourceType': 'AWS::CloudFront::Distribution',
                        'ComplianceResourceId': distribution_id,
                        'ComplianceType': compliance_result,
                        'OrderingTimestamp': datetime.datetime.now().isoformat()
                    }
                    
                    if annotation:
                        evaluation['Annotation'] = annotation
                    
                    evaluation_results.append(evaluation)
                    
                    # AWS Config APIの制限を考慮して評価を送信(最大25個の評価を一度に送信可能)
                    if len(evaluation_results) == 25:
                        put_evaluations(config_client, event, evaluation_results)
                        evaluation_results = []
                        # APIレート制限を回避するために短い待機時間を設ける
                        sleep(0.5)
                except Exception as e:
                    print(f"ディストリビューション {distribution_id} の評価中にエラーが発生しました: {str(e)}")
        
        # 残りの評価結果を送信
        if evaluation_results:
            put_evaluations(config_client, event, evaluation_results)
            
        return {
            'compliance': 'COMPLIANT',  # スケジュール評価の場合は常にCOMPLIANTを返す
            'evaluations': 'Performed bulk evaluation'
        }
    
    return {
        'compliance': compliance_value
    }

def put_evaluations(config_client, event, evaluation_results):
    """
    評価結果をAWS Configに送信する関数
    
    Args:
        config_client: AWS Config クライアント
        event: Lambda関数のイベントオブジェクト
        evaluation_results: 評価結果のリスト
    """
    response = config_client.put_evaluations(
        Evaluations=evaluation_results,
        ResultToken=event.get('resultToken')
    )
    return response

AWS CDK

上で述べたアーキテクチャを実現するためAWS CDKのコードも置いておきます
my-project/
├── bin/
│   └── app.ts
├── lib/
│   └── cloudfront-vpc-origin-config-rule-stack.ts
└── lambda/
    └── index.py(上で示したコード)

bin/app.ts

#!/usr/bin/env node
import "source-map-support/register";
import * as cdk from "aws-cdk-lib";
import { CloudFrontVpcOriginConfigRuleStack } from "../lib/cloudfront-vpc-origin-config-rule-stack";

const app = new cdk.App();
new CloudFrontVpcOriginConfigRuleStack(
  app,
  "CloudFrontVpcOriginConfigRuleStack",
  {
    /* 必要に応じてスタックプロパティを指定 */
    env: {
      account: process.env.CDK_DEFAULT_ACCOUNT,
      region: process.env.CDK_DEFAULT_REGION,
    },
    description:
      "CloudFrontディストリビューションのVPC Origin使用状況を検出するConfig Rule",
  }
);

lib/cloudfront-vpc-origin-config-rule-stack.ts

import * as cdk from "aws-cdk-lib";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as iam from "aws-cdk-lib/aws-iam";
import * as config from "aws-cdk-lib/aws-config";
import { Construct } from "constructs";

export class CloudFrontVpcOriginConfigRuleStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Lambda用のIAMロールを作成
    const lambdaRole = new iam.Role(this, "CloudFrontVpcOriginLambdaRole", {
      assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName(
          "service-role/AWSLambdaBasicExecutionRole"
        ),
      ],
    });

    // CloudFrontとConfig APIにアクセスする権限を追加
    lambdaRole.addToPolicy(
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        actions: [
          "cloudfront:ListDistributions",
          "cloudfront:GetDistributionConfig",
          "config:PutEvaluations",
        ],
        resources: ["*"],
      })
    );

    // Config Rule用のLambda関数
    const configRuleLambda = new lambda.Function(
      this,
      "CloudFrontVpcOriginLambda",
      {
        runtime: lambda.Runtime.PYTHON_3_13,
        handler: "index.lambda_handler",
        code: lambda.Code.fromAsset("lambda"), // lambdaディレクトリに配置されていることを想定
        timeout: cdk.Duration.seconds(60),
        memorySize: 256,
        role: lambdaRole,
      }
    );

    // AWS Config Ruleの作成
    const configRule = new config.CustomRule(
      this,
      "CloudFrontVpcOriginConfigRule",
      {
        configRuleName: "cloudfront-vpc-origin-check",
        lambdaFunction: configRuleLambda,
        description:
          "CloudFrontディストリビューションがVPC Originを使用しているかを確認するルール",
        periodic: true, // 定期的な評価を有効化
        maximumExecutionFrequency:
          config.MaximumExecutionFrequency.TWENTY_FOUR_HOURS, // 1日1回の評価
        configurationChanges: false,
        ruleScope: config.RuleScope.fromResources([
          config.ResourceType.CLOUDFRONT_DISTRIBUTION,
        ]), // CloudFrontリソースに限定
      }
    );

    // 出力
    new cdk.CfnOutput(this, "ConfigRuleName", {
      value: configRule.configRuleName,
      description: "ConfigRuleの名前",
    });

    new cdk.CfnOutput(this, "LambdaFunctionName", {
      value: configRuleLambda.functionName,
      description: "Lambda関数の名前",
    });
  }
}

検出結果

Configルールの評価画面を示します。画面中の対象範囲内のリソースの一行目がVPC OriginのALB、二行目がVPC OriginではないALBをオリジンとするディストリビューションです。コンプライアンスの列から正しく判定されていることがわかります。

cloudfront-vpc-origin-check2.png

組織(Organizations)内のOriginの検出

先ほどは単一AWSアカウント内のOriginの特定方法でしたが、Organizations内の複数のAWSアカウントにわたって検出したい場合について考えます。

アーキテクチャ

カスタムConfigルールは自アカウント以外のLambda関数を呼ぶことが可能です。この仕様を活かすとアーキテクチャは以下のように構成できます。こうするとLambdaを全アカウントにデプロイしなくてよくなります。

configrule3.drawio.png

  1. 定期評価などをトリガーにConfigルールが呼び出されると、Configが別アカウントに存在するLambdaを呼び出します。Lambdaはリソースポリシーでconfig.amazonaws.comからの実行が許可されています。
  2. Lambda関数は元のアカウントのIAMロールをAssumeします。IAMロール名はすべてのアカウントで同じロール名を利用することでLambdaからAssumeできるようにしています。
  3. LambdaからCloudFrontにget_distribution_configを発行することで、ディストリビューションの設定を取得し非VPC Originかどうか評価します。設定にVpcOriginConfigという項目があればVPC Originを利用しています。
  4. LambdaはAssumeしたロールを利用して評価結果を元のアカウントのConfigに返します。Configは画面上に結果を表示します。

アーキテクチャ補足

図中のカスタムConfigルールとIAMロールはCloudFormation StackSets(Service Managed)を利用してデプロイすることでアカウント追加時に検出も構成できます。

組織全体にConfigルールを展開する場合、OrganizationConfigRuleを利用する手もあるのですが、ルールの適用除外の柔軟性やカスタムルールとの相性を考えてこちらを選択しています。

項目 OrganizationConfigRule CloudFormation StackSets
ルール適用除外の柔軟性 AWSアカウント単位 OUまたはAWSアカウント単位
カスタムルールとの相性 IAMロールのAssumeを伴う場合、ターゲットアカウントのIAMロールを別途要作成 カスタムルールがIAMロールのAssumeを伴う場合、同時に作成できる

StackSetsで作成するIAMロールは以下のように別アカウントからのsts:AssumeRoleを許可する信頼ポリシーを記述します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

上の例ではPrincipalで別アカウント全体からのAssumeを許容していますが、必要に応じて別アカウントのLambdaに付与するロールのみに絞ります。

ロールを絞る場合は一点注意事項があります。何らかの理由でLambdaに付与するIAMロールが削除されて再作成されると、信頼ポリシーの参照がおかしくなるためCloudFormationのStackSetsをいったん全て削除して作成する必要があります。

Lambdaは別のAWSアカウントにあらかじめ作成しておきます。ハンドラ内では以下のようにIAMロールAssumeするコードを記述しておくことでcloudfront:get_distribution_configconfig:put_evaluationを元のアカウントの権限で実行できます。

def get_boto3_client(service, invoking_account_id):
    """
    指定されたアカウントのロールを Assume して boto3 クライアントを作成する関数
    
    Args:
        service: 使用するAWSサービス名
        invoking_account_id: Config Ruleがあるアカウント (Lambda を呼び出したアカウント)
        
    Returns:
        boto3.client: 指定されたサービスのクライアント
    """
    # デフォルトのセッションでSTSクライアントを作成
    sts_client = boto3.client('sts')
    
    # 呼び出し元のアカウントIDでロールを Assume
    role_arn = f'arn:aws:iam::{invoking_account_id}:role/{CROSS_ACCOUNT_ROLE_NAME}'
    
    try:
        # 役割を引き受ける
        assumed_role = sts_client.assume_role(
            RoleArn=role_arn,
            RoleSessionName='CloudFrontConfigRuleSession'
        )
        
        # 一時的な認証情報を取得
        credentials = assumed_role['Credentials']
        
        # 一時的な認証情報を使用してクライアントを作成
        client = boto3.client(
            service,
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken']
        )
        
        return client

リソースポリシーは単体アカウントの時同様config.amazonaws.comからのInvokeを許可します。

{
  "Version": "2012-10-17",
  "Id": "default",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "config.amazonaws.com"
      },
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:<region>:123456789012:function:<function-name>"
    }
  ]
}

Lambda関数のコード

Lambda関数のコードをこちらに置いておきます
import json
import boto3
import datetime
import re
from time import sleep
import os

# AssumeするIAMロール名をグローバル変数として定義
CROSS_ACCOUNT_ROLE_NAME = os.environ.get('CROSS_ACCOUNT_ROLE_NAME')

def is_vpc_resource_domain(domain_name):
    """
    ドメイン名がVPC内リソース(ALBやNLBなど)であるかを判定する関数
    
    Args:
        domain_name: チェックするドメイン名
        
    Returns:
        bool: VPC内リソースの場合はTrue、そうでない場合はFalse
    """
    # ALB/NLBのドメインパターン
    elb_pattern = r'.*\.elb\.amazonaws\.com$'
    
    # EC2インスタンスのパブリックDNSパターン
    ec2_pattern = r'.*\.compute\.amazonaws\.com$'
    
    # いずれかのパターンに一致する場合はVPC内リソースと判定
    return (re.match(elb_pattern, domain_name) is not None or
            re.match(ec2_pattern, domain_name) is not None)

def get_boto3_client(service, invoking_account_id):
    """
    指定されたアカウントのロールを Assume して boto3 クライアントを作成する関数
    
    Args:
        service: 使用するAWSサービス名
        invoking_account_id: Config Ruleがあるアカウント (Lambda を呼び出したアカウント)
        
    Returns:
        boto3.client: 指定されたサービスのクライアント
    """
    # デフォルトのセッションでSTSクライアントを作成
    sts_client = boto3.client('sts')
    
    # 呼び出し元のアカウントIDでロールを Assume
    role_arn = f'arn:aws:iam::{invoking_account_id}:role/{CROSS_ACCOUNT_ROLE_NAME}'
    
    try:
        # 役割を引き受ける
        assumed_role = sts_client.assume_role(
            RoleArn=role_arn,
            RoleSessionName='CloudFrontConfigRuleSession'
        )
        
        # 一時的な認証情報を取得
        credentials = assumed_role['Credentials']
        
        # 一時的な認証情報を使用してクライアントを作成
        client = boto3.client(
            service,
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken']
        )
        
        return client
    except Exception as e:
        print(f"ロールを引き受ける際にエラーが発生しました: {str(e)}")
        # フォールバックとして、デフォルトの認証情報を使用
        print("デフォルトの認証情報を使用します")
        return boto3.client(service)

def evaluate_compliance(configuration_item, rule_parameters, cloudfront_client):
    """
    CloudFrontのディストリビューションがVPC内リソースをオリジンとしているが
    VPC Originを使用していないかを評価する関数
    
    Args:
        configuration_item: AWS Configから提供される設定項目
        rule_parameters: ルールのパラメータ
        cloudfront_client: CloudFront クライアント
        
    Returns:
        str: 評価結果 (COMPLIANT または NON_COMPLIANT)
    """
    if configuration_item['resourceType'] != 'AWS::CloudFront::Distribution':
        return 'NOT_APPLICABLE'
    
    try:
        # ディストリビューションIDを取得
        distribution_id = configuration_item['resourceId']
        
        # ディストリビューションの詳細を取得
        distribution_config = cloudfront_client.get_distribution_config(
            Id=distribution_id
        )
        
        # Originsを確認
        origins = distribution_config['DistributionConfig']['Origins']['Items']
        
        for origin in origins:
            domain_name = origin.get('DomainName', '')
            
            # VPC内リソースのドメイン名パターンに一致するか確認
            if is_vpc_resource_domain(domain_name):
                # VPC内リソースがオリジンとして使用されている
                
                # vpceでなければ、VPC Originを使用していないとみなす
                if 'VpcOriginConfig' not in origin:
                    # VPC内リソースをオリジンとしているが、VPC Originを使用していない
                    # これは望ましくない状態なのでNON_COMPLIANT
                    return 'NON_COMPLIANT'
        
        # VPC内リソースがオリジンとして使用されていないか、
        # または使用されていてもすべてVPC Originを正しく使用している場合
        return 'COMPLIANT'
        
    except Exception as e:
        print(f"評価中にエラーが発生しました: {str(e)}")
        return 'ERROR'

def lambda_handler(event, context):
    """
    AWS Config Ruleのためのメインハンドラー関数
    
    Args:
        event: Lambda関数のイベントオブジェクト
        context: Lambda関数のコンテキストオブジェクト
        
    Returns:
        dict: AWS Config Ruleの評価結果
    """
    print('イベント:', json.dumps(event))
    
    invoking_event = json.loads(event['invokingEvent'])
    rule_parameters = json.loads(event.get('ruleParameters', '{}'))
    
    # Config Ruleを含むアカウントID (Lambda を呼び出したアカウント) を取得
    # AWS Config Rule からの呼び出しの場合、accountId はイベント内に含まれている
    if 'awsAccountId' in invoking_event:
        invoking_account_id = invoking_event['awsAccountId']
    
    print(f"設定評価対象アカウントID: {invoking_account_id}")
    
    # クロスアカウント認証情報でクライアントを作成
    cloudfront_client = get_boto3_client('cloudfront', invoking_account_id)
    
    compliance_value = 'NOT_APPLICABLE'
    
    if 'configurationItem' in invoking_event:
        configuration_item = invoking_event['configurationItem']
        compliance_value = evaluate_compliance(configuration_item, rule_parameters, cloudfront_client)
    elif 'messageType' in invoking_event and invoking_event['messageType'] == 'ScheduledNotification':
        # スケジュールされた評価の場合、すべてのCloudFrontディストリビューションを評価
        config_client = get_boto3_client('config', invoking_account_id)
        
        # すべてのディストリビューションを取得
        paginator = cloudfront_client.get_paginator('list_distributions')
        evaluation_results = []
        
        for page in paginator.paginate():
            if 'Items' not in page['DistributionList']:
                continue
                
            for distribution in page['DistributionList']['Items']:
                distribution_id = distribution['Id']
                
                # ディストリビューションの詳細を取得
                try:
                    distribution_config = cloudfront_client.get_distribution_config(
                        Id=distribution_id
                    )
                    
                    # 評価のための構成項目を作成
                    config_item = {
                        'resourceType': 'AWS::CloudFront::Distribution',
                        'resourceId': distribution_id
                    }
                    
                    compliance_result = evaluate_compliance(config_item, rule_parameters, cloudfront_client)
                    
                    # NON_COMPLIANTの場合、詳細情報を取得
                    annotation = None
                    if compliance_result == 'NON_COMPLIANT':
                        origins = distribution_config['DistributionConfig']['Origins']['Items']
                        vpc_origins = [o['DomainName'] for o in origins if is_vpc_resource_domain(o.get('DomainName', ''))]
                        annotation = f"VPC内リソース {', '.join(vpc_origins)} をオリジンとして使用していますが、VPC Originではありません"
                    
                    evaluation = {
                        'ComplianceResourceType': 'AWS::CloudFront::Distribution',
                        'ComplianceResourceId': distribution_id,
                        'ComplianceType': compliance_result,
                        'OrderingTimestamp': datetime.datetime.now().isoformat()
                    }
                    
                    if annotation:
                        evaluation['Annotation'] = annotation
                    
                    evaluation_results.append(evaluation)
                    
                    # AWS Config APIの制限を考慮して評価を送信(最大25個の評価を一度に送信可能)
                    if len(evaluation_results) == 25:
                        put_evaluations(config_client, event, evaluation_results)
                        evaluation_results = []
                        # APIレート制限を回避するために短い待機時間を設ける
                        sleep(0.5)
                except Exception as e:
                    print(f"ディストリビューション {distribution_id} の評価中にエラーが発生しました: {str(e)}")
        
        # 残りの評価結果を送信
        if evaluation_results:
            put_evaluations(config_client, event, evaluation_results)
            
        return {
            'compliance': 'COMPLIANT',  # スケジュール評価の場合は常にCOMPLIANTを返す
            'evaluations': 'Performed bulk evaluation'
        }
    
    return {
        'compliance': compliance_value
    }

def put_evaluations(config_client, event, evaluation_results):
    """
    評価結果をAWS Configに送信する関数
    
    Args:
        config_client: AWS Config クライアント
        event: Lambda関数のイベントオブジェクト
        evaluation_results: 評価結果のリスト
    """
    response = config_client.put_evaluations(
        Evaluations=evaluation_results,
        ResultToken=event.get('resultToken')
    )
    return response

検出結果

先ほどと代り映えしませんが正しく検出できました!

config2.png

まとめ

CloudFrontの非VPC OriginなOriginを特定する方法を紹介しました。何かのお役に立てば幸いです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?