1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MLflowトレースのPIIマスキング (続編) — テーブル物理分離 + Structured Streaming

1
Posted at

はじめに

前回記事 — MLflowトレースのPIIマスキング — Unity Catalogカラムマスクとai_mask() ETLの2層構成 では、Unity Catalogに書き込まれるMLflowトレースのPIIを、カラムマスク (正規表現) とETLパイプライン (ai_mask() で元テーブル上書き) の2層構成でマスキングする方法を紹介しました。

公開後に、より素直で堅牢なアーキテクチャに思い至りました。ログ収集用テーブルと表示用テーブルを物理的に分離し、Structured Streamingで増分マスキングする構成です。本記事ではこの新方式を解説します。

前回方式と比較した本質的な改善は次のとおりです。

  • バッチ間の「PII露出ウィンドウ」が原理的に発生しない (テーブルを物理分離するため)
  • MLflow UIのSQL Exec API制約 (ai_mask() カラムマスクで key not found) に当たらない (表示用テーブルはすでにマスク済みデータを保持するだけ)
  • 正規表現マスクが不要になり、ai_mask() のみで完結
  • Structured Streamingのチェックポイントで自動的に増分処理
  • MERGE INTO で冪等性が確保され、再実行・失敗復旧に強い
  • Unity Catalogの GRANT / REVOKE で自然にアクセス制御 (UDF内分岐が不要)

UCトレーステーブルのスキーマやMLflow UI上の表示箇所との対応関係は前回記事の調査内容をそのまま使用します。並行して読むと理解しやすいと思います。

アーキテクチャ

pii_streaming_fig1_architecture.png

LLMアプリは @mlflow.trace でトレースを ログ収集用experiment (pii_raw) に書き込みます。このテーブルはPIIを含む生データを保持し、運用上非公開にします。

Structured Streamingのジョブがログ収集テーブルから増分でレコードを読み取り、ai_mask() を適用したうえで表示用experiment (pii_display) のテーブルに MERGE INTO で書き込みます。

MLflow UIのユーザーは「表示用experiment」を開くため、目にするのは常にマスク済みデータのみです。

ゾーン 役割 アクセス制御
ログ収集 (pii_raw) LLMアプリの書き込み先、PIIを含む生データ サービスプリンシパルのみ
Structured Streaming ai_mask() を適用して表示用に転送 ジョブとして実行
表示用 (pii_display) マスク済みデータ、MLflow UI / BIが参照 全ユーザーに公開

なぜこの構成が優れるのか

pii_streaming_fig2_exposure_window.png

PII露出ウィンドウが消える

前回のETL方式は、表示用 = 元テーブルを定期バッチで ai_mask() 上書きする構成でした。バッチが走るまでの間、新しく書き込まれたトレースのPIIはUI経由で見える状態にあります。

新方式では、UIが参照する表示用テーブルには最初からマスク済みのレコードしか入りません。生データのテーブルは別実体で、UCの権限によりUIから到達できません。

MLflow UIの制約に当たらない

前回検証で発覚した「ai_mask() をカラムマスクに使うとMLflow UIのSQL Exec APIで key not found が発生する」問題は、新方式では起きません。表示用テーブルにはあらかじめマスク済みデータが書かれているため、UIのクエリ時に ai_mask() を実行する必要がないためです。

増分処理と冪等性

Structured Streamingのcheckpointが処理済みオフセットを管理するため、ジョブ再実行時は未処理分だけが処理されます。前回ETL方式の「全件CTASしてTRUNCATE + INSERT」は不要です。

書き込みは MERGE INTO ... WHEN NOT MATCHED THEN INSERT WHEN MATCHED THEN UPDATE で行うため、同一バッチが二重実行されても結果は変わりません。

実装

1. セットアップ

ログ収集用と表示用、2つのexperimentを定義し、それぞれ別の table_prefix でUC連携します。

%pip install --upgrade "mlflow>=3.11"
dbutils.library.restartPython()
import os
import mlflow
from mlflow.entities.trace_location import UnityCatalog

