はじめに
様々な環境のVPCフローログを1つのS3バケットに集約することで、管理が容易になり、VPCフローログの分析も容易になります。
VPCフローログの設定がないVPCにVPCフローログを設定したい人向けの記事となります。
AWS公式ブログで、AWS Configルールを利用して、VPCフローログを自動設定するブログがあります。この記事はこちらのブログのアレンジです。
単純にVPCフローログが設定されていないVPCに対してVPCフローログを設定したい場合は、こちらのブログを参照してください。
本記事では、既にVPCフローログが設定されているVPCもあるが、それぞれの環境ごとにバラバラのため、集約できていないという場合のソリューションです。
つまり 「特定のS3バケットを宛先としているVPCフローログがない場合はVPCフローログを設定する」 仕組みをご紹介します!
構成
以下の図のような構成とワークフローで実行します。
リソース
リソース一式を作成するCloudFormationテンプレートは以下の通りです。
CloudFormationテンプレート
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Detect VPCs whose Flow Logs are not sent to the specified S3 bucket, and optionally auto-remediate.'
Parameters:
CustomConfigRuleName:
Description: Name of the AWS Config Rule.
Type: String
Default: ConfigRuleForVpcFlowLogDestination
ExpectedS3BucketName:
Description: Expected S3 bucket name for VPC Flow Logs.
Type: String
Default: sample-vpc-flow-log-bucket
TrafficType:
Type: String
AllowedValues:
- ACCEPT
- REJECT
- ALL
Default: ALL
MaxExecutionFrequency:
Type: String
AllowedValues:
- One_Hour
- Three_Hours
- Six_Hours
- Twelve_Hours
- TwentyFour_Hours
Default: One_Hour
Resources:
LambdaExecutionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
Path: '/'
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: AllowDescribeFlowLogs
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- ec2:DescribeFlowLogs
- ec2:DescribeVpcs
- config:PutEvaluations
Resource: "*"
VpcFlowLogCheckerFunction:
Type: AWS::Lambda::Function
Properties:
Handler: index.lambda_handler
Role: !GetAtt LambdaExecutionRole.Arn
Runtime: python3.11
Timeout: 300
Code:
ZipFile: |
import logging
import json
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def evaluation_resource(event):
invoking_event = json.loads(event['invokingEvent'])
vpc_id = invoking_event['configurationItem']['resourceId']
expected_bucket = json.loads(event['ruleParameters'])['ExpectedS3BucketName']
expected_arn_prefix = f"arn:aws:s3:::{expected_bucket}"
ec2 = boto3.client('ec2')
response = ec2.describe_flow_logs(Filters=[{'Name': 'resource-id', 'Values': [vpc_id]}])
for log in response.get('FlowLogs', []):
destination = log.get('LogDestination', '')
if destination.startswith(expected_arn_prefix):
return {
'ComplianceType': 'COMPLIANT',
'Annotation': 'VPC has flow logs configured to the expected S3 bucket'
}
logger.info(f"Checking flow logs for VPC: {vpc_id}")
logger.info(f"Expected S3 bucket: {expected_bucket}")
logger.info(f"Flow logs found: {response.get('FlowLogs', [])}")
return {
'ComplianceType': 'NON_COMPLIANT',
'Annotation': 'VPC has not flow logs configured to the expected S3 bucket'
}
def lambda_handler(event, context):
logger.info(f"Received event: {json.dumps(event)}")
evaluation = evaluation_resource(event)
invoking_event = json.loads(event["invokingEvent"])
configuration_item = invoking_event["configurationItem"]
result_token = "No token found."
if "resultToken" in event:
result_token = event["resultToken"]
# boto3を使用して AWS Config に結果を put
config = boto3.client("config")
config.put_evaluations(
Evaluations=[
{
# 通常はinputのリソースタイプ/リソースIDを使用する
"ComplianceResourceType":
configuration_item["resourceType"],
"ComplianceResourceId":
configuration_item["resourceId"],
# 以下2つが評価結果
# COMPLIANT, NON_COMPLIANT, NOT_APPLICABLE
"ComplianceType":
evaluation['ComplianceType'],
# 評価結果の文字列
"Annotation":
evaluation['Annotation'],
# 並べ替えに使うタイムスタンプ
# キャプチャ時のタイムスタンプを用いれば良い
"OrderingTimestamp":
configuration_item["configurationItemCaptureTime"]
},
],
ResultToken=result_token
)
VpcFlowLogCheckerFunctionPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: !Ref VpcFlowLogCheckerFunction
Principal: config.amazonaws.com
ConfigRuleForVpcFlowLogDestination:
Type: AWS::Config::ConfigRule
Properties:
ConfigRuleName: !Ref CustomConfigRuleName
Description: Ensure VPC Flow Logs are sent to the specified S3 bucket.
InputParameters:
ExpectedS3BucketName: !Ref ExpectedS3BucketName
MaximumExecutionFrequency: !Ref MaxExecutionFrequency
Scope:
ComplianceResourceTypes:
- AWS::EC2::VPC
Source:
Owner: CUSTOM_LAMBDA
SourceIdentifier: !GetAtt VpcFlowLogCheckerFunction.Arn
SourceDetails:
- EventSource: aws.config
MessageType: ConfigurationItemChangeNotification
ConfigRemediationRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: ssm.amazonaws.com
Action: sts:AssumeRole
Path: /
Policies:
- PolicyName: RemediateVpcFlowLogsPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogDelivery
- logs:DeleteLogDelivery
- logs:CreateLogGroup
- logs:PutResourcePolicy
- ec2:CreateFlowLogs
- ec2:DescribeFlowLogs
Resource: "*"
VpcFlowLogsRemediationConfiguration:
DependsOn: ConfigRuleForVpcFlowLogDestination
Type: AWS::Config::RemediationConfiguration
Properties:
ConfigRuleName: !Ref CustomConfigRuleName
Automatic: true
MaximumAutomaticAttempts: 5
RetryAttemptSeconds: 50
ResourceType: AWS::EC2::VPC
Parameters:
VPCIds:
ResourceValue:
Value: RESOURCE_ID
LogDestinationType:
StaticValue:
Values:
- s3
LogDestinationArn:
StaticValue:
Values:
- !Sub arn:aws:s3:::${ExpectedS3BucketName}
TrafficType:
StaticValue:
Values:
- !Ref TrafficType
AutomationAssumeRole:
StaticValue:
Values:
- !GetAtt ConfigRemediationRole.Arn
TargetId: AWS-EnableVPCFlowLogs
TargetType: SSM_DOCUMENT
TargetVersion: 1
Outputs:
ConfigRuleArn:
Description: ARN of the AWS Config Rule
Value: !GetAtt ConfigRuleForVpcFlowLogDestination.Arn
ConfigRemediationRoleArn:
Description: IAM Role used for auto-remediation
Value: !GetAtt ConfigRemediationRole.Arn
カスタムルール用Lambda関数
以下がソースコードです。
やっていることは至ってシンプルで、VPCに設定されているフローログの一覧を取得し、一覧の中に宛先が指定のS3バケットだったら 準拠 として登録、なければ 非準拠 として登録します。
今回は宛先で準拠/非準拠の判断をさせていますが、例えば、特定の名前がついているか、タグがついているか、ALLのフローログがあるかなど様々な判断基準でルールを作成することができます。
Lambdaソースコード
import logging
import json
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def evaluation_resource(event):
invoking_event = json.loads(event['invokingEvent'])
vpc_id = invoking_event['configurationItem']['resourceId']
expected_bucket = json.loads(event['ruleParameters'])['ExpectedS3BucketName']
expected_arn_prefix = f"arn:aws:s3:::{expected_bucket}"
ec2 = boto3.client('ec2')
response = ec2.describe_flow_logs(Filters=[{'Name': 'resource-id', 'Values': [vpc_id]}])
for log in response.get('FlowLogs', []):
destination = log.get('LogDestination', '')
if destination.startswith(expected_arn_prefix):
return {
'ComplianceType': 'COMPLIANT',
'Annotation': 'VPC has flow logs configured to the expected S3 bucket'
}
logger.info(f"Checking flow logs for VPC: {vpc_id}")
logger.info(f"Expected S3 bucket: {expected_bucket}")
logger.info(f"Flow logs found: {response.get('FlowLogs', [])}")
return {
'ComplianceType': 'NON_COMPLIANT',
'Annotation': 'VPC has not flow logs configured to the expected S3 bucket'
}
def lambda_handler(event, context):
logger.info(f"Received event: {json.dumps(event)}")
evaluation = evaluation_resource(event)
invoking_event = json.loads(event["invokingEvent"])
configuration_item = invoking_event["configurationItem"]
result_token = "No token found."
if "resultToken" in event:
result_token = event["resultToken"]
# boto3を使用して AWS Config に結果を put
config = boto3.client("config")
config.put_evaluations(
Evaluations=[
{
# 通常はinputのリソースタイプ/リソースIDを使用する
"ComplianceResourceType":
configuration_item["resourceType"],
"ComplianceResourceId":
configuration_item["resourceId"],
# 以下2つが評価結果
# COMPLIANT, NON_COMPLIANT, NOT_APPLICABLE
"ComplianceType":
evaluation['ComplianceType'],
# 評価結果の文字列
"Annotation":
evaluation['Annotation'],
# 並べ替えに使うタイムスタンプ
# キャプチャ時のタイムスタンプを用いれば良い
"OrderingTimestamp":
configuration_item["configurationItemCaptureTime"]
},
],
ResultToken=result_token
)
Configルール
ConfigルールはScopeをAWS::EC2::VPC
に設定します。
Configの課金は意外と気づかずに上がりがちなので、Scopeは必要最低限にしましょう!
パラメータにExpectedS3BucketName
キーを作成し、宛先バケット名を設定しておきます。
修復アクションには、AWS が用意している SSM ドキュメントAWS-EnableVPCFlowLogs
を利用します。
検知まで若干時間がかかるかと思いますので、設定してから10分程度待ってフローログが作成されているかご確認ください。
さいごに
今回はVPCフローログの集約管理のための自動設定ソリューションを紹介しました。
必要に応じてアレンジして是非使ってみてください!
集約したら、ぜひ以下の記事を参考に自然言語での分析を試してみてください!
弊社では一緒に働く仲間を募集中です!
現在、様々な職種を募集しております。
カジュアル面談も可能ですので、ご連絡お待ちしております!
募集内容等詳細は、是非採用サイトをご確認ください。
https://engineer.po-holdings.co.jp/