>
← 返回投肯智能知识库首页

LangChain+MCP协议:AI Agent开发实战从请求到工具调用

作者:重庆投肯小刚更新日期:2026年5月阅读时长:25分钟

前言

AI Agent的核心里有两件事:让大模型「会思考」和让大模型「能动手」。LangChain负责前者,MCP(Model Context Protocol)负责后者。两者结合,才构成一套完整的Agent开发体系。

本文面向已有Python基础的AI开发者,从协议原理到代码实战,从单Tool调用到多Tool编排,从手写MCP Server到线上调试,手把手拆解每一个细节。没有营销话术,全是工程干货。

第一章:MCP协议详解

1.1 协议设计理念:为什么需要MCP?

在大模型应用里,「给模型提供上下文」这件事经历了三个阶段:

MCP的核心设计目标是:让AI应用和外部工具之间的接口统一化。无论你的工具是搜索API、数据库还是文件系统,只要遵循MCP协议,任意兼容MCP的客户端都能连接使用。

1.2 协议基础:JSON-RPC 2.0

MCP的消息格式完全遵循JSON-RPC 2.0规范。JSON-RPC是一种无状态的轻量级远程过程调用协议,所有数据以JSON格式传输。

// JSON-RPC 2.0 请求格式
{
  "jsonrpc": "2.0",        // 协议版本,固定为"2.0"
  "id": 1,                // 请求ID,用于匹配响应(整数或字符串)
  "method": "tools/list", // 方法名,MCP定义了一系列标准方法
  "params": {}            // 方法参数,MCP规范定义了各方法的参数结构
}

// JSON-RPC 2.0 响应格式(成功)
{
  "jsonrpc": "2.0",
  "id": 1,                // 与请求ID对应,证明这个响应匹配哪个请求
  "result": {             // 调用结果,各方法返回不同结构
    "tools": [...]
  }
}

// JSON-RPC 2.0 响应格式(错误)
{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {              // 错误对象,包含错误信息
    "code": -32600,       // 错误码,JSON-RPC标准错误码或MCP扩展错误码
    "message": "Invalid Request"
  }
}
为什么选JSON-RPC而不是REST?JSON-RPC是双向的,支持推送(Server→Client通知),而REST天然是请求-响应模式,无法实现Server主动推送。此外JSON-RPC更轻量,method命名空间更清晰。

1.3 三个核心概念:Host / Client / Server

MCP采用经典的客户端-服务器架构,但引入了第三个角色Host,构成三层结构:

Server(工具服务方)

提供工具(tools)、资源(resources)和提示(prompts)的实际提供者。可以理解为一个「工具仓库」,对外暴露一系列可调用的能力。每个Server有独立的命名空间,避免工具名冲突。

Client(连接管理方)

每个Server对应一个Client,建立一对一的WebSocket连接。Client负责维护这个连接,接收Server的响应,并向Server发送请求。Client对上层(Host)屏蔽了通信细节。

Host(应用入口)

用户直接交互的程序。Host持有所有Client的引用,协调整个MCP会话。当用户问一个问题,Host决定调用哪个Server的哪个Tool,汇总结果后返回给用户。

架构图解:

  用户 / 应用程序(Host)
        │
        │ ① 用户提问:"帮我查一下北京天气"
        ▼
   ┌─────────────────────────────────────┐
   │            Host 进程                │
   │  (LangChain Agent / 主应用程序)      │
   │                                     │
   │  · 持有 Client1, Client2, ...        │
   │  · 统一管理所有工具调用               │
   │  · 将工具结果注入LLM上下文            │
   └────────────────┬────────────────────┘
                    │
        ② Host通过Client1发送JSON-RPC请求
        ③ Client1通过WebSocket转发给Server1
        ▼
   ┌─────────────────────────────────────┐
   │  MCP Server (工具提供方)             │
   │  例:weather-server                 │
   │                                     │
   │  · 实现 tools/callable 接口         │
   │  · 访问外部API/数据库/文件系统        │
   │  · 返回JSON-RPC响应                 │
   └─────────────────────────────────────┘

工具调用完整流程

  1. 用户向Host发送自然语言请求
  2. Host的LLM分析意图,决定调用哪个Tool,生成符合MCP规范的JSON-RPC请求
  3. Host找到对应Tool所属的Server,通过该Server的Client发送请求
  4. Client与Server之间通过WebSocket(长期连接)传输JSON-RPC消息
  5. Server接收请求、执行Tool逻辑、返回结果
  6. Client将结果交给Host,Host将结果注入上下文
  7. LLM再次分析,决定是否继续调用其他Tool,直到生成最终答案

1.4 与Function Calling / Tool Use的区别对比

维度Function CallingTool Use(OpenAI格式)MCP协议
标准化程度厂商私有,各家定义不同厂商私有(OpenAI/Bing)社区开放标准,各家兼容
连接方式HTTP轮询HTTP轮询WebSocket长连接
工具发现机制手动注册,每次请求需重复传递每次请求传递tool定义启动时发现,运行时缓存tool列表
双向通信不支持(纯请求-响应)不支持支持(Server可主动推送)
资源管理有(Resources接口)
多工具协调应用层自行实现应用层自行实现Host统一管理,多Server协作
适用场景单厂商模型、快速集成单厂商模型、快速集成多工具复杂应用、需要长期连接
实战建议:如果你的应用只用一个厂商的模型,且工具数量少、调用简单,直接用Function Calling/Tool Use即可。如果你的应用需要对接多个外部系统(搜索+数据库+文件系统),且需要实时推送能力,MCP是更优选择。

第二章:LangChain Agent开发实战

2.1 环境搭建

# Python 3.10+ 环境验证
$ python --version
Python 3.10.13

# 创建虚拟环境(推荐,生产项目隔离依赖)
$ python -m venv .venv
$ source .venv/bin/activate

# 安装LangChain核心库
$ pip install langchain langchain-core

# 安装LangChain社区版(包含大量集成工具)
$ pip install langchain-community

# 安装OpenAI集成(如果用OpenAI模型)
$ pip install openai

# 安装Tavily搜索(常用搜索工具)
$ pip install tavily-python

# 安装JSON-RPC支持(MCP Server开发需要)
$ pip install python-jsonrpc-server jsonrpc-websocket

# 验证安装
$ python -c "import langchain; print('LangChain', langchain.__version__)"
LangChain 0.3.14

2.2 LangChain的Tool和Agent接口

LangChain的Tool是连接LLM和外部世界的桥梁。每个Tool本质是一个函数,有名称、描述(供LLM理解何时调用)和执行逻辑。

使用预置Tool(以Tavily搜索为例)

# 步骤1:导入LangChain内置工具
from langchain_community.tools import TavilySearchResults

# 步骤2:创建工具实例,max_results限制返回条数
# description参数是给LLM看的"使用说明书",决定模型何时选择这个Tool
search_tool = TavilySearchResults(
    max_results=5,
    description=(
        "当用户询问实时信息、新闻、天气预报、"
        "股价、比赛结果等需要最新数据的问题时使用"
    )
)

# 步骤3:测试工具直接调用(不通过Agent)
result = search_tool.invoke({"query": "Python 3.12 新特性"})
print(result)
# 输出格式:[{'url': '...', 'content': '...'}, ...]

