0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LangGraph 建立调用S3+athena的server

Last updated at Posted at 2025-12-12
import time
import boto3
from mcp.server.fastmcp import FastMCP

# 初始化 Server
mcp = FastMCP("AWS Athena Analyst")

# 配置 (请替换为你的实际配置)
ATHENA_DB = "default"        # 你的 Athena 数据库名
OUTPUT_LOCATION = "s3://your-bucket/athena-results/" # Athena 查询结果存放路径

client = boto3.client('athena')

@mcp.tool()
def execute_sql(sql_query: str) -> str:
    """
    在 AWS Athena 上执行 SQL 查询并返回结果。
    适用于分析 S3 上的大规模 CSV/Parquet 数据。
    
    Args:
        sql_query: 标准的 Presto/Trino SQL 语句 (例如: "SELECT category, count(*) FROM sales GROUP BY category")
    """
    try:
        # 1. 提交查询
        response = client.start_query_execution(
            QueryString=sql_query,
            QueryExecutionContext={'Database': ATHENA_DB},
            ResultConfiguration={'OutputLocation': OUTPUT_LOCATION}
        )
        query_execution_id = response['QueryExecutionId']
        
        # 2. 轮询等待结果 (Athena 是异步的)
        while True:
            stats = client.get_query_execution(QueryExecutionId=query_execution_id)
            status = stats['QueryExecution']['Status']['State']
            
            if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
                break
            time.sleep(1) # 等待 1 秒
            
        # 3. 处理结果
        if status == 'SUCCEEDED':
            # 获取查询结果
            results = client.get_query_results(QueryExecutionId=query_execution_id)
            
            # 将复杂的 Athena JSON 响应转换为易读的 Markdown 表格
            rows = results['ResultSet']['Rows']
            markdown_output = []
            
            for row in rows:
                data = [col.get('VarCharValue', 'NULL') for col in row['Data']]
                markdown_output.append("| " + " | ".join(data) + " |")
                
            # 添加表头分割线
            if len(markdown_output) > 1:
                header_cols = len(rows[0]['Data'])
                separator = "| " + " | ".join(["---"] * header_cols) + " |"
                markdown_output.insert(1, separator)
                
            return "\n".join(markdown_output)
            
        else:
            reason = stats['QueryExecution']['Status'].get('StateChangeReason', 'Unknown Error')
            return f"SQL Execution Failed: {status} - {reason}"

    except Exception as e:
        return f"System Error: {str(e)}"

@mcp.tool()
def list_tables() -> str:
    """列出数据库中所有可用的表,用于了解数据概况"""
    return execute_sql("SHOW TABLES")

@mcp.tool()
def describe_table(table_name: str) -> str:
    """查看表结构(列名和类型),写 SQL 前必须调用此工具"""
    return execute_sql(f"DESCRIBE {table_name}")

if __name__ == "__main__":
    mcp.run()

下段代码展示了如何启动 MCP 客户端,获取工具列表,将其转换为 LangGraph 可用的工具,并构建 Agent。

import asyncio
import os
from typing import List, Any

# MCP SDK
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# LangChain / LangGraph
from langchain_openai import ChatOpenAI
from langchain_core.tools import StructuredTool
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage

# 配置 API Key
# os.environ["OPENAI_API_KEY"] = "sk-..." 

