導入
Doclingが凄いというのを見まして。
DoclingはPDFやOffice系のファイルなどを解析し、Markdown等の形式でテキスト情報や画像データなどをエクスポートするパッケージです。
上記のドキュメントより、説明部を抜粋。
Doclingは、ドキュメントを解析し、望む形式に簡単かつ迅速にエクスポートします。
特徴
- 🗂️ 人気のあるドキュメント形式(PDF、DOCX、PPTX、画像、HTML、AsciiDoc、Markdown)を読み取り、MarkdownおよびJSONにエクスポート
- 📑 PDFドキュメントの高度な理解を含むページレイアウト、読み取り順序、およびテーブル構造
- 🧩 統一された、表現力豊かなDoclingDocument表現形式
- 📝 タイトル、著者、参照、および言語を含むメタデータの抽出
- 🤖 強力なRAG / QAアプリケーションのためのLlamaIndex 🦙 & LangChain 🦜🔗とのシームレスな統合
- 🔍 スキャンされたPDFのOCRサポート
- 💻 シンプルで便利なCLI
以前、DatabricksでRAG用のドキュメントロード処理を簡易実装する記事を書きましたが、Doclingを使うと手軽にチャンク作成できるんじゃないかと思い、Doclingを試してみた記録です。
基本的にな流れはこの記事と同じで、ファイルの取込とテキストデータをDelta Tableに保管するところまでを行ってみます。
試験環境としてはDatabricks on AWS、DBR 15.4MLのシングルユーザクラスタを使いました。
サーバレスクラスタでも試してみたのですが、UDFで処理をラップした際に/.cache
にファイルを書き込もうとしてエラーが出たため、今回はサーバレスクラスタ不使用としています。
Step1. パッケージインストール
Doclingをインストールします。
%pip install docling
dbutils.library.restartPython()
Step2. 各種パラメータの設定
サンプルとして取り込むファイルの場所や、保管先のカタログ/スキーマ、テーブル名などをDatabricksのノートブックウィジットとして設定します。
場所は適当なので、マネする場合は適宜変更してください。
import os.path
dbutils.widgets.text("catalog", "training")
catalog = dbutils.widgets.get("catalog")
dbutils.widgets.text("schema", "llm")
schema = dbutils.widgets.get("schema")
dbutils.widgets.text("raw_table", "sample_documents_raw")
raw_table = dbutils.widgets.get("raw_table")
dbutils.widgets.text("text_table", "sample_documents_text")
text_table = dbutils.widgets.get("text_table")
dbutils.widgets.text("raw_data_path", "/Volumes/training/llm/raws/samples")
raw_data_path = dbutils.widgets.get("raw_data_path")
dbutils.widgets.text(
"checkpoint_path", "/Volumes/training/llm/delta/samples/checkpoint"
)
checkpoint_path = dbutils.widgets.get("checkpoint_path")
Step3. ドキュメントファイルの取込
Unity CatalogのVolumes内に置いてあるファイルをテーブルに取り込みます。
使うファイルは以前の記事のものを再び利用します。
行っていることは上記記事と全く同じで、ファイルをバイナリデータとしてDeltaTableにAutoLoaderを使って取り込んでいます。
(
# Databricks Autoloaderを使って増分ファイル取込
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobFilter", "{*.pdf,*.docx,*.pptx}")
.option("recursiveFileLookup", "true")
.load(raw_data_path)
.writeStream.option("checkpointLocation", os.path.join(checkpoint_path, raw_table))
.trigger(availableNow=True)
.toTable(f"{catalog}.{schema}.{raw_table}")
.awaitTermination()
)
Step4. Doclingで読込
では、Doclingを使ってStep3で取り込んだバイナリデータからテキスト抽出してみます。
まずはそのためのSpark UDFを定義。
今回は、①Markdown形式でテキスト取得するUDF、②JSON形式でテキスト取得するUDF、③構造に合わせたチャンク化した結果を取得するUDFの3種定義しました。
from pyspark.sql.functions import pandas_udf
import pyspark.sql.functions as F
import io
import pandas as pd
@pandas_udf("string")
def parse_binary_to_text(paths: pd.Series, contents: pd.Series) -> pd.Series:
from io import BytesIO
import uuid
from docling.datamodel.base_models import DocumentStream
from docling.document_converter import DocumentConverter
sources = [
DocumentStream(name=p, stream=BytesIO(c)) for p, c in zip(paths, contents)
]
try:
converter = DocumentConverter()
results = converter.convert_all(sources, raises_on_error=False)
return pd.Series([r.document.export_to_markdown() for r in results])
except Exception as e:
# 例外時はひとまずエラーメッセージを返す。ちゃんと実装することをお薦め。
return pd.Series([str(e)] * len(paths))
@pandas_udf("string")
def parse_binary_to_json(paths: pd.Series, contents: pd.Series) -> pd.Series:
from io import BytesIO
import json
from docling.datamodel.base_models import DocumentStream
from docling.document_converter import DocumentConverter
sources = [
DocumentStream(name=p, stream=BytesIO(c)) for p, c in zip(paths, contents)
]
try:
converter = DocumentConverter()
results = converter.convert_all(sources, raises_on_error=False)
return pd.Series([json.dumps(r.document.export_to_dict()) for r in results])
except Exception as e:
# 例外時はひとまずエラーメッセージを返す。ちゃんと実装することをお薦め。
return pd.Series([str(e)] * len(paths))
@pandas_udf("array<string>")
def parse_binary_to_chunks(paths: pd.Series, contents: pd.Series) -> pd.Series:
from io import BytesIO
import json
from docling.datamodel.base_models import DocumentStream
from docling.document_converter import DocumentConverter
from docling_core.transforms.chunker import HierarchicalChunker
sources = [
DocumentStream(name=p, stream=BytesIO(c)) for p, c in zip(paths, contents)
]
try:
converter = DocumentConverter()
results = converter.convert_all(sources, raises_on_error=False)
chunks_list = [list(HierarchicalChunker().chunk(r.document)) for r in results]
chunks = [ [json.dumps(c.export_json_dict()) for c in chunks] for chunks in chunks_list]
return pd.Series(chunks)
except Exception as e:
# 例外時はひとまずエラーメッセージを返す。ちゃんと実装することをお薦め。
return pd.Series([str(e)] * len(paths))
では、まずは各ファイルよりMarkdown形式でテキストデータを抽出してみます。
Step3で読み込んだデータ全てに対して抽出処理を実行して表示。
(
spark.table(f"{catalog}.{schema}.{raw_table}")
.withColumn("text", parse_binary_to_text("path", "content"))
.select("path", "text")
.display()
)
結果は以下のようになりました。
結果がマークダウン形式のテキストとして取り込めています。
また、Wordファイルに含まれる表もMarkdownの表形式として抽出されていました。
次にJSON形式のデータとして抽出してみます。
(
spark.table(f"{catalog}.{schema}.{raw_table}")
.withColumn("json", parse_binary_to_json("path", "content"))
.withColumn("json", F.try_parse_json("json"))
.select("path", "json")
.display()
)
結果は以下のようになりました。
こちらはメタデータ含めてテキスト情報を取得できます。
かなり細かい情報まで含まれるようなので、適宜必要な情報に絞って利用すると便利そう。
Databricks(DeltaLake)はVARIANT型に対応しているので、こういったJSONデータもパース出来て便利ですね。
最後に、Doclingの機能を使ってチャンク化したデータの抽出も試してみます。
(
spark.table(f"{catalog}.{schema}.{raw_table}")
.withColumn("chunk", parse_binary_to_chunks("path", "content"))
.withColumn("chunk", F.explode("chunk"))
.withColumn("chunk", F.try_parse_json("chunk"))
.select("path", "chunk")
.display()
)
結果は以下のようになりました。
構造に合わせてチャンク化してくれるようで、意味を持った塊で自動的にチャンクを作ってくれるのは便利そうです。
少し細かめにチャンキングする傾向にあるかな、という印象。
また、チャンク化したデータを保存する、という処理を作る場合は以下のようにすると保管できます。
(
spark.readStream.table(f"{catalog}.{schema}.{raw_table}")
.withColumn("chunk", parse_binary_to_chunks("path", "content"))
.withColumn("chunk", F.explode("chunk"))
.withColumn("chunk", F.try_parse_json("chunk"))
.select("path", "chunk")
.writeStream.option("checkpointLocation", os.path.join(checkpoint_path, text_table))
.trigger(availableNow=True)
.toTable(f"{catalog}.{schema}.{text_table}")
.awaitTermination()
)
まとめ
Doclingを使ってドキュメントファイルからテキストを抽出する処理を試してみました。
Spark UDF内でも使えるので、ファイルをアップする都度チャンクデータを作って取り込むことも容易にできると感じました。
初回実行時はモデルのダウンロードなどで時間がかかるので、そのあたりを事前にUnity Catalog Volumesなどに保管する設定が出来るといいのですが制御の仕方がわからず。コード読むしかないかなあ。
テキスト化の精度などはきちんと評価してないのですが、体感はよさそうです。
日本語のドキュメントファイルも問題ありません。
その他、画像や表をエクスポートする機能や、LlamaIndex/LangChainとの連携機能もあるようなのでRAG構築におけるドキュメントロード関連処理はこれだけで十分かもしれませんね。
もう少し深く触って理解を深めていきたいと思います。