# 步骤4:绑定到LLM,创建Agent
from langchain_openai import ChatOpenAI
from langchain.agents import AgentType, create_react_agent
from langchain import hub

# 初始化大模型(verbose=True开启详细日志)
llm = ChatOpenAI(
    model="gpt-4o",
    temperature=0,
    api_key="your-api-key",    # 实际使用时请替换,或通过环境变量 OPENAI_API_KEY
    verbose=True               # 开启后会在控制台打印Agent推理全过程
)

# 从LangChain Hub拉取ReAct Agent的提示词模板
# prompt = hub.pull("hwchase17/react-chat")  # 聊天版本
prompt = hub.pull("hwchase17/react")  # 标准ReAct版本

# 创建Agent,将工具列表传入
agent = create_react_agent(
    llm=llm,
    tools=[search_tool],
    prompt=prompt
)

# 步骤5:运行Agent
from langchain.agents import AgentExecutor

agent_executor = AgentExecutor(
    agent=agent,
    tools=[search_tool],
    verbose=True,              # 打印完整推理过程,方便调试
    max_iterations=10,         # 限制最大推理步数,防止死循环
    handle_parsing_errors=True # 解析出错时自动修正,而非直接崩溃
)

response = agent_executor.invoke({"input": "2024年诺贝尔物理学奖得主是谁?"})
print(response["output"])

2.3 ReAct模式的完整代码实现

ReAct(Reasoning + Acting)是目前最主流的Agent推理模式。核心思想是让模型交替进行「推理」和「行动」,每一步的推理决定下一步的行动。

# ============================================================
# ReAct 模式完整实现:不依赖LangChain内置Agent,手写推理循环
# 展示ReAct的内部原理,理解每一步在做什么
# ============================================================

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_community.tools import TavilySearchResults
import os

# 设置API Key(生产环境推荐用环境变量)
os.environ["OPENAI_API_KEY"] = "your-api-key"
os.environ["TAVILY_API_KEY"] = "your-tavily-key"  # 从 tavily.ai 免费申请

# 初始化模型和工具
llm = ChatOpenAI(model="gpt-4o", temperature=0)
search_tool = TavilySearchResults(max_results=3)
tools = {search_tool.name: search_tool}  # 用字典方便按名称查找Tool

# ============================================================
# ReAct提示词模板:包含推理格式的指令
# 格式为:Thought(思考)→ Action(行动)→ Observation(观察)
# ============================================================
REACT_SYSTEM_PROMPT = """你是一个善于使用工具的问题解决助手。

针对用户问题,按照以下格式循环:

Thought: 分析当前情况,决定下一步行动
Action: tool_name{"tool_argument"}  # 格式:工具名{参数}
Observation: 工具返回结果  # 观察结果,进入下一轮推理
...(重复上述步骤直到得到答案)
Final Answer: 最终答案

可用的工具:
- tavily_search_results: 搜索互联网获取实时信息,参数格式:{"query": "搜索关键词"}

开始!"""

# ============================================================
# 手动实现ReAct推理循环
# ============================================================
def react_loop(question: str, max_steps: int = 10):
    """
    手动实现ReAct循环,理解每一步发生了什么

    参数:
        question: 用户问题
        max_steps: 最大推理步数,防止死循环

    返回:最终答案字符串
    """
    # 构建消息历史,以SystemMessage开头定义行为规则
    messages = [
        SystemMessage(content=REACT_SYSTEM_PROMPT),
        HumanMessage(content=question)
    ]

    for step in range(max_steps):
        print(f"\n{'='*50}")
        print(f"🔄 推理步骤 {step + 1}/{max_steps}")

        # ----------------------------------------
        # Step 1:让LLM生成下一步行动(包含Action)
        # ----------------------------------------
        response = llm.invoke(messages)
        response_text = response.content
        print(f"🤖 LLM输出:\n{response_text}")

        # 将LLM回复加入历史(用于下一轮上下文)
        messages.append(AIMessage(content=response_text))

        # ----------------------------------------
        # Step 2:解析LLM输出中的Action
        # ----------------------------------------
        # 查找 Action: 行的格式:Action: tool_name{"..."}
        import re
        # 匹配格式:Action: search_tool{"query": "..."}
        action_pattern = r"Action:\s*(\w+)\s*\{(.+)\}"
        match = re.search(action_pattern, response_text, re.DOTALL)

        if not match:
            # 没有匹配到Action,说明模型认为已经得到答案
            # 检查是否有 Final Answer
            final_pattern = r"Final Answer:\s*(.+)"
            final_match = re.search(final_pattern, response_text, re.DOTALL)
            if final_match:
                return final_match.group(1).strip()
            # 没有Action也没有Final Answer,说明输出格式异常
            # 将解析错误反馈给模型让它自我修正
            messages.append(HumanMessage(
                content=f"你的输出无法解析,请严格按照格式输出。"
                f"格式:Thought: ...\nAction: tool_name{{...参数...}}\n"
                f"或者 Final Answer: 最终答案"
            ))
            continue

        # 提取工具名和参数
        tool_name = match.group(1)
        tool_args_str = match.group(2).strip()

        # 解析参数JSON(处理单引号问题)
        import json
        try:
            # 有些模型输出用单引号,转成双引号
            tool_args_str_normalized = tool_args_str.replace("'", '"')
            tool_args = json.loads(tool_args_str_normalized)
        except json.JSONDecodeError as e:
            print(f"⚠️ 参数JSON解析失败:{e},原始文本:{tool_args_str}")
            messages.append(HumanMessage(
                content=f"参数格式错误:{tool_args_str},请修正为合法的JSON格式。"
            ))
            continue

        # ----------------------------------------
        # Step 3:检查工具是否存在
        # ----------------------------------------
        if tool_name not in tools:
            print(f"⚠️ 工具 {tool_name} 不存在,跳过")
            messages.append(HumanMessage(
                content=f"工具 {tool_name} 不存在。可用工具:{list(tools.keys())}"
            ))
            continue

        # ----------------------------------------
        # Step 4:调用工具
        # ----------------------------------------
        print(f"🔧 调用工具:{tool_name},参数:{tool_args}")
        try:
            tool_result = tools[tool_name].invoke(tool_args)
            # 工具结果转字符串加入观察
            observation = f"Observation: {str(tool_result)}"
            print(f"✅ 工具返回:{tool_result}")
        except Exception as e:
            observation = f"Observation: 工具执行出错:{str(e)}"
            print(f"❌ 工具执行出错:{e}")

        # ----------------------------------------
        # Step 5:将观察结果加入上下文,进入下一轮
        # ----------------------------------------
        messages.append(HumanMessage(content=observation))

    # 超过最大步数仍未结束
    return "推理超时,请简化问题或增加max_steps"

# 测试运行
if __name__ == "__main__":
    result = react_loop("2026年Python最新版本是多少?")
    print(f"\n🎯 最终答案:{result}")

2.4 自定义Tool的两种方式

方式一:@tool装饰器(简洁写法)

# 方式一:使用 @tool 装饰器,最简单直接
from langchain_core.tools import tool

