1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【最小構成】BigQuery × Cloud Functions(第2世代)× OIDC(STS) で AWS IAM を収集して保存

Posted at

1. 目的

マルチクラウド環境では、各クラウドに散らばるアカウントやリソースを一元的に管理・監査するニーズがある。
特にAWSのIAMユーザーやクレデンシャルはセキュリティ上の重要な監視対象だが、AWS内だけで完結していると他システムとの統合が難しい場面もある。

そこで今回は、BigQuery の Remote Function から Cloud Functions(第2世代)を呼び出し、Google Cloud のサービスアカウント OIDC で AWS にフェデレーションして STS(AssumeRoleWithWebIdentity)で一時クレデンシャルを取得、その資格情報で IAM の ListUsers を実行し、結果を BigQuery テーブルに保存する仕組みを実装してみた。

これにより、BigQueryからSQLで直接AWSアカウント情報を参照でき、クロスクラウドな監査やレポート作成が容易にできる。


2. 構成概要

全体像は以下のとおり。

  • BigQuery Remote Function : SQLからCloud Runを呼び出すエンドポイント
  • Cloud Functions(第2世代) : AWS SDKを呼び出す関数(Pythonで実装)
  • AWS STS AssumeRoleWithWebIdentity : GoogleのサービスアカウントOIDCを使ってAWSロールに一時的にスイッチ
  • IAM API : IAMユーザー一覧を返却

3. 設定手順

(1) Google Cloud サービスアカウントを作成

  • Cloud Runを実行するサービスアカウントを作成
  • このアカウントは AWS側で信頼されるID かつCloud Functions(第2世代)の実行用サービスアカウントとして利用する

(2) AWS ロールを作成

  • STS:AssumeRoleWithWebIdentity を許可したIAMロールを作成
  • 信頼関係ポリシーにOIDCプロバイダを設定( accounts.google.com
  • Conditionにaccounts.google.com:(aud|oaud|sub)を追加し、それぞれ値を設定する
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Federated": "accounts.google.com"
            },
            "Action": "sts:AssumeRoleWithWebIdentity",
            "Condition": {
                "StringEquals": {
                  "accounts.google.com:aud":  "<SAのOAuth2クライアントID>",
                  "accounts.google.com:oaud": "<任意のaudience文字列>",
                  "accounts.google.com:sub":  "<SAの一意ID>"
                }
            }
        }
    ]
}
  • アタッチするポリシー例:IAMReadOnlyAccess

信頼ポリシー設定の注意点

AWS の信頼関係ポリシーでは、Google の ID トークンのクレームが次のようにマッピングされる。
ここを誤ると AccessDenied になるので注意が必要。

  • accounts.google.com:aud → ID トークンの azp(Authorized party)

    • サービスアカウントの OAuth2 クライアント ID が入る
  • accounts.google.com:oaud → ID トークンの aud

    • トークン取得時に指定した audience(任意の文字列) が入る
  • accounts.google.com:sub → ID トークンの sub

    • サービスアカウント固有の一意 ID(数値文字列) が入る

実際に Cloud Functions(Gen2) からトークンを取得して JWT をデコードし、値を確認してから設定するのがおすすめ。以下のコマンドで確認が可能。

# sub(サービスアカウント一意ID)
gcloud iam service-accounts describe <SAメールアドレス> --format='value(uniqueId)'

# aud(azp = OAuth2クライアントID)
gcloud iam service-accounts describe <SAメールアドレス> --format='value(oauth2ClientId)'

# oaud(=audience文字列)は自分で指定した値

(3) Google Cloud Functions(第2世代)を作成

  • ランタイムはPythonを利用
  • AWS SDK (boto3) を使ってListUsersを呼び出し、結果をJSONで返却

サンプルコード:

import os
import json
import datetime
from typing import Any, Dict, List

import requests
import boto3
from botocore.config import Config


ROLE_ARN   = os.getenv("ROLE_ARN", "")
AUDIENCE   = os.getenv("AUDIENCE", "<信頼関係ポリシーのaccounts.google.com:oaudに指定した任意の文字列>")
STS_REGION = os.getenv("STS_REGION", "ap-northeast-1")
DURATION   = int(os.getenv("STS_DURATION", "900"))

MD_URL     = "http://metadata/computeMetadata/v1/instance/service-accounts/default/identity"
MD_HEADERS = {"Metadata-Flavor": "Google"}

def _ok(obj, status=200):
    return (json.dumps(obj), status, {"Content-Type": "application/json"})

def _err(status, msg, detail=None):
    body = {"ok": False, "error": msg}
    if detail is not None:
        body["detail"] = detail
    return _ok(body, status)

