人机协作与时间回溯
在构建 AI 应用时,有两个场景非常常见:
- 让人类参与决策:AI 给出建议,但关键决定由人来做
- 回到过去重来:不满意当前结果,回到之前的状态重新尝试
LangGraph 1.x 提供了两个强大的特性来支持这些场景:Human-in-the-Loop (HITL) 和 Time Travel。
什么是 Human-in-the-Loop?
通俗解释
想象你在使用一个 AI 翻译工具:
- 没有 HITL:AI 翻译完就直接发送,你没有机会检查
- 有 HITL:AI 翻译完后会暂停,问你"确认发送吗?",你说"好"才继续
这就是 Human-in-the-Loop —— 让人类在关键节点介入决策。
为什么需要 HITL?
| 场景 | 没有 HITL 的风险 | 有 HITL 的保障 |
|---|---|---|
| AI 客服 | 自动发送不当回复 | 敏感回复需人工确认 |
| 自动交易 | 执行错误的买卖指令 | 大额交易需人工批准 |
| 内容审核 | 误删或漏删内容 | 边界案例由人工判断 |
| 代码生成 | 直接执行危险代码 | 用户确认后才执行 |
LangGraph 的 HITL 实现
LangGraph 1.x 使用 interrupt() 和 Command 来实现 HITL:
from langgraph.types import interrupt, Command
def review_node(state):
# 暂停执行,等待人工决策
decision = interrupt({
"question": "是否批准这个操作?",
"data": state["pending_action"]
})
if decision:
return Command(goto="execute")
else:
return Command(goto="cancel")工作原理:
interrupt()会暂停 Graph 执行- 返回一个包含
__interrupt__的结果给调用者 - 调用者收集人工决策后,用
Command(resume=value)恢复执行 interrupt()返回resume的值,继续执行后续逻辑
HITL 实现的关键要点
在实际项目中实现人机协作时,需要注意以下几点:
| 要点 | 说明 |
|---|---|
| 必须配置 Checkpointer | 否则无法保存中断时的任务状态 |
| 必须指定 thread_id | 用于后续恢复任务时找到正确的会话 |
| 中断时间有限制 | 中断时间过长可能导致无法恢复(取决于 Checkpointer 配置) |
| resume 值可以是任意类型 | 可以是 True/False,也可以是字典等复杂数据 |
典型流程图
┌─────────────┐
│ 调用 LLM │
└──────┬──────┘
│
▼
┌─────────────┐
│ 人工审核点 │
│ interrupt() │
└──────┬──────┘
│
┌────────────┼────────────┐
│ │ │
▼ │ ▼
┌─────────┐ │ ┌─────────┐
│ 批准 │ │ │ 拒绝 │
│ goto=A │ │ │ goto=B │
└────┬────┘ │ └────┬────┘
│ │ │
▼ │ ▼
[继续执行...] │ [终止或其他处理]案例一:智能翻译审核系统
场景描述
构建一个翻译系统,普通内容直接输出,但涉及法律、医疗等敏感内容时,需要人工审核。
工作流程
用户输入 → 翻译 → 检测敏感词
↓
是敏感内容?
/ \
是 否
↓ ↓
[人工审核] 直接输出
/ \
通过 拒绝
↓ ↓
输出 终止核心代码解析
1. 定义状态
from typing import TypedDict
class TranslationState(TypedDict):
source_text: str # 原文
source_lang: str # 源语言
target_lang: str # 目标语言
translated_text: str # 翻译结果
is_sensitive: bool # 是否敏感
final_output: str # 最终输出2. 翻译节点
def translate_text(state: TranslationState):
# 调用 LLM 翻译
response = llm.invoke(f"翻译: {state['source_text']}")
# 检测敏感词
sensitive_words = ["法律", "医疗", "合同"]
is_sensitive = any(w in state['source_text'] for w in sensitive_words)
return {
"translated_text": response.content,
"is_sensitive": is_sensitive
}3. 人工审核节点(HITL 核心)
from langgraph.types import interrupt, Command
from typing import Literal
def human_review(state: TranslationState) -> Command[Literal["output", "__end__"]]:
# 使用 interrupt 暂停,等待人工决策
approved = interrupt({
"message": "请审核翻译结果",
"original": state["source_text"],
"translated": state["translated_text"]
})
if approved:
return Command(goto="output")
else:
return Command(goto=END, update={"final_output": "翻译已被拒绝"})小白提示:
interrupt()就像按下"暂停键",Graph 会停在这里- 返回值
approved来自后续的Command(resume=True/False)Command(goto=...)决定下一步去哪个节点
4. 构建 Graph
from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import InMemorySaver
builder = StateGraph(TranslationState)
builder.add_node("translate", translate_text)
builder.add_node("human_review", human_review)
builder.add_node("output", output_result)
builder.add_edge(START, "translate")
builder.add_conditional_edges("translate", should_review)
builder.add_edge("output", END)
# 必须使用 checkpointer 才能支持 interrupt
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)5. 使用示例
config = {"configurable": {"thread_id": "translation-001"}}
# 第一步:提交翻译(敏感内容会暂停)
result = graph.invoke({
"source_text": "根据合同法律条款...",
"source_lang": "中文",
"target_lang": "英文"
}, config=config)
print(result) # 包含 __interrupt__,说明在等待审核
# 第二步:人工批准
final_result = graph.invoke(Command(resume=True), config=config)
print(final_result["final_output"]) # 输出翻译结果
# 或者拒绝
# final_result = graph.invoke(Command(resume=False), config=config)什么是 Time Travel?
通俗解释
想象你在玩一个剧情游戏:
- 没有 Time Travel:选错了对话选项,只能重新开始
- 有 Time Travel:可以"读档",回到之前的存档点重新选择
LangGraph 的 Time Travel 就是这个"存档读档"功能 —— 可以回溯到任意历史状态,从那里重新执行。
为什么需要 Time Travel?
| 场景 | 价值 |
|---|---|
| 调试 | 发现问题后,回到问题发生前的状态分析原因 |
| 探索 | 从同一起点尝试不同的路径,对比结果 |
| A/B 测试 | 保持相同的前序状态,测试不同的后续处理 |
| 容错 | 某一步出错后,回退到上一步重试 |
LangGraph 的 Time Travel 实现
LangGraph 通过 Checkpointer 自动保存每一步的状态快照:
# 获取所有历史状态
states = list(graph.get_state_history(config))
for state in states:
print(f"Checkpoint: {state.config['configurable']['checkpoint_id']}")
print(f"Next node: {state.next}")
print(f"Values: {state.values}")工作原理:
- 每次节点执行后,Checkpointer 自动保存状态快照
- 每个快照有唯一的
checkpoint_id - 指定
checkpoint_id可以从该快照恢复执行
Time Travel 实现的三个关键步骤
步骤一:执行并记录
import uuid
# 生成唯一的会话 ID
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
# 正常执行 Graph
result = graph.invoke(initial_input, config)要点:每次执行后,每个节点完成时都会自动生成一个 checkpoint。
步骤二:查看历史检查点
# 获取所有 checkpoint
states = list(graph.get_state_history(config))
for i, state in enumerate(states):
print(f"--- Checkpoint {i} ---")
print(f"下一步: {state.next}") # 该检查点之后要执行的节点
print(f"ID: {state.config['configurable']['checkpoint_id']}")
print(f"状态值: {state.values}")输出示例:
--- Checkpoint 0 ---
下一步: () # 空元组表示已完成
ID: abc-123
--- Checkpoint 1 ---
下一步: ('ending',) # 下一步是 ending 节点
ID: abc-122
--- Checkpoint 2 ---
下一步: ('development',) # 下一步是 development 节点
ID: abc-121步骤三:回溯并重新执行
# 选择要回溯的检查点
target_checkpoint = states[2] # 假设我们要从 development 重新开始
# 可选:修改状态再重新执行
new_config = graph.update_state(
target_checkpoint.config,
values={"author": "鲁迅"} # 修改状态中的某个值
)
# 从该检查点继续执行(传入 None 表示使用检查点的状态)
new_result = graph.invoke(None, new_config)update_state 的妙用
update_state 方法可以在回溯前修改状态,这在以下场景非常有用:
| 场景 | 做法 |
|---|---|
| 纠正错误 | 发现某步骤的输出有误,回溯后修正 |
| A/B 测试 | 保持前序状态,测试不同的后续参数 |
| 手动干预 | 人工修改中间结果,然后继续执行 |
| 调试 | 注入特定值来测试边界条件 |
案例二:故事创作助手
场景描述
构建一个故事创作助手,生成"开头 → 发展 → 结局"。如果对结局不满意,可以"时间回溯"到开头,重新生成不同的发展和结局。
工作流程
主题输入 → 生成开头 → 生成发展 → 生成结局
↑______________|
(Time Travel 回溯)核心代码解析
1. 定义状态
from typing import TypedDict
from typing_extensions import NotRequired
class StoryState(TypedDict):
theme: str # 故事主题
opening: NotRequired[str] # 开头
development: NotRequired[str] # 发展
ending: NotRequired[str] # 结局小白提示:
NotRequired表示这个字段是可选的。故事刚开始时只有 theme,其他字段会逐步填充。
2. 故事节点
def generate_opening(state: StoryState):
prompt = f"根据主题「{state['theme']}」写一个故事开头,50字以内"
response = llm.invoke(prompt)
return {"opening": response.content}
def generate_development(state: StoryState):
prompt = f"故事开头:{state['opening']}\n请续写发展,80字以内"
response = llm.invoke(prompt)
return {"development": response.content}
def generate_ending(state: StoryState):
prompt = f"开头:{state['opening']}\n发展:{state['development']}\n写结局"
response = llm.invoke(prompt)
return {"ending": response.content}3. 构建 Graph
builder = StateGraph(StoryState)
builder.add_node("opening", generate_opening)
builder.add_node("development", generate_development)
builder.add_node("ending", generate_ending)
builder.add_edge(START, "opening")
builder.add_edge("opening", "development")
builder.add_edge("development", "ending")
builder.add_edge("ending", END)
# Time Travel 必须使用 checkpointer
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)4. 第一次创作
import uuid
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
story = graph.invoke({"theme": "程序员发现时间旅行秘密"}, config=config)
print(f"开头: {story['opening']}")
print(f"发展: {story['development']}")
print(f"结局: {story['ending']}")5. 查看 Checkpoints
states = list(graph.get_state_history(config))
for i, state in enumerate(states):
print(f"--- Checkpoint {i} ---")
print(f"Next: {state.next}")
print(f"ID: {state.config['configurable']['checkpoint_id']}")输出示例:
--- Checkpoint 0 ---
Next: () # 已完成
ID: abc-123
--- Checkpoint 1 ---
Next: ('ending',) # 下一步是 ending
ID: abc-122
--- Checkpoint 2 ---
Next: ('development',) # 下一步是 development ← 我们要回到这里
ID: abc-121
--- Checkpoint 3 ---
Next: ('opening',)
ID: abc-1206. Time Travel:重新生成
# 找到开头完成后的 checkpoint
target_state = None
for state in states:
if state.next == ('development',):
target_state = state
break
# 创建回溯配置
time_travel_config = {
"configurable": {
"thread_id": target_state.config["configurable"]["thread_id"],
"checkpoint_id": target_state.config["configurable"]["checkpoint_id"],
}
}
# 从该 checkpoint 重新执行
new_story = graph.invoke(None, config=time_travel_config)
print(f"保留的开头: {new_story['opening']}") # 和之前一样
print(f"新的发展: {new_story['development']}") # 重新生成
print(f"新的结局: {new_story['ending']}") # 重新生成关键点:
invoke(None, config)表示"从指定的 checkpoint 继续执行"- 开头保持不变(因为是从开头完成后的状态开始)
- 发展和结局会重新生成(LLM 的随机性会产生不同结果)
interrupt() 详解
基本语法
from langgraph.types import interrupt
def node_with_interrupt(state):
# interrupt 的参数会作为 __interrupt__ 返回给调用者
result = interrupt({
"question": "请选择一个选项",
"options": ["A", "B", "C"]
})
# result 是用户通过 Command(resume=value) 传入的值
return {"user_choice": result}完整流程
1. graph.invoke(inputs, config)
↓
2. 执行到 interrupt()
↓
3. 返回 {"__interrupt__": [...], ...other_state}
↓
4. 调用者收集用户输入
↓
5. graph.invoke(Command(resume=user_input), config)
↓
6. interrupt() 返回 user_input,继续执行常见用法
1. 简单确认
confirmed = interrupt({"message": "确认执行?"})
if confirmed:
# 执行操作2. 多选项
choice = interrupt({
"question": "选择处理方式",
"options": {"A": "重试", "B": "跳过", "C": "终止"}
})3. 收集额外输入
extra_info = interrupt({
"message": "请提供更多信息",
"current_state": state
})
# extra_info 可以是任意类型的数据Command 详解
基本用法
from langgraph.types import Command
from langgraph.constants import END
# 1. 恢复执行并传值
Command(resume=True)
Command(resume={"name": "Alice", "age": 25})
# 2. 指定下一个节点
Command(goto="next_node")
Command(goto=END) # 结束执行
# 3. 更新状态
Command(update={"status": "approved"})
# 4. 组合使用
Command(goto="process", update={"approved": True})在节点中使用
def decision_node(state) -> Command[Literal["approve", "reject", "__end__"]]:
decision = interrupt({"question": "请选择"})
if decision == "approve":
return Command(goto="approve", update={"status": "approved"})
elif decision == "reject":
return Command(goto="reject", update={"status": "rejected"})
else:
return Command(goto=END)类型提示:
Command[Literal["a", "b"]]告诉 LangGraph 这个节点可能跳转到哪些节点,用于构建正确的图结构。
Checkpointer 选项
1. InMemorySaver(内存)
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()- 优点:简单快速
- 缺点:程序重启后丢失
- 适用:开发测试
2. SqliteSaver(SQLite)
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")- 优点:持久化到文件
- 缺点:单机使用
- 适用:单用户应用
3. PostgresSaver(PostgreSQL)
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/db"
)- 优点:生产级、支持多用户
- 缺点:需要数据库
- 适用:生产环境
本章总结
Human-in-the-Loop (HITL)
| 概念 | 作用 |
|---|---|
interrupt(value) | 暂停执行,返回 value 给调用者 |
Command(resume=x) | 恢复执行,x 作为 interrupt 返回值 |
Command(goto="node") | 指定下一步执行的节点 |
Command(update={...}) | 更新状态 |
适用场景:人工审核、敏感操作确认、关键决策点
Time Travel
| 概念 | 作用 |
|---|---|
get_state_history(config) | 获取所有历史 checkpoint |
checkpoint_id | checkpoint 的唯一标识 |
invoke(None, config) | 从指定 checkpoint 继续执行 |
适用场景:调试回溯、A/B 测试、探索不同路径
关键依赖
两个特性都必须配置 Checkpointer:
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)没有 Checkpointer:
interrupt()无法暂停和恢复- 无法保存历史状态,Time Travel 不可用
思考题
- 如果一个 Graph 有多个
interrupt()节点,执行顺序是怎样的? - Time Travel 回溯后,原来的历史状态会被覆盖吗?
- 如何实现"审核失败后修改内容,重新提交审核"的流程?
- Checkpointer 会保存哪些数据?是否包含 LLM 的响应?
下一步
掌握了 HITL 和 Time Travel 后,你已经可以构建可靠、可控、可调试的 AI 应用了。建议:
- 尝试在自己的项目中加入人工审核节点
- 使用 Time Travel 调试复杂的多步骤流程
- 探索生产环境的 Checkpointer 配置(PostgreSQL)
- 结合 Stream 实现实时的审核界面