>
一个通用的AI助手(如GPT-4、Claude)可以回答各种问题,但它的能力是"大而全"而非"专而深"。当业务场景需要专业化能力时,单Agent的局限性就显现出来:
多Agent系统的核心理念是专业分工 + 有序协作:让每个Agent专注于自己擅长的领域,通过标准化的消息传递协议实现协作。
# 单Agent vs 多Agent 对比
# 单Agent(ChatGPT风格):
# 用户: "帮我分析销售数据,然后写成报告"
# AI: [理解任务] → [分析数据] → [写报告] → 返回结果
# 问题:一个模型要同时做数据分析和文章写作,两项都不够专业
# 多Agent(分工协作):
# 用户: "帮我分析销售数据,然后写成报告"
#
# orchestrator(调度员)
# ↓ 分解任务
# ┌──────────┴──────────┐
# ↓ ↓
# data_analyst report_writer
# (数据分析专家) (文档写作专家)
# ↓ ↓
# 数据分析报告 初稿文本
# ↓ ↓
# └──────────┬──────────┘
# ↓
# orchestrator(汇总)
# ↓
# 最终报告
#
# 关键:每个Agent只做自己擅长的事,通过结构化消息传递协作
LangGraph是LangChain团队推出的用于构建有状态的多Agent系统的框架。它的核心思想是将Agent工作流建模为一个有向图(Directed Graph):
状态是LangGraph中最核心的概念。它定义了在整个工作流中需要传递什么信息。
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
import operator
# ============ 定义Agent协作的状态结构 ============
class AgentState(TypedDict):
"""
多Agent系统的共享状态
每个Agent都可以读取和修改这个状态
"""
# 消息历史(用于上下文传递)
# 使用 operator.iconcat 可以将新消息追加到列表,而不是覆盖
messages: Annotated[Sequence[BaseMessage], operator.iconcat]
# 用户原始输入
user_input: str
# 当前任务状态
current_task: str
# 任务分解结果
subtasks: list[str]
# 各Agent的执行结果
analysis_result: dict | None # data_analyst的结果
report_draft: str | None # report_writer的结果
review_result: dict | None # reviewer的结果
# 工作流控制
next_agent: str # 下一步应该执行哪个Agent
iteration_count: int # 迭代次数(用于防止无限循环)
error_messages: list[str] # 错误记录
# 最终输出
final_output: str | None
# AgentState字段说明:
# messages: 保留完整的对话历史,每个Agent的输出都会追加到messages
# subtasks: 由orchestrator分解出的子任务列表
# analysis_result: data_analyst输出的结构化数据
# report_draft: report_writer生成的报告文本
# review_result: reviewer的审核意见
# next_agent: 决定下一步执行哪个Agent(支持条件路由)
# iteration_count: 防止Agent之间无限循环,超过阈值强制终止
# error_messages: 记录执行过程中的错误,便于事后排查
# ============ 节点的定义方式 ============
# 节点就是一个Python函数,接收当前状态,返回更新后的状态
def data_analyst_node(state: AgentState) -> AgentState:
"""
数据分析Agent节点
接收状态,执行业务逻辑,返回更新后的状态
"""
# 读取状态中的信息
user_input = state["user_input"]
current_task = state["current_task"]
# 执行业务逻辑(这里简化,实际会调用LLM或外部API)
analysis = perform_data_analysis(user_input)
# 更新状态
new_state = state.copy()
new_state["analysis_result"] = analysis
new_state["messages"] = state["messages"] + [
AIMessage(content=f"数据分析完成,结果:{analysis}")
]
new_state["next_agent"] = "report_writer" # 下一步去report_writer
return new_state
# ============ 边的定义方式 ============
# 边定义了节点之间的流转关系
# 方式1:固定边(无条件跳转)
# node_a → node_b:node_a执行完后无条件执行node_b
graph.add_edge("node_a", "node_b")
# 方式2:条件边(有条件跳转)
# 根据状态中的某个字段决定下一步去哪
def route_based_on_task(state: AgentState) -> str:
"""根据任务类型决定下一步"""
task = state.get("current_task", "")
if "分析" in task or "数据" in task:
return "data_analyst"
elif "写作" in task or "报告" in task:
return "report_writer"
elif "审核" in task or "检查" in task:
return "reviewer"
else:
return "orchestrator" # 默认回调度员
# 条件边的语法:from node + 条件函数 + to 多个候选节点
graph.add_conditional_edges(
"orchestrator",
route_based_on_task,
{
"data_analyst": "data_analyst",
"report_writer": "report_writer",
"reviewer": "reviewer",
"END": "__end__" # 结束工作流
}
)
"""
销售数据分析报告生成系统
架构:
User → Orchestrator → [DataAnalyst → ReportWriter → Reviewer] → User
↓
(如果Review不通过)
↓
ReportWriter(修订)
↓
Reviewer(复核)
"""
# ============ 完整可运行的代码 ============
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
import operator
# ============ Step 1: 定义状态 ============
class SalesReportState(TypedDict):
"""
销售报告生成系统的状态
"""
# 消息历史(operator.iconcat实现追加而非覆盖)
messages: Annotated[Sequence[BaseMessage], operator.iconcat]
# 用户输入
user_request: str
# 数据分析结果
sales_data: dict | None
# 报告草稿
report_draft: str | None
# 审核结果
review_feedback: str | None
review_passed: bool
# 报告终稿
final_report: str | None
# 迭代控制
revision_count: int # 修订次数
max_revisions: int # 最大修订次数
# ============ Step 2: 定义LLM ============
# 使用Ollama作为后端(也可以换成OpenAI)
llm = ChatOpenAI(
base_url="http://localhost:11434/v1",
api_key="ollama", # Ollama不需要真实API Key
model="qwen2.5-14b-instruct",
temperature=0.7,
max_tokens=4096,
)
# ============ Step 3: 定义各Agent节点 ============
def orchestrator_node(state: SalesReportState) -> SalesReportState:
"""
调度员节点:接收用户请求,分解任务,判断工作流走向
"""
user_request = state["user_request"]
prompt = f"""你是一个销售报告生成系统的调度员。
用户请求:{user_request}
请分析这个请求,判断需要哪些步骤来完成:
1. 是否需要数据分析?(涉及销售数据、增长率、对比等)
2. 是否需要生成报告?(涉及文档写作、结构化输出)
3. 是否需要审核?
请输出一个JSON格式的任务分解:
{{
"needs_analysis": true/false,
"needs_report": true/false,
"priority_order": ["analysis", "report", "review"] 或其他顺序,
"special_requirements": "任何特殊要求"
}}
只输出JSON,不要有其他内容。
"""
response = llm.invoke([HumanMessage(content=prompt)])
# 解析JSON(简化处理,实际需要更健壮的解析)
import json
try:
task_plan = json.loads(response.content)
except:
task_plan = {"needs_analysis": True, "needs_report": True, "priority_order": ["analysis", "report", "review"]}
new_state = state.copy()
new_state["messages"] = state["messages"] + [
AIMessage(content=f"任务已接收,正在分析请求...\n任务计划:{task_plan}")
]
print(f"[Orchestrator] 任务计划: {task_plan}")
return new_state
def data_analyst_node(state: SalesReportState) -> SalesReportState:
"""
数据分析Agent:分析销售数据,生成结构化分析结果
"""
user_request = state["user_request"]
# 构建分析prompt
analysis_prompt = f"""你是一个专业的数据分析师。
用户请求:{user_request}
请基于以下模拟销售数据进行分析(实际使用时替换为真实数据库查询):
销售数据(2024年Q1-Q4):
- Q1: 营收1200万,新客500,老客800,复购率40%
- Q2: 营收1500万,新客700,老客900,复购率44%
- Q3: 营收1350万,新客600,老客850,复购率42%
- Q4: 营收1800万,新客900,老客1050,复购率46%
请输出:
1. 同比/环比增长率分析
2. 客户结构分析(新客vs老客趋势)
3. 复购率变化及原因推断
4. 关键洞察(3-5条)
5. 数据可信度评估
输出格式:结构化Markdown,包含表格和关键数据。
"""
response = llm.invoke([HumanMessage(content=analysis_prompt)])
analysis_result = response.content
new_state = state.copy()
new_state["sales_data"] = {
"analysis_text": analysis_result,
"data_source": "模拟数据(替换为真实查询)",
"generated_at": "2026-05-28"
}
new_state["messages"] = state["messages"] + [
AIMessage(content=f"✅ 数据分析完成\n\n{analysis_result}")
]
print(f"[DataAnalyst] 分析完成")
return new_state
def report_writer_node(state: SalesReportState) -> SalesReportState:
"""
报告写作Agent:根据数据分析结果生成报告草稿
如果有审核反馈,需要修订
"""
analysis_result = state["sales_data"]["analysis_text"] if state["sales_data"] else "无数据"
review_feedback = state.get("review_feedback", "")
# 如果有审核反馈,说明是修订版本
is_revision = bool(review_feedback)
revision_note = f"\n\n【修订要求】\n审核反馈:{review_feedback}\n请根据反馈修订报告。" if is_revision else ""
report_prompt = f"""你是一个专业的商业报告撰写师。
请基于以下数据分析结果,生成一份销售分析报告。
【数据分析结果】
{analysis_result}
{revision_note}
报告要求:
1. 结构清晰:摘要 → 数据概览 → 深度分析 → 建议 → 风险提示
2. 语言专业但易懂,适合管理层阅读
3. 包含具体数字和百分比
4. 每个建议都要有数据支撑
请生成完整的报告。
"""
response = llm.invoke([HumanMessage(content=report_prompt)])
report_draft = response.content
new_state = state.copy()
new_state["report_draft"] = report_draft
new_state["messages"] = state["messages"] + [
AIMessage(content=f"{'📝 报告已修订' if is_revision else '📝 报告草稿已生成'}\n\n{report_draft[:200]}...")
]
print(f"[ReportWriter] {'修订' if is_revision else '生成'}完成")
return new_state
def reviewer_node(state: SalesReportState) -> SalesReportState:
"""
审核Agent:审核报告质量,检查数据准确性和逻辑完整性
"""
report = state["report_draft"]
review_prompt = f"""你是一个严格的报告审核专家。
请审核以下销售分析报告,检查以下方面:
1. 【数据准确性】报告中的数字是否与分析数据一致?
2. 【逻辑完整性】分析是否有逻辑漏洞或矛盾?
3. 【建议可行性】提出的建议是否具体可执行?
4. 【格式规范性】结构是否清晰,格式是否专业?
5. 【风险提示】是否识别了潜在风险?
报告内容:
{report}
请输出审核结果,格式:
{{
"passed": true/false,
"issues": ["问题1", "问题2", ...],
"overall_score": 1-10,
"recommendations": "改进建议"
}}
如果报告质量合格(80分以上),passed设为true。
"""
response = llm.invoke([HumanMessage(content=review_prompt)])
review_text = response.content
# 简单解析passed字段(实际需要更健壮的解析)
passed = "passed\": true" in review_text or "passed" not in review_text
score_text = review_text
new_state = state.copy()
new_state["review_feedback"] = review_text
new_state["review_passed"] = passed
new_state["messages"] = state["messages"] + [
AIMessage(content=f"🔍 审核完成\n\n{review_text[:300]}...")
]
print(f"[Reviewer] 审核{'通过' if passed else '未通过'}")
return new_state
def should_revise(state: SalesReportState) -> str:
"""
条件路由函数:决定是否需要修订报告
"""
# 如果审核通过,结束;如果不通过且还有修订机会,继续修订
if state["review_passed"]:
return "END"
if state["revision_count"] >= state["max_revisions"]:
# 达到最大修订次数,强制结束
return "END"
return "report_writer" # 未通过审核,返回report_writer修订
# ============ Step 4: 构建图 ============
from langgraph.graph import StateGraph, START, END
# 创建图
graph = StateGraph(SalesReportState)
# 注册节点
graph.add_node("orchestrator", orchestrator_node)
graph.add_node("data_analyst", data_analyst_node)
graph.add_node("report_writer", report_writer_node)
graph.add_node("reviewer", reviewer_node)
# 添加边
graph.add_edge(START, "orchestrator")
graph.add_edge("orchestrator", "data_analyst")
graph.add_edge("data_analyst", "report_writer")
graph.add_edge("report_writer", "reviewer")
# 条件边:reviewer之后根据审核结果决定下一步
graph.add_conditional_edges(
"reviewer",
should_revise,
{
"report_writer": "report_writer", # 不通过,回去修订
"END": END # 通过,结束
}
)
# 编译图
compiled_graph = graph.compile()
# ============ Step 5: 运行工作流 ============
def run_sales_report(user_request: str) -> str:
"""
运行销售报告生成工作流
"""
initial_state: SalesReportState = {
"messages": [HumanMessage(content=user_request)],
"user_request": user_request,
"sales_data": None,
"report_draft": None,
"review_feedback": None,
"review_passed": False,
"final_report": None,
"revision_count": 0,
"max_revisions": 3
}
print(f"\n{'='*60}")
print(f"开始执行工作流...")
print(f"用户请求:{user_request}")
print(f"{'='*60}\n")
# 执行工作流(会打印中间状态)
final_state = compiled_graph.invoke(initial_state)
print(f"\n{'='*60}")
print(f"工作流执行完成")
print(f"修订次数:{final_state['revision_count']}")
print(f"审核通过:{final_state['review_passed']}")
print(f"{'='*60}\n")
return final_state["report_draft"]
# ============ 执行示例 ============
if __name__ == "__main__":
user_request = "帮我分析2024年各季度销售数据,生成一份给CEO看的季度报告,包含增长趋势、客户分析和下季度建议"
final_report = run_sales_report(user_request)
print("\n" + "="*60)
print("最终报告:")
print("="*60)
print(final_report)
# 模式1:线性流程(最简单)
# A → B → C → D,单向执行,无需回头
#
# 使用场景:固定流程,如:数据导入 → 数据清洗 → 分析 → 报告
# 代码示例:
graph.add_edge(START, "data_import")
graph.add_edge("data_import", "data_clean")
graph.add_edge("data_clean", "analysis")
graph.add_edge("analysis", "report")
graph.add_edge("report", END)
# 模式2:条件循环(带反馈)
# A → B → C → [条件判断] → 通过?→ END : 返回B
#
# 使用场景:需要迭代优化,如:生成 → 审核 → 不通过则修订 → 重新审核
# 代码示例:
def route_after_review(state):
if state["approved"]:
return END
elif state["iteration"] >= state["max_iterations"]:
return "escalation" # 升级处理
else:
return "generator" # 继续修订
graph.add_conditional_edges(
"reviewer",
route_after_review,
{
END: END,
"escalation": "escalation",
"generator": "generator"
}
)
# 模式3:并行分发 + 汇总(最复杂但最强大)
# → Branch_A →
# START → Router ─────────────────→ Aggregator → END
# → Branch_B →
# → Branch_C →
#
# 使用场景:多维度分析后需要汇总,如:同时分析销售/市场/运营三方数据后汇总
from langgraph.constants import Send
def route_to_branches(state) -> list:
"""并行分发到多个分支"""
return [
Send("branch_a", state),
Send("branch_b", state),
Send("branch_c", state)
]
graph.add_conditional_edges(
"router",
route_to_branches,
["branch_a", "branch_b", "branch_c"]
)
def aggregator_node(state) -> SalesReportState:
"""汇总各分支结果"""
a_result = state["branch_a_result"]
b_result = state["branch_b_result"]
c_result = state["branch_c_result"]
summary = f"""
综合分析报告:
【销售分析】{a_result}
【市场分析】{b_result}
【运营分析】{c_result}
【综合结论】
...
"""
return {"messages": state["messages"] + [AIMessage(content=summary)]}
# 良好设计的消息格式应该包含:
# 1. 明确的角色标识(谁在说)
# 2. 清晰的内容类型(说什么)
# 3. 结构化的数据(便于解析)
from enum import Enum
from dataclasses import dataclass
from typing import Optional
class MessageType(Enum):
"""消息类型枚举"""
USER_INPUT = "user_input"
AGENT_OUTPUT = "agent_output"
SYSTEM_INSTRUCTION = "system_instruction"
ERROR_REPORT = "error_report"
HANDOFF = "handoff" # 交接给其他Agent
class ContentFormat(Enum):
"""内容格式枚举"""
TEXT = "text"
JSON = "json"
MARKDOWN = "markdown"
TABLE = "table"
CODE = "code"
@dataclass
class AgentMessage:
"""
标准化的Agent间通信消息格式
"""
sender: str # 发送者Agent名称
receiver: str # 接收者Agent名称("*"表示广播)
message_type: MessageType
content_format: ContentFormat
content: str # 主内容
metadata: dict # 元数据(如置信度、时间戳等)
def to_langchain_message(self) -> AIMessage:
"""转换为LangChain消息格式"""
full_content = f"""【来自】{self.sender}
【类型】{self.message_type.value}
【格式】{self.content_format.value}
【内容】
{self.content}"""
return AIMessage(content=full_content)
# 使用示例
msg = AgentMessage(
sender="data_analyst",
receiver="report_writer",
message_type=MessageType.AGENT_OUTPUT,
content_format=ContentFormat.JSON,
content='{"revenue_growth": 0.25, "new_customers": 2700}',
metadata={"confidence": 0.95, "data_source": "ERP系统"}
)
# 在Agent中传递
def data_analyst_node(state: AgentState) -> AgentState:
# ... 执行分析 ...
msg = AgentMessage(
sender="data_analyst",
receiver="report_writer",
message_type=MessageType.AGENT_OUTPUT,
content_format=ContentFormat.JSON,
content=json.dumps(analysis_result),
metadata={"confidence": 0.92}
)
new_state = state.copy()
new_state["messages"] = state["messages"] + [msg.to_langchain_message()]
return new_state
# ============ 错误处理节点示例 ============
def error_handler_node(state: AgentState) -> AgentState:
"""
全局错误处理节点
捕获各Agent抛出的异常,记录到状态中,并决定如何恢复
"""
error_messages = state.get("error_messages", [])
current_task = state.get("current_task", "unknown")
# 构建错误恢复prompt
recovery_prompt = f"""发生错误,请分析原因并提出恢复方案。
当前任务:{current_task}
错误历史:
{chr(10).join(error_messages[-3:])} # 只保留最近3条错误
请决定:
1. 是否应该重试?(如果错误是临时性的,如网络超时)
2. 是否应该跳过当前步骤?(如果错误不影响主流程)
3. 是否应该升级人工处理?(如果错误无法自动恢复)
4. 是否应该终止工作流?(如果错误导致系统不可用)
输出格式:
{{
"action": "retry" | "skip" | "escalate" | "abort",
"reason": "原因说明",
"next_step": "建议的下一步"
}}
"""
response = llm.invoke([HumanMessage(content=recovery_prompt)])
try:
decision = json.loads(response.content)
except:
decision = {"action": "escalate", "reason": "无法解析LLM响应", "next_step": "人工介入"}
new_state = state.copy()
new_state["messages"] = state["messages"] + [
AIMessage(content=f"⚠️ 错误处理决策:{decision}")
]
return new_state
# 在图中添加错误处理边
def route_on_error(state: AgentState) -> str:
"""错误后的路由决策"""
error_count = len(state.get("error_messages", []))
if error_count >= 3:
# 连续3个错误,升级处理
return "escalation"
elif error_count > 0:
# 有错误但不多,尝试恢复
return "error_handler"
else:
# 没有错误,正常流程
return "normal"
# 在每个Agent节点添加错误捕获
def wrapped_data_analyst_node(state: AgentState) -> AgentState:
"""
包装后的data_analyst_node,自动捕获错误
"""
try:
return data_analyst_node(state)
except Exception as e:
error_msg = f"[data_analyst] 错误:{str(e)}"
print(f"❌ {error_msg}")
new_state = state.copy()
new_state["error_messages"] = state.get("error_messages", []) + [error_msg]
new_state["messages"] = state["messages"] + [
AIMessage(content=f"❌ {error_msg}")
]
return new_state
# 用包装后的节点替换原节点
graph.add_node("data_analyst", wrapped_data_analyst_node)
# 检查点机制允许在某个状态保存工作流,稍后从该状态恢复
# 适用于:1)人工审核后恢复执行 2)系统崩溃后从断点恢复 3)A/B测试不同策略
from langgraph.checkpoint.memory import MemorySaver
# 创建内存检查点存储(生产环境应使用持久化存储)
checkpointer = MemorySaver()
# 编译图时添加检查点
compiled_graph = graph.compile(
checkpointer=checkpointer,
interrupt_before=["reviewer"], # 在reviewer节点前中断,等待人工确认
)
# ============ 保存和恢复检查点的示例 ============
def save_checkpoint(graph, state, thread_id: str):
"""保存检查点"""
config = {"configurable": {"thread_id": thread_id}}
graph.update_state(config, state)
print(f"✅ 检查点已保存:thread_id={thread_id}")
def resume_from_checkpoint(graph, thread_id: str):
"""从检查点恢复并继续执行"""
config = {"configurable": {"thread_id": thread_id}}
# 从检查点恢复执行(会从interrupt_before中断点继续)
result = graph.invoke(None, config=config) # None表示使用保存的状态
return result
# ============ 人工介入工作流示例 ============
# Step 1: 运行到reviewer前中断
initial_state = {"user_request": "分析Q1销售数据", ...}
config = {"configurable": {"thread_id": "session-001"}}
# invoke会在reviewer之前中断(因为我们设置了interrupt_before=["reviewer"])
result = compiled_graph.invoke(initial_state, config=config)
# 此时系统暂停,等待人工确认
print("⏸️ 工作流已暂停,等待人工审核...")
# Step 2: 人工审核(通过API或其他方式)
# 假设人工确认报告需要修改
human_feedback = {
"approved": False,
"feedback": "第三部分的数据单位有误,应该是'万元'而不是'亿元',请修订"
}
# Step 3: 将人工反馈写入状态,然后恢复执行
new_state = {
"review_passed": False,
"review_feedback": human_feedback["feedback"],
"human_override": True
}
compiled_graph.update_state(config, new_state)
# Step 4: 恢复执行
# 继续从中断点执行,会进入reviewer → 根据状态决定继续修订
final_result = compiled_graph.invoke(None, config=config)
print("✅ 工作流完成")
# 问题:工作流执行后,不知道中间状态发生了什么
# 解决方案:添加状态追踪装饰器
from functools import wraps
def trace_state_changes(func):
"""追踪状态变化的装饰器"""
@wraps(func)
def wrapper(state):
input_keys = list(state.keys())
print(f"\n📥 [{func.__name__}] 输入状态 keys: {input_keys}")
result = func(state)
output_keys = list(result.keys())
changed_keys = [k for k in output_keys if state.get(k) != result.get(k)]
print(f"📤 [{func.__name__}] 输出状态 变更字段: {changed_keys}")
# 打印messages数量变化
old_msg_count = len(state.get("messages", []))
new_msg_count = len(result.get("messages", []))
print(f" messages: {old_msg_count} → {new_msg_count}")
return result
return wrapper
# 应用到所有节点
wrapped_orchestrator = trace_state_changes(orchestrator_node)
wrapped_data_analyst = trace_state_changes(data_analyst_node)
wrapped_report_writer = trace_state_changes(report_writer_node)
wrapped_reviewer = trace_state_changes(reviewer_node)
# 注册包装后的节点
graph.add_node("orchestrator", wrapped_orchestrator)
graph.add_node("data_analyst", wrapped_data_analyst)
graph.add_node("report_writer", wrapped_reviewer) # 注意:reivewer注册为wrapped_reviewer
# ...
# 错误1:State被后续节点覆盖
# 问题:某个节点更新了状态,但后续节点读取时发现没有更新
# 原因:没有使用 Annotated[list, operator.iconcat],导致list被覆盖而不是追加
# 修复:
# ❌ 错误写法
messages: list[BaseMessage] # 这样messages会被覆盖
# ✅ 正确写法
messages: Annotated[list[BaseMessage], operator.iconcat]
# operator.iconcat 会将新消息追加到列表末尾,而不是覆盖
# 错误2:循环依赖导致无限循环
# 问题:两个Agent互相调用,无法结束
# 原因:没有设置iteration_count上限,没有退出条件
# 修复:
def route_after_agent_b(state):
iteration = state.get("iteration_count", 0) + 1
if iteration >= 5: # 最多循环5次
return "escalation"
if some_exit_condition(state):
return END
return "other_agent"
# 在状态更新时增加计数
new_state = state.copy()
new_state["iteration_count"] = state.get("iteration_count", 0) + 1
# 错误3:条件边返回了不存在的节点名
# 问题:add_conditional_edges 返回的节点名拼写错误
# 修复:
# 使用枚举或常量定义所有合法的节点名称
VALID_NODES = {"orchestrator", "data_analyst", "report_writer", "reviewer", END}
def validate_route(route: str):
if route not in VALID_NODES:
raise ValueError(f"无效的路由目标:{route},有效值:{VALID_NODES}")
return route
# 错误4:LLM输出格式不符合预期,导致JSON解析失败
# 问题:llm输出的JSON包含额外的前缀文字,如"以下是JSON:{...}"
# 修复:使用更严格的Prompt,并添加解析容错
def robust_json_parse(text: str) -> dict:
"""健壮的JSON解析,处理LLM输出的各种格式问题"""
import json
import re
# 方法1:直接尝试
try:
return json.loads(text)
except:
pass
# 方法2:去除markdown代码块
text = re.sub(r'```json\s*', '', text)
text = re.sub(r'```\s*', '', text)
try:
return json.loads(text)
except:
pass
# 方法3:提取第一个 {...} 或 [...]
match = re.search(r'\{[^{}]*\}', text)
if match:
try:
return json.loads(match.group())
except:
pass
# 方法4:提取第一个数组
match = re.search(r'\[[^\[\]]*\]', text)
if match:
try:
return json.loads(match.group())
except:
pass
# 所有方法都失败,返回默认空值
return {}
# config.py - 集中管理所有配置
import os
from dataclasses import dataclass
@dataclass
class AgentConfig:
"""Agent系统配置"""
# LLM配置
llm_base_url: str = os.getenv("LLM_BASE_URL", "http://localhost:11434/v1")
llm_api_key: str = os.getenv("LLM_API_KEY", "ollama")
llm_model: str = os.getenv("LLM_MODEL", "qwen2.5-14b-instruct")
llm_temperature: float = float(os.getenv("LLM_TEMPERATURE", "0.7"))
llm_max_tokens: int = int(os.getenv("LLM_MAX_TOKENS", "4096"))
# 工作流配置
max_iterations: int = int(os.getenv("MAX_ITERATIONS", "10"))
timeout_seconds: int = int(os.getenv("TIMEOUT_SECONDS", "300"))
# 检查点配置
checkpoint_enabled: bool = os.getenv("CHECKPOINT_ENABLED", "true").lower() == "true"
checkpoint_dir: str = os.getenv("CHECKPOINT_DIR", "/data/checkpoints")
# 错误处理
max_retry: int = int(os.getenv("MAX_RETRY", "3"))
escalation_threshold: int = int(os.getenv("ESCALATION_THRESHOLD", "3"))
config = AgentConfig()
# 在初始化LLM时使用配置
llm = ChatOpenAI(
base_url=config.llm_base_url,
api_key=config.llm_api_key,
model=config.llm_model,
temperature=config.llm_temperature,
max_tokens=config.llm_max_tokens,
)
# 需要监控的关键指标:
# 1. 工作流执行时间
import time
from functools import wraps
def monitor_execution_time(func):
@wraps(func)
def wrapper(state):
start = time.time()
result = func(state)
elapsed = time.time() - start
print(f"⏱️ [{func.__name__}] 执行耗时: {elapsed:.2f}s")
# 将耗时记录到状态中(便于后续分析)
result["execution_times"] = state.get("execution_times", {})
result["execution_times"][func.__name__] = elapsed
return result
return wrapper
# 2. Token消耗统计
# 使用LangSmith或其他追踪工具记录每次LLM调用的token消耗
# 3. 错误率统计
# 在状态中添加error_tracking
def track_errors(state):
error_count = len(state.get("error_messages", []))
total_iterations = state.get("iteration_count", 1)
error_rate = error_count / total_iterations
metrics = {
"error_count": error_count,
"iteration_count": total_iterations,
"error_rate": error_rate,
"success": error_count == 0
}
print(f"📊 指标: {metrics}")
return metrics
LangGraph是构建复杂多Agent系统的强大工具,本指南涵盖了从概念到实现的完整知识。核心要点回顾:
多Agent系统不是银弹,建议从简单场景入手,逐步增加复杂度。