0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LangchainとDatabricksで学ぶRAG:DatabricksマネージドなAdvanced RAG Chatbot③ RAGパイプラインの実装

Posted at

こちらの続きです。

導入

前回はRAGの検索に利用するチャンクデータを使ったインデックス作成を行いました。

第3回はRAGのパイプラインの実装です。

流れの解説

LangChain/LangGraphを使って、RAGを実行するパイプラインを実装します。
今回のパイプラインは、以下の記事でも実践したCRAGという手法で構築します。

処理は以下のような流れです。

image.png

特長として、ベクトル検索を実行(retrieve)した後、それが当初の質問・指示に沿った内容かどうかを個々にLLMでチェックします(grade_documents)。
もし、1件でも質問に沿った文書が得られなかった場合、Web検索(transfrom_query, web_search)を実行し、ベクトル検索で得られた結果とWeb検索で得られた結果をマージして最終的な回答を生成(generate)します。

これによって、ベクトルデータベースに回答が存在しない場合でも適切な回答を得られる可能性を向上し、また回答にそぐわない文書を除去することで回答内容の精度を向上することができます。

この回では、CRAGのパイプラインの実装までを行います。(次回はこの処理にフロントエンドとなるUIをつけてChatbotを完成させます)

Step1. LangGraphでグラフ処理を定義

モジュールとして利するために、ノートブックではなくgraph.pyというファイルを作成して各処理を定義します。

フルコードは以下となります。

graph.py
import os
import json

from databricks.vector_search.client import VectorSearchClient
from typing_extensions import TypedDict
from typing import List, Literal
import pandas as pd
import time

from pydantic import BaseModel, conlist
import mlflow.deployments

from langchain_core.documents import Document
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.chat_models import ChatDatabricks
from langchain_community.tools.tavily_search import TavilySearchResults

from langgraph.graph import END, StateGraph


vector_search_endpoint_name = "default_vector_search_endpoint"
source_table_name = "training.databricks_qa_jp.databricks_documentation"
index_name = "training.databricks_qa_jp.databricks_documentation_vs_index"

generate_endpoint_name = "mistral-7b-instruct-v03-endpoint"
grader_endpoint_name = "batch-inference2-endpoint "
embedding_endpoint_name = "embedding_bge_m3_endpoint"

client = VectorSearchClient(personal_access_token=os.environ["DATABRICKS_TOKEN"])
index = client.get_index(
    endpoint_name=vector_search_endpoint_name, index_name=index_name
)

# RAG Chain preparation
rag_chat = ChatDatabricks(endpoint=generate_endpoint_name, temperature=0.1)
rag_chat_template = ChatPromptTemplate.from_messages(
    [
        (
            "human",
            """You are an assistant for question-answering tasks."""
            """Use the following pieces of retrieved context to answer the question."""
            """You MUST reply in Japanese."""
            """If you don't know the answer, just say that you don't know. """
            """Use three sentences maximum and keep the answer concise.\n\n"""
            """Question: {question} \n\n"""
            """Context: {context} \n\n""",
        ),
    ]
)
rag_chain = rag_chat_template | rag_chat | StrOutputParser()

# Question Rewriter Chain preparation
qr_chat = ChatDatabricks(
    endpoint=generate_endpoint_name, temperature=0.1, max_tokens=100
)
qr_chat_template = ChatPromptTemplate.from_messages(
    [
        (
            "human",
            """You a question re-writer that converts an input question to a better version that is optimized \n"""
            """for web search. Look at the input and try to reason about the underlying semantic intent / meaning."""
            """Here is the initial question: \n\n {question}. Please reply improved question with no preamble.""",
        ),
    ]
)
question_rewriter = qr_chat_template | qr_chat | StrOutputParser()

web_search_tool = TavilySearchResults(k=3)


class GraphState(TypedDict):
    """
    グラフの状態を表します。

    属性:
        question: 質問
        generation: LLMの生成
        web_search: 検索を追加するかどうか
        documents: ドキュメントのリスト
        web_query: WEB検索用クエリ
    """

    question: str
    generation: str
    web_search: str
    documents: List[str]
    web_query: str


