2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LangchainとDatabricksで学ぶRAG:DatabricksマネージドなAdvanced RAG Chatbot① チャンクデータの作成

Last updated at Posted at 2024-06-04

タイトルが長い。。。

導入

以下の過去記事で、Databricksモデルサービングを使い、各種処理を行うエンドポイントを構築していました。

今回はこれらのエンドポイントを含む、Databricksマネージドなサービスを組み合わせたRAGパイプラインの構築にチャレンジします。


他方、既にDatabricksマネージドなサービスを使ったRAGパイプラインはdbdemosに含まれるRAGチャットボットとしてサンプルが公開されています。

また、日本語記事として、以下でも解説されています。

今回はこれらをベースに魔改造して以前行ったCRAGを用いた、少し高度なRAGパイプライン構築を行います。
内容的にかなり長いため複数回に分割します。最終的にはStreamlitによるUI構築を含めたチャットボット作成まで行う予定です。

第1回はRAGの検索に利用する文書データのチャンク化を行います。

このシリーズの内容は、必ずしも精度・性能改善につながるわけではありません。
(きちんと評価プロセスを組んでいないため、悪化する可能性もあります。また、システムの複雑性が過剰に上がるかもしれません。)
モデルサービングを利用することでDatabricks内に閉じた形でこんなことができるよ、というサンプル程度に見てもらえればと思います。

構築環境は、Databricks on AWS、DBRは15.2ML、インスタンスタイプは適当なCPUクラスタで行いました。

流れの解説

この回ではDatabricksの公式ドキュメントを取得し、チャンク化したデータをDelta Tableとして保管します。
基本的にはこちらの処理と同等ですが、最後のチャンク化については、大きなチャンク列とそれを細分化した小さなチャンク列の2種のチャンク列を作るところが異なります。

狙いとして、検索はベクトル化した小さなチャンク列に対して行い、回答生成においてはより大きなチャンクを与えることで、回答性能の向上を図るためです。(概念的にはこちらのParent Document Retrieverと同様です)

Step1. パッケージインストール/各種設定準備

ノートブックを作成し、必要なパッケージをインストール。
トークン数計算のためにtiktokenをインストールしておきます。

%pip install lxml==4.9.3 langchain tiktoken 

dbutils.library.restartPython()

各種定数を定義。
データ保管先として、trainingカタログのdatabricks_qa_jpスキーマを設定しました。
試す際は環境に合わせて変更ください。

DATABRICKS_SITEMAP_URL = "https://docs.databricks.com/en/doc-sitemap.xml"
CATALOG = "training"
SCHEMA = "databricks_qa_jp"

Step2. 文書データの取得

dbdemosおよびこちらの内容を基に、Databricks公式ドキュメントサイトから日本語ドキュメントを取得します。

まずは処理の関数を定義。

# from https://github.com/databricks-demos/dbdemos-notebooks/blob/main/product_demos/Data-Science/chatbot-rag-llm/

import requests
import pandas as pd
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET
from pyspark.sql.types import StringType
from concurrent.futures import ThreadPoolExecutor
from pyspark.sql.types import StringType
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, pandas_udf

# Add retries with backoff to avoid 429 while fetching the doc
retries = Retry(
    total=3,
    backoff_factor=3,
    status_forcelist=[429],
)


