0
0

はじめに

目的

AWS Foundational Security Best Practicesに含まれているコントロールである、
[S3.5] S3 汎用バケットでは、SSL を使用するリクエストが必要ですに自動で対応する。

動機

  • cdk bootstrapで作成されたS3バケットには、
    このコントロールに対応したバケットポリシーがアタッチされており、
    自動で対応されることに感動をおぼえた。

  • コンソールからのS3バケット作成時には、バケットポリシーを設定できない。
    作成後に改めてポリシーを設定する必要があり、手間だと思っていた。

以上の理由から、自動化したいと考えました。

構成

流れは以下の通りです

1. 新しいS3バケットが作成される
2. Amazon EventBridgeがこのバケットの作成イベントを検出
3. EventBridgeが、Lambda関数を起動
4. Lambda関数は以下の処理を実行
  - イベントのJSONから新しく作成されたS3バケットの名前を抽出
  - バケット名に応じた、目的のバケットポリシーを生成
  - 生成したポリシーを、該当するS3バケットにアタッチ

簡単な構成なので不要かもしれませんが、構成図は以下のとおりです。
構成図.png

実装

Eventbridge

前提条件

S3バケットの作成イベントを検知するために、CloudTrailを有効化しておく必要があります。

イベントパターン

今回は、S3バケットの作成のみを検知するようにしたいので、下記のとおりです。

イベントパターン
event_patern
{
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
    "eventSource": ["s3.amazonaws.com"],
    "eventName": ["CreateBucket"]
  },
  "source": ["aws.s3"]
}

Lambda

コード

S3バケットの作成イベントを検知したEventbridgeから渡されたJSONをもとに、
新しく作成されたバケットにSSL通信のみを許可するポリシーを適用します。
主な処理の流れは以下のとおりです。

1.イベント解析
  - EventBridgeから送信されたイベントデータから、作成されたS3バケット名を抽出

2.ポリシー生成:
  - バケット名に基づいて、SSL通信のみを許可するバケットポリシーを生成

3.ポリシー適用:
  - 生成したポリシーを、作成されたS3バケットに適用

4.結果のログ記録:
  - ポリシーの適用が成功したか失敗したかを、CloudWatch Logsに記録

Labdaのコード
add_ssl_policy_to_s3.py
import boto3
import json
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# S3クライアント
s3 = boto3.client('s3')

# バケットポリシーのテンプレート
POLICY_TEMPLATE = {
    "Version": "2012-10-17",
    "Id": "AccessControl",
    "Statement": [
        {
            "Sid": "AllowSSLRequestsOnly",
            "Effect": "Deny",
            "Principal": "*",
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::{bucket_name}",
                "arn:aws:s3:::{bucket_name}/*"
            ],
            "Condition": {
                "Bool": {
                    "aws:SecureTransport": "false"
                }
            }
        }
    ]
}

def create_bucket_policy(bucket_name):
    # バケット名に基づいてポリシーを生成
    policy = json.dumps(POLICY_TEMPLATE).replace("{bucket_name}", bucket_name)
    return policy

def lambda_handler(event, context):
    # イベントからバケット名を取得
    bucket_name = event['detail']['requestParameters']['bucketName']
    
    try:
        # バケットポリシーを生成
        bucket_policy = create_bucket_policy(bucket_name)
        
        # バケットポリシーを適用
        s3.put_bucket_policy(
            Bucket=bucket_name,
            Policy=bucket_policy
        )
        
        logger.info(f"バケットポリシーが正常に適用されました: {bucket_name}")
        return {
            'statusCode': 200,
            'body': json.dumps(f'バケットポリシーが正常に適用されました: {bucket_name}')
        }
        
    except Exception as e:
        logger.error(f"エラーが発生しました: {str(e)}", exc_info=True)
        return {
            'statusCode': 500,
            'body': json.dumps(f'エラーが発生しました: {str(e)}')
        }

pythonは完全に独学で勉強不足なため、良くない部分があるかもしれません。
ご了承ください。

ポリシー

Lambdaの実行ロールは、AWSLambdaBasicExecutionRoleに加え
S3バケットにバケットポリシーをアタッチする許可が必要になります。

追加のIAMポリシー
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": "s3:PutBucketPolicy",
            "Resource": "arn:aws:s3:::*",
            "Effect": "Allow"
        }
    ]
}

トリガー

トリガーは、作成したEventbridgeを設定します。

動作確認

バケット名「auto-attach-ssl-policy-test」のS3バケット作成しました。
Lambdaのログは下記のとおりとなっており、想定通り動作したように見えます。

