LoginSignup
2
3

langchainとDatabricksで(私が)学ぶRAG : Adaptive RAG 後編

Posted at

こちらの続きです。

前編ではRAG用のベクトルストア準備から、LLM・各種チェーンの構築、Web検索用ツールの準備まで行いました。
後編は、これらを使ってAdaptive RAGのパイプラインを構成します。

※ 今回はほぼ以下のサンプルノートブックと同様です。

Step5. グラフの構築

LangGraphを使って処理パイプラインとなるグラフを構築します。

グラフ状態クラス定義

グラフ内で利用する状態クラスを定義します。

from typing_extensions import TypedDict
from typing import List

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        documents: list of documents 
    """
    question : str
    generation : str
    documents : List[str]

ノード・条件分岐エッジ処理の定義

グラフのノード・エッジ処理内容を関数として定義します。
このあたり元ノートブックと全く同じです。
基本的にはStep2で作成したRetrieverやStep3で作成したチェーンを実行し、状態に結果を入れていきます。

### Nodes

from langchain.schema import Document

def retrieve(state):
    """
    Retrieve documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = retriever.get_relevant_documents(question)
    return {"documents": documents, "question": question}

def generate(state):
    """
    Generate answer

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]
    
    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}

def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with only filtered relevant documents
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]
    
    # Score each doc
    filtered_docs = []
    for d in documents:
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})
        grade = score['score']
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue
    return {"documents": filtered_docs, "question": question}

def transform_query(state):
    """
    Transform the query to produce a better question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates question key with a re-phrased question
    """

    print("---TRANSFORM QUERY---")
    question = state["question"]
    documents = state["documents"]

    # Re-write question
    better_question = question_rewriter.invoke({"question": question})
    return {"documents": documents, "question": better_question}

def web_search(state):
    """
    Web search based on the re-phrased question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with appended web results
    """

    print("---WEB SEARCH---")
    question = state["question"]

    # Web search
    docs = web_search_tool.invoke({"query": question})
    # 先頭400文字だけ利用
    web_results = "\n".join([d["content"][:400] for d in docs])
    web_results = Document(page_content=web_results)

    return {"documents": web_results, "question": question}

### Edges ###

def route_question(state):
    """
    Route question to web search or RAG.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """

    print("---ROUTE QUESTION---")
    question = state["question"]
    print(question)
    source = question_router.invoke({"question": question})  
    print(source)
    print(source['datasource'])
    if source['datasource'] == 'web_search':
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "web_search"
    elif source['datasource'] == 'vectorstore':
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"

def decide_to_generate(state):
    """
    Determines whether to generate an answer, or re-generate a question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    question = state["question"]
    filtered_documents = state["documents"]

    if not filtered_documents:
        # All documents have been filtered check_relevance
        # We will re-generate a new query
        print("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---")
        return "transform_query"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"

def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    grade = score['score']

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question,"generation": generation})
        grade = score['score']
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

グラフの構築

ここまで定義した内容を利用して、グラフを構成します。
基本的に、以下画像の流れとなります。

image.png

from langgraph.graph import END, StateGraph

workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("web_search", web_search) # web search
workflow.add_node("retrieve", retrieve) # retrieve
workflow.add_node("grade_documents", grade_documents) # grade documents
workflow.add_node("generate", generate) # generatae
workflow.add_node("transform_query", transform_query) # transform_query

# Build graph
workflow.set_conditional_entry_point(
    route_question,
    {
        "web_search": "web_search",
        "vectorstore": "retrieve",
    },
)
workflow.add_edge("web_search", "generate")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "retrieve")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "transform_query",
    },
)

# Compile
app = workflow.compile()

長かった準備がこれで終わりました。

Step6. グラフの実行

では、処理を実行してみます。

まずは、ベクトルデータを参照して回答するパイプラインが実行されるような質問を入れてみます。

# Run 
inputs = {"question": "葬送のフリーレンの原作者は誰?"}
for output in app.stream(inputs):
    for key, value in output.items():
        print(f"Node '{key}':")
    print("---")

# Final generation
print(value["generation"])
出力
---ROUTE QUESTION---
葬送のフリーレンの原作者は誰?
{'datasource': 'vectorstore'}
vectorstore
---ROUTE QUESTION TO RAG---
---RETRIEVE---
Node 'retrieve':
---
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
Node 'grade_documents':
---
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
Node 'generate':
---
葬送のフリーレンの原作者は、山田鐘人です。この作品は、『週刊少年サンデー』にて連載されており、多くの賞を獲得しています。

In English, the original author of "葬送のフリーレン" (Sōsō no Furiren) is Yamazaki Kōichi. The manga is serialized in 'Shūkan Shōnen Sunday' and has won numerous awards, including the 14th Manga Taishō Award, the 25th Handan Tezuka Osamu Newcomer Award, and the 69th Shōgakukan Manga Award.

想定通り、vectorstoreのルートが選択され、正しい結果が取得できました。
(英語での翻訳が入っている(かつ、内容が誤っている)のはプロンプトの与え方のためだと思われます。ここは工夫の余地あり)


次にベクトルストアを使わず、Web検索を利用する質問を投げかけてみます。

# Run 
inputs = {"question": "Databricksの詳細機能を教えてください。"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        print(f"Node '{key}':")
    print("---")

# Final generation
print(value["generation"])
出力
---ROUTE QUESTION---
Databricksの詳細機能を教えてください。
{'datasource': 'web_search'}
web_search
---ROUTE QUESTION TO WEB SEARCH---
---WEB SEARCH---
Node 'web_search':
---
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
Node 'generate':
---
データブリックスは、クラウド上の統合分析プラットフォームで、データ統合、データ分析、AI活用を行うことができます。主な機能は以下の通りです。

1. データウェアハウス: コスト削減や調整の課題に対応し、データの保存、共有、分析が容易になります。
2. データ処理ワークフローのスケジューリングと管理: データ処理の流れを効率的に管理できます。
3. ダッシュボードとビジュアライズATION: データを視覚化し、情報を効果的に伝えることができます。
4. セキュリティー、ガバナンス、高可用性、ディザスタリカバリーの管理: システムの安全性や信頼性を確保できます。
5. データの検出、アノテーション、探索: データの詳細な分析が可能です。
6. 機械学習(ML)のモデリング、追跡、モデルサービング: モデルの構築や管理が容易になります。
7. 生成AIソリューション: 自動化やAI技術を活用した解決策が提供されます。
8. オープンソースとのマネージド統合: オープンソースプロジェクトとの連携が容易になります。

これらの機能を活用することで、データ分析やAI活用の効率化が期待できます。

質問の内容からweb_seachルートに入り、検索結果から回答が生成されました。

Step7. グラフのビジュアライゼーション

最後に、構成したグラフを画像表示します。

from IPython.display import Image

Image(app.get_graph().draw_png())

image.png

反復的なフローが入るため、どうしても複雑な図になってしまいますね。

まとめ

Adaptive RAGのサンプルをDatabricksを使って実装し直してみました。

今回は簡単な例のみ実行していますが、質問によってはクエリ変換など、より反復的な処理が実行されます。
パラメータの調整やループ回数の制限など、実運用においてはいろいろ気にしないといけないことがありそうです。

とはいえ、クエリに基づいてルートを決めて実行、というのは品質・効率性の面で理にかなった考え方です。

※余談ですが、以下のOpenAI社のRAG戦略記事を思い出しました。

サーベイしてないのですが、ルート決定に特化したLLMや各種モデルというのが今後どんどん出てくるかもしれません。

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3