@tool
def calculate_bmi(weight_kg: float, height_m: float) -> str:
    """
    计算BMI身体质量指数。

    参数:
        weight_kg: 体重(公斤)
        height_m: 身高(米)

    返回:
        BMI值和健康建议
    """
    bmi = weight_kg / (height_m ** 2)
    if bmi < 18.5:
        advice = "偏瘦,建议适当增重"
    elif bmi < 24:
        advice = "正常范围,继续保持"
    elif bmi < 28:
        advice = "偏胖,建议适当运动"
    else:
        advice = "肥胖,建议咨询医生"

    return f"BMI = {bmi:.2f},{advice}"

# @tool装饰器自动提取函数签名和docstring生成Tool定义
# 无需手动指定name和description,name默认用函数名
# description默认从docstring第一行提取

# 测试调用
result = calculate_bmi.invoke({"weight_kg": 70, "height_m": 1.75})
print(result)
# 输出:BMI = 22.86,正常范围,继续保持

# ============================================================
# 带描述的@tool(推荐写法)
# ============================================================
@tool
def get_weather(city: str, country: str = "中国") -> str:
    """
    查询指定城市的实时天气。

    当用户询问"某城市天气如何"、"要不要带伞"时使用。

    参数:
        city: 城市名称(中文),如"北京"
        country: 国家,默认为"中国",国外城市需指定

    返回:
        天气描述,包括温度、湿度、天气状况
    """
    # 这里可以接真实天气API,此处简化演示
    return f"{city}今天晴,气温18-26°C,湿度45%,适合出行"

# 在Agent中使用
from langchain.agents import create_react_agent
from langchain import hub

agent = create_react_agent(
    llm=llm,
    tools=[calculate_bmi, get_weather],
    prompt=hub.pull("hwchase17/react")
)

from langchain.agents import AgentExecutor
executor = AgentExecutor(
    agent=agent,
    tools=[calculate_bmi, get_weather],
    verbose=True
)
print(executor.invoke({"input": "我体重75公斤,身高1米8,帮我算一下BMI"})["output"])

方式二:Pydantic模型(类型安全,写法更规范)

# 方式二:继承 BaseTool,使用Pydantic模型定义参数
# 优点:参数类型检查更严格,支持嵌套参数,适合复杂工具

from langchain_core.tools import BaseTool, tool
from pydantic import BaseModel, Field
from typing import Optional

# ============================================================
# 第一步:定义工具的参数模型(输入)
# ============================================================
class SearchCodeInput(BaseModel):
    """搜索代码示例的输入参数模型"""
    query: str = Field(
        description="搜索关键词,如'Python异步编程'",
        examples=["Python异步", "JavaScript事件循环"]
    )
    language: Optional[str] = Field(
        default="python",
        description="编程语言,默认为Python"
    )
    max_results: int = Field(
        default=5,
        description="最多返回结果数"
    )

# ============================================================
# 第二步:继承 BaseTool,实现核心方法
# ============================================================
class CodeSearchTool(BaseTool):
    """
    代码示例搜索引擎

    当用户询问"怎么实现XXX功能"、
    "给我一个XXX的代码示例"时使用。
    """
    name: str = "code_search"          # Tool的唯一标识名
    description: str = "搜索编程代码示例" # 供LLM理解何时调用
    args_schema: type = SearchCodeInput # 参数模型(用于验证和文档)

    # ----------------------------------------
    # _run:同步执行模式(最常用)
    # ----------------------------------------
    def _run(self, query: str, language: str = "python", max_results: int = 5):
        """
        同步执行工具逻辑

        参数由args_schema定义并验证,返回字符串作为工具结果
        """
        # 这里接真实代码搜索API(如GitHub Search API)
        mock_results = [
            f"示例{i}:{query} - {language}实现(第{i}个结果)"
            for i in range(1, max_results + 1)
        ]
        return "\n".join(mock_results)

    # ----------------------------------------
    # _arun:异步执行模式(当工具需要异步IO时实现)
    # ----------------------------------------
    async def _arun(self, query: str, language: str = "python", max_results: int = 5):
        """
        异步执行工具逻辑
        当需要调用异步API(如aiohttp)时使用此方法
        """
        import asyncio
        await asyncio.sleep(0.1)  # 模拟异步请求
        return self._run(query, language, max_results)

# ============================================================
# 第三步:在Agent中使用Pydantic Tool
# ============================================================
code_search = CodeSearchTool()

agent = create_react_agent(
    llm=llm,
    tools=[code_search],
    prompt=hub.pull("hwchase17/react")
)

executor = AgentExecutor(
    agent=agent,
    tools=[code_search],
    verbose=True
)

result = executor.invoke({
    "input": "怎么用Python实现一个简单的Web服务器?给我5个示例"
})
print(result["output"])
何时用@tool,何时用BaseTool?@tool适合简单函数转换,一个函数搞定一切。BaseTool适合逻辑复杂、参数复杂、需要区分同步异步的场景。企业级项目推荐BaseTool,参数验证更可靠。

2.5 多Tool协作编排

多Tool协作的核心问题:工具之间有依赖关系,顺序很重要。典型的编排模式有两种:顺序编排(结果链式传递)和并行编排(互不依赖的工具同时调用)。

顺序编排:Tool输出作为下一个Tool输入

# ============================================================
# 场景:用户问"帮我查一下腾讯最新季报,然后算一下同比增长"
#
# 需要两个Tool协作:
# 1. search_financial_report:查财报(先执行)
# 2. calculate_growth:计算增长率(依赖财报结果,后执行)
# ============================================================

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain import hub

llm = ChatOpenAI(model="gpt-4o", temperature=0)

@tool
def search_financial_report(company_name: str, period: str) -> str:
    """
    搜索指定公司的财务报表数据。

    参数:
        company_name: 公司名称,如"腾讯"、"阿里巴巴"
        period: 财报周期,如"2024Q1"、"2023年报"
    """
    # 真实场景调用财务数据API,此处模拟
    mock_data = {
        "腾讯_2024Q1": {"revenue": 1595, "profit": 502, "currency": "亿元"},
        "腾讯_2023Q1": {"revenue": 1500, "profit": 480, "currency": "亿元"},
    }
    key = f"{company_name}_{period}"
    if key in mock_data:
        return str(mock_data[key])
    return f"未找到{company_name}{period}数据"

@tool
def calculate_growth(current_value: float, previous_value: float) -> str:
    """
    计算增长率。

    参数:
        current_value: 本期数值
        previous_value: 上期数值
    """
    if previous_value == 0:
        return "除数不能为0"
    growth = ((current_value - previous_value) / previous_value) * 100
    return f"同比增长 {growth:.2f}%"

# 使用LangChain的Runnable接口编排工具链
from langchain_core.runnables import RunnableSequence, RunnableParallel

# 方案A:使用LangChain Expression Language(LCEL)编排
# RunnableSequence:顺序执行,上一个输出传给下一个
financial_chain = RunnableSequence(
    first=search_financial_report,  # 第一步:查财报
    last=calculate_growth            # 第二步:算增长率
)

# 由于calculate_growth需要两个参数(current和previous),
# 而search_financial_report只返回一个值,我们需要写一个转换函数

