はじめに
Databricks上でエージェントのトレースデータ(MLflow + Unity Catalog統合)に含まれるPII(個人識別情報)を、テキスト全体ではなく該当部分のみマスキングする方法を検証しました。
Unity Catalogの標準的なカラムマスキングはカラム全体を置換しますが、本記事ではカスタム関数を使い、テキスト内のPII部分だけをマスクする手法を紹介します。
やりたいこと
- OTELトレースがDelta Tableに書き込まれる
- テーブル内のトレースデータに含まれるPII部分のみをマスク
- マスクした結果をMLflow UIのTracesタブで確認
- Admin以外のユーザーにはPIIが見えない状態にする
結論: 2層構成を推奨
検証の結果、以下の2つの手法を併用する構成が現時点のベストプラクティスです。
| 手法 | PII検出範囲 | MLflow UI反映 | 用途 |
|---|---|---|---|
| カラムマスク(正規表現) | メール・電話・クレカ番号 | リアルタイム | 即時マスク |
ETLパイプライン(ai_mask()) |
上記 + 日本語氏名・住所 | バッチ処理後 | 全PIIマスク |
前提: UCトレース連携
MLflow 3.11以降では、trace_location パラメータでトレースの格納先をUnity Catalog内のDelta Tableに指定できます。
import mlflow
from mlflow.entities.trace_location import UnityCatalog
mlflow.set_experiment(
experiment_name="/Users/<your-email>/my_experiment",
trace_location=UnityCatalog(
catalog_name="<catalog>",
schema_name="<schema>",
table_prefix="<prefix>",
),
)
これにより以下のテーブルが自動生成されます:
-
<prefix>_otel_spans— スパンデータ(入力/出力を含む) -
<prefix>_otel_annotations— メタデータ・タグ・アセスメント -
<prefix>_otel_logs/<prefix>_otel_metrics
MLflow UIのTracesタブはこれらのテーブルを直接参照するため、テーブルにカラムマスクを適用すればUIにも反映されます。
手法1: カラムマスク(正規表現)
マスク対象テーブルの構造
検証の過程で、MLflow UIの各表示箇所がどのテーブル・カラムから読み取られているかを特定しました。この情報は公式ドキュメントには記載されていません。
| テーブル | カラム | 型 | UI上の表示箇所 |
|---|---|---|---|
_otel_spans |
attributes |
VARIANT | トレース詳細の入力/出力 |
_otel_annotations |
value |
VARIANT | トレース一覧のタイトル(_trace_metadata ビュー経由) |
マスク関数の作成
_otel_spans.attributes 用の関数です。VARIANT型をSTRINGにキャスト → 正規表現でマスク → parse_json() でVARIANTに戻します。
CREATE OR REPLACE FUNCTION <catalog>.<schema>.pii_mask_spans(input_val VARIANT)
RETURNS VARIANT
RETURN
CASE
WHEN is_member('pii_admin') THEN input_val
ELSE parse_json(
regexp_replace(
regexp_replace(
regexp_replace(
CAST(input_val AS STRING),
'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+[.][a-zA-Z]{2,}',
'[EMAIL]'
),
'[0-9]{4}-[0-9]{4}-[0-9]{4}-[0-9]{4}',
'[CREDIT_CARD]'
),
'0[0-9]{1,3}-[0-9]{2,4}-[0-9]{3,4}',
'[PHONE]'
)
)
END
_otel_annotations.value 用はJSON(メタデータ)と単純文字列(タグ名等)が混在するため、annotation_type を参照してMETADATA行のみマスクします。
CREATE OR REPLACE FUNCTION <catalog>.<schema>.pii_mask_annotations(
input_val VARIANT, annotation_type STRING
)
RETURNS VARIANT
RETURN
CASE
WHEN is_member('pii_admin') THEN input_val
WHEN annotation_type = 'METADATA' THEN
parse_json(
-- 上記と同じ regexp_replace のネスト
)
ELSE input_val
END
テーブルへの適用
ALTER TABLE <prefix>_otel_spans
ALTER COLUMN attributes
SET MASK <catalog>.<schema>.pii_mask_spans;
ALTER TABLE <prefix>_otel_annotations
ALTER COLUMN value
SET MASK <catalog>.<schema>.pii_mask_annotations USING COLUMNS (annotation_type);
USING COLUMNS (annotation_type) がポイントです。カラムマスク関数に他カラムの値を追加パラメータとして渡すことで、行ごとに処理を分岐できます。
結果
MLflow UIのTracesタブで、トレースのタイトル・入力・出力のPII部分がマスクされた状態で表示されます。pii_admin グループのメンバーには生データがそのまま表示されます。
手法2: ETLパイプライン(ai_mask())
なぜカラムマスクに ai_mask() を使えないのか
ai_mask() は日本語氏名や住所も検出可能なAI関数ですが、バッチ推論に最適化されており、カラムマスク(クエリ時に毎行リアルタイム実行)として使用すると、MLflow UIのSQL Exec APIでエラー(key not found)が発生します。
ノートブック上の spark.sql / display や mlflow.search_traces() では ai_mask() のカラムマスクは正常に動作するため、MLflow UIのSQL Exec API固有の制限です。
この制限はDatabricks社内の担当チームにフィードバック済みです。
ETLパイプラインによる上書き
カラムマスクの代わりに、バッチ処理で ai_mask() を実行し元テーブルを上書きします。処理の流れは次のとおりです。
元テーブル → CTAS(ai_mask適用) → 一時テーブル → TRUNCATE + INSERT → 元テーブル上書き
UPDATE 文で直接 ai_mask() を使うと非決定的関数のエラーになるため、CTAS(CREATE TABLE AS SELECT)で一時テーブルを経由する必要があります。
PII_TYPES = "array('PersonName', 'Email', 'PhoneNumber', 'Address', 'CreditCardNumber')"
# カラム名を動的に取得し、対象カラムのみ ai_mask() で置換
def build_select(table, target_col, mask_expr):
rows = spark.sql(f"DESCRIBE TABLE {table}").collect()
seen = set()
cols = []
for r in rows:
name = r.col_name
if name.startswith("#") or name in seen or name == "":
continue
seen.add(name)
cols.append(name)
exprs = [f"{mask_expr} AS `{c}`" if c == target_col else f"`{c}`" for c in cols]
return f"SELECT {', '.join(exprs)} FROM {table}"
# ai_mask() でマスク(失敗時は元の値にフォールバック)
spans_mask = f"COALESCE(try_parse_json(ai_mask(CAST(attributes AS STRING), {PII_TYPES})), attributes)"
spans_select = build_select(SPANS_TABLE, "attributes", spans_mask)
spark.sql(f"CREATE OR REPLACE TABLE tmp_masked AS {spans_select}")
# 元テーブルを上書き
spark.sql(f"TRUNCATE TABLE {SPANS_TABLE}")
spark.sql(f"INSERT INTO {SPANS_TABLE} SELECT * FROM tmp_masked")
spark.sql("DROP TABLE tmp_masked")
COALESCE(try_parse_json(...), 元の値) で、ai_mask() が失敗した行もデータが欠落しないようにしています。
検証で判明した制限事項
| 問題 | 原因 | 対処 |
|---|---|---|
ai_mask() のカラムマスクがMLflow UIで動作しない |
バッチ推論最適化。SQL Exec API経由でセッション破綻 | カラムマスクには正規表現、ai_mask() はETLで使用 |
| トレーステーブルのスキーマがドキュメントに未記載 | UC統合トレースのPublic Preview段階 | DESCRIBE TABLE / SHOW CREATE TABLE で調査 |
| UIのタイトルと詳細が別テーブルから参照 |
_trace_metadata ビューが _otel_annotations から読み取る |
両テーブルにマスクを適用 |
_otel_annotations.value にJSONと文字列が混在 |
annotation_type で異なるデータ型が同一カラムに格納 |
USING COLUMNS (annotation_type) でMETADATA行のみマスク |
SQL UDFで \d が正しく解釈されない |
Python f-string → SQL文字列リテラル間のエスケープ問題 |
[0-9] を使用 |
UPDATE 文で ai_mask() が使えない |
非決定的関数はUPDATEに使用不可 | CTASで一時テーブル経由 |
本番運用の推奨構成
トレース書き込みからMLflow UIでの確認まで、2層がどのタイミングで作用するかを以下に整理します。
- トレース書き込み時: カラムマスク(正規表現)が即時適用され、MLflow UI上でメールアドレス・電話番号・クレジットカード番号がマスク済みになります。
-
定期バッチ実行後(例: 1時間ごと): ETLが
ai_mask()で全PIIを上書きし、MLflow UI上で日本語氏名・住所もマスク済みになります。
| ユーザー | カラムマスク適用時 | ETL実行後 |
|---|---|---|
pii_admin グループ |
生データ | ai_mask済み |
| その他 | 正規表現マスク済み | ai_mask済み |
環境
- Databricks Runtime 14.3 LTS
- MLflow 3.12.0(3.11以上が必要)
- Unity Catalog 有効
- Foundation Model API 有効(
ai_mask()に必要)






