import time
import boto3
from mcp.server.fastmcp import FastMCP
# 初始化 Server
mcp = FastMCP("AWS Athena Analyst")
# 配置 (请替换为你的实际配置)
ATHENA_DB = "default" # 你的 Athena 数据库名
OUTPUT_LOCATION = "s3://your-bucket/athena-results/" # Athena 查询结果存放路径
client = boto3.client('athena')
@mcp.tool()
def execute_sql(sql_query: str) -> str:
"""
在 AWS Athena 上执行 SQL 查询并返回结果。
适用于分析 S3 上的大规模 CSV/Parquet 数据。
Args:
sql_query: 标准的 Presto/Trino SQL 语句 (例如: "SELECT category, count(*) FROM sales GROUP BY category")
"""
try:
# 1. 提交查询
response = client.start_query_execution(
QueryString=sql_query,
QueryExecutionContext={'Database': ATHENA_DB},
ResultConfiguration={'OutputLocation': OUTPUT_LOCATION}
)
query_execution_id = response['QueryExecutionId']
# 2. 轮询等待结果 (Athena 是异步的)
while True:
stats = client.get_query_execution(QueryExecutionId=query_execution_id)
status = stats['QueryExecution']['Status']['State']
if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
break
time.sleep(1) # 等待 1 秒
# 3. 处理结果
if status == 'SUCCEEDED':
# 获取查询结果
results = client.get_query_results(QueryExecutionId=query_execution_id)
# 将复杂的 Athena JSON 响应转换为易读的 Markdown 表格
rows = results['ResultSet']['Rows']
markdown_output = []
for row in rows:
data = [col.get('VarCharValue', 'NULL') for col in row['Data']]
markdown_output.append("| " + " | ".join(data) + " |")
# 添加表头分割线
if len(markdown_output) > 1:
header_cols = len(rows[0]['Data'])
separator = "| " + " | ".join(["---"] * header_cols) + " |"
markdown_output.insert(1, separator)
return "\n".join(markdown_output)
else:
reason = stats['QueryExecution']['Status'].get('StateChangeReason', 'Unknown Error')
return f"SQL Execution Failed: {status} - {reason}"
except Exception as e:
return f"System Error: {str(e)}"
@mcp.tool()
def list_tables() -> str:
"""列出数据库中所有可用的表,用于了解数据概况"""
return execute_sql("SHOW TABLES")
@mcp.tool()
def describe_table(table_name: str) -> str:
"""查看表结构(列名和类型),写 SQL 前必须调用此工具"""
return execute_sql(f"DESCRIBE {table_name}")
if __name__ == "__main__":
mcp.run()
下段代码展示了如何启动 MCP 客户端,获取工具列表,将其转换为 LangGraph 可用的工具,并构建 Agent。
import asyncio
import os
from typing import List, Any
# MCP SDK
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# LangChain / LangGraph
from langchain_openai import ChatOpenAI
from langchain_core.tools import StructuredTool
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage
# 配置 API Key
# os.environ["OPENAI_API_KEY"] = "sk-..."
# ==========================================
# 1. MCP to LangChain 适配器
# ==========================================
async def load_mcp_tools(session: ClientSession) -> List[StructuredTool]:
"""
从 MCP Session 中获取工具列表,并转换为 LangChain 的 StructuredTool
"""
mcp_tools = await session.list_tools()
langchain_tools = []
for tool in mcp_tools.tools:
# 定义一个闭包函数来执行具体的 MCP 工具调用
# 注意:这里需要捕获 tool.name
async def _dynamic_tool_func(**kwargs):
# 调用 MCP Client 执行工具
result = await session.call_tool(tool.name, arguments=kwargs)
# 返回结果文本
if result.content:
return result.content[0].text
return "No output returned."
# 创建 LangChain 工具
# 我们直接使用 MCP 提供的 JSON Schema 作为工具的 args_schema
# 注意:简单场景下可以让 LLM 直接推断 Schema,严谨场景建议转换 Pydantic
lc_tool = StructuredTool.from_function(
func=None,
coroutine=_dynamic_tool_func, # 这是一个异步工具
name=tool.name,
description=tool.description,
# 将 MCP 的 JSON Schema 映射给 LangChain
# 这样 OpenAI 才知道如何填充参数
args_schema=None,
)
# 修正 Schema:由于 StructuredTool.from_function 自动推断可能不准
# 我们手动把 MCP 的 schema 塞进去,确保 LLM 理解参数结构
lc_tool.args = tool.inputSchema.get("properties", {})
lc_tool.description = f"{tool.description}\nParameters: {tool.inputSchema}"
langchain_tools.append(lc_tool)
return langchain_tools
# ==========================================
# 2. 构建并运行 LangGraph Agent
# ==========================================
async def main():
# 定义 Server 参数 (指向你保存的 athena_server.py)
server_params = StdioServerParameters(
command="python",
args=["athena_server.py"], # 确保文件名正确
env=os.environ.copy() # 传递环境变量给子进程(包含 AWS 凭证)
)
print("🔌 Connecting to Athena MCP Server...")
# 使用上下文管理器启动 MCP Client
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# A. 初始化:获取 MCP 工具
await session.initialize()
tools = await load_mcp_tools(session)
print(f"✅ Loaded {len(tools)} tools from MCP: {[t.name for t in tools]}")
# B. 定义 LLM (使用支持 Tool Calling 的模型)
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# C. 构建 LangGraph Agent (使用预构建的 ReAct Agent)
# create_react_agent 会自动处理 "LLM -> Tools -> LLM" 的循环
graph = create_react_agent(llm, tools=tools)
# D. 执行交互
print("\n🤖 Agent is running. Analyzing Athena data...\n")
# 示例问题:请根据你的实际表名调整
query = "请查看 default 数据库里有哪些表,然后选择其中一张表查看前 5 条数据结构。"
input_state = {"messages": [HumanMessage(content=query)]}
# 流式输出 Agent 的思考过程
async for chunk in graph.astream(input_state, stream_mode="values"):
# 获取最后一条消息
message = chunk["messages"][-1]
if message.type == "ai":
# 如果有工具调用,打印出来
if message.tool_calls:
for tc in message.tool_calls:
print(f"🛠️ Calling Tool: {tc['name']} with args: {tc['args']}")
else:
print(f"🤖 AI: {message.content}")
elif message.type == "tool":
print(f"📦 Tool Output: {message.content[:200]}...") # 只打印前200字符
if __name__ == "__main__":
asyncio.run(main())
用MultiServerMCPClient(通常来自 langchain-mcp-adapters 这样的封装库),你的 main.py 代码会变得极度简洁。
最大的变化是:你可以删掉那个复杂的 load_mcp_tools 函数了。这个 Client 会自动帮你把 MCP 工具转换成 LangChain 工具。
import asyncio
import os
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage
# 引入那个"排插"类 (根据你使用的具体库,导入路径可能略有不同)
from langchain_mcp_adapters.client import MultiServerMCPClient
async def main():
# ==========================================
# 1. 配置服务器 (这就是截图里的写法)
# ==========================================
server_config = {
"athena-analyst": {
"command": "python",
"args": ["athena_server.py"],
"transport": "stdio",
# ⚠️ 关键点:一定要把 AWS 凭证传进去,否则子进程连不上 AWS
"env": os.environ.copy()
}
}
print("🔌 Connecting via MultiServerMCPClient...")
# ==========================================
# 2. 自动连接与工具加载
# ==========================================
# 这个 context manager 会自动启动所有服务器
async with MultiServerMCPClient(server_config) as client:
# ✨ 魔法时刻:一行代码直接拿到 LangChain 可用的工具列表
# 它自动处理了 schema 转换和 tool binding
tools = client.get_tools()
print(f"✅ Loaded {len(tools)} tools: {[t.name for t in tools]}")
# ==========================================
# 3. Agent 逻辑 (完全不变)
# ==========================================
llm = ChatOpenAI(model="gpt-4o", temperature=0)
graph = create_react_agent(llm, tools=tools)
print("\n🤖 Agent is running...\n")
query = "请列出 default 数据库的表,并查看第一张表的结构。"
async for chunk in graph.astream({"messages": [HumanMessage(content=query)]}, stream_mode="values"):
message = chunk["messages"][-1]
if message.type == "ai":
print(f"🤖 AI: {message.content}")
elif message.type == "tool":
print(f"📦 Tool Output: {message.content[:100]}...")
if __name__ == "__main__":
asyncio.run(main())
LLM调用MCP里的tool
让 LLM 执行 execute_sql 不需要你手动去写 SQL,这是 Tool Calling (函数调用) 的核心魔法所在。
简单来说,你不需要告诉它“怎么调用”,但你需要告诉它“何时调用”以及“遵守什么规则”。
以下是 LLM 执行 execute_sql 的实际工作流程和通过 Prompt 优化的技巧。
1. 核心机制:它是如何自动发生的?
当你把 execute_sql 定义为 MCP Tool 并传给 LangGraph 时,发生了以下事情:
Schema 传递:程序启动时,MCP 会把这个函数的签名(名字、参数 sql_query、描述)变成一段 JSON 描述发给 LLM(如 GPT-4)。
LLM 看到的信息:"有一个工具叫 execute_sql,它接受一个字符串参数 sql_query,用于在 Athena 查询数据。"
意图识别:当你问“统计一下上个月的销售额”时,LLM 会思考:
“我要统计数据 -> 我需要数据 -> 我需要运行 SQL -> 我应该调用 execute_sql 工具。”
参数生成 (关键步骤):
LLM 会根据它的内部知识(Presto/Trino SQL 语法)自动为你写出 SQL 语句,例如 SELECT sum(amount) FROM sales WHERE ...。
然后它向程序发出指令:Call execute_sql(sql_query="SELECT ...")。
2. 实际执行流程 (The Workflow)
要在 LangGraph 中完美执行它,LLM 通常需要经过 “三步走”(否则它会瞎猜表名报错):
探查 (Discovery):LLM 发现自己不知道表名,先调 list_tables()。
理解 (Schema):LLM 拿到表名后,调 describe_table(table_name) 搞清楚有哪些列。
执行 (Execution):LLM 根据列名,写出正确的 SQL,调 execute_sql(query)。
3. 如何在代码中控制它? (System Prompt)
为了确保 LLM 能写出正确的 SQL 并成功执行,你必须在 main.py 里加一个 System Prompt(系统提示词)。
这是修改后的 main.py 关键部分:
from langchain_core.messages import SystemMessage
# ... (前面的加载工具代码保持不变) ...
# 定义系统提示词:这是教 LLM 怎么用 execute_sql 的关键
system_instruction = """
你是一个 AWS Athena 数据分析专家。你的任务是根据用户问题查询数据。
请严格遵守以下步骤(不要跳过):
1. 先使用 `list_tables` 查看有哪些表。
2. 即使你觉得猜到了表名,也必须使用 `describe_table` 查看表的具体列名和类型。
3. 根据表结构编写 Presto/Trino 语法的 SQL。
4. 使用 `execute_sql` 执行查询。
SQL 编写规则:
- 总是使用 `LIMIT 10` 来进行初步数据查看,防止结果过大。
- 如果查询包含中文,请确保 Athena 配置支持,或者只查询英文字段。
- 所有的字符串值通常需要单引号。
"""
# 在创建 Agent 时注入 System Message
# state_modifier 参数就是用来放 System Prompt 的
graph = create_react_agent(
llm,
tools=tools,
state_modifier=system_instruction
)
# ... (运行部分不变) ...