details
timestamp,message
1721046769679,"INIT_START Runtime Version: python:3.12.v28	Runtime Version ARN: arn:aws:lambda:ap-northeast-1::runtime:7776dee4c90fff4d4bf833b78d94e5b4148d14d044b867a14756f301ecfe1e28
"
1721046769964,"[INFO]	2024-07-15T12:32:49.964Z		Found credentials in environment variables.
"
1721046770118,"START RequestId: 1c0c1597-7c6e-412e-b51f-54aa0edadf7c Version: $LATEST
"
1721046770985,"[INFO]	2024-07-15T12:32:50.985Z	1c0c1597-7c6e-412e-b51f-54aa0edadf7c	バケットポリシーが正常に適用されました: auto-attach-ssl-policy-test
"
1721046770996,"END RequestId: 1c0c1597-7c6e-412e-b51f-54aa0edadf7c
"
1721046770996,"REPORT RequestId: 1c0c1597-7c6e-412e-b51f-54aa0edadf7c	Duration: 878.00 ms	Billed Duration: 878 ms	Memory Size: 128 MB	Max Memory Used: 83 MB	Init Duration: 437.87 ms	
"

バケットポリシーを確認したところ、想定通りのポリシーがアタッチされていました。
image.png

アタッチされたバケットポリシー
{
    "Version": "2012-10-17",
    "Id": "AccessControl",
    "Statement": [
        {
            "Sid": "AllowSSLRequestsOnly",
            "Effect": "Deny",
            "Principal": "*",
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::auto-attach-ssl-policy-test",
                "arn:aws:s3:::auto-attach-ssl-policy-test/*"
            ],
            "Condition": {
                "Bool": {
                    "aws:SecureTransport": "false"
                }
            }
        }
    ]
}

課題点

本実装には、いくつかの課題や改善の余地があります

1.ポリシー削除への対応
現在の構成では、ポリシーが削除された場合に対応できていません。
この問題に対処するための方法として以下が考えられます。

  • EventBridgeでポリシー削除イベントを検知し、再適用するLambda関数の実装
  • AWS Config ルールを使用してポリシーの存在を監視し、修復アクションをトリガーする
  • AWS Systems Manager オートメーションランブックを使用した定期的なチェックと修復

ただし、上記の方法だと異なるポリシーも適用したうえで削除してしまった場合、
現在のLambdaだと、SSLに関するポリシーのみになってしまいます。
ですので、そもそもポリシーを削除できないように、IAMやSCPで制限すべきだと思います。

2. モニタリングと通知
ポリシー適用の失敗やエラーを適切に監視し、管理者に通知する仕組みが必要です。
(失敗した場合は、SecurityHubで通知されることになるので不要かもしれません。)
方法としては以下が考えられます

  • Lambda関数の、errorログを出力している部分でSNS通知も実装する
  • CloudWatchLogsのメトリクスフィルタを活用してSNS通知する
  • 正常にポリシーがアタッチされるまでリトライするようにしたうえで、
    上記2点どちらかのSNS通知を実装する

CDK

CDKのコードを置いておきます。

CDKのコード(python)
ssl_lambda_stack.py
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_iam as iam,
    aws_events as events,
    aws_events_targets as targets,
)
from constructs import Construct

class SslLambdaStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Lambda Function
        lambda_fn = _lambda.Function(
            self, 'SSLLambda',
            handler='add_ssl_policy_to_s3.lambda_handler',
            code=_lambda.Code.from_asset('lambda'),
            runtime=_lambda.Runtime.PYTHON_3_12,
        )

        # permissions to S3 Bucket
        lambda_fn.add_to_role_policy(iam.PolicyStatement(
            actions=['s3:PutBucketPolicy'],
            resources=['arn:aws:s3:::*'],
        ))

        # Lambda Version
        version = lambda_fn.current_version

        # EventBridge Rule
        rule = events.Rule(
            self, 'S3BucketCreationRule',
            event_pattern=events.EventPattern(
                source=['aws.s3'],
                detail_type=['AWS API Call via CloudTrail'],
                detail={
                    'eventSource': ['s3.amazonaws.com'],
                    'eventName': ['CreateBucket'],
                },
            )
        )

        # Add Lambda as target for the EventBridge rule
        rule.add_target(targets.LambdaFunction(lambda_fn))

まとめ

この記事では、AWS Foundational Security Best Practicesの
コントロールID[S3.5]に対応するため、S3バケット作成時に自動的に
SSL/HTTPS通信のみを許可するポリシーを適用するシステムを構築しました。

この実装により、コンプライアンス要件の遵守を自動化することができ、
手動によるミスを無くすことが出来ます。

興味のある方は、ぜひ試してみてください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0