def extract_for_growth(report_result: str) -> dict:
    """
    从财报结果中提取数值,准备计算增长率
    财报数据格式如:{'revenue': 1595, 'profit': 502, 'currency': '亿元'}
    """
    import ast
    data = ast.literal_eval(report_result)
    return {
        "current_value": data["revenue"],
        "previous_value": data["revenue"] * 0.94  # 模拟去年同期94%
    }

# 用LCEL链起来
chain_with_extraction = search_financial_report | extract_for_growth | calculate_growth

# 测试
result = chain_with_extraction.invoke({
    "company_name": "腾讯",
    "period": "2024Q1"
})
print(f"计算结果:{result}")
# 输出:同比增长 6.33%

并行编排:互不依赖的工具同时执行

# ============================================================
# 场景:用户问"帮我同时查一下北京、上海、广州今天的天气"
#
# 三个城市的天气查询互不依赖,可以并行执行提高效率
# ============================================================

from langchain_core.tools import tool
from langchain_core.runnables import RunnableParallel
import concurrent.futures

@tool
def get_weather(city: str) -> str:
    """查询单个城市的天气,参数:city-城市名"""
    weathers = {
        "北京": "晴,18-26°C,PM2.5优",
        "上海": "多云,20-28°C,PM2.5良",
        "广州": "雷阵雨,25-32°C,PM2.5中"
    }
    return weathers.get(city, f"未找到{city}数据")

# RunnableParallel:并行执行,所有工具同时运行
parallel_weather = RunnableParallel(
    beijing=get_weather,
    shanghai=get_weather,
    guangzhou=get_weather
)

# invoke一次,所有城市并行查询
result = parallel_weather.invoke({
    "beijing": {"city": "北京"},
    "shanghai": {"city": "上海"},
    "guangzhou": {"city": "广州"}
})
print(result)
# 输出:{'beijing': '晴,18-26°C,PM2.5优',
#        'shanghai': '多云,20-28°C,PM2.5良',
#        'guangzhou': '雷阵雨,25-32°C,PM2.5中'}

# ============================================================
# 复杂编排:并行查数据,汇总后再分析
# ============================================================
from langchain_core.runnables import RunnableBranch

# 并行执行多个独立查询(不互相依赖的工具)
parallel_search = RunnableParallel(
    revenue=lambda _: "腾讯Q1营收1595亿元",
    user_active=lambda _: "微信月活13.5亿",
    market_share=lambda _: "市占率约35%"
)

# 汇总后做分析(等并行结果都返回后才执行)
def summarize_analysis(search_results: dict) -> str:
    """汇总分析三个查询结果"""
    return (
        f"财报分析:\n"
        f"1. 营收:{search_results['revenue']}\n"
        f"2. 用户活跃:{search_results['user_active']}\n"
        f"3. 市占率:{search_results['market_share']}\n"
        f"综合判断:业务稳健,用户基数大,营收增长空间可观"
    )

full_chain = parallel_search | summarize_analysis
analysis_result = full_chain.invoke({})
print(analysis_result)

第三章:MCP Server开发

3.1 手写一个最简单的MCP Server

MCP Server的实现核心是暴露三个标准接口:initialize(初始化握手)、tools/list(列出可用工具)、tools/call(调用指定工具)。以下是一个最小可用MCP Server的完整实现:

# ============================================================
# mcp_simple_server.py
# 最小化MCP Server实现:搜索工具 + 计算器工具
# 运行方式:python mcp_simple_server.py
# 或通过stdio方式运行:python -m uvicorn mcp_simple_server:app
# ============================================================

import json
import sys
from datetime import datetime

# ----------------------------------------
# 工具定义:MCP规范下每个工具是一个对象
# 包含 name(唯一标识)、description(供LLM理解)、input_schema(参数校验)
# ----------------------------------------
TOOLS = [
    {
        "name": "web_search",                  # 工具唯一标识
        "description": "搜索互联网获取信息。用户询问实时新闻、"
                       "天气、股价等需要最新数据的问题时使用。",
        "inputSchema": {                        # JSON Schema格式的参数定义
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "搜索关键词"
                },
                "max_results": {
                    "type": "integer",
                    "description": "最多返回条数,默认5",
                    "default": 5
                }
            },
            "required": ["query"]
        }
    },
    {
        "name": "calculator",                   # 计算器工具
        "description": "执行数学计算,支持加减乘除和幂运算。",
        "inputSchema": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "数学表达式,如 '2+3*5' 或 '10**2'"
                }
            },
            "required": ["expression"]
        }
    }
]

# ----------------------------------------
# 工具执行函数映射表
# key为工具名,value为执行函数
# ----------------------------------------
def execute_web_search(args: dict) -> str:
    """执行搜索(这里用模拟数据,生产环境接真实搜索API)"""
    query = args["query"]
    max_results = args.get("max_results", 5)
    results = [
        f"结果{i}:关于「{query}」的相关信息(第{i}条)"
        for i in range(1, max_results + 1)
    ]
    return "\n".join(results)

def execute_calculator(args: dict) -> str:
    """执行数学计算(使用安全eval,不直接用python eval)"""
    expression = args["expression"]
    # 安全的数学表达式计算(只允许数字和运算符)
    allowed_chars = set("0123456789+-*/.()** ")
    if not all(c in allowed_chars for c in expression):
        return f"错误:表达式包含非法字符,只支持数字和运算符 +-*/()"
    try:
        result = eval(expression, {"__builtins__": {}}, {})  # 受限eval
        return f"计算结果:{expression} = {result}"
    except Exception as e:
        return f"计算错误:{str(e)}"

TOOL_EXECATORS = {
    "web_search": execute_web_search,
    "calculator": execute_calculator
}