def convert_similarity_results_to_pandas(result):
    cols = [c["name"] for c in result["manifest"]["columns"]]
    return pd.DataFrame(result["result"]["data_array"], columns=cols)


# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


# --- Graph Node ---


def retrieve(state):
    """
    関連ドキュメントを取得する

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): stateに追加された新しいキーであるdocumentsを含む状態
    """
    print("---ドキュメントの取得---")
    question = state["question"]

    # ドキュメントの取得 from Databricks Vector Search
    index.wait_until_ready()

    num_results = 10
    max_retries = 30
    retry_count = 0
    wait_time = 30
    while True:
        try:
            result = index.similarity_search(
                query_text=question,
                columns=["large_chunk_id", "large_chunk", "url"],
                num_results=num_results,
            )
            break
        except Exception as e:
            # Embedding用のエンドポイントが起動していないなど、Vector Searchが利用できない場合があるので、リトライする
            print(e)
            retry_count += 1
            if retry_count > max_retries:
                raise e
            time.sleep(wait_time)

    pdf = convert_similarity_results_to_pandas(result)
    pdf = pdf.drop("score", axis=1)
    pdf = pdf.drop_duplicates()  # 重複除去

    # 検索したコンテキストではなく、その親ドキュメントをコンテキストとして設定
    documents = [
        Document(
            page_content=row["large_chunk"],
            metadata={
                "large_chunk_id": row["large_chunk_id"],
                "url": row["url"],
            },
        )
        for index, row in pdf.iterrows()
    ]
    return {"documents": documents, "question": question}