def download_databricks_documentation_articles(max_documents=None):
    """Databricksのドキュメントの記事をサイトマップからダウンロードします。"""

    # サイトマップからXMLコンテンツを取得
    response = requests.get(DATABRICKS_SITEMAP_URL)
    root = ET.fromstring(response.content)

    # XML内のすべての 'loc' 要素(URL)を取得
    urls = [
        loc.text
        for loc in root.findall(".//{http://www.sitemaps.org/schemas/sitemap/0.9}loc")
    ]
    replace_urls = [
        s.replace("https://docs.databricks.com/en/", "https://docs.databricks.com/ja/")
        for s in urls
    ]  # 英語URLを日本語URLに

    if max_documents:
        replace_urls = replace_urls[:max_documents]

    # URLからDataFrameを作成
    df_urls = (
        spark.createDataFrame(replace_urls, StringType()).toDF("url").repartition(10)
    )

    # 一括でURLのHTMLコンテンツを取得するためのPandas UDF
    @pandas_udf("string")
    def fetch_html_udf(urls: pd.Series) -> pd.Series:
        adapter = HTTPAdapter(max_retries=retries)
        http = requests.Session()
        http.mount("http://", adapter)
        http.mount("https://", adapter)

        def fetch_html(url):
            try:
                response = http.get(url)
                if response.status_code == 200:
                    return response.content
            except requests.RequestException:
                return None
            return None

        with ThreadPoolExecutor(max_workers=200) as executor:
            results = list(executor.map(fetch_html, urls))
        return pd.Series(results)

    # HTMLコンテンツからテキストを抽出するためのPandas UDF
    @pandas_udf("string")
    def download_web_page_udf(html_contents: pd.Series) -> pd.Series:
        def extract_text(html_content):
            if html_content:
                soup = BeautifulSoup(html_content, "html.parser")
                article_div = soup.find("div", itemprop="articleBody")
                if article_div:
                    return str(article_div).strip()
            return None

        return html_contents.apply(extract_text)

    # UDFをDataFrameに適用
    df_with_html = df_urls.withColumn("html_content", fetch_html_udf("url"))
    final_df = df_with_html.withColumn("text", download_web_page_udf("html_content"))

    # 非nullの結果を選択してフィルタリング
    final_df = final_df.select("url", "text").filter("text IS NOT NULL").cache()
    if final_df.isEmpty():
        raise Exception(
            "Dataframe is empty, couldn't download Databricks documentation, please check sitemap status."
        )

    return final_df


# シェアードクラスターモードでテーブルが存在するかどうかをテストするための一時的な回避策 DBR 14.2(SASP-2467を参照)
def table_exists(table_name):
    try:
        spark.table(table_name).isEmpty()
    except:
        return False
    return True

定義したダウンロード処理を実行します。

spark.sql(f"USE CATALOG `{CATALOG}`")
spark.sql(f"USE SCHEMA `{SCHEMA}`")

if (
    not table_exists("databricks_jp_raw_documentation")
    or spark.table("databricks_jp_raw_documentation").isEmpty()
):
    # Databricksのドキュメントをデータフレームにダウンロード
    doc_articles = download_databricks_documentation_articles()
    # databricks_jp_raw_documentationテーブルに保存
    doc_articles.write.mode("overwrite").saveAsTable("databricks_jp_raw_documentation")

display(spark.table("databricks_jp_raw_documentation").limit(2))

image.png

こちらのように保管されます。
※ 自分はシングルノードクラスタで実行したのですが、かなり時間がかかったので、マルチノードクラスタで実行したほうがよさそうです。

Step3. データのチャンキング

保管した文書データを基に、文書を適切な粒度でチャンキングしたデータを作成します。
ここが一部ベースとは異なり、2種のチャンクデータを生成する関数をそれぞれ定義します。

split_html_on_h2がHTMLのH2単位でチャンク化する処理(ただし、細かいチャンクはマージする)、split_to_small_chunksはH2単位のチャンクをさらに細かくチャンク化する処理です。

ここのチャンク処理は文書特性に応じて調整する余地があると思います。
チャンクのサイズなど。

raw_table_name = "databricks_jp_raw_documentation"

spark.sql(f"USE CATALOG `{CATALOG}`")
spark.sql(f"USE SCHEMA `{SCHEMA}`")
from langchain_text_splitters import (
    HTMLHeaderTextSplitter,
    RecursiveCharacterTextSplitter,
)
import tiktoken
from typing import Any


class JapaneseCharacterTextSplitter(RecursiveCharacterTextSplitter):
    """句読点も句切り文字に含めるようにするためのスプリッタ"""

    def __init__(self, **kwargs: Any):
        separators = ["\n\n", "\n", "", "", " ", ""]
        super().__init__(separators=separators, **kwargs)


# Split on H2で分割しますが、あまり小さすぎないように小さなh2チャンクはマージします
def split_html_on_h2(html, min_chunk_size=20, max_chunk_size=1000):
    if not html:
        return []

    html_splitter = HTMLHeaderTextSplitter(headers_to_split_on=[("h2", "header2")])
    text_splitter = JapaneseCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)    
    h2_chunks = html_splitter.split_text(html)
    tokenizer = tiktoken.get_encoding("cl100k_base")
    chunks = []
    previous_chunk = ""
    # チャンクを結合し、h2の前にテキストを追加することでチャンクを結合し、小さすぎる文書を回避します
    for c in h2_chunks:
        # h2の結合 (注意: 重複したh2を回避するために以前のチャンクを削除することもできます)
        content = c.metadata.get("header2", "") + "\n" + c.page_content
        # チャンクサイズが小さい場合、前のものと結合
        if len(tokenizer.encode(previous_chunk + content)) <= max_chunk_size / 2:
            previous_chunk += content + "\n"
        # 大きすぎるチャンクサイズの場合は分割
        elif len(tokenizer.encode(previous_chunk)) > (max_chunk_size * 2):
            chunks.extend(text_splitter.split_text(previous_chunk.strip()))
            previous_chunk = content + "\n"
        else:
            chunks.extend([previous_chunk.strip()])
            previous_chunk = content + "\n"
    if previous_chunk:
        chunks.extend([previous_chunk.strip()])

    # 小さすぎるチャンクの破棄
    return [c for c in chunks if len(tokenizer.encode(c)) > min_chunk_size]


