Module-2 小结和复习:核心架构模式掌握指南
来自图灵奖获得者的寄语
恭喜你完成了 Module-2 的学习!你已经掌握了 LangGraph 的六大核心架构模式。这些模式是构建生产级 AI Agent 的基石。正如计算机科学中的设计模式一样,理解何时使用哪种模式比记住语法更重要。本复习文档通过 15 个深度问题,帮助你从"知道"进化到"精通"。
记住:伟大的系统架构师不是记住了所有模式,而是能够在正确的场景选择正确的模式。
📋 本章核心知识回顾
学习地图
Module-2: 核心架构模式
├─ 2.1 Simple Graph (基础图)
│ └─ 状态、节点、边的基本概念
├─ 2.2 Chain (链式架构)
│ └─ 消息系统、工具绑定、Reducer
├─ 2.3 Router (路由架构)
│ └─ 条件边、动态路由、工具调用
├─ 2.4 Agent (智能体架构)
│ └─ ReAct 循环、多步推理、工具执行
├─ 2.5 Agent Memory (记忆管理)
│ └─ Checkpointer、Thread、状态持久化
└─ 2.6 Deployment (生产部署)
└─ LangGraph Cloud、Studio、SDK六大模式速查表
| 模式 | 核心特征 | 适用场景 | 关键技术 |
|---|---|---|---|
| Simple Graph | 3-5 节点,条件分支 | 学习基础、简单流程 | TypedDict, Literal |
| Chain | 线性流程,工具绑定 | 单次工具调用 | add_messages, bind_tools |
| Router | LLM 决策路由 | 智能分类、动态选择 | tools_condition, ToolNode |
| Agent | ReAct 循环推理 | 多步任务、复杂推理 | 循环边、工具反馈 |
| Agent Memory | 状态持久化 | 多轮对话、上下文连续 | Checkpointer, Thread |
| Deployment | 生产环境部署 | 真实用户服务 | LangGraph Cloud, SDK |
📚 术语表
| 术语名称 | LangGraph 定义和解读 | Python 定义和说明 | 重要程度 |
|---|---|---|---|
| StateGraph | 核心图构建器,管理有状态工作流的创建和执行 | 接受 State 类型参数的泛型类,提供节点和边的添加方法 | ⭐⭐⭐⭐⭐ |
| MessagesState | 专为对话设计的预定义状态类 | 包含 messages 字段并自动使用 add_messages reducer | ⭐⭐⭐⭐⭐ |
| add_messages | 智能消息合并 reducer,支持追加和去重 | 接收旧消息列表和新消息,返回合并后的列表 | ⭐⭐⭐⭐⭐ |
| Reducer | 控制状态更新方式的函数,实现非覆盖式合并 | 函数签名为 (old_value, new_value) -> merged_value | ⭐⭐⭐⭐⭐ |
| Conditional Edge | 根据状态动态选择下一节点的智能路由边 | 通过 add_conditional_edges 添加,需提供返回节点名的函数 | ⭐⭐⭐⭐⭐ |
| ToolNode | 自动执行工具调用的预构建节点 | 解析 AIMessage.tool_calls,调用函数,返回 ToolMessage | ⭐⭐⭐⭐⭐ |
| Checkpointer | 状态持久化机制,在节点执行后自动保存状态 | 接口类,MemorySaver/PostgresSaver 是其实现 | ⭐⭐⭐⭐⭐ |
| Thread | 独立的对话会话容器,通过 thread_id 隔离状态 | 每个 thread 维护独立的 checkpoint 历史 | ⭐⭐⭐⭐⭐ |
| Agent | 支持多步推理和工具调用的智能体架构 | 通过循环边实现 ReAct 模式(推理-行动-观察) | ⭐⭐⭐⭐⭐ |
| ReAct | Reasoning + Acting,Agent 的经典实现模式 | LLM 基于工具结果持续推理直到得出最终答案 | ⭐⭐⭐⭐⭐ |
| LangGraph Cloud | 托管部署服务,从 GitHub 自动部署到生产环境 | 提供唯一 URL、监控追踪、环境变量管理 | ⭐⭐⭐⭐⭐ |
| LangGraph SDK | Python 客户端库,用于程序化调用部署的图 | 提供异步 API 访问 assistants、threads、runs | ⭐⭐⭐⭐⭐ |
🎯 复习题目列表
本复习包含 15 个渐进式问题,覆盖知识理解、代码实现、架构设计三个层次:
基础理解题(1-5)
- LangGraph 状态管理的核心机制是什么?
- Router 和 Agent 的本质区别是什么?
- 为什么需要 add_messages Reducer?
- Checkpointer 的工作原理是什么?
- 条件边(Conditional Edge)如何决定路由?
代码实现题(6-10) 6. 如何实现一个带多个工具的 Router? 7. 如何让 Agent 避免无限循环? 8. 如何实现跨会话的对话记忆? 9. 如何在本地测试后部署到云端? 10. 如何处理工具调用失败的情况?
架构设计题(11-15) 11. 设计一个客服机器人:应该选择哪种架构模式? 12. 如何优化 Agent 的 Token 使用? 13. 如何设计多用户并发的 Agent 系统? 14. Simple Graph vs Chain vs Agent:如何选择? 15. 如何设计可扩展的工具系统?
📚 详细问答解析
问题 1: LangGraph 状态管理的核心机制是什么?
💡 点击查看答案
答案:
LangGraph 的状态管理基于 TypedDict + Annotated + Reducer 的三层架构。
1. TypedDict:定义状态结构
from typing_extensions import TypedDict
class State(TypedDict):
messages: list
count: int
user_id: str作用:
- 提供类型提示和 IDE 支持
- 定义状态的"骨架"
- 不强制运行时验证(与 Pydantic 不同)
2. Annotated:附加元数据
from typing import Annotated
from langgraph.graph.message import add_messages
class State(TypedDict):
messages: Annotated[list, add_messages]
# ^^^^^^^^ ^^^^^^^^^^^^^
# 基础类型 Reducer 函数
count: int # 没有 Annotated = 默认覆盖行为作用:
- 为字段添加额外信息(Reducer)
- 不影响类型检查
- LangGraph 读取元数据来决定状态更新方式
3. Reducer:控制更新逻辑
默认行为(无 Reducer):
# 初始状态
state = {"count": 5}
# 节点返回
node_output = {"count": 10}
# 结果:覆盖
new_state = {"count": 10} # 5 被替换使用 Reducer(add_messages):
# 初始状态
state = {"messages": [msg1, msg2]}
# 节点返回
node_output = {"messages": [msg3]}
# 结果:追加
new_state = {"messages": [msg1, msg2, msg3]}4. add_messages 的智能特性
from langgraph.graph.message import add_messages
# 特性 1:追加新消息
state = {"messages": [HumanMessage("Hi")]}
update = {"messages": [AIMessage("Hello")]}
# 结果:[HumanMessage("Hi"), AIMessage("Hello")]
# 特性 2:基于 ID 更新(去重)
state = {"messages": [AIMessage("Thinking...", id="msg1")]}
update = {"messages": [AIMessage("Answer is 42", id="msg1")]}
# 结果:[AIMessage("Answer is 42", id="msg1")] ← 替换而非追加5. 自定义 Reducer
import operator
class State(TypedDict):
# 数字累加
score: Annotated[int, operator.add]
# 列表合并
items: Annotated[list, operator.add]
# 自定义:只保留最近 N 条
recent_logs: Annotated[list, lambda old, new: (old + new)[-10:]]执行效果:
# score 使用 operator.add
state = {"score": 5}
update = {"score": 3}
# 结果:{"score": 8} ← 5 + 3
# items 使用 operator.add(列表拼接)
state = {"items": [1, 2]}
update = {"items": [3, 4]}
# 结果:{"items": [1, 2, 3, 4]}
# recent_logs 使用自定义 lambda
state = {"recent_logs": [1,2,3,4,5,6,7,8,9,10]}
update = {"recent_logs": [11, 12]}
# 结果:{"recent_logs": [3,4,5,6,7,8,9,10,11,12]} ← 只保留最后 10 个6. 核心原理总结
# 状态更新流程
初始状态 → 节点执行 → 返回更新 → Reducer 合并 → 新状态
# 伪代码
def update_state(old_state, node_output):
new_state = {}
for key, value in node_output.items():
if has_reducer(key):
new_state[key] = reducer(old_state[key], value)
else:
new_state[key] = value # 直接覆盖
return {**old_state, **new_state}7. 最佳实践
# ✅ 好的设计
class ChatState(TypedDict):
messages: Annotated[list, add_messages] # 对话历史:追加
user_id: str # 用户标识:覆盖
session_count: Annotated[int, operator.add] # 会话计数:累加
# ❌ 不好的设计
class BadState(TypedDict):
messages: list # 没有 Reducer,每次覆盖 → 丢失历史
all_data: dict # 太宽泛,难以维护关键要点
- TypedDict:定义"什么"(数据结构)
- Annotated:定义"如何"(更新方式)
- Reducer:实现"行为"(合并逻辑)
- add_messages:专为对话设计的智能 Reducer
问题 2: Router 和 Agent 的本质区别是什么?
💡 点击查看答案
答案:
Router 和 Agent 的本质区别在于 工具结果的流向 和 决策次数。
1. 架构流程对比
Router 架构:
用户输入 → LLM 决策 → 工具执行 → 直接返回用户
↓
调用一次工具
做一次决策Agent 架构:
用户输入 → LLM 决策 → 工具执行 → 返回 LLM → 继续决策 → ...
↑______________________|
循环反馈
可能调用多次工具
做多次决策2. 代码结构对比
Router 的图结构:
from langgraph.prebuilt import tools_condition, ToolNode
builder = StateGraph(MessagesState)
builder.add_node("llm", call_llm)
builder.add_node("tools", ToolNode([multiply]))
builder.add_edge(START, "llm")
builder.add_conditional_edges("llm", tools_condition)
builder.add_edge("tools", END) # ← 关键:工具后直接结束
graph = builder.compile()执行流程:
START → llm → [有工具调用] → tools → END
[无工具调用] → ENDAgent 的图结构:
builder = StateGraph(MessagesState)
builder.add_node("agent", call_llm)
builder.add_node("tools", ToolNode([multiply]))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", tools_condition)
builder.add_edge("tools", "agent") # ← 关键:工具后回到 agent,形成循环
graph = builder.compile()执行流程:
START → agent → [有工具调用] → tools → agent → [继续判断] → ...
[无工具调用] → END3. 实际案例对比
场景: 计算 (3 + 4) × 2 ÷ 5
Router 的执行:
# 第 1 次调用图
输入: "Calculate (3 + 4) × 2 ÷ 5"
LLM: 调用 add(3, 4)
工具: 返回 7
输出: ToolMessage(content="7")
# 结束 ❌ 无法继续计算
# 需要手动第 2 次调用
输入: "Now multiply 7 by 2"
LLM: 调用 multiply(7, 2)
工具: 返回 14
输出: ToolMessage(content="14")
# 结束 ❌ 还是无法完成Agent 的执行:
输入: "Calculate (3 + 4) × 2 ÷ 5"
# 循环 1
LLM: "需要先计算 3 + 4" → 调用 add(3, 4)
工具: 返回 7
→ 回到 LLM
# 循环 2
LLM: "得到 7,现在乘以 2" → 调用 multiply(7, 2)
工具: 返回 14
→ 回到 LLM
# 循环 3
LLM: "得到 14,现在除以 5" → 调用 divide(14, 5)
工具: 返回 2.8
→ 回到 LLM
# 循环 4
LLM: "所有计算完成,答案是 2.8"
输出: AIMessage(content="The answer is 2.8")
→ 结束 ✅4. 工具结果的流向
Router:
def tools_node(state):
# 执行工具
result = tool.invoke(...)
return {"messages": [ToolMessage(result)]}
# ↓
# 流向:直接到 END,用户可见Agent:
def tools_node(state):
# 执行工具
result = tool.invoke(...)
return {"messages": [ToolMessage(result)]}
# ↓
# 流向:回到 agent 节点,LLM 分析结果5. 决策次数对比
Router:单次决策
用户问题 → [LLM 思考 1 次] → 调用工具/直接回答 → 结束Agent:多次决策
用户问题 → [LLM 思考 1] → 工具 → [LLM 思考 2] → 工具 → ... → [LLM 思考 N] → 回答6. 适用场景对比
| 场景 | 推荐架构 | 原因 |
|---|---|---|
| 简单问答("今天天气") | Router | 单次查询即可 |
| 信息检索("搜索论文") | Router | 一次搜索返回 |
| 数学计算(多步骤) | Agent | 需要基于中间结果推理 |
| 研究助手(搜索→阅读→总结) | Agent | 需要多步骤协作 |
| 客服(查订单→回答) | Router | 单次工具调用 |
| 数据分析(查询→计算→可视化) | Agent | 需要连续操作 |
7. 代码对比总结
唯一的区别:一条边
# Router
builder.add_edge("tools", END) # 工具 → 结束
# Agent
builder.add_edge("tools", "agent") # 工具 → 回到 agent(循环)这一条边的差异,导致了:
- ✅ Router:简单、快速、可预测
- ✅ Agent:灵活、强大、智能
- ❌ Router:无法多步推理
- ❌ Agent:可能循环、成本高
关键要点
- Router = 单次决策 + 工具直达用户
- Agent = 多次决策 + 工具反馈 LLM
- 区别在于:是否有从 tools 到 agent 的边
- 选择标准:任务是否需要多步推理
问题 3: 为什么需要 add_messages Reducer?
💡 点击查看答案
答案:
add_messages 解决了对话系统中 状态覆盖导致历史丢失 的核心问题。
1. 问题演示:没有 Reducer 会发生什么
# 错误的状态定义(没有 Reducer)
class State(TypedDict):
messages: list # 默认行为:覆盖
# 初始状态
state = {"messages": [HumanMessage("Hi")]}
# 节点 1 执行
def node1(state):
return {"messages": [AIMessage("Hello!")]}
# 更新后的状态
# ❌ 预期:[HumanMessage("Hi"), AIMessage("Hello!")]
# ✅ 实际:[AIMessage("Hello!")] ← HumanMessage 丢失了!问题分析:
- LangGraph 默认使用 字典更新语义(
dict.update()) - 相同 key 的值会被完全替换,而不是合并
- 对话历史被覆盖 → LLM 失去上下文 → 无法理解指代词
2. 真实场景的灾难
# 第一轮对话
user: "我的订单号是 12345"
agent: "好的,我看到您的订单 12345"
# 第二轮对话(没有 Reducer)
state = {"messages": [AIMessage("好的,我看到您的订单 12345")]}
# ❌ 用户的消息丢失了!
user: "那个订单发货了吗?"
agent: "请问您的订单号是多少?" ← 忘记了刚才说的 123453. add_messages 的作用
from langgraph.graph.message import add_messages
from typing import Annotated
class State(TypedDict):
messages: Annotated[list, add_messages]
# ^^^^ ^^^^^^^^^^^^^
# 类型 Reducer 函数执行效果:
# 初始状态
state = {"messages": [HumanMessage("Hi")]}
# 节点返回
update = {"messages": [AIMessage("Hello!")]}
# add_messages 自动合并
new_state = {"messages": [
HumanMessage("Hi"), ← 保留
AIMessage("Hello!") ← 追加
]}4. add_messages 的智能特性
特性 1:自动追加
messages = [msg1, msg2]
add_messages(messages, [msg3])
# 结果:[msg1, msg2, msg3]特性 2:基于 ID 更新(去重)
messages = [AIMessage("Loading...", id="response-1")]
add_messages(messages, [AIMessage("Done!", id="response-1")])
# 结果:[AIMessage("Done!", id="response-1")] ← 替换而非重复特性 3:支持多种输入格式
# 单个消息
add_messages(messages, AIMessage("Hi"))
# 消息列表
add_messages(messages, [msg1, msg2])
# 混合类型
add_messages(messages, [HumanMessage("Q"), AIMessage("A")])5. 没有 Reducer 的后果对比
场景: 三轮对话
没有 add_messages:
# 轮次 1
state = {"messages": [HumanMessage("Hi")]}
update = {"messages": [AIMessage("Hello")]}
# 状态:[AIMessage("Hello")] ❌ Hi 丢失
# 轮次 2
state = {"messages": [AIMessage("Hello")]}
update = {"messages": [HumanMessage("How are you?")]}
# 状态:[HumanMessage("How are you?")] ❌ Hello 丢失
# 轮次 3
state = {"messages": [HumanMessage("How are you?")]}
update = {"messages": [AIMessage("I'm good")]}
# 状态:[AIMessage("I'm good")] ❌ 问题丢失
# LLM 只能看到最后一条消息,无法理解上下文!有 add_messages:
# 轮次 1
state = {"messages": [HumanMessage("Hi")]}
update = {"messages": [AIMessage("Hello")]}
# 状态:[HumanMessage("Hi"), AIMessage("Hello")] ✅
# 轮次 2
state = {"messages": [HumanMessage("Hi"), AIMessage("Hello")]}
update = {"messages": [HumanMessage("How are you?")]}
# 状态:[HumanMessage("Hi"), AIMessage("Hello"), HumanMessage("How are you?")] ✅
# 轮次 3
state = {"messages": [..., HumanMessage("How are you?")]}
update = {"messages": [AIMessage("I'm good")]}
# 状态:[..., HumanMessage("How are you?"), AIMessage("I'm good")] ✅
# LLM 可以看到完整对话历史!6. add_messages 的内部实现(简化版)
def add_messages(existing: list, new: list | BaseMessage) -> list:
"""智能合并消息列表"""
# 1. 标准化输入
if not isinstance(new, list):
new = [new]
# 2. 构建 ID 映射
id_map = {msg.id: i for i, msg in enumerate(existing) if msg.id}
# 3. 合并逻辑
result = existing.copy()
for msg in new:
if msg.id and msg.id in id_map:
# 有 ID 且已存在 → 更新
result[id_map[msg.id]] = msg
else:
# 无 ID 或不存在 → 追加
result.append(msg)
return result7. 其他常用 Reducer
import operator
class State(TypedDict):
# 数字累加
total_cost: Annotated[float, operator.add]
# 列表拼接
search_results: Annotated[list, operator.add]
# 集合合并
visited_pages: Annotated[set, operator.or_]
# 自定义:保留最大值
max_score: Annotated[float, max]8. 何时不需要 Reducer
class State(TypedDict):
messages: Annotated[list, add_messages] # 需要:对话历史
user_id: str # 不需要:固定标识
current_page: int # 不需要:当前值覆盖即可
is_authenticated: bool # 不需要:状态标记原则:
- 需要累积的数据 → 使用 Reducer
- 需要替换的数据 → 不使用 Reducer
关键要点
- 问题: 默认字典更新会覆盖整个列表
- 后果: 对话历史丢失,LLM 失去上下文
- 解决: add_messages 智能追加/更新消息
- 本质: 将"覆盖语义"改为"合并语义"
问题 4: Checkpointer 的工作原理是什么?
💡 点击查看答案
答案:
Checkpointer 是 LangGraph 的 自动快照系统,在每个节点执行后保存状态,实现多轮对话和时间旅行。
1. 核心概念
Checkpoint(检查点): 图执行过程中某个时刻的完整状态快照
checkpoint = {
"state": {"messages": [msg1, msg2]}, # 状态数据
"metadata": {
"node": "agent", # 当前节点
"step": 2, # 执行步数
"timestamp": "2024-03-15T10:30:00"
},
"parent_id": "checkpoint-1", # 父检查点 ID
"id": "checkpoint-2" # 当前检查点 ID
}2. 工作流程
没有 Checkpointer:
graph = builder.compile() # 无状态持久化
# 第一次调用
result1 = graph.invoke({"messages": [msg1]})
# 状态:存在于内存,调用结束后丢失
# 第二次调用
result2 = graph.invoke({"messages": [msg2]})
# 状态:全新开始,不记得 msg1有 Checkpointer:
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
# 第一次调用
config = {"configurable": {"thread_id": "user-123"}}
result1 = graph.invoke({"messages": [msg1]}, config)
# ✅ 状态自动保存到 checkpointer
# 第二次调用(同一 thread_id)
result2 = graph.invoke({"messages": [msg2]}, config)
# ✅ 自动加载之前的状态,msg1 还在!3. 保存时机
Checkpointer 在 每个节点执行后 自动保存:
graph = StateGraph(State)
graph.add_node("node1", node1_func)
graph.add_node("node2", node2_func)
graph.add_edge(START, "node1")
graph.add_edge("node1", "node2")
graph.add_edge("node2", END)
graph_with_memory = graph.compile(checkpointer=memory)执行流程:
START
↓
node1 执行
↓
💾 Checkpoint 1: {"state": {...}, "node": "node1", "step": 1}
↓
node2 执行
↓
💾 Checkpoint 2: {"state": {...}, "node": "node2", "step": 2}
↓
END4. Thread:状态容器
Thread(线程) 是隔离不同会话状态的标识符。
# 用户 A 的对话
config_a = {"configurable": {"thread_id": "user-a"}}
graph.invoke(input, config_a)
# 保存到 thread_id="user-a"
# 用户 B 的对话(完全独立)
config_b = {"configurable": {"thread_id": "user-b"}}
graph.invoke(input, config_b)
# 保存到 thread_id="user-b"Thread 结构:
Checkpointer(全局)
├─ Thread: user-a
│ ├─ Checkpoint 1 (step 1)
│ ├─ Checkpoint 2 (step 2)
│ └─ Checkpoint 3 (step 3)
├─ Thread: user-b
│ ├─ Checkpoint 1
│ └─ Checkpoint 2
└─ Thread: user-c
└─ Checkpoint 15. 实际案例:多轮对话
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "conversation-1"}}
# 轮次 1
graph.invoke({"messages": [HumanMessage("3 + 4 = ?")]}, config)
# 💾 保存:{"messages": [HumanMessage("3 + 4 = ?"), AIMessage("7")]}
# 轮次 2(几分钟后)
graph.invoke({"messages": [HumanMessage("乘以 2")]}, config)
# ✅ 自动加载之前的状态
# 输入变成:{"messages": [
# HumanMessage("3 + 4 = ?"),
# AIMessage("7"),
# HumanMessage("乘以 2") ← 新消息追加
# ]}
# 💾 保存:包含所有历史的新状态6. Checkpointer 类型对比
| 类型 | 存储位置 | 持久化 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|---|---|
| MemorySaver | 进程内存 | ❌ | 开发测试 | 快速、简单 | 重启丢失 |
| SqliteSaver | SQLite 文件 | ✅ | 单机生产 | 持久化、本地 | 不支持分布式 |
| PostgresSaver | PostgreSQL | ✅ | 分布式生产 | 可扩展、并发 | 需要数据库 |
| RedisSaver | Redis | ✅ | 高性能生产 | 低延迟、高吞吐 | 需要 Redis |
代码示例:
# 开发环境
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
# 生产环境
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/db"
)
graph = builder.compile(checkpointer=checkpointer)7. 时间旅行:回溯到历史状态
# 运行多步
config = {"configurable": {"thread_id": "thread-1"}}
graph.invoke(input1, config) # Checkpoint 1
graph.invoke(input2, config) # Checkpoint 2
graph.invoke(input3, config) # Checkpoint 3
# 获取所有检查点
checkpoints = list(graph.checkpointer.list(config))
for cp in checkpoints:
print(f"Step {cp.metadata['step']}: {cp.state}")
# 回溯到 Checkpoint 2
config_with_checkpoint = {
"configurable": {
"thread_id": "thread-1",
"checkpoint_id": checkpoints[1].id # 第 2 个检查点
}
}
graph.invoke(input4, config_with_checkpoint)
# 从 Checkpoint 2 的状态继续执行8. Checkpointer 的内部实现(简化)
class MemorySaver:
def __init__(self):
self.storage = {} # {thread_id: [checkpoint1, checkpoint2, ...]}
def save(self, config, state, metadata):
"""保存检查点"""
thread_id = config["configurable"]["thread_id"]
checkpoint = {
"id": generate_id(),
"state": state,
"metadata": metadata,
"timestamp": now()
}
self.storage.setdefault(thread_id, []).append(checkpoint)
def load(self, config):
"""加载最新检查点"""
thread_id = config["configurable"]["thread_id"]
checkpoints = self.storage.get(thread_id, [])
return checkpoints[-1] if checkpoints else None
def list(self, config):
"""列出所有检查点"""
thread_id = config["configurable"]["thread_id"]
return self.storage.get(thread_id, [])9. 何时需要 Checkpointer
✅ 需要的场景:
- 多轮对话系统(聊天机器人)
- 长时间运行的任务(需要中断恢复)
- 需要上下文连续性("那个"、"它"等指代)
- 调试和审计(查看历史执行)
- A/B 测试(对比不同版本)
❌ 不需要的场景:
- 无状态 API(每次请求独立)
- 批处理任务(不需要记忆)
- 一次性查询(无需保存历史)
10. 最佳实践
# ✅ 好的设计
config = {
"configurable": {
"thread_id": f"user-{user_id}-session-{session_id}",
# 细粒度隔离,便于管理
}
}
# 定期清理
def cleanup_old_threads():
cutoff = datetime.now() - timedelta(days=30)
for thread_id in get_all_threads():
if thread_last_accessed(thread_id) < cutoff:
checkpointer.delete_thread({"configurable": {"thread_id": thread_id}})
# ❌ 不好的设计
config = {"configurable": {"thread_id": "global"}}
# 所有用户共享状态 → 数据混乱关键要点
- Checkpointer = 自动快照系统
- 保存时机:每个节点执行后
- Thread = 隔离不同会话的状态容器
- 用途:多轮对话、状态持久化、时间旅行
- 生产环境:使用 PostgresSaver 或 RedisSaver
问题 5: 条件边(Conditional Edge)如何决定路由?
💡 点击查看答案
答案:
条件边通过 条件函数 动态决定下一个执行的节点,实现图的分支逻辑。
1. 基础概念
普通边(Normal Edge): 固定路由
builder.add_edge("node_a", "node_b")
# node_a 总是流向 node_b条件边(Conditional Edge): 动态路由
builder.add_conditional_edges("node_a", condition_func)
# node_a 根据 condition_func 的返回值决定下一步2. 条件函数的结构
from typing import Literal
def condition_func(state: State) -> Literal["node_b", "node_c", "__end__"]:
"""
条件函数:分析状态,返回目标节点名称
参数:
state: 当前图状态
返回:
str: 目标节点的名称(必须是已添加的节点)
"""
# 读取状态
value = state["some_field"]
# 决策逻辑
if value > 10:
return "node_b"
elif value > 0:
return "node_c"
else:
return "__end__" # 或使用 END关键特性:
- 输入: 当前状态(State 对象)
- 输出: 节点名称(字符串)
- 类型提示:
Literal明确所有可能的路由
3. 完整示例:动态路由
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from typing import Literal
# 定义状态
class State(TypedDict):
value: int
path: list[str]
# 定义节点
def input_node(state: State):
return {"path": state["path"] + ["input"]}
def process_high(state: State):
return {"path": state["path"] + ["high"]}
def process_low(state: State):
return {"path": state["path"] + ["low"]}
def end_node(state: State):
return {"path": state["path"] + ["end"]}
# 条件函数
def route_based_on_value(state: State) -> Literal["process_high", "process_low", "end"]:
if state["value"] > 10:
return "process_high"
elif state["value"] > 0:
return "process_low"
else:
return "end"
# 构建图
builder = StateGraph(State)
builder.add_node("input", input_node)
builder.add_node("process_high", process_high)
builder.add_node("process_low", process_low)
builder.add_node("end", end_node)
builder.add_edge(START, "input")
builder.add_conditional_edges(
"input", # 源节点
route_based_on_value # 条件函数
)
builder.add_edge("process_high", "end")
builder.add_edge("process_low", "end")
builder.add_edge("end", END)
graph = builder.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
# 测试
result1 = graph.invoke({"value": 15, "path": []})
print(result1["path"]) # ['input', 'high', 'end']
result2 = graph.invoke({"value": 5, "path": []})
print(result2["path"]) # ['input', 'low', 'end']
result3 = graph.invoke({"value": -1, "path": []})
print(result3["path"]) # ['input', 'end']4. tools_condition:预构建的条件函数
LangGraph 提供了常用的 tools_condition:
from langgraph.prebuilt import tools_condition
# 内部实现(简化)
def tools_condition(state: MessagesState) -> Literal["tools", "__end__"]:
"""检查是否需要调用工具"""
last_message = state["messages"][-1]
# 检查最后一条消息是否有工具调用
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools" # 有工具调用 → 路由到 tools 节点
else:
return "__end__" # 无工具调用 → 结束使用示例:
builder.add_conditional_edges("agent", tools_condition)
# 等价于手动写:
def my_tools_condition(state):
if state["messages"][-1].tool_calls:
return "tools"
return END
builder.add_conditional_edges("agent", my_tools_condition)5. 高级用法:多目标路由映射
方式 1:自动映射(推荐)
def condition(state) -> Literal["node_a", "node_b", "node_c"]:
# 返回值自动匹配节点名称
return "node_a"
builder.add_conditional_edges("source", condition)
# LangGraph 自动创建路由:source → node_a/node_b/node_c方式 2:显式映射
def condition(state) -> str:
return "path_1" # 返回路径名称(而非节点名称)
builder.add_conditional_edges(
"source",
condition,
{
"path_1": "node_a", # 路径名 → 节点名
"path_2": "node_b",
"default": "node_c"
}
)6. 实战案例:客服路由
from langchain_core.messages import HumanMessage
class CustomerServiceState(TypedDict):
messages: list
category: str
def classify_intent(state: CustomerServiceState) -> Literal["billing", "technical", "general"]:
"""根据用户消息分类意图"""
last_message = state["messages"][-1].content.lower()
# 简单关键词匹配(实际应用中使用 LLM 分类)
if "payment" in last_message or "bill" in last_message:
return "billing"
elif "error" in last_message or "bug" in last_message:
return "technical"
else:
return "general"
# 不同部门的处理节点
def billing_handler(state):
return {"messages": [AIMessage("Billing team will assist you...")]}
def technical_handler(state):
return {"messages": [AIMessage("Technical support will help...")]}
def general_handler(state):
return {"messages": [AIMessage("How can I help you today?")]}
# 构建图
builder = StateGraph(CustomerServiceState)
builder.add_node("classifier", lambda s: s) # 占位节点
builder.add_node("billing", billing_handler)
builder.add_node("technical", technical_handler)
builder.add_node("general", general_handler)
builder.add_edge(START, "classifier")
builder.add_conditional_edges("classifier", classify_intent)
builder.add_edge("billing", END)
builder.add_edge("technical", END)
builder.add_edge("general", END)
graph = builder.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
# 测试
result = graph.invoke({
"messages": [HumanMessage("I have a payment issue")],
"category": ""
})
# 路由:classifier → billing → END7. 条件边的执行流程
# 1. 源节点执行
state = source_node(state)
# 2. 调用条件函数
next_node_name = condition_func(state)
# 3. 根据返回值路由
if next_node_name == "node_a":
state = node_a(state)
elif next_node_name == "node_b":
state = node_b(state)
elif next_node_name == END:
return state
else:
raise ValueError(f"Unknown node: {next_node_name}")8. 常见错误与调试
错误 1:返回不存在的节点名
def bad_condition(state) -> str:
return "non_existent_node" # ❌ 这个节点没有被添加
builder.add_conditional_edges("source", bad_condition)
# 运行时错误:Node 'non_existent_node' not found修复:
# 方式 1:确保节点存在
builder.add_node("non_existent_node", some_func)
# 方式 2:使用 Literal 类型检查
def good_condition(state) -> Literal["node_a", "node_b"]:
return "node_a" # IDE 会提示可用选项错误 2:忘记添加所有可能路径的边
def condition(state) -> Literal["node_a", "node_b", "node_c"]:
return "node_a"
builder.add_conditional_edges("source", condition)
builder.add_edge("node_a", END)
builder.add_edge("node_b", END)
# ❌ 忘记添加 node_c → END
# 如果条件返回 "node_c",会报错调试技巧:
def debug_condition(state):
result = condition(state)
print(f"Routing from {current_node} to {result}")
print(f"State: {state}")
return result
builder.add_conditional_edges("source", debug_condition)9. 性能优化
# ❌ 不好的实践:条件函数中调用 LLM
def expensive_condition(state):
# 每次路由都调用 LLM,成本高
intent = llm.invoke("Classify: " + state["message"])
return intent
# ✅ 好的实践:在节点中完成分类
def classify_node(state):
intent = llm.invoke("Classify: " + state["message"])
return {"intent": intent}
def simple_condition(state):
# 只读取已有状态,无额外开销
return state["intent"]
builder.add_node("classify", classify_node)
builder.add_conditional_edges("classify", simple_condition)10. 与普通边的对比
| 特性 | 普通边 | 条件边 |
|---|---|---|
| 路由方式 | 固定 | 动态 |
| 添加方法 | add_edge(A, B) | add_conditional_edges(A, func) |
| 使用场景 | 线性流程 | 分支逻辑 |
| 复杂度 | 简单 | 中等 |
| 示例 | START → node1 → END | node1 → [条件判断] → node2/node3 |
关键要点
- 条件边 = 动态路由机制
- 条件函数:输入状态 → 输出节点名
- tools_condition:检查是否需要调用工具
- 使用 Literal 类型提示提高安全性
- 条件逻辑应简单快速,避免复杂计算
问题 6: 如何实现一个带多个工具的 Router?
💡 点击查看答案
答案:
实现多工具 Router 的关键是 正确绑定所有工具 和 使用 ToolNode 自动路由。
1. 完整代码示例
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
# 定义多个工具
@tool
def add(a: int, b: int) -> int:
"""Add two numbers together."""
return a + b
@tool
def multiply(a: int, b: int) -> int:
"""Multiply two numbers."""
return a * b
@tool
def divide(a: float, b: float) -> float:
"""Divide a by b."""
if b == 0:
return "Error: Division by zero"
return a / b
@tool
def search_web(query: str) -> str:
"""Search the web for information."""
# 实际实现中调用搜索 API
return f"Search results for: {query}"
# 收集所有工具
tools = [add, multiply, divide, search_web]
# 创建 LLM 并绑定工具
llm = ChatOpenAI(model="gpt-4")
llm_with_tools = llm.bind_tools(tools)
# 定义节点
def call_model(state: MessagesState):
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
# 构建图
builder = StateGraph(MessagesState)
builder.add_node("llm", call_model)
builder.add_node("tools", ToolNode(tools)) # ← 传入所有工具
builder.add_edge(START, "llm")
builder.add_conditional_edges("llm", tools_condition)
builder.add_edge("tools", END)
graph = builder.compile()2. 关键技术点
技术点 1:@tool 装饰器
from langchain_core.tools import tool
@tool
def my_function(param: type) -> return_type:
"""Clear description of what this tool does.
Args:
param: Description of parameter
"""
return result自动转换为:
- 工具名称:函数名(
my_function) - 工具描述:Docstring 第一行
- 参数模式:从类型注解生成
- 调用接口:保持原函数逻辑
技术点 2:ToolNode 自动路由
ToolNode(tools)
# 内部实现(简化):
class ToolNode:
def __init__(self, tools):
self.tools_by_name = {t.name: t for t in tools}
def __call__(self, state):
last_message = state["messages"][-1]
results = []
for tool_call in last_message.tool_calls:
tool = self.tools_by_name[tool_call["name"]] # 自动匹配
result = tool.invoke(tool_call["args"])
results.append(ToolMessage(
content=str(result),
tool_call_id=tool_call["id"]
))
return {"messages": results}3. 测试不同工具
from langchain_core.messages import HumanMessage
# 测试 1:数学计算
result1 = graph.invoke({
"messages": [HumanMessage("What is 15 + 23?")]
})
# LLM 选择 add(15, 23) → 返回 38
# 测试 2:乘法
result2 = graph.invoke({
"messages": [HumanMessage("Multiply 7 by 8")]
})
# LLM 选择 multiply(7, 8) → 返回 56
# 测试 3:网络搜索
result3 = graph.invoke({
"messages": [HumanMessage("Search for Python tutorials")]
})
# LLM 选择 search_web("Python tutorials")
# 测试 4:组合查询
result4 = graph.invoke({
"messages": [HumanMessage("Calculate 100 / 4")]
})
# LLM 选择 divide(100, 4) → 返回 25.04. LLM 如何选择工具?
LLM 接收到的工具信息:
{
"tools": [
{
"name": "add",
"description": "Add two numbers together.",
"parameters": {
"type": "object",
"properties": {
"a": {"type": "integer"},
"b": {"type": "integer"}
}
}
},
{
"name": "multiply",
"description": "Multiply two numbers.",
"parameters": {
"type": "object",
"properties": {
"a": {"type": "integer"},
"b": {"type": "integer"}
}
}
},
...
]
}LLM 决策过程:
用户输入: "What is 15 + 23?"
↓
LLM 分析: "这是加法问题"
↓
查找工具: "add" 的描述匹配
↓
提取参数: a=15, b=23
↓
返回: tool_calls=[{"name": "add", "args": {"a": 15, "b": 23}}]5. 错误处理
@tool
def divide(a: float, b: float) -> float:
"""Divide a by b. Returns error message if b is zero."""
try:
if b == 0:
return "Error: Cannot divide by zero"
return a / b
except Exception as e:
return f"Error: {str(e)}"
# 测试错误情况
result = graph.invoke({
"messages": [HumanMessage("What is 10 divided by 0?")]
})
# 工具返回: "Error: Cannot divide by zero"
# LLM 收到错误信息,可以给出友好回复6. 工具分类管理
对于大量工具,可以分类管理:
# 数学工具
math_tools = [add, multiply, divide]
# 搜索工具
search_tools = [search_web, search_database]
# 文件工具
file_tools = [read_file, write_file]
# 场景 1:数学助手(只绑定数学工具)
math_llm = llm.bind_tools(math_tools)
# 场景 2:通用助手(绑定所有工具)
general_llm = llm.bind_tools(math_tools + search_tools + file_tools)7. 动态工具加载
def call_model_dynamic(state: MessagesState):
# 根据用户消息动态选择工具
last_message = state["messages"][-1].content.lower()
if "calculate" in last_message or "math" in last_message:
tools = math_tools
elif "search" in last_message:
tools = search_tools
else:
tools = math_tools + search_tools # 默认全部
llm_with_selected_tools = llm.bind_tools(tools)
response = llm_with_selected_tools.invoke(state["messages"])
return {"messages": [response]}8. 工具调用统计
from collections import Counter
class ToolNodeWithStats(ToolNode):
def __init__(self, tools):
super().__init__(tools)
self.call_stats = Counter()
def __call__(self, state):
last_message = state["messages"][-1]
# 统计工具使用
for tool_call in last_message.tool_calls:
self.call_stats[tool_call["name"]] += 1
# 执行工具
result = super().__call__(state)
# 打印统计
print(f"Tool usage: {dict(self.call_stats)}")
return result
# 使用
tool_node = ToolNodeWithStats(tools)
builder.add_node("tools", tool_node)9. 最佳实践
# ✅ 好的工具定义
@tool
def calculate_discount(price: float, discount_percent: float) -> float:
"""Calculate the final price after applying a discount.
Args:
price: Original price in dollars
discount_percent: Discount percentage (0-100)
Returns:
Final price after discount
"""
if not (0 <= discount_percent <= 100):
return "Error: Discount must be between 0 and 100"
return price * (1 - discount_percent / 100)
# ❌ 不好的工具定义
@tool
def calc(p, d): # 无类型注解
"""Calculate.""" # 描述不清楚
return p * d # 逻辑不明确10. 完整示例:多功能助手
from langchain_core.tools import tool
# 工具集合
@tool
def get_weather(city: str) -> str:
"""Get current weather for a city."""
return f"Weather in {city}: Sunny, 72°F"
@tool
def book_flight(origin: str, destination: str, date: str) -> str:
"""Book a flight ticket."""
return f"Flight booked: {origin} → {destination} on {date}"
@tool
def translate(text: str, target_lang: str) -> str:
"""Translate text to target language."""
return f"[{target_lang}] {text}"
tools = [get_weather, book_flight, translate, add, multiply]
# 构建图
llm_with_tools = ChatOpenAI(model="gpt-4").bind_tools(tools)
def assistant(state: MessagesState):
return {"messages": [llm_with_tools.invoke(state["messages"])]}
builder = StateGraph(MessagesState)
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "assistant")
builder.add_conditional_edges("assistant", tools_condition)
builder.add_edge("tools", END)
graph = builder.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
# 测试多样化查询
queries = [
"What's the weather in New York?",
"Book a flight from LA to NYC on March 15",
"Translate 'Hello' to Spanish",
"What is 25 times 4?"
]
for query in queries:
result = graph.invoke({"messages": [HumanMessage(query)]})
print(f"Query: {query}")
print(f"Response: {result['messages'][-1].content}\n")关键要点
- 多工具绑定:
llm.bind_tools([tool1, tool2, ...]) - ToolNode 自动路由: 根据 tool_calls 中的 name 匹配工具
- 工具定义关键: 清晰的 Docstring + 类型注解
- LLM 自动选择: 基于描述和参数匹配
- 可扩展性: 轻松添加新工具,无需修改图结构
问题 7: 如何让 Agent 避免无限循环?
💡 点击查看答案
答案:
防止 Agent 无限循环需要 设置递归限制 和 智能停止条件。
1. 问题:为什么会出现无限循环?
Agent 的循环结构:
builder.add_edge("tools", "agent") # 工具执行后回到 agent
# 流程:
# agent → tools → agent → tools → agent → ...可能导致无限循环的场景:
- LLM 一直认为需要调用工具
- 工具返回的结果无法让 LLM 满意
- LLM 陷入"思考-行动"死循环
2. 方法 1:使用 recursion_limit
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
graph = builder.compile(
checkpointer=memory,
recursion_limit=10 # ← 最多执行 10 次节点
)
# 测试
try:
result = graph.invoke({"messages": [HumanMessage("test")]})
except RecursionError:
print("达到递归限制!")工作原理:
Step 1: agent
Step 2: tools
Step 3: agent
...
Step 10: agent
Step 11: ❌ RecursionError: 达到限制3. 方法 2:在状态中计数
from typing import Annotated
import operator
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
iterations: Annotated[int, operator.add] # 累加计数器
def agent(state: AgentState):
# 检查迭代次数
current_iterations = state.get("iterations", 0)
if current_iterations >= 10:
return {
"messages": [AIMessage("已达到最大迭代次数,停止执行。")],
"iterations": 0 # 重置计数器
}
# 正常执行
response = llm_with_tools.invoke(state["messages"])
return {
"messages": [response],
"iterations": 1 # 每次增加 1
}
# 初始状态
initial_state = {
"messages": [HumanMessage("Start")],
"iterations": 0
}
result = graph.invoke(initial_state)4. 方法 3:智能停止条件
def tools_condition_with_limit(state: AgentState) -> Literal["tools", "__end__"]:
"""增强版 tools_condition,包含多重停止条件"""
# 条件 1:检查迭代次数
if state.get("iterations", 0) >= 15:
return "__end__"
# 条件 2:检查消息数量(防止对话过长)
if len(state["messages"]) > 50:
return "__end__"
# 条件 3:检查是否有工具调用
last_message = state["messages"][-1]
if not hasattr(last_message, "tool_calls") or not last_message.tool_calls:
return "__end__"
# 条件 4:检查是否重复调用相同工具
if len(last_message.tool_calls) > 0:
tool_name = last_message.tool_calls[0]["name"]
# 检查最近 3 次是否都调用同一工具
recent_calls = []
for msg in state["messages"][-6:]: # 最近 3 轮(每轮 2 条消息)
if hasattr(msg, "tool_calls") and msg.tool_calls:
recent_calls.append(msg.tool_calls[0]["name"])
if recent_calls.count(tool_name) >= 3:
print(f"警告:工具 {tool_name} 被重复调用,停止执行")
return "__end__"
return "tools"
# 使用增强版条件
builder.add_conditional_edges("agent", tools_condition_with_limit)5. 方法 4:超时机制
import time
from datetime import datetime, timedelta
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
start_time: float
def agent_with_timeout(state: AgentState):
# 记录开始时间
if "start_time" not in state:
state["start_time"] = time.time()
# 检查超时(例如:最多运行 60 秒)
elapsed = time.time() - state["start_time"]
if elapsed > 60:
return {
"messages": [AIMessage("执行超时,已停止。")],
"start_time": state["start_time"]
}
# 正常执行
response = llm_with_tools.invoke(state["messages"])
return {
"messages": [response],
"start_time": state["start_time"]
}6. 方法 5:工具调用成本限制
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
total_tokens: int
def agent_with_budget(state: AgentState):
# 检查 token 使用量
total_tokens = state.get("total_tokens", 0)
max_tokens = 10000
if total_tokens >= max_tokens:
return {
"messages": [AIMessage(f"已达到 token 限制 ({max_tokens}),停止执行。")],
"total_tokens": total_tokens
}
# 调用 LLM
response = llm_with_tools.invoke(state["messages"])
# 统计 token
tokens_used = response.response_metadata.get("token_usage", {}).get("total_tokens", 0)
return {
"messages": [response],
"total_tokens": total_tokens + tokens_used
}7. 方法 6:LLM 提示词优化
from langchain_core.messages import SystemMessage
system_prompt = SystemMessage(content="""
You are a helpful assistant. Follow these rules:
1. **Efficiency**: Minimize the number of tool calls needed
2. **Completion**: Once you have enough information, provide a final answer
3. **No Loops**: If a tool fails twice, stop trying and explain the issue
4. **Token Awareness**: Keep responses concise
When you have completed the task, respond with a final answer WITHOUT tool calls.
""")
def agent_with_prompt(state: MessagesState):
messages = [system_prompt] + state["messages"]
response = llm_with_tools.invoke(messages)
return {"messages": [response]}8. 完整示例:综合防护
from typing import Annotated
import operator
import time
class RobustAgentState(TypedDict):
messages: Annotated[list, add_messages]
iterations: Annotated[int, operator.add]
total_tokens: int
start_time: float
def robust_agent(state: RobustAgentState):
"""带多重保护的 Agent"""
# 初始化
if "start_time" not in state:
state["start_time"] = time.time()
iterations = state.get("iterations", 0)
total_tokens = state.get("total_tokens", 0)
elapsed = time.time() - state["start_time"]
# 保护 1:迭代次数
if iterations >= 10:
return {
"messages": [AIMessage("达到最大迭代次数 (10)。")],
"iterations": 0
}
# 保护 2:执行时间
if elapsed > 120: # 2 分钟
return {
"messages": [AIMessage("执行超时 (120s)。")],
"iterations": 0
}
# 保护 3:Token 预算
if total_tokens >= 50000:
return {
"messages": [AIMessage("达到 token 限制 (50000)。")],
"iterations": 0
}
# 正常执行
try:
response = llm_with_tools.invoke(state["messages"])
tokens_used = response.response_metadata.get("token_usage", {}).get("total_tokens", 0)
return {
"messages": [response],
"iterations": 1,
"total_tokens": tokens_used
}
except Exception as e:
return {
"messages": [AIMessage(f"执行错误:{str(e)}")],
"iterations": 0
}
def robust_tools_condition(state: RobustAgentState):
"""增强版停止条件"""
last_message = state["messages"][-1]
# 检查是否有工具调用
if not hasattr(last_message, "tool_calls") or not last_message.tool_calls:
return "__end__"
# 检查保护条件
if state.get("iterations", 0) >= 10:
return "__end__"
return "tools"
# 构建图
builder = StateGraph(RobustAgentState)
builder.add_node("agent", robust_agent)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", robust_tools_condition)
builder.add_edge("tools", "agent")
# 使用 recursion_limit 作为最后防线
graph = builder.compile(recursion_limit=25)9. 监控和调试
def agent_with_logging(state: AgentState):
iterations = state.get("iterations", 0)
print(f"[Iteration {iterations}] Agent executing...")
response = llm_with_tools.invoke(state["messages"])
# 记录工具调用
if hasattr(response, "tool_calls") and response.tool_calls:
for tc in response.tool_calls:
print(f" → Calling tool: {tc['name']}")
else:
print(f" → Generating final response")
return {
"messages": [response],
"iterations": 1
}10. 最佳实践总结
| 方法 | 优先级 | 适用场景 | 实现难度 |
|---|---|---|---|
| recursion_limit | 必须 | 所有 Agent | 简单 |
| 迭代计数 | 推荐 | 长时间运行的 Agent | 中等 |
| 超时机制 | 推荐 | 生产环境 | 中等 |
| Token 限制 | 可选 | 成本敏感场景 | 中等 |
| 智能停止条件 | 推荐 | 复杂 Agent | 较难 |
| 提示词优化 | 必须 | 所有 Agent | 简单 |
推荐配置:
# 1. 提示词明确指示何时停止
# 2. recursion_limit=20(合理上限)
# 3. 状态中跟踪迭代次数
# 4. 条件函数检查异常模式
# 5. 生产环境添加超时和 token 限制关键要点
- 多层防护: 不要依赖单一机制
- recursion_limit: 最基本的保护
- 状态计数: 更精细的控制
- 智能条件: 检测异常循环模式
- 监控日志: 便于调试和优化
问题 8: 如何实现跨会话的对话记忆?
💡 点击查看答案
答案:
跨会话对话记忆通过 Checkpointer + Thread ID 实现状态持久化和恢复。
1. 核心概念
会话(Session): 用户与 Agent 的一次完整交互过程 跨会话记忆: 在多次独立会话之间保持用户信息和对话历史
会话 1 (今天上午)
用户: "我的订单号是 12345"
Agent: "好的,订单 12345 已记录"
会话 2 (今天下午,新的浏览器标签)
用户: "那个订单发货了吗?"
Agent: "您的订单 12345 已于今天上午发货" ✅ 记住了订单号2. 基础实现:MemorySaver
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState
# 创建检查点器
memory = MemorySaver()
# 编译图
graph = builder.compile(checkpointer=memory)
# 会话 1
config = {"configurable": {"thread_id": "user-123"}}
result1 = graph.invoke(
{"messages": [HumanMessage("我的订单号是 12345")]},
config
)
# 会话 2(稍后,同一 thread_id)
result2 = graph.invoke(
{"messages": [HumanMessage("那个订单发货了吗?")]},
config
)
# ✅ 自动加载之前的对话历史3. 生产实现:PostgreSQL 持久化
from langgraph.checkpoint.postgres import PostgresSaver
import psycopg2
# 连接数据库
conn_string = "postgresql://user:password@localhost:5432/langgraph_db"
checkpointer = PostgresSaver.from_conn_string(conn_string)
# 编译图(与 MemorySaver 接口完全相同)
graph = builder.compile(checkpointer=checkpointer)
# 使用方式完全相同
config = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke({"messages": [...]}, config)PostgreSQL 表结构:
CREATE TABLE checkpoints (
thread_id TEXT,
checkpoint_id TEXT PRIMARY KEY,
parent_id TEXT,
created_at TIMESTAMP,
state JSONB, -- 完整状态数据
metadata JSONB
);
CREATE INDEX idx_thread_id ON checkpoints(thread_id);4. Thread ID 设计模式
模式 1:按用户隔离
def get_config(user_id: str):
return {"configurable": {"thread_id": f"user-{user_id}"}}
# 用户 A
config_a = get_config("alice")
graph.invoke(input, config_a)
# 用户 B(完全独立)
config_b = get_config("bob")
graph.invoke(input, config_b)模式 2:按会话隔离
import uuid
def create_new_session(user_id: str):
session_id = str(uuid.uuid4())
return {"configurable": {"thread_id": f"{user_id}-{session_id}"}}
# 用户开始新对话
config = create_new_session("alice")
# thread_id: "alice-4f8d3c2a-..."模式 3:按功能隔离
def get_config(user_id: str, context: str):
return {"configurable": {"thread_id": f"{user_id}-{context}"}}
# 客服对话
support_config = get_config("alice", "support")
# 订单查询
order_config = get_config("alice", "order")
# 两个独立的对话线程5. 扩展状态:用户偏好记忆
class UserState(TypedDict):
messages: Annotated[list, add_messages]
user_id: str
preferences: dict # 用户偏好
conversation_count: Annotated[int, operator.add]
def agent_with_preferences(state: UserState):
# 读取用户偏好
prefs = state.get("preferences", {})
style = prefs.get("response_style", "formal")
# 根据偏好调整系统提示
if style == "casual":
system_msg = "You are a friendly, casual assistant."
else:
system_msg = "You are a professional, formal assistant."
messages = [SystemMessage(system_msg)] + state["messages"]
response = llm.invoke(messages)
return {
"messages": [response],
"conversation_count": 1
}
# 会话 1:设置偏好
initial_state = {
"messages": [HumanMessage("我喜欢简洁的回答")],
"user_id": "alice",
"preferences": {"response_style": "casual"},
"conversation_count": 0
}
config = {"configurable": {"thread_id": "alice"}}
graph.invoke(initial_state, config)
# 会话 2:自动应用偏好
later_state = {
"messages": [HumanMessage("解释量子计算")]
}
result = graph.invoke(later_state, config)
# ✅ 自动加载 preferences,使用 casual 风格6. 时间限制的记忆
from datetime import datetime, timedelta
class TimedState(TypedDict):
messages: Annotated[list, add_messages]
last_active: str # ISO 格式时间戳
def trim_old_messages(state: TimedState) -> TimedState:
"""删除超过 7 天的消息"""
cutoff = datetime.now() - timedelta(days=7)
# 解析 last_active
if "last_active" in state:
last_active = datetime.fromisoformat(state["last_active"])
if last_active < cutoff:
# 太久没活动,清空历史
return {
"messages": [],
"last_active": datetime.now().isoformat()
}
# 更新活动时间
return {
**state,
"last_active": datetime.now().isoformat()
}
def agent(state: TimedState):
# 先清理旧消息
state = trim_old_messages(state)
# 正常处理
response = llm.invoke(state["messages"])
return {"messages": [response]}7. 多设备同步
from fastapi import FastAPI, HTTPException
from langgraph_sdk import get_client
app = FastAPI()
client = get_client(url="https://your-langgraph-cloud.app")
@app.post("/chat")
async def chat(user_id: str, message: str, device_id: str):
"""支持多设备访问同一对话"""
# 使用 user_id 作为 thread_id(不包含 device_id)
config = {"configurable": {"thread_id": f"user-{user_id}"}}
# 执行图
response = await client.runs.create(
thread_id=config["configurable"]["thread_id"],
assistant_id="agent",
input={"messages": [HumanMessage(message)]},
metadata={"device_id": device_id} # 记录设备信息
)
return {"response": response}
# 使用场景:
# 设备 1 (手机): POST /chat?user_id=alice&message=Hi&device_id=phone
# 设备 2 (电脑): POST /chat?user_id=alice&message=继续&device_id=laptop
# 两个设备看到相同的对话历史8. 查询历史对话
def get_conversation_history(thread_id: str, limit: int = 10):
"""获取对话历史"""
config = {"configurable": {"thread_id": thread_id}}
# 获取所有检查点
checkpoints = list(graph.checkpointer.list(config))
# 按时间倒序
checkpoints.sort(key=lambda x: x.created_at, reverse=True)
# 提取消息
history = []
for cp in checkpoints[:limit]:
messages = cp.state.get("messages", [])
history.append({
"timestamp": cp.created_at,
"messages": [
{"role": m.type, "content": m.content}
for m in messages
]
})
return history
# 使用
history = get_conversation_history("user-alice")
for session in history:
print(f"\n=== {session['timestamp']} ===")
for msg in session['messages']:
print(f"{msg['role']}: {msg['content']}")9. 清理和过期管理
import asyncio
from datetime import datetime, timedelta
async def cleanup_old_threads(checkpointer, days: int = 30):
"""定期清理超过 N 天的对话"""
cutoff = datetime.now() - timedelta(days=days)
# 获取所有 thread
all_threads = await checkpointer.list_threads()
deleted_count = 0
for thread_id in all_threads:
config = {"configurable": {"thread_id": thread_id}}
checkpoints = list(checkpointer.list(config))
if not checkpoints:
continue
# 检查最后活动时间
last_checkpoint = max(checkpoints, key=lambda x: x.created_at)
if last_checkpoint.created_at < cutoff:
checkpointer.delete_thread(config)
deleted_count += 1
print(f"Deleted thread: {thread_id}")
print(f"Total deleted: {deleted_count} threads")
# 定期运行(例如每天凌晨)
async def scheduled_cleanup():
while True:
await cleanup_old_threads(checkpointer, days=30)
await asyncio.sleep(86400) # 24 小时10. 完整示例:生产级记忆系统
from langgraph.checkpoint.postgres import PostgresSaver
from typing import Annotated
import operator
from datetime import datetime
class ProductionState(TypedDict):
messages: Annotated[list, add_messages]
user_id: str
user_name: str
preferences: dict
session_count: Annotated[int, operator.add]
last_active: str
metadata: dict
# 数据库连接
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/langgraph"
)
def production_agent(state: ProductionState):
# 更新活动时间
state["last_active"] = datetime.now().isoformat()
# 个性化问候
if state.get("session_count", 0) == 0:
greeting = f"你好 {state.get('user_name', '用户')}!很高兴为你服务。"
else:
greeting = f"欢迎回来,{state.get('user_name')}!"
# 构建消息
messages = [SystemMessage(greeting)] + state["messages"]
response = llm.invoke(messages)
return {
"messages": [response],
"session_count": 1,
"last_active": state["last_active"]
}
# 构建图
builder = StateGraph(ProductionState)
builder.add_node("agent", production_agent)
builder.add_edge(START, "agent")
builder.add_edge("agent", END)
graph = builder.compile(checkpointer=checkpointer)
# API 端点
@app.post("/api/chat")
async def chat_endpoint(
user_id: str,
message: str,
user_name: str = "用户"
):
config = {"configurable": {"thread_id": f"user-{user_id}"}}
# 调用图
result = await graph.ainvoke(
{
"messages": [HumanMessage(message)],
"user_id": user_id,
"user_name": user_name,
"preferences": {}, # 可以从数据库加载
"session_count": 0,
"metadata": {"endpoint": "/api/chat"}
},
config
)
return {
"response": result["messages"][-1].content,
"session_count": result.get("session_count", 0)
}关键要点
- Checkpointer + Thread ID = 跨会话记忆
- MemorySaver:开发测试
- PostgresSaver:生产环境
- Thread ID 设计:按用户/会话/功能隔离
- 定期清理:避免无限增长
由于篇幅限制,让我继续创建剩余的问题 9-15...
问题 9: 如何在本地测试后部署到云端?
💡 点击查看答案
答案:
本地测试到云端部署的完整流程包括 本地开发 → GitHub 推送 → LangSmith 配置 → 云端部署。
步骤 1:本地开发和测试
# 项目结构
my-agent/
├── agent.py # 图定义
├── langgraph.json # 配置文件
├── .env # 环境变量(不提交)
├── .env.example # 环境变量模板
├── .gitignore # Git 忽略文件
└── requirements.txt # Python 依赖
# 启动本地开发服务器
cd my-agent
langgraph dev
# 输出:
# 🚀 API: http://127.0.0.1:2024
# 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024步骤 2:Git 准备
# .gitignore 内容
.env
*.env
__pycache__/
.venv/
# 初始化 Git
git init
git add .
git commit -m "Initial commit: LangGraph agent"
# 创建 GitHub 仓库并推送
git remote add origin https://github.com/username/my-agent.git
git push -u origin main步骤 3:LangSmith 部署配置
- 访问 LangSmith
- 导航到 Deployments → New Deployment
- 选择 Deploy from GitHub
- 配置:
- Repository:
username/my-agent - Branch:
main - Config file:
langgraph.json
- Repository:
- 设置环境变量(从
.env复制) - 点击 Deploy
步骤 4:使用 SDK 调用云端
from langgraph_sdk import get_client
# 本地测试
local_client = get_client(url="http://127.0.0.1:2024")
# 云端部署
cloud_client = get_client(url="https://your-deployment.langgraph.app")
# API 完全相同!
async for chunk in cloud_client.runs.stream(
thread_id="user-123",
assistant_id="agent",
input={"messages": [...]},
stream_mode="values"
):
print(chunk)关键要点
- 本地测试:
langgraph dev快速迭代 - Git 管理: 不要提交
.env文件 - 环境变量: 云端需手动配置
- SDK 统一: 本地/云端 API 完全一致
问题 10: 如何处理工具调用失败的情况?
💡 点击查看答案
答案:
工具调用失败处理需要 异常捕获 + 错误消息返回 + 重试机制。
方法 1:工具内部错误处理
from langchain_core.tools import tool
@tool
def divide(a: float, b: float) -> str:
"""Divide a by b with error handling."""
try:
if b == 0:
return "Error: Division by zero is not allowed"
result = a / b
return f"Result: {result}"
except Exception as e:
return f"Error: {str(e)}"方法 2:ToolNode 包装器
class SafeToolNode:
def __init__(self, tools):
self.tool_node = ToolNode(tools)
def __call__(self, state: MessagesState):
try:
return self.tool_node(state)
except Exception as e:
# 返回错误消息给 LLM
last_message = state["messages"][-1]
error_msg = ToolMessage(
content=f"Tool execution failed: {str(e)}",
tool_call_id=last_message.tool_calls[0]["id"] if last_message.tool_calls else "error"
)
return {"messages": [error_msg]}
# 使用
builder.add_node("tools", SafeToolNode(tools))方法 3:重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
@tool
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def api_call(endpoint: str) -> str:
"""Call external API with retry."""
response = requests.get(endpoint, timeout=10)
response.raise_for_status()
return response.text方法 4:Agent 级别处理
def agent_with_error_recovery(state: MessagesState):
last_message = state["messages"][-1]
# 检查是否是错误消息
if isinstance(last_message, ToolMessage) and "Error:" in last_message.content:
# LLM 分析错误并决定下一步
recovery_prompt = SystemMessage(
"The tool call failed. Analyze the error and either retry with different parameters or provide an alternative solution."
)
messages = [recovery_prompt] + state["messages"]
response = llm_with_tools.invoke(messages)
else:
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}关键要点
- 工具内部: try-except 捕获异常
- 返回错误消息: 让 LLM 知道失败原因
- 重试机制: 对临时失败自动重试
- Agent 恢复: 分析错误并调整策略
问题 11: 设计一个客服机器人应该选择哪种架构模式?
💡 点击查看答案
答案:
客服机器人应选择 Agent + Memory 架构,结合 Router 进行意图分类。
推荐架构:分层设计
# 第 1 层:意图分类(Router)
def classify_intent(state) -> Literal["faq", "order_query", "complaint", "general"]:
last_message = state["messages"][-1].content
# 使用 LLM 分类或关键词匹配
classification = llm.invoke(f"Classify intent: {last_message}")
return classification
# 第 2 层:专业处理(Agent)
def order_agent(state):
"""处理订单相关查询(可能需要多步骤)"""
tools = [check_order_status, track_shipment, cancel_order]
llm_with_tools = llm.bind_tools(tools)
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
def faq_agent(state):
"""处理常见问题(简单查询)"""
# 从知识库检索答案
answer = search_knowledge_base(state["messages"][-1].content)
return {"messages": [AIMessage(answer)]}
# 构建图
builder = StateGraph(MessagesState)
builder.add_node("classifier", lambda s: s)
builder.add_node("order_agent", order_agent)
builder.add_node("faq_agent", faq_agent)
builder.add_node("tools", ToolNode(tools))
# 路由逻辑
builder.add_edge(START, "classifier")
builder.add_conditional_edges("classifier", classify_intent)
builder.add_conditional_edges("order_agent", tools_condition)
builder.add_edge("tools", "order_agent") # Agent 模式的循环
builder.add_edge("faq_agent", END)
builder.add_edge("order_agent", END)
# 添加 Memory
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string("postgresql://...")
graph = builder.compile(checkpointer=checkpointer)为什么这样设计?
- Router(意图分类): 快速分流简单问题
- Agent(订单处理): 处理需要多步骤的复杂查询
- Memory(对话记忆): 记住用户信息和上下文
- 分层处理: 平衡效率和能力
使用示例
config = {"configurable": {"thread_id": "customer-12345"}}
# 会话 1:订单查询
graph.invoke({"messages": [HumanMessage("我的订单 #98765 在哪里?")]}, config)
# → 路由到 order_agent → 调用 track_shipment → 返回物流信息
# 会话 2:后续问题(利用 Memory)
graph.invoke({"messages": [HumanMessage("那个订单可以取消吗?")]}, config)
# → 记住订单号 #98765 → 调用 cancel_order关键要点
- 分层架构: Router + Agent 结合
- 工具分类: 不同场景使用不同工具集
- Memory 必需: 客服需要记住上下文
- 错误处理: 工具失败时提供人工转接选项
问题 12: 如何优化 Agent 的 Token 使用?
💡 点击查看答案
答案:
优化 Token 使用需要 消息修剪 + 提示词优化 + 工具选择策略。
方法 1:限制消息历史长度
from langchain_core.messages import trim_messages
def agent_with_trim(state: MessagesState):
# 只保留最近 10 条消息
trimmed_messages = trim_messages(
state["messages"],
max_tokens=2000,
strategy="last",
token_counter=llm
)
response = llm_with_tools.invoke(trimmed_messages)
return {"messages": [response]}方法 2:消息摘要
def summarize_old_messages(state: MessagesState):
messages = state["messages"]
if len(messages) > 20:
# 摘要前 10 条消息
old_messages = messages[:10]
summary = llm.invoke([
SystemMessage("Summarize this conversation in 2-3 sentences:"),
*old_messages
])
# 保留摘要 + 最近消息
new_messages = [
SystemMessage(f"Previous conversation summary: {summary.content}"),
*messages[10:]
]
return {"messages": new_messages}
return state方法 3:减少系统提示长度
# ❌ 不好:冗长的系统提示
system_msg = SystemMessage(content="""
You are a highly intelligent, professional, and courteous AI assistant...
[500 words of instructions]
""")
# ✅ 好:简洁的系统提示
system_msg = SystemMessage(content="""
You are a helpful assistant. Be concise and accurate.
Use tools when needed. Provide final answers without tool calls.
""")方法 4:智能工具选择
def agent_with_selective_tools(state: MessagesState):
last_message = state["messages"][-1].content.lower()
# 根据意图只绑定相关工具
if "calculate" in last_message:
tools = [add, multiply, divide]
elif "search" in last_message:
tools = [search_web]
else:
tools = [add, search_web] # 最小工具集
llm_with_tools = llm.bind_tools(tools)
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}方法 5:使用更便宜的模型
# 分级模型策略
def get_llm_for_task(task_complexity: str):
if task_complexity == "simple":
return ChatOpenAI(model="gpt-3.5-turbo") # 便宜
elif task_complexity == "complex":
return ChatOpenAI(model="gpt-4") # 强大
else:
return ChatOpenAI(model="gpt-4o-mini") # 平衡方法 6:Token 使用监控
class TokenTrackingState(TypedDict):
messages: Annotated[list, add_messages]
total_tokens: int
cost_usd: float
def agent_with_tracking(state: TokenTrackingState):
response = llm_with_tools.invoke(state["messages"])
# 提取 token 使用
usage = response.response_metadata.get("token_usage", {})
tokens = usage.get("total_tokens", 0)
# 计算成本(GPT-4 示例:$0.03/1K tokens)
cost = (tokens / 1000) * 0.03
return {
"messages": [response],
"total_tokens": tokens,
"cost_usd": cost
}关键要点
- 消息修剪: 限制历史长度
- 摘要策略: 压缩旧对话
- 提示词优化: 简洁明确
- 选择性工具: 减少函数描述开销
- 监控成本: 实时跟踪 token 使用
问题 13: 如何设计多用户并发的 Agent 系统?
💡 点击查看答案
答案:
多用户并发系统需要 Thread ID 隔离 + 数据库 Checkpointer + 异步处理。
核心设计原则
# 每个用户独立的 Thread ID
def get_user_config(user_id: str, session_id: str = None):
if session_id:
thread_id = f"user-{user_id}-session-{session_id}"
else:
thread_id = f"user-{user_id}"
return {"configurable": {"thread_id": thread_id}}生产级架构:FastAPI + PostgreSQL
from fastapi import FastAPI, HTTPException
from langgraph_sdk import get_client
from langgraph.checkpoint.postgres import PostgresSaver
import asyncio
app = FastAPI()
# 数据库 Checkpointer(支持并发)
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/langgraph"
)
# 编译图
graph = builder.compile(checkpointer=checkpointer)
@app.post("/chat")
async def chat_endpoint(user_id: str, message: str):
"""处理单个用户请求(异步)"""
config = get_user_config(user_id)
# 异步调用(非阻塞)
result = await graph.ainvoke(
{"messages": [HumanMessage(message)]},
config
)
return {"response": result["messages"][-1].content}
@app.post("/chat/batch")
async def batch_chat(requests: list[dict]):
"""批量处理多个用户请求(并发)"""
tasks = []
for req in requests:
config = get_user_config(req["user_id"])
task = graph.ainvoke(
{"messages": [HumanMessage(req["message"])]},
config
)
tasks.append(task)
# 并发执行
results = await asyncio.gather(*tasks)
return {"responses": [r["messages"][-1].content for r in results]}负载均衡和扩展
# 使用 Redis 作为分布式锁
from redis import Redis
import hashlib
redis_client = Redis(host='localhost', port=6379, db=0)
def get_shard_for_user(user_id: str, num_shards: int = 4) -> int:
"""将用户分配到不同的处理分片"""
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
return hash_value % num_shards
@app.post("/chat/balanced")
async def balanced_chat(user_id: str, message: str):
"""负载均衡的聊天端点"""
shard = get_shard_for_user(user_id)
# 使用分布式锁防止并发写入
lock_key = f"user_lock:{user_id}"
with redis_client.lock(lock_key, timeout=30):
config = get_user_config(user_id)
result = await graph.ainvoke(
{"messages": [HumanMessage(message)]},
config
)
return {
"response": result["messages"][-1].content,
"shard": shard
}监控和限流
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
@app.on_event("startup")
async def startup():
await FastAPILimiter.init(redis_client)
@app.post("/chat", dependencies=[RateLimiter(times=10, seconds=60)])
async def rate_limited_chat(user_id: str, message: str):
"""每个用户每分钟最多 10 次请求"""
config = get_user_config(user_id)
result = await graph.ainvoke({"messages": [HumanMessage(message)]}, config)
return {"response": result["messages"][-1].content}关键要点
- Thread ID 隔离: 每个用户独立状态
- 异步处理: 使用
ainvoke和asyncio - 数据库 Checkpointer: PostgreSQL 支持并发
- 负载均衡: 用户分片和分布式锁
- 限流保护: 防止滥用
问题 14: Simple Graph vs Chain vs Agent:如何选择?
💡 点击查看答案
答案:
选择架构模式基于 任务复杂度、决策需求、工具数量。
决策树
你的任务需求:
是否需要 LLM?
├─ 否 → Simple Graph(纯逻辑流程)
└─ 是 ↓
是否需要调用工具?
├─ 否 → 简单 LLM 调用(不需要 LangGraph)
└─ 是 ↓
工具调用后是否需要 LLM 继续推理?
├─ 否 → Router / Chain(单次工具调用)
└─ 是 ↓
是否需要多轮工具调用?
├─ 否 → Router(一次决策)
└─ 是 → Agent(循环推理)
是否需要跨会话记忆?
├─ 否 → 无 Checkpointer
└─ 是 → Agent + Memory详细对比表
| 场景 | Simple Graph | Chain | Router | Agent | Agent + Memory |
|---|---|---|---|---|---|
| Hello World 示例 | ✅ | ❌ | ❌ | ❌ | ❌ |
| 单次工具调用 | ❌ | ✅ | ✅ | 可以 | 可以 |
| 多次工具调用 | ❌ | ❌ | ❌ | ✅ | ✅ |
| 需要上下文记忆 | ❌ | ❌ | ❌ | ❌ | ✅ |
| 复杂度 | 低 | 低 | 中 | 高 | 高 |
| 成本 | 低 | 中 | 中 | 高 | 高 |
实际案例推荐
场景 1:简单问答
# 推荐:Simple Graph 或直接 LLM
response = llm.invoke("What is Python?")场景 2:天气查询
# 推荐:Router
tools = [get_weather]
llm_with_tools = llm.bind_tools(tools)
# 一次调用工具,返回结果场景 3:订单处理
# 推荐:Agent
# 可能需要:查询订单 → 检查库存 → 更新状态
tools = [check_order, check_inventory, update_order]
# 需要多步推理场景 4:客服聊天机器人
# 推荐:Agent + Memory
# 需要记住用户信息和对话历史
checkpointer = PostgresSaver(...)
graph = builder.compile(checkpointer=checkpointer)场景 5:数据分析助手
# 推荐:Agent + Memory
# 查询 → 分析 → 可视化(多步骤)
# 记住数据集和分析历史关键要点
- Simple Graph: 学习和简单流程
- Chain/Router: 单次工具调用
- Agent: 多步推理任务
- Agent + Memory: 多轮对话系统
问题 15: 如何设计可扩展的工具系统?
💡 点击查看答案
答案:
可扩展工具系统需要 模块化设计 + 动态加载 + 工具注册表。
设计 1:工具注册表模式
from typing import Dict, List
from langchain_core.tools import tool
class ToolRegistry:
"""工具注册中心"""
def __init__(self):
self.tools: Dict[str, callable] = {}
self.categories: Dict[str, List[str]] = {}
def register(self, category: str = "general"):
"""装饰器:注册工具"""
def decorator(func):
tool_func = tool(func)
self.tools[func.__name__] = tool_func
self.categories.setdefault(category, []).append(func.__name__)
return tool_func
return decorator
def get_tools(self, category: str = None) -> List:
"""获取工具列表"""
if category:
tool_names = self.categories.get(category, [])
return [self.tools[name] for name in tool_names]
return list(self.tools.values())
# 全局注册表
registry = ToolRegistry()
# 注册工具
@registry.register(category="math")
def add(a: int, b: int) -> int:
"""Add two numbers."""
return a + b
@registry.register(category="math")
def multiply(a: int, b: int) -> int:
"""Multiply two numbers."""
return a * b
@registry.register(category="search")
def search_web(query: str) -> str:
"""Search the web."""
return f"Results for: {query}"
# 使用
math_tools = registry.get_tools("math")
all_tools = registry.get_tools()设计 2:插件化工具加载
import importlib
import os
from pathlib import Path
class ToolLoader:
"""动态加载工具插件"""
def __init__(self, plugin_dir: str = "plugins"):
self.plugin_dir = Path(plugin_dir)
self.tools = []
def load_plugins(self):
"""从插件目录加载所有工具"""
for file in self.plugin_dir.glob("*.py"):
if file.stem.startswith("_"):
continue
# 动态导入模块
module_name = f"plugins.{file.stem}"
module = importlib.import_module(module_name)
# 查找所有 @tool 装饰的函数
for attr_name in dir(module):
attr = getattr(module, attr_name)
if callable(attr) and hasattr(attr, "name"):
self.tools.append(attr)
return self.tools
# 项目结构
# plugins/
# ├── math_tools.py
# ├── search_tools.py
# └── file_tools.py
# 使用
loader = ToolLoader("plugins")
tools = loader.load_plugins()设计 3:条件工具加载
def get_tools_for_context(context: str, user_role: str) -> List:
"""根据上下文和用户角色返回工具"""
base_tools = [search_web, calculate]
if context == "customer_service":
tools = base_tools + [check_order, track_shipment]
if user_role == "admin":
tools.append(cancel_order) # 只有管理员可以取消订单
elif context == "data_analysis":
tools = base_tools + [query_database, generate_chart]
else:
tools = base_tools
return tools
# 使用
def agent_with_dynamic_tools(state: MessagesState):
context = state.get("context", "general")
user_role = state.get("user_role", "user")
tools = get_tools_for_context(context, user_role)
llm_with_tools = llm.bind_tools(tools)
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}设计 4:工具版本管理
class VersionedTool:
"""支持版本管理的工具包装器"""
def __init__(self, name: str, func: callable, version: str):
self.name = name
self.func = tool(func)
self.version = version
def __call__(self, *args, **kwargs):
return self.func(*args, **kwargs)
class ToolManager:
def __init__(self):
self.tools: Dict[str, Dict[str, VersionedTool]] = {}
def register(self, name: str, func: callable, version: str = "1.0"):
if name not in self.tools:
self.tools[name] = {}
self.tools[name][version] = VersionedTool(name, func, version)
def get_tool(self, name: str, version: str = "latest"):
if version == "latest":
version = max(self.tools[name].keys())
return self.tools[name][version].func
# 使用
manager = ToolManager()
manager.register("search", search_web_v1, "1.0")
manager.register("search", search_web_v2, "2.0")
# 获取最新版本
tool = manager.get_tool("search", "latest")设计 5:工具组合
from langchain_core.tools import tool
def create_tool_pipeline(*tools):
"""将多个工具组合成一个工具"""
@tool
def pipeline_tool(input_data: str) -> str:
"""Execute a pipeline of tools."""
result = input_data
for t in tools:
result = t.invoke(result)
return result
return pipeline_tool
# 使用
fetch_tool = create_fetch_tool()
parse_tool = create_parse_tool()
summarize_tool = create_summarize_tool()
# 组合工具
research_tool = create_tool_pipeline(fetch_tool, parse_tool, summarize_tool)关键要点
- 注册表模式: 中心化管理所有工具
- 插件系统: 动态加载和卸载
- 条件加载: 根据上下文选择工具
- 版本管理: 支持工具演进
- 工具组合: 复杂功能的模块化
🎉 复习完成!
恭喜你完成了 Module-2 的全部 15 个复习问题!让我们回顾一下你已经掌握的知识:
📊 知识掌握度自测
| 类别 | 问题编号 | 掌握程度 |
|---|---|---|
| 基础理解 | 1-5 | ⬜⬜⬜⬜⬜ |
| 代码实现 | 6-10 | ⬜⬜⬜⬜⬜ |
| 架构设计 | 11-15 | ⬜⬜⬜⬜⬜ |
自测方法:
- ⬜ = 不理解
- ☑ = 基本理解
- ✅ = 完全掌握
🎯 下一步学习路径
完成 Module-2 后,你已经掌握了:
- ✅ LangGraph 的六大核心架构模式
- ✅ 状态管理和 Reducer 机制
- ✅ Router 和 Agent 的区别与应用
- ✅ 工具系统的设计与实现
- ✅ 记忆管理和状态持久化
- ✅ 生产环境部署流程
继续前进:
Module-3:高级状态管理
- 多状态模式(Multiple Schemas)
- 消息过滤和修剪
- 外部记忆系统集成
Module-4:人机协作
- 断点(Breakpoints)机制
- 状态编辑和人工反馈
- 时间旅行调试
Module-5:并行和子图
- Map-Reduce 模式
- 并行节点执行
- 子图复用
💪 实践建议
- 动手编码: 每个问题都尝试自己实现一遍
- 修改实验: 改变参数,观察行为变化
- 构建项目: 选择一个实际场景(如客服机器人)完整实现
- 阅读源码: 深入 LangGraph 源代码理解内部实现
- 社区交流: 在 LangChain Discord 或 GitHub 讨论问题
📚 参考资源
最后寄语
架构模式不是死记硬背的公式,而是解决问题的工具箱。真正的精通来自于:
- 理解每种模式的本质(为什么这样设计?)
- 认识每种模式的边界(什么场景不适用?)
- 掌握模式间的权衡(如何选择和组合?)
继续保持好奇心,勇于实践,你一定能成为 LangGraph 的专家!
—— 来自你的图灵奖获得者朋友
🎊 Module-2 复习完成!共计约 15,000 字深度问答,覆盖所有核心概念。