こちらと、
こちらの組み合わせです。
はじめに
企業のデータの大部分は非構造化形式(PDF、画像、スキャン文書など)に保存されており、分析やAI活用が困難です。Databricksは、この課題を解決する2つの強力な機能を提供しています。ai_parse_document関数と 構造化出力(Structured Outputs) を組み合わせることで、非構造化データから高品質な構造化データへの変換を自動化できます。
本記事では、これら2つの機能の特徴と、組み合わせて使用することで得られる相乗効果について詳しく解説します。
各機能の概要
ai_parse_document関数とは
ai_parse_document関数は、Databricks基盤モデルAPIを通じて最先端AIモデルを呼び出し、非構造化ドキュメントから構造化コンテンツを抽出する組み込み関数です。
主な特徴:
- PDF、画像、スキャン文書などの複数のファイル形式をサポート
- ページ番号、ヘッダー、フッターなどのレイアウトメタデータを抽出
- テキスト段落や表をMarkdown形式で表現
- Unity Catalog内でバイナリデータとして保存されたファイルを直接処理
- Databricks SQL、ノートブック、ワークフロー、Lakeflow宣言型パイプラインから使用可能
基本的な使用例:
SELECT
path,
ai_parse_document(content) as parsed
FROM READ_FILES('/Volumes/path/to/documents', format => 'binaryFile');
構造化出力(Structured Outputs)とは
構造化出力は、入力データからJSON形式の構造化データを生成する機能です。事前に定義したJSONスキーマに準拠した出力を生成することで、生成AIの出力を予測可能かつ処理しやすい形式にします。
主な特徴:
- 特定のJSONスキーマに準拠した出力を保証
- OpenAI互換のAPIを使用してアクセス可能
- 基盤モデルAPIとプロビジョニングスループットエンドポイントで利用可能
- バッチ推論タスクに最適
推奨される使用シーン:
- 大量のドキュメントからのデータ抽出
- 指定された形式が必要なバッチ推論タスク
- 非構造化データを構造化データに変換するデータ処理
基本的な使用例:
from openai import OpenAI
response_format = {
"type": "json_schema",
"json_schema": {
"name": "research_paper_extraction",
"schema": {
"type": "object",
"properties": {
"title": {"type": "string"},
"authors": {
"type": "array",
"items": {"type": "string"}
},
"abstract": {"type": "string"},
"keywords": {
"type": "array",
"items": {"type": "string"}
}
}
},
"strict": True
}
}
response = client.chat.completions.create(
model="databricks-gpt-oss-20b",
messages=messages,
response_format=response_format
)
組み合わせることで得られる5つのメリット
1. エンドツーエンドの自動化された文書処理パイプライン
ai_parse_document関数で非構造化ドキュメントからテキストを抽出し、構造化出力でそのテキストを定義されたスキーマに従って構造化します。この2段階のアプローチにより、PDFから最終的なデータベースレコードまでの完全な自動化が可能になります。
実装例:
from pyspark.sql.functions import *
from openai import OpenAI
# ステップ1: ai_parse_documentでPDFを解析
df_parsed = spark.read.format("binaryFile") \
.load("/Volumes/main/default/invoices") \
.withColumn("parsed", ai_parse_document("content")) \
.withColumn("parsed_text", col("parsed.document.pages[0].content"))
# ステップ2: 構造化出力で特定のスキーマに変換
def extract_structured_invoice(text):
client = OpenAI(
api_key=os.environ.get('DATABRICKS_TOKEN'),
base_url=os.environ.get('DATABRICKS_BASE_URL')
)
response_format = {
"type": "json_schema",
"json_schema": {
"name": "invoice_extraction",
"schema": {
"type": "object",
"properties": {
"invoice_number": {"type": "string"},
"date": {"type": "string"},
"vendor": {"type": "string"},
"total_amount": {"type": "number"},
"items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"description": {"type": "string"},
"quantity": {"type": "integer"},
"unit_price": {"type": "number"}
}
}
}
}
},
"strict": True
}
}
response = client.chat.completions.create(
model="databricks-gpt-oss-20b",
messages=[{
"role": "user",
"content": f"Extract invoice information from: {text}"
}],
response_format=response_format
)
return response.choices[0].message.content
# 結果をテーブルに保存
df_structured = df_parsed.withColumn(
"invoice_data",
udf(extract_structured_invoice)("parsed_text")
)
2. データ品質の大幅な向上
ai_parse_document
は文書構造を保持しながらテキストを抽出し、構造化出力は厳格なスキーマ検証を提供します。この組み合わせにより、以下のような品質保証が可能です。
- レイアウト情報の保持: ページ番号、ヘッダー、フッター、表構造が維持されます
- スキーマ準拠の保証: 出力が必ず定義されたJSON構造に従います
-
エラー処理の一元化:
ai_parse_document
のエラー情報と構造化出力の検証を組み合わせた堅牢なパイプライン
3. 複雑な文書タイプへの対応
2つの機能を組み合わせることで、以下のような複雑な文書処理が可能になります。
請求書処理の例:
-
ai_parse_document
で表形式の明細を抽出 - 構造化出力で各明細項目を配列型のJSONとして構造化
- 請求書番号、日付、金額などのメタデータも同時に抽出
契約書処理の例:
-
ai_parse_document
で複数ページにわたる条項を抽出 - 構造化出力で当事者情報、契約期間、金額条件などを特定のスキーマに分類
研究論文処理の例:
-
ai_parse_document
で論文の構造(タイトル、要約、本文、参考文献)を識別 - 構造化出力で著者情報、キーワード、引用データなどを標準化されたフォーマットで抽出
4. スケーラビリティと生産性の向上
両方の機能がDatabricksプラットフォームにネイティブに統合されているため、以下のメリットがあります。
大規模バッチ処理:
- Unity Catalogに保存された数千のドキュメントを並列処理
- Sparkの分散処理能力を活用した高速な文書処理
- Lakeflow宣言型パイプラインによる定期的な自動処理
運用の簡素化:
- 外部APIやサードパーティツールへの依存を排除
- Databricksセキュリティ境界内でデータ処理が完結
- 統一されたモニタリングとクエリプロファイル機能
コスト効率:
- 従量課金制のバッチ推論により無駄なコストを削減
- 最適化されたモデル選択により効率的な処理を実現
5. Agent Bricksとの連携による高度なワークフロー
Databricks Agent Bricksの情報抽出機能と組み合わせることで、さらに高度な処理が可能です。
推奨ワークフロー:
-
初期変換:
ai_parse_document
でPDFをMarkdownに変換 - スキーマ定義: 構造化出力で抽出したいデータのスキーマを定義
- Agent Bricks活用: 少量のラベル付きデータで最適なモデルを自動選択
- 本番デプロイ: Managed LangChainエンドポイントとして展開
この組み合わせにより、MLOpsのベストプラクティスに沿った本番運用レベルの文書処理システムを迅速に構築できます。
実践的なユースケース
ユースケース1: 財務文書の自動処理
課題: 経理部門に毎月数百枚の請求書が届き、手作業でのデータ入力に多大な時間がかかる
ソリューション:
# 1. ai_parse_documentで請求書PDFを解析
invoice_df = spark.read.format("binaryFile") \
.load("/Volumes/finance/invoices") \
.withColumn("parsed", ai_parse_document("content"))
# 2. 構造化出力で財務システム用のJSONを生成
structured_schema = {
"type": "json_schema",
"json_schema": {
"name": "invoice_data",
"schema": {
"type": "object",
"properties": {
"vendor_id": {"type": "string"},
"invoice_number": {"type": "string"},
"invoice_date": {"type": "string"},
"due_date": {"type": "string"},
"subtotal": {"type": "number"},
"tax": {"type": "number"},
"total": {"type": "number"},
"line_items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"item_code": {"type": "string"},
"description": {"type": "string"},
"quantity": {"type": "number"},
"unit_price": {"type": "number"},
"amount": {"type": "number"}
}
}
}
}
},
"strict": True
}
}
# 3. DLTパイプラインとしてデプロイ
# 新しい請求書が到着すると自動的に処理され、財務システムに統合
ユースケース2: 契約書レビューの自動化
課題: 法務部門が数百ページの契約書から重要な条項を抽出し、リスク評価を行う必要がある
ソリューション:
# 契約書から重要情報を構造化して抽出
contract_schema = {
"type": "json_schema",
"json_schema": {
"name": "contract_analysis",
"schema": {
"type": "object",
"properties": {
"parties": {
"type": "array",
"items": {"type": "string"}
},
"effective_date": {"type": "string"},
"termination_date": {"type": "string"},
"payment_terms": {"type": "string"},
"liability_clauses": {
"type": "array",
"items": {"type": "string"}
},
"risk_factors": {
"type": "array",
"items": {
"type": "object",
"properties": {
"risk_type": {"type": "string"},
"severity": {"type": "string"},
"description": {"type": "string"}
}
}
}
}
},
"strict": True
}
}
ユースケース3: 顧客フィードバック分析
課題: 様々な形式(PDF、画像、スキャン文書)で届く顧客アンケートを統一的に分析したい
ソリューション:
# 顧客フィードバックの構造化
feedback_schema = {
"type": "json_schema",
"json_schema": {
"name": "customer_feedback",
"schema": {
"type": "object",
"properties": {
"customer_id": {"type": "string"},
"feedback_date": {"type": "string"},
"satisfaction_score": {"type": "integer"},
"product_ratings": {
"type": "object",
"properties": {
"quality": {"type": "integer"},
"service": {"type": "integer"},
"value": {"type": "integer"}
}
},
"comments": {"type": "string"},
"sentiment": {"type": "string"},
"action_items": {
"type": "array",
"items": {"type": "string"}
}
}
},
"strict": True
}
}
# BIツールでの可視化やトレンド分析が容易に
具体例: 設備保守
これをai_parse_document
で読み込みます。
SELECT
path,
ai_parse_document(content, map('version', '2.0')) as parsed
FROM READ_FILES('/Volumes/takaakiyayoi_catalog/uploaded_docs/image.png', format => 'binaryFile')
以下のようにテキストを取得できます。
path | parsed |
---|---|
dbfs:/Volumes/takaakiyayoi_catalog/uploaded_docs/image.png | {"document":{"elements":[{"bbox":[{"coord":[60,69,332,113],"page_id":0}],"content":"装置:RX-200","description":null,"id":0,"type":"title"},{"bbox":[{"coord":[66,137,441,209],"page_id":0}],"content":"日時:2024年4月25日10:15","description":null,"id":1,"type":"title"},{"bbox":[{"coord":[60,228,423,309],"page_id":0}],"content":"事象:動作中に振動・発音が発生","description":null,"id":2,"type":"title"},{"bbox":[{"coord":[60,323,451,482],"page_id":0}],"content":"推奨対策:モーター軸の回転位置調整およびベアリングの点検・交換を実施","description":null,"id":3,"type":"title"}],"pages":[{"id":0,"image_uri":null}]},"error_status":null,"metadata":{"id":"c741ca94-8d58-4919-8bb1-acd28139e02e","version":"2.0"}} |
しかしこの時点ではcontent
のキーのみで完全に構造化されているとは言えません。そこで構造化出力の出番です。
以下のようなJSONスキーマを定義します。より詳細な仕様はこちらに。
{"type": "json_schema",
"json_schema": {
"name": "maintenance_memo_extraction",
"schema": {
"type": "object",
"properties": {
"equipment": {
"type": "string",
"description": "装置"
},
"date": {
"type": "string",
"description": "日時"
},
"phenomenon": {
"type": "string",
"description": "事象"
},
"countermeasure": {
"type": "string",
"description": "対策"
}
},
"required": ["equipment", "date", "phenomenon", "countermeasure"]
}
}}}
ai_query
関数で上のスキーマを指定します。
WITH parsed_doc AS (
SELECT
path,
ai_parse_document(content, map('version', '2.0')) as parsed_content
FROM
READ_FILES(
'/Volumes/takaakiyayoi_catalog/uploaded_docs/image.png',
format => 'binaryFile'
)
)
SELECT
parsed_content,
ai_query(
'databricks-claude-sonnet-4-5',
CONCAT(
'次のメンテナンス記録から情報を抽出してください。画像内のラベルと値の位置関係を考慮して、正確に抽出してください。各フィールドのラベルの近くにある値を抽出してください:',
CAST(parsed_content AS STRING)
),
responseFormat =>
'{"type": "json_schema",
"json_schema": {
"name": "maintenance_memo_extraction",
"schema": {
"type": "object",
"properties": {
"equipment": {
"type": "string",
"description": "装置"
},
"date": {
"type": "string",
"description": "日時"
},
"phenomenon": {
"type": "string",
"description": "事象"
},
"countermeasure": {
"type": "string",
"description": "対策"
}
},
"required": ["equipment", "date", "phenomenon", "countermeasure"]
}
}}}'
) as structured_data
FROM
parsed_doc
structured_data
を見ると、適切なキーと値がマッピングされたJSONが得られています。
parsed_content | structured_data |
---|---|
{"document":{"elements":[{"bbox":[{"coord":[60,69,332,113],"page_id":0}],"content":"装置:RX-200","description":null,"id":0,"type":"title"},{"bbox":[{"coord":[66,137,441,209],"page_id":0}],"content":"日時:2024年4月25日10:15","description":null,"id":1,"type":"title"},{"bbox":[{"coord":[60,228,423,309],"page_id":0}],"content":"事象:動作中に振動・発音が発生","description":null,"id":2,"type":"title"},{"bbox":[{"coord":[60,323,451,482],"page_id":0}],"content":"推奨対策:モーター軸の過負荷位置調整およびベアリングの点検・交換を実施","description":null,"id":3,"type":"title"}],"pages":[{"id":0,"image_uri":null}]},"error_status":null,"metadata":{"id":"1e854dce-0f33-48ab-9ed0-29d748d96ce1","version":"2.0"}} | {"equipment":"RX-200","date":"2024年4月25日10:15","phenomenon":"動作中に振動・発音が発生","countermeasure":"モーター軸の過負荷位置調整およびベアリングの点検・交換を実施"} |
さらに以下のSQLを追記して実行します。
SELECT
parsed_content,
sd.equipment,
sd.date,
sd.phenomenon,
sd.countermeasure
FROM (
SELECT
parsed_content,
from_json(structured_data,
'struct<equipment:string,date:string,phenomenon:string,countermeasure:string>') AS sd
FROM structured_json
) t;
parsed_content | equipment | date | phenomenon | countermeasure |
---|---|---|---|---|
{"document":{"elements":[{"bbox":[{"coord":[60,69,332,113],"page_id":0}],"content":"装置:RX-200","description":null,"id":0,"type":"title"},{"bbox":[{"coord":[66,137,441,209],"page_id":0}],"content":"日時:2024年4月25日10:15","description":null,"id":1,"type":"title"},{"bbox":[{"coord":[60,228,423,309],"page_id":0}],"content":"事象:動作中に振動・発音が発生","description":null,"id":2,"type":"title"},{"bbox":[{"coord":[60,323,451,482],"page_id":0}],"content":"推奨対策:モーター軸の回転位置調整およびベアリングの点検・交換を実施","description":null,"id":3,"type":"title"}],"pages":[{"id":0,"image_uri":null}]},"error_status":null,"metadata":{"id":"6bba93a5-a177-4841-bcf9-3f3dc37b9def","version":"2.0"}} | RX-200 | 2024年4月25日10:15 | 動作中に振動・発音が発生 | モーター軸の回転位置調整およびベアリングの点検・交換を実施 |
こうすることで、非構造化の文書や画像から完全に構造化されたデータを抽出でき、これらをテーブルに書き込んで再利用、分析することが可能となります。
ベストプラクティス
1. スキーマ設計
構造化出力で使用するJSONスキーマは、シンプルかつフラットな構造を心がけましょう。
推奨:
- キーの数は64個以下に抑える
- 深いネストは避け、可能な限りフラット化
- 必須フィールドと任意フィールドを明確に区別
2. エラーハンドリング
ai_parse_documentはerror_status
とcorrupted_data
フィールドを提供します。これらを活用して堅牢なパイプラインを構築しましょう。
df_with_errors = df_parsed.selectExpr(
"path",
"parsed.document",
"parsed.error_status",
"parsed.corrupted_data"
).filter("size(error_status) > 0 OR size(corrupted_data) > 0")
# エラーがあるドキュメントを別テーブルに保存して手動レビュー
3. パフォーマンス最適化
- バッチサイズの最適化: 大量のドキュメントは適切なバッチに分割
-
キャッシングの活用: 繰り返し処理する場合は
ai_parse_document
の結果をキャッシュ - 並列処理: Sparkの並列性を最大限に活用
4. バージョン管理
ai_parse_document
の出力スキーマはバージョン管理されています。下位互換性を考慮した実装を心がけましょう。
# スキーマバージョンを確認
df_with_version = df_parsed.selectExpr(
"path",
"parsed.metadata.version as schema_version",
"parsed.document"
)
技術的な考慮事項
制限事項
ai_parse_document:
- 日本語や韓国語などの非ラテン文字では最適なパフォーマンスが得られない場合がありますが、今時点で私が触っている限りでは不満はありません。
- 高密度コンテンツやスキャン品質の低い文書では処理時間が長くなる可能性があります。
- Databricks Runtime 16.4 LTS以降が必要です。
構造化出力:
- JSONスキーマでのキー数は最大64個
- 正規表現(
pattern
)や複雑なスキーマ構成(anyOf
、oneOf
など)は未サポート - 配列やオブジェクトのサイズ制約は強制されない
セキュリティとプライバシー
- すべてのデータ処理はDatabricksセキュリティ境界内で実行
-
ai_parse_document
呼び出しのパラメータは保存されない(メタデータのみ保持) - Unity Catalogの権限管理により適切なアクセス制御が可能
まとめ
Databricksのai_parse_document関数と構造化出力を組み合わせることで、非構造化ドキュメントから高品質な構造化データへの変換を完全に自動化できます。
主な利点:
- エンドツーエンドの自動化されたドキュメント処理パイプライン
- スキーマ準拠とレイアウト情報保持による高いデータ品質
- 複雑な文書タイプへの柔軟な対応
- Sparkの分散処理によるスケーラビリティ
- Agent Bricksとの連携による高度なMLOpsワークフロー
これらの技術を活用することで、請求書処理、契約書分析、顧客フィードバック収集など、様々なビジネスプロセスを効率化し、データドリブンな意思決定を加速できます。
Databricksデータインテリジェンスプラットフォームの強力な機能を最大限に活用して、組織の非構造化データから価値を引き出しましょう。