# Split on H2で分割しますが、あまり小さすぎないように小さなh2チャンクはマージします
def split_to_small_chunks(text):
    text_splitter = JapaneseCharacterTextSplitter(chunk_size=400, chunk_overlap=100)
    return text_splitter.split_text(text)


# チャンク処理関数を試行
html = spark.table(raw_table_name).limit(1).collect()[0]["text"]
result1 = split_html_on_h2(html)
result2 = split_to_small_chunks(result1[0])

print(result1)
print("----------------------------")
print(result2)
出力
['Databricks Connect for Scala  \n注  \nこの記事では、Databricks Connect for Databricks Runtime 13.3 LTS 以降について説明します。  \nこの記事では、 IntelliJ IDEA と Scala プラグインで Scala を使用して、Databricks Connect をすぐに使い始める方法について説明します。  \nこの記事の Python バージョンについては、「 Databricks Connect for Python」を参照してください。  \nこの記事の R バージョンについては、「 Databricks Connect for R」を参照してください。  \nDatabricks Connect を使用すると、IntelliJ IDEA、ノートブック サーバー、その他のカスタム アプリケーションなどの一般的な IDE を Databricks クラスターに接続できます。 「Databricks Connect とは」を参照してください。  \nチュートリアル\nチュートリアル\nこのチュートリアルをスキップして、代わりに別の IDE を使用するには、「 次の手順」を参照してください。\n\n要件', '要件\nこのチュートリアルを完了するには、次の要件を満たす必要があります。  \nターゲットの Databricks ワークスペースとクラスターは、 Databricks Connect のクラスター構成の要件を満たしている必要があります。  \nクラスター ID が使用可能になっている必要があります。 クラスター ID を取得するには、ワークスペースでサイドバーの [ コンピュート ] をクリックし、クラスターの名前をクリックします。 Web ブラウザーのアドレスバーで、URL の clusters から configuration までの文字列をコピーします。  \nJava 開発キット (JDK) が開発マシンにインストールされていること。 Databricks では、使用する JDK インストールのバージョンを Databricks クラスターの JDK バージョンと一致させることをお勧めします。 次の表に、サポートされている各 Databricks Runtimeの JDK バージョンを示します。  \nDatabricks Runtimeのバージョン  \nJDK のバージョン  \n13.3 LTS - 15.0、13.3 ML LTS - 15.0 ML  \nJDK 8  \n注  \nJDK がインストールされていない場合、または開発マシンに複数の JDK がインストールされている場合は、後の手順 1 で特定の JDK をインストールまたは選択できます。クラスター上の JDK バージョンより下または上位の JDK インストールを選択すると、予期しない結果が発生したり、コードがまったく実行されなかったりする可能性があります。  \nIntelliJ IDEA がインストールされています。 このチュートリアルは、IntelliJ IDEA Community Edition 2023.3.6 でテストされました。 IntelliJ IDEA の異なるバージョンまたはエディションを使用する場合、次の手順は異なる場合があります。  \nIntelliJ IDEA 用の Scala プラグイン がインストールされています。', 'ステップ 1: Databricks 認証を構成する', 'ステップ 1: Databricks 認証を構成する\nこのチュートリアルでは、Databricks OAuth ユーザー対マシン (U2M) 認証と Databricks 構成プロファイル を使用して、Databricks ワークスペースで認証を行います。 代わりに別の認証の種類を使用するには、「 接続プロパティの構成」を参照してください。  \nOAuth U2M 認証を構成するには、次のように Databricks CLI が必要です。  \nまだインストールされていない場合は、 次のように Databricks CLI をインストールします。  \nHomebrew を使用して、次の 2 つのコマンドを実行して Databricks CLI をインストールします。  \nbrew tap databricks/tap brew install databricks  \nDatabricks CLI をインストールするには、 winget、 Chocolatey 、または Windows Subsystem for Linux (WSL) を使用できます。 winget、Chocolatey、または WSL を使用できない場合は、この手順をスキップし、代わりにコマンド プロンプトまたは PowerShell を使用してソースから Databricks CLI をインストールする必要があります。  \n注  \nDatabricks CLI と Chocolatey のインストールは 実験段階です。  \nwinget を使用して Databricks CLI をインストールするには、次の 2 つのコマンドを実行し、コマンド プロンプトを再起動します。  \nwinget search databricks winget install Databricks.DatabricksCLI  \nChocolatey を使用して Databricks CLI をインストールするには、次のコマンドを実行します。  \nchoco install databricks-cli  \nWSL を使用して Databricks CLI をインストールするには、次のようにします。  \nWSL を使用して curl と zip をインストールします。 詳細については、オペレーティング システムのマニュアルを参照してください。  \nWSL を使用して、次のコマンドを実行して Databricks CLI をインストールします。  \ncurl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh  \n次のコマンドを実行して、Databricks CLI がインストールされ、インストールされている Databricks CLI の現在のバージョンが表示されることを確認します。 このバージョンは 0.205.0 以上である必要があります。  \ndatabricks -v  \n注  \ndatabricks を実行しても command not found: databricksなどのエラーが発生した場合、または databricks -v を実行してバージョン番号が 0.18 以下の場合は、マシンで Databricks CLI 実行可能ファイルの正しいバージョンが見つからないことを意味します。これを修正するには、「 CLI のインストールを確認する」を参照してください。  \nOAuth U2M 認証を次のように開始します。  \nDatabricks CLI を使用して、ターゲット ワークスペースごとに次のコマンドを実行して、OAuth トークン管理をローカルで開始します。  \n次のコマンドで、 <workspace-url> を Databricks ワークスペース インスタンスの URL に置き換えます (例: https://dbc-a1b2345c-d6e7.cloud.databricks.com)。  \ndatabricks auth login --configure-cluster --host <workspace-url>  \nDatabricks CLI では、入力した情報を Databricks 構成プロファイルとして保存するように求められます。 Enterを押して、提案されたプロファイル名を受け入れるか、新規または既存のプロファイルの名前を入力します。同じ名前の既存のプロファイルは、入力した情報で上書きされます。 プロファイルを使用すると、複数のワークスペース間で認証コンテキストをすばやく切り替えることができます。  \n既存のプロファイルの一覧を取得するには、別のターミナルまたはコマンド プロンプトで、Databricks CLI を使用してコマンド databricks auth profilesを実行します。 特定のプロファイルの既存の設定を表示するには、コマンド databricks auth env --profile <profile-name>を実行します。  \nWeb ブラウザーで、画面の指示に従って Databricks ワークスペースにログインします。  \nターミナルまたはコマンド プロンプトに表示される使用可能なクラスターの一覧で、上方向キーと下方向キーを使用してワークスペース内のターゲット Databricks クラスターを選択し、 Enterを押します。 また、クラスターの表示名の任意の部分を入力して、使用可能なクラスターの一覧をフィルター処理することもできます。  \nプロファイルの現在の OAuth トークン値とトークンの今後の有効期限のタイムスタンプを表示するには、次のいずれかのコマンドを実行します。  \ndatabricks auth token --host <workspace-url>  \ndatabricks auth token -p <profile-name>  \ndatabricks auth token --host <workspace-url> -p <profile-name>  \n同じ --host 値を持つプロファイルが複数ある場合は、Databricks CLI で一致する正しい OAuth トークン情報を見つけられるように、 --host オプションと -p オプションを一緒に指定する必要がある場合があります。', 'ステップ2:プロジェクトを作成する\nIntelliJ IDEAを起動します。  \nメイン メニューで、[ファイル] > [新しい> プロジェクト] をクリックします。  \nプロジェクトに意味のある 名前を付けます。  \n[場所] でフォルダー アイコンをクリックし、画面の指示を完了して、新しい Scala プロジェクトへのパスを指定します。  \n[言語] で [Scala] をクリックします。  \n[ビルド システム] で sbt をクリックする。  \n「JDK」ドロップダウンリストで、クラスターの JDK バージョンと一致する開発マシン上の JDK の既存のインストールを選択するか、「JDK のダウンロード」を選択し、画面の指示に従って、クラスターの JDK バージョンと一致する JDK をダウンロードします。  \n注  \nクラスター上の JDK バージョンより上または下の JDK インストールを選択すると、予期しない結果が発生したり、コードがまったく実行されなかったりする可能性があります。  \nsbt のドロップダウンリストで、最新バージョンを選択する。  \n[Scala] ドロップダウンリストで、クラスター上の Scala バージョンと一致する Scala のバージョンを選択します。 次の表は、サポートされている各 Databricks Runtimeの Scala バージョンを示しています。  \nDatabricks Runtimeのバージョン  \nScala バージョン  \n13.3 LTS - 15.0、13.3 ML LTS - 15.0 ML  \n2.12.15  \n注  \nクラスターで Scala バージョンより下または上位の Scala バージョンを選択すると、予期しない結果が発生したり、コードがまったく実行されなかったりする可能性があります。  \nScalaの横にある[ソースのダウンロード]ボックスがオンになっていることを確認します。  \n[ パッケージ接頭辞] に、プロジェクトのソースのパッケージ接頭辞の値 ( org.example.applicationなど) を入力します。  \n[ サンプル コードの追加 ] ボックスがオンになっていることを確認します。  \n「作成」をクリックします。', 'ステップ3:Databricks Connectパッケージを追加する\nステップ3:Databricks Connectパッケージを追加する\n新しい Scala プロジェクトを開いた状態で、[プロジェクト] ツール ウィンドウ ([ツールウィンドウの表示] > [プロジェクト] > で、プロジェクト名>ターゲットで build.sbtという名前のファイルを開きます。  \n次のコードを build.sbt ファイルの末尾に追加して、Scala 用 Databricks Connect ライブラリの特定のバージョンに対するプロジェクトの依存関係を宣言します。  \nlibraryDependencies += "com.databricks" % "databricks-connect" % "14.3.1"  \n14.3.1、クラスター上の Databricks Runtime バージョンと一致するバージョンの Databricks Connect ライブラリに置き換えます。 Databricks Connect ライブラリのバージョン番号は、 Maven の中央リポジトリで確認できます。  \nsbt の変更をロードする通知アイコンをクリックして、Scala プロジェクトを新しいライブラリの場所と依存関係で更新する。  \nIDE の下部にある sbt 進行状況インジケーターが消えるまで待ちます。 sbtロード プロセスが完了するまでに数分かかる場合があります。\n\nステップ4:コードを追加する', 'ステップ4:コードを追加する\n[プロジェクト ] ツール ウィンドウで、 メイMain.scalaン > の [プロジェクト名 ] > src &gtScala; で という名前のファイルを開きます。  \nファイル内の既存のコードを次のコードに置き換え、構成プロファイルの名前に応じてファイルを保存します。  \nステップ 1 の構成プロファイルの名前が DEFAULTの場合は、ファイル内の既存のコードを次のコードに置き換えて、ファイルを保存します。  \npackage org.example.application import com.databricks.connect.DatabricksSession import org.apache.spark.sql.SparkSession object Main { def main(args: Array[String]): Unit = { val spark = DatabricksSession.builder().remote().getOrCreate() val df = spark.read.table("samples.nyctaxi.trips") df.limit(5).show() } }  \nステップ 1 の構成プロファイルの名前が DEFAULTでない場合は、ファイル内の既存のコードを次のコードに置き換えてください。 プレースホルダ <profile-name> をステップ 1 の構成プロファイルの名前に置き換えて、ファイルを保存します。  \npackage org.example.application import com.databricks.connect.DatabricksSession import com.databricks.sdk.core.DatabricksConfig import org.apache.spark.sql.SparkSession object Main { def main(args: Array[String]): Unit = { val config = new DatabricksConfig().setProfile("<profile-name>") val spark = DatabricksSession.builder().sdkConfig(config).getOrCreate() val df = spark.read.table("samples.nyctaxi.trips") df.limit(5).show() } }\n\nステップ 5: コードを実行する', "ステップ 5: コードを実行する\nリモートのDatabricksワークスペースでターゲット・クラスターを開始します。  \nクラスターが起動したら、メイン メニューで [ 実行] をクリックし> ['Main' の実行] をクリックします。  \n[ 実行 ] ツール ウィンドウ ([実行] > [ツール ウィンドウの表示]> [ メイン ] タブに、 samples.nyctaxi.trips テーブルの最初の 5 行が表示されます。\n\nステップ 6: コードをデバッグする\nステップ 6: コードをデバッグする\nターゲット クラスターがまだ実行されている状態で、上記のコードで [ df.limit(5).show() ] の横にある余白をクリックしてブレークポイントを設定します。  \nメイン メニューで、[実行] > [' Main'] のデバッグをクリックします。  \n[デバッグ] ツール ウィンドウ ([デバッグ] >> [ツール] ウィンドウの表示) の [コンソール] タブで、電卓 ([式の評価]) アイコンをクリックします。  \n式df.schemaを入力し、[ 評価] をクリックして DataFrameのスキーマを表示します。  \n[デバッグ] ツールウィンドウのサイドバーで、緑色の矢印([プログラムの再開])アイコンをクリックします。  \n[コンソール] ウィンドウに、samples.nyctaxi.trips テーブルの最初の 5 行が表示されます。\n\n次のステップ", '次のステップ\nDatabricks Connect の詳細については、次のような記事を参照してください。  \nOAuth U2M 以外の Databricks 認証の種類を使用するには、「 接続プロパティの構成」を参照してください。  \n他の IDE を使用するには、以下を参照してください。  \nIntelliJ IDEA  \nVisual Studio Code  \nその他の簡単なコード例については、「 Databricks Connect for Scala のコード例」を参照してください。  \nより複雑なコード例を表示するには、GitHub の Databricks Connect リポジトリのサンプル アプリケーション を参照してください。  \nシンプルなETLアプリケーション  \nJFreeChartによるグラフの視覚化  \nDatabricks Connect for Databricks Runtime 12.2 LTS 以下から Databricks Connect for Databricks Runtime 13.3 LTS 以降に移行するには、「 Databricks Connect for Scala への移行」を参照してください。  \nトラブルシューティングと制限事項に関する情報も参照してください。']
----------------------------
['Databricks Connect for Scala  \n注  \nこの記事では、Databricks Connect for Databricks Runtime 13.3 LTS 以降について説明します。  \nこの記事では、 IntelliJ IDEA と Scala プラグインで Scala を使用して、Databricks Connect をすぐに使い始める方法について説明します。  \nこの記事の Python バージョンについては、「 Databricks Connect for Python」を参照してください。  \nこの記事の R バージョンについては、「 Databricks Connect for R」を参照してください。', 'この記事の R バージョンについては、「 Databricks Connect for R」を参照してください。  \nDatabricks Connect を使用すると、IntelliJ IDEA、ノートブック サーバー、その他のカスタム アプリケーションなどの一般的な IDE を Databricks クラスターに接続できます。 「Databricks Connect とは」を参照してください。  \nチュートリアル\nチュートリアル\nこのチュートリアルをスキップして、代わりに別の IDE を使用するには、「 次の手順」を参照してください。', '要件']