def generate(state):
    """
    回答を生成する

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): stateに追加された新しいキーであるgenerationを含む状態
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # LLMの生成
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}


def grade_documents(state):
    """
    質問に関連する文書を取得するかどうかを判断します。

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): フィルタリングされた関連文書のみを含むdocumentsキーで更新された状態
    """

    class Grade(BaseModel):
        grade: Literal["yes", "no"]

    print("---質問に関連する文書をチェック---")
    question = state["question"]
    documents = state["documents"]

    max_num_results = 5

    client = mlflow.deployments.get_deploy_client("databricks")

    # APIコールに必要なデータを準備
    system_prompt = (
        "You are a grader assessing relevance of a retrieved document to a user question. \n"
        f"Here is the user question: {question} \n"
        "Documents are provided as user prompt.\n"
        "If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n"
        "Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.\n"
        "Please return 'no' even if the document is an unintelligible sentence.\n\n"
        f"You MUST reply using the following schema:{Grade.schema_json()}"
    )
    inputs = pd.DataFrame([{"prompt": d.page_content} for d in documents])

    # 構造化出力APIをコール
    response = client.predict(
        endpoint=grader_endpoint_name,
        inputs={
            "inputs": inputs.to_dict(orient="records"),
            "params": {
                "temperature": 0.001,
                "max_tokens": 512,
                "system_prompt": system_prompt,
                "stop": "<|eot_id|>",
                "json_schema": Grade.schema_json(),
            },
        },
    )

    # 出力結果を整形
    outputs = pd.DataFrame(response.get("predictions"))
    def load_grade_json(x):
        try:
            return json.loads(x)
        except Exception as e:
            print("Can't convert to JSON:", x)
            return Grade(grade="no").dict()

    outputs["output"] = outputs["output"].apply(lambda x: load_grade_json(x))

    # Gradeの結果を基に、関連文書のみdocumentとして残す
    filtered_indices = []
    for index, row in outputs.iterrows():
        grade = row["output"] if row["output"] else {"grade": "no"}
        if grade["grade"] == "yes":
            print(f"---評価: 関連する文書---: {index}")
            filtered_indices.append(index)
        else:
            print(f"---評価: 関連しない文書---: {index}")
            continue
    
    # max_num_results以内で件数を返す
    filtered_indices = filtered_indices[:max_num_results]

    web_search = "Yes" if len(filtered_indices) != len(outputs) else "No"
    filtered_docs = [documents[i] for i in filtered_indices]
    return {"documents": filtered_docs, "question": question, "web_search": web_search}


def transform_query(state):
    """
    クエリを変換してより良い質問を生成します。

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): 質問キーを再構築された質問で更新した状態
    """

    print("---クエリの変換---")
    question = state["question"]
    documents = state["documents"]

    # 質問を書き直す
    better_question = question_rewriter.invoke({"question": question})
    return {"documents": documents, "question": question, "web_query":better_question}

def web_search(state):
    """
    質問の再構築に基づいてウェブ検索を実行します。

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): ウェブ結果が追加されたdocumentsキーで更新された状態
    """

    print("---ウェブ検索---")
    web_query = state["web_query"]
    question = state["question"]
    documents = state["documents"]

    # ウェブ検索
    docs = web_search_tool.invoke({"query": web_query})
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)
    documents.append(web_results)

    return {"documents": documents, "question": question, "web_query":web_query}


# --- Graph Edge ---


def decide_to_generate(state):
    """
    質問に回答を生成するか、質問を再生成するかを決定します。

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        str: 次に呼び出すノードのバイナリ判断
    """

    print("---評価されたドキュメントを評価---")
    question = state["question"]
    web_search = state["web_search"]
    filtered_documents = state["documents"]

    if web_search == "Yes":
        # 全てのドキュメントがフィルタリングされた場合は、クエリを再生成します
        print("---判断: 全てのドキュメントが質問に関連していないため、クエリを変換---")
        return "transform_query"
    else:
        # 関連するドキュメントがあるため、回答を生成します
        print("---判断: 生成---")
        return "generate"


# --- Build Graph ---


def build_graph():
    workflow = StateGraph(GraphState)

    # ノードを定義する
    workflow.add_node("retrieve", retrieve)  # データの取得
    workflow.add_node("grade_documents", grade_documents)  # ドキュメントの評価
    workflow.add_node("generate", generate)  # 回答の生成
    workflow.add_node("transform_query", transform_query)  # クエリの変換
    workflow.add_node("web_search_node", web_search)  # ウェブ検索

    # グラフを構築する
    workflow.set_entry_point("retrieve")
    workflow.add_edge("retrieve", "grade_documents")
    workflow.add_conditional_edges(
        "grade_documents",
        decide_to_generate,
        {
            "transform_query": "transform_query",
            "generate": "generate",
        },
    )
    workflow.add_edge("transform_query", "web_search_node")
    workflow.add_edge("web_search_node", "generate")
    workflow.add_edge("generate", END)

    # コンパイルする
    app = workflow.compile()

    return app

こちらを、コード内容を処理ごとに解説していきます。

Step1-1. グラフ状態の定義

LangGraphグラフを構築するための状態クラスを定義します。
状態として、質問文字列、生成結果、Web検索を行うかどうかのフラグ、検索された文書、Web検索用クエリを保持できるようにします。

class GraphState(TypedDict):
    """
    グラフの状態を表します。

    属性:
        question: 質問
        generation: LLMの生成
        web_search: 検索を追加するかどうか
        documents: ドキュメントのリスト
        web_query: WEB検索用クエリ
    """

    question: str
    generation: str
    web_search: str
    documents: List[str]
    web_query: str

Step1-2. 検索処理

Databricks Vector Searchを使い、類似文書を検索します。
今回は10件取得するようにしました。
その上で、実際に検索した文書ではなく、その親である大きいサイズのチャンク(large_chunk)を最終的な検索結果文書として返しています。
これによって検索は小さいサイズのチャンクを使うことで精度を上げ、最終的な生成にはセクション全体の文書を使うようにしています。

def retrieve(state):
    """
    関連ドキュメントを取得する

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): stateに追加された新しいキーであるdocumentsを含む状態
    """
    print("---ドキュメントの取得---")
    question = state["question"]

    # ドキュメントの取得 from Databricks Vector Search
    index.wait_until_ready()

    num_results = 10
    max_retries = 30
    retry_count = 0
    wait_time = 30
    while True:
        try:
            result = index.similarity_search(
                query_text=question,
                columns=["large_chunk_id", "large_chunk", "url"],
                num_results=num_results,
            )
            break
        except Exception as e:
            # Embedding用のエンドポイントが起動していないなど、Vector Searchが利用できない場合があるので、リトライする
            print(e)
            retry_count += 1
            if retry_count > max_retries:
                raise e
            time.sleep(wait_time)

    pdf = convert_similarity_results_to_pandas(result)
    pdf = pdf.drop("score", axis=1)
    pdf = pdf.drop_duplicates()  # 重複除去

    # 検索したコンテキストではなく、その親ドキュメントをコンテキストとして設定
    documents = [
        Document(
            page_content=row["large_chunk"],
            metadata={
                "large_chunk_id": row["large_chunk_id"],
                "url": row["url"],
            },
        )
        for index, row in pdf.iterrows()
    ]
    return {"documents": documents, "question": question}

Step1-3. 最終回答生成処理

検索文書を用いて、最終的な回答を生成する処理です。

今回は以下の記事で作成したモデルサービングのエンドポイントを利用しています。
処理としてはLangChainで文書を基に回答生成するChainを作成し、それを用いて回答生成を行っています。

# RAG Chain preparation
rag_chat = ChatDatabricks(endpoint=generate_endpoint_name, temperature=0.1)
rag_chat_template = ChatPromptTemplate.from_messages(
    [
        (
            "human",
            """You are an assistant for question-answering tasks."""
            """Use the following pieces of retrieved context to answer the question."""
            """You MUST reply in Japanese."""
            """If you don't know the answer, just say that you don't know. """
            """Use three sentences maximum and keep the answer concise.\n\n"""
            """Question: {question} \n\n"""
            """Context: {context} \n\n""",
        ),
    ]
)
rag_chain = rag_chat_template | rag_chat | StrOutputParser()

def generate(state):
    """
    回答を生成する

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): stateに追加された新しいキーであるgenerationを含む状態
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # LLMの生成
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}