# ----------------------------------------
# JSON-RPC请求处理主函数
# ----------------------------------------
def handle_request(request: dict) -> dict:
    """
    处理收到的JSON-RPC请求,返回符合规范的响应

    参数:
        request: JSON-RPC格式的请求对象

    返回:
        JSON-RPC格式的响应对象(成功用result,失败用error)
    """
    jsonrpc = request.get("jsonrpc", "2.0")
    req_id = request.get("id")
    method = request.get("method")
    params = request.get("params", {})

    # ----------------------------------------
    # 方法1:initialize - 握手初始化
    # Client连接Server时首先发送此请求,交换协议版本和能力
    # ----------------------------------------
    if method == "initialize":
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "protocolVersion": "2024-11-05",  # MCP协议版本
                "capabilities": {                   # Server声明自己的能力
                    "tools": {},                   # 声明自己支持tools能力
                    # "resources": {},              # 如需资源管理能力,在此声明
                    # "prompts": {}                 # 如需提示管理能力,在此声明
                },
                "serverInfo": {
                    "name": "simple-mcp-server",
                    "version": "1.0.0"
                }
            }
        }

    # ----------------------------------------
    # 方法2:tools/list - 列出所有可用工具
    # Host通过此请求获取Server提供哪些工具及其定义
    # ----------------------------------------
    elif method == "tools/list":
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "tools": TOOLS  # 返回工具定义列表
            }
        }

    # ----------------------------------------
    # 方法3:tools/call - 调用具体工具
    # Host决定调用某个工具时发送此请求
    # params格式:{"name": "工具名", "arguments": {...}}
    # ----------------------------------------
    elif method == "tools/call":
        tool_name = params.get("name")
        tool_args = params.get("arguments", {})

        # 工具不存在检查
        if tool_name not in TOOL_EXECATORS:
            return {
                "jsonrpc": "2.0",
                "id": req_id,
                "error": {
                    "code": -32601,              # JSON-RPC标准错误码:方法未找到
                    "message": f"工具 '{tool_name}' 不存在"
                }
            }

        # 执行工具
        try:
            result = TOOL_EXECATORS[tool_name](tool_args)
            return {
                "jsonrpc": "2.0",
                "id": req_id,
                "result": {
                    "content": [                   # MCP规定格式:content数组
                        {
                            "type": "text",
                            "text": result
                        }
                    ]
                }
            }
        except Exception as e:
            # 工具执行出错
            return {
                "jsonrpc": "2.0",
                "id": req_id,
                "error": {
                    "code": -32603,              # JSON-RPC标准错误码:内部错误
                    "message": f"工具执行出错:{str(e)}"
                }
            }

    # ----------------------------------------
    # 方法4:notifications/initialized - 客户端握手完成通知
    # Client在收到initialize响应后发送此通知,表示握手完成
    # Server收到后可以开始正常通信
    # ----------------------------------------
    elif method == "notifications/initialized":
        # 通知类方法不需要返回响应
        return None

    else:
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "error": {
                "code": -32601,
                "message": f"方法 '{method}' 不存在或不支持"
            }
        }

# ----------------------------------------
# 标准输入输出模式(stdio):
# MCP Client通过stdin/stdout与Server通信
# 适用于本地进程场景
# ----------------------------------------
def run_stdio_server():
    """通过标准输入输出运行MCP Server(最常用的本地模式)"""
    print("INFO: MCP Server启动,使用stdio模式", file=sys.stderr)

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue

        try:
            request = json.loads(line)
        except json.JSONDecodeError:
            # JSON解析失败,返回错误响应
            print(json.dumps({
                "jsonrpc": "2.0",
                "id": None,
                "error": {
                    "code": -32700,
                    "message": "无效的JSON格式"
                }
            }), file=sys.stdout)
            continue

        # 处理请求
        response = handle_request(request)

        # notifications/initialized 等通知类返回None,不输出
        if response is not None:
            print(json.dumps(response), file=sys.stdout)
            sys.stdout.flush()

if __name__ == "__main__":
    run_stdio_server()

3.2 工具注册和请求处理流程

工具注册的核心是在启动时将所有Tool的定义(name、description、inputSchema)和执行函数(executor)绑定在一起。MCP Server启动后会:

  1. 等待Client连接(stdin模式或WebSocket模式)
  2. Client发送initialize请求 → Server返回协议版本和能力列表
  3. Client发送notifications/initialized → 握手完成
  4. Client发送tools/list → Server返回工具定义列表
  5. 每次用户请求涉及某Tool时,Client发送tools/call → Server执行并返回结果
# ============================================================
# 工具注册和请求处理的完整流程
# 演示如何组织代码结构,便于维护和扩展
# ============================================================

from typing import Callable, Any
from dataclasses import dataclass

# ----------------------------------------
# 工具定义数据类
# ----------------------------------------
@dataclass
class ToolDefinition:
    """
    工具定义数据结构
    """
    name: str                              # 唯一标识
    description: str                      # 供LLM理解何时使用
    input_schema: dict                     # JSON Schema格式的参数定义
    executor: Callable[[dict], Any]       # 执行函数

# ----------------------------------------
# 工具注册表(全局)
# 所有工具在此注册,tools/list返回此表
# ----------------------------------------
_tool_registry: dict[str, ToolDefinition] = {}

def register_tool(
    name: str,
    description: str,
    input_schema: dict,
    executor: Callable[[dict], Any]
):
    """
    工具注册函数

    参数:
        name: 工具名,全局唯一
        description: 给LLM看的描述
        input_schema: 参数JSON Schema
        executor: 执行函数,接收参数字典
    """
    _tool_registry[name] = ToolDefinition(
        name=name,
        description=description,
        input_schema=input_schema,
        executor=executor
    )

def get_tool_definitions() -> list[dict]:
    """
    获取所有工具定义(用于tools/list响应)
    不包含executor,只包含元数据
    """
    return [
        {
            "name": t.name,
            "description": t.description,
            "inputSchema": t.input_schema
        }
        for t in _tool_registry.values()
    ]

def execute_tool(name: str, arguments: dict) -> Any:
    """
    执行指定工具
    参数:
        name: 工具名
        arguments: 参数字典
    """
    if name not in _tool_registry:
        raise ValueError(f"工具 {name} 不存在")
    return _tool_registry[name].executor(arguments)

# ----------------------------------------
# 注册工具示例(用装饰器方式注册)
# ----------------------------------------
def define_tool(name: str, description: str, input_schema: dict):
    """
    工具定义装饰器

    用法:
        @define_tool("tool_name", "description", {"type": "object", ...})
        def my_tool(args):
            ...
    """
    def decorator(func: Callable[[dict], Any]):
        register_tool(name, description, input_schema, func)
        return func
    return decorator

# 使用装饰器注册工具
@define_tool(
    name="currency_converter",
    description="货币换算,将一个金额从一种货币换算成另一种货币",
    input_schema={
        "type": "object",
        "properties": {
            "amount": {"type": "number", "description": "金额"},
            "from_currency": {"type": "string", "description": "源货币,如USD、CNY"},
            "to_currency": {"type": "string", "description": "目标货币,如EUR、JPY"}
        },
        "required": ["amount", "from_currency", "to_currency"]
    }
)
def currency_converter(args: dict) -> str:
    """货币换算工具"""
    rates = {"USD_CNY": 7.2, "USD_EUR": 0.92, "CNY_JPY": 20.5}
    amount = args["amount"]
    from_c = args["from_currency"]
    to_c = args["to_currency"]
    key = f"{from_c}_{to_c}"

    if key in rates:
        result = amount * rates[key]
        return f"{amount} {from_c} = {result:.2f} {to_c}"
    return f"暂不支持 {from_c} 到 {to_c} 的换算"

@define_tool(
    name="file_reader",
    description="读取文件内容,返回文件文本",
    input_schema={
        "type": "object",
        "properties": {
            "path": {"type": "string", "description": "文件路径"}
        },
        "required": ["path"]
    }
)
def file_reader(args: dict) -> str:
    """读取文件工具"""
    try:
        with open(args["path"], "r", encoding="utf-8") as f:
            return f.read(4096)  # 最多读取4KB,防止内存溢出
    except FileNotFoundError:
        return f"文件不存在:{args['path']}"
    except Exception as e:
        return f"读取出错:{str(e)}"

# ----------------------------------------
# 初始化时注册所有工具
# ----------------------------------------
def initialize_tools():
    """在Server启动时调用,初始化所有工具注册"""
    currency_converter({"amount": 1, "from_currency": "USD", "to_currency": "CNY"})  # 注册
    file_reader({"path": "/tmp/dummy"})  # 注册

