0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LangchainとDatabricksで学ぶRAG:DatabricksマネージドなAdvanced RAG Chatbot② ベクトルインデックスの作成

Last updated at Posted at 2024-06-05

こちらの続きです。

導入

前回はRAGの検索に利用する文書データをチャンク化し、保管するところまで実施しました。

第2回はチャンクデータを使ったインデックス作成です。

流れの解説

この回ではDatabricks Vector Searchを使って、チャンク化したデータからインデックスを作成します。
基本的な流れは、こちらdbdemosの内容と同様です。

Step1. パッケージインストール/各種設定準備

ノートブックを作成し、必要なパッケージをインストール。

%pip install --upgrade --force-reinstall databricks-vectorsearch
%pip install -U langchain langchain_community

dbutils.library.restartPython()

インデックスを作成するための元テーブルや、インデックスの保管先名などを定義します。
また、埋め込みに利用するDatabricksモデルサービングのエンドポイント名も設定しています。

埋め込み用エンドポイントは以下の記事で作成したものを利用します。詳細は下記記事を参照ください。

from databricks.vector_search.client import VectorSearchClient

vector_search_endpoint_name = "default_vector_search_endpoint"
source_table_name = "training.databricks_qa_jp.databricks_documentation"
index_name = "training.databricks_qa_jp.databricks_documentation_vs_index"

embedding_endpoint_name = "embedding_bge_m3_endpoint"

Step2. Vector Searchエンドポイントの作成

Databricks Vector Searchエンドポイントを作成します。
※ 既に別のエンドポイントを作成済みであり、それを再利用する場合は不要です。

client = VectorSearchClient()

client.create_endpoint(
    name=vector_search_endpoint_name,
    endpoint_type="STANDARD"
)

Step3. インデックスの作成

チャンクテーブルからインデックスを作成します。

まず、ユーティリティとなる関数を定義。

import time

def index_exists(vsc, endpoint_name, index_full_name):
    """ インデックステーブルの存在チェック """
    try:
        vsc.get_index(endpoint_name, index_full_name).describe()
        return True
    except Exception as e:
        if "RESOURCE_DOES_NOT_EXIST" not in str(e):
            print(
                f"Unexpected error describing the index. This could be a permission issue."
            )
            raise e
    return False


def wait_for_index_to_be_ready(vsc, vs_endpoint_name, index_name):
    """ インデックステーブルが利用可能になるまで待つ """

    for i in range(180):
        idx = vsc.get_index(vs_endpoint_name, index_name).describe()
        index_status = idx.get("status", idx.get("index_status", {}))
        status = index_status.get(
            "detailed_state", index_status.get("status", "UNKNOWN")
        ).upper()
        url = index_status.get("index_url", index_status.get("url", "UNKNOWN"))
        if "ONLINE" in status:
            return
        if "UNKNOWN" in status:
            print(
                f"Can't get the status - will assume index is ready {idx} - url: {url}"
            )
            return
        elif "PROVISIONING" in status:
            if i % 40 == 0:
                print(
                    f"Waiting for index to be ready, this can take a few min... {index_status} - pipeline url:{url}"
                )
            time.sleep(10)
        else:
            raise Exception(
                f"""Error with the index - this shouldn't happen. DLT pipeline might have been killed.\n Please delete it and re-run the previous cell: vsc.delete_index("{index_name}, {vs_endpoint_name}") \nIndex details: {idx}"""
            )
    raise Exception(
        f"Timeout, your index isn't ready yet: {vsc.get_index(index_name, vs_endpoint_name)}"
    )

上記定義の関数を利用しつつ、インデックスを作成します。
チャンクテーブルには2種のチャンクデータが含まれていますが、インデックス化する対象は、細分化した方のチャンクとなります。

vsc = VectorSearchClient()

if not index_exists(vsc, vector_search_endpoint_name, index_name):
    print(f"Creating index {index_name} on endpoint {vector_search_endpoint_name}...")
    index = client.create_delta_sync_index(
        endpoint_name=vector_search_endpoint_name,
        source_table_name=source_table_name,
        index_name=index_name,
        pipeline_type="TRIGGERED",
        primary_key="id",
        embedding_dimension=1024,
        embedding_source_column="content",
        embedding_model_endpoint_name=embedding_endpoint_name,
    )
    wait_for_index_to_be_ready(vsc, vector_search_endpoint_name, index_name)
else:
    # Vector Searchのコンテンツを更新し、新規データを保存するように同期処理を起動
    wait_for_index_to_be_ready(vsc, vector_search_endpoint_name, index_name)
    vsc.get_index(vector_search_endpoint_name, index_name).sync()

print(f"index {index_name} on table {source_table_name} is ready")

正常終了すると、インデックスが作成されます。

image.png

Step4. 検索の試行

正しく検索できるか試してみます。

まず、インデックスを取得。

client = VectorSearchClient()
index = client.get_index(endpoint_name=vector_search_endpoint_name, index_name=index_name)

類似文書を検索し、Pandas Dataframeに整形して出力してみます。

import pandas as pd

