ConfigやGuardDutyでセキュリティ通知をメールで飛ばすような仕組みを運用していたのですが、通知内容をちゃんと読まないと何が問題なのかわからず、調査に時間がかかるという課題がありました。
そんな中、技術書展で買った書籍の「Security Hubを最大限活用するためのポイント」という章で、LambdaとBedrockで検知内容を日本語で要約して、対応方法まで含めた通知を送信する仕組みが紹介されており、それを参考に実装してみることにしました。
実装
実装では主に以下の記事を参考にしつつ、独自の工夫を何点か加えました。
Security Hubの有効化
コードを書く前に、Security Hubを有効化します。
Security HubはConfig、GuardDuty、Trusted Advisorの検知内容をまとめて管理してくれて、EventBridgeのトリガーにすることもできます。
Slack API
Slack APIの管理画面から新しいAppを作成し、Incoming Webhooksを有効化してWebhook URLを取得します。取得したWebhook URLは、後ほどLambdaを作成する際に環境変数SLACK_WEBHOOK_URL
として登録します。
ファイル構成
以下がプロジェクトのファイル構成です。
- Dockerfile
- lambda_function.py
- requiments.txt
- README.md
コード全文
以下がLambdaのスクリプトで、Security Hubの検知結果をBedrockを使用して解釈して要約を生成します。その結果をSlack通知およびZenhubへのIssue作成として活用します。
import json
import os
import boto3
import requests
from botocore.exceptions import ClientError
SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']
BEDROCK_MODEL_ID = os.environ['BEDROCK_MODEL_ID']
INCLUDED_SEVERITIES = os.environ.get(
'INCLUDED_SEVERITIES', 'CRITICAL,HIGH,MEDIUM').split(',')
GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
GITHUB_REPO = os.environ['GITHUB_REPO']
MAX_TOKENS = int(os.environ.get('MAX_TOKENS', 1000))
if not SLACK_WEBHOOK_URL or not BEDROCK_MODEL_ID or not GITHUB_TOKEN or not GITHUB_REPO:
raise ValueError("必要な環境変数が設定されていません。")
bedrock = boto3.client('bedrock-runtime', region_name='ap-northeast-1')
def lambda_handler(event, context):
print(f"Received event: {json.dumps(event)}")
findings = event.get('detail', {}).get('findings', [])
if not findings:
print("No findings to process.")
return {
'statusCode': 200,
'body': json.dumps('No findings in the event.')
}
results = []
for finding in findings:
interpreted_finding, severity = interpret_finding(finding)
# 環境変数で指定された重要度に含まれる場合のみ結果に追加
if severity in INCLUDED_SEVERITIES:
results.append(interpreted_finding)
if results:
combined_message = "\n\n".join(results)
issue_title = generate_title_from_summary(combined_message)
send_slack_notification(
issue_title,
combined_message
)
create_zenhub_ticket(
issue_title,
combined_message
)
return {
'statusCode': 200,
'body': json.dumps('Processing complete')
}
def interpret_finding(finding) -> tuple[str, str]:
system_message = (
"あなたはセキュリティ専門家です。Security Hub の検出結果を解釈し、日本語で開発者向けに簡潔な説明と推奨される対応方針を提示してください。"
)
human_message = (
f"以下のSecurity Hub Findingを解釈し、開発者向けに簡潔な説明と推奨される対応方針を提示してください:\n\n"
f"{json.dumps(finding, indent=2)}\n\n"
"回答は以下の形式で提供してください:\n"
"- 検出内容の要約:\n"
"- 重要度: [CRITICAL/HIGH/MEDIUM/LOW]\n"
"- 影響:\n"
"- 推奨される対応:\n"
)
try:
response = bedrock.invoke_model(
modelId=BEDROCK_MODEL_ID,
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": MAX_TOKENS,
"system": system_message,
"messages": [{"role": "user", "content": human_message}]
}),
contentType="application/json",
accept="application/json"
)
response_body = json.loads(response['body'].read())
print("Full Bedrock Response:", json.dumps(response_body, indent=2))
# contentフィールドからtextタイプを抽出
content_items = response_body.get("content", [])
extracted_texts = [
item.get("text", "") for item in content_items if item.get("type") == "text"
]
# 抽出されたテキストを結合
interpreted_text = "\n".join(extracted_texts).strip()
severity = "LOW" # デフォルトの重要度
if "重要度:" in interpreted_text:
for line in interpreted_text.splitlines():
if line.startswith("- 重要度:"):
severity = line.replace("- 重要度:", "").strip().split()[0]
break
print(
f"Bedrock interpretation: {interpreted_text}, Severity: {severity}")
return interpreted_text, severity
except ClientError as e:
print(f"Error calling Bedrock: {e}")
return "Bedrockの呼び出しに失敗しました。原文を確認してください。"
def generate_title_from_summary(summary: str) -> str:
lines = summary.splitlines()
for idx, line in enumerate(lines):
if line.startswith("- 検出内容の要約:"):
# 要約の次の行を取得し、最初の句点までを抽出
if idx + 1 < len(lines):
first_sentence = lines[idx + 1].split("。")[0]
return first_sentence.strip() if first_sentence else "🔐 Security Hubの検知結果"
return "🔐 Security Hubの検知結果"
def send_slack_notification(title: str, detail: str) -> None:
payload = {
'attachments': [
{
'color': '#36a64f',
'pretext': title,
'text': detail
}
]
}
try:
response = requests.post(SLACK_WEBHOOK_URL, data=json.dumps(payload))
except requests.exceptions.RequestException as e:
print(e)
else:
print(response.status_code)
def create_zenhub_ticket(title: str, description: str) -> None:
payload = {
"title": title,
"body": description
}
headers = {
"Authorization": f"token {GITHUB_TOKEN}",
"Content-Type": "application/json"
}
try:
url = f"https://api.github.com/repos/{GITHUB_REPO}/issues"
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
print(
f"GitHub Issue created successfully: {response.json().get('html_url')}")
except requests.exceptions.RequestException as e:
print(f"Error creating GitHub Issue: {e}")
boto3
requests
環境変数による設定とコスト調整
Bedrockのモデル(BEDROCK_MODEL_ID
)、出力トークンの最大サイズ(MAX_TOKENS
)、検知内容の重大度(INCLUDED_SEVERITIES
)を環境変数から設定することで、柔軟にコストを調整できるようにしました。
最初は以下の設定にしていたのですが、10 $/dayほどかかってしまってしまいました。
-
BEDROCK_MODEL_ID
:anthropic.claude-3-5-sonnet-20240620-v1:0
-
INCLUDED_SEVERITIES
:CRITICAL,HIGH,MEDIUM
-
MAX_TOKENS
:1000
以下の設定に変えたら、1.5 $/dayほどに下がりました。
-
BEDROCK_MODEL_ID
:anthropic.claude-3-haiku-20240307-v1:0
-
INCLUDED_SEVERITIES
:CRITICAL,HIGH
-
MAX_TOKENS
:500
Zenhubへのチケット起票
create_zenhub_ticket
で、Security Hubの検知結果をもとにGitHub Issueを作成します。
Issue作成のためにrepo
にチェックをつけて、Personal Access Tokenを発行します。Lambda作成時に、GITHUB_TOKEN
として環境変数を登録します。GITHUB_TOKEN
に発行したトークン、GITHUB_REPO
に<organization>/<repository>
形式でリポジトリ名を設定します。
Lambdaのデプロイ
外部ライブラリを使用するので、DockerでLambdaをデプロイします。
以下がDockerfile
です。
FROM public.ecr.aws/lambda/python:3.13
WORKDIR /var/task
COPY requirements.txt ./
COPY lambda_function.py ./
RUN pip install -r requirements.txt
CMD ["lambda_function.lambda_handler"]
ECRにリポジトリを作成して、以下の手順でビルド&プッシュします。
docker buildx build --platform linux/amd64 -f Dockerfile -t <account-id>.dkr.ecr.ap-northeast-1.amazonaws.com/securityhub-bedrock-slack:latest .
aws ecr get-login-password --region ap-northeast-1 | docker login --username AWS --password-stdin <account-id>.dkr.ecr.ap-northeast-1.amazonaws.com
docker push <account-id>.dkr.ecr.ap-northeast-1.amazonaws.com/securityhub-bedrock-slack
Lambda作成時にプッシュしたコンテナイメージを選択します。
Lambda作成後、タイムアウト時間を1分に伸ばします(デフォルトの3秒だとタイムアウトするため)。
EventBridge
EventBridgeの新しいルールを作成します。
イベントタイプにはSecurity Hub Findings - Imported
を選択します。
ターゲットには作成したLambdaを登録します。