チャンクデータを保管するテーブルを事前作成します。

%sql
--インデックスを作成するためには、テーブルでチェンジデータフィードを有効化する必要があることに注意してください
CREATE TABLE IF NOT EXISTS databricks_documentation (
  id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  url STRING,
  large_chunk_id BIGINT,
  large_chunk STRING,
  content STRING
) TBLPROPERTIES (delta.enableChangeDataFeed = true); 

チャンキング処理を呼び出すpandas_udfを作成し、テーブルへ保管します。

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, pandas_udf


# H2単位チャンキングのためのUDFを定義
@pandas_udf("array<string>")
def parse_and_split_large(docs: pd.Series) -> pd.Series:
    return docs.apply(split_html_on_h2)

# 詳細単位チャンキングのためのUDFを定義
@pandas_udf("array<string>")
def parse_and_split_small(docs: pd.Series) -> pd.Series:
    return docs.apply(split_to_small_chunks)

df = (
    spark.table(raw_table_name)
    .filter("text is not null")
    .withColumn("large_chunk", F.explode(parse_and_split_large("text")))
    .withColumn("large_chunk_id", F.monotonically_increasing_id())
    .withColumn("content", F.explode(parse_and_split_small("large_chunk")))
    .drop("text")
    .write.mode("overwrite")
    .saveAsTable("databricks_documentation")
)

display(spark.table("databricks_documentation"))

これで以下のようなテーブルが作られます。

image.png

同じ大きなチャンクから分割した小型チャンクのレコードは、同じlarge_chunk_idを持つ構造になります。
これで2種のチャンクの親子関係を保持したチャンクテーブルが出来ました。

まとめ

Databricks公式ドキュメントを用いたチャンクデータテーブルが出来ました。
次回はこのテーブルを利用してVector Search用のインデックスを作成します。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?