def _fmt_ts(ts) -> str | None:
    if ts is None:
        return None
    return ts.astimezone(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

def _get_id_token(aud: str) -> str:
    r = requests.get(MD_URL, headers=MD_HEADERS, params={"audience": aud, "format": "full"}, timeout=10)
    r.raise_for_status()
    return r.text

def _assume_with_web_identity(token: str) -> Dict[str, str]:
    cfg = Config(retries={"max_attempts": 3, "mode": "standard"})
    sts = boto3.client("sts", region_name=STS_REGION, config=cfg) if STS_REGION else boto3.client("sts", config=cfg)
    resp = sts.assume_role_with_web_identity(
        RoleArn=ROLE_ARN,
        RoleSessionName="bq-remote-func",
        WebIdentityToken=token,
        DurationSeconds=DURATION,
    )
    c = resp["Credentials"]
    return {
        "aws_access_key_id":     c["AccessKeyId"],
        "aws_secret_access_key": c["SecretAccessKey"],
        "aws_session_token":     c["SessionToken"],
    }

def _list_users_and_keys(aws_creds: Dict[str, str]) -> List[Dict[str, Any]]:
    cfg = Config(retries={"max_attempts": 3, "mode": "standard"})
    iam = boto3.client("iam", config=cfg, **aws_creds)

    rows: List[Dict[str, Any]] = []
    for page in iam.get_paginator("list_users").paginate():
        for u in page.get("Users", []):
            aks = iam.list_access_keys(UserName=u["UserName"]).get("AccessKeyMetadata", [])
            if aks:
                for k in aks:
                    rows.append({
                        "UserName":             u["UserName"],
                        "Arn":                  u.get("Arn"),
                        "Path":                 u.get("Path"),
                        "CreateDate":           _fmt_ts(u.get("CreateDate")),
                        "PasswordLastUsed":     _fmt_ts(u.get("PasswordLastUsed")),
                        "AccessKeyId":          k["AccessKeyId"],
                        "AccessKeyStatus":      k["Status"],
                        "AccessKeyCreateDate":  _fmt_ts(k.get("CreateDate")),
                    })
            else:
                rows.append({
                    "UserName":             u["UserName"],
                    "Arn":                  u.get("Arn"),
                    "Path":                 u.get("Path"),
                    "CreateDate":           _fmt_ts(u.get("CreateDate")),
                    "PasswordLastUsed":     _fmt_ts(u.get("PasswordLastUsed")),
                    "AccessKeyId":          None,
                    "AccessKeyStatus":      None,
                    "AccessKeyCreateDate":  None,
                })
    return rows

def get_iam_users(request):
    try:
        # 1) Google メタデータから OIDC ID トークン取得
        aud = request.args.get("aud") or AUDIENCE
        id_token = _get_id_token(aud)

        # 2) STS: AssumeRoleWithWebIdentity
        aws_creds = _assume_with_web_identity(id_token)

        # 3) IAM ユーザー+アクセスキー一覧
        rows = _list_users_and_keys(aws_creds)

        # 4) BigQuery Remote Function 呼び出しかを判定
        req_json = request.get_json(silent=True) or {}
        if isinstance(req_json, dict) and "calls" in req_json:
            return _ok({"replies": [json.dumps(rows)]})

        return _ok({"ok": True, "items": rows})

    except requests.HTTPError as e:
        return _err(502, "failed to get ID token from metadata", {"status": e.response.status_code, "text": e.response.text})
    except Exception as e:
        status = 502 if "AssumeRoleWithWebIdentity" in str(e) else 500
        return _err(status, "unhandled error", {"type": type(e).__name__, "msg": str(e)})

(4) BigQuery remote functionを作成

  • Cloud Runのエンドポイントを指定し、BigQuery SQLから呼び出せるようにする
  • Cloud Runを呼び出すためにはクラウドリソース接続の作成が必要
bq --location=asia-northeast1 \
  mk --connection \
  --connection_type=CLOUD_RESOURCE \
  remote-conn
CREATE OR REPLACE FUNCTION `PROJECT.DATASET.get_aws_iam_users`() 
RETURNS STRING
REMOTE WITH CONNECTION `PROJECT.asia-northeast1.remote-conn`
OPTIONS (
  endpoint = '<Cloud Runのエンドポイント>'
);

4. 実際に動かす

呼び出しはSQLで簡単にできます。テーブルに保存してみる。

CREATE OR REPLACE TABLE `PROJECT.DATASET.aws_iam_users` AS
WITH payload AS (
  SELECT `PROJECT.DATASET.get_aws_iam_users`() AS js
),
arr AS (
  SELECT JSON_EXTRACT_ARRAY(js, '$') AS items FROM payload
)
SELECT
  JSON_VALUE(item, '$.UserName') AS UserName,
  JSON_VALUE(item, '$.Arn') AS Arn,
  JSON_VALUE(item, '$.Path') AS Path,
  TIMESTAMP(JSON_VALUE(item, '$.CreateDate')) AS CreateDate,
  CASE
    WHEN JSON_VALUE(item, '$.PasswordLastUsed') IS NULL THEN NULL
    ELSE TIMESTAMP(JSON_VALUE(item, '$.PasswordLastUsed'))
  END AS PasswordLastUsed,
  JSON_VALUE(item, '$.AccessKeyId') AS AccessKeyId,
  JSON_VALUE(item, '$.AccessKeyStatus') AS AccessKeyStatus,
  CASE
    WHEN JSON_VALUE(item, '$.AccessKeyCreateDate') IS NULL THEN NULL
    ELSE TIMESTAMP(JSON_VALUE(item, '$.AccessKeyCreateDate'))
  END AS AccessKeyCreateDate
FROM arr, UNNEST(items) AS item;

実行すると、IAMユーザー情報がBigQueryの行として返ってきます。

❯ bq head PROJECT:DATASET.aws_iam_users
+----------+-------------------------------------+------+---------------------+------------------+----------------------+-----------------+---------------------+
| UserName |                 Arn                 | Path |     CreateDate      | PasswordLastUsed |     AccessKeyId      | AccessKeyStatus | AccessKeyCreateDate |
+----------+-------------------------------------+------+---------------------+------------------+----------------------+-----------------+---------------------+
| test     | arn:aws:iam::999999999999:user/test | /    | 2025-08-16 15:08:32 |             NULL | AKIARJTXXXXXXXXXXXXX | Active          | 2025-08-16 15:08:45 |
+----------+-------------------------------------+------+---------------------+------------------+----------------------+-----------------+---------------------+

5. まとめと応用

今回は、BigQueryからAWSのIAMユーザー一覧を直接参照する仕組みを構築した。

  • メリット

    • BigQueryからSQLだけでAWSリソース監査が可能
    • IAMユーザー情報を他の社内データとJOINして分析できる
  • 課題

    • 実行ごとにAPIコールが発生するためレイテンシがある
    • 権限設計を誤るとセキュリティリスクが高まる
    • 呼び出し頻度が多いとコストがかかる
  • 応用例

    • AccessKeyの最終使用日をチェックし、セキュリティ監査を自動化
    • 複数AWSアカウントのIAMデータを統合してBigQueryで横断分析
    • IAMポリシーの棚卸しレポートを自動生成
  • 注意点

    • BigQuery Remote Function は 同時実行数に制限(10クエリまで) や BQとCloud Runのリージョン一致要件があるので、本番利用では注意が必要

Appendix. トラブルシューティング

特にFederationのところでパラメータ不一致などの設定ミスが発生しやすいので、以下の認証の流れを参考にして、トラブルシューティングを行うのがよさそう。

A. AccessDenied: Not authorized to perform sts:AssumeRoleWithWebIdentity

  • 確認ポイント

    • AWS の OIDC Provider URL が https://accounts.google.com になっているか
    • 信頼ポリシーの accounts.google.com:aud が Google Cloud 側の AUDIENCE と一致しているか
      • aud/oaud/sub の取り違いがないか

B. failed to get ID token from metadata

  • 原因例

    • 関数の実行サービスアカウントが想定外(メタデータの発行主体がズレる)
    • Cloud Functions(第2世代)の実行環境からメタデータに到達できない設定
  • 対処

    • デプロイ時の --service-account を見直し
    • aud パラメータの指定漏れ/値の不一致を確認

C. BigQuery 側で 401/403/REMOTE function 呼び出し失敗

  • 原因例

    • クラウドリソース接続のサービスアカウントに対して、Cloud Functions(実体は Cloud Run)への roles/run.invoker 付与が未実施
  • 対処(概略)

    1. 接続SAを取得

      bq show --connection --location=asia-northeast1 PROJECT.asia-northeast1.remote-conn \
        --format=json | jq -r '.cloudResource.serviceAccountId'
      
    2. 取得したSAに Invoker を付与

      gcloud run services add-iam-policy-binding <SERVICE_NAME> \
        --region=asia-northeast1 \
        --member=serviceAccount:<上で取得したSA> \
        --role=roles/run.invoker
      
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?