はじめに
もんたです。
ダッシュボードプロダクトに LangGraph × MCP を用い、構造化 RAG/非構造化 RAG を備えた AI アシスタントを実装しました。
本記事では、Cloud Run 二層構成(MCP Client / MCP Server)・SSE ストリーミング・Text-to-SQL(BigQuery)・ColPali(Byaldi)によるマルチモーダル検索の実装と学びをまとめます。
ざっくりとですが、今回のプロジェクトでは以下のようなAI Assistantを開発しました。
概要
本アシスタントは MCP Client と MCP Server の 2サービス構成で、いずれも Cloud Run 上で独立しています。
Client はユーザーの問いを受けて LangGraph のワークフローを回し、必要に応じて Server のツールを HTTP で呼び出し、結果をユーザーへストリーミングで返します。
-
MCP Client(Cloud Run):FastAPI + LangGraph
LangGraphの流れはpreprocess → summarize → analysisのようになっています
また、analysisで ReAct Agent が MCP ツールを HTTP 呼び出しを行っており、データの永続化は PostgreSQL(LangGraph Store / Checkpoint)が行っています
-
MCP Server(Cloud Run):RAGを行うツールを提供するMCP Server
ユーザーからの問い合わせに対して、ツールを使用して必要なデータを取得し、分析結果を返すツールを提供する主なツールは次の2つです
- 構造化RAG:BigQuery から構造化データを取得して分析(Text-to-SQL)
- 非構造化RAG:GCS 上の資料をベクトル検索して関連スニペット/画像を取得し分析
これらを組み合わせ、ユーザーの自然言語質問に対してダッシュボード上で直接、分析結果を返す機能を実装しました
こんな人に読んで欲しい
今回の記事は以下のような人たちに向けて書いているつもりです。
- LangGraphを用いてRAG機能を実装したいと考えているエンジニア
- LangGraphの実務レベルのコードを知りたいエンジニア
- MCP ClientとMCP Serverを切り分けて動かすことを考えているエンジニア
🐶「今回の記事は初心者向けの記事ではないので、そこんところご了承ください。」
技術スタックとアーキテクチャ
技術スタック
| カテゴリ | 技術・ツール |
|---|---|
| 言語 | Python |
| エージェントフレームワーク | LangGraph, LangChain, LangMem |
| RAG関連 | ColPali(マルチモーダルモデル), Cloud Storage(非構造化RAG), BigQuery(構造化RAG) |
| ストリーミング | Server-Sent Events(SSE) |
| DB | PostgreSQL |
| インフラ | Google Cloud(Cloud Run, Cloud Storage, BigQuery) |
| IaC | Terraform |
| CI/CD | Bitbucket Pipelines |
| その他 | LangGraph Store機能(データの永続化),LangSmith, FastMCP |
MCP Client 側のアーキテクチャ
MCP Client側のアーキテクチャは以下のようになっています。
MCP Server 側のアーキテクチャ
1. 構造化RAG Tool
初めに構造化RAGを行うToolのフロー図です。
以下の流れで処理を行っています。
2. 非構造化RAG Tool
続いて非構造化RAGを行うToolのフロー図です。
以下の流れで処理を行っています。
ちなみに、ByaldiとはColpaliというマルチモーダルモデルを簡単に使えるようにするラッパーライブラリです。
詳しくは以下のリンクをどうぞ。
さらに詳しく知りたい人はこちらをご覧ください。
ざっくりとどんな実装をしたか
ここからはどんな実装をしたのかを説明していきます。
本アプリケーションはMCP ClientとMCP Serverの2つのサーバーで動いているので、分けて説明します。
MCP Client
まずはMCP Clientからです。
MCP Clientを実装する上で重要な実装は以下の通りです。
- LangGraphを用いたAgentの実装
- ストリーミング処理
- MCP Server接続とReAct Agentへのツール受け渡し
になります。
それぞれ詳しく解説していきます。
1. LangGraphを用いたAgentの実装
すでに載せましたが、MCP Clientは以下のような構成になっています。
今回作成するGraphは以下のように定義されています。(簡略化して書いています)
langgraphの定義
要約
ユーザー入力などを持つAgentStateを定義し、preprocess →(必要なら)summarization → analysisの順で処理するワークフローをLangGraphで構築。preprocessで分析要否を判定し、不要なら即終了、必要なら要約を挟んでMCPツール実行へ。処理の流れを明確化し、早期応答と安定化を狙う。
from typing import TypedDict, Literal, Optional, Any
from langgraph.graph import StateGraph, END
# ********************* #
# 他にも色々インポートしてる #
# ********************* #
class AgentState(TypedDict):
user_message: str # ユーザーの入力
user_id: int # ユーザーID
thread_id: str # スレッドID(UUID文字列)
command: Optional[Literal["analysis", "others"]] # 分類結果
summary: Optional[str] # 要約(あれば)
final_result: Optional[dict[str, Any]] # 最終結果
def build_graph():
g = StateGraph(AgentState)
g.add_node("preprocess", preprocess_messages) # 前処理(分類など)
g.add_node("summarization", summarization_with_phase) # 要約(必要なら)
g.add_node("analysis", analysis_messages) # MCPツール呼び出し等
g.set_entry_point("preprocess")
g.add_conditional_edges(
"preprocess",
lambda s: "summarization" if s.get("command") == "analysis" else END,
{"summarization": "summarization", END: END},
)
g.add_edge("summarization", "analysis")
return g.compile()
# グローバルに定義
graph = build_graph()
続いて、それぞれのノードの具体についてです。
preprocessノード
要約
ユーザー入力をLLMにかけ、CommandDetermination(analysis/others と説明文)を構造化出力で判定。othersなら get_stream_writer で即ストリーム返信し、final_result を state に入れて早期終了。analysisなら履歴だけ state に渡して次ノードへ進める。前処理で要否判定+即時応答を担い、後段負荷と待機時間を減らす。
import json
import logging
from typing import Any, TypedDict, Literal
from langchain_core.messages import HumanMessage
from langsmith import traceable
from langgraph.config import get_stream_writer
# ********************* #
# 他にも色々インポートしてる #
# ********************* #
class CommandDetermination(TypedDict):
command: Literal["analysis", "others"]
message: str
async def _determine_command_structured(
user_message: str,
) -> CommandDetermination:
"""ユーザーからのクエリに回答するために分析が必要かどうかを決める"""
# 生成結果をストリーミングするために使う(後ほど説明)
writer = get_stream_writer()
try:
# モデルの定義を行う
llm_model = create_llm_for_task("task_determination")
# LLMが生成する結果をCommandDeterminationに従ったJSON形式にする
structured_llm = llm_model.with_structured_output(
CommandDetermination,
method="json_mode",
)
# プロンプトの定義
prompt_template = load_prompt_template("task_determination")
formatted_prompt = prompt_template.format(
user_prompt=user_message,
)
result = None
async for chunk in structured_llm.astream(formatted_prompt):
if isinstance(chunk, dict):
# with_structured_outputで生成結果にcommandとmessageが含まれている
result = chunk
command = chunk.get("command")
msg = chunk.get("message")
# commandがothers、つまり「分析が必要ない質問」と判断された場合の処理
if writer and msg and command == "others":
# Response型に変換し、writerを使ってストリーミングする
response = Response(
type="others", message=msg
)
writer(
{
"type": "stream_result",
"content": json.dumps(
response.model_dump(),
ensure_ascii=False,
),
}
)
# ****************************** #
# バリデーションなどの色々な処理を行う #
# ****************************** #
return result
except Exception:
logger.exception("エラー起きたよ")
raise
# ノードを定義している部分
async def preprocess_messages(state: AgentState) -> dict[str, Any]:
"""preprocessノードの定義を行う関数。
preprocessノードでは、ユーザーの質問を解析し分析が必要かどうか判断したりする
"""
try:
# LangGraphのStateから値を取得する
user_message = state.get("user_message", "")
user_id = state.get("user_id", None)
thread_id = state.get("thread_id", "")
# ユーザーの質問を分析し、タスクの定義を行う関数
command_result = await _determine_command_structured(user_message)
messages = [HumanMessage(content=user_message)]
# 分析が必要ないタスクの場合は、final_resultに格納する
if command_result["command"] == "others":
others_response = Response(
type="others", message=command_result["message"]
)
return {
"user_message": user_message,
"user_id": user_id,
"thread_id": thread_id,
"command": command_result["command"],
"messages": messages,
"messages_for_summary": messages,
"final_result": others_response.model_dump(),
}
# ユーザーからの質問内容的に分析が必要な場合はこちらが処理される
else:
return {
"user_message": user_message,
"user_id": user_id,
"thread_id": thread_id,
"command": command_result["command"],
"messages": messages,
"messages_for_summary": messages,
}
except Exception:
logger.exception("なんかエラー起きてるやで")
raise
summarizationノード
要約
要約専用LLMで SummarizationNode を作り、messages_for_summary を圧縮して summarized_messages に書き戻す。max_tokens 等で長さを制御。summarization_with_phase はノードを実行して要約済み state を返すだけ。狙いは履歴圧縮によるトークン節約と後段(analysis)の安定化。
import logging
from typing import Any
from langmem.short_term import SummarizationNode
from langsmith import traceable
# ********************* #
# 他にも色々インポートしてる #
# ********************* #
def _create_summarization_node() -> SummarizationNode:
"""LangMemのSummarizationNodeを使ってノードを作成する"""
# モデルの定義を行う
summarization_model = create_llm_for_task("text_summarization")
summarization_node = SummarizationNode(
model=summarization_model,
max_tokens=settings.summarization.max_tokens,
max_summary_tokens=settings.summarization.max_summary_tokens,
input_messages_key="messages_for_summary",
output_messages_key="summarized_messages",
name="summarization",
)
return summarization_node
async def summarization_with_phase(state: AgentState) -> dict[str, Any]:
"""Summarizationノードをラッパーする関数 """
summarization_node = _create_summarization_node()
result = await summarization_node.ainvoke(state)
return result
🐶「LangMemのSummarizationNodeとは?」
🐶「“LangMem の SummarizationNode”、いちばん大事なポイントだけ先に教えて!」👴「要は“会話履歴を切り捨てずに圧縮するノード”じゃよ。古いメッセージを順に数えて、しきい値に達したところまでを要約し、1つの要約メッセージに置き換える。結果は『[要約]+[最近の発話]』という並びになるのじゃ。システムメッセージは要約対象から外れるぞい。」 (LangChain)
🐶「いつ要約が走るかは、どう決まるの?」
👴「“古い→新しい”の順にトークンを加算して、max_tokens_before_summary に届いた範囲を要約する仕組みじゃ。これで履歴が長くなっても max_tokens の目標内に収められるのう。」 (LangChain)
🐶「導入の置きどころは?」
👴「二通りが典型じゃよ。
・“LLM 呼び出し前のフック”として、元の履歴は残したまま“要約版の入力だけ差し替える”。
・“履歴そのものを更新”して以後ずっと短い履歴で回す。
LangGraph のハウツーでも“トリム”と並ぶ基本戦略として説明されておる。」 (LangChain)
🐶「主要パラメータ、要点だけ!」
👴「このあたりを押さえれば十分じゃ。
・model … 要約に使う LLM。
・max_tokens_before_summary … 要約を発火させるしきい値。
・max_tokens … 要約後に目指す全体サイズ。
・max_summary_tokens … 要約メッセージに割り振る上限。
・input_messages_key / output_messages_key … どのキーの履歴を読み書きするか。
・RunningSummary … 前回要約を引き継いで一貫性を保つオブジェクトじゃ。」 (LangChain)
🐶「コードの雰囲気、短めで見たい!」
👴「参考程度にな、最小イメージはこんな按配じゃよ。」
from langmem.short_term import SummarizationNode
summ_node = SummarizationNode(
model=summ_model,
max_tokens=4000,
max_tokens_before_summary=2800, # しきい値(例:目標の7割)
max_summary_tokens=256,
input_messages_key="messages_for_summary",
output_messages_key="summarized_messages",
name="summarization",
)
# 以後のノードは summarized_messages を読むよう統一する
「履歴を“差し替え”にするか“入力だけ置換”にするかは設計で選ぶのじゃ。ハウツーでは前者・後者の両方のパターンが示されておる。」 (LangChain)
🐶「運用のコツ、3つだけ教えて!」
👴「
- しきい値は目標上限の6〜8割あたりに置いて“毎ターン要約”になるのを避ける。
- 要約プロンプトには『誰/何/決定事項/未解決』を明示し、落ちやすい情報を守る。
- 下流ノードが読むキー(原本か要約か)を統一し、混在を避けるのじゃ。」
(※この三つは実装・運用上の推奨)
🐶「落とし穴ってある?」
👴「あるとも。output_messages_key と input_messages_key を分けたときの挙動に関する報告があって、設定次第では“意図せず全履歴がモデル入力に載ってコンテキストオーバー”という Issue が上がっておる。設計時に確認するのじゃ。」 (GitHub)
👴「なお、“サマリノードが呼ばれない”類の相談は LangGraph 側の設定やフック位置が原因のことが多い。ドキュメントの手順どおり、メッセージの流路と発火条件を先に点検するのじゃよ。」 (GitHub)
🐶「トリム(削除)との違いってなに??」
👴「trim_messages は“古い発話を捨てる”。SummarizationNode は“古い発話を要約に圧縮する”。長対話でも要点を残したいときは後者が向くのう。」 (LangChain, DataCamp)
🐶「まとめるとどんな感じ??」
👴「SummarizationNode は“短期記憶の圧縮器”じゃ。max_tokens_before_summary と入力/出力キーの設計、要約プロンプトの定義、この三点をきちんと決めれば、長尺の会話でも破綻せずに回せるのじゃよ。」
analysisノード
要約
analysisノードはRAG実行の心臓部です。まず _create_analysis_chain で LLM に MCP Server から取得したツール群を注入し、ReAct Agent を構築(プロンプトで AnalysisResponse 型のJSON出力を要求)。analysis_messages では State から messages を入力に 10 分タイムアウトで実行し、得られた出力(answer または末尾メッセージ)から JSON を抽出→_extract_and_parse_json で Pydantic model_validate により厳密に構造化。アプリ独自型へ整形した結果を final_result に保存します。並行して get_stream_writer で結果を小さなチャンクに分け stream_result として逐次配信(UI進捗/即時性向上)。例外は捕捉しログ化、後処理を挟んで終了——ツール選択→実行→構造化→配信→永続化を一手に担います。
import asyncio
import json
import logging
import random
from typing import Any
import httpx
from langgraph.prebuilt import create_react_agent
from langgraph.config import get_stream_writer
from langgraph.errors import GraphRecursionError
from langsmith import traceable
# ********************* #
# 他にも色々インポートしてる #
# ********************* #
class AnalysisResponse(BaseModel):
summary: str = ""
insights: list[Insight] = Field(default_factory=list)
recommendations: list[str] = Field(default_factory=list)
model_config = {"json_encoders": {OrderedDict: dict}}
def _extract_and_parse_json(raw_text: str) -> DataAnalyzerResponse:
"""テキストからJSONを抽出し、構造化されたレスポンスとしてパースする"""
# *********************************** #
# 生成されたJSONデータをパースする処理してる #
# *********************************** #
try:
json_data = json.loads(json_text)
# jsonデータをAnalysisResponse型に変換
# class AnalysisResponse(BaseModel):
# summary: str = ""
# insights: list[Insight] = Field(default_factory=list)
# recommendations: list[str] = Field(default_factory=list)
# model_config = {"json_encoders": {OrderedDict: dict}}
analysis_response = AnalysisResponse.model_validate(json_data)
# ******************************************* #
# さらにアプリケーション内で使う特定の型に変換する処理 #
# ******************************************* #
return structured_result
except (json.JSONDecodeError, ValueError, TypeError) as e:
raise ValueError("なんかエラー起きたよ") from e
async def _create_analysis_chain(user_message: str):
"""分析用のReAct Agentの作成"""
# モデル定義
llm_model = create_llm_for_task("content_analysis")
# MCP Serverと接続を行い、Toolを取得する処理(後ほど詳しく解説)
async with get_mcp_tools() as mcp_tools:
# プロンプトの定義
# プロンプト内部で特定のJSON形式でアウトプットを生成するように指定している
prompt_template = load_prompt_template("react_agent")
final_prompt = prompt_template.format(
user_request=user_message,
)
# react_agentのアウトプットはAnalysisResponse型のJSON
react_agent = create_react_agent(
model=llm_model,
tools=mcp_tools,
prompt=final_prompt,
)
return react_agent
async def analysis_messages(state: AgentState) -> dict[str, Any]:
"""分析ノードを作成する関数"""
user_message = state.get("user_message", "")
thread_id = state.get("thread_id", "")
summarized_messages = state.get("summarized_messages", [])
try:
# 分析を行うReAct Agentの作成
analysis_chain = await _create_analysis_chain(user_message)
# ユーザーからのクエリの定義
agent_input = {
"messages": state.get("messages", []),
}
# 生成結果をストリーミングするために使う(後ほど説明)
writer = get_stream_writer()
structured_result = None
result = await asyncio.wait_for(
analysis_chain.ainvoke(agent_input),
timeout=60 * 10, # 10 minutes
)
raw_response = ""
if "answer" in result:
raw_response = result["answer"]
elif result.get("messages"):
raw_response = result["messages"][-1].content
if raw_response:
# 生成された結果から構造化されたJSONデータを取得
structured_result = _extract_and_parse_json(raw_response)
# writerを使ってストリーミングする
if writer:
streaming_chunks = await _generate_streaming_chunks(
structured_result
)
for chunk in streaming_chunks:
writer(
{
"type": "stream_result",
"content": json.dumps(
chunk.model_dump(), ensure_ascii=False
),
}
)
# **************** #
# 後処理的なやつがある #
# **************** #
return {
"user_message": user_message,
"thread_id": thread_id,
"summarized_messages": summarized_messages,
"final_result": structured_result.model_dump(),
}
except Exception as e:
raise Exception("エラー起きたよ") from e
🐶「analysis_nodeは何をやっているのか」
🐶「この“analysisノード”、全体で何してるの?」👴「流れはこうじゃよ。
- LLMとMCPツール入りのReActエージェントを組み立てる。
- ユーザーの会話履歴を渡して推論を走らせる。
- 返ってきたテキストからJSONを抜き出して型(AnalysisResponse)にバリデート。
- その結果をストリーミングでクライアントへ送りつつ、最終結果を返す――という構成じゃ。」
🐶「パーツごとに教えて!」
👴「よかろう、順に噛み砕くのじゃ。」
— _extract_and_parse_json
・LLMが出したテキストからJSONを抽出→json.loads→AnalysisResponse(Pydantic)に型付け。
・さらにアプリ内の独自型へ組み替えて structured_result を返す。
・JSON壊れ/型不一致は ValueError("なんかエラー起きたよ") にまとめて上げ直し。
(ポイント)“モデルが必ずJSONを出す”前提のプロンプトを上流で与えておるから、この関数は厳しめに落として良いのじゃ。
— _create_analysis_chain
・create_llm_for_task("content_analysis") で“分析タスク用”のLLMを取得。
・get_mcp_tools() でMCPサーバからツール群を引き出す(DB・Web・社内APIなど)。
・プロンプトテンプレを埋め、create_react_agent(model, tools, prompt) でReActエージェントを生成して返す。
(ポイント)ReActなので、思考→ツール実行→観測→最終回答、のループを内包しておるのじゃ。
— analysis_messages(メイン)
-
stateからuser_message,thread_id,summarized_messagesを取り出す。 -
_create_analysis_chainでエージェントを用意。 -
agent_input = {"messages": state.get("messages", [])}を渡してainvoke(10分タイムアウト)。 - 戻り値から
answerか末尾メッセージを拾ってraw_responseとする。 -
_extract_and_parse_jsonで構造化 →structured_result。 -
get_stream_writer()があれば、structured_resultをチャンクに刻んでtype: "stream_result"で逐次送信。 - 最後に
final_result(構造化データ)等をまとめて返す。
(ポイント)ストリーミングと最終レスポンスの“二層返し”になっておるのじゃ。
🐶「どこが“要点”なの?」
👴「三つじゃ。
- ReAct×MCPで“ツール利用前提”の分析パイプを作っておること。
- モデル出力を必ずJSONで整形させ、
AnalysisResponseで型検証していること。 - ユーザー体験として、途中経過をストリーミングしつつ、最後に構造化済みの完成品を返す設計になっておること、じゃよ。」
🐶「最終的に何が返るの?」
👴「{"user_message", "thread_id", "summarized_messages", "final_result"} じゃ。final_result は AnalysisResponse 由来の構造化JSON(要約・洞察・提言)で、UIはこれをそのまま描画すればよい作りになっておる。ストリームで先出し→最後に確定版、の順でユーザーは待たされにくいのじゃ。」
かなりざっくりですが、LangGraphの定義はこのように行いました。
これらのコードを組み合わせることで、ユーザーからの質問に対して内容を理解し、それに関する分析結果を返してくれるAI Assistantを作成することができます。
2. ストリーミング処理
続いてストリーミング処理の実装に関してです。
今回はget_stream_writer()を使ってストリーミングを行いました。
これを用いることで、好きな時に好きな値をストリーミングすることができるようになります。
🐶「 get_stream_writerってなに?」
🐶「get_stream_writerってなに?」
👴「LangGraph のノード(や entrypoint の task)“実行中”に、ストリーミング用の書き込み口(StreamWriter)を取り出す関数じゃよ。これでノード内から任意のデータを“配信”できるのじゃ。」 (LangChain)
🐶「どこで使えるの?」
👴「StateGraph の各ノードでも、関数型APIの task でも使えるぞい。Python 3.11 未満で“非同期運用”している場合は、contextvars 伝播の都合で動かんので要注意じゃ。」 (LangChain)
🐶「どう使うの?3ステップで!」
👴「こうじゃよ。
- ノード内で
writer = get_stream_writer()を呼ぶ。 -
writer({...})で任意のペイロード(dict など)を書き込む。 - 呼び出し側は
graph.stream(..., stream_mode="custom")のように “custom” モードで受信する。」 (LangChain)
🐶「最小イメージ見せて!」
👴「参考の骨格はこんな按配じゃ。」
from langgraph.config import get_stream_writer
def my_node(state):
writer = get_stream_writer()
writer({"custom_data": "Hello!"})
return {"foo": state["foo"] + 1}
# 受信側
for chunk in graph.stream({"foo": 1}, stream_mode="custom"):
print(chunk) # => {'custom_data': 'Hello!'}
「公式の例もほぼこれと同じ流れじゃな。」 (LangChain)
🐶「ぼくの“analysisノード”に当てはめると?」
👴「君のコードは writer = get_stream_writer() を取り、writer({"type": "stream_result", "content": ...}) を逐次発行しとる。親側(UI/呼び出し側)は stream(...) を使い、stream_mode を“custom”など発行側に合わせておくと、その場でリアルタイムに受け取れる――という仕組みじゃ。」 (LangChain)
🐶「よくある落とし穴は?」
👴「三つ覚えておくのじゃ。
・invoke() だけでは届かん。stream() を使うのじゃ。 (LangChain)
・stream_mode が一致していないと受信できん(例では "custom")。 (LangChain)
・Python<3.11 の非同期では動作しない(contextvars 伝播要件)。 (LangChain)」
🐶「つまり?」
👴「“ノードの中からイベントを押し流すための口”を渡してくれるのが get_stream_writer()。受け手は graph.stream(..., stream_mode="custom") で購読、送り手は writer({...}) で任意のチャンクを投げる――これが設計の芯じゃよ。」 (LangChain)
サンプルコードでは以下のようにコードが書かれてあります。
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.config import get_stream_writer
class State(TypedDict):
foo: int
def my_node(state: State):
my_stream_writer = get_stream_writer()
my_stream_writer({"custom_data": "Hello!"})
return {"foo": state["foo"] + 1}
graph = (
StateGraph(State)
.add_node(my_node)
.add_edge(START, "my_node")
.compile(store=store)
)
for chunk in graph.stream({"foo": 1}, stream_mode="custom"):
print(chunk)
{'custom_data': 'Hello!'}
引用:https://langchain-ai.github.io/langgraph/reference/config/#langgraph.config.get_stream_writer
get_stream_writerを使うには、呼び出し側で以下のようにstream_mode="custom"を定義する必要があります。
for chunk in graph.stream({"foo": 1}, stream_mode="custom"):
print(chunk)
そして、任意の値をストリーミングしたい場合は以下のようにコードを書きます。
def my_node(state: State):
my_stream_writer = get_stream_writer()
my_stream_writer({"custom_data": "Hello!"})
こうすることで、LangGraphのノード内部からでも、好きな値を好きなタイミングでストリーミングできるようになります。
そして、先ほど紹介したpreprocess_nodeやanalysis_nodeでも同様にこのget_stream_writerが使われています。
class CommandDetermination(TypedDict):
command: Literal["analysis", "others"]
message: str
async def _determine_command_structured(
user_message: str,
) -> CommandDetermination:
"""ユーザーからのクエリに回答するために分析が必要かどうかを決める"""
# get_stream_writerの呼び出し
writer = get_stream_writer()
try:
# モデルの定義を行う
llm_model = create_llm_for_task("task_determination")
# LLMが生成する結果をCommandDeterminationに従ったJSON形式にする
structured_llm = llm_model.with_structured_output(
CommandDetermination,
method="json_mode",
)
# プロンプトの定義
prompt_template = load_prompt_template("task_determination")
formatted_prompt = prompt_template.format(
user_prompt=user_message,
)
result = None
async for chunk in structured_llm.astream(formatted_prompt):
if isinstance(chunk, dict):
# with_structured_outputで生成結果にcommandとmessageが含まれている
result = chunk
command = chunk.get("command")
msg = chunk.get("message")
# commandがothers、つまり「分析が必要ない質問」と判断された場合の処理
if writer and msg and command == "others":
# Response型に変換し、writerを使ってストリーミングする
response = Response(
type="others", message=msg
)
writer(
{
"type": "stream_result",
"content": json.dumps(
response.model_dump(),
ensure_ascii=False,
),
}
)
# ****************************** #
# バリデーションなどの色々な処理を行う #
# ****************************** #
return result
except Exception:
logger.exception("エラー起きたよ")
raise
get_stream_writerを呼び出し、CommandDetermination型で生成された結果をwriterを使ってストリーミングしています。
# Response型に変換し、writerを使ってストリーミングする
response = Response(
type="others", message=msg
)
writer(
{
"type": "stream_result",
"content": json.dumps(
response.model_dump(),
ensure_ascii=False,
),
}
)
そして、ノード内からストリーミングされた値は呼び出し側では以下のようにして処理しています。
# LangGraphの実行
async for event in graph.astream(
initial_state, config, stream_mode=["custom", "updates"] # stream_modeにcustomが含まれている
):
if isinstance(event, tuple) and len(event) == 2:
event_type, event_data = event
if event_type == "updates" and isinstance(
event_data, dict
):
# *************************** #
# event_typeがupdatesの時の処理 #
# *************************** #
# ノードからtype = stream_resultの結果を受け取る
elif (
event_type == "custom" # event_typeがcustom = get_stream_writerからのデータ
and isinstance(event_data, dict)
and event_data.get("type") == "stream_result"
):
# get_stream_writerから受け取ったデータをdumpし、ストリーミングする
try:
content = json.loads(event_data["content"])
ai_response = AIResponse(
id=str(thread_id),
author="agent",
data=[content],
)
yield f"data: {ai_response.model_dump_json()}\n\n"
except json.JSONDecodeError:
pass
このようにコードを書くことで、ノード内部で生成されたデータをユーザーにストリーミングすることができるようになります。
3. MCP Server 接続(HTTP)と ReAct Agent へのツール受け渡し
MCP Serverとの接続部分に関してです。
後ほど詳しく説明しますが、MCP Server側はFastMCPを用いてサーバーを立てています。
ローカルでサーバーを立てる場合は、http://localhost:8080/mcpにアクセスすることでMCPを経由してToolにアクセスすることができるようになっています。
上記のエンドポイントでサーバーが提供されている前提でコードの解説を行います。
MCP Serverから受け取ったToolをReAct Agentに渡す処理
要約
_create_analysis_chain は、分析専用LLMを用意し、get_mcp_tools()でMCP Serverから取得したツール群をReAct Agentに注入して組み立てる関数です。ユーザー入力からプロンプトを生成し、エージェントは必要に応じて構造化RAG/非構造化RAGツールを呼び出して分析を実行します。出力は AnalysisResponse 形式のJSONを想定し、後段での構造化処理・表示にそのまま使える形に整えます。
まず、MCPを経由して提供されるToolを使っている部分ですが、以下のanalysis_nodeにある_create_analysis_chain関数になります。
この関数では、ユーザーからの質問に対して分析を行う専用のLLMを定義する関数となっています。
分析の際に関連するデータを取得するために、MCP Serverにて提供されている構造化RAGツールと非構造化RAGツールを呼び出す必要があります。
class AnalysisResponse(BaseModel):
summary: str = ""
insights: list[Insight] = Field(default_factory=list)
recommendations: list[str] = Field(default_factory=list)
model_config = {"json_encoders": {OrderedDict: dict}}
async def _create_analysis_chain(user_message: str):
"""分析用のReAct Agentの作成
MCP Serverと接続を行い、構造化RAGや非構造化RAGツールなどを呼び出して分析を行うAgentを作成する
"""
llm_model = create_llm_for_task("content_analysis")
# get_mcp_tools関数がMCP Serverと接続を行い、取得したToolを返す
async with get_mcp_tools() as mcp_tools:
prompt_template = load_prompt_template("react_agent")
final_prompt = prompt_template.format(
user_request=user_message,
)
# react_agentのアウトプットはAnalysisResponse型のJSON
# MCP Serverより取得した関数をReAct Agentのtoolsに渡すことで、ReAct AgentがToolを呼び出せるようになる
react_agent = create_react_agent(
model=llm_model,
tools=mcp_tools,
prompt=final_prompt,
)
return react_agent
ここで重要なのが、get_mcp_tools関数になります。
この関数は非同期でMCP Serverと接続を行い、サーバーから提供されているToolを取得することができる関数となっています。
MCP Serverとの接続を行う処理
要約
get_mcp_toolsは、設定JSON(PROJECT_ROOT/src/config/mcp_servers.json)を読み込み、MultiServerMCPClient を初期化して MCP Server が公開するツール群を取得し、非同期コンテキストで yield する関数です。設定は transport(例:streamable_http)と url(例:http://localhost:8080/mcp)を持ち、複数サーバも扱えます。呼び出し側は async with 内で受け取った list[BaseTool] をそのまま ReAct Agent に渡せます。
具体的な内容は以下のようになっています。
import json
import logging
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from pathlib import Path
from langchain_core.tools import BaseTool
from langchain_mcp_adapters.client import MultiServerMCPClient
from langsmith import traceable
# プロジェクトルートへのパス
PROJECT_ROOT = Path(__file__).resolve().parents[2]
@asynccontextmanager
async def get_mcp_tools() -> AsyncGenerator[list[BaseTool], None]:
"""非同期でMCPツールをサーバーから受け取り、yieldで返却する関数"""
# MCP ClientがMCP Serverと接続するために必要なjsonファイルへのパス
config_path = PROJECT_ROOT.joinpath("src", "config", "mcp_servers.json")
try:
with open(config_path, encoding="utf-8") as f:
server_configs = json.load(f)
except FileNotFoundError:
logger.exception("ファイル内やないかい")
raise
except json.JSONDecodeError:
logger.exception("JSON形式ちゃうやないかい")
raise
except Exception:
logger.exception("なんかエラー起きてもうとるがな")
raise
# MultiServerMCPClientを用いてMCP Serverとの接続を行う
client = MultiServerMCPClient(server_configs)
try:
tools = await client.get_tools()
yield tools
except Exception:
logger.exception("なんかエラー起きたやで")
raise
この関数がMCP Serverとの接続を行ってくれており、そこで取得したツールをyieldしてくれている。
また、ここで重要なのがMCP ClientがMCP Serverとの接続を行うために使用するJSONファイルである。 具体的には以下のようなJSONファイルとなっています。
{
"mcp_server_tools": {
"transport": "streamable_http",
"url": "http://localhost:8080/mcp"
}
}
Claude DesktopでMCP経由でサービスを呼び出す時に定義するやつと同じで、呼び出すツールにアクセスする方法が書かれてあります。
🐶「これはなにをやってるの?」
🐶「このget_mcp_tools()って、結局なにをしてるの?」
👴「“設定ファイルを読み→MCPクライアントを初期化し→サーバからツール一覧を取得して渡す”という、接続準備の一連を非同期コンテキストマネージャとしてまとめた関数じゃよ。」
🐶「コードの流れを順番に教えて!」
👴「こう進むのじゃ。」
-
設定読込
PROJECT_ROOT/src/config/mcp_servers.jsonを開き、server_configsに読み込む。
・成功時:logger.infoで読み込み先とサーバ件数を記録。
・失敗時:FileNotFoundError/JSONDecodeError/その他例外をそれぞれlogger.exception付きで再送出するのじゃ。 -
クライアント生成
client = MultiServerMCPClient(server_configs)で複数サーバ対応のMCPクライアントを初期化するのう。
server_configsは「サーバ識別名: {transport, url, …}」の辞書(例:streamable_http+http://localhost:8080/mcp)。 -
ツール取得して引き渡し
tools = await client.get_tools()で、各MCP Serverが公開する**ツール定義(LangChainのBaseTool)**を収集。
件数をlogger.infoで記録して、yield toolsで呼び出し側へ渡す。
このyield以降、async with … as tools:ブロック内でエージェントにtoolsをそのまま渡せるのじゃ。 -
後処理(finally)
このライブラリ版では明示的なクローズは不要という前提ゆえ、close()/aclose()は呼んでおらんのじゃよ。
🐶「“非同期コンテキストマネージャ”の利点は?」
👴「エラー時や正常終了時のログと制御フローがきれいになるのじゃ。
async with get_mcp_tools() as tools:の一行で“読込→接続→取得→(ブロック)→片付けログ”までを包める。呼び出し側は接続の生存期間を意識せず、受け取ったtoolsだけ使えばよいのう。」
🐶「MultiServerMCPClientって何者?」
👴「複数のMCP Serverを一つのクライアントで扱う器じゃ。設定に書いたサーバすべてへ接続し、get_tools()で各サーバのツールをひと束(list[BaseTool])にして返す。ReActエージェントへはこの束を丸ごと渡してよいのじゃ。」
🐶「実際の使い方は?」
👴「呼び出し側はこうなるのじゃ。」
async with get_mcp_tools() as mcp_tools:
agent = create_react_agent(model=llm_model, tools=mcp_tools, prompt=final_prompt)
result = await agent.ainvoke({"messages": messages})
「これでエージェントは“必要時にMCPツールを呼び出す”振る舞いを自然に取れるのじゃ。」
👴「要するに、“設定→クライアント→ツール取得→引き渡し”を安全・可観測に包んだ実装が、この関数の肝じゃよ。」
MultiServerMCPClientとは
要約
MultiServerMCPClient は、複数の MCP Server への接続設定(transport・url・command/args など)を1つの辞書で受け取り、各サーバが公開するツールをまとめて取得する LangChain のアダプターです。
await client.get_tools() で list[BaseTool] を得て、そのまま ReAct Agent に渡すだけで、エージェントは必要時に各 MCP ツールを呼び出せます(例:stdio のローカルツールと streamable_http のHTTPツールを同時利用)。
MultiServerMCPClientとは、LangChainが提供するMCP接続を行うためのアダプターです。
以下のように引数にjson形式でMCP Serverへの接続方法を渡すことで、MCP Serverと接続を行ってくれます。
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
client = MultiServerMCPClient(
{
"math": {
"command": "python",
# Make sure to update to the full absolute path to your math_server.py file
"args": ["/path/to/math_server.py"],
"transport": "stdio",
},
"weather": {
# Make sure you start your weather server on port 8000
"url": "http://localhost:8000/mcp/",
"transport": "streamable_http",
}
}
)
tools = await client.get_tools()
agent = create_react_agent("openai:gpt-4.1", tools)
math_response = await agent.ainvoke({"messages": "what's (3 + 5) x 12?"})
weather_response = await agent.ainvoke({"messages": "what is the weather in nyc?"})
引用:https://github.com/langchain-ai/langchain-mcp-adapters?tab=readme-ov-file#client-1
MCP Server
構造化RAG(Text-to-SQL × BigQuery)
改めてですが、構造化RAGの処理の流れは以下のようになっています。
構造化RAGの処理においてポイントとなるのは、「Text-to-SQL」→「BigQueryジョブ実行」の部分だと思うので、そこを中心に実装コードの解説を行います。
1.ざっくりとした流れ
要約
自然言語をText-to-SQLでSQL化し、BigQueryを実行して得た結果をAIで分析・要約する流れ。structured_rag ツールは クエリ強化→SQL生成→実行→分析 を行い、original/enhanced_query、取得データ、分析結果、成功可否を返します。
ちなみにですが、Text-to-SQLを簡単に解説をすると、「AIを使って自然言語からSQLを生成する手法」のことを指します。
以下のリンクが非常に参考になるかと思いますので、ぜひご覧ください。
例えばですが、「idが1のユーザーって誰?」みたいな質問をAIに投げたら、
SELECT name FROM users WHERE id = 1;
みたいなSQLをAIが生成してくれるような感じです。
今回はそのText-to-SQLを用いて、BigQueryに格納されているデータを取得するという処理を実装しました。
具体的には、以下のようなフローでText-to-SQLを実行しています。
def register_structured_rag_tools(mcp: FastMCP) -> None:
"""MCP ServerにToolを追加する関数"""
@mcp.tool
async def structured_rag(user_prompt: str) -> dict:
"""ユーザーからの質問をもとにText-to-SQLでSQLを作成し、BigQueryからデータを取得。その分析結果を返すTool"""
try:
# クエリ強化を行うサービスクラス
query_enhancement_service = StructuredQueryEnhancementService()
# Text-to-SQLを行うサービスクラス
text_to_sql_service = TextToSQLService()
# BigQueryにアクセスし、SQLを実行するサービスクラス
sql_execution_service = BigQueryExecutionRepository()
# 分析を行うサービスクラス
analysis_service = StructuredDataAnalysisService()
# ユースケース
structured_rag_use_case = StructuredRAGUseCase(
query_enhancement_service=query_enhancement_service,
text_to_sql_service=text_to_sql_service,
sql_execution_service=sql_execution_service,
analysis_service=analysis_service,
)
# クエリ強化を実施
# BigQueryのスキーマ情報などを考慮し、Text-to-SQLをしやすい、プロンプトに変換する
enhancement_result = await structured_rag_use_case.enhance_query(
user_prompt
)
# *************** #
# バリデーションなど #
# *************** #
# 強化されたクエリやBigQueryのスキーマ情報をもとにText-to-SQLを実施
sql_result = await (
structured_rag_use_case.generate_sql_from_enhanced_query(
enhancement_result.enhanced_query
)
)
# *************** #
# バリデーションなど #
# *************** #
# 作成されたSQLを使ってBigQueryにてジョブを実行
execution_result = await structured_rag_use_case.execute_sql_query(
sql_result.generated_sql
)
# SQLが成功したら、AIによる分析を行う
analysis_result = ""
if execution_result.success:
try:
analysis_result = await (
structured_rag_use_case.analyze_sql_results(
original_query=enhancement_result.original_query,
enhanced_query=enhancement_result.enhanced_query,
generated_sql=sql_result.generated_sql,
execution_results={
"success": execution_result.success,
"data": execution_result.data,
"row_count": execution_result.row_count,
"error": execution_result.error,
},
)
)
except Exception as e:
analysis_result = "分析失敗しとるやないかい"
# 値のレスポンス
response = {
"original_query": enhancement_result.original_query,
"enhanced_query": enhancement_result.enhanced_query,
"data": execution_result.data,
"analysis_result": analysis_result,
"success": enhancement_result.success
and sql_result.success
and execution_result.success,
"error": execution_result.error
if not execution_result.success
else None,
}
# *************** #
# バリデーションなど #
# *************** #
return response
except Exception as e:
return {
"original_query": user_prompt,
"enhanced_query": None,
"data": None,
"analysis_result": "",
"success": False,
"error": "失敗したやで",
}
大まかな流れとしてはこんな感じです。
🐶「このコードってなにやってるの?」
🐶「このregister_structured_rag_toolsって、なにしてるの?」
👴「“MCP ServerにText-to-SQL+実行+分析”を一発でやるツールを登録しておるのじゃ。中で定義している structured_rag が、その実体の非同期ツールじゃよ。」
🐶「@mcp.tool って?」
👴「FastMCP に“この関数をツールとして公開する”合図じゃ。呼び出し側(エージェントなど)は、structured_rag(user_prompt: str) -> dict をMCP経由のツールとして叩けるようになるのじゃ。」
🐶「ツールの中身、ざっくり流れで!」
👴「四段構えになっておる。」
-
準備
・クエリ強化、Text-to-SQL、BigQuery実行、分析——の役割別サービスをインスタンス化。
・それらを束ねるユースケースStructuredRAGUseCaseを作る。 -
クエリ強化(Enhance)
・enhance_query(user_prompt)を実行。
・ユーザー入力を、スキーマ情報などを踏まえた“Text-to-SQLしやすい形”に整える。
・結果はenhancement_result(original_queryとenhanced_queryなど)に入る。
・途中に“バリデーションなど”のコメントがあり、検査処理の挿し所が示されておる。 -
SQL生成 → 実行
・generate_sql_from_enhanced_query(enhanced_query)で SQL文字列 を作る(sql_result.generated_sql)。
・(ここでも“バリデーションなど”の挿し所あり)
・execute_sql_query(generated_sql)で BigQueryジョブを実行。
・実行結果はexecution_resultにまとまり、success / data / row_count / errorを持っている。 -
分析(成功時のみ)
・execution_result.successが真なら、analyze_sql_results(...)を呼ぶ。
・引数には元クエリ/強化後クエリ/生成SQL/実行結果(成功可否・データ・件数・エラー)を渡す。
・分析に失敗した場合はanalysis_result = "分析失敗しとるやないかい"と文字列を入れておる。
🐶「最終的な返り値は?」
👴「辞書(JSON相当)じゃ。キーはこうじゃよ。
-
original_query: 強化前のユーザー入力 -
enhanced_query: 強化後プロンプト(Text-to-SQL向け) -
data: BigQueryの実行結果データ -
analysis_result: AIによる分析結果(成功時は文字列や要約、失敗時は固定文言) -
success: 全工程の成功判定(enhancement_result.success and sql_result.success and execution_result.success) -
error: 失敗時にexecution_result.errorを入れる(成功時はNone)」
🐶「外側の try/except は?」
👴「ツール処理の総合ガードじゃ。どこかで例外が出ても、最後に固定フォーマットで
{"original_query": user_prompt, "enhanced_query": None, "data": None, "analysis_result": "", "success": False, "error": "失敗したやで"}
を返すのが意図じゃな。」
🐶「非同期や await の使い方は?」
👴「structured_rag は async 関数。ユースケースの各工程(強化/生成/実行/分析)は非同期メソッドとして await で順に呼ばれておる。これにより、MCPツールとしてもノンブロッキングで逐次処理できる構成になっておるのじゃ。」
🐶「まとめ!」
👴「このコードは、(1)入力強化 → (2)SQL生成 → (3)BigQuery実行 → (4)結果分析を一括で行うMCPツール structured_rag を定義し、標準化されたレスポンス辞書を返す作り。工程ごとにサービスを分け、ユースケースで束ね、例外時も一定形で応答する——そこが骨子なのじゃよ。」
2. Text-to-SQL
要約
Text-to-SQLは、コンテキストの充実(用語マッピング/スキーマJSON/フィルターセット)で意図を正確化。
生成SQLはSELECT限定をプロンプトと検証で強制し、プロンプトインジェクションも抑止。推論は温度0で決定性を高め、あり得ないSQLの生成を低減する――この三点で精度と安全性を確保する方針。
Text-to-SQLを実行する上で、意識したことを話します。
-
コンテキスト
Text-to-SQLを行う上で、重要なのはコンテキスト情報です。
ドメインごとの知識や特定の部署だけで使われている名前などがあります。
そのマッピングなどを行った内容をプロンプトに含めるようにしました。そうすることで、LLMは「ユーザーからの質問で〇〇という単語が使われているな。これはどうやら△△という意味らしい。これを考慮した上で、Text-to-SQLを実行しようか。」と判断してくれます。
その他には、RAGの対象となるBigQueryのテーブルのスキーマ情報なども渡しました。
LLMが理解しやすいJSON形式でBigQueryの対象テーブルのスキーマをプロンプトに含めることで、LLMがどのようなテーブルを対象にSQLを作成すれば良いかを理解できるようにすることを意識しました。また、マッピングした情報の他にも、特定の単語のフィルターセットなども作成しました。
テーブル構造やマッピング情報だけではわからないこともある場合があるので、「特定の単語に関するデータを取得する際はこのフィルターを使ってね」というのをわかりやすく伝えるために、フィルタリングセットもプロンプトに含めました。
-
SELECT以外は許可しない
これは当たり前ですが、LLMが生成するSQLはSELECTのみにする必要があるので、INSERTやUPDATE, DELETEといったクエリは生成しないようにプロンプト内部で制限しました。
また、生成結果に含まれていないかどうかも明示的に確認する処理も追加しました。
こうすることでLLMが生成するSQLはSELECTのみであることを保証することができるようになりました。
このフィルター処理があることで、プロンプトインジェクションにも対応できるようになっています。
-
Text-to-SQLを実行する時は温度は0にする
細かなTipsですが、Text-to-SQLを行う際、温度は0にしました。
こうすることでLLMがありもしないSQLを生成する(ハルシネーションの)可能性を少しでも減らしています。
3. 生成されたSQLの実行
要約
BigQueryExecutionRepository.execute_query は、受け取ったSQLをまず _validate_and_prepare_sql で検査し、bigquery.Client で標準SQL・課金上限付き(QueryJobConfig)で実行。結果を検証して返却します。エラーはバリデーション、Google Cloud系、その他で分けてハンドリング。
Text-to-SQLを実行し、安全なSQLが作成されたら、以下のサービスクラスのexecute_query関数を用いてSQLを実行します。
以下のような関数を実行します。
class BigQueryExecutionRepository(SQLExecutionInterface):
"""BigQueryに接続し、SQLを実行するサービスクラス"""
def __init__(self) -> None:
# BigQuery Clientの初期化
self.client = bigquery.Client(
project=settings.google_cloud.project_id,
location=settings.google_cloud.bigquery_dataset_location,
)
async def execute_query(self, sql: str) -> list[dict[str, Any]]:
"""受け取ったSQLを実行する関数"""
try:
# バリデーション関数
validated_sql = self._validate_and_prepare_sql(sql)
# ジョブの設定
job_config = bigquery.QueryJobConfig(
use_legacy_sql=False, # Use standard SQL
maximum_bytes_billed=10**9, # 1GB limit for cost control
)
# クエリの実行
query_job = self.client.query(validated_sql, job_config=job_config)
results = query_job.result()
# *************** #
# バリデーションなど #
# *************** #
return rows_data
except ValueError:
logger.exception("バリデーションエラーでごわす")
raise
except GoogleCloudError as e:
error_msg = "エラーでっせだんな。Google Cloud側のエラーでやんすわ。"
raise GoogleCloudError(error_msg) from e
except Exception as e:
error_msg = "よーわからんエラー起きてもうとるがな"
raise Exception(error_msg) from e
やっていることは非常にシンプルで、BigQuery Clientを作成し、Clientを通してSQLを実行。
取得したデータをバリデーションして、それを返却するという関数です。
🐶「このコードは何をやってるの?」
🐶「このクラスって、なにをする役目?」
👴「BigQuery に SQL を投げて結果を取ってくる“実行担当”じゃよ。SQLExecutionInterface を実装したサービスで、アプリ側からは“SQL文字列を渡す→行データの配列が返る”という窓口になるのじゃ。」
🐶「最初の __init__ は?」
👴「BigQuery のクライアントを作っておる。
project=settings.google_cloud.project_id と location=settings.google_cloud.bigquery_dataset_location を指定して、以後のクエリ実行に使うのじゃ。」
🐶「execute_query(sql) の流れを教えて!」
👴「五段の手順じゃ。」
-
バリデーション
_validate_and_prepare_sql(sql)を呼んで、受け取った SQL を検査・整形するのじゃ(ここで不正ならValueErrorが飛ぶ想定)。 -
ジョブ設定
bigquery.QueryJobConfigを作成。
-use_legacy_sql=False… BigQuery 標準SQLを使う指定。-
maximum_bytes_billed=10**9… 課金上限を 1GB に制限(コスト制御)。
-
-
クエリ送信
self.client.query(validated_sql, job_config=job_config)でジョブを投げ、query_job.result()で完了まで待ち、結果イテレータ(行集合)を受け取るのじゃ。 -
結果整形
取得した行をアプリ都合の形へ変換してrows_data(list[dict[str, Any]])を作る。
※サンプルではこの変換処理はコメントで省略されておる。 -
返却
rows_dataをそのまま返すのじゃ。
🐶「例外処理はどうなってる?」
👴「三段に分けておるぞい。」
-
ValueError(バリデーション失敗)
ログを出してそのまま再送出。“SQL が不正”を上位へ知らせるのじゃ。 -
GoogleCloudError(GCP側の障害・権限・クォータ等)
独自メッセージを付けてGoogleCloudErrorを再送出(from eで元エラーを連結)。 -
その他の例外
原因不明のときは汎用Exceptionに包み直して上げるのじゃ。
🐶「つまり?」
👴「“プロジェクト/ロケーションを束ねた BigQuery クライアント”で、
(1)SQL検査 → (2)ジョブ設定 → (3)実行 → (4)行を辞書化 → (5)返す、
失敗時は“バリデーション/GCP/その他”に切り分けて例外を投げ直す――この役者が BigQueryExecutionRepository というわけじゃよ。」
非構造化RAG(ColPali × Cloud Storage)
改めてですが、非構造化RAGの処理の流れは以下のようになっています。
非構造化RAGの処理において重要なのは「ColPaliモデルによる検索」になるので、そこを中心に解説します。
前提として非構造化RAGツールで扱うRAGモデルはColPaliというマルチモーダルモデルを使用しています。
そしてこのプロジェクトではColPaliモデルのラッパーであるByaldiというライブラリを使用しています。
参考:
1. ざっくりとした流れ
要約
非構造化RAGは、ColPali(Byaldi)でGCS上の資料を検索し、得た画像スニペットとテキストを組み合わせてLLMに渡して分析します。
流れは①クエリ強化→②検索・コンテンツ組立→③マルチモーダル分析。
結果無し/失敗時は定形エラー、成功時はoriginal/enhanced_queryとanalysis_resultを返却。
非構造化RAGの処理は以下のようになっています。
def register_unstructured_rag_tools(mcp: FastMCP) -> None:
"""MCP ServerにToolを追加する関数"""
@mcp.tool
async def unstructured_rag(user_prompt: str) -> dict:
"""ユーザーからの質問をもとに、ColPaliモデルを用いてRAGを実行、取得した画像をもとに分析結果を返す関数"""
try:
# RAGを行うColPaliモデルの初期化
rag_model = get_rag_model()
if rag_model is None:
raise RuntimeError("RAGモデルの初期化失敗したでワレ")
# GCSにある画像からRAGを行うサービスクラス
search_processor = UnstructuredRAGSearchProcessor(rag_model)
# 取得した画像データとテキストデータを組み合わせるサービスクラス
content_assembler = MultiModalContentAssembler()
# マルチモーダルコンテンツを分析するサービスクラス
analysis_service = MultiModalAnalysisService()
# クエリ強化を行うサービスクラス
query_enhancement_service = UnstructuredQueryEnhancementService()
# GCSのメタデータの処理を行うサービスクラス
document_manager = MetadataDocumentManagerRepository()
# ユースケース
unstructured_rag_use_case = UnstructuredRagUseCase(
search_processor=search_processor,
content_assembler=content_assembler,
analysis_service=analysis_service,
query_enhancement_service=query_enhancement_service,
document_manager=document_manager,
)
# クエリ強化を行う
# ユーザーからの質問と利用可能なファイルより、適切なRAGを行うためのクエリを生成する
enhancement_result = await unstructured_rag_use_case.enhance_query(
user_prompt
)
if not enhancement_result.success:
return {
"original_query": user_prompt,
"enhanced_query": None,
"analysis_result": None,
"success": False,
"error": enhancement_result.error,
}
# 強化したクエリをもとにRAGを実施、さらに取得したコンテンツを組み合わせる
# 画像データとテキストデータを組み合わせる理由は、マルチモーダルモデルのコンテキストに渡すため
multimodal_content = (
await unstructured_rag_use_case.search_and_assemble_content(
enhancement_result.enhanced_query
)
)
if not multimodal_content.contents:
return {
"original_query": user_prompt,
"enhanced_query": enhancement_result.enhanced_query,
"analysis_result": None,
"success": False,
"error": "No content found in RAG search",
}
# RAGで取得した画像データをLLMのコンテキストとしてわたし、分析を行わせる
try:
analysis_result = (
await unstructured_rag_use_case.analyze_multimodal_content(
original_query=user_prompt,
enhanced_query=enhancement_result.enhanced_query,
multimodal_content=multimodal_content,
)
)
except Exception as e:
return {
"original_query": user_prompt,
"enhanced_query": enhancement_result.enhanced_query,
"analysis_result": None,
"success": False,
"error": f"Content analysis failed: {str(e)}",
}
# 非構造化RAGツールの結果
response = {
"original_query": enhancement_result.original_query,
"enhanced_query": enhancement_result.enhanced_query,
"analysis_result": analysis_result,
"success": True,
"error": None,
}
return response
except Exception as e:
return {
"original_query": user_prompt,
"enhanced_query": None,
"analysis_result": None,
"success": False,
"error": f"Unstructured RAG tool error: {e!s}",
}
🐶「このコードは何をやってるの?」
🐶「この“非構造化RAGツール”、全体で何してるの?」
👴「三段流じゃよ。
① クエリを賢く言い換える(クエリ強化)→ ② ColPali(Byaldi)でGCS上の資料を検索し、画像スニペット+関連テキストを束ねる → ③ それらをマルチモーダルLLMに渡して分析、なのじゃ。成功なら original_query / enhanced_query / analysis_result を返し、失敗や未ヒットは定形のエラー形で返すのう。」
🐶「コードの入口から順に教えて!」
👴「register_unstructured_rag_tools の中で @mcp.tool を付けた unstructured_rag(user_prompt) をMCPサーバのツールとして公開しておる。呼ばれると、こう動くのじゃ。」
-
モデルとサービス群の初期化
-
rag_model = get_rag_model()
「ColPali(Byaldi)ベースのRAG検索モデルを用意。失敗ならそこで打ち切りじゃ。」 -
UnstructuredRAGSearchProcessor(rag_model)
「画像検索担当。GCSの資料群からクエリに合う断片(サムネ・ページスニペット等)を引き当てる役。」 -
MultiModalContentAssembler()
「検索した画像断片+関連テキストを“LLMに渡せる形”へ束ねる役。」 -
MultiModalAnalysisService()
「束ねたマルチモーダル入力をLLMで解析して結論を出す役。」 -
UnstructuredQueryEnhancementService()
「ユーザーの自然文を“検索に強い形”へクエリ強化する役。」 -
MetadataDocumentManagerRepository()
「GCS側のメタデータ(どのファイルが対象か等)の管理・参照役。」 - これらを
UnstructuredRagUseCase(...)に束ね、ユースケース経由で一連の処理を呼ぶ設計じゃ。
-
-
クエリ強化(enhance)
enhancement_result = await use_case.enhance_query(user_prompt)「ユーザー文と言語・利用可能ファイルなどを踏まえ、検索精度の高い強化クエリを作る。
successが偽なら、original_query=user_promptだけ入れて早期returnするのじゃ。」 -
検索→コンテンツ組み立て
multimodal_content = await use_case.search_and_assemble_content(enhanced_query)「強化クエリで ColPali 検索を走らせ、ヒットした画像断片+テキストを LLM へ渡せるマルチモーダル入力に整形する。何も見つからなければ
No content found in RAG searchを返すのう。」 -
マルチモーダル分析
analysis_result = await use_case.analyze_multimodal_content( original_query=user_prompt, enhanced_query=enhancement_result.enhanced_query, multimodal_content=multimodal_content, )「束ねた画像とテキストをマルチモーダルLLMに投げ、要約・根拠説明・回答などを作らせる。ここでの例外は分析失敗として個別に握りつぶし、定形エラーを返しておる。」
-
正常応答を組み立てて返却
{ "original_query": enhancement_result.original_query, "enhanced_query": enhancement_result.enhanced_query, "analysis_result": analysis_result, "success": True, "error": None, }「“非構造化”ゆえ生データ配列は返さず、分析の最終アウトプット(テキスト等)を
analysis_resultに入れて返す形じゃ。」 -
最外周の例外ガード
「想定外の例外は統一メッセージUnstructured RAG tool error: ...に包んで返す。これで呼び出し側は常に同じスキーマで扱えるのう。」
🐶「“画像スニペット+テキスト”を一緒に渡す理由は?」
👴「マルチモーダルの真価を引き出すためじゃよ。図表・スクショ・スキャンPDFなどは画像の文脈が強い。そこへOCRや周辺テキストを添えて渡すと、LLMが図中の数値・注釈・凡例まで読み合わせて推論できるのじゃ。」
🐶「成功・失敗の分岐、もう一度整理して!」
👴「こう覚えるのじゃ。
- クエリ強化が失敗 → 強化なしで即エラー返却。
- 検索でヒット無し →
No content found...を返却。 - 分析で例外 →
Content analysis failed: ...を返却。 - それ以外の例外 →
Unstructured RAG tool error: ...で包んで返却。 - 成功 →
original/enhanced_queryとanalysis_resultをsuccess=Trueで返す、じゃよ。」
🐶「要するに?」
👴「質問を“検索に強い形”へ磨き、ColPali で画像中心の根拠を拾い、テキストと束ねてマルチモーダルLLMで結論に落とす――その一連をMCPツールとして一発で呼べるようにしてある、それがこのコードの肝なのじゃ。」
2. ColPaliモデルによるRAG
要約
Byaldiはローカルディレクトリ前提でインデックスを作成/読込するため、まず GCSをコンテナ内にマウントし、事前に作成したインデックスを RAGMultiModalModel.from_index でロード。初回取得したモデルはグローバルキャッシュして再利用し、以後の呼び出しで初期化コストを抑えます。
検索は UnstructuredRAGSearchProcessor が担当し、自然文クエリで k=10件 を取得→スコアや重複を考慮して抽出→アプリ共通の RAGSearchResult 配列に正規化して返却。失敗時はログを残し空配列で上流へ返すため安全です。これにより「GCS上のPDF/画像 → 類似検索 →(後段で)画像+テキストをLLMへ渡す分析」へと繋がる、堅牢で再利用可能な土台が整います。
RAGモデルの初期化は以下のように行なっています。
from byaldi import RAGMultiModalModel # byaldiのRAGMultiModalModelを使って初期化を行う
# RAGモデルのキャッシュ
_rag_model_cache: RAGMultiModalModel | None = None
def get_rag_model() -> RAGMultiModalModel | None:
"""Get the RAG model instance.
"""
global _rag_model_cache
# すでにRAGモデルがグローバルキャッシュにて定義されている場合は、キャッシュを返す
if _rag_model_cache is not None:
logger.debug("Returning cached RAG model")
return _rag_model_cache
try:
# RAGモデルの初期化
# RAGモデルの初期化の際に、RAG対象となるディレクトリのパスを渡す必要がある
rag_model = RAGMultiModalModel.from_index(
index_path=".",
index_root=settings.rag.local_index_folder, # RAG対象のパス
device="cpu",
verbose=0,
)
_rag_model_cache = rag_model # Cache the initialized model
return rag_model
except Exception:
logger.exception("Error during RAG model initialization")
return None
「Byaldi」ではインデックスを作成するにあたってローカルのディレクトリを指定する必要があるため、本プロジェクトでは事前にGoogle Cloud Storageをアプリケーション内にマウントしています。
Creating an index with byaldi is simple and flexible. You can index a single PDF file, a single image file, or a directory containing multiple of those. Here's how to create an index:
byaldi を使ったインデックス作成はシンプルかつ柔軟です。単一の PDF ファイル、単一の画像ファイル、あるいは複数のファイルを含むディレクトリをインデックス化できます。インデックスの作成方法は以下の通りです。(Google翻訳)
from byaldi import RAGMultiModalModel # Optionally, you can specify an `index_root`, which is where it'll save the index. It defaults to ".byaldi/". RAG = RAGMultiModalModel.from_pretrained("vidore/colqwen2-v1.0") RAG.index( input_path="docs/", # The path to your documents index_name=index_name, # The name you want to give to your index. It'll be saved at `index_root/index_name/`. store_collection_with_index=False, # Whether the index should store the base64 encoded documents. doc_ids=[0, 1, 2], # Optionally, you can specify a list of document IDs. They must be integers and match the number of documents you're passing. Otherwise, doc_ids will be automatically created. metadata=[{"author": "John Doe", "date": "2021-01-01"}], # Optionally, you can specify a list of metadata for each document. They must be a list of dictionaries, with the same length as the number of documents you're passing. overwrite=True # Whether to overwrite an index if it already exists. If False, it'll return None and do nothing if `index_root/index_name` exists. )
参考:
このモデルの初期化は、ワークフローにおける「GCSをマルチモーダルモデルに事前にマウントしておく」の処理に該当します。
このステップを踏むことで、マルチモーダルモデルがGCS上にアップロードされている画像データのRAGを行うことができるようになります。
🐶「RAGモデルの初期化ってなにやってんの?」
🐶「この“ColPali(Byaldi)によるRAGの初期化”、コードでは何をしてるの?」
👴「要は“一度作って保存しておいたインデックスを、起動時(または初回呼び出し時)に読み込んで使い回す”処理じゃよ。流れはこうじゃ。」
-
グローバルキャッシュを確認
if _rag_model_cache is not None: return _rag_model_cache「すでに読み込んだ
RAGMultiModalModelがあれば即返す。重い初期化を何度もやらんための仕掛けじゃ。」 -
Byaldi の既存インデックスをロード
rag_model = RAGMultiModalModel.from_index( index_path=".", index_root=settings.rag.local_index_folder, device="cpu", verbose=0, )「
from_indexは“もう作ってあるインデックスをファイルから読む”APIじゃ。
index_rootに“インデックス一式が置いてあるルートディレクトリ”、index_pathに“その直下の読み込み対象(例:インデックス名のディレクトリ)”を渡す仕組みになっておる。ここではindex_path="."なので、index_root直下をそのまま読むイメージじゃな。」 -
キャッシュして返す
_rag_model_cache = rag_model return rag_model「以後はメモリ上のモデルを再利用できるのじゃ。」
-
失敗時はログを出して
None
「読み込みに失敗したらlogger.exception(...)を吐いてNoneを返す。呼び出し側は“モデル初期化失敗”として扱えるのう。」
🐶「“GCS をマウントする”って何のため?」
👴「Byaldi は“ローカルのパス”を前提にインデックスを作ったり読んだりするのじゃ。そこで、GCS バケットをファイルシステムとしてマウントして、アプリからは“ローカルのディレクトリ”として扱えるようにしておく。図の“GCSをマルチモーダルモデルに事前にマウント”がそれじゃよ。」
- 例:
gs://my-bucket/rag-index/を/mnt/gcs/rag-index/にマウント - 設定:
settings.rag.local_index_folder = "/mnt/gcs/rag-index" - 読み込み:
from_index(index_root="/mnt/gcs/rag-index", index_path=".")
「こうすれば、GCS 上に置いた PDF・画像群から作っておいたインデックスを、そのまま Byaldi に読ませられるのじゃ。」
🐶「“インデックス作成”と“読み込み”の違いは?」
👴「Byaldi には二本立てがあるのう。」
-
作成:
RAG.index(input_path=..., index_name=..., ...)
「ドキュメント群(PDF・画像)を走査して、ベクトル化+インデックス生成をする段じゃ。」 -
読み込み:
RAGMultiModalModel.from_index(index_root=..., index_path=...)
「既に作られたインデックスをディスクからロードする段じゃ。」
「今回のコードは“作成済みを読む”側。つまり“RAG 対象の GCS ディレクトリを事前にインデックス化し、マウントした場所に置いておく”という設計なのじゃ。」
🐶「初期化が終わると、何ができるの?」
👴「RAGMultiModalModel が持つ検索API(例:類似検索)を使って、画像スニペット+関連情報を引き当てられる。後段ではそのスニペット群をテキストと束ね、マルチモーダル LLM に渡して分析する――というのが、非構造化RAGの本丸じゃな。」
🐶「図の“Byaldiによるベクトル検索”の前準備が、まさにこの初期化ってことか!」
👴「そのとおりじゃ。
- 先に GCS をマウント(ローカルパス化)
-
作っておいたインデックスを
from_indexでロード -
一度ロードしたらキャッシュして再利用
――これで、後続の“検索→コンテンツ組立→マルチモーダル分析”に即入れるのじゃよ。」
RAGを行うマルチモーダルモデルの初期化が完了したので、次はそのモデルを使ってRAGを行う処理のコードです。
以下のようなコードを書いて、RAGを実行しました。
class UnstructuredRAGSearchProcessor(UnstructuredRAGSearchInterface):
"""マルチモーダルモデルを使ってRAGを実施するサービスクラス"""
def __init__(self, rag_model: RAGMultiModalModel):
"""Initialize the search processor"""
self.rag_model = rag_model
async def search(
self,
query: str,
category: str = "not_specified",
doc_names: list[str] | None = None,
max_results: int = 10,
) -> list[RAGSearchResult]:
"""RAGの実行"""
try:
# RAG結果の上位10件のみresultsに格納
results = self.rag_model.search(query=query, k=10)
# resultsの中から、関連性の強い結果のみ抽出
extracted_results = self._extract_best_results(results)
# RAGSearchResultの配列になるように変換処理を行う
return self._convert_to_rag_search_results(extracted_results)
except Exception:
logger.exception("RAG失敗してもうたやないかい")
return []
こうすることで、ユーザーの質問をもとにGCS側へ事前アップロードされている画像の検索を実現することができるようになるのです。
🐶「このコードは何をやっているのじゃ!」
🐶「この UnstructuredRAGSearchProcessor、なにしてるの?」
👴「“初期化済みのマルチモーダルRAGモデル(Byaldi/ColPali)”を包んで、検索→抽出→型そろえを一手にやる窓口じゃよ。クエリを渡すと、GCSをインデックス化したコーパスから画像中心のヒットを取り出し、アプリ内の統一型 RAGSearchResult の配列にして返すのじゃ。」
🐶「コードの流れを順番に!」
👴「こう進むのう。」
-
コンストラクタ
def __init__(self, rag_model): self.rag_model = rag_model「前段で作った
RAGMultiModalModel(GCSをマウントして読み込んだインデックス)を握っておくのじゃ。」 -
search(...)本体results = self.rag_model.search(query=query, k=10)「自然文クエリで上位
k=10を取得。ここで返るのは、モデル固有の“検索結果オブジェクト群”(スコアやファイル/ページのメタ情報など)じゃ。」 -
関連度で“いいとこ取り”
extracted = self._extract_best_results(results)「スコア閾値、重複排除、必要ならドキュメント名の絞り込み等でノイズを間引く工程。※詳細はこの補助関数側に隠蔽されておる。」
-
アプリ内の型へ変換
return self._convert_to_rag_search_results(extracted)「モデル依存の形を、アプリ共通の
RAGSearchResult(例:score,doc_name,page,snippet,image_pathなど)に正規化して返すのじゃ。」 -
失敗時の扱い
except Exception: logger.exception(...); return []
「例外はログに残し、空配列で上流へ返す。呼び出し側(ユースケース)は“ヒットなし”として後続(組み立て・分析)をスキップできるのう。」
🐶「最終的に何が手に入るの?」
👴「“画像スニペット中心の根拠候補”のリストじゃ。これを次段の MultiModalContentAssembler が画像+テキストに束ね、マルチモーダルLLMへ渡して分析へ進む。つまり、このクラスは“検索面の責務を集約し、上流のユースケースに扱いやすい統一型で結果を届ける役どころ、というわけじゃよ。」
おわり
最後までお読みいただき誠にありがとうございました。
今回はダッシュボードプロダクトに、構造化RAGと非構造化RAGが行えるAI Assistantを実装したので、そのアウトプット記事を書かせていただきました。
このプロジェクトを通して、
- MCPを用いたToolの提供方法
- MCP ClientとMCP Serverの接続方法
- LangGraphを用いたAgentワークフローの実装方法
- LangGraphの生成結果をストリーミングで返す方法
- マルチモーダルモデルを用いたRAGの実装方法
など非常に幅広い内容を学ぶことができました。
短い期間の間にこのような機能を持つAI Assistantを実装するのは非常に大変だったのですが、なんとか期待されていた機能を期間内に実装することができ、非常に良い経験を積むことができました。
MCPやLangGraphなどのAI Agentはこれからどんどん盛り上げって行く分野だと思うので、引き続きキャッチアップしていこうと思います。
改めてですが、最後までお読みいただき誠にありがとうございました!
🐶 & 👴「ありがとうございました〜!」
