はじめに
この度、自社サービスのカスタマーサポート担当者向けに、カスタマーサポートに特化した AI アシスタント(RAG アプリ)を構築しました。
その際に、念のため AI に学習させるデータから個人情報を検出/匿名化する必要があったのですが、BigQuery のリモート関数経由で DLP API を利用したお話について書いていきます。
なお、DLP API はあらゆる個人情報を完全に検出できるわけではありません。検出漏れや誤検出が発生する可能性があります。
アウトプット
以下のような形で、BigQuery リモート関数(DLP API)を利用し、 個人情報(サンプルデータ)を検出/匿名化することができました。
上記アウトプットに利用した SQL は以下で、dlp_deidentify_textが今回定義したリモート関数になります。
SELECT
original_text,
`{PROJECT_ID}.bq_remote_fn.dlp_deidentify_text`(original_text) AS masked_text
FROM UNNEST([
'私の名前は、山田太郎です。',
'メールアドレスは、hoge@example.com です'
]) AS original_text;
BigQuery リモート関数とは
概要
先ほどのアウトプットのように、BigQuery の SQL(リモート関数)から Cloud Run Functions / Cloud Run にデプロイされたエンドポイントに対して POST リクエストを送信し、そのレスポンスを SQL の値として扱うことができます。
ゆえに、組み込み関数やユーザー定義関数以外の処理を実現できることがリモート関数の最大の強みになります。
なお、今回は Cloud Run Fucntions に「与えられたテキストから個人情報を検出し、匿名化したテキストを返す」関数をデプロイします。
個人情報の検出/匿名化には、Cloud Data Loss Prevention に含まれる DLP API を利用します。
Cloud Run Functions でエンドポイントを作成する
前提
Cloud Run Functions で関数を作成する前に、関数の入力/出力形式について整理しておきます。
- 入力形式:リクエストボディ(POST/JSON)/
BigQuery → Cloud Run Fn - 出力形式:レスポンスボディ(JSON)/
Cloud Run Fn → BigQuery
また、BigQuery リモート関数がサポートしている引数/戻り値のデータ型は以下を参照します。
入力形式
リクエストボディの形式は、以下の通りとなります。
| フィールド名 | データ型 | 必須 | 説明 |
|---|---|---|---|
| requestId | 文字列 | ◯ | リクエストの ID |
| caller | 文字列 | ◯ | クエリのジョブの完全なリソース名 |
| sessionUser | 文字列 | ◯ | クエリ実行ユーザーのメールアドレス |
| userDefinedContext | JSON オブジェクト |
リモート関数実行時の ユーザー定義のコンテキスト |
|
| calls | JSON 配列 | ◯ | 入力データのバッチ |
以下は、リクエストボディのサンプル値となります。
リクエストの ID などメタデータの情報が多く含まれており、関数内で利用するのはcallsフィールドの値のみとなりそうです。
{
"requestId": "123abc",
"caller": "//bigquery.googleapis.com/projects/...",
"sessionUser": "atsuhiro.sakai@example.com",
"userDefinedContext": {},
"calls": [
["私の名前は、山田太郎です。"],
["メールアドレスは、hoge@example.com です"]
]
}
出力形式
続いて、レスポンスは以下の形式で返却する必要があります。
| フィールド名 | データ型 | 必須 | 説明 |
|---|---|---|---|
| replies | JSON 配列 | HTTP ステータスコードが200 の場合の 戻り値のバッチ |
|
| errorMessage | 文字列 | 200 以外の HTTP レスポンスコードが 返却される場合のエラーメッセージ |
-
replies- 配列のサイズを HTTP リクエスト内の
callsの配列のサイズと一致させる必要がある
- 配列のサイズを HTTP リクエスト内の
-
errorMessage- 再試行不可能なエラーの場合
- BigQuery ジョブのエラーメッセージの一部として返却される
- サイズは 1 KB 未満にする必要がある
- 再試行不可能なエラーの場合
成功時のレスポンス例
{
"replies": [
"私の名前は、[PERSON_NAME]です",
"メールアドレスは、[EMAIL_ADDRESS] です"
]
}
なお、バッチ内の1行だけ失敗した場合は、全体を失敗とせず該当行のみをnullにして返す実装としました。その結果、BigQuery のクエリ結果でも該当行がnullとなります。
これは、後述のバッチ全体の再試行を防ぎ、成功していた行まで再び処理されるのを避けるためです。
ただ、nullのままでは失敗を検知できないため、Dataform のアサーションで失敗を検知しています。
失敗時のレスポンスの例
{
"errorMessage": "..."
}
HTTP レスポンスコードが 200 以外の場合、BigQuery 側はレスポンス全体を失敗と見なしクエリ全体をエラーとします。
公式ドキュメントに記載の通り、BigQuery は特定の HTTP ステータスコードを受け取ると自動で POST リクエストが再試行されるようです。
例外を放置していると再試行が繰り返され、コストや外部 API のクォータ消費に繋がる可能性があるため注意が必要です。
BigQuery からの再試行を最小限に抑えるには、失敗したレスポンスに 408、429、500、503、504 以外の HTTP レスポンス コードを使用し、関数コード内のすべての例外をキャッチします。それ以外の場合、キャッチされない例外に対して HTTP サービス フレームワークは自動的に 500 を返します。失敗したデータ パーティションまたはクエリが BigQuery によって再試行されると、HTTP リクエストが再試行されることがあります。
リモート関数を使用する | BigQuery | Google Cloud Documentation
関数を作成する
技術選定
今回の関数の作成には、私が普段使い慣れている Golang を利用しました。
加えて、Cloud Run Functions と互換性のあるエントリーポイントを簡単に定義できる functions-framework-go を利用しました。
フレームワークの利用により HTTP サーバーやルーティングの実装を自前で書かずに関数のロジックの作成に集中できます。
具体的には、以下のような数十行程度のコード(サンプルコード)で HTTP サーバーの起動と関数の登録が完了します。
package function
import (
"net/http"
"github.com/GoogleCloudPlatform/functions-framework-go/functions"
)
func init() {
functions.HTTP("HelloWorld", helloWorld)
}
func helloWorld(w http.ResponseWriter, r *http.Request) {
...
}
package main
import (
"log"
"os"
_ "example.com/hello"
"github.com/GoogleCloudPlatform/functions-framework-go/funcframework"
)
func main() {
port := "8080"
if envPort := os.Getenv("PORT"); envPort != "" {
port = envPort
}
hostname := ""
if localOnly := os.Getenv("LOCAL_ONLY"); localOnly == "true" {
hostname = "127.0.0.1"
}
if err := funcframework.StartHostPort(hostname, port); err != nil {
log.Fatalf("funcframework.StartHostPort: %v\n", err)
}
}
プロジェクト構造
今回のプロジェクト構造と主なファイルの役割は、以下の通りです。
-
cmd/main.go- logger, config, DLP クライアントを作成
- 対象関数の service と handler を組み立てて Functions Framework に登録
- BigQuery からの HTTP リクエストを受け付けるサーバーを起動
-
dlp_deidentify_text/のフォルダ配下は、以下の役割に分かれています-
handler.go- POST リクエストを検証し、結果を HTTP レスポンスとして返す
-
service.go- 外部 API 呼び出し、個人情報の匿名化の処理を行う
-
types.go- HTTP レスポンス JSON の構造を定義する
-
register.go- 関数名(DlpDeidentifyText)と handler を Functions Framework に結びつける
-
bq-remote-fn/
├── cmd/
│ └── main.go # App entry point
├── config/
│ ├── config.go # Config load and validation
│ └── config.toml.tmpl # Config template
├── shared/
│ ├── types.go # Shared request type
│ └── http.go # Shared HTTP helpers
├── dlp_deidentify_text/
│ ├── handler.go # HTTP handler
│ ├── service.go # Deidentification logic
│ ├── types.go # Response type
│ └── register.go # Function registration
├── .envrc.sample # Env var sample
├── .gitignore
├── Makefile # Run and deploy tasks
├── README.md
├── go.mod
└── go.sum
Deidentification Logic
個人情報を検出/匿名化する部分を記述したservice.goの重要な箇所は、以下の通りです。
-
InfoTypes:検出したい個人情報の Type を指定する(InfoTypeの一覧) -
PrimitiveTransformation:検出した個人情報の変換方法を指定する(変換方法の一覧)
package dlp_deidentify_text
import (
"context"
"fmt"
"log/slog"
dlp "cloud.google.com/go/dlp/apiv2"
"cloud.google.com/go/dlp/apiv2/dlppb"
"github.com/{ORGANIZATION_NAME}/bq-remote-fn/config"
)
type Service struct {
logger *slog.Logger
cfg *config.Config
client *dlp.Client
}
func NewService(logger *slog.Logger, cfg *config.Config, client *dlp.Client) *Service {
return &Service{logger: logger, cfg: cfg, client: client}
}
func (s *Service) DeidentifyText(ctx context.Context, text string) (*string, error) {
s.logger.Info("deidentify started")
if text == "" {
s.logger.Error("text is empty")
return nil, nil
}
req := buildDeidentifyRequest(s.cfg, text)
resp, err := s.client.DeidentifyContent(ctx, req)
if err != nil {
s.logger.Error("dlp request failed", "error", err)
return nil, nil
}
out, err := extractDeidentifiedText(resp)
if err != nil {
s.logger.Error("failed to parse dlp response", "error", err)
return nil, nil
}
s.logger.Info("deidentify completed")
return &out, nil
}
func buildDeidentifyRequest(cfg *config.Config, text string) *dlppb.DeidentifyContentRequest {
parent := fmt.Sprintf("projects/%s/locations/global", cfg.Main.ProjectID)
return &dlppb.DeidentifyContentRequest{
Parent: parent,
Item: &dlppb.ContentItem{
DataItem: &dlppb.ContentItem_Value{
Value: text,
},
},
DeidentifyConfig: &dlppb.DeidentifyConfig{
Transformation: &dlppb.DeidentifyConfig_InfoTypeTransformations{
InfoTypeTransformations: &dlppb.InfoTypeTransformations{
Transformations: []*dlppb.InfoTypeTransformations_InfoTypeTransformation{
{
InfoTypes: []*dlppb.InfoType{
{Name: "EMAIL_ADDRESS"},
{Name: "PERSON_NAME"},
{Name: "PHONE_NUMBER"},
},
PrimitiveTransformation: &dlppb.PrimitiveTransformation{
Transformation: &dlppb.PrimitiveTransformation_ReplaceWithInfoTypeConfig{
ReplaceWithInfoTypeConfig: &dlppb.ReplaceWithInfoTypeConfig{},
},
},
},
},
},
},
},
}
}
func extractDeidentifiedText(resp *dlppb.DeidentifyContentResponse) (string, error) {
if resp == nil || resp.GetItem() == nil {
return "", fmt.Errorf("empty dlp response")
}
return resp.GetItem().GetValue(), nil
}
Deploy
すべてのファイルの作成が完了したら、以下の make コマンドを利用して Cloud Run Functions へデプロイを行います。
なお、Cloud Run Functions をデプロイするリージョンは、後ほど作成するリモート関数を登録する BigQuery データセット/接続のリージョンと揃える必要があります。
GOOGLE_CLOUD_PROJECT_ID=replace with your GCP project ID
GOOGLE_CLOUD_REGION=replace with your Cloud Run region
# Deploy the function to Cloud Run (auth required, internal ingress only)
deploy-dlp:
gcloud run deploy bq-remote-fn-dlp-deidentify-text \
--project=$(GOOGLE_CLOUD_PROJECT_ID) \
--region=$(GOOGLE_CLOUD_REGION) \
--source=. \
--no-allow-unauthenticated \
--ingress=internal \
--set-env-vars=FUNCTION_TARGET=DlpDeidentifyText
以上で、関数の作成は完了となります。
BigQuery リモート関数を作成する
接続を作成する
以下の手順に沿って、BigQuery 上の接続を作成します。
- BigQuery > エクスプローラーペイン > 接続 > 接続を作成 をクリックする
- 接続タイプ は、以下を選択する
a.Vertex AI リモートモデル、リモート関数、BigLake、Spanner(Cloud リソース) - 接続 IDには、任意の文字列を入力する
- ローケーション タイプを選択し、接続を作成する
その後、作成した接続の接続情報ペインに移動しサービスアカウント ID をコピーします。
アクセス権限を設定する
先ほどコピーしたサービスアカウント ID に対して、Cloud Run 起動元の IAM ロールを付与します。
リモート関数を作成する
以下の SQL をコンソールで実行すると、リモート関数が利用できるようになります。
なお、リモート関数を登録する BigQuery データセットbq_remote_fnは事前に作成しておく必要があります。
CREATE OR REPLACE FUNCTION `{PROJECT_ID}.bq_remote_fn.dlp_deidentify_text`(text STRING)
RETURNS STRING
REMOTE WITH CONNECTION `{PROJECT ID}.us.bq-remote-fn-dlp-deidentify-text`
OPTIONS (
endpoint = '{ENDPOINT_URL}',
);
以上で、すべての作業が完了となります。
以下のような形で、リモート関数が利用できるようになっていると思います。
SELECT
original_text,
`{PROJECT_ID}.bq_remote_fn.dlp_deidentify_text`(original_text) AS masked_text
FROM UNNEST([
'私の名前は、山田太郎です。',
'メールアドレスは、hoge@example.com です'
]) AS original_text;
おまけ
表題の内容とは別に、社内メンバーから依頼があり Rakuten Books Book Search API を利用して ISBN(国際標準図書番号)をもとに、書籍情報を返すリモート関数も作成しました。
具体的には、以下のような SQL を実行すると下記の JSON が返却されます。
SELECT
`{PROJECT_ID}.bq_remote_fn.rakuten_books_by_isbn`('9784101010014') AS book;
{
"affiliateUrl": "",
"author": "夏目漱石",
"authorKana": "ナツメ,ソウセキ",
"availability": "1",
"booksGenreId": "001019001/001004008005",
"chirayomiUrl": "",
"contents": "",
"discountPrice": 0,
"discountRate": 0,
"isbn": "9784101010014",
"itemCaption": "",
"itemPrice": 693,
"itemUrl": "https://books.rakuten.co.jp/rb/1656073/",
"largeImageUrl": "https://thumbnail.image.rakuten.co.jp/@0_mall/book/cabinet/0014/9784101010014.jpg?_ex=200x200",
"limitedFlag": 0,
"listPrice": 0,
"mediumImageUrl": "https://thumbnail.image.rakuten.co.jp/@0_mall/book/cabinet/0014/9784101010014.jpg?_ex=120x120",
"postageFlag": 2,
"publisherName": "新潮社",
"reviewAverage": "3.96",
"reviewCount": 261,
"salesDate": "2003年06月",
"seriesName": "新潮文庫",
"seriesNameKana": "シンチョウ ブンコ",
"size": "文庫",
"smallImageUrl": "https://thumbnail.image.rakuten.co.jp/@0_mall/book/cabinet/0014/9784101010014.jpg?_ex=64x64",
"subTitle": "",
"subTitleKana": "",
"title": "吾輩は猫である改版",
"titleKana": "ワガハイ ワ ネコ デ アル"
}
例えばですが、書籍関連の広告運用時のキャンペーンデータに ISBN の値が含まれていれば、詳細な書籍情報と紐付けてより高度な分析が可能となりそうです。
おわりに
BigQuery リモート関数を活用することで、BigQuery 単体では実現できない処理を SQL の中で実現することができました。
今後は Cloud Run にデプロイした ADK エージェントをリモート関数から呼び出すなどより幅広い活用にも挑戦していきたいと考えています。
どなたかの参考になれば幸いです。
参考



