はじめに
現在、LLMを活用したアプリケーション開発において、エージェントは重要なトピックとなっています。その中で注目したいのが、LangChainの兄弟?的な位置付けであるLangGraphです。このフレームワークは、エージェントやマルチエージェントのワークフローを構築するために特別に設計されているようです。
LangGraphの特徴的な点は、「循環的な処理フロー」を自然に実現できることで、例えば、エージェントが情報を取得し、その結果を分析して新たな情報を取得する、という一連の処理を柔軟に実装できます。エージェント機能自体はLangChainでもできますが、グラフ化して状態を管理することでより直感的に扱えるということなのかなと思います。この点、現時点では学習不十分です。
今回は、LangGraphを使用して複数のデータベースを持つRAGシステムを構築します。具体的には、研究データベースと開発データベースという2つの異なるテキストのベクトルデータベースを用意し、ユーザーの質問内容に応じて適切なデータベースをAIが自動的に選択して回答を生成するエージェントシステムを実装します。
理解が十分でない可能性があります。実装も試行錯誤という状態です。
環境設定
GoogleColabを用いています。
まず必要なパッケージをインストールします。
PACKAGE_VERSIONS = {
'langchain': '0.3.10',
'langchain-community': '0.3.10',
'tiktoken': '0.8.0',
'langchain-openai': '0.2.11',
'langgraph': '0.2.56',
'chromadb': '0.5.23',
'langchain-text-splitters': '0.3.2',
'langgraph-sdk': '0.1.43',
'langchainhub': '0.1.21'
}
def generate_requirements():
with open('requirements.txt', 'w') as f:
for package, version in PACKAGE_VERSIONS.items():
f.write(f'{package}=={version}\n')
generate_requirements()
!pip install -r requirements.txt
OpenAIがエラーになる場合は下記も対応
https://community.openai.com/t/typeerror-asyncclient-init-got-an-unexpected-keyword-argument-proxies/1040287/4
!pip install --upgrade "httpx<0.28"
APIキーの設定
あらかじめGooglColabにシークレットを設定しておくこと
import os
import getpass
from google.colab import userdata
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")
# OpenAI APIキーを設定
_set_env("OPENAI_API_KEY")
ライブラリのインポート
from langchain_openai import OpenAIEmbeddings
from langchain.docstore.document import Document
from langchain_community.vectorstores import Chroma
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import tool
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import ToolNode
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict, Literal, Annotated
from typing import Sequence
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain import hub
from pydantic import BaseModel, Field
データベースの準備
システムのデモンストレーション用に、研究データと開発データのダミーデータを作成します。
# ダミーデータ作成
research_texts = [
"研究レポート:新しいAIモデルが画像認識精度を98%に向上させた結果",
"学術論文サマリー:自然言語処理分野でTransformerが主流アーキテクチャになった理由",
"量子コンピューティングによる機械学習手法の最新動向"
]
development_texts = [
"プロジェクトA:UIデザイン完了、API統合中",
"プロジェクトB:新機能Xテスト中、バグ修正必要",
"製品Y:リリース前のパフォーマンス最適化段階"
]
次に、これらのテキストをベクトルデータベースに変換します。
# テキスト分割の設定
splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=10)
# テキストからDocumentオブジェクトを生成
research_docs = splitter.create_documents(research_texts)
development_docs = splitter.create_documents(development_texts)
# ベクトルストア作成
embeddings = OpenAIEmbeddings()
research_vectorstore = Chroma.from_documents(
documents=research_docs,
embedding=embeddings,
collection_name="research_collection"
)
development_vectorstore = Chroma.from_documents(
documents=development_docs,
embedding=embeddings,
collection_name="development_collection"
)
検索ツールの作成
各データベースに対する検索ツールを作成します。
# 既存のリトリーバー(情報検索機能)を基にツールを作成する
from langchain.tools.retriever import create_retriever_tool
research_retriever = research_vectorstore.as_retriever()
development_retriever = development_vectorstore.as_retriever()
research_tool = create_retriever_tool(
research_retriever, # リトリーバーオブジェクト
"research_db_tool", # 作成するツールの名前
"Search information from the research database." # ツールの説明文
)
development_tool = create_retriever_tool(
development_retriever,
"development_db_tool",
"Search information from the development database."
)
# 作成したリサーチツールと開発ツールをリストにまとめる
tools = [research_tool, development_tool]
エージェントの実装
エージェントの状態管理のために、必要な型定義を行います。
class AgentState(TypedDict):
messages: Annotated[Sequence[AIMessage|HumanMessage|ToolMessage], add_messages]
エージェントの核となる部分を実装します。このエージェントは、ユーザーの質問を分析し、適切なデータベースを選択します。
def agent(state: AgentState): # state引数はエージェントの状態情報を保持
"""
エージェントノード:
ユーザ質問を元に、「研究DBを検索すべきか、開発DBを検索すべきか、または何もせず回答すべきか」をLLMに考えさせます。
"""
print("---CALL AGENT---")
messages = state["messages"]
# エージェントプロンプト: ユーザの質問を見て、researchかdevelopmentどちらが関連しそうか判断。
# 必要なら対応するツールを呼び出す形式(ReAct的フォーマット)を促す。
# ツールの呼び出しにはJSON形式を使用
system_msg = """You are a helpful AI assistant.
You have two tools:
1) research_db_tool: For academic or research-oriented queries
2) development_db_tool: For project development status queries
Decide which one to use based on the user's question.
- If the question is about research topics, use research_db_tool.
- If it's about project or development status, use development_db_tool.
- If the question can be answered directly with no external info, just answer the user directly without calling a tool.
IMPORTANT FORMAT RULES:
- To use a tool, follow this exact format:
Action: <tool_name>
{
"query": "<string>"
}
- DO NOT provide extra JSON objects or additional information outside this single JSON object.
- DO NOT output multiple JSON objects at once.
- If no tool is needed, do not use "Action:" and just answer directly.
Respond in a ReAct style:
- Think step by step in your reasoning (internally), but only output the final answer or the tool call as instructed.
"""
# メッセージにシステムプロンプトを挿入
all_messages = [HumanMessage(content=system_msg)] + list(messages) # システムプロンプト(system_msg)を最初のメッセージとして挿入し、ユーザの質問履歴(messages)を連結
model = ChatOpenAI(temperature=0, model="gpt-4o-mini-2024-07-18")
model = model.bind_tools(tools) # model.bind_tools(tools)で、エージェントが利用可能なツールを登録
response = model.invoke(all_messages)
return {"messages":[response]}
与えられた状態(AgentState)に基づいて次に取るべきアクションを判定
def simple_grade_documents(state: AgentState) -> Literal["generate","rewrite"]: # 型注釈.文字列 "generate" または "rewrite" のいずれかしか返さないことを明示
"""
シンプルな判定関数:
取得文書があれば"generate"、なければ"rewrite"へ
"""
messages = state["messages"]
last_message = messages[-1] # メッセージ履歴の最後(最新)のメッセージを取得。これが判定の対象
if isinstance(last_message, ToolMessage) and last_message.content.strip(): # 最後のメッセージが ToolMessage 型であることを確認
print("---DOCS FOUND, GO TO GENERATE---")
return "generate"
else:
print("---NO DOCS FOUND, TRY REWRITE---")
return "rewrite"
与えられた質問をより具体的かつ明確に書き換える
def rewrite(state: AgentState):
print("---REWRITE QUESTION---")
messages = state["messages"]
original_question = messages[0].content if len(messages)>0 else "N/A"
prompt = f"Rewrite the question '{original_question}' to be more specific and clearer." # 元の質問(original_question)を引用し、より具体的で明確な形に書き換える
model = ChatOpenAI(temperature=0, model="gpt-4o-mini-2024-07-18")
response = model.invoke([HumanMessage(content=prompt)])
return {"messages": [response]}
ユーザの質問に対して、提供されたコンテキストを基に最終的な回答を生成
def generate(state: AgentState):
print("---GENERATE FINAL ANSWER---")
messages = state["messages"]
question = messages[0].content
last_message = messages[-1]
docs = ""
if isinstance(last_message, ToolMessage):
docs = last_message.content # メッセージ履歴の最後のメッセージ(last_message)が ToolMessage 型の場合、その内容を docs に格納
prompt = """You are an assistant. Use the given context to answer the user's question.
If you don't know, just say you don't know.
Question: {question}
Context: {context}
Answer in concise 2-3 sentences."""
# プロンプトテンプレートを生成するクラス
chain_prompt = PromptTemplate(
template=prompt,
input_variables=["question","context"]
)
llm = ChatOpenAI(model_name="gpt-4o-mini-2024-07-18", temperature=0)
# プロンプトテンプレートから生成されたテキストを LLM に渡し、結果を文字列形式にパースするパイプライン
answer = chain_prompt | llm | StrOutputParser()
response = answer.invoke({"question": question, "context": docs})
return {"messages": [response]}
ワークフローの構築
LangGraphを使用して、エージェントのワークフローを構築します。
一連の流れをグラフデータ(ノードとエッジ)としてパズルのように積み重ねていくイメージで、ここの作業が肝なんだろうと捉えています。
workflow = StateGraph(AgentState)
# ノード定義
workflow.add_node("agent", agent)
retrieve_node = ToolNode(tools)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("rewrite", rewrite)
workflow.add_node("generate", generate)
# スタートはagent
workflow.add_edge(START, "agent")
# agentでツール呼び出しがあったらretrieveへ、それ以外ならEND
from langgraph.prebuilt import tools_condition
workflow.add_conditional_edges(
"agent",
tools_condition,
{
"tools": "retrieve",
END: END
}
)
# retrieveの後はドキュメント判定へ
workflow.add_conditional_edges("retrieve", simple_grade_documents)
workflow.add_edge("generate", END)
workflow.add_edge("rewrite", "agent")
# ワークフローを最終的にコンパイルして、実行可能な形式に
app = workflow.compile()
可視化
from IPython.display import Image, display
try:
display(Image(app.get_graph().draw_mermaid_png()))
except:
pass
ワークフローが可視化されます。これがすごくいい。
スタートしたらエージェントがtoolを判断し、検索をかけて回答を生成するルート、検索が十分出なかった場合にクエリを書き直すルートを構築できたようです。
システムの動作確認
実際にシステムを試してみます。
「AI研究における最新の進展は何ですか?」という質問を投げ、AIが研究データベース側を選択することに期待します。
# 研究関連の質問
query = "What is the latest advancement in AI research according to the reports?"
config = {"configurable":{"thread_id":"1"}}
for event in app.stream({"messages":[("user",query)]}, config):
print(event)
出力例
研究に関する質問「What is the latest advancement in AI research according to the reports?」を実行した際の出力:
---CALL AGENT---
{
'agent': {
'messages': [
AIMessage(
content='',
additional_kwargs={
'tool_calls': [
{
'id': 'call_Rnag0amEazcWzkXOHRwJEu50',
'function': {
'arguments': '{"query":"latest advancements in AI research"}',
'name': 'research_db_tool'
},
'type': 'function'
}
],
'refusal': None
},
response_metadata={
'token_usage': {
'completion_tokens': 20,
'prompt_tokens': 313,
'total_tokens': 333,
'completion_tokens_details': {
'accepted_prediction_tokens': 0,
'audio_tokens': 0,
'reasoning_tokens': 0,
'rejected_prediction_tokens': 0
},
'prompt_tokens_details': {
'audio_tokens': 0,
'cached_tokens': 0
}
},
'model_name': 'gpt-4o-mini-2024-07-18',
'system_fingerprint': 'fp_818c284075',
'finish_reason': 'tool_calls',
'logprobs': None
},
id='run-c391dad2-2e4c-470a-84a3-8d35ba624b90-0',
tool_calls=[
{
'name': 'research_db_tool',
'args': {
'query': 'latest advancements in AI research'
},
'id': 'call_Rnag0amEazcWzkXOHRwJEu50',
'type': 'tool_call'
}
],
usage_metadata={
'input_tokens': 313,
'output_tokens': 20,
'total_tokens': 333,
'input_token_details': {
'audio': 0,
'cache_read': 0
},
'output_token_details': {
'audio': 0,
'reasoning': 0
}
}
)
]
}
}
---DOCS FOUND, GO TO GENERATE---
{
'retrieve': {
'messages': [
ToolMessage(
content='量子コンピューティングによる機械学習手法の最新動向\n\n量子コンピューティングによる機械学習手法の最新動向\n\n量子コンピューティングによる機械学習手法の最新動向\n\n研究レポート:新しいAIモデルが画像認識精度を98%に向上させた結果',
name='research_db_tool',
id='3fe5a115-25b7-4492-8309-257f91525983',
tool_call_id='call_Rnag0amEazcWzkXOHRwJEu50'
)
]
}
}
---GENERATE FINAL ANSWER---
{
'generate': {
'messages': [
'The latest advancement in AI research, according to the reports, is the development of a new AI model that has improved image recognition accuracy to 98%. This advancement highlights the ongoing progress in machine learning techniques, particularly in the context of quantum computing.'
]
}
}
この出力から、システムの動作フローを確認できます:
- エージェントが質問を受け取り、research_db_tool(研究データベース)を使用することを決定
- ドキュメントが見つかり、生成フェーズに移行
- 最終的な回答を生成:画像認識精度が98%に向上した新しいAIモデルの開発について報告
まとめ
本記事では、LangGraphを使用して複数のデータベースを持つRAGシステムを構築しました。このシステムは、ユーザーの質問内容に応じて適切なデータベースを選択し、回答を生成することができます。
今後の発展として以下のような拡張が考えられます:
- データベースで回答できない場合のネット検索機能の追加
- より複雑な分岐ロジックの実装
- 回答の品質向上のための追加的なコンテキスト処理
LangGraphは、このような複雑なワークフローを直感的に実装できる強力なツールとして、活用が期待できるのかなと思います。もっと理解しないと・・・
参考文献
- LangGraph公式ドキュメント 特にRAGエージェントの項
https://langchain-ai.github.io/langgraph/tutorials/rag/langgraph_agentic_rag/ - LangChainドキュメント
https://python.langchain.com/