0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Google Cloud Security Command Centerのリスク検知をSlackへ連携する機能をTerraformで実装してみた

Last updated at Posted at 2025-09-26

こんにちは、博報堂テクノロジーズの木村です。

Google Cloud を利用した開発を実施するうえで、セキュリティに関する設定不備や脆弱性の検知には Security Command Center(以下SCC)を利用することになるかと思います。

SCCでは検知したセキュリティリスクを通知する仕組みがあり、リスクの検知時には通知をリアルタイムに受け取って対応をすることが可能です。

しかし、SCCはCloud Monitoringの通知機能とは異なり、通知先として指定できるサービスが Pub/Sub のみであり、それ以外のSlackなどのチャットツールに直接通知を送付する仕組みがネイティブでは実装されていません。このため、Cloud Run functionsなどを利用して通知を転送して送付する仕組みを自分で実装する必要がありました。

この記事では備忘録を兼ねて、SCCでの検知をSlackに通知する機能をTerraformにて実装した方法について記載します。

実装

以下では通知に関する実装について順を追って説明します。

なお、この記事ではSCC自体の詳細な設定については記載しないので、SCC自体の有効化や不要なアラートをミュートする設定はあらかじめ実施されている前提とします。

Pub/Sub 構築

まず、SCCから通知を送付し、Cloud Run functionsにて通知を受け取るための、Pub/Subを構築します。

resource "google_pubsub_topic" "default" {
  name       = "scc-notifications"
}

SCC 通知設定

SCCの通知設定を実施します。

pubsub_topic に先ほどのPub/SubのIDを指定します。また、必要に応じて通知条件を streaming_config に記載します。

resource "google_scc_v2_project_notification_config" "default" {
  config_id    = "scc-notifications"
  pubsub_topic = google_pubsub_topic.default.id

  streaming_config {
    filter = "state=\"ACTIVE\" AND NOT mute=\"MUTED\""
  }

  depends_on = [
    google_pubsub_topic.default
  ]
}

Cloud Storage 構築

Cloud Run functionsの関数を配置するCloud Storageを構築したうえで、ローカルのディレクトリ(ここではfunctionという名前のディレクトリ)にあるCloud Run functionsのコードをCloud Storageに配置します。

resource "google_storage_bucket" "default" {
  name     = "scc-function-bucket"
  location = "asia-northeast1"

  uniform_bucket_level_access = true
  public_access_prevention    = "enforced"
}

# ソースコードの変更検知のため、ソースコードのハッシュ計算を実施
data "local_file" "function_file" {
  filename = "./function/main.py"
}
locals {
  function_file_hash = sha256(data.local_file.function_file.content)
}

# ソースコードのzipファイル生成
data "archive_file" "function_source" {
  type        = "zip"
  source_dir  = "./function"
  output_path = "./scc-function-${local.function_file_hash}.zip"
}

resource "google_storage_bucket_object" "function_source" {
  name   = "scc-function.zip"
  bucket = google_storage_bucket.default.name
  source = data.archive_file.function_source.output_path
}

Cloud Run functions 構築

Cloud Run functionsを構築します。今回はPythonランタイムを利用します。

pubsub_topic に 先ほど作成したPub/Subを、 storage_source に先ほど作成したCloud Storageを指定します。

また、Cloud Run functionsではSecret Managerに格納したSlackのWebhook URLを読み取る必要があるので、Secret Managerへのアクセス権限を付与したService Accountを構築し、Cloud Run functionsに設定します。

# サービスアカウントと権限設定
resource "google_service_account" "default" {
  account_id   = "scc-function-sa"
  display_name = "Security Notification Function Service Account"
  project      = var.project_id
}
resource "google_project_iam_member" "function_secret_accessor" {
  project = var.project_id
  role    = "roles/secretmanager.secretAccessor"
  member  = "serviceAccount:${google_service_account.default.email}"
}

# Cloud Run functions
resource "google_cloudfunctions2_function" "default" {
  name     = "scc-notification"
  location = var.location

  build_config {
    runtime     = "python312"
    entry_point = "main"
    source {
      storage_source {
        bucket = google_storage_bucket.default.name
        object = google_storage_bucket_object.function_source.name
      }
    }
  }

  event_trigger {
    event_type   = "google.cloud.pubsub.topic.v1.messagePublished"
    retry_policy = "RETRY_POLICY_DO_NOT_RETRY"
    pubsub_topic = google_pubsub_topic.default.id
  }

  service_config {
    ingress_settings               = "ALLOW_INTERNAL_ONLY"
    all_traffic_on_latest_revision = true
    service_account_email          = google_service_account.default.email
    environment_variables = {
      PROJECT_ID                       = var.project_id
      SLACK_WEBHOOK_URL_SECRET_NAME = "SLACK_WEBHOOK_URL"
    }
  }

  depends_on = [
    google_storage_bucket_object.function_source
  ]
}

