4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksにおけるRAGのモジュール化

Last updated at Posted at 2024-11-10

こちらのイベントでモジュール化された方法でLLMOpsを実装するという話をさせていただきました。

言うは易しなので、実際にRAGをモジュール化して実装してみます。以前はほとんどをノートブック1つで実装していたので、モジュール化どころかモノリシックな実装でした。

最近の機能によって、思った以上にモジュール化がしやすくなっていました。ここでは、モジュール化を意識しながら自分のQiita記事を用いたRAGを構築します。

Screenshot 2024-11-10 at 14.42.02.png

モデルサービングエンドポイントの構築

RAGの構成要素であるベクトルDBとレスポンス生成で使用するモデルのエンドポイントを構築しておきます。

  • エンべディングモデル: text-embedding-ada-002
  • LLM: gpt-fo-mini

Screenshot 2024-11-10 at 9.37.01.png
Screenshot 2024-11-10 at 9.37.20.png

ベクトルDBの構築

こちらは以前からあるVector Searchを使います。コンテキスト長の超過を避けるためにチャンキングしておきます。セマンティックチャンキングもトライしたいですが、まずはシンプルにH2タグで。

%pip install mlflow==2.10.1 lxml==4.9.3 transformers==4.30.2 langchain==0.1.5 databricks-vectorsearch==0.22
dbutils.library.restartPython()
from langchain.text_splitter import HTMLHeaderTextSplitter, RecursiveCharacterTextSplitter
from transformers import AutoTokenizer, OpenAIGPTTokenizer

max_chunk_size = 500

tokenizer = OpenAIGPTTokenizer.from_pretrained("openai-gpt")
text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(tokenizer, chunk_size=max_chunk_size, chunk_overlap=50)
html_splitter = HTMLHeaderTextSplitter(headers_to_split_on=[("h2", "header2")])

# Split on H2で分割しますが、あまり小さすぎないように小さなh2チャンクはマージします
def split_html_on_h2(html, min_chunk_size = 20, max_chunk_size=500):
  if not html:
      return []
  h2_chunks = html_splitter.split_text(html)
  chunks = []
  previous_chunk = ""
  # チャンクを結合し、h2の前にテキストを追加することでチャンクを結合し、小さすぎる文書を回避します
  for c in h2_chunks:
    # h2の結合 (注意: 重複したh2を回避するために以前のチャンクを削除することもできます)
    content = c.metadata.get('header2', "") + "\n" + c.page_content
    if len(tokenizer.encode(previous_chunk + content)) <= max_chunk_size/2:
        previous_chunk += content + "\n"
    else:
        chunks.extend(text_splitter.split_text(previous_chunk.strip()))
        previous_chunk = content + "\n"
  if previous_chunk:
      chunks.extend(text_splitter.split_text(previous_chunk.strip()))
  # 小さすぎるチャンクの破棄
  return [c for c in chunks if len(tokenizer.encode(c)) > min_chunk_size]
 
