2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Microsoft Fabric で Content Understanding を使用して pdf を構造化する

Last updated at Posted at 2025-12-04

はじめに

Content Understanding が GA したので、Azure-Samples をアレンジし、Microsoft Fabric 上で動かしてみます。

参考リポジトリ:
https://github.com/Azure-Samples/azure-ai-content-understanding-python


使用データ

警視庁公開資料「自転車の正しい乗り方」を例に使用します。
https://www.keishicho.metro.tokyo.lg.jp/kotsu/jikoboshi/bicycle/menu/leaflet.html


アナライザーの作成

Content Understanding Studio でカスタム アナライザーを作成する
を参考に進めます。

PDF を Content Understanding Studio 上にアップロードし、スキーマを定義します。Studio では内容に応じてスキーマ候補を自動提案してくれます。

image.png

また、フィールドの抽出方法として「Extract」「Generate」などの Method を設定できます。
※ markdown は既定で取得できるため、ここでは例として表示しています。

image.png

テストを実行して結果を確認します。
ベストプラクティス では、スキーマ説明を丁寧に記述することで信頼度を高められるとありますが、本記事では簡略化します。

image.png

アナライザーを保存すると、名前がそのまま ID として発行されます。
この ID をアプリケーション側で利用します。

image.png


Microsoft Fabric での利用

レイクハウス(Lakehouse)に対象の PDF をアップロードします。

image.png

Azure-Samples 公式のクライアントモジュールをダウンロードします。

image.png

ノートブックのリソースにアップロードし、セルにドロップすると自動的に import 用コードが生成されます。

image.png

Fabric Notebook では DefaultAzureCredential がうまく動作しないため、本記事では API キーを利用します。
実際の運用では Key Vault から取得 してください。

pyspark

endpoint = "Foundry ポータルで確認できる https://<リソース名>.cognitiveservices.azure.com/"
api_version = "2025-11-01"  # GA バージョン
subscription_key = "<APIキー>"

クライアント設定(Azure-Samples をベースにアレンジ):

pyspark
import logging
import json
import os
import sys
import uuid
from pathlib import Path
# from dotenv import find_dotenv, load_dotenv
from azure.identity import DefaultAzureCredential, get_bearer_token_provider

# load_dotenv(find_dotenv())
logging.basicConfig(level=logging.INFO)

# For authentication, you can use either token-based auth or subscription key; only one is required
AZURE_AI_ENDPOINT = endpoint
# IMPORTANT: Replace with your actual subscription key or set it in your ".env" file if not using token authentication
AZURE_AI_API_KEY = subscription_key
API_VERSION = api_version


# Create the Content Understanding client
try:
    client = content_understanding_client.AzureContentUnderstandingClient(
        endpoint=AZURE_AI_ENDPOINT,
        api_version=API_VERSION,
        subscription_key=AZURE_AI_API_KEY,
        token_provider=token_provider if not AZURE_AI_API_KEY else None,
        x_ms_useragent="azure-ai-content-understanding-python-sample-ga"    # The user agent is used for tracking sample usage and does not provide identity information. You can change this if you want to opt out of tracking.
    )
    credential_type = "Subscription Key" if AZURE_AI_API_KEY else "Azure AD Token"
    print(f"✅ Client created successfully")
    print(f"   Endpoint: {AZURE_AI_ENDPOINT}")
    print(f"   Credential: {credential_type}")
    print(f"   API Version: {API_VERSION}")
except Exception as e:
    credential_type = "Subscription Key" if AZURE_AI_API_KEY else "Azure AD Token"
    print(f"❌ Failed to create client")
    print(f"   Endpoint: {AZURE_AI_ENDPOINT}")
    print(f"   Credential: {credential_type}")
    print(f"   Error: {e}")
    raise


次は必須ではありませんが、サンプルに倣い、GA で documentSearch などの RAG 用ビルトインアナライザーが追加されたことに伴ってか、 gpt-4.1,gpt-4.1-mini,text-embedding-3-largeが必要になるので、そのバリデーションをします。

pyspark

# Get model deployment names from environment variables
GPT_4_1_DEPLOYMENT = "gpt-4.1のデプロイ名"
GPT_4_1_MINI_DEPLOYMENT ="gpt-4.1-miniのデプロイ名"
TEXT_EMBEDDING_3_LARGE_DEPLOYMENT = "text-embedding-3-largeのデプロイ名"

# Check if required deployments are configured
missing_deployments = []
if not GPT_4_1_DEPLOYMENT:
    missing_deployments.append("GPT_4_1_DEPLOYMENT")
if not GPT_4_1_MINI_DEPLOYMENT:
    missing_deployments.append("GPT_4_1_MINI_DEPLOYMENT")
if not TEXT_EMBEDDING_3_LARGE_DEPLOYMENT:
    missing_deployments.append("TEXT_EMBEDDING_3_LARGE_DEPLOYMENT")

if missing_deployments:
    print(f"⚠️  Warning: Missing required model deployment configuration(s):")
    for deployment in missing_deployments:
        print(f"   - {deployment}")
    print("\n   Prebuilt analyzers require GPT-4.1, GPT-4.1-mini, and text-embedding-3-large deployments.")
    print("   Please:")
    print("   1. Deploy all three models in Azure AI Foundry")
    print("   2. Add the following to notebooks/.env:")
    print("      GPT_4_1_DEPLOYMENT=<your-gpt-4.1-deployment-name>")
    print("      GPT_4_1_MINI_DEPLOYMENT=<your-gpt-4.1-mini-deployment-name>")
    print("      TEXT_EMBEDDING_3_LARGE_DEPLOYMENT=<your-text-embedding-3-large-deployment-name>")
    print("   3. Restart the kernel and run this cell again")