# 在模块加载时自动注册(也可以在Server启动时显式调用)
initialize_tools()

3.3 错误处理和返回值规范

MCP的返回值规范要求工具结果放在result.content数组里,每个元素有type和text字段。错误遵循JSON-RPC 2.0标准错误码规范。

# ============================================================
# MCP 错误处理规范
# ============================================================

# JSON-RPC 标准错误码(部分)
MCP_ERROR_CODES = {
    -32700: "解析错误 - 收到无效JSON",           # Parse error
    -32600: "非法请求 - JSON格式正确但结构非法",   # Invalid Request
    -32601: "方法未找到",                         # Method not found
    -32602: "参数无效",                           # Invalid params
    -32603: "内部错误",                           # DM Sansnal error
    # MCP扩展错误码(从-32000开始)
    -32000: "工具执行失败",
    -32001: "工具超时",
    -32002: "资源不可用",
}

# ----------------------------------------
# 错误响应构建函数
# ----------------------------------------
def mcp_error(code: int, message: str, req_id=None) -> dict:
    """构建MCP错误响应"""
    return {
        "jsonrpc": "2.0",
        "id": req_id,
        "error": {
            "code": code,
            "message": message,
            "data": None  # 可扩展:附加错误详情
        }
    }

# ----------------------------------------
# 工具返回值规范
# MCP要求content字段是一个content数组
# 每个元素可以是text(文本)、image(图片)、blob(二进制)
# ----------------------------------------
def mcp_success(text: str, req_id=None) -> dict:
    """构建MCP成功响应"""
    return {
        "jsonrpc": "2.0",
        "id": req_id,
        "result": {
            "content": [
                {"type": "text", "text": text}
            ],
            "isError": False  # 标记是否为错误类型结果
        }
    }

# 工具执行失败的响应(isError=True,HTTP 200但内容含错误信息)
def mcp_tool_error(text: str, req_id=None) -> dict:
    """工具执行出错时的响应(isError=True但HTTP状态码200)"""
    return {
        "jsonrpc": "2.0",
        "id": req_id,
        "result": {
            "content": [
                {"type": "text", "text": f"工具执行失败:{text}"}
            ],
            "isError": True
        }
    }

# ============================================================
# 完整错误处理示例:参数校验 → 执行 → 异常捕获
# ============================================================
def safe_execute_tool(tool_name: str, arguments: dict, req_id=None) -> dict:
    """
    带完整错误处理的工具执行函数

    错误处理流程:
    1. 检查工具是否存在 → -32601
    2. 校验参数格式 → -32602
    3. 执行并捕获异常 → -32603 或 -32000
    """
    # Step 1: 工具存在性检查
    if tool_name not in _tool_registry:
        return mcp_error(
            -32601,
            f"工具 '{tool_name}' 不存在,可用工具:{list(_tool_registry.keys())}",
            req_id
        )

    tool_def = _tool_registry[tool_name]

    # Step 2: 参数校验(基于inputSchema)
    # 生产环境推荐用jsonschema库做严格校验
    import jsonschema
    try:
        jsonschema.validate(instance=arguments, schema=tool_def.input_schema)
    except jsonschema.ValidationError as e:
        return mcp_error(
            -32602,
            f"参数校验失败:{e.message},期望格式:{tool_def.input_schema}",
            req_id
        )

    # Step 3: 执行并捕获异常
    try:
        result = tool_def.executor(arguments)
        return mcp_success(str(result), req_id)
    except TimeoutError:
        return mcp_error(-32001, "工具执行超时", req_id)
    except Exception as e:
        return mcp_error(-32000, f"工具执行异常:{str(e)}", req_id)

第四章:调试经验

4.1 LangChain Agent调试技巧

技巧1:verbose=True日志分析

# ============================================================
# 开启verbose=True后,LangChain会打印完整的推理过程
# 分析这些日志可以定位Agent"在想什么"
# ============================================================

from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain import hub
from langchain_community.tools import TavilySearchResults

llm = ChatOpenAI(model="gpt-4o", temperature=0)
search = TavilySearchResults(max_results=3)

agent = create_react_agent(llm=llm, tools=[search], prompt=hub.pull("hwchase17/react"))
executor = AgentExecutor(
    agent=agent,
    tools=[search],
    verbose=True,  # 🔑 开启详细日志
    max_iterations=5
)

executor.invoke({"input": "2026年FIFA世界杯在哪里举办?"})

# ============================================================
# verbose=True 输出的结构解析
# ============================================================
"""
👀 Entering new AgentExecutor chain...

# Step 1:第一轮推理
Thought: 用户问的是2026年世界杯举办地,这是实时信息,需要用搜索工具。
Action: tavily_search_results{"query": "2026年FIFA世界杯举办地"}
Action: tavily_search_results call started...

# Step 2:工具执行中
[tavily] Entering child run - call: tavily_search_results with input: {'query': '2026年FIFA世界杯举办地'}
[tavily] Finished chain.

# Step 3:工具返回结果给LLM
Observation: ['结果1:2026年FIFA世界杯将在美国、加拿大、墨西哥联合举办...']

# Step 4:第二轮推理(根据工具结果给出最终答案)
Thought: 已得到答案,2026年世界杯由北美三国联合举办。
Final Answer: 2026年FIFA世界杯将由美国、加拿大和墨西哥联合举办,这是历史上首次由三国联合举办的世界杯。
"""
# ============================================================
# 当Agent行为异常时,从日志中定位问题:
# 1. Thought行:检查LLM的推理是否合理
# 2. Action行:检查工具选择是否正确
# 3. Observation行:检查工具返回是否符合预期
# ============================================================

技巧2:中间结果检查点

# ============================================================
# 自定义回调:实时监控每一步推理和工具调用
# 适用于生产环境需要记录完整执行轨迹
# ============================================================

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.agent import AgentFinish, AgentAction

class DebugCallback(BaseCallbackHandler):
    """
    自定义调试回调,打印Agent执行过程中的每一步

    继承BaseCallbackHandler,覆盖感兴趣的事件方法
    """

    def on_agent_action(self, action: AgentAction, **kwargs):
        """每当Agent执行一个Tool时调用"""
        print(f"\n🔧 [Tool调用] 工具名:{action.tool}")
        print(f"📥 [Tool参数] {action.tool_input}")

    def on_agent_finish(self, finish: AgentFinish, **kwargs):
        """当Agent完成最终输出时调用"""
        print(f"\n✅ [最终答案] {finish.return_values['output']}")

    def on_chain_end(self, chain_end: dict, **kwargs):
        """每个Chain执行完毕时调用,可用于统计耗时"""
        print(f"\n📊 [Chain结束] 输出长度:{len(str(chain_end))}")

# 使用回调
from langchain_core.callbacks import StdOutCallbackHandler

agent = create_react_agent(llm=llm, tools=[search], prompt=hub.pull("hwchase17/react"))
executor = AgentExecutor(
    agent=agent,
    tools=[search],
    callbacks=[DebugCallback()]  # 🔑 注入自定义回调
)

executor.invoke({"input": "Python之父是谁?"})