CATALOG = "<catalog>"
SCHEMA  = "<schema>"
SQL_WAREHOUSE_ID = "<warehouse_id>"

# ログ収集用 (非公開)
RAW_PREFIX = "pii_raw"
RAW_EXPERIMENT_NAME = "/Users/<email>/pii_masking_raw"

# 表示用 (公開)
DISPLAY_PREFIX = "pii_display"
DISPLAY_EXPERIMENT_NAME = "/Users/<email>/pii_masking_display"

os.environ["MLFLOW_TRACING_SQL_WAREHOUSE_ID"] = SQL_WAREHOUSE_ID

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

# ログ収集用 experiment
mlflow.set_experiment(
    experiment_name=RAW_EXPERIMENT_NAME,
    trace_location=UnityCatalog(
        catalog_name=CATALOG,
        schema_name=SCHEMA,
        table_prefix=RAW_PREFIX,
    ),
)

# 表示用 experiment
mlflow.set_experiment(
    experiment_name=DISPLAY_EXPERIMENT_NAME,
    trace_location=UnityCatalog(
        catalog_name=CATALOG,
        schema_name=SCHEMA,
        table_prefix=DISPLAY_PREFIX,
    ),
)

# LLMアプリは raw に書き込む
mlflow.set_experiment(experiment_name=RAW_EXPERIMENT_NAME)

# テーブル名
RAW_SPANS_TABLE           = f"{CATALOG}.{SCHEMA}.{RAW_PREFIX}_otel_spans"
RAW_ANNOTATIONS_TABLE     = f"{CATALOG}.{SCHEMA}.{RAW_PREFIX}_otel_annotations"
DISPLAY_SPANS_TABLE       = f"{CATALOG}.{SCHEMA}.{DISPLAY_PREFIX}_otel_spans"
DISPLAY_ANNOTATIONS_TABLE = f"{CATALOG}.{SCHEMA}.{DISPLAY_PREFIX}_otel_annotations"

# チェックポイント
CHECKPOINT_BASE        = f"/tmp/pii_masking/{RAW_PREFIX}_to_{DISPLAY_PREFIX}"
SPANS_CHECKPOINT       = f"{CHECKPOINT_BASE}/spans"
ANNOTATIONS_CHECKPOINT = f"{CHECKPOINT_BASE}/annotations"

2. PIIを含むトレースの生成 (デモ用)

ログ収集用experimentに対し、PIIを含むクエリでトレースを記録します。

@mlflow.trace(name="customer_query", span_type="AGENT")
def handle_query(query: str) -> str:
    context = retrieve(query)
    return generate(query, context)

@mlflow.trace(name="retrieve", span_type="RETRIEVER")
def retrieve(query: str) -> str:
    return f"顧客DB検索結果: {query}"

@mlflow.trace(name="generate", span_type="LLM")
def generate(query: str, context: str) -> str:
    return f"回答: {query} に対する応答です。"

handle_query("田中太郎 (tanaka.taro@example.com, 090-1234-5678) の注文状況を確認してください")
handle_query("鈴木花子 (suzuki.hanako@example.co.jp, 03-9876-5432) のアカウント情報")
handle_query("佐藤一郎 (sato.ichiro@company.jp, 080-5555-1234) です。カード番号 4111-1111-1111-1111 で決済エラー")

3. 表示用テーブルの準備

表示用テーブルは、ログ収集テーブルと同じスキーマで空テーブルとして作成します。WHERE 1=0 でスキーマだけを継承します。

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DISPLAY_SPANS_TABLE}
    AS SELECT * FROM {RAW_SPANS_TABLE} WHERE 1=0
""")

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DISPLAY_ANNOTATIONS_TABLE}
    AS SELECT * FROM {RAW_ANNOTATIONS_TABLE} WHERE 1=0
""")

4. Structured Streamingパイプライン

foreachBatch でバッチごとに ai_mask() を適用し、MERGE INTO で表示用テーブルに反映します。

