もう結構経ってしまっていますが、先日Microsoft社がMarkitdownというパッケージをOSSで公開しました。
PDFやOffice系ファイル等をMarkdownに変換できるものです。
以前、Doclingを使って非構造ファイルをテキスト化・チャンク化してDatabricksに保管する流れを試してみましたが、今回はこちらをMarkitdownに置き換えて実行してみます。
基本的にな流れは上記記事と同じで、ファイルの取込とそのバイナリデータをMarkdownに変換するところまで行います。Delta Tableに保管するところは繰り返しになるので割愛。
環境としてはDatabricks on AWS、サーバレスクラスタを使いました。
Markitdownとは?
既に数多くの紹介記事がでていますので、そちらを参考に。。。
こちらの紹介記事がわかりやすいです。
やってみる
Step1. パッケージインストール
Markitdownをインストールします。
%pip install markitdown
dbutils.library.restartPython()
Step2. 各種パラメータの設定
サンプルとして取り込むファイルの場所や、保管先のカタログ/スキーマ、テーブル名などをDatabricksのノートブックウィジットとして設定します。
こちらの記事と内容は同じです。
場所は適当なので、試する場合は適宜変更してください。
import os.path
dbutils.widgets.text("catalog", "training")
catalog = dbutils.widgets.get("catalog")
dbutils.widgets.text("schema", "llm")
schema = dbutils.widgets.get("schema")
dbutils.widgets.text("raw_table", "sample_documents_raw")
raw_table = dbutils.widgets.get("raw_table")
dbutils.widgets.text("text_table", "sample_documents_text")
text_table = dbutils.widgets.get("text_table")
dbutils.widgets.text("raw_data_path", "/Volumes/training/llm/raws/samples")
raw_data_path = dbutils.widgets.get("raw_data_path")
dbutils.widgets.text(
"checkpoint_path", "/Volumes/training/llm/delta/samples/checkpoint"
)
checkpoint_path = dbutils.widgets.get("checkpoint_path")
Step3. ドキュメントファイルの取込
Unity CatalogのVolumes内に置いてあるファイルをテーブルに取り込みます。
使うファイルは以前のこちらの記事のものを再々利用します。
行っていることは上記記事と全く同じで、ファイルをバイナリデータとしてDeltaTableにAutoLoaderを使って取り込んでいます。
(
# Databricks Autoloaderを使って増分ファイル取込
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobFilter", "{*.pdf,*.docx,*.pptx}")
.option("recursiveFileLookup", "true")
.load(raw_data_path)
.writeStream.option("checkpointLocation", os.path.join(checkpoint_path, raw_table))
.trigger(availableNow=True)
.toTable(f"{catalog}.{schema}.{raw_table}")
.awaitTermination()
)
処理を実行すると以下のようにバイナリデータがテーブルに格納されます。
Step4. Markitdownを使ってマークダウンに変換・保存
では、Markitdownを使ってStep3で取り込んだバイナリデータからテキスト抽出してみます。
まずは変換用のSpark UDFを定義。
バイナリデータからの変換はconvert_stream
メソッドを使うのが便利そうでしたのでそちらを利用しました。
from pyspark.sql.functions import pandas_udf
import pyspark.sql.functions as F
import io
import pandas as pd
@pandas_udf("string")
def parse_binary_to_text(paths: pd.Series, contents: pd.Series) -> pd.Series:
from io import BytesIO
from markitdown import MarkItDown
try:
markitdown = MarkItDown()
results = [markitdown.convert_stream(io.BytesIO(byte_data)) for byte_data in contents]
return pd.Series([r.text_content for r in results])
except Exception as e:
# 例外時はひとまずエラーメッセージを返す。ちゃんと実装することをお薦め。
return pd.Series([str(e)] * len(paths))
こちらのUDFを使って変換をしてみましょう。
では、まずは各ファイルよりMarkdown形式でテキストデータを抽出してみます。
Step3で読み込んだデータ全てに対して抽出処理を実行して表示。
df = (
spark.table(f"{catalog}.{schema}.{raw_table}")
.withColumn("text", parse_binary_to_text("path", "content"))
.select("path", "text")
)
display(df)
以下のようにtext
列に変換された結果が入りました。変換速度はかなり速い。
これをチャンク化して保存すればRAG用のデータとしても使えそうです。
まとめ
簡単でしたが、Markitdownを使って非構造データファイルをMarkdown化して構造化データとして管理することをDatabricks上で行ってみました。性能面はdoclingの方が良いかもしれませんが、軽量で処理速度も速く扱いやすいパッケージです。
Markitdown自体は様々なパッケージをうまく使う形で実装されているようですが、OSSとして公開されたので独自処理なども今後追加されていく気がしています。
また、今回は行っていませんがOpenAIのAPI等と組み合わせて画像の中身をOCR的にMarkdownに変換することもできるようなので、とにかく多様なファイル群をテキスト・Markdownに変換したいという際に便利だと思います。
試してませんが、Unity Catalog上のpython関数として登録しておけば、SQLでも使えてさらに便利そう。
データ処理が捗るなあ。
個人的な話ですが、ここしばらく記事を書く余裕がさっぱり無くアドベントカレンダーに全然参加できず。。。
年末年始は時間が取れそうなので、いろいろやってみたかったことを記事にしていきたいと思います。