Cloud Run functions のコード実装

Cloud Run functionsのコードを以下の通り実装します。

  • main.py
import base64
import json
import os

import functions_framework
import requests
from google.cloud import secretmanager

# 環境変数からSlackのWebhook URLを取得
PROJECT_ID = os.environ.get("PROJECT_ID")
SLACK_WEBHOOK_URL_SECRET_NAME = os.environ.get("SLACK_WEBHOOK_URL_SECRET_NAME")

@functions_framework.cloud_event
def main(cloud_event):
    """
    メイン関数
    """
    try:
        # Get Slack webhook URL from Secret Manager
        client = secretmanager.SecretManagerServiceClient()
        name = f"projects/{PROJECT_ID}/secrets/{SLACK_WEBHOOK_URL_SECRET_NAME}/versions/latest"
        response = client.access_secret_version(name=name)
        webhook_url = response.payload.data.decode("UTF-8")

        # Decode the PubSub message
        if cloud_event.data and 'message' in cloud_event.data:
            pubsub_message_data_row = base64.b64decode(cloud_event.data['message']['data']).decode('utf-8')
            pubsub_message_data = json.loads(pubsub_message_data_row)
        else:
            print("No data in PubSub message")
            return

        # メッセージの生成
        slack_message_data = create_slack_message(pubsub_message_data)

        # メッセージ送信
        send_to_slack(webhook_url, slack_message_data)

        # ログ出力
        finding = pubsub_message_data.get('finding', {})
        print(f"Successfully sent Security Command Center finding to Slack: {finding.get('name', 'Unknown')}")

    except Exception as e:
        print(f"Error processing Security Command Center finding: {str(e)}")
        raise

def create_slack_message(message_data):
    """
    Slackに送信するメッセージの生成
    """

    # 検知メッセージ・検知リソースに関するデータの抽出
    finding = message_data.get('finding', {})
    resource = message_data.get('resource', {})

    # メッセージ
    message = {
        "resourceName": finding.get('resourceName', 'UNKNOWN'),
        "state": finding.get('state', 'UNKNOWN'),
        "category": finding.get('category', 'UNKNOWN'),
        "severity": finding.get('severity', 'UNKNOWN'),
        "description": finding.get('description', 'UNKNOWN'),
        "projectDisplayName": resource.get('projectDisplayName', 'UNKNOWN'),
    }
    return message

def send_to_slack(webhook_url, slack_message_data):
    """
    Send message to Slack using webhook
    """
    headers = {
        'Content-Type': 'application/json'
    }

    response = requests.post(webhook_url, json=slack_message_data, headers=headers)

    if response.status_code != 200:
        raise Exception(f"Failed to send message to Slack: {response.status_code} - {response.text}")

  • requirements.txt
functions-framework==3.*
google-cloud-secret-manager==2.16.4
requests==2.31.0

Slack Webhookの設定

今回は通知にSlack WorkflowのWebhook機能を利用したので、設定方法について説明します。

まず通知を実施したいチャネルのメニューからワークフローを追加し、ワークフローのトリガーとしてWebhookを設定します。

Webhookのデータ変数として以下を設定します。

{
    "description": "テキストの例",
    "resourceName": "テキストの例",
    "state": "テキストの例",
    "category": "テキストの例",
    "projectDisplayName": "テキストの例",
    "severity": "テキストの例"
}

次にチャンネルへメッセージを送信するを設定し、通知対象のチャンネルを選択します。また、メッセージとして以下を設定します。

Security Command Centerにて以下のセキュリティイベントを検知しました。状況を確認してください。

プロジェクト: {{{{inputs.projectDisplayName}}}}
ステータス: {{{{inputs.state}}}}
重要度: {{{{inputs.severity}}}}
カテゴリ: {{{{inputs.category}}}}
リソース名: {{{{inputs.resourceName}}}}
詳細: {{{{inputs.description}}}}

WebhookのURLを控えた上でワークフローを公開します。

なお、通知Webhookについては、Incomming Webhookに差し替えて対応することも可能です。差し替える場合は Cloud Run functionsのPythonコードの create_slack_message の部分のメッセージ内容を調整してください。

Secret Managerの設定

先ほど控えてたSlack WebhookのURLを SLACK_WEBHOOK_URL_SECRET_NAME という名前で登録します。

結果

以下の通り、意図的に設定を変更してSCCに検知させたところ、Slackへの通知を送付することができました。

スクリーンショット 2025-09-25 11.01.10.png

まとめ

今回はSCCでの検知をSlackに通知する方法について説明しました。ネイティブでSlack通知する機能がないのか…となった皆様の参考になれれば幸いです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?