【実践】LangGraph × RAGでAgentic AIを構築するチュートリアル
はじめに
LangChain 1.0のリリースに伴い、LangGraphが正式統合されました。従来の線形RAG(質問→検索→回答)から、条件分岐やループを含む高度なワークフローが構築できるようになりました。
この記事では、実際に動くコードを使って、LangGraphでAgentic RAGシステムを構築する方法をステップバイステップで解説します。
この記事で学べること
- LangGraphの基本構成要素(StateGraph、Node、Edge)の理解
- 条件分岐やループ処理を含むRAGワークフローの実装
- Human-in-the-Loop機能の活用
- プロダクション向けのエラーハンドリング
動作環境
- Python 3.10+
- LangChain 1.0.1
- LangGraph 1.0.1
1. 環境セットアップ
1.1 必要なライブラリのインストール
pip install langchain langchain-openai langchain-community langchain-text-splitters langgraph chromadb tenacity
1.2 OpenAI APIキーの設定
import os
import getpass
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI APIキーを入力してください: ")
print("APIキーが設定されました")
注意: APIキーは公開しないでください。
getpassを使うことで、入力内容が画面に表示されません。
1.3 バージョン確認
import langchain
from importlib.metadata import version
print(f"LangChain version: {langchain.__version__}")
print(f"LangGraph version: {version('langgraph')}")
出力例:
LangChain version: 1.0.1
LangGraph version: 1.0.1
2. LangGraphの基本構成要素
LangGraphは以下の3つの要素で構成されています:
| 要素 | 役割 |
|---|---|
| StateGraph | 状態管理の中核 |
| Node | 具体的な処理を実行するコンポーネント |
| Edge | ノード間の接続とルーティング |
2.1 StateGraph(状態管理)
TypedDictを使用して型安全に状態を定義します。
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
import operator
class AgentState(TypedDict):
# メッセージの履歴(operator.addで新しいメッセージが追加される)
messages: Annotated[Sequence[BaseMessage], operator.add]
# 次のステップ
next: str
# ドキュメント取得結果
documents: list
# 生成された回答
answer: str
ポイント:
operator.addを指定することで、新しいメッセージが既存のリストに追加されます(上書きではない)。
2.2 Node(処理ノード)
ノードは状態を受け取り、更新された状態を返す関数です。
from langchain_openai import ChatOpenAI
def chatbot_node(state: AgentState) -> dict:
"""LLMとの会話ノード"""
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
messages = state["messages"]
response = llm.invoke(messages)
return {
"messages": [response],
"next": "retrieve"
}
2.3 Edge(接続とルーティング)
LangGraphには3つの基本的なエッジタイプがあります:
- 通常エッジ: 常に次のノードが実行される
- 条件付きエッジ: 状態に応じて動的にルーティング
- エントリーポイント: グラフの開始点
from langgraph.graph import StateGraph, START, END
# 条件付きエッジのルーター関数
def router(state: AgentState) -> str:
"""次のノードを動的に決定"""
messages = state["messages"]
last_message = messages[-1]
if "再検索" in last_message.content:
return "retrieve"
elif "完了" in last_message.content:
return "end"
else:
return "generate"
3. Agentic RAGの実装
ここからは、実際に動作するAgentic RAGシステムを構築していきます。
3.1 必要なライブラリのインポート
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langgraph.graph import StateGraph, START, END
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.messages import HumanMessage, AIMessage
3.2 ベクトルストアの準備
# サンプルドキュメント
sample_documents = [
"量子コンピューティングによる機械学習手法の最新動向について解説します。量子コンピュータは従来のコンピュータでは困難な計算を高速に処理できます。",
"深層学習モデルの最適化技術には、学習率スケジューリング、勾配クリッピング、バッチ正規化などがあります。",
"自然言語処理の分野では、TransformerアーキテクチャがBERTやGPTの基盤となっています。",
"強化学習は、エージェントが環境と相互作用しながら報酬を最大化する行動を学習する手法です。",
"LangChainは、大規模言語モデル(LLM)を活用したアプリケーション開発のためのフレームワークです。",
"RAG(Retrieval-Augmented Generation)は、外部知識を検索してLLMの回答精度を向上させる技術です。"
]
# テキストの分割
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
docs = splitter.create_documents(sample_documents)
# ベクトルストアの作成
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
documents=docs,
embedding=embeddings,
collection_name="research_collection"
)
# Retrieverの作成
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
出力例:
ドキュメント数: 6
Doc 1: 量子コンピューティングによる機械学習手法の最新動向について解説します。量子コンピュータは従来のコンピ...
Doc 2: 深層学習モデルの最適化技術には、学習率スケジューリング、勾配クリッピング、バッチ正規化などがあります...
...
3.3 エージェント状態の定義
class RAGAgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
documents: list
question: str
is_relevant: bool
answer: str
3.4 各ノードの実装
文書検索ノード
def retrieve_node(state: RAGAgentState) -> dict:
"""ドキュメント取得ノード"""
question = state["question"]
print(f"[検索] クエリ: {question}")
docs = retriever.invoke(question)
print(f"[検索] {len(docs)}件のドキュメントを取得")
return {"documents": docs}
文書関連性評価ノード
これは従来のRAGにはない重要な機能です。検索結果が質問に関連しているかをLLMで評価します。
def grade_documents_node(state: RAGAgentState) -> dict:
"""ドキュメント関連性評価ノード"""
documents = state["documents"]
question = state["question"]
grade_prompt = ChatPromptTemplate.from_template("""
You are a grader assessing relevance of a retrieved document to a user question.
Retrieved document: {document}
User question: {question}
Respond with only 'yes' or 'no'.
""")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
grade_chain = grade_prompt | llm | StrOutputParser()
relevant_docs = []
print("[評価] ドキュメントの関連性を評価中...")
for doc in documents:
grade = grade_chain.invoke({
"document": doc.page_content,
"question": question
})
if "yes" in grade.lower():
relevant_docs.append(doc)
print(f" ✓ 関連あり: {doc.page_content[:40]}...")
else:
print(f" ✗ 関連なし: {doc.page_content[:40]}...")
is_relevant = len(relevant_docs) > 0
print(f"[評価] 結果: {len(relevant_docs)}/{len(documents)} 件が関連あり")
return {
"documents": relevant_docs,
"is_relevant": is_relevant
}
クエリ改善ノード
関連ドキュメントが見つからなかった場合に、クエリを改善して再検索します。
def rewrite_query_node(state: RAGAgentState) -> dict:
"""クエリ改善ノード"""
rewrite_prompt = ChatPromptTemplate.from_template("""
You are an assistant tasked with improving a user's search query.
Original question: {question}
Provide an improved, more specific version that would yield better search results.
Respond with only the improved query, nothing else.
""")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
rewrite_chain = rewrite_prompt | llm | StrOutputParser()
original_question = state["question"]
improved_question = rewrite_chain.invoke({"question": original_question})
print(f"[クエリ改善]")
print(f" 元のクエリ: {original_question}")
print(f" 改善後: {improved_question}")
return {"question": improved_question}
回答生成ノード
def generate_answer_node(state: RAGAgentState) -> dict:
"""回答生成ノード"""
documents = state["documents"]
question = state["question"]
context = "\n\n".join([doc.page_content for doc in documents])
generate_prompt = ChatPromptTemplate.from_template("""
以下のコンテキストを参考に、ユーザーの質問に日本語で回答してください。
コンテキスト:
{context}
質問: {question}
回答:
""")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
generate_chain = generate_prompt | llm | StrOutputParser()
print("[生成] 回答を生成中...")
answer = generate_chain.invoke({"context": context, "question": question})
print(f"[生成] 完了")
return {
"answer": answer,
"messages": [AIMessage(content=answer)]
}
3.5 ワークフローの構築
# StateGraphの作成
workflow = StateGraph(RAGAgentState)
# ノードの追加
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("grade_documents", grade_documents_node)
workflow.add_node("rewrite", rewrite_query_node)
workflow.add_node("generate", generate_answer_node)
# ルーター関数
def document_router(state: RAGAgentState) -> str:
"""ドキュメントの関連性に基づいて分岐"""
if state["is_relevant"]:
print("[ルーター] → generate(関連ドキュメントあり)")
return "generate"
else:
print("[ルーター] → rewrite(再検索が必要)")
return "rewrite"
# エッジの追加
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
"grade_documents",
document_router,
{
"generate": "generate",
"rewrite": "rewrite"
}
)
workflow.add_edge("rewrite", "retrieve") # ループ
workflow.add_edge("generate", END)
# グラフのコンパイル
graph = workflow.compile()
ワークフロー構造:
START → retrieve → grade_documents → [条件分岐]
├─ 関連あり → generate → END
└─ 関連なし → rewrite → retrieve(ループ)
3.6 ワークフローの実行
# 初期状態の設定
initial_state = {
"messages": [HumanMessage(content="最新の機械学習技術について教えてください")],
"documents": [],
"question": "最新の機械学習技術について",
"is_relevant": False,
"answer": ""
}
# 実行
result = graph.invoke(initial_state)
print("\n【回答】")
print(result["answer"])
実行結果:
============================================================
ワークフロー実行開始
============================================================
[検索] クエリ: 最新の機械学習技術について
[検索] 3件のドキュメントを取得
[評価] ドキュメントの関連性を評価中...
✓ 関連あり: 量子コンピューティングによる機械学習手法の最新動向について解説します。量子コンピ...
✗ 関連なし: 深層学習モデルの最適化技術には、学習率スケジューリング、勾配クリッピング、バッチ...
✗ 関連なし: 自然言語処理の分野では、TransformerアーキテクチャがBERTやGPTの...
[評価] 結果: 1/3 件が関連あり
[ルーター] → generate(関連ドキュメントあり)
[生成] 回答を生成中...
[生成] 完了
============================================================
実行完了
============================================================
【回答】
最新の機械学習技術には、量子コンピューティングを活用した手法が注目されています。量子コンピュータは、従来のコンピュータでは処理が難しい大規模なデータセットや複雑な計算を高速に行うことができるため、機械学習の効率を大幅に向上させる可能性があります。
3.7 ストリーミング実行
中間結果を観察しながら実行することもできます。
new_state = {
"messages": [HumanMessage(content="RAGとは何ですか?")],
"documents": [],
"question": "RAGとは何ですか?",
"is_relevant": False,
"answer": ""
}
for chunk in graph.stream(new_state, stream_mode="updates"):
print(f"\n--- ノード更新 ---")
for key, value in chunk.items():
print(f"ノード: {key}")
if "answer" in value and value["answer"]:
print(f"回答: {value['answer'][:100]}...")
4. Human-in-the-Loop機能
LangGraphでは、特定のノード実行前に処理を一時停止させ、ユーザーの確認を得ることができます。
from langgraph.checkpoint.memory import MemorySaver
# チェックポインターの作成
memory = MemorySaver()
# 人間の介入ポイントを設定
graph_with_interrupt = workflow.compile(
checkpointer=memory,
interrupt_before=["generate"] # 回答生成前に停止
)
# 設定(thread_idでセッションを識別)
config = {"configurable": {"thread_id": "user_123"}}
# 初期状態
hitl_state = {
"messages": [HumanMessage(content="深層学習について教えて")],
"documents": [],
"question": "深層学習について",
"is_relevant": False,
"answer": ""
}
# 最初の実行(generateノード前で停止)
result = graph_with_interrupt.invoke(hitl_state, config=config)
print(f"処理が一時停止しました。取得したドキュメント数: {len(result.get('documents', []))}")
# ユーザー確認後、続きから再開
result_continued = graph_with_interrupt.invoke(None, config=config)
print(result_continued["answer"])
実行結果:
[検索] クエリ: 深層学習について
[検索] 3件のドキュメントを取得
[評価] ドキュメントの関連性を評価中...
✓ 関連あり: 深層学習モデルの最適化技術には、学習率スケジューリング、勾配クリッピング、バッチ...
[評価] 結果: 1/3 件が関連あり
[ルーター] → generate(関連ドキュメントあり)
処理が一時停止しました(generateノード前)
取得したドキュメント数: 1
処理を再開します...
[生成] 回答を生成中...
[生成] 完了
【最終回答】
深層学習は、人工知能の一分野であり、特にニューラルネットワークを用いてデータから特徴を自動的に学習する手法です...
5. エラーハンドリングと再試行
プロダクション環境では、外部API呼び出しでのタイムアウトやレート制限に対する対策が必要です。
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def resilient_llm_call(llm, messages):
"""再試行機能付きLLM呼び出し"""
return llm.invoke(messages)
def agent_node_with_retry(state: RAGAgentState) -> dict:
"""エラーハンドリング付きエージェントノード"""
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
try:
response = resilient_llm_call(llm, state["messages"])
return {"messages": [response]}
except Exception as e:
error_message = AIMessage(content=f"エラーが発生しました: {str(e)}")
return {"messages": [error_message]}
6. 注意点・よくある落とし穴
6.1 状態管理における型安全性
# ❌ 問題のあるコード
class BadState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
value: str # デフォルト値なし - エラーの原因
# ✓ 正しいコード
class GoodState(TypedDict, total=False):
messages: Annotated[Sequence[BaseMessage], operator.add]
value: str
6.2 operator.addの正しい使い方
| フィールド型 | operator.add | 理由 |
|---|---|---|
| リスト(messages等) | ✓ 使用する | 追加したい |
| 文字列 | ✗ 使用しない | 上書きしたい |
| 数値 | ✗ 使用しない | 加算されてしまう |
6.3 条件付きエッジでの論理エラー
# ❌ 問題のあるルーター
def bad_router(state):
return "invalid_node_name" # 定義されていないノード!
# ✓ 正しいルーター
def good_router(state):
if state.get("is_relevant"):
return "generate" # mappingで定義済み
return "rewrite" # mappingで定義済み
まとめ
このチュートリアルでは、LangGraphを使用したAgentic RAGシステムの構築方法を学びました。
重要なポイント
- StateGraph、Node、Edgeの3要素がLangGraph活用の基盤
- 条件付きエッジにより、検索結果の品質に応じた動的な処理分岐が可能
- Human-in-the-Loop機能で、AIの判断に人間の確認を組み込める
- 状態管理とエラーハンドリングをプロダクションレベルで実装する必要がある