Step1-4. 文書のグレード確認

検索で取得された文書が、質問と関連があるかをLLMに確認させる工程です。

以下で作成した構造化出力用バッチエンドポイントを利用します。

行っている処理はエンドポイントのAPIをコールし、JSON形式の文字列で結果を取得。
その内容を基に、回答に関係ない文書をフィルタしています。
また、1件でも関係ない文書を含んでいる場合、Web検索を併用することを示すフラグをONにしています。

def grade_documents(state):
    """
    質問に関連する文書を取得するかどうかを判断します。

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): フィルタリングされた関連文書のみを含むdocumentsキーで更新された状態
    """

    class Grade(BaseModel):
        grade: Literal["yes", "no"]

    print("---質問に関連する文書をチェック---")
    question = state["question"]
    documents = state["documents"]

    max_num_results = 5

    client = mlflow.deployments.get_deploy_client("databricks")

    # APIコールに必要なデータを準備
    system_prompt = (
        "You are a grader assessing relevance of a retrieved document to a user question. \n"
        f"Here is the user question: {question} \n"
        "Documents are provided as user prompt.\n"
        "If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n"
        "Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.\n"
        "Please return 'no' even if the document is an unintelligible sentence.\n\n"
        f"You MUST reply using the following schema:{Grade.schema_json()}"
    )
    inputs = pd.DataFrame([{"prompt": d.page_content} for d in documents])

    # 構造化出力APIをコール
    response = client.predict(
        endpoint=grader_endpoint_name,
        inputs={
            "inputs": inputs.to_dict(orient="records"),
            "params": {
                "temperature": 0.001,
                "max_tokens": 512,
                "system_prompt": system_prompt,
                "stop": "<|eot_id|>",
                "json_schema": Grade.schema_json(),
            },
        },
    )

    # 出力結果を整形
    outputs = pd.DataFrame(response.get("predictions"))
    def load_grade_json(x):
        try:
            return json.loads(x)
        except Exception as e:
            print("Can't convert to JSON:", x)
            return Grade(grade="no").dict()

    outputs["output"] = outputs["output"].apply(lambda x: load_grade_json(x))

    # Gradeの結果を基に、関連文書のみdocumentとして残す
    filtered_indices = []
    for index, row in outputs.iterrows():
        grade = row["output"] if row["output"] else {"grade": "no"}
        if grade["grade"] == "yes":
            print(f"---評価: 関連する文書---: {index}")
            filtered_indices.append(index)
        else:
            print(f"---評価: 関連しない文書---: {index}")
            continue
    
    # max_num_results以内で件数を返す
    filtered_indices = filtered_indices[:max_num_results]

    web_search = "Yes" if len(filtered_indices) != len(outputs) else "No"
    filtered_docs = [documents[i] for i in filtered_indices]
    return {"documents": filtered_docs, "question": question, "web_search": web_search}