# チャンク処理関数を試しましょう
html = spark.table("takaakiyayoi_catalog.qiita_2023.taka_qiita_2023").limit(1).collect()[0]['rendered_body']
split_html_on_h2(html)
['前見た時には、Delta Sharingで共有できるのはDeltaテーブルだけだったのですが、今日になってファイルやノートブックも共有できることに気づきました(遅い)。  \nいずれもDatabricks間での共有ですが、クラウドやアカウントを越えて共有できるのでユースケースは結構あると思います。  \n共有側(プロバイダー)をワークスペースA、利用側(受信者)をワークスペースBとします。',
 '共有の作成:ワークスペースAでの作業\nカタログエクスプローラで共有を作成します。Delta Sharingの自分が共有にアクセスし、データを共有をクリックして新規共有を作成します。  \nアセットを追加をクリックします。左のペインからテーブルやボリュームを選択して保存します。  \nこれでテーブルとボリュームが共有に追加されました。  \nさらに右上のアセットを管理をクリックし、ノートブックファイルを追加しますを選択します。  \nノートブックを選択して保存します。',
 '受信者の作成:ワークスペースBでの作業\n別のクラウドやアカウントにあるDatabricksと共有する際には、受信側の共有識別子が必要となります。共有識別子は<cloud>:<region>:<uuid>の形式となっており、以前は特定するのが面倒でしたが今では受信側のカタログエクスプローラで容易にコピーできます。  \nワークスペースBのカタログエクスプローラにアクセスし、Delta Sharing > 自分と共有にアクセスします。画面上部に共有識別子のコピーボタンがあるのでこれをクリックしてコピーします。',
 '受信者の作成:ワークスペースAでの作業\n提供側のDelta Sharing > 自分が共有にアクセスし、上で作成した共有オブジェクトにアクセスし、受信者タブをクリックします。受信者を追加をクリックします。  \n+ 新規受信者を作成 をクリックします。  \n受信者名を入力し、上でコピーした共有識別子を貼り付けて、受信者を作成および追加をクリックします。  \n追加をクリックします。  \nこれで提供者側の作業は完了です。\n\n以降はワークスペースBでの作業となります。',
 'ファイル(ボリューム)へのアクセス\nワークスペースBのカタログエクスプローラにアクセスし、Delta Sharing > 自分と共有にアクセスします。プロバイダーに提供者一覧が表示されます。提供者名は提供側に確認してください。  \n提供側の共有が表示されます。カタログを作成をクリックします。  \n受信者側におけるカタログ名を入力し、作成をクリックします。  \nこれで共有されたテーブルやボリュームにアクセスできるようになります。',
 'ノートブックへのアクセス\n上で作成したカタログにアクセスし、その他のアセットをクリックすると共有されているノートブックが表示されます。  \nクリックすることで内容を確認することができ、右上のクローン作成で任意の場所にコピーすることができます。  \nテーブルやファイルを共有する際には、サンプルとなるノートブックがあった方がコラボレーションが円滑になるかと思います。是非ご活用ください!  \nDatabricksクイックスタートガイド  \nDatabricks無料トライアル']
%sql
--インデックスを作成するためには、テーブルでチェンジデータフィードを有効化する必要があることに注意してください
CREATE TABLE IF NOT EXISTS takaakiyayoi_catalog.qiita_2023.qiita_documentation (
  id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  url STRING,
  content STRING
) TBLPROPERTIES (delta.enableChangeDataFeed = true); 

Sparkのユーザー定義関数を使ってテーブルのレコードを一括でチャンキングします。

from pyspark.sql.functions import pandas_udf
import pandas as pd
import pyspark.sql.functions as F

# sparkですべてのドキュメントのチャンクを作成するためのユーザー定義関数(UDF)を作成しましょう
@pandas_udf("array<string>")
def parse_and_split(docs: pd.Series) -> pd.Series:
    return docs.apply(split_html_on_h2)
    
(spark.table("takaakiyayoi_catalog.qiita_2023.taka_qiita_2023")
      .filter('rendered_body is not null')
      .withColumn('content', F.explode(parse_and_split('rendered_body')))
      .select(F.col("url"), F.col("content"))
      .write.mode('overwrite').saveAsTable("takaakiyayoi_catalog.qiita_2023.qiita_documentation"))

display(spark.table("takaakiyayoi_catalog.qiita_2023.qiita_documentation"))

同じURLの記事が複数のチャンクに分割されました。

Screenshot 2024-11-10 at 9.59.12.png

ベクトル検索インデックスを作ります。今回はGUIから。最初に準備したエンべディングモデルのエンドポイントを指定して、処理をトリガーします。

Screenshot 2024-11-10 at 9.36.43.png
Screenshot 2024-11-10 at 9.42.11.png

ベクトルが作成されました。

Screenshot 2024-11-10 at 9.45.35.png

