はじめに
前回記事 — MLflowトレースのPIIマスキング — Unity Catalogカラムマスクとai_mask() ETLの2層構成 では、Unity Catalogに書き込まれるMLflowトレースのPIIを、カラムマスク (正規表現) とETLパイプライン (ai_mask() で元テーブル上書き) の2層構成でマスキングする方法を紹介しました。
公開後に、より素直で堅牢なアーキテクチャに思い至りました。ログ収集用テーブルと表示用テーブルを物理的に分離し、Structured Streamingで増分マスキングする構成です。本記事ではこの新方式を解説します。
前回方式と比較した本質的な改善は次のとおりです。
- バッチ間の「PII露出ウィンドウ」が原理的に発生しない (テーブルを物理分離するため)
- MLflow UIのSQL Exec API制約 (
ai_mask()カラムマスクでkey not found) に当たらない (表示用テーブルはすでにマスク済みデータを保持するだけ) - 正規表現マスクが不要になり、
ai_mask()のみで完結 - Structured Streamingのチェックポイントで自動的に増分処理
-
MERGE INTOで冪等性が確保され、再実行・失敗復旧に強い - Unity Catalogの
GRANT/REVOKEで自然にアクセス制御 (UDF内分岐が不要)
UCトレーステーブルのスキーマやMLflow UI上の表示箇所との対応関係は前回記事の調査内容をそのまま使用します。並行して読むと理解しやすいと思います。
アーキテクチャ
LLMアプリは @mlflow.trace でトレースを ログ収集用experiment (pii_raw) に書き込みます。このテーブルはPIIを含む生データを保持し、運用上非公開にします。
Structured Streamingのジョブがログ収集テーブルから増分でレコードを読み取り、ai_mask() を適用したうえで表示用experiment (pii_display) のテーブルに MERGE INTO で書き込みます。
MLflow UIのユーザーは「表示用experiment」を開くため、目にするのは常にマスク済みデータのみです。
| ゾーン | 役割 | アクセス制御 |
|---|---|---|
ログ収集 (pii_raw) |
LLMアプリの書き込み先、PIIを含む生データ | サービスプリンシパルのみ |
| Structured Streaming |
ai_mask() を適用して表示用に転送 |
ジョブとして実行 |
表示用 (pii_display) |
マスク済みデータ、MLflow UI / BIが参照 | 全ユーザーに公開 |
なぜこの構成が優れるのか
PII露出ウィンドウが消える
前回のETL方式は、表示用 = 元テーブルを定期バッチで ai_mask() 上書きする構成でした。バッチが走るまでの間、新しく書き込まれたトレースのPIIはUI経由で見える状態にあります。
新方式では、UIが参照する表示用テーブルには最初からマスク済みのレコードしか入りません。生データのテーブルは別実体で、UCの権限によりUIから到達できません。
MLflow UIの制約に当たらない
前回検証で発覚した「ai_mask() をカラムマスクに使うとMLflow UIのSQL Exec APIで key not found が発生する」問題は、新方式では起きません。表示用テーブルにはあらかじめマスク済みデータが書かれているため、UIのクエリ時に ai_mask() を実行する必要がないためです。
増分処理と冪等性
Structured Streamingのcheckpointが処理済みオフセットを管理するため、ジョブ再実行時は未処理分だけが処理されます。前回ETL方式の「全件CTASしてTRUNCATE + INSERT」は不要です。
書き込みは MERGE INTO ... WHEN NOT MATCHED THEN INSERT WHEN MATCHED THEN UPDATE で行うため、同一バッチが二重実行されても結果は変わりません。
実装
1. セットアップ
ログ収集用と表示用、2つのexperimentを定義し、それぞれ別の table_prefix でUC連携します。
%pip install --upgrade "mlflow>=3.11"
dbutils.library.restartPython()
import os
import mlflow
from mlflow.entities.trace_location import UnityCatalog
CATALOG = "<catalog>"
SCHEMA = "<schema>"
SQL_WAREHOUSE_ID = "<warehouse_id>"
# ログ収集用 (非公開)
RAW_PREFIX = "pii_raw"
RAW_EXPERIMENT_NAME = "/Users/<email>/pii_masking_raw"
# 表示用 (公開)
DISPLAY_PREFIX = "pii_display"
DISPLAY_EXPERIMENT_NAME = "/Users/<email>/pii_masking_display"
os.environ["MLFLOW_TRACING_SQL_WAREHOUSE_ID"] = SQL_WAREHOUSE_ID
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
# ログ収集用 experiment
mlflow.set_experiment(
experiment_name=RAW_EXPERIMENT_NAME,
trace_location=UnityCatalog(
catalog_name=CATALOG,
schema_name=SCHEMA,
table_prefix=RAW_PREFIX,
),
)
# 表示用 experiment
mlflow.set_experiment(
experiment_name=DISPLAY_EXPERIMENT_NAME,
trace_location=UnityCatalog(
catalog_name=CATALOG,
schema_name=SCHEMA,
table_prefix=DISPLAY_PREFIX,
),
)
# LLMアプリは raw に書き込む
mlflow.set_experiment(experiment_name=RAW_EXPERIMENT_NAME)
# テーブル名
RAW_SPANS_TABLE = f"{CATALOG}.{SCHEMA}.{RAW_PREFIX}_otel_spans"
RAW_ANNOTATIONS_TABLE = f"{CATALOG}.{SCHEMA}.{RAW_PREFIX}_otel_annotations"
DISPLAY_SPANS_TABLE = f"{CATALOG}.{SCHEMA}.{DISPLAY_PREFIX}_otel_spans"
DISPLAY_ANNOTATIONS_TABLE = f"{CATALOG}.{SCHEMA}.{DISPLAY_PREFIX}_otel_annotations"
# チェックポイント
CHECKPOINT_BASE = f"/tmp/pii_masking/{RAW_PREFIX}_to_{DISPLAY_PREFIX}"
SPANS_CHECKPOINT = f"{CHECKPOINT_BASE}/spans"
ANNOTATIONS_CHECKPOINT = f"{CHECKPOINT_BASE}/annotations"
2. PIIを含むトレースの生成 (デモ用)
ログ収集用experimentに対し、PIIを含むクエリでトレースを記録します。
@mlflow.trace(name="customer_query", span_type="AGENT")
def handle_query(query: str) -> str:
context = retrieve(query)
return generate(query, context)
@mlflow.trace(name="retrieve", span_type="RETRIEVER")
def retrieve(query: str) -> str:
return f"顧客DB検索結果: {query}"
@mlflow.trace(name="generate", span_type="LLM")
def generate(query: str, context: str) -> str:
return f"回答: {query} に対する応答です。"
handle_query("田中太郎 (tanaka.taro@example.com, 090-1234-5678) の注文状況を確認してください")
handle_query("鈴木花子 (suzuki.hanako@example.co.jp, 03-9876-5432) のアカウント情報")
handle_query("佐藤一郎 (sato.ichiro@company.jp, 080-5555-1234) です。カード番号 4111-1111-1111-1111 で決済エラー")
3. 表示用テーブルの準備
表示用テーブルは、ログ収集テーブルと同じスキーマで空テーブルとして作成します。WHERE 1=0 でスキーマだけを継承します。
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {DISPLAY_SPANS_TABLE}
AS SELECT * FROM {RAW_SPANS_TABLE} WHERE 1=0
""")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {DISPLAY_ANNOTATIONS_TABLE}
AS SELECT * FROM {RAW_ANNOTATIONS_TABLE} WHERE 1=0
""")
4. Structured Streamingパイプライン
foreachBatch でバッチごとに ai_mask() を適用し、MERGE INTO で表示用テーブルに反映します。
PII_TYPES = "array('PersonName', 'Email', 'PhoneNumber', 'Address', 'CreditCardNumber')"
def mask_spans_batch(batch_df, batch_id):
batch_df.createOrReplaceTempView("_pii_batch_spans")
if spark.sql("SELECT COUNT(*) AS c FROM _pii_batch_spans").first().c == 0:
return
spark.sql(f"""
MERGE INTO {DISPLAY_SPANS_TABLE} AS target
USING (
SELECT
* EXCEPT(attributes),
COALESCE(
try_parse_json(ai_mask(CAST(attributes AS STRING), {PII_TYPES})),
attributes
) AS attributes
FROM _pii_batch_spans
) AS source
ON target.trace_id = source.trace_id AND target.span_id = source.span_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
print(f" Spans batch {batch_id}: マスク完了")
def mask_annotations_batch(batch_df, batch_id):
batch_df.createOrReplaceTempView("_pii_batch_annotations")
if spark.sql("SELECT COUNT(*) AS c FROM _pii_batch_annotations").first().c == 0:
return
spark.sql(f"""
MERGE INTO {DISPLAY_ANNOTATIONS_TABLE} AS target
USING (
SELECT
* EXCEPT(value),
COALESCE(
try_parse_json(ai_mask(CAST(value AS STRING), {PII_TYPES})),
value
) AS value
FROM _pii_batch_annotations
) AS source
ON target.annotation_id = source.annotation_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
ポイント:
MERGE INTOはカラム名マッチなので、* EXCEPT(attributes)で対象カラムを除外して末尾に追加し直しても位置ずれが起きません。INSERT INTOは位置マッチのため同じ書き方ができません。
ストリームを起動します。
# Spans
spans_query = (
spark.readStream
.table(RAW_SPANS_TABLE)
.writeStream
.foreachBatch(mask_spans_batch)
.option("checkpointLocation", SPANS_CHECKPOINT)
.trigger(availableNow=True) # 未処理分を処理して停止 (スケジュール実行向き)
.start()
)
spans_query.awaitTermination()
# Annotations
annotations_query = (
spark.readStream
.table(RAW_ANNOTATIONS_TABLE)
.writeStream
.foreachBatch(mask_annotations_batch)
.option("checkpointLocation", ANNOTATIONS_CHECKPOINT)
.trigger(availableNow=True)
.start()
)
annotations_query.awaitTermination()
5. 動作確認
ログ収集テーブルと表示用テーブル、および mlflow.search_traces() で結果を確認します。
# ログ収集テーブル (生データ)
display(spark.sql(f"""
SELECT trace_id, name, CAST(attributes AS STRING) AS attributes_str
FROM {RAW_SPANS_TABLE} WHERE name = 'customer_query' LIMIT 3
"""))
# 表示用テーブル (マスク済み)
display(spark.sql(f"""
SELECT trace_id, name, CAST(attributes AS STRING) AS attributes_str
FROM {DISPLAY_SPANS_TABLE} WHERE name = 'customer_query' LIMIT 3
"""))
# 表示用 experiment 経由 (MLflow UI と同じ参照経路)
display_experiment = mlflow.get_experiment_by_name(DISPLAY_EXPERIMENT_NAME)
traces_df = mlflow.search_traces(experiment_ids=[display_experiment.experiment_id])
ログ収集テーブルにはPIIがそのまま含まれているのに対し、表示用テーブルではPII該当箇所が ai_mask() のマスクトークンに置換されています。mlflow.search_traces() で表示用experimentのIDを指定して取得した結果も同様で、MLflow UIで表示用experimentを開いたときと同じ「マスク済みのみ」のビューになります。
6. 増分処理の確認
追加トレースを生成して再度ストリームを起動すると、checkpointにより新規分だけが処理されます。
handle_query("山田花子 (yamada.hanako@test.jp, 070-9999-8888) の請求書を再送してください")
# 同じパイプラインを再実行 — checkpoint により増分のみ処理される
spans_query = (
spark.readStream
.table(RAW_SPANS_TABLE)
.writeStream
.foreachBatch(mask_spans_batch)
.option("checkpointLocation", SPANS_CHECKPOINT)
.trigger(availableNow=True)
.start()
)
spans_query.awaitTermination()
アクセス制御
ログ収集テーブルを「非公開」、表示用テーブルを「公開」にすることで、UI経由および直接のSQLアクセスのいずれからもPIIを保護します。
-- ログ収集テーブル: マスキングジョブのサービスプリンシパルのみ
REVOKE SELECT ON TABLE <catalog>.<schema>.pii_raw_otel_spans FROM `account users`;
REVOKE SELECT ON TABLE <catalog>.<schema>.pii_raw_otel_annotations FROM `account users`;
GRANT SELECT ON TABLE <catalog>.<schema>.pii_raw_otel_spans TO `pii_masking_sp`;
GRANT SELECT ON TABLE <catalog>.<schema>.pii_raw_otel_annotations TO `pii_masking_sp`;
-- 表示用テーブル: 全ユーザー
GRANT SELECT ON TABLE <catalog>.<schema>.pii_display_otel_spans TO `account users`;
GRANT SELECT ON TABLE <catalog>.<schema>.pii_display_otel_annotations TO `account users`;
experiment自体のアクセス権も、pii_masking_raw を管理者限定、pii_masking_display を全ユーザー閲覧可、と分けておきます。
本番運用ガイド
トリガーの選び方
| トリガー | 動作 | 用途 |
|---|---|---|
trigger(availableNow=True) |
未処理分を処理して停止 | Databricks Jobsでスケジュール実行 (推奨) |
trigger(processingTime="5 minutes") |
一定間隔で継続実行 | 常時稼働クラスタで露出ウィンドウを最小化 |
スケジュール実行 + 5〜10分間隔で十分なケースが多く、その場合は availableNow=True をJobsから定期起動する構成がコスト効率に優れます。
チェックポイントのリセット
全件を再処理したい場合はcheckpointを削除します。
dbutils.fs.rm(SPANS_CHECKPOINT, recurse=True)
dbutils.fs.rm(ANNOTATIONS_CHECKPOINT, recurse=True)
マスキング失敗時のフォールバック
COALESCE(try_parse_json(ai_mask(...)), 元の値) により、ai_mask() がエラーを返した行は元の値が保持され、レコード自体が落ちることはありません。ただしこの場合PIIが表示用に流れ込む可能性があるため、運用ではフォールバック発生件数をメトリクスとして監視するのが望ましいです。
前回方式との比較
| 観点 | 前回 (カラムマスク + ETL 2層) | 今回 (テーブル物理分離 + Streaming) |
|---|---|---|
| PII露出ウィンドウ | バッチ間に発生 | 原理的に発生しない |
ai_mask() の使い方 |
ETLで元テーブル上書き | Streamingで表示用に増分転送 |
| MLflow UI制約 | カラムマスクに ai_mask() を使えない |
制約なし |
| 全件処理 | CTASで毎回全件 | checkpointで増分のみ |
| 冪等性 | TRUNCATE + INSERTで実質的に確保 |
MERGE INTO で確保 |
| アクセス制御 | UDF内で is_member() 分岐 |
UCの GRANT / REVOKE で自然に |
| 構成の複雑さ | 正規表現 + ETLの2層 |
ai_mask() Streamingの1経路 |
| ストレージ | 元テーブル1セット | raw + display の2セット |
新方式に固有の追加コストは「テーブルを2セット持つ」点で、表示用テーブルはrawからの派生のためストレージは約2倍になります。これが許容できる規模であれば、ほぼ全方位で新方式が優れます。
環境
- Databricks Runtime 14.3 LTS 以上
- MLflow 3.11 以上 (UC統合トレース)
- Unity Catalog 有効
- Foundation Model API 有効 (
ai_mask()に必要) - Databricks Jobs (スケジュール実行用)
参考
- 前回記事: MLflowトレースのPIIマスキング — Unity Catalogカラムマスクとai_mask() ETLの2層構成
- Unity Catalogへのトレースの保存
- AI Functions
- Structured Streaming
- DeltaテーブルにアップサートするMERGE

