はじめに
とあるコンペに参加したとき、RAGを構築する必要があったのですが、LangGraphという、LLMのワークフローを簡単に構築できるライブラリの存在を知ったので、使ってみました。
結論として上手くできませんでした、なので、Langgraphの使い方、メモ程度の記事です。
インストール
pip install dotenv
pip install openai
pip install langchain
手法の概要
今回はlanggraph用いて、以下のワークフローを構築します。RAGの参考文章の読み込み、検索はワークフローの前段で行います。
- 参考文章の整形
- 回答生成
- 回答に問題ないかチェック、問題あれば2に戻る、問題なければ終了
コード
Azure周りの設定、LLMの設定
from dotenv import load_dotenv
load_dotenv()
from openai import AzureOpenAI
import os
# Azure OpenAIの設定
AZURE_OPENAI_API_KEY_FOR_EMB = os.getenv("AZURE_OPENAI_API_KEY_FOR_EMB")
AZURE_OPENAI_ENDPOINT_FOR_EMB = os.getenv("AZURE_OPENAI_ENDPOINT_FOR_EMB")
API_VERSION_FOR_EMB = os.getenv("API_VERSION_FOR_EMB")
DEPLOYMENT_ID_FOR_EMB = os.getenv("DEPLOYMENT_ID_FOR_EMB")
# AzureOpenAIのクライアントを初期化
client_embedding = AzureOpenAI(
api_key=AZURE_OPENAI_API_KEY_FOR_EMB,
azure_endpoint=AZURE_OPENAI_ENDPOINT_FOR_EMB,
api_version=API_VERSION_FOR_EMB,
)
from langchain_openai import AzureChatOpenAI
from langchain_core.runnables import ConfigurableField
# Azure OpenAIの設定
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
API_VERSION = os.getenv("API_VERSION")
DEPLOYMENT_ID_FOR_CHAT_COMPLETION = os.getenv("DEPLOYMENT_ID_FOR_CHAT_COMPLETION")
llm4chat = AzureChatOpenAI(api_key=AZURE_OPENAI_API_KEY, azure_endpoint=AZURE_OPENAI_ENDPOINT, api_version=API_VERSION ,model=DEPLOYMENT_ID_FOR_CHAT_COMPLETION, temperature=0.0)
# 後からmax_tokensの値を変更できるように、変更可能なフィールドを宣言
llm4chat = llm4chat.configurable_fields(max_tokens=ConfigurableField(id='max_tokens'))
RAGのための文章読み込み
# 文章の読み込み
from langchain.text_splitter import CharacterTextSplitter
import os
# 文章読み込み
folder_path = "PATH/TO/TEXT_FOLDER"
md_files_content = []
for filename in os.listdir(folder_path):
if filename.endswith(".md"):
with open(os.path.join(folder_path, filename), 'r', encoding='utf-8') as file:
text = file.read()
# CharacterTextSplitterインスタンスを作成
text_splitter = CharacterTextSplitter(
chunk_size=1000,
chunk_overlap=150
)
# テキストをチャンクに分割
chunks = text_splitter.split_text(text)
# チャンクを配列に追加
md_files_content.extend(chunks)
# Embedding APIを用いて各テキストをベクトル化
embeddings = [] # 取得したベクトルを格納するリスト
for text in md_files_content:
# Azure OpenAI Service の Embedding APIを呼び出し
response = client_embedding.embeddings.create(
input=text, # Embeddingしたいテキスト
model=DEPLOYMENT_ID_FOR_EMB # 使用するEmbeddingモデル(デプロイ名)
)
embedding_vector = response.data[0].embedding
embeddings.append(embedding_vector)
ワークフローの定義
# Stateを定義(これを更新しながらワークフローが進む)
import operator
from typing import Annotated
from pydantic import BaseModel, Field
class State(BaseModel):
query: str = Field(..., description="ユーザーからの質問")
ref_docs: str = Field(..., description="参照文書")
messages: Annotated[list[str], operator.add] = Field(
default=[], description="回答履歴"
)
current_judge: bool = Field(
default=False, description="品質チェックの結果"
)
attenion: str = Field(
default="", description="品質チェックの結果に対する注意事項"
)
# ref_docsの整形ノード
from typing import Any
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
def format_ref_node(state: State) -> dict[str, Any]:
ref_docs = state.ref_docs
prompt = ChatPromptTemplate.from_template(
"""
あなたはテキスト整形の専門家です。
ユーザーはHTMLタグや改行を使って図表を含んだ文章を作成しています。
その文章を、内容の抜け漏れがないように、わかりやすく整形してください。
成形されたものはRAGシステムに使用するベクトルDBを作成する文章になります。
そのため、ベクトルDBに使用するために適した形に整形してください。
また、整形したテキスト以外は応答に含めないでください。
ユーザーの文章:{ref_docs}
整形後の文章:""".strip()
)
chain = prompt | llm4chat | StrOutputParser()
ref = chain.invoke({"ref_docs": ref_docs})
return {"ref_docs": ref}
# 回答生成ノード
from typing import Any
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
def answering_node(state: State) -> dict[str, Any]:
query = state.query
ref_docs = state.ref_docs
attentnion = state.attenion
prompt = ChatPromptTemplate.from_template(
"""
あなたは優秀なAIアシスタントです。
コンテキストだけをもとに、質問に26文字以内で回答してください。
また注意点がある場合は、注意点も考慮して回答してください。
情報がコンテキストに含まれない場合は『わかりません』と答えてください。
質問: {query}"
コンテキスト:
{ref_docs}
注意点:
{attention}
回答:""".strip()
)
chain = prompt | llm4chat | StrOutputParser()
answer = chain.invoke({"query": query, "ref_docs": ref_docs, "attention": attentnion})
return {"messages": [answer]}
class Judgement(BaseModel):
judge: bool = Field(..., description="判定結果")
attention: str = Field(..., description="注意点")
def check_node(state: State) -> dict[str, Any]:
query = state.query
answer = state.messages[-1]
prompt = ChatPromptTemplate.from_template(
"""以下の回答の品質をチェックし、問題がある場合は'False'、問題がない場合は'True'を回答してください。
また、回答に問題がなくなるように、システムプロンプトに追記すべき注意点も回答してください。
ユーザーからの質問: {query}
回答: {answer}
""".strip()
)
chain = prompt | llm4chat.with_structured_output(Judgement)
result: Judgement = chain.invoke({"query": query, "answer": answer})
return {
"current_judge": result.judge,
"judgement_reason": result.attention
}
# ワークフローを定義
from langgraph.graph import StateGraph
from langgraph.graph import END
workflow = StateGraph(State)
# ノードの設定
workflow.add_node("format_ref", format_ref_node)
workflow.add_node("answering", answering_node)
workflow.add_node("check", check_node)
# 開始ノードの設定
workflow.set_entry_point("format_ref")
# エッジの設定
workflow.add_edge("format_ref", "answering")
workflow.add_edge("answering", "check")
# checkノードから次のノードへの遷移に条件付きエッジを定義
# state.current_judgeの値がTrue、もしくはstate.messagesが2個以上あればENDノードへ、Falseならansweringノードへ
workflow.add_conditional_edges(
"check",
lambda state: (state.current_judge)|(state.current_judge=="true")|(len(state.messages)>=2),
{True: END, False: "answering"}
)
compiled = workflow.compile()
参考文献の検索、回答を生成
# コサイン類似度を計算する関数を定義
import numpy as np
def cosine_similarity(vec_a, vec_b):
# np.arrayに変換しておくと、ドット積やノルムの計算が簡単
a = np.array(vec_a)
b = np.array(vec_b)
# コサイン類似度 = (a · b) / (||a|| * ||b||)
# np.dot() で内積を計算し、np.linalg.norm() でベクトルの長さを計算
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
# 問題文の読み込み
import pandas as pd
# 質問が記載されたファイル
questions = pd.read_csv("PATH/TO/QUESTIONS")
# 回答記録用に、変数を設定
output = questions.copy()
# 1行ずつ処理
from tqdm import tqdm
for i in tqdm(range(len(questions))):
now_query = questions.loc[i, "problem"]
# 質問をベクトル化
response = client_embedding.embeddings.create(
input=now_query,
model=DEPLOYMENT_ID_FOR_EMB
)
new_text_embedding = response.data[0].embedding
# コサイン類似度を計算
similarities = []
for idx, embedding_vector in enumerate(embeddings):
score = cosine_similarity(embedding_vector, new_text_embedding)
similarities.append((md_files_content[idx], score))
# 類似度スコアが高い順にソート
similarities.sort(key=lambda x: x[1], reverse=True)
# similarityの上位30件のテキストを取得
ref_docs = ""
for cnt in range(30):
ref_docs += f"* {similarities[cnt][0]}\n"
# 実行
initial_state = State(query=now_query, ref_docs=ref_docs)
result = compiled.invoke(initial_state)
# 結果を記録
output.loc[i,"problem"] = result["messages"][-1]
結果
ほとんどの場合に「わかりません」と回答されてしまいました。
さいごに
結論は上手くできませんでした、です。
改善点としては、
- 回答が不十分な場合にリランクを行う
- Web検索を組み込む
- 検索時にベクトル検索だけでなく、キーワード検索も行う
- ファイル読み込み後の処理を充実させる
など、挙げればキリがなさそうです。