ツールの作成

DatabricksではRAGはツールを活用するエージェントシステム(複合AIシステム)の一つとみなしています。以下のツールを作成して、エージェントシステムで活用されるようにします。DatabricksではSQLやPythonを用いて関数を定義することで、容易にツールを作成することができます。

  • ベクトルDBへの検索
  • 検索結果と質問から回答を生成

ベクトルDB検索ツール

VECTOR_SEARCH関数を使うことで、SQLから直接ベクトルDBに問い合わせを行うことができます。

CREATE
OR REPLACE FUNCTION takaakiyayoi_catalog.qiita_2023.vector_search(
  query_string STRING
  COMMENT 'Databricks記事のベクトルストアへのクエリー'
) RETURNS TABLE(context STRING, url STRING) NOT DETERMINISTIC CONTAINS SQL
COMMENT 'Databricks記事のベクトルストアに問い合わせて問い合わせに類似する文書を返却する関数' RETURN
SELECT
  content, url
FROM
  VECTOR_SEARCH(
    index => "takaakiyayoi_catalog.qiita_2023.qiita_2023_vs",
    query => query_string,
    num_results => 1
  ) AS context

動作確認します。

SELECT * FROM takaakiyayoi_catalog.qiita_2023.vector_search("DeltaとParquetの違い")

Screenshot 2024-11-10 at 10.06.27.png

レスポンス生成ツール

ai_query関数を使うことで、SQLから直接LLMにレスポンスの指示を行うことができます。

CREATE
OR REPLACE FUNCTION takaakiyayoi_catalog.qiita_2023.generate_response(
  query_string STRING
  COMMENT 'Databricks記事のベクトルストアへのクエリー',
  context STRING
  COMMENT 'ベクトルストアから取得した文書',
  url STRING
  COMMENT 'ベクトルストアから取得した文書のURL'
) RETURNS TABLE(response STRING) NOT DETERMINISTIC CONTAINS SQL
COMMENT 'ベクトルストアからのレスポンスと質問に基づいて回答を生成する関数' RETURN
SELECT
  ai_query(
    'taka-openai-gpt-4o-mini',
    "あなたはDatabricksの専門家です。" || query_string || "という質問に対して" || context || "という文脈と" || url || "というURLが得られています。質問と得られた文脈とURLに基づいて、簡潔かつ適切な回答を生成してください。文脈が得られない場合には「わからない」と回答してください。文脈が質問に適していない場合には「質問に適していない」と回答してください。"
  ) AS response

AI Playgroundでの動作確認

これで以下のモジュールが準備できました。

  • gpt-4o-miniをサービングするエンドポイント
  • エンべディングモデルをサービングするエンドポイント
  • エンべディングモデルを用いて作成したベクトル検索インデックス
  • ベクトル検索インデックスを用いて問い合わせに類似した文書を取得するツール
  • 質問と検索結果から回答を生成するツール

AI Playgroundでこれらをまとめ上げて動作確認します。

Screenshot 2024-11-10 at 14.45.41.png

言語モデルとしてgpt-4o-mini、上で定義したツールを追加します。これによって、問い合わせに応じて、言語モデルがツールを活用するようになります。まさにエージェントシステムです。

DeltaとParquetの違いは

と言うような問い合わせを行うと、想定したようにベクトルDBへの検索が行われます。

Screenshot 2024-11-10 at 14.47.48.png

そして、検索結果と問い合わせ内容に基づいてレスポンスを生成します。

Screenshot 2024-11-10 at 14.48.07.png

Screenshot 2024-11-10 at 14.48.13.png

今回はプロトタイピングまででしたが、このようにモジュール化することで、さらに精度を高めるためにファインチューニングしたLLMと差し替えたり、ベクトルDB検索の挙動なども容易に調整することが可能になります。

そして、次回ではプロトタイピングしたRAG(エージェントシステム)のデプロイメントとレビューまでをカバーします。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?