4.2 MCP协议抓包分析

# ============================================================
# MCP协议调试:stdio模式下的日志输出
# 在Server代码中加入结构化日志,便于分析协议交互
# ============================================================

import json
import sys
from datetime import datetime

def log_json_rpc(direction: str, message: dict):
    """
    打印JSON-RPC消息的日志(用于调试分析)

    参数:
        direction: "→" 表示收到的请求,"←" 表示发出的响应
        message: JSON-RPC消息对象
    """
    timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    method = message.get("method", "response/batch")
    req_id = message.get("id", "-")

    # 彩色输出(Unix终端)
    arrow = "→" if direction == "IN" else "←"
    print(
        f"[{timestamp}] {arrow} {method} (id={req_id})",
        file=sys.stderr
    )
    # 如果是debug模式,打印完整payload
    if len(sys.argv) > 1 and sys.argv[1] == "--debug":
        print(
            json.dumps(message, indent=2, ensure_ascii=False),
            file=sys.stderr
        )
    sys.stderr.flush()

# 在stdio循环中:
def run_debug_server():
    """带调试输出的Server循环"""
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue

        request = json.loads(line)
        log_json_rpc("IN", request)    # 打印收到的请求

        response = handle_request(request)
        if response is not None:
            log_json_rpc("OUT", response)  # 打印发出的响应

        if response is not None:
            print(json.dumps(response), file=sys.stdout)
            sys.stdout.flush()

# ----------------------------------------
# 启动命令:
# python mcp_server.py --debug 2>&1 | grep "→\|←"
# ----------------------------------------

# ----------------------------------------
# WebSocket模式的抓包(使用python jsonrpc-websocket)
# ----------------------------------------
"""
import asyncio
import json
from jsonrpc_websocket import Server

async def test_mcp_ws():
    server = Server("ws://localhost:8000/mcp")

    # 初始化握手
    init_result = await server.send("initialize", {
        "protocolVersion": "2024-11-05",
        "capabilities": {"tools": {}},
        "clientInfo": {"name": "test-client", "version": "1.0.0"}
    })
    print("初始化响应:", init_result)

    # 列出工具
    tools_result = await server.send("tools/list")
    print("工具列表:", tools_result)

    # 调用工具
    call_result = await server.send("tools/call", {
        "name": "calculator",
        "arguments": {"expression": "2**10"}
    })
    print("工具调用:", call_result)

asyncio.run(test_mcp_ws())
"""

4.3 常见报错排查步骤

报错1:「Tool叫不到」

典型表现:Agent运行时提示"Tool not found"或Agent根本不调用Tool,直接回答

排查步骤:

  1. 检查Tool是否正确注册:运行Agent后看verbose日志中是否列出了该Tool
  2. 检查description是否足够清晰:description是LLM判断是否调用Tool的唯一依据,如果描述模糊,LLM无法理解何时使用
  3. 检查是否传入了AgentExecutor:create_react_agent的tools参数和AgentExecutor的tools参数必须一致
  4. 检查模型是否支持Tool Use:GPT-3.5-turbo-0613以下版本对Tool支持有限,升级模型
# 调试:打印Agent知道的工具列表
print("Agent可用工具:", [t.name for t in executor.tools])
# 应该包含你注册的所有工具

报错2:「参数格式错误」

典型表现:Tool被执行了但参数为空、或参数类型不匹配

排查步骤:

  1. 检查inputSchema的required字段:必填参数未传时会报错
  2. 检查参数类型:schema定义integer但传了string是最常见的类型错误
  3. 检查Pydantic模型的Field描述:Field的description会被用来生成Tool描述,建议填写
# 常见错误:参数类型不匹配
# 定义:weight: int = Field(description="体重(公斤)")
# 传入:{"weight": "75"}  ← 字符串,应该传整数 75

# 解决:让LLM传正确的类型,或在Tool内部做类型转换
def _run(self, weight_kg: int, **kwargs):
    weight = int(weight_kg)  # 做容错处理
    ...

报错3:「MCP Server连接超时」

典型表现:WebSocket连接失败、stdio进程无响应

排查步骤:

  1. stdio模式:检查进程是否正常启动,stderr是否有错误输出
  2. WebSocket模式:检查端口是否被占用、防火墙是否放行
  3. 检查协议版本匹配:Client和Server的protocolVersion不一致会导致握手失败
  4. 设置超时:在Client端设置合理的超时时间,避免无限等待
# WebSocket Server超时设置
import asyncio
from jsonrpc_websocket import Server

async def call_with_timeout():
    server = Server("ws://localhost:8000/mcp")
    try:
        # 5秒超时
        result = await asyncio.wait_for(
            server.send("tools/list"),
            timeout=5.0
        )
        return result
    except asyncio.TimeoutError:
        print("Server连接超时,检查Server是否正常运行")
        return None

4.4 Streaming模式的实现和注意事项

# ============================================================
# LangChain Agent的Streaming模式
# 将LLM的输出流式返回给用户,提升响应体验
# 适用于打字效果、长文本生成的场景
# ============================================================

from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain import hub

llm = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True)  # 🔑 开启流式
search = TavilySearchResults(max_results=3)

agent = create_react_agent(llm=llm, tools=[search], prompt=hub.pull("hwchase17/react"))
executor = AgentExecutor(
    agent=agent,
    tools=[search],
    streaming=True  # 🔑 AgentExecutor也要开启streaming
)

# 使用streaming模式
from langchain_core.callbacks import StdOutCallbackHandler

# 方法1:通过回调实时获取流式输出
print("Agent回答:", end="", flush=True)
for chunk in executor.stream({"input": "解释一下什么是量子计算"}):
    # chunk是每个推理步骤的输出
    if "output" in chunk:
        print(chunk["output"], end="", flush=True)
    elif "steps" in chunk:
        # 工具调用步骤的输出
        pass
print()  # 最后换行

# ----------------------------------------
# Streaming注意事项:
# ----------------------------------------
# 1. Tool执行本身不支持Streaming(Tool是整块返回结果的)
#    只有LLM的思考过程是流式的
# 2. verbose=True和streaming=True同时开启时,日志会有交织
#    生产环境建议分开调试
# 3. 流式输出时需要前端配合(如SSE/WebSocket推送),后端改动不大
# ----------------------------------------

第五章:性能优化

5.1 批量Tool调用的并发控制

# ============================================================
# 批量Tool调用的并发控制
# 场景:Agent一次需要调用多个不相关的Tool
# 方案:asyncio + ThreadPoolExecutor 控制并发度
# ============================================================

import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Callable, Any

