5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksでRAG用のドキュメントロード処理用にDoclingを試してみる

Posted at

導入

Doclingが凄いというのを見まして。

DoclingはPDFやOffice系のファイルなどを解析し、Markdown等の形式でテキスト情報や画像データなどをエクスポートするパッケージです。

上記のドキュメントより、説明部を抜粋。

Doclingは、ドキュメントを解析し、望む形式に簡単かつ迅速にエクスポートします。

特徴

  • 🗂️ 人気のあるドキュメント形式(PDF、DOCX、PPTX、画像、HTML、AsciiDoc、Markdown)を読み取り、MarkdownおよびJSONにエクスポート
  • 📑 PDFドキュメントの高度な理解を含むページレイアウト、読み取り順序、およびテーブル構造
  • 🧩 統一された、表現力豊かなDoclingDocument表現形式
  • 📝 タイトル、著者、参照、および言語を含むメタデータの抽出
  • 🤖 強力なRAG / QAアプリケーションのためのLlamaIndex 🦙 & LangChain 🦜🔗とのシームレスな統合
  • 🔍 スキャンされたPDFのOCRサポート
  • 💻 シンプルで便利なCLI

以前、DatabricksでRAG用のドキュメントロード処理を簡易実装する記事を書きましたが、Doclingを使うと手軽にチャンク作成できるんじゃないかと思い、Doclingを試してみた記録です。

基本的にな流れはこの記事と同じで、ファイルの取込とテキストデータをDelta Tableに保管するところまでを行ってみます。

試験環境としてはDatabricks on AWS、DBR 15.4MLのシングルユーザクラスタを使いました。
サーバレスクラスタでも試してみたのですが、UDFで処理をラップした際に/.cacheにファイルを書き込もうとしてエラーが出たため、今回はサーバレスクラスタ不使用としています。

Step1. パッケージインストール

Doclingをインストールします。

%pip install docling

dbutils.library.restartPython()

Step2. 各種パラメータの設定

サンプルとして取り込むファイルの場所や、保管先のカタログ/スキーマ、テーブル名などをDatabricksのノートブックウィジットとして設定します。
場所は適当なので、マネする場合は適宜変更してください。

import os.path

dbutils.widgets.text("catalog", "training")
catalog = dbutils.widgets.get("catalog")

dbutils.widgets.text("schema", "llm")
schema = dbutils.widgets.get("schema")

dbutils.widgets.text("raw_table", "sample_documents_raw")
raw_table = dbutils.widgets.get("raw_table")

dbutils.widgets.text("text_table", "sample_documents_text")
text_table = dbutils.widgets.get("text_table")

dbutils.widgets.text("raw_data_path", "/Volumes/training/llm/raws/samples")
raw_data_path = dbutils.widgets.get("raw_data_path")

dbutils.widgets.text(
    "checkpoint_path", "/Volumes/training/llm/delta/samples/checkpoint"
)
checkpoint_path = dbutils.widgets.get("checkpoint_path")

Step3. ドキュメントファイルの取込

Unity CatalogのVolumes内に置いてあるファイルをテーブルに取り込みます。

使うファイルは以前の記事のものを再び利用します。

行っていることは上記記事と全く同じで、ファイルをバイナリデータとしてDeltaTableにAutoLoaderを使って取り込んでいます。

(
    # Databricks Autoloaderを使って増分ファイル取込
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("pathGlobFilter", "{*.pdf,*.docx,*.pptx}")
    .option("recursiveFileLookup", "true")
    .load(raw_data_path)
    .writeStream.option("checkpointLocation", os.path.join(checkpoint_path, raw_table))
    .trigger(availableNow=True)
    .toTable(f"{catalog}.{schema}.{raw_table}")
    .awaitTermination()
)

Step4. Doclingで読込

では、Doclingを使ってStep3で取り込んだバイナリデータからテキスト抽出してみます。
まずはそのためのSpark UDFを定義。

今回は、①Markdown形式でテキスト取得するUDF、②JSON形式でテキスト取得するUDF、③構造に合わせたチャンク化した結果を取得するUDFの3種定義しました。

from pyspark.sql.functions import pandas_udf
import pyspark.sql.functions as F
import io
import pandas as pd


@pandas_udf("string")
def parse_binary_to_text(paths: pd.Series, contents: pd.Series) -> pd.Series:
    from io import BytesIO
    import uuid
    from docling.datamodel.base_models import DocumentStream
    from docling.document_converter import DocumentConverter

    sources = [
        DocumentStream(name=p, stream=BytesIO(c)) for p, c in zip(paths, contents)
    ]

    try:
        converter = DocumentConverter()
        results = converter.convert_all(sources, raises_on_error=False)

        return pd.Series([r.document.export_to_markdown() for r in results])
    except Exception as e:
        # 例外時はひとまずエラーメッセージを返す。ちゃんと実装することをお薦め。
        return pd.Series([str(e)] * len(paths))

