内容
Security Hub から SLACK に通知させるときは、Amazon Q Developer(旧 AWS Chatbot)を通じて、以下のような形式でいい感じにで通知してくれます。
しかし、メールに通知させたい場合は、EventBridge の入力トランスフォーマーや Lambda を独自で開発して上手く動作させる必要があります。以下参考。
ただし、イベントの内容を人間が読みやすい形式に変更したのみであり、具体的にどんな内容か、何をしたら良いか分かりにくものです。
そこで、Security Hub からのアラートを Bedrock で要約してメール通知させるイベント駆動型の実装をしてみることにしました。
構成
Security Hub → EventBridge → Lambda → Bedrock → SNS となります。
構成図は、AWS Diagram MCP Serverによって、作成されています。
全体システム構成図
データフロー図
技術アーキテクチャ詳細図
実装
インフラ部分
インフラ部分は、AWS CDK で実装していきます。
lib/securityhub-bedrock-alert-stack.ts
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as events from 'aws-cdk-lib/aws-events';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as path from 'path';
export class SecurityhubBedrockAlertStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// SNSトピック作成
const alertTopic = new sns.Topic(this, 'SecurityHubAlertTopic', {
topicName: 'security-hub-bedrock-alerts',
displayName: 'Security Hub Bedrock Alerts'
});
// Lambda IAMロール作成
const lambdaRole = new iam.Role(this, 'SecurityHubLambdaRole', {
assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaBasicExecutionRole')
]
});
// Bedrock権限を追加
const model = "anthropic.claude-sonnet-4-20250514-v1:0";
lambdaRole.addToPolicy(new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['bedrock:InvokeModel'],
resources: [
`arn:aws:bedrock:*:${this.account}:inference-profile/apac.${model}`,
`arn:aws:bedrock:*::foundation-model/${model}`
]
}));
// SNS権限を追加
lambdaRole.addToPolicy(new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['sns:Publish'],
resources: [alertTopic.topicArn]
}));
// Lambda関数作成
const securityHubProcessor = new lambda.Function(this, 'SecurityHubProcessor', {
runtime: lambda.Runtime.PYTHON_3_11,
handler: 'handler.lambda_handler',
code: lambda.Code.fromAsset(path.join(__dirname, '../lambda')),
role: lambdaRole,
timeout: cdk.Duration.minutes(5),
memorySize: 256,
environment: {
SNS_TOPIC_ARN: alertTopic.topicArn
}
});
// EventBridge IAMロール作成
const eventBridgeRole = new iam.Role(this, 'EventBridgeExecutionRole', {
assumedBy: new iam.ServicePrincipal('events.amazonaws.com'),
description: 'IAM role for EventBridge to invoke Lambda function'
});
// Lambda実行権限を追加
eventBridgeRole.addToPolicy(new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['lambda:InvokeFunction'],
resources: [securityHubProcessor.functionArn]
}));
// EventBridgeルール作成(Security Hubイベント監視)
const securityHubRule = new events.CfnRule(this, 'SecurityHubRule', {
eventPattern: {
source: ['aws.securityhub'],
'detail-type': ['Security Hub Findings - Imported'],
detail: {
findings: {
Compliance: {
Status: ['FAILED', 'WARNING']
},
RecordState: ['ACTIVE'],
Severity: {
Label: ['CRITICAL', 'HIGH', 'MEDIUM']
},
Workflow: {
Status: ['NEW']
}
}
}
},
description: 'Security Hub findings (CRITICAL/HIGH/MEDIUM, NEW, FAILED/WARNING) to Lambda',
state: 'ENABLED',
targets: [{
id: '1',
arn: securityHubProcessor.functionArn,
roleArn: eventBridgeRole.roleArn
}]
});
// 出力設定
new cdk.CfnOutput(this, 'SNSTopicArn', {
value: alertTopic.topicArn,
description: 'SNS Topic ARN for Security Hub alerts'
});
new cdk.CfnOutput(this, 'LambdaFunctionName', {
value: securityHubProcessor.functionName,
description: 'Lambda function name for processing Security Hub events'
});
new cdk.CfnOutput(this, 'EventBridgeRuleName', {
value: securityHubRule.ref,
description: 'EventBridge rule name for Security Hub events'
});
new cdk.CfnOutput(this, 'EventBridgeRoleArn', {
value: eventBridgeRole.roleArn,
description: 'EventBridge execution role ARN'
});
}
}
※ ちなみに今回の構成ではあまり意味はないですが、Amazon EventBridge Rule の IAM ロールで Lambda を呼び出すようにしています。詳しくは以下の記事で。
bin/securityhub-bedrock-alert.ts
#!/usr/bin/env node
import * as cdk from 'aws-cdk-lib';
import { SecurityhubBedrockAlertStack } from '../lib/securityhub-bedrock-alert-stack';
const app = new cdk.App();
new SecurityhubBedrockAlertStack(app, 'SecurityhubBedrockAlertStack', {
env: {
account: process.env.CDK_DEFAULT_ACCOUNT,
region: 'ap-northeast-1'
},
});
特別、Construct などを利用するまででもないので今回は 1 スタックでシンプルに構成しています。
Lambda 関数コード部分
lambda/handler.py
import json
import boto3
import os
import logging
from typing import Dict, Any
# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# AWSクライアント初期化
bedrock_runtime = boto3.client('bedrock-runtime')
sns = boto3.client('sns')
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""Security HubイベントをBedrockで要約してSNS通知"""
try:
logger.info(f"Received event: {json.dumps(event)}")
detail = event.get('detail', {})
if not detail:
logger.warning("No detail found in event")
return {'statusCode': 400, 'body': 'No detail found in event'}
findings = detail.get('findings', [])
if not findings:
logger.warning("No findings found in event detail")
return {'statusCode': 400, 'body': 'No findings found in event detail'}
# 各findingを処理
for finding in findings:
try:
# 基本情報を取得
finding_id = finding.get('Id', 'unknown')
finding_title = finding.get('Title', 'Unknown Title')
control_id = (finding.get('ProductFields', {}).get('ControlId') or
finding.get('Compliance', {}).get('SecurityControlId') or
'Unknown')
logger.info(f"Processing finding: {finding_id} - {finding_title} (Control: {control_id})")
# Bedrockで要約生成
summary = generate_summary_with_bedrock(finding)
logger.info(f"Summary generated for finding: {finding_id}")
# SNSに送信
send_to_sns(summary, finding)
except Exception as e:
logger.error(f"Error processing finding {finding.get('Id', 'unknown')}: {str(e)}")
continue
return {
'statusCode': 200,
'body': json.dumps({'message': f'Successfully processed {len(findings)} findings'})
}
except Exception as e:
logger.error(f"Lambda execution error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def generate_summary_with_bedrock(finding: Dict[str, Any]) -> str:
"""BedrockでSecurity Hubのfindingを日本語要約"""
# 基本情報を抽出
title = finding.get('Title', '')
description = finding.get('Description', '')
severity = finding.get('Severity', {}).get('Label', '')
generator_id = finding.get('GeneratorId', '')
# コントロールIDを取得
product_fields = finding.get('ProductFields', {})
compliance = finding.get('Compliance', {})
control_id = (product_fields.get('ControlId') or
compliance.get('SecurityControlId') or
'')
# リソース情報を取得(最大3つ)
resources = finding.get('Resources', [])
resource_summary = []
for resource in resources[:3]:
resource_type = resource.get('Type', '')
resource_id = resource.get('Id', '')
resource_summary.append(f"{resource_type}: {resource_id}")
resources_text = ', '.join(resource_summary) if resource_summary else '不明'
# Bedrockプロンプト作成
prompt = f"""
AWS Security Hubで以下のセキュリティ検出が報告されました。日本語で専門的な分析と要約を提供してください。
【検出情報】
- タイトル: {title}
- 説明: {description}
- 重要度: {severity}
- 制御ID: {generator_id}
- セキュリティコントロールID: {control_id}
- 影響リソース: {resources_text}
【重要な指示】
1. 日本語で回答してください
2. セキュリティコントロールID「{control_id}」に対応するAWS Security Hub公式ドキュメントの内容を参照してください
3. 具体的で実行可能な修復手順を提供してください
4. 技術的内容を分かりやすく説明してください
【必須回答形式】
- 検出内容の要約: (コントロールの目的と技術的内容を日本語で分かりやすく説明)
- 詳細説明: (技術的な詳細を日本語で分かりやすく説明)
- 影響: (このコントロール違反によるリスクと影響範囲を日本語で分かりやすく説明)
- 対応の優先度: [Immediate/High/Medium/Low](英語のままで)
- 推奨される対応: (AWS公式ドキュメントに基づく具体的な修復手順を日本語で分かりやすく説明)
- 参照リンク: ({control_id}に対応するAWS Security Hub公式ドキュメントの正確なURL)
"""
try:
model_id = "apac.anthropic.claude-sonnet-4-20250514-v1:0"
body = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 2000,
"temperature": 0.1,
"system": "あなたはAWS Securityの専門家です。日本語で回答してください。",
"messages": [{"role": "user", "content": prompt}]
}
logger.info(f"Calling Bedrock with model: {model_id}")
response = bedrock_runtime.invoke_model(
modelId=model_id,
body=json.dumps(body),
contentType="application/json"
)
response_body = json.loads(response['body'].read())
summary = response_body['content'][0]['text']
logger.info("Successfully generated summary with Bedrock")
return summary
except Exception as e:
logger.error(f"Error generating summary with Bedrock: {str(e)}")
logger.error(f"Model ID used: {model_id}")
raise Exception(f"Bedrock summary generation failed: {str(e)}")
def send_to_sns(summary: str, finding: Dict[str, Any]) -> None:
"""SNSにSecurity Hub通知を送信"""
try:
topic_arn = os.environ['SNS_TOPIC_ARN']
# 基本情報を取得
finding_id = finding.get('Id', 'Unknown')
title = finding.get('Title', 'Security Finding')
account_id = finding.get('AwsAccountId', 'Unknown')
region = finding.get('Region', 'Unknown')
created_at = finding.get('CreatedAt', 'Unknown')
updated_at = finding.get('UpdatedAt', 'Unknown')
severity = finding.get('Severity', {}).get('Label', 'UNKNOWN')
workflow_state = finding.get('WorkflowState', 'Unknown')
record_state = finding.get('RecordState', 'Unknown')
# リソース情報を取得
resources = finding.get('Resources', [])
resource_info = []
for resource in resources:
resource_id = resource.get('Id', 'Unknown')
resource_type = resource.get('Type', 'Unknown')
resource_info.append(f" - {resource_type}: {resource_id}")
resources_text = '\n'.join(resource_info) if resource_info else ' - 不明'
# コントロールIDを取得
product_fields = finding.get('ProductFields', {})
compliance = finding.get('Compliance', {})
control_id = (product_fields.get('ControlId') or
compliance.get('SecurityControlId') or
'')
# SNSメッセージを作成
# Subject制限: 80文字以内、改行不可
raw_subject = f"Security Hub Alert [{severity}]: {title}"
subject = raw_subject.replace('\n', ' ').replace('\r', ' ').strip()
if len(subject) > 80:
subject = subject[:77] + "..."
# Security Hubコンソールリンクを生成
finding_links = generate_security_hub_links(region, finding_id, control_id, title, severity)
message = f"""🚨 Security Hub Finding Alert
=== 基本情報 ===
Finding ID: {finding_id}
タイトル: {title}
重要度: {severity}
Account ID: {account_id}
リージョン: {region}
作成日時: {created_at}
更新日時: {updated_at}
ワークフロー状態: {workflow_state}
記録状態: {record_state}
影響を受けるリソース: {resources_text}
=== AI要約分析 ===
{summary}
=== 詳細情報 ===
詳細な調査が必要な場合は、以下のリンクからAWS Security Hubコンソールで確認してください:
{finding_links}
---
このアラートは Security Hub Bedrock Alert System により自動生成されました。
"""
# SNS送信
response = sns.publish(
TopicArn=topic_arn,
Subject=subject,
Message=message
)
logger.info(f"Successfully sent message to SNS. MessageId: {response.get('MessageId')}")
except Exception as e:
logger.error(f"Error sending message to SNS: {str(e)}")
raise
def generate_security_hub_links(region: str, finding_id: str, control_id: str, title: str, severity: str) -> str:
"""
Security Hubコンソールへの直接リンクを生成
Finding IDを使用した検索URLで特定のFindingに直接アクセス
"""
if finding_id and finding_id != 'Unknown':
# 理想の形式:Finding IDを使用した検索URL
import urllib.parse
# コロンを%253A、スラッシュを%252Fでエンコード
encoded_finding_id = finding_id.replace(':', '%253A').replace('/', '%252F')
search_url = f"https://{region}.console.aws.amazon.com/securityhub/home?region={region}#/findings?search=Id%3D%255Coperator%255C%253AEQUALS%255C%253A{encoded_finding_id}"
return search_url
else:
# フォールバック:基本のSecurity Hubコンソール
return f"https://console.aws.amazon.com/securityhub/home?region={region}#/findings"
AI要約生成機能
概要: Amazon Bedrock を使用してセキュリティ finding を日本語で要約
要約内容:
- 検出内容の要約
- 詳細説明
- 影響
- 対応の優先度(AIの観点から)
- 推奨対される対応
- 参照リンク
プロンプト設計:
- 日本語での回答
- Security Hub 公式ドキュメント参照
- 具体的な修復手順を提供
- 技術的内容の説明
通知配信機能
概要: 要約されたアラート情報を SNS 経由で配信
メッセージ構成:
🚨 Security Hub Finding Alert
=== 基本情報 ===
Finding ID: [finding-id]
タイトル: [title]
重要度: [severity]
Account ID: [account-id]
リージョン: [region]
作成日時: [created-at]
更新日時: [updated-at]
ワークフロー状態: [workflow-state]
記録状態: [record-state]
影響を受けるリソース:
- [resource-type]: [resource-id]
=== AI要約分析 ===
[bedrock-generated-summary]
=== 詳細情報 ===
詳細な調査が必要な場合は、以下のリンクからAWS Security Hubコンソールで確認してください:
[security-hub-console-link]
---
このアラートは Security Hub Bedrock Alert System により自動生成されました。
リンク生成
概要: Security Hub コンソールへの直接リンクを生成
リンク形式:
- Finding ID による検索 URL(優先)
- フォールバック: 基本の Security Hub コンソール