def conv_related_docs_to_pdf(result):
    cols = [c["name"] for c in result["manifest"]["columns"]]
    return pd.DataFrame(result["result"]["data_array"], columns=cols)

result = index.similarity_search(
    query_text="Unity Catalogとはどのような機能なのか?",
    columns=["id", "large_chunk", "large_chunk_id", "content", "url"],
    num_results=5,
)

display(conv_related_docs_to_pdf(result))

結果は以下の通り。

image.png

表示の都合上、テキスト部分が一部隠れていますが、Unity Catalogに関する内容が取れているように見えます。

Step5. 検索の試行 in LangChain

LangChainのDatabricks Communityモジュール経由でも類似文書検索をしてみます。

from langchain_community.vectorstores import DatabricksVectorSearch
from langchain_community.embeddings import DatabricksEmbeddings
from pprint import pprint

embeddings = DatabricksEmbeddings(endpoint=embedding_endpoint_name)
dvs = DatabricksVectorSearch(
    index,
    text_column="content",
    columns=["id", "large_chunk", "large_chunk_id", "content", "url"],
)

query = "Unity Catalogとはどのような機能なのか?"
docs = dvs.similarity_search(query, k=3)

pprint(docs)
出力
[Document(page_content='Unity Catalogイベント', metadata={'large_chunk': 'サービスプリンシパル資格情報イベント (パブリック プレビュー)\n次のservicePrincipalCredentialsイベントがアカウント レベルで記録されます。  \nサービス  \n操作  \n説明  \nリクエストパラメーター  \nservicePrincipalCredentials  \ncreate  \nアカウント管理者は、サービスプリンシパルのOAuthシークレットを生成します。  \naccount_id  \nservice_principal  \nsecret_id  \nservicePrincipalCredentials  \nlist  \nアカウント管理者は、サービスプリンシパルの下にすべてのOAuthシークレットを一覧表示します。  \naccount_id  \nservice_principal  \nservicePrincipalCredentials  \ndelete  \nアカウント管理者がサービスプリンシパルのOAuthシークレットを削除します。  \naccount_id  \nservice_principal  \nsecret_id\n\nシングルサインオンイベント\nシングルサインオンイベント\n次のssoConfigBackendイベントはアカウント レベルで記録され、アカウント コンソールの SSO 認証に関連しています。  \nサービス  \n操作  \n説明  \nリクエストパラメーター  \nssoConfigBackend  \ncreate  \nアカウント管理者がアカウント コンソールのSSO構成を作成しました。  \naccount_id  \nsso_type  \nconfig  \nssoConfigBackend  \nget  \nアカウント管理者がアカウント コンソールのSSO構成の詳細を要求しました。  \naccount_id  \nsso_type  \nssoConfigBackend  \nupdate  \nアカウント管理者がアカウント コンソールのSSO構成を更新しました。  \naccount_id  \nsso_type  \nconfig\n\nUnity Catalogイベント', 'large_chunk_id': 25769803903.0, 'url': 'https://docs.databricks.com/ja/admin/account-settings/audit-logs.html', 'id': 29551.0}),
 Document(page_content='ワークスペースの Unity Catalog を有効にする前の考慮事項', metadata={'large_chunk': 'ワークスペースの Unity Catalog を有効にする前の考慮事項', 'large_chunk_id': 727.0, 'url': 'https://docs.databricks.com/ja/data-governance/unity-catalog/enable-workspaces.html', 'id': 41228.0}),
 Document(page_content='Unity Catalog: Unity Catalog は、Databricks ワークスペース全体で一元化されたアクセス制御、監査、系列、およびデータディスカバリー機能を提供します。', metadata={'large_chunk': '認証と承認\nこのセクションでは、Databricks SQL のユーザーとグループ、および資産へのアクセスを管理するときに知っておく必要がある概念について説明します。  \nユーザーとグループ: ユーザーは、システムにアクセスできる固有の個人です。 グループはユーザーの集合です。  \nPersonal アクセストークン: 不透明な文字列は、REST API に対する認証と、 技術パートナー 内のツールによる SQLウェアハウスへの接続に使用されます。  \nアクセス制御リスト: オブジェクトへのアクセスを必要とするプリンシパルにアタッチされた一連のアクセス許可。 ACL エントリは、オブジェクトと、そのオブジェクトに対して許可されるアクションを指定します。 ACL の各エントリは、プリンシパル、アクションタイプ、およびオブジェクトを指定します。  \nUnity Catalog: Unity Catalog は、Databricks ワークスペース全体で一元化されたアクセス制御、監査、系列、およびデータディスカバリー機能を提供します。', 'large_chunk_id': 8589935389.0, 'url': 'https://docs.databricks.com/ja/sql/get-started/concepts.html', 'id': 40189.0})]

こちらの方法でもできました。

この後の記事ではLangChainを使った方法での検索は行いませんが、状況に応じて適切なやり方を選んでいただくのがよいと思います。

まとめ

RAG検索用のインデックスが出来ました。
次回はインデックスも利用する、CRAGのパイプラインを構築します。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?