# ==========================================
# 1. MCP to LangChain 适配器
# ==========================================
async def load_mcp_tools(session: ClientSession) -> List[StructuredTool]:
    """
    从 MCP Session 中获取工具列表,并转换为 LangChain 的 StructuredTool
    """
    mcp_tools = await session.list_tools()
    langchain_tools = []

    for tool in mcp_tools.tools:
        # 定义一个闭包函数来执行具体的 MCP 工具调用
        # 注意:这里需要捕获 tool.name
        async def _dynamic_tool_func(**kwargs):
            # 调用 MCP Client 执行工具
            result = await session.call_tool(tool.name, arguments=kwargs)
            # 返回结果文本
            if result.content:
                return result.content[0].text
            return "No output returned."

        # 创建 LangChain 工具
        # 我们直接使用 MCP 提供的 JSON Schema 作为工具的 args_schema
        # 注意:简单场景下可以让 LLM 直接推断 Schema,严谨场景建议转换 Pydantic
        lc_tool = StructuredTool.from_function(
            func=None,
            coroutine=_dynamic_tool_func, # 这是一个异步工具
            name=tool.name,
            description=tool.description,
            # 将 MCP 的 JSON Schema 映射给 LangChain
            # 这样 OpenAI 才知道如何填充参数
            args_schema=None, 
        )
        
        # 修正 Schema:由于 StructuredTool.from_function 自动推断可能不准
        # 我们手动把 MCP 的 schema 塞进去,确保 LLM 理解参数结构
        lc_tool.args = tool.inputSchema.get("properties", {})
        lc_tool.description = f"{tool.description}\nParameters: {tool.inputSchema}"
        
        langchain_tools.append(lc_tool)

    return langchain_tools

# ==========================================
# 2. 构建并运行 LangGraph Agent
# ==========================================
async def main():
    # 定义 Server 参数 (指向你保存的 athena_server.py)
    server_params = StdioServerParameters(
        command="python",
        args=["athena_server.py"], # 确保文件名正确
        env=os.environ.copy()      # 传递环境变量给子进程(包含 AWS 凭证)
    )

    print("🔌 Connecting to Athena MCP Server...")
    
    # 使用上下文管理器启动 MCP Client
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            
            # A. 初始化:获取 MCP 工具
            await session.initialize()
            tools = await load_mcp_tools(session)
            print(f"✅ Loaded {len(tools)} tools from MCP: {[t.name for t in tools]}")

            # B. 定义 LLM (使用支持 Tool Calling 的模型)
            llm = ChatOpenAI(model="gpt-4o", temperature=0)

            # C. 构建 LangGraph Agent (使用预构建的 ReAct Agent)
            # create_react_agent 会自动处理 "LLM -> Tools -> LLM" 的循环
            graph = create_react_agent(llm, tools=tools)

            # D. 执行交互
            print("\n🤖 Agent is running. Analyzing Athena data...\n")
            
            # 示例问题:请根据你的实际表名调整
            query = "请查看 default 数据库里有哪些表,然后选择其中一张表查看前 5 条数据结构。"
            
            input_state = {"messages": [HumanMessage(content=query)]}
            
            # 流式输出 Agent 的思考过程
            async for chunk in graph.astream(input_state, stream_mode="values"):
                # 获取最后一条消息
                message = chunk["messages"][-1]
                
                if message.type == "ai":
                    # 如果有工具调用,打印出来
                    if message.tool_calls:
                        for tc in message.tool_calls:
                            print(f"🛠️  Calling Tool: {tc['name']} with args: {tc['args']}")
                    else:
                        print(f"🤖 AI: {message.content}")
                elif message.type == "tool":
                    print(f"📦 Tool Output: {message.content[:200]}...") # 只打印前200字符

if __name__ == "__main__":
    asyncio.run(main())

用MultiServerMCPClient(通常来自 langchain-mcp-adapters 这样的封装库),你的 main.py 代码会变得极度简洁。

最大的变化是:你可以删掉那个复杂的 load_mcp_tools 函数了。这个 Client 会自动帮你把 MCP 工具转换成 LangChain 工具。

import asyncio
import os
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage

# 引入那个"排插"类 (根据你使用的具体库,导入路径可能略有不同)
from langchain_mcp_adapters.client import MultiServerMCPClient