@pandas_udf("string")
def parse_binary_to_json(paths: pd.Series, contents: pd.Series) -> pd.Series:
    from io import BytesIO
    import json
    from docling.datamodel.base_models import DocumentStream
    from docling.document_converter import DocumentConverter

    sources = [
        DocumentStream(name=p, stream=BytesIO(c)) for p, c in zip(paths, contents)
    ]

    try:
        converter = DocumentConverter()
        results = converter.convert_all(sources, raises_on_error=False)

        return pd.Series([json.dumps(r.document.export_to_dict()) for r in results])
    except Exception as e:
        # 例外時はひとまずエラーメッセージを返す。ちゃんと実装することをお薦め。
        return pd.Series([str(e)] * len(paths))


@pandas_udf("array<string>")
def parse_binary_to_chunks(paths: pd.Series, contents: pd.Series) -> pd.Series:
    from io import BytesIO
    import json
    from docling.datamodel.base_models import DocumentStream
    from docling.document_converter import DocumentConverter
    from docling_core.transforms.chunker import HierarchicalChunker

    sources = [
        DocumentStream(name=p, stream=BytesIO(c)) for p, c in zip(paths, contents)
    ]

    try:
        converter = DocumentConverter()
        results = converter.convert_all(sources, raises_on_error=False)

        chunks_list = [list(HierarchicalChunker().chunk(r.document)) for r in results]
        chunks = [ [json.dumps(c.export_json_dict()) for c in chunks] for chunks in chunks_list]
        return pd.Series(chunks)

    except Exception as e:
        # 例外時はひとまずエラーメッセージを返す。ちゃんと実装することをお薦め。
        return pd.Series([str(e)] * len(paths))

では、まずは各ファイルよりMarkdown形式でテキストデータを抽出してみます。
Step3で読み込んだデータ全てに対して抽出処理を実行して表示。

(
    spark.table(f"{catalog}.{schema}.{raw_table}")
    .withColumn("text", parse_binary_to_text("path", "content"))
    .select("path", "text")
    .display()
)

結果は以下のようになりました。

image.png

結果がマークダウン形式のテキストとして取り込めています。
また、Wordファイルに含まれる表もMarkdownの表形式として抽出されていました。

次にJSON形式のデータとして抽出してみます。

(
    spark.table(f"{catalog}.{schema}.{raw_table}")
    .withColumn("json", parse_binary_to_json("path", "content"))
    .withColumn("json", F.try_parse_json("json"))
    .select("path", "json")
    .display()
)

結果は以下のようになりました。

image.png

こちらはメタデータ含めてテキスト情報を取得できます。
かなり細かい情報まで含まれるようなので、適宜必要な情報に絞って利用すると便利そう。

Databricks(DeltaLake)はVARIANT型に対応しているので、こういったJSONデータもパース出来て便利ですね。

最後に、Doclingの機能を使ってチャンク化したデータの抽出も試してみます。

(
    spark.table(f"{catalog}.{schema}.{raw_table}")
    .withColumn("chunk", parse_binary_to_chunks("path", "content"))
    .withColumn("chunk", F.explode("chunk"))
    .withColumn("chunk", F.try_parse_json("chunk"))
    .select("path", "chunk")
    .display()
)

結果は以下のようになりました。

構造に合わせてチャンク化してくれるようで、意味を持った塊で自動的にチャンクを作ってくれるのは便利そうです。
少し細かめにチャンキングする傾向にあるかな、という印象。

image.png

また、チャンク化したデータを保存する、という処理を作る場合は以下のようにすると保管できます。

(
    spark.readStream.table(f"{catalog}.{schema}.{raw_table}")
    .withColumn("chunk", parse_binary_to_chunks("path", "content"))
    .withColumn("chunk", F.explode("chunk"))
    .withColumn("chunk", F.try_parse_json("chunk"))
    .select("path", "chunk")
    .writeStream.option("checkpointLocation", os.path.join(checkpoint_path, text_table))
    .trigger(availableNow=True)
    .toTable(f"{catalog}.{schema}.{text_table}")
    .awaitTermination()
)

まとめ

Doclingを使ってドキュメントファイルからテキストを抽出する処理を試してみました。
Spark UDF内でも使えるので、ファイルをアップする都度チャンクデータを作って取り込むことも容易にできると感じました。

初回実行時はモデルのダウンロードなどで時間がかかるので、そのあたりを事前にUnity Catalog Volumesなどに保管する設定が出来るといいのですが制御の仕方がわからず。コード読むしかないかなあ。

テキスト化の精度などはきちんと評価してないのですが、体感はよさそうです。
日本語のドキュメントファイルも問題ありません。

その他、画像や表をエクスポートする機能や、LlamaIndex/LangChainとの連携機能もあるようなのでRAG構築におけるドキュメントロード関連処理はこれだけで十分かもしれませんね。

もう少し深く触って理解を深めていきたいと思います。

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?