# ----------------------------------------
# 方案1:信号量控制并发数(Semaphore)
# 适合IO密集型Tool(网络请求)
# ----------------------------------------
async def parallel_tool_calls_semaphore(
    tool_calls: List[dict],
    max_concurrency: int = 3
) -> List[Any]:
    """
    使用asyncio.Semaphore控制Tool并发数

    参数:
        tool_calls: [{"tool": tool1, "args": {...}}, ...]
        max_concurrency: 最大同时执行的Tool数
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def call_with_semaphore(tool_call: dict) -> Any:
        async with semaphore:
            # 在这里调用Tool(如果是同步函数,用run_in_executor包装)
            return await asyncio.get_event_loop().run_in_executor(
                None,  # 使用默认线程池
                tool_call["tool"].invoke,
                tool_call["args"]
            )

    tasks = [call_with_semaphore(tc) for tc in tool_calls]
    # 🔑 asyncio.gather: 并发执行所有任务,结果按顺序返回
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # 处理异常结果(某Tool出错不影响其他Tool)
    return [
        r if not isinstance(r, Exception) else f"错误:{str(r)}"
        for r in results
    ]

# ----------------------------------------
# 方案2:ThreadPoolExecutor控制线程池大小
# 适合需要控制线程资源的场景
# ----------------------------------------
def parallel_tool_calls_threadpool(
    tool_calls: List[dict],
    max_workers: int = 5
) -> List[Any]:
    """
    使用ThreadPoolExecutor控制Tool并发执行

    参数:
        tool_calls: [{"tool": tool, "args": {...}}, ...]
        max_workers: 最大线程数
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(tc["tool"].invoke, tc["args"])
            for tc in tool_calls
        ]
        results = [f.result() for f in futures]
    return results

# ----------------------------------------
# 示例:同时搜索多个关键词
# ----------------------------------------
@tool
def search_news(query: str) -> str:
    """搜索新闻,参数:query-搜索词"""
    return f"新闻搜索结果:{query}相关资讯..."

async def demo_batch_search():
    queries = [
        {"query": "AI大模型最新进展"},
        {"query": "量子计算突破"},
        {"query": "新能源技术"},
        {"query": "自动驾驶现状"},
        {"query": "AR/VR设备"},
    ]

    tool_calls = [
        {"tool": search_news, "args": q}
        for q in queries
    ]

    print("开始并发搜索(最大并发3)...")
    results = await parallel_tool_calls_semaphore(tool_calls, max_concurrency=3)
    for i, r in enumerate(results):
        print(f"  [{i+1}] {r}")

asyncio.run(demo_batch_search())

5.2 Tool结果缓存策略

# ============================================================
# Tool结果缓存:避免重复调用耗时Tool
# 场景:同一查询短时间内被多次触发
# ============================================================

from functools import lru_cache
from typing import Any, Callable, Hashable
import hashlib
import json

# ----------------------------------------
# 方案1:基于函数参数的LRU缓存(适合纯函数)
# ----------------------------------------
@lru_cache(maxsize=128)
def cached_search(query: str, max_results: int = 5) -> str:
    """
    带LRU缓存的搜索函数

    相同参数(query + max_results)的搜索请求,在缓存有效期内直接返回结果
    适合搜索、查库等幂等操作
    """
    # 这里放真实搜索逻辑
    return f"搜索结果:{query}"

# 清除缓存
# cached_search.cache_clear()

# ----------------------------------------
# 方案2:基于Tool输入的通用缓存装饰器
# 支持不可哈希的参数(字典)
# ----------------------------------------
def hash_args(args: dict) -> str:
    """将参数字典转成哈希字符串,用于缓存key"""
    json_str = json.dumps(args, sort_keys=True, ensure_ascii=True)
    return hashlib.md5(json_str.encode()).hexdigest()

def cache_tool_result(maxsize: int = 128, ttl_seconds: float = 300):
    """
    Tool结果缓存装饰器

    参数:
        maxsize: 缓存容器大小(最近N次结果)
        ttl_seconds: 缓存有效期(秒),默认5分钟
    """
    cache: dict = {}
    cache_time: dict = {}

    def decorator(func: Callable) -> Callable:
        def wrapper(args: dict) -> str:
            cache_key = hash_args(args)
            now = __import__("time").time()

            # 命中缓存且未过期
            if cache_key in cache:
                if now - cache_time[cache_key] < ttl_seconds:
                    return f"[缓存命中] {cache[cache_key]}"
                else:
                    # 过期了,删除缓存
                    del cache[cache_key]
                    del cache_time[cache_key]

            # 执行真实逻辑
            result = func(args)
            cache[cache_key] = result
            cache_time[cache_key] = now

            # 超过maxsize,清除最老的缓存
            if len(cache) > maxsize:
                oldest_key = min(cache_time, key=cache_time.get)
                del cache[oldest_key]
                del cache_time[oldest_key]

            return result
        return wrapper
    return decorator

# 使用示例
@cache_tool_result(maxsize=64, ttl_seconds=600)
def expensive_api_tool(args: dict) -> str:
    """需要调用外部付费API的Tool,应该加缓存"""
    return f"API返回结果:{args}"

5.3 Agent推理步数限制

# ============================================================
# max_iterations / max_execution_time
# 防止Agent死循环或无限执行的核心手段
# ============================================================

from langchain.agents import AgentExecutor, create_react_agent
from langchain import hub
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", temperature=0)
search = TavilySearchResults(max_results=3)

agent = create_react_agent(llm=llm, tools=[search], prompt=hub.pull("hwchase17/react"))

# ----------------------------------------
# AgentExecutor 核心参数
# ----------------------------------------
executor = AgentExecutor(
    agent=agent,
    tools=[search],

    # 🔑 max_iterations:最大推理步数(每个Tool调用算1步)
    # 设置后,Agent超过此步数会停止并返回中间结果
    max_iterations=10,

    # 🔑 max_execution_time:最大执行时间(秒)
    # 达到时间限制后强制停止
    max_execution_time=60.0,

    # 🔑 early_stopping_method:停止时用什么策略
    # "force" = 强制停止,返回最终状态
    # "generate" = 让Agent生成最终答案再停止
    early_stopping_method="force",

    # 🔑 handle_parsing_errors:遇到解析错误怎么办
    # True = 把错误信息反馈给Agent让它自我修正
    # False = 直接抛出异常
    handle_parsing_errors=True,

    # 🔑 return_intermediate_steps:是否返回中间步骤日志
    # True = 返回所有Tool调用记录,便于调试
    # False = 只返回最终答案(节省token)
    return_intermediate_steps=True
)

# ----------------------------------------
# 检查中间步骤:看Agent走了哪些弯路
# ----------------------------------------
result = executor.invoke({"input": "解释一下什么是RAG"})

print("✅ 最终答案:", result["output"])

# 中间步骤:分析Agent的思考路径
print("\n🔍 Agent推理路径(共{}步):".format(len(result["intermediate_steps"])))
for i, step in enumerate(result["intermediate_steps"]):
    action = step[0]  # AgentAction
    observation = step[1]  # 工具返回值
    print(f"  Step {i+1}: 工具={action.tool}, 参数={action.tool_input}")
    print(f"         工具返回={observation[:80]}...")
生产环境建议:max_iterations设置5-10之间,max_execution_time设置60秒以内。开启return_intermediate_steps用于日志记录,但不要在生产环境打印(token费用)。

总结

本文覆盖了LangChain Agent与MCP协议整合开发的全链路知识:

这套体系的核心价值在于:一次开发,多处复用。你写的MCP Server可以被任意MCP兼容的Agent连接调用,你定义的LangChain Tool可以在不同Agent里重复使用。在AI应用快速迭代的阶段,这种工程化复用能力是竞争力的关键。