Step1-5. Web検索用のクエリ変換

(Web検索を実施する場合)Web検索へ適した形に質問文を変換する処理です。
最終回答生成と同じLLMのエンドポイントを用いて専用のChainを作成し、変換を実行させています。

# Question Rewriter Chain preparation
qr_chat = ChatDatabricks(
    endpoint=generate_endpoint_name, temperature=0.1, max_tokens=100
)
qr_chat_template = ChatPromptTemplate.from_messages(
    [
        (
            "human",
            """You a question re-writer that converts an input question to a better version that is optimized \n"""
            """for web search. Look at the input and try to reason about the underlying semantic intent / meaning."""
            """Here is the initial question: \n\n {question}. Please reply improved question with no preamble.""",
        ),
    ]
)
question_rewriter = qr_chat_template | qr_chat | StrOutputParser()

def transform_query(state):
    """
    クエリを変換してより良い質問を生成します。

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): 質問キーを再構築された質問で更新した状態
    """

    print("---クエリの変換---")
    question = state["question"]
    documents = state["documents"]

    # 質問を書き直す
    better_question = question_rewriter.invoke({"question": question})
    return {"documents": documents, "question": question, "web_query":better_question}

Step1-6. Web検索実行

Tabilyを使ってWeb検索を実行します。
検索に使うクエリは、「Web検索用のクエリ変換」処理で作成したWeb検索用クエリです。

web_search_tool = TavilySearchResults(k=3)

def web_search(state):
    """
    質問の再構築に基づいてウェブ検索を実行します。

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): ウェブ結果が追加されたdocumentsキーで更新された状態
    """

    print("---ウェブ検索---")
    web_query = state["web_query"]
    question = state["question"]
    documents = state["documents"]

    # ウェブ検索
    docs = web_search_tool.invoke({"query": web_query})
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)
    documents.append(web_results)

    return {"documents": documents, "question": question, "web_query":web_query}

Step1-7. 条件分岐エッジの定義

Web検索を行うかどうかを分岐するエッジ処理の定義です。
内容はシンプル。

def decide_to_generate(state):
    """
    質問に回答を生成するか、質問を再生成するかを決定します。

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        str: 次に呼び出すノードのバイナリ判断
    """

    print("---評価されたドキュメントを評価---")
    question = state["question"]
    web_search = state["web_search"]
    filtered_documents = state["documents"]

    if web_search == "Yes":
        # 全てのドキュメントがフィルタリングされた場合は、クエリを再生成します
        print("---判断: 全てのドキュメントが質問に関連していないため、クエリを変換---")
        return "transform_query"
    else:
        # 関連するドキュメントがあるため、回答を生成します
        print("---判断: 生成---")
        return "generate"

Step1-8. 条件分岐エッジの定義

これまで定義したノードやエッジ処理を基に、LangGraphで以下のようにグラフを構築します。

image.png

def build_graph():
    workflow = StateGraph(GraphState)

    # ノードを定義する
    workflow.add_node("retrieve", retrieve)  # データの取得
    workflow.add_node("grade_documents", grade_documents)  # ドキュメントの評価
    workflow.add_node("generate", generate)  # 回答の生成
    workflow.add_node("transform_query", transform_query)  # クエリの変換
    workflow.add_node("web_search_node", web_search)  # ウェブ検索

    # グラフを構築する
    workflow.set_entry_point("retrieve")
    workflow.add_edge("retrieve", "grade_documents")
    workflow.add_conditional_edges(
        "grade_documents",
        decide_to_generate,
        {
            "transform_query": "transform_query",
            "generate": "generate",
        },
    )
    workflow.add_edge("transform_query", "web_search_node")
    workflow.add_edge("web_search_node", "generate")
    workflow.add_edge("generate", END)

    # コンパイルする
    app = workflow.compile()

    return app

Step2. グラフの試行

作成したモジュールを実際に動かしてみます。