async def main():
    # ==========================================
    # 1. 配置服务器 (这就是截图里的写法)
    # ==========================================
    server_config = {
        "athena-analyst": {
            "command": "python",
            "args": ["athena_server.py"],
            "transport": "stdio",
            # ⚠️ 关键点:一定要把 AWS 凭证传进去,否则子进程连不上 AWS
            "env": os.environ.copy() 
        }
    }

    print("🔌 Connecting via MultiServerMCPClient...")

    # ==========================================
    # 2. 自动连接与工具加载
    # ==========================================
    # 这个 context manager 会自动启动所有服务器
    async with MultiServerMCPClient(server_config) as client:
        
        # ✨ 魔法时刻:一行代码直接拿到 LangChain 可用的工具列表
        # 它自动处理了 schema 转换和 tool binding
        tools = client.get_tools()
        
        print(f"✅ Loaded {len(tools)} tools: {[t.name for t in tools]}")

        # ==========================================
        # 3. Agent 逻辑 (完全不变)
        # ==========================================
        llm = ChatOpenAI(model="gpt-4o", temperature=0)
        graph = create_react_agent(llm, tools=tools)

        print("\n🤖 Agent is running...\n")
        query = "请列出 default 数据库的表,并查看第一张表的结构。"
        
        async for chunk in graph.astream({"messages": [HumanMessage(content=query)]}, stream_mode="values"):
            message = chunk["messages"][-1]
            if message.type == "ai":
                print(f"🤖 AI: {message.content}")
            elif message.type == "tool":
                print(f"📦 Tool Output: {message.content[:100]}...")

if __name__ == "__main__":
    asyncio.run(main())

LLM调用MCP里的tool


让 LLM 执行 execute_sql 不需要你手动去写 SQL,这是 Tool Calling (函数调用) 的核心魔法所在。

简单来说,你不需要告诉它“怎么调用”,但你需要告诉它“何时调用”以及“遵守什么规则”。

以下是 LLM 执行 execute_sql 的实际工作流程和通过 Prompt 优化的技巧。

1. 核心机制:它是如何自动发生的?
    当你把 execute_sql 定义为 MCP Tool 并传给 LangGraph 时,发生了以下事情:
    Schema 传递:程序启动时,MCP 会把这个函数的签名(名字、参数 sql_query、描述)变成一段 JSON 描述发给 LLM(如 GPT-4)。
    LLM 看到的信息:"有一个工具叫 execute_sql,它接受一个字符串参数 sql_query,用于在 Athena 查询数据。"
    意图识别:当你问“统计一下上个月的销售额”时,LLM 会思考:
    “我要统计数据 -> 我需要数据 -> 我需要运行 SQL -> 我应该调用 execute_sql 工具。”
    参数生成 (关键步骤):
    LLM 会根据它的内部知识(Presto/Trino SQL 语法)自动为你写出 SQL 语句,例如 SELECT sum(amount) FROM sales WHERE ...。
    然后它向程序发出指令:Call execute_sql(sql_query="SELECT ...")。

2. 实际执行流程 (The Workflow)
    要在 LangGraph 中完美执行它,LLM 通常需要经过 “三步走”(否则它会瞎猜表名报错):
    探查 (Discovery):LLM 发现自己不知道表名,先调 list_tables()。
    理解 (Schema):LLM 拿到表名后,调 describe_table(table_name) 搞清楚有哪些列。
    执行 (Execution):LLM 根据列名,写出正确的 SQL,调 execute_sql(query)。

3. 如何在代码中控制它? (System Prompt)
    为了确保 LLM 能写出正确的 SQL 并成功执行,你必须在 main.py 里加一个 System Prompt(系统提示词)。

这是修改后的 main.py 关键部分:

from langchain_core.messages import SystemMessage

# ... (前面的加载工具代码保持不变) ...

# 定义系统提示词:这是教 LLM 怎么用 execute_sql 的关键
system_instruction = """
你是一个 AWS Athena 数据分析专家。你的任务是根据用户问题查询数据。

请严格遵守以下步骤(不要跳过):
1. 先使用 `list_tables` 查看有哪些表。
2. 即使你觉得猜到了表名,也必须使用 `describe_table` 查看表的具体列名和类型。
3. 根据表结构编写 Presto/Trino 语法的 SQL。
4. 使用 `execute_sql` 执行查询。

SQL 编写规则:
- 总是使用 `LIMIT 10` 来进行初步数据查看,防止结果过大。
- 如果查询包含中文,请确保 Athena 配置支持,或者只查询英文字段。
- 所有的字符串值通常需要单引号。
"""

# 在创建 Agent 时注入 System Message
# state_modifier 参数就是用来放 System Prompt 的
graph = create_react_agent(
    llm, 
    tools=tools, 
    state_modifier=system_instruction 
)

# ... (运行部分不变) ...
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?