else:
    print(f"📋 Configuring default model deployments...")
    print(f"   GPT-4.1 deployment: {GPT_4_1_DEPLOYMENT}")
    print(f"   GPT-4.1-mini deployment: {GPT_4_1_MINI_DEPLOYMENT}")
    print(f"   text-embedding-3-large deployment: {TEXT_EMBEDDING_3_LARGE_DEPLOYMENT}")
    
    try:
        # Update defaults to map model names to your deployments
        result = client.update_defaults({
            "gpt-4.1": GPT_4_1_DEPLOYMENT,
            "gpt-4.1-mini": GPT_4_1_MINI_DEPLOYMENT,
            "text-embedding-3-large": TEXT_EMBEDDING_3_LARGE_DEPLOYMENT
        })
        
        print(f"✅ Default model deployments configured successfully")
        print(f"   Model mappings:")
        for model, deployment in result.get("modelDeployments", {}).items():
            print(f"     {model}{deployment}")
    except Exception as e:
        print(f"❌ Failed to configure defaults: {e}")
        print(f"   This may happen if:")
        print(f"   - One or more deployment names don't exist in your Azure AI Foundry project")
        print(f"   - You don't have permission to update defaults")
        raise


Azure Samples にはない部分です。

レイクハウスのディレクトリを走査し、Content Understanding に投げ、結果を DataFrame 化する関数を作成します。

pyspark
import os
import pandas as pd
import json

def create_cu_dataframe(base_dir, client, analyzer_id, extensions=None):
    search_dir = "file:" + base_dir

    # extensions 正規化
    if extensions is None:
        exts = None
    else:
        if isinstance(extensions, str):
            exts = [extensions.lower()]
        else:
            exts = [ext.lower() for ext in extensions]

    rows = []

    # notebookutils でファイル一覧取得
    for file in notebookutils.fs.ls(search_dir):
        name = file.name.lower()

        # 拡張子フィルタ
        if exts is not None and not any(name.endswith(ext) for ext in exts):
            continue

        fullpath = os.path.join(base_dir, file.name)

        # 🔹 メタデータ取得 (notebookutils.fs.ls の FileInfo)
        file_size = file.size
        # mod_time  = file.modificationTime   # epoch(ms)

        # 🔹 Content Understanding 呼び出し
        try:
            response = client.begin_analyze_binary(analyzer_id, file_location=fullpath)
            result = client.poll_result(response)
            out = json.dumps(result, ensure_ascii=False)
        except Exception as e:
            out = json.dumps(
                {"error": str(e), "file_path": fullpath},
                ensure_ascii=False
            )

        rows.append({
            "audit__filepath": fullpath,
            "audit__filename": file.name,
            "audit__filesize_bytes": file_size,
            # "audit__modified_time_ms": mod_time,
            "ai_json": out
        })
    if len(rows) == 0:
        raise ValueError("No matching files found. Check extension or directory path.")

    return spark.createDataFrame(pd.DataFrame(rows))



ファイルのある場所を関数に渡して、処理を開始します。
オブジェクトストレージの URL を対象にできない、ファイルシステム API を利用してファイルアクセスする ような、python 処理の場合には、ファイルAPIパスでパスを取得します。

参考:https://learn.microsoft.com/ja-jp/fabric/data-engineering/lakehouse-notebook-load-data

image.png

pyspark
import datetime
import pyspark.sql.functions as F 
df = create_cu_dataframe(
    "/lakehouse/default/Files/landing/bicycle",
    client,
    "bicycle",
    extensions=["pdf"]
)

実行結果はこのようになります。Variant 型がないので、応答結果の json は文字列で格納します。

image.png

get_json_object を使用して、任意のフィールドの値を項目として追加できます。

例として、PenaltyDrunkDrivingLesserYears(酒気帯び運転の罰則年数)を取り出します。Content Analyzers - Get Resultを参考にjson のスキーマをたどって取得します。

pyspark
# spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

def sort_audit_columns(df):
    # カラム並び替え
    audit_columns = [
        "audit__run_date", 
        "audit__run_at",
        "audit__run_at_id",
        "audit__filepath",
        "audit__filename",
        "audit__filesize_bytes",
    ]
    existing_audit = [c for c in audit_columns if c in df.columns]
    others = [c for c in df.columns if c not in existing_audit]
    df = df.select(*existing_audit, *others)
    return df

df = df.withColumn("audit__run_at", F.current_timestamp()) \
       .withColumn("audit__run_at_id", 
                   F.date_format(F.col("audit__run_at"), "yyyyMMddHHmmss")) \
       .withColumn("audit__run_date",
                   F.to_date(F.col("audit__run_at")))

df = sort_audit_columns(df)

landing_cu_path = "bicycle_cu"
df.coalesce(1)\
    .write\
    .format("parquet")\
    .partitionBy("audit__run_date","audit__run_at_id")\
    .mode("overwrite")\
    .save(f"Files/landing/{landing_cu_path}")

df.write.mode("overwrite").saveAsTable("input_bicycle_knowledge")

output_df = df.withColumn("markdown", F.get_json_object("ai_json","$.result.contents[0].markdown"))\
                .withColumn("PenaltyDrunkDrivingLesserYears", F.get_json_object("ai_json","$.result.contents[0].fields.PenaltyDrunkDrivingLesserYears.valueInteger"))\
                .drop("ai_json")

output_df.write.saveAsTable("output_bicycle_knowledge")

※私なりのお作法をいれていますが、これはご自由にどうぞ。

テーブルでの表示はこのようになります。
image.png

以上、参考になれば幸いです。

参考

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?