まず、ノートブックを作成し、必要なパッケージ類をインストール。

%pip install --upgrade --force-reinstall databricks-vectorsearch
%pip install -U langchain langgraph langchain_community tavily-python pydantic

dbutils.library.restartPython()

TavilyやDatabricksモデルサービングエンドポイントを利用するため、環境変数にAPIキー等を設定。
※ TavilyのAPIキーについては、こちらなどを参考にしてください。

# Tavily APIを実行するためのAPIキーを設定
os.environ["TAVILY_API_KEY"] = dbutils.secrets.get("tavily", "api_key")

os.environ["DATABRICKS_HOST"] = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() 
os.environ["DATABRICKS_TOKEN"] = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()

グラフを構築。

from graph import build_graph

app = build_graph()

グラフを可視化してみます。

from IPython.display import Image, display

try:
    display(Image(app.get_graph().draw_mermaid_png()))
except:
    # This requires some extra dependencies and is optional
    pass

image.png

では、動かしてみましょう。

user_input = "Unity Catalogとは何ですか"

for event in app.stream({"question": user_input}):
    print(event.keys())
    if "generate" in event:
        print(event.get("generate").get('generation'))
出力
---ドキュメントの取得---
dict_keys(['retrieve'])
---質問に関連する文書をチェック---
---評価: 関連しない文書---: 0
---評価: 関連する文書---: 1
---評価: 関連する文書---: 2
---評価: 関連しない文書---: 3
---評価: 関連しない文書---: 4
---評価: 関連する文書---: 5
---評価: 関連しない文書---: 6
---評価: 関連する文書---: 7
---評価: 関連しない文書---: 8
---評価: 関連する文書---: 9
---評価されたドキュメントを評価---
---判断: 全てのドキュメントが質問に関連していないため、クエリを変換---
dict_keys(['grade_documents'])
---クエリの変換---
dict_keys(['transform_query'])
---ウェブ検索---
dict_keys(['web_search_node'])
---GENERATE---
dict_keys(['generate'])
 Unity Catalog (ユニティ カタログ) は、Databricks ワークスペース全体で一元化されたアクセス制御、監査、系列、およびデータディスカバリー機能を提供します。

最後に生成した回答を出力していますが、なんとなく正しそうです。
ログを見ると、ベクトル検索で取得した10件のうち、回答に適した内容は5件でした。
グレードの確認結果自体が正しいのか、というところもありますが、不要な検索結果を適切にフィルタできているように思います。

また、最終出力ももうちょっと詳細を教えて欲しいところですね。
このあたりはプロンプトなどの改善の余地があります。


では、Databricksのドキュメントに載ってない(だろう)ことを聞いてみましょう。

user_input = "日本で一番高い山は?"

for event in app.stream({"question": user_input}):
    print(event.keys())
    if "generate" in event:
        print(event.get("generate").get('generation'))
出力
---ドキュメントの取得---
dict_keys(['retrieve'])
---質問に関連する文書をチェック---
---評価: 関連しない文書---: 0
---評価: 関連しない文書---: 1
---評価: 関連しない文書---: 2
---評価: 関連しない文書---: 3
---評価: 関連しない文書---: 4
---評価: 関連しない文書---: 5
---評価: 関連しない文書---: 6
---評価: 関連しない文書---: 7
---評価: 関連しない文書---: 8
---評価: 関連しない文書---: 9
---評価されたドキュメントを評価---
---判断: 全てのドキュメントが質問に関連していないため、クエリを変換---
dict_keys(['grade_documents'])
---クエリの変換---
dict_keys(['transform_query'])
---ウェブ検索---
dict_keys(['web_search_node'])
---GENERATE---
dict_keys(['generate'])
 日本で一番高い山は、富士山です。富士山の高さは12,388フートです。

検索文書は全て関連しないという判定になり、おそらくWeb検索の結果だけで回答しているのことわかります。

ユースケースとして悩ましいところはありますが、汎用的なQ&Aを行う仕組みができたのではないかと思います。

まとめ

Databricksモデルサービングのエンドポイントを活用した、少し高度なRAG(CRAG)の構築ができました。
次回はこれにUIを付けて、実際のアプリケーションライクに動作させてみたいと思います。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?