こんにちは、博報堂テクノロジーズの木村です。
Google Cloud を利用した開発を実施するうえで、セキュリティに関する設定不備や脆弱性の検知には Security Command Center(以下SCC)を利用することになるかと思います。
SCCでは検知したセキュリティリスクを通知する仕組みがあり、リスクの検知時には通知をリアルタイムに受け取って対応をすることが可能です。
しかし、SCCはCloud Monitoringの通知機能とは異なり、通知先として指定できるサービスが Pub/Sub のみであり、それ以外のSlackなどのチャットツールに直接通知を送付する仕組みがネイティブでは実装されていません。このため、Cloud Run functionsなどを利用して通知を転送して送付する仕組みを自分で実装する必要がありました。
この記事では備忘録を兼ねて、SCCでの検知をSlackに通知する機能をTerraformにて実装した方法について記載します。
実装
以下では通知に関する実装について順を追って説明します。
なお、この記事ではSCC自体の詳細な設定については記載しないので、SCC自体の有効化や不要なアラートをミュートする設定はあらかじめ実施されている前提とします。
Pub/Sub 構築
まず、SCCから通知を送付し、Cloud Run functionsにて通知を受け取るための、Pub/Subを構築します。
resource "google_pubsub_topic" "default" {
name = "scc-notifications"
}
SCC 通知設定
SCCの通知設定を実施します。
pubsub_topic
に先ほどのPub/SubのIDを指定します。また、必要に応じて通知条件を streaming_config
に記載します。
resource "google_scc_v2_project_notification_config" "default" {
config_id = "scc-notifications"
pubsub_topic = google_pubsub_topic.default.id
streaming_config {
filter = "state=\"ACTIVE\" AND NOT mute=\"MUTED\""
}
depends_on = [
google_pubsub_topic.default
]
}
Cloud Storage 構築
Cloud Run functionsの関数を配置するCloud Storageを構築したうえで、ローカルのディレクトリ(ここではfunction
という名前のディレクトリ)にあるCloud Run functionsのコードをCloud Storageに配置します。
resource "google_storage_bucket" "default" {
name = "scc-function-bucket"
location = "asia-northeast1"
uniform_bucket_level_access = true
public_access_prevention = "enforced"
}
# ソースコードの変更検知のため、ソースコードのハッシュ計算を実施
data "local_file" "function_file" {
filename = "./function/main.py"
}
locals {
function_file_hash = sha256(data.local_file.function_file.content)
}
# ソースコードのzipファイル生成
data "archive_file" "function_source" {
type = "zip"
source_dir = "./function"
output_path = "./scc-function-${local.function_file_hash}.zip"
}
resource "google_storage_bucket_object" "function_source" {
name = "scc-function.zip"
bucket = google_storage_bucket.default.name
source = data.archive_file.function_source.output_path
}
Cloud Run functions 構築
Cloud Run functionsを構築します。今回はPythonランタイムを利用します。
pubsub_topic
に 先ほど作成したPub/Subを、 storage_source
に先ほど作成したCloud Storageを指定します。
また、Cloud Run functionsではSecret Managerに格納したSlackのWebhook URLを読み取る必要があるので、Secret Managerへのアクセス権限を付与したService Accountを構築し、Cloud Run functionsに設定します。
# サービスアカウントと権限設定
resource "google_service_account" "default" {
account_id = "scc-function-sa"
display_name = "Security Notification Function Service Account"
project = var.project_id
}
resource "google_project_iam_member" "function_secret_accessor" {
project = var.project_id
role = "roles/secretmanager.secretAccessor"
member = "serviceAccount:${google_service_account.default.email}"
}
# Cloud Run functions
resource "google_cloudfunctions2_function" "default" {
name = "scc-notification"
location = var.location
build_config {
runtime = "python312"
entry_point = "main"
source {
storage_source {
bucket = google_storage_bucket.default.name
object = google_storage_bucket_object.function_source.name
}
}
}
event_trigger {
event_type = "google.cloud.pubsub.topic.v1.messagePublished"
retry_policy = "RETRY_POLICY_DO_NOT_RETRY"
pubsub_topic = google_pubsub_topic.default.id
}
service_config {
ingress_settings = "ALLOW_INTERNAL_ONLY"
all_traffic_on_latest_revision = true
service_account_email = google_service_account.default.email
environment_variables = {
PROJECT_ID = var.project_id
SLACK_WEBHOOK_URL_SECRET_NAME = "SLACK_WEBHOOK_URL"
}
}
depends_on = [
google_storage_bucket_object.function_source
]
}
Cloud Run functions のコード実装
Cloud Run functionsのコードを以下の通り実装します。
- main.py
import base64
import json
import os
import functions_framework
import requests
from google.cloud import secretmanager
# 環境変数からSlackのWebhook URLを取得
PROJECT_ID = os.environ.get("PROJECT_ID")
SLACK_WEBHOOK_URL_SECRET_NAME = os.environ.get("SLACK_WEBHOOK_URL_SECRET_NAME")
@functions_framework.cloud_event
def main(cloud_event):
"""
メイン関数
"""
try:
# Get Slack webhook URL from Secret Manager
client = secretmanager.SecretManagerServiceClient()
name = f"projects/{PROJECT_ID}/secrets/{SLACK_WEBHOOK_URL_SECRET_NAME}/versions/latest"
response = client.access_secret_version(name=name)
webhook_url = response.payload.data.decode("UTF-8")
# Decode the PubSub message
if cloud_event.data and 'message' in cloud_event.data:
pubsub_message_data_row = base64.b64decode(cloud_event.data['message']['data']).decode('utf-8')
pubsub_message_data = json.loads(pubsub_message_data_row)
else:
print("No data in PubSub message")
return
# メッセージの生成
slack_message_data = create_slack_message(pubsub_message_data)
# メッセージ送信
send_to_slack(webhook_url, slack_message_data)
# ログ出力
finding = pubsub_message_data.get('finding', {})
print(f"Successfully sent Security Command Center finding to Slack: {finding.get('name', 'Unknown')}")
except Exception as e:
print(f"Error processing Security Command Center finding: {str(e)}")
raise
def create_slack_message(message_data):
"""
Slackに送信するメッセージの生成
"""
# 検知メッセージ・検知リソースに関するデータの抽出
finding = message_data.get('finding', {})
resource = message_data.get('resource', {})
# メッセージ
message = {
"resourceName": finding.get('resourceName', 'UNKNOWN'),
"state": finding.get('state', 'UNKNOWN'),
"category": finding.get('category', 'UNKNOWN'),
"severity": finding.get('severity', 'UNKNOWN'),
"description": finding.get('description', 'UNKNOWN'),
"projectDisplayName": resource.get('projectDisplayName', 'UNKNOWN'),
}
return message
def send_to_slack(webhook_url, slack_message_data):
"""
Send message to Slack using webhook
"""
headers = {
'Content-Type': 'application/json'
}
response = requests.post(webhook_url, json=slack_message_data, headers=headers)
if response.status_code != 200:
raise Exception(f"Failed to send message to Slack: {response.status_code} - {response.text}")
- requirements.txt
functions-framework==3.*
google-cloud-secret-manager==2.16.4
requests==2.31.0
Slack Webhookの設定
今回は通知にSlack WorkflowのWebhook機能を利用したので、設定方法について説明します。
まず通知を実施したいチャネルのメニューからワークフローを追加し、ワークフローのトリガーとしてWebhookを設定します。
Webhookのデータ変数として以下を設定します。
{
"description": "テキストの例",
"resourceName": "テキストの例",
"state": "テキストの例",
"category": "テキストの例",
"projectDisplayName": "テキストの例",
"severity": "テキストの例"
}
次にチャンネルへメッセージを送信するを設定し、通知対象のチャンネルを選択します。また、メッセージとして以下を設定します。
Security Command Centerにて以下のセキュリティイベントを検知しました。状況を確認してください。
プロジェクト: {{{{inputs.projectDisplayName}}}}
ステータス: {{{{inputs.state}}}}
重要度: {{{{inputs.severity}}}}
カテゴリ: {{{{inputs.category}}}}
リソース名: {{{{inputs.resourceName}}}}
詳細: {{{{inputs.description}}}}
WebhookのURLを控えた上でワークフローを公開します。
なお、通知Webhookについては、Incomming Webhookに差し替えて対応することも可能です。差し替える場合は Cloud Run functionsのPythonコードの create_slack_message
の部分のメッセージ内容を調整してください。
Secret Managerの設定
先ほど控えてたSlack WebhookのURLを SLACK_WEBHOOK_URL_SECRET_NAME
という名前で登録します。
結果
以下の通り、意図的に設定を変更してSCCに検知させたところ、Slackへの通知を送付することができました。
まとめ
今回はSCCでの検知をSlackに通知する方法について説明しました。ネイティブでSlack通知する機能がないのか…となった皆様の参考になれれば幸いです。