こんにちは!
CloudFront VPC Originsは皆さん触られましたか?メリットが大きいので周囲に利用を推奨していきたい一方、VPC Originが使えるのに知らなくて利用されないケースが出るかもしれません。そこでVPC OriginsではないVPC内のOriginを検出する方法を考えてみます。
CloudFront VPC Originsについて
まずは概要とメリットを整理します。
概要
CloudFront VPC Origins は、プライベートサブネット内のALB、NLB、EC2インスタンスをCloudFrontディストリビューションのOriginとして直接指定できる機能です。2024年に発表されました。
メリット
大きく2つのメリットがあります。
Originの保護強化
従来、CloudFrontを使用しつつ、パブリックサブネット上のOriginへの直接アクセスを防ぐには、追加設定が必要でした:
- カスタムヘッダ方式: CloudFrontでカスタムヘッダを付与し、ALBのリスナールールで検証
- ヘッダが漏洩すると直接アクセスされるリスクがある
- Managed Prefix List方式: ALBのセキュリティグループでCloudFrontのManaged Prefix Listを指定
- 別のアカウントのCloudFrontからOriginにアクセスされる可能性は残る
CloudFront VPC Originsを使用するとOriginに直接アクセスする経路自体がなくなるため、より安全な構成が実現できます。
コストの節約
パブリックサブネット上に2つのAZを利用するALBを配置した場合、パブリックIPの料金として最低でも月額7.2ドルが発生します。CloudFront VPC Originsを使用すれば、パブリックIPが不要になるため、このコストが完全に削減できます。
AWSアカウント内のOriginの検出
VPC Originsは大きなデメリットがないため、既存のディストリビューションや今後作成するディストリビューションでも積極的に活用していきたいところです。
AWSアカウント内のリソースに特定の設定が適用されているかはAWS Configルールを利用して検出できます。ただし残念ながら2025年3月現在、AWS公式のマネージドルールは提供されていないようです。
このことからAWS Configカスタムルールを記述して検出してみます。カスタムルールとはユーザー独自に定義することが可能なConfigルールです。Lambda関数を記述することでロジックを定義します。
同じCloudFrontのConfigルールでも「S3がOACを利用しているか」などマネージドなルールが提供されているものもあるのでそのうち提供されるかもしれません。
現在のマネージドルールの一覧は以下から確認できます
https://docs.aws.amazon.com/ja_jp/config/latest/developerguide/managed-rules-by-aws-config.html
アーキテクチャ
- 定期評価などをトリガーにConfigルールが呼び出されると、ConfigがLambdaを呼び出します。LambdaはリソースポリシーでConfigからの実行が許可されています。
- Lambdaにはカスタムルールのロジックが記述されており、CloudFrontに
get_distribution_config
を発行することで、ディストリビューションの設定を取得しVPC Originかどうか評価します。設定にVpcOriginConfig
という項目があればVPC Originを利用しています。 - Lambdaは評価結果をConfigに返します。Configは画面上に結果を表示します。
Lambda関数のコード
Lambda関数のコードをこちらに置いておきます
# lambda/index.py
import json
import boto3
import datetime
import re
from time import sleep
def is_vpc_resource_domain(domain_name):
"""
ドメイン名がVPC内リソース(ALBやNLBなど)であるかを判定する関数
Args:
domain_name: チェックするドメイン名
Returns:
bool: VPC内リソースの場合はTrue、そうでない場合はFalse
"""
# ALB/NLBのドメインパターン
elb_pattern = r'.*\.elb\.amazonaws\.com$'
# EC2インスタンスのパブリックDNSパターン
ec2_pattern = r'.*\.compute\.amazonaws\.com$'
# いずれかのパターンに一致する場合はVPC内リソースと判定
return (re.match(elb_pattern, domain_name) is not None or
re.match(ec2_pattern, domain_name) is not None)
def evaluate_compliance(configuration_item, rule_parameters):
"""
CloudFrontのディストリビューションがVPC内リソースをオリジンとしているが
VPC Originを使用していないかを評価する関数
Args:
configuration_item: AWS Configから提供される設定項目
rule_parameters: ルールのパラメータ
Returns:
str: 評価結果 (COMPLIANT または NON_COMPLIANT)
"""
if configuration_item['resourceType'] != 'AWS::CloudFront::Distribution':
return 'NOT_APPLICABLE'
# CloudFront クライアントを初期化
cloudfront = boto3.client('cloudfront')
try:
# ディストリビューションIDを取得
distribution_id = configuration_item['resourceId']
# ディストリビューションの詳細を取得
distribution_config = cloudfront.get_distribution_config(
Id=distribution_id
)
# Originsを確認
origins = distribution_config['DistributionConfig']['Origins']['Items']
for origin in origins:
domain_name = origin.get('DomainName', '')
# VPC内リソースのドメイン名パターンに一致するか確認
if is_vpc_resource_domain(domain_name):
# VPC内リソースがオリジンとして使用されている
# VpcOriginConfigがあれば、VPC Originを使用していないとみなす
if 'VpcOriginConfig' not in origin:
# VPC内リソースをオリジンとしているが、VPC Originを使用していない
# これは望ましくない状態なのでNON_COMPLIANT
return 'NON_COMPLIANT'
# VPC内リソースがオリジンとして使用されていないか、
# または使用されていてもすべてVPC Originを正しく使用している場合
return 'COMPLIANT'
except Exception as e:
print(f"評価中にエラーが発生しました: {str(e)}")
return 'ERROR'
def lambda_handler(event, context):
"""
AWS Config Ruleのためのメインハンドラー関数
Args:
event: Lambda関数のイベントオブジェクト
context: Lambda関数のコンテキストオブジェクト
Returns:
dict: AWS Config Ruleの評価結果
"""
print('イベント:', json.dumps(event))
invoking_event = json.loads(event['invokingEvent'])
rule_parameters = json.loads(event.get('ruleParameters', '{}'))
compliance_value = 'NOT_APPLICABLE'
if 'configurationItem' in invoking_event:
configuration_item = invoking_event['configurationItem']
compliance_value = evaluate_compliance(configuration_item, rule_parameters)
elif 'messageType' in invoking_event and invoking_event['messageType'] == 'ScheduledNotification':
# スケジュールされた評価の場合、すべてのCloudFrontディストリビューションを評価
config_client = boto3.client('config')
cloudfront_client = boto3.client('cloudfront')
# すべてのディストリビューションを取得
paginator = cloudfront_client.get_paginator('list_distributions')
evaluation_results = []
for page in paginator.paginate():
if 'Items' not in page['DistributionList']:
continue
for distribution in page['DistributionList']['Items']:
distribution_id = distribution['Id']
# ディストリビューションの詳細を取得
try:
distribution_config = cloudfront_client.get_distribution_config(
Id=distribution_id
)
# 評価のための構成項目を作成
config_item = {
'resourceType': 'AWS::CloudFront::Distribution',
'resourceId': distribution_id
}
compliance_result = evaluate_compliance(config_item, rule_parameters)
# NON_COMPLIANTの場合、詳細情報を取得
annotation = None
if compliance_result == 'NON_COMPLIANT':
origins = distribution_config['DistributionConfig']['Origins']['Items']
vpc_origins = [o['DomainName'] for o in origins if is_vpc_resource_domain(o.get('DomainName', ''))]
annotation = f"VPC内リソース {', '.join(vpc_origins)} をオリジンとして使用していますが、VPC Originではありません"
evaluation = {
'ComplianceResourceType': 'AWS::CloudFront::Distribution',
'ComplianceResourceId': distribution_id,
'ComplianceType': compliance_result,
'OrderingTimestamp': datetime.datetime.now().isoformat()
}
if annotation:
evaluation['Annotation'] = annotation
evaluation_results.append(evaluation)
# AWS Config APIの制限を考慮して評価を送信(最大25個の評価を一度に送信可能)
if len(evaluation_results) == 25:
put_evaluations(config_client, event, evaluation_results)
evaluation_results = []
# APIレート制限を回避するために短い待機時間を設ける
sleep(0.5)
except Exception as e:
print(f"ディストリビューション {distribution_id} の評価中にエラーが発生しました: {str(e)}")
# 残りの評価結果を送信
if evaluation_results:
put_evaluations(config_client, event, evaluation_results)
return {
'compliance': 'COMPLIANT', # スケジュール評価の場合は常にCOMPLIANTを返す
'evaluations': 'Performed bulk evaluation'
}
return {
'compliance': compliance_value
}
def put_evaluations(config_client, event, evaluation_results):
"""
評価結果をAWS Configに送信する関数
Args:
config_client: AWS Config クライアント
event: Lambda関数のイベントオブジェクト
evaluation_results: 評価結果のリスト
"""
response = config_client.put_evaluations(
Evaluations=evaluation_results,
ResultToken=event.get('resultToken')
)
return response
AWS CDK
上で述べたアーキテクチャを実現するためAWS CDKのコードも置いておきます
my-project/
├── bin/
│ └── app.ts
├── lib/
│ └── cloudfront-vpc-origin-config-rule-stack.ts
└── lambda/
└── index.py(上で示したコード)
bin/app.ts
#!/usr/bin/env node
import "source-map-support/register";
import * as cdk from "aws-cdk-lib";
import { CloudFrontVpcOriginConfigRuleStack } from "../lib/cloudfront-vpc-origin-config-rule-stack";
const app = new cdk.App();
new CloudFrontVpcOriginConfigRuleStack(
app,
"CloudFrontVpcOriginConfigRuleStack",
{
/* 必要に応じてスタックプロパティを指定 */
env: {
account: process.env.CDK_DEFAULT_ACCOUNT,
region: process.env.CDK_DEFAULT_REGION,
},
description:
"CloudFrontディストリビューションのVPC Origin使用状況を検出するConfig Rule",
}
);
lib/cloudfront-vpc-origin-config-rule-stack.ts
import * as cdk from "aws-cdk-lib";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as iam from "aws-cdk-lib/aws-iam";
import * as config from "aws-cdk-lib/aws-config";
import { Construct } from "constructs";
export class CloudFrontVpcOriginConfigRuleStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// Lambda用のIAMロールを作成
const lambdaRole = new iam.Role(this, "CloudFrontVpcOriginLambdaRole", {
assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName(
"service-role/AWSLambdaBasicExecutionRole"
),
],
});
// CloudFrontとConfig APIにアクセスする権限を追加
lambdaRole.addToPolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"cloudfront:ListDistributions",
"cloudfront:GetDistributionConfig",
"config:PutEvaluations",
],
resources: ["*"],
})
);
// Config Rule用のLambda関数
const configRuleLambda = new lambda.Function(
this,
"CloudFrontVpcOriginLambda",
{
runtime: lambda.Runtime.PYTHON_3_13,
handler: "index.lambda_handler",
code: lambda.Code.fromAsset("lambda"), // lambdaディレクトリに配置されていることを想定
timeout: cdk.Duration.seconds(60),
memorySize: 256,
role: lambdaRole,
}
);
// AWS Config Ruleの作成
const configRule = new config.CustomRule(
this,
"CloudFrontVpcOriginConfigRule",
{
configRuleName: "cloudfront-vpc-origin-check",
lambdaFunction: configRuleLambda,
description:
"CloudFrontディストリビューションがVPC Originを使用しているかを確認するルール",
periodic: true, // 定期的な評価を有効化
maximumExecutionFrequency:
config.MaximumExecutionFrequency.TWENTY_FOUR_HOURS, // 1日1回の評価
configurationChanges: false,
ruleScope: config.RuleScope.fromResources([
config.ResourceType.CLOUDFRONT_DISTRIBUTION,
]), // CloudFrontリソースに限定
}
);
// 出力
new cdk.CfnOutput(this, "ConfigRuleName", {
value: configRule.configRuleName,
description: "ConfigRuleの名前",
});
new cdk.CfnOutput(this, "LambdaFunctionName", {
value: configRuleLambda.functionName,
description: "Lambda関数の名前",
});
}
}
検出結果
Configルールの評価画面を示します。画面中の対象範囲内のリソース
の一行目がVPC OriginのALB、二行目がVPC OriginではないALBをオリジンとするディストリビューションです。コンプライアンス
の列から正しく判定されていることがわかります。
組織(Organizations)内のOriginの検出
先ほどは単一AWSアカウント内のOriginの特定方法でしたが、Organizations内の複数のAWSアカウントにわたって検出したい場合について考えます。
アーキテクチャ
カスタムConfigルールは自アカウント以外のLambda関数を呼ぶことが可能です。この仕様を活かすとアーキテクチャは以下のように構成できます。こうするとLambdaを全アカウントにデプロイしなくてよくなります。
- 定期評価などをトリガーにConfigルールが呼び出されると、Configが別アカウントに存在するLambdaを呼び出します。Lambdaはリソースポリシーで
config.amazonaws.com
からの実行が許可されています。 - Lambda関数は元のアカウントのIAMロールをAssumeします。IAMロール名はすべてのアカウントで同じロール名を利用することでLambdaからAssumeできるようにしています。
- LambdaからCloudFrontに
get_distribution_config
を発行することで、ディストリビューションの設定を取得し非VPC Originかどうか評価します。設定にVpcOriginConfig
という項目があればVPC Originを利用しています。 - LambdaはAssumeしたロールを利用して評価結果を元のアカウントのConfigに返します。Configは画面上に結果を表示します。
アーキテクチャ補足
図中のカスタムConfigルールとIAMロールはCloudFormation StackSets(Service Managed)を利用してデプロイすることでアカウント追加時に検出も構成できます。
組織全体にConfigルールを展開する場合、OrganizationConfigRuleを利用する手もあるのですが、ルールの適用除外の柔軟性やカスタムルールとの相性を考えてこちらを選択しています。
項目 | OrganizationConfigRule | CloudFormation StackSets |
---|---|---|
ルール適用除外の柔軟性 | AWSアカウント単位 | OUまたはAWSアカウント単位 |
カスタムルールとの相性 | IAMロールのAssumeを伴う場合、ターゲットアカウントのIAMロールを別途要作成 | カスタムルールがIAMロールのAssumeを伴う場合、同時に作成できる |
StackSetsで作成するIAMロールは以下のように別アカウントからのsts:AssumeRole
を許可する信頼ポリシーを記述します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789012:root"
},
"Action": "sts:AssumeRole"
}
]
}
上の例ではPrincipal
で別アカウント全体からのAssumeを許容していますが、必要に応じて別アカウントのLambdaに付与するロールのみに絞ります。
ロールを絞る場合は一点注意事項があります。何らかの理由でLambdaに付与するIAMロールが削除されて再作成されると、信頼ポリシーの参照がおかしくなるためCloudFormationのStackSetsをいったん全て削除して作成する必要があります。
Lambdaは別のAWSアカウントにあらかじめ作成しておきます。ハンドラ内では以下のようにIAMロールAssumeするコードを記述しておくことでcloudfront:get_distribution_config
やconfig:put_evaluation
を元のアカウントの権限で実行できます。
def get_boto3_client(service, invoking_account_id):
"""
指定されたアカウントのロールを Assume して boto3 クライアントを作成する関数
Args:
service: 使用するAWSサービス名
invoking_account_id: Config Ruleがあるアカウント (Lambda を呼び出したアカウント)
Returns:
boto3.client: 指定されたサービスのクライアント
"""
# デフォルトのセッションでSTSクライアントを作成
sts_client = boto3.client('sts')
# 呼び出し元のアカウントIDでロールを Assume
role_arn = f'arn:aws:iam::{invoking_account_id}:role/{CROSS_ACCOUNT_ROLE_NAME}'
try:
# 役割を引き受ける
assumed_role = sts_client.assume_role(
RoleArn=role_arn,
RoleSessionName='CloudFrontConfigRuleSession'
)
# 一時的な認証情報を取得
credentials = assumed_role['Credentials']
# 一時的な認証情報を使用してクライアントを作成
client = boto3.client(
service,
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken']
)
return client
リソースポリシーは単体アカウントの時同様config.amazonaws.com
からのInvokeを許可します。
{
"Version": "2012-10-17",
"Id": "default",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "config.amazonaws.com"
},
"Action": "lambda:InvokeFunction",
"Resource": "arn:aws:lambda:<region>:123456789012:function:<function-name>"
}
]
}
Lambda関数のコード
Lambda関数のコードをこちらに置いておきます
import json
import boto3
import datetime
import re
from time import sleep
import os
# AssumeするIAMロール名をグローバル変数として定義
CROSS_ACCOUNT_ROLE_NAME = os.environ.get('CROSS_ACCOUNT_ROLE_NAME')
def is_vpc_resource_domain(domain_name):
"""
ドメイン名がVPC内リソース(ALBやNLBなど)であるかを判定する関数
Args:
domain_name: チェックするドメイン名
Returns:
bool: VPC内リソースの場合はTrue、そうでない場合はFalse
"""
# ALB/NLBのドメインパターン
elb_pattern = r'.*\.elb\.amazonaws\.com$'
# EC2インスタンスのパブリックDNSパターン
ec2_pattern = r'.*\.compute\.amazonaws\.com$'
# いずれかのパターンに一致する場合はVPC内リソースと判定
return (re.match(elb_pattern, domain_name) is not None or
re.match(ec2_pattern, domain_name) is not None)
def get_boto3_client(service, invoking_account_id):
"""
指定されたアカウントのロールを Assume して boto3 クライアントを作成する関数
Args:
service: 使用するAWSサービス名
invoking_account_id: Config Ruleがあるアカウント (Lambda を呼び出したアカウント)
Returns:
boto3.client: 指定されたサービスのクライアント
"""
# デフォルトのセッションでSTSクライアントを作成
sts_client = boto3.client('sts')
# 呼び出し元のアカウントIDでロールを Assume
role_arn = f'arn:aws:iam::{invoking_account_id}:role/{CROSS_ACCOUNT_ROLE_NAME}'
try:
# 役割を引き受ける
assumed_role = sts_client.assume_role(
RoleArn=role_arn,
RoleSessionName='CloudFrontConfigRuleSession'
)
# 一時的な認証情報を取得
credentials = assumed_role['Credentials']
# 一時的な認証情報を使用してクライアントを作成
client = boto3.client(
service,
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken']
)
return client
except Exception as e:
print(f"ロールを引き受ける際にエラーが発生しました: {str(e)}")
# フォールバックとして、デフォルトの認証情報を使用
print("デフォルトの認証情報を使用します")
return boto3.client(service)
def evaluate_compliance(configuration_item, rule_parameters, cloudfront_client):
"""
CloudFrontのディストリビューションがVPC内リソースをオリジンとしているが
VPC Originを使用していないかを評価する関数
Args:
configuration_item: AWS Configから提供される設定項目
rule_parameters: ルールのパラメータ
cloudfront_client: CloudFront クライアント
Returns:
str: 評価結果 (COMPLIANT または NON_COMPLIANT)
"""
if configuration_item['resourceType'] != 'AWS::CloudFront::Distribution':
return 'NOT_APPLICABLE'
try:
# ディストリビューションIDを取得
distribution_id = configuration_item['resourceId']
# ディストリビューションの詳細を取得
distribution_config = cloudfront_client.get_distribution_config(
Id=distribution_id
)
# Originsを確認
origins = distribution_config['DistributionConfig']['Origins']['Items']
for origin in origins:
domain_name = origin.get('DomainName', '')
# VPC内リソースのドメイン名パターンに一致するか確認
if is_vpc_resource_domain(domain_name):
# VPC内リソースがオリジンとして使用されている
# vpceでなければ、VPC Originを使用していないとみなす
if 'VpcOriginConfig' not in origin:
# VPC内リソースをオリジンとしているが、VPC Originを使用していない
# これは望ましくない状態なのでNON_COMPLIANT
return 'NON_COMPLIANT'
# VPC内リソースがオリジンとして使用されていないか、
# または使用されていてもすべてVPC Originを正しく使用している場合
return 'COMPLIANT'
except Exception as e:
print(f"評価中にエラーが発生しました: {str(e)}")
return 'ERROR'
def lambda_handler(event, context):
"""
AWS Config Ruleのためのメインハンドラー関数
Args:
event: Lambda関数のイベントオブジェクト
context: Lambda関数のコンテキストオブジェクト
Returns:
dict: AWS Config Ruleの評価結果
"""
print('イベント:', json.dumps(event))
invoking_event = json.loads(event['invokingEvent'])
rule_parameters = json.loads(event.get('ruleParameters', '{}'))
# Config Ruleを含むアカウントID (Lambda を呼び出したアカウント) を取得
# AWS Config Rule からの呼び出しの場合、accountId はイベント内に含まれている
if 'awsAccountId' in invoking_event:
invoking_account_id = invoking_event['awsAccountId']
print(f"設定評価対象アカウントID: {invoking_account_id}")
# クロスアカウント認証情報でクライアントを作成
cloudfront_client = get_boto3_client('cloudfront', invoking_account_id)
compliance_value = 'NOT_APPLICABLE'
if 'configurationItem' in invoking_event:
configuration_item = invoking_event['configurationItem']
compliance_value = evaluate_compliance(configuration_item, rule_parameters, cloudfront_client)
elif 'messageType' in invoking_event and invoking_event['messageType'] == 'ScheduledNotification':
# スケジュールされた評価の場合、すべてのCloudFrontディストリビューションを評価
config_client = get_boto3_client('config', invoking_account_id)
# すべてのディストリビューションを取得
paginator = cloudfront_client.get_paginator('list_distributions')
evaluation_results = []
for page in paginator.paginate():
if 'Items' not in page['DistributionList']:
continue
for distribution in page['DistributionList']['Items']:
distribution_id = distribution['Id']
# ディストリビューションの詳細を取得
try:
distribution_config = cloudfront_client.get_distribution_config(
Id=distribution_id
)
# 評価のための構成項目を作成
config_item = {
'resourceType': 'AWS::CloudFront::Distribution',
'resourceId': distribution_id
}
compliance_result = evaluate_compliance(config_item, rule_parameters, cloudfront_client)
# NON_COMPLIANTの場合、詳細情報を取得
annotation = None
if compliance_result == 'NON_COMPLIANT':
origins = distribution_config['DistributionConfig']['Origins']['Items']
vpc_origins = [o['DomainName'] for o in origins if is_vpc_resource_domain(o.get('DomainName', ''))]
annotation = f"VPC内リソース {', '.join(vpc_origins)} をオリジンとして使用していますが、VPC Originではありません"
evaluation = {
'ComplianceResourceType': 'AWS::CloudFront::Distribution',
'ComplianceResourceId': distribution_id,
'ComplianceType': compliance_result,
'OrderingTimestamp': datetime.datetime.now().isoformat()
}
if annotation:
evaluation['Annotation'] = annotation
evaluation_results.append(evaluation)
# AWS Config APIの制限を考慮して評価を送信(最大25個の評価を一度に送信可能)
if len(evaluation_results) == 25:
put_evaluations(config_client, event, evaluation_results)
evaluation_results = []
# APIレート制限を回避するために短い待機時間を設ける
sleep(0.5)
except Exception as e:
print(f"ディストリビューション {distribution_id} の評価中にエラーが発生しました: {str(e)}")
# 残りの評価結果を送信
if evaluation_results:
put_evaluations(config_client, event, evaluation_results)
return {
'compliance': 'COMPLIANT', # スケジュール評価の場合は常にCOMPLIANTを返す
'evaluations': 'Performed bulk evaluation'
}
return {
'compliance': compliance_value
}
def put_evaluations(config_client, event, evaluation_results):
"""
評価結果をAWS Configに送信する関数
Args:
config_client: AWS Config クライアント
event: Lambda関数のイベントオブジェクト
evaluation_results: 評価結果のリスト
"""
response = config_client.put_evaluations(
Evaluations=evaluation_results,
ResultToken=event.get('resultToken')
)
return response
検出結果
先ほどと代り映えしませんが正しく検出できました!
まとめ
CloudFrontの非VPC OriginなOriginを特定する方法を紹介しました。何かのお役に立てば幸いです。