PII_TYPES = "array('PersonName', 'Email', 'PhoneNumber', 'Address', 'CreditCardNumber')"

def mask_spans_batch(batch_df, batch_id):
    batch_df.createOrReplaceTempView("_pii_batch_spans")
    if spark.sql("SELECT COUNT(*) AS c FROM _pii_batch_spans").first().c == 0:
        return
    spark.sql(f"""
        MERGE INTO {DISPLAY_SPANS_TABLE} AS target
        USING (
            SELECT
                * EXCEPT(attributes),
                COALESCE(
                    try_parse_json(ai_mask(CAST(attributes AS STRING), {PII_TYPES})),
                    attributes
                ) AS attributes
            FROM _pii_batch_spans
        ) AS source
        ON target.trace_id = source.trace_id AND target.span_id = source.span_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
    print(f"  Spans batch {batch_id}: マスク完了")


def mask_annotations_batch(batch_df, batch_id):
    batch_df.createOrReplaceTempView("_pii_batch_annotations")
    if spark.sql("SELECT COUNT(*) AS c FROM _pii_batch_annotations").first().c == 0:
        return
    spark.sql(f"""
        MERGE INTO {DISPLAY_ANNOTATIONS_TABLE} AS target
        USING (
            SELECT
                * EXCEPT(value),
                COALESCE(
                    try_parse_json(ai_mask(CAST(value AS STRING), {PII_TYPES})),
                    value
                ) AS value
            FROM _pii_batch_annotations
        ) AS source
        ON target.annotation_id = source.annotation_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

ポイント: MERGE INTOカラム名マッチなので、* EXCEPT(attributes) で対象カラムを除外して末尾に追加し直しても位置ずれが起きません。INSERT INTO は位置マッチのため同じ書き方ができません。

ストリームを起動します。

# Spans
spans_query = (
    spark.readStream
    .table(RAW_SPANS_TABLE)
    .writeStream
    .foreachBatch(mask_spans_batch)
    .option("checkpointLocation", SPANS_CHECKPOINT)
    .trigger(availableNow=True)  # 未処理分を処理して停止 (スケジュール実行向き)
    .start()
)
spans_query.awaitTermination()

# Annotations
annotations_query = (
    spark.readStream
    .table(RAW_ANNOTATIONS_TABLE)
    .writeStream
    .foreachBatch(mask_annotations_batch)
    .option("checkpointLocation", ANNOTATIONS_CHECKPOINT)
    .trigger(availableNow=True)
    .start()
)
annotations_query.awaitTermination()

5. 動作確認

ログ収集テーブルと表示用テーブル、および mlflow.search_traces() で結果を確認します。

# ログ収集テーブル (生データ)
display(spark.sql(f"""
    SELECT trace_id, name, CAST(attributes AS STRING) AS attributes_str
    FROM {RAW_SPANS_TABLE} WHERE name = 'customer_query' LIMIT 3
"""))

# 表示用テーブル (マスク済み)
display(spark.sql(f"""
    SELECT trace_id, name, CAST(attributes AS STRING) AS attributes_str
    FROM {DISPLAY_SPANS_TABLE} WHERE name = 'customer_query' LIMIT 3
"""))

# 表示用 experiment 経由 (MLflow UI と同じ参照経路)
display_experiment = mlflow.get_experiment_by_name(DISPLAY_EXPERIMENT_NAME)
traces_df = mlflow.search_traces(experiment_ids=[display_experiment.experiment_id])

ログ収集テーブルにはPIIがそのまま含まれているのに対し、表示用テーブルではPII該当箇所が ai_mask() のマスクトークンに置換されています。mlflow.search_traces() で表示用experimentのIDを指定して取得した結果も同様で、MLflow UIで表示用experimentを開いたときと同じ「マスク済みのみ」のビューになります。

6. 増分処理の確認

追加トレースを生成して再度ストリームを起動すると、checkpointにより新規分だけが処理されます。

handle_query("山田花子 (yamada.hanako@test.jp, 070-9999-8888) の請求書を再送してください")

# 同じパイプラインを再実行 — checkpoint により増分のみ処理される
spans_query = (
    spark.readStream
    .table(RAW_SPANS_TABLE)
    .writeStream
    .foreachBatch(mask_spans_batch)
    .option("checkpointLocation", SPANS_CHECKPOINT)
    .trigger(availableNow=True)
    .start()
)
spans_query.awaitTermination()

アクセス制御

ログ収集テーブルを「非公開」、表示用テーブルを「公開」にすることで、UI経由および直接のSQLアクセスのいずれからもPIIを保護します。

-- ログ収集テーブル: マスキングジョブのサービスプリンシパルのみ
REVOKE SELECT ON TABLE <catalog>.<schema>.pii_raw_otel_spans       FROM `account users`;
REVOKE SELECT ON TABLE <catalog>.<schema>.pii_raw_otel_annotations FROM `account users`;
GRANT  SELECT ON TABLE <catalog>.<schema>.pii_raw_otel_spans       TO `pii_masking_sp`;
GRANT  SELECT ON TABLE <catalog>.<schema>.pii_raw_otel_annotations TO `pii_masking_sp`;

-- 表示用テーブル: 全ユーザー
GRANT SELECT ON TABLE <catalog>.<schema>.pii_display_otel_spans       TO `account users`;
GRANT SELECT ON TABLE <catalog>.<schema>.pii_display_otel_annotations TO `account users`;

experiment自体のアクセス権も、pii_masking_raw を管理者限定、pii_masking_display を全ユーザー閲覧可、と分けておきます。

本番運用ガイド

トリガーの選び方

トリガー 動作 用途
trigger(availableNow=True) 未処理分を処理して停止 Databricks Jobsでスケジュール実行 (推奨)
trigger(processingTime="5 minutes") 一定間隔で継続実行 常時稼働クラスタで露出ウィンドウを最小化

スケジュール実行 + 5〜10分間隔で十分なケースが多く、その場合は availableNow=True をJobsから定期起動する構成がコスト効率に優れます。

チェックポイントのリセット

全件を再処理したい場合はcheckpointを削除します。

dbutils.fs.rm(SPANS_CHECKPOINT, recurse=True)
dbutils.fs.rm(ANNOTATIONS_CHECKPOINT, recurse=True)

マスキング失敗時のフォールバック

COALESCE(try_parse_json(ai_mask(...)), 元の値) により、ai_mask() がエラーを返した行は元の値が保持され、レコード自体が落ちることはありません。ただしこの場合PIIが表示用に流れ込む可能性があるため、運用ではフォールバック発生件数をメトリクスとして監視するのが望ましいです。

前回方式との比較

観点 前回 (カラムマスク + ETL 2層) 今回 (テーブル物理分離 + Streaming)
PII露出ウィンドウ バッチ間に発生 原理的に発生しない
ai_mask() の使い方 ETLで元テーブル上書き Streamingで表示用に増分転送
MLflow UI制約 カラムマスクに ai_mask() を使えない 制約なし
全件処理 CTASで毎回全件 checkpointで増分のみ
冪等性 TRUNCATE + INSERTで実質的に確保 MERGE INTO で確保
アクセス制御 UDF内で is_member() 分岐 UCの GRANT / REVOKE で自然に
構成の複雑さ 正規表現 + ETLの2層 ai_mask() Streamingの1経路
ストレージ 元テーブル1セット raw + display の2セット

新方式に固有の追加コストは「テーブルを2セット持つ」点で、表示用テーブルはrawからの派生のためストレージは約2倍になります。これが許容できる規模であれば、ほぼ全方位で新方式が優れます。

環境

  • Databricks Runtime 14.3 LTS 以上
  • MLflow 3.11 以上 (UC統合トレース)
  • Unity Catalog 有効
  • Foundation Model API 有効 (ai_mask() に必要)
  • Databricks Jobs (スケジュール実行用)

参考

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?