LangGraph Time Travel 详细解读
📚 概述
本文档详细解读 LangGraph 中的 Time Travel(时间旅行) 功能。这是一项强大的调试和状态管理能力,允许你查看、回放,甚至从图的历史状态中"分叉"出新的执行路径。通过 Time Travel,你可以像操作版本控制系统一样管理 Agent 的执行历史。
📚 术语表
| 术语名称 | LangGraph 定义和解读 | Python 定义和说明 | 重要程度 |
|---|---|---|---|
| Time Travel | 图历史状态的浏览、回放和分叉能力,允许从任意历史点重新执行或创建新分支 | 基于 checkpointer 保存的状态快照实现 | ⭐⭐⭐⭐⭐ |
| get_state_history() | 获取图完整执行历史的方法,返回按时间倒序排列的所有 StateSnapshot | list(graph.get_state_history(config)) | ⭐⭐⭐⭐⭐ |
| StateSnapshot | 状态快照对象,包含 values(状态数据)、config(含 checkpoint_id)、metadata 等 | LangGraph 返回的状态快照数据结构 | ⭐⭐⭐⭐⭐ |
| Checkpoint | 图执行过程中某个时刻的完整状态备份,由 checkpointer 自动在每个节点后保存 | 持久化的状态快照,包含唯一的 checkpoint_id | ⭐⭐⭐⭐⭐ |
| checkpoint_id | 检查点的唯一标识符,用于指定回放或分叉的起点 | 字符串类型,通常是 UUID 格式 | ⭐⭐⭐⭐⭐ |
| 回放(Replaying) | 从历史 checkpoint 重新执行图,不修改状态,用于重现执行过程 | graph.stream(None, checkpoint_config) | ⭐⭐⭐⭐⭐ |
| 分叉(Forking) | 从历史 checkpoint 创建新执行分支,修改状态后继续执行,探索替代路径 | update_state() + stream(None) 组合 | ⭐⭐⭐⭐⭐ |
| parent_config | StateSnapshot 的属性,指向父 checkpoint 的配置,用于追溯历史链 | 字典类型,包含父 checkpoint_id | ⭐⭐⭐⭐ |
| 消息覆盖机制 | 分叉时保持相同的 Message ID 实现消息替换,而非追加新消息 | 利用 add_messages reducer 的 ID 匹配逻辑 | ⭐⭐⭐⭐⭐ |
| 历史分支 | 分叉操作创建的新执行路径,与原始历史并存,互不影响 | 类似 Git 的分支概念 | ⭐⭐⭐⭐ |
| Thread 隔离 | 不同 thread_id 拥有独立的状态和历史,互不干扰 | 通过 configurable.thread_id 区分 | ⭐⭐⭐⭐⭐ |
| MemorySaver | 内存版 checkpointer,适合开发测试,数据不持久化 | from langgraph.checkpoint.memory import MemorySaver | ⭐⭐⭐⭐ |
🎯 核心概念
什么是 Time Travel?
Time Travel 是 LangGraph 提供的一种高级状态管理功能,包含三个核心能力:
浏览历史(Browsing History)
- 查看图执行过程中的所有历史状态
- 访问每个 checkpoint(检查点)的完整数据
- 了解图在每个步骤的状态快照
回放(Replaying)
- 从历史中的某个 checkpoint 重新执行图
- 用于复现问题或验证修复
- 不改变历史,只是重新运行
分叉(Forking)
- 从历史状态创建新的执行分支
- 修改历史状态并继续执行
- 探索"如果当时输入不同会怎样"的场景
为什么需要 Time Travel?
在开发 AI Agent 时,我们经常遇到这些需求:
- 调试问题:Agent 在第 5 步出错了,想回到第 3 步重新执行
- 探索假设:如果用户输入的是 "5 * 3" 而不是 "2 * 3",结果会怎样?
- 恢复执行:Agent 意外中断,想从上次的状态继续
- 测试修改:修改了 Agent 逻辑,想用历史数据测试新版本
Human-in-the-Loop 的三大应用
Time Travel 是 Human-in-the-Loop 模式的重要组成部分:
| 应用 | 说明 | 使用场景 |
|---|---|---|
| Approval(审批) | 在关键节点暂停,等待人类批准 | 执行敏感操作前确认 |
| Debugging(调试) | 回放历史状态,重现问题 | 排查 bug,验证修复 |
| Editing(编辑) | 修改状态,改变执行路径 | 纠正错误,探索可能性 |
🎭 实战案例:数学计算 Agent
我们将构建一个简单的数学计算 Agent,演示 Time Travel 的完整功能:
Agent 功能:
- 接收用户的数学问题(如 "Multiply 2 and 3")
- 调用工具执行计算
- 返回结果
Time Travel 演示:
- 浏览 Agent 的执行历史
- 回放某个历史状态
- 分叉历史状态并修改输入
系统架构图
用户输入:"Multiply 2 and 3"
↓
[assistant] 解析意图 → 调用 multiply tool
↓
[tools] 执行计算 → 返回结果
↓
[assistant] 生成回复 → "The result is 6"
↓
(END)
每一步都会保存 checkpoint,可以随时回溯!🔧 代码实现详解
1. 定义工具函数
def multiply(a: int, b: int) -> int:
"""Multiply a and b.
Args:
a: first int
b: second int
"""
return a * b
def add(a: int, b: int) -> int:
"""Adds a and b.
Args:
a: first int
b: second int
"""
return a + b
def divide(a: int, b: int) -> float:
"""Divide a by b.
Args:
a: first int
b: second int
"""
return a / b
tools = [add, multiply, divide]说明:
- 定义三个基本数学运算工具
- 使用标准的 docstring 格式描述工具功能
- LLM 会根据 docstring 理解工具的用途
2. 绑定工具到 LLM
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-5-nano")
llm_with_tools = llm.bind_tools(tools)LangChain 知识点:bind_tools
bind_tools() 将工具注册到 LLM,使其能够:
- 理解工具的功能
- 决定何时调用工具
- 生成工具调用的参数
3. 定义图结构
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.prebuilt import tools_condition, ToolNode
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
# 系统提示词
sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic on a set of inputs.")
# 定义 assistant 节点
def assistant(state: MessagesState):
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}
# 创建图
builder = StateGraph(MessagesState)
# 添加节点
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
# 添加边
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
"assistant",
tools_condition, # 自动判断是否需要调用工具
)
builder.add_edge("tools", "assistant")
# 编译图(带 checkpointer)
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)关键概念解析:
MessagesState
这是 LangGraph 内置的状态类,专门用于管理对话消息:
class MessagesState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
# add_messages 是内置 reducer,自动合并消息MemorySaver
这是 LangGraph 的 checkpointer(检查点管理器):
checkpointer=MemorySaver()作用:
- 在每个节点执行后自动保存状态
- 存储在内存中(适合开发测试)
- 生产环境可以使用持久化的 checkpointer(如数据库)
重要: Time Travel 功能依赖 checkpointer!没有 checkpointer 就没有历史状态。
tools_condition
这是 LangGraph 预构建的条件函数:
tools_condition(state)功能:
- 检查最后一条消息是否包含工具调用
- 如果是工具调用 → 路由到 "tools" 节点
- 如果不是 → 路由到 END
4. 执行图
# 输入
initial_input = {"messages": HumanMessage(content="Multiply 2 and 3")}
# 线程配置(用于标识会话)
thread = {"configurable": {"thread_id": "1"}}
# 运行图
for event in graph.stream(initial_input, thread, stream_mode="values"):
event['messages'][-1].pretty_print()输出:
================================ Human Message =================================
Multiply 2 and 3
================================== Ai Message ==================================
Tool Calls:
multiply (call_ikJxMpb777bKMYgmM3d9mYjW)
Args:
a: 2
b: 3
================================= Tool Message =================================
Name: multiply
6
================================== Ai Message ==================================
The result of multiplying 2 and 3 is 6.执行流程:
- 用户输入 "Multiply 2 and 3"
- Assistant 节点识别需要调用
multiply工具 - Tools 节点执行
multiply(2, 3)返回6 - Assistant 节点生成最终回复
Python 知识点:thread_id
thread = {"configurable": {"thread_id": "1"}}thread_id用于唯一标识一个会话- 同一个
thread_id的所有执行共享历史状态 - 不同的
thread_id拥有独立的状态和历史
🔍 浏览历史(Browsing History)
查看当前状态
graph.get_state({'configurable': {'thread_id': '1'}})返回的 StateSnapshot:
StateSnapshot(
values={
'messages': [
HumanMessage(content='Multiply 2 and 3', id='4ee8c440-...'),
AIMessage(content='', tool_calls=[...], id='run-bc24d334-...'),
ToolMessage(content='6', name='multiply', id='1012611a-...'),
AIMessage(content='The result of multiplying 2 and 3 is 6.', id='run-b46f3fed-...')
]
},
next=(), # 下一个要执行的节点(空表示已完成)
config={
'configurable': {
'thread_id': '1',
'checkpoint_ns': '',
'checkpoint_id': '1ef6a440-ac9e-6024-8003-6fd8435c1d3b'
}
},
metadata={'source': 'loop', 'step': 3, ...},
created_at='2024-09-03T22:29:54.309727+00:00',
parent_config={...}, # 父 checkpoint
tasks=()
)StateSnapshot 字段解析:
| 字段 | 说明 | 用途 |
|---|---|---|
values | 当前状态的完整数据 | 查看状态内容 |
next | 下一个要执行的节点 | 判断执行进度 |
config | 配置信息(包含 checkpoint_id) | 用于回放或分叉 |
metadata | 元数据(步数、来源等) | 调试和追踪 |
created_at | 创建时间 | 时间线分析 |
parent_config | 父 checkpoint 的配置 | 追溯历史 |
tasks | 待执行的任务 | 查看中断状态 |
获取完整历史
all_states = [s for s in graph.get_state_history(thread)]
len(all_states) # 输出:5历史结构:
all_states[0] # 当前状态(最新)
all_states[1] # 倒数第 2 个状态
all_states[2] # 倒数第 3 个状态
...
all_states[-1] # 初始状态(最早)可视化历史:

Step 0 (all_states[-1]): 用户输入
↓
Step 1 (all_states[-2]): Assistant 生成工具调用
↓
Step 2 (all_states[-3]): Tools 执行计算
↓
Step 3 (all_states[-4]): Assistant 生成最终回复
↓
Step 4 (all_states[0]): 当前状态(完成)查看特定历史状态
# 查看第一次接收用户输入后的状态
to_replay = all_states[-2]
# 查看状态内容
to_replay.values
# 输出:{'messages': [HumanMessage(content='Multiply 2 and 3', ...)]}
# 查看下一个要执行的节点
to_replay.next
# 输出:('assistant',)
# 查看 checkpoint ID
to_replay.config
# 输出:{'configurable': {'thread_id': '1', 'checkpoint_id': '1ef6a440-...'}}🔄 回放(Replaying)

回放允许你从历史中的某个 checkpoint 重新执行图。
核心概念
回放 vs 继续执行:
| 操作 | 说明 | 状态 |
|---|---|---|
| 继续执行 | 从当前状态继续未完成的任务 | 使用当前 checkpoint |
| 回放 | 从历史状态重新执行已完成的任务 | 使用历史 checkpoint |
回放示例
# 获取要回放的历史状态
to_replay = all_states[-2]
# 从该 checkpoint 开始重新执行
for event in graph.stream(None, to_replay.config, stream_mode="values"):
event['messages'][-1].pretty_print()关键点:
- 第一个参数是
None:不提供新输入,使用 checkpoint 中保存的状态 - 第二个参数是
to_replay.config:包含checkpoint_id,指定从哪个历史状态开始 - 图会自动识别这是一个已执行过的 checkpoint,进行回放
输出:
================================ Human Message =================================
Multiply 2 and 3
================================== Ai Message ==================================
Tool Calls:
multiply (call_SABfB57CnDkMu9HJeUE0mvJ9)
Args:
a: 2
b: 3
================================= Tool Message =================================
Name: multiply
6
================================== Ai Message ==================================
The result of multiplying 2 and 3 is 6.注意: 由于 LLM 的随机性,回放的结果可能与原始执行略有不同(例如工具调用 ID 不同)。
🌿 分叉(Forking)

分叉是 Time Travel 最强大的功能:从历史状态创建新的执行分支。
核心概念
分叉 vs 回放:
| 操作 | 状态 | 历史 | 用途 |
|---|---|---|---|
| 回放 | 不修改状态 | 不改变历史 | 重现执行过程 |
| 分叉 | 修改状态 | 创建新分支 | 探索替代路径 |
分叉原理
原始历史:
┌─ Step 0: 用户输入 "2 * 3"
├─ Step 1: Assistant 调用工具
├─ Step 2: Tools 返回 6
└─ Step 3: Assistant 回复 "结果是 6"
分叉操作(在 Step 0 修改输入为 "5 * 3"):
┌─ Step 0: 用户输入 "2 * 3" (原始)
│
└─ Step 0': 用户输入 "5 * 3" (分叉) ← 从这里开始新的执行
├─ Step 1': Assistant 调用工具
├─ Step 2': Tools 返回 15
└─ Step 3': Assistant 回复 "结果是 15"分叉实现
步骤 1:选择要分叉的状态
to_fork = all_states[-2] # 选择用户输入后的状态
to_fork.values["messages"]
# 输出:[HumanMessage(content='Multiply 2 and 3', id='4ee8c440-...')]步骤 2:修改状态
fork_config = graph.update_state(
to_fork.config, # 指定要修改的 checkpoint
{"messages": [HumanMessage(
content='Multiply 5 and 3', # 修改后的内容
id=to_fork.values["messages"][0].id # ⭐ 保持相同的 ID!
)]},
)关键点:Message ID 的作用
id=to_fork.values["messages"][0].id为什么要保持相同的 ID?
LangGraph 的 add_messages reducer 有特殊逻辑:
- 如果 ID 相同:覆盖原有消息(更新)
- 如果 ID 不同:追加新消息(插入)
# 示例:
# 原始消息:[HumanMessage(content='Multiply 2 and 3', id='abc')]
# 覆盖(使用相同 ID):
update({"messages": [HumanMessage(content='Multiply 5 and 3', id='abc')]})
# 结果:[HumanMessage(content='Multiply 5 and 3', id='abc')]
# 追加(使用不同 ID):
update({"messages": [HumanMessage(content='Multiply 5 and 3', id='xyz')]})
# 结果:[
# HumanMessage(content='Multiply 2 and 3', id='abc'),
# HumanMessage(content='Multiply 5 and 3', id='xyz')
# ]步骤 3:从分叉的状态继续执行
for event in graph.stream(None, fork_config, stream_mode="values"):
event['messages'][-1].pretty_print()输出:
================================ Human Message =================================
Multiply 5 and 3
================================== Ai Message ==================================
Tool Calls:
multiply (call_KP2CVNMMUKMJAQuFmamHB21r)
Args:
a: 5
b: 3
================================= Tool Message =================================
Name: multiply
15
================================== Ai Message ==================================
The result of multiplying 5 and 3 is 15.步骤 4:验证分叉结果
# 查看当前状态
graph.get_state({'configurable': {'thread_id': '1'}})
# 查看历史
all_states = [state for state in graph.get_state_history(thread)]
all_states[0].values["messages"]
# 输出:[HumanMessage(content='Multiply 5 and 3', ...), ...]注意: 分叉会创建一个新的 checkpoint 分支,但不会删除原始历史。
🌐 使用 LangGraph API 进行 Time Travel
LangGraph 提供了远程 API,可以通过网络调用 Time Travel 功能。
启动 LangGraph 开发服务器
在项目目录下运行:
cd module-3/studio
langgraph dev输出:
- 🚀 API: http://127.0.0.1:2024
- 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API Docs: http://127.0.0.1:2024/docs连接到 API
from langgraph_sdk import get_client
client = get_client(url="http://127.0.0.1:2024")通过 API 运行 Agent
from langchain_core.messages import HumanMessage
initial_input = {"messages": HumanMessage(content="Multiply 2 and 3")}
# 创建线程
thread = await client.threads.create()
# 流式运行
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="agent", # 在 langgraph.json 中定义
input=initial_input,
stream_mode="updates",
):
if chunk.data:
assistant_node = chunk.data.get('assistant', {}).get('messages', [])
tool_node = chunk.data.get('tools', {}).get('messages', [])
if assistant_node:
print("--------------------Assistant Node--------------------")
print(assistant_node[-1])
elif tool_node:
print("--------------------Tools Node--------------------")
print(tool_node[-1])stream_mode 说明:
| 模式 | 输出内容 | 用途 |
|---|---|---|
"values" | 每个节点执行后的完整状态 | 查看状态变化 |
"updates" | 每个节点对状态的更新 | 查看增量变化 |
通过 API 回放
# 获取历史
states = await client.threads.get_history(thread['thread_id'])
to_replay = states[-2]
# 回放
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="agent",
input=None, # 不提供新输入
stream_mode="values",
checkpoint_id=to_replay['checkpoint_id'] # 指定要回放的 checkpoint
):
print(f"Receiving new event of type: {chunk.event}...")
print(chunk.data)通过 API 分叉
# 获取要分叉的状态
states = await client.threads.get_history(thread['thread_id'])
to_fork = states[-2]
# 准备新输入
forked_input = {
"messages": HumanMessage(
content="Multiply 3 and 3",
id=to_fork['values']['messages'][0]['id'] # 保持相同 ID
)
}
# 更新状态(创建分叉)
forked_config = await client.threads.update_state(
thread["thread_id"],
forked_input,
checkpoint_id=to_fork['checkpoint_id']
)
# 从分叉状态继续执行
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="agent",
input=None,
stream_mode="updates",
checkpoint_id=forked_config['checkpoint_id']
):
# 处理输出
...🎓 核心知识点总结
LangGraph 特有概念
1. Checkpointer(检查点管理器)
作用: 自动保存图的执行历史
from langgraph.checkpoint.memory import MemorySaver
graph = builder.compile(checkpointer=MemorySaver())类型:
MemorySaver:内存存储(开发测试)SqliteSaver:SQLite 数据库(持久化)- 自定义 checkpointer(生产环境)
保存时机:
- 每个节点执行后自动保存
- 图完成后保存最终状态
- 中断时保存中断状态
2. StateSnapshot
状态快照,包含某个时刻的完整状态信息:
class StateSnapshot:
values: dict # 状态数据
next: tuple # 下一个节点
config: dict # 配置(包含 checkpoint_id)
metadata: dict # 元数据
created_at: datetime # 创建时间
parent_config: dict # 父 checkpoint
tasks: tuple # 待执行任务3. Thread(线程)
用于标识和隔离不同的会话:
thread = {"configurable": {"thread_id": "user_123"}}特性:
- 每个 thread 有独立的状态和历史
- 相同 thread_id 共享历史
- 可用于多用户、多会话场景
4. get_state_history()
获取完整历史的方法:
all_states = list(graph.get_state_history(thread))
# all_states[0] # 最新状态
# all_states[-1] # 最早状态返回: 按时间倒序排列的 StateSnapshot 列表
5. update_state()
修改历史状态,创建分叉:
fork_config = graph.update_state(
checkpoint_config, # 要修改的 checkpoint
state_updates # 状态更新
)返回: 新分叉的 checkpoint 配置
Python 特有知识点
1. MessagesState
LangGraph 内置的消息状态类:
from langgraph.graph import MessagesState
# 等价于:
class MessagesState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]add_messages reducer:
- 自动合并消息列表
- 相同 ID 的消息会被覆盖
- 不同 ID 的消息会被追加
2. Message ID 机制
所有 LangChain 消息都有 id 字段:
msg = HumanMessage(content="Hello", id="abc123")用途:
- 唯一标识消息
- 用于消息去重
- 用于消息更新(分叉时覆盖消息)
自动生成: 如果不提供 ID,LangChain 会自动生成 UUID
3. 异步编程(async/await)
LangGraph API 使用异步接口:
# 异步函数定义
async def my_function():
result = await async_operation()
return result
# 在 Jupyter 中直接使用 await
thread = await client.threads.create()
# 在普通 Python 脚本中需要:
import asyncio
asyncio.run(my_function())异步迭代:
async for chunk in client.runs.stream(...):
process(chunk)💡 最佳实践
1. 何时使用 Time Travel?
✅ 适用场景:
- 调试复杂的 Agent 行为
- 复现用户报告的问题
- 探索不同输入的影响
- A/B 测试不同的 Agent 配置
- 恢复中断的执行
❌ 不适用场景:
- 生产环境的实时执行(性能开销)
- 简单的无状态操作
- 不需要历史追溯的场景
2. Checkpointer 选择
开发环境:
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver() # 简单快速,重启后丢失测试环境:
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("checkpoints.db") # 持久化到文件生产环境:
# 使用分布式数据库
# 例如:PostgreSQL、MongoDB 等3. Thread ID 命名策略
按用户:
thread = {"configurable": {"thread_id": f"user_{user_id}"}}按会话:
thread = {"configurable": {"thread_id": f"session_{session_id}"}}按任务:
thread = {"configurable": {"thread_id": f"task_{task_id}_{timestamp}"}}最佳实践:
- 使用有意义的 ID 便于调试
- 避免 ID 冲突
- 考虑添加时间戳或 UUID
4. 分叉时的注意事项
保持 Message ID:
# ✅ 正确:保持原始 ID 以覆盖消息
HumanMessage(content="New input", id=original_message.id)
# ❌ 错误:不提供 ID 会追加消息
HumanMessage(content="New input")验证分叉结果:
# 分叉后检查状态
fork_state = graph.get_state(fork_config)
print(fork_state.values) # 确认状态已更新清理旧分支:
- 分叉会创建新的历史分支
- 定期清理不需要的 checkpoint 以节省存储
5. 历史查询优化
限制历史数量:
# 只获取最近的 N 个状态
from itertools import islice
recent_states = list(islice(graph.get_state_history(thread), 10))按条件筛选:
# 只获取特定节点的状态
states_at_assistant = [
s for s in graph.get_state_history(thread)
if 'assistant' in s.metadata.get('writes', {})
]🚀 进阶技巧
1. 多分支探索
从同一个 checkpoint 创建多个分叉:
base_checkpoint = all_states[-2]
# 分叉 1:输入 "5 * 3"
fork1 = graph.update_state(
base_checkpoint.config,
{"messages": [HumanMessage(content="Multiply 5 and 3", id=msg_id)]}
)
# 分叉 2:输入 "10 * 3"
fork2 = graph.update_state(
base_checkpoint.config,
{"messages": [HumanMessage(content="Multiply 10 and 3", id=msg_id)]}
)
# 分别执行
for event in graph.stream(None, fork1):
# 处理分支 1
pass
for event in graph.stream(None, fork2):
# 处理分支 2
pass2. 时间线可视化
def visualize_timeline(states):
for i, state in enumerate(reversed(states)):
step = len(states) - i - 1
timestamp = state.created_at.strftime("%H:%M:%S")
next_node = state.next[0] if state.next else "END"
print(f"Step {step} [{timestamp}] → {next_node}")
print(f" Messages: {len(state.values.get('messages', []))}")
print()
visualize_timeline(all_states)3. 自动保存关键 Checkpoint
def save_important_checkpoints(thread):
checkpoints = []
for state in graph.get_state_history(thread):
# 保存所有用户输入的状态
if any(isinstance(m, HumanMessage) for m in state.values['messages']):
checkpoints.append({
'checkpoint_id': state.config['configurable']['checkpoint_id'],
'timestamp': state.created_at,
'messages': state.values['messages']
})
return checkpoints4. 条件回放
根据条件决定是否回放:
def replay_if_error(state):
# 如果发现错误,回放到上一个稳定状态
if "error" in state.metadata.get('writes', {}):
history = list(graph.get_state_history(thread))
for prev_state in history:
if prev_state.next == ('assistant',): # 找到上一个 assistant 节点
print(f"Error detected, replaying from checkpoint {prev_state.config['configurable']['checkpoint_id']}")
for event in graph.stream(None, prev_state.config):
# 回放
pass
break📊 Time Travel vs 其他调试方法
| 方法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| Time Travel | 可回溯、可分叉、完整历史 | 需要 checkpointer、存储开销 | 复杂 Agent 调试 |
| 日志记录 | 简单、轻量 | 不可回放、无状态快照 | 简单调试、性能分析 |
| Breakpoint | 实时暂停、可修改 | 需要提前设置、无历史回溯 | 开发阶段调试 |
| 单元测试 | 自动化、可重复 | 无法复现实际执行 | 功能验证 |
🎯 实际应用案例
案例 1:客服 Agent 错误修复
场景: 客服 Agent 在第 3 轮对话时给出了错误答案
解决方案:
- 使用
thread_id获取该用户的历史 - 找到第 3 轮对话的 checkpoint
- 回放以重现问题
- 修改 Agent 逻辑
- 分叉重新执行验证修复
# 获取用户历史
thread = {"configurable": {"thread_id": f"user_{user_id}"}}
history = list(graph.get_state_history(thread))
# 找到第 3 轮对话
round_3 = [s for s in history if len(s.values['messages']) == 6][0] # 3 轮 = 6 条消息
# 回放重现问题
for event in graph.stream(None, round_3.config):
# 观察错误
pass
# 修复 Agent 后,分叉重新执行
fork = graph.update_state(round_3.config, {...})
for event in graph.stream(None, fork):
# 验证修复
pass案例 2:A/B 测试不同提示词
场景: 测试两个不同的系统提示词哪个效果更好
base_state = history[-1] # 用户输入后的状态
# 测试提示词 A
fork_a = graph.update_state(
base_state.config,
{"system_prompt": "You are a helpful assistant..."}
)
result_a = list(graph.stream(None, fork_a))
# 测试提示词 B
fork_b = graph.update_state(
base_state.config,
{"system_prompt": "You are a concise assistant..."}
)
result_b = list(graph.stream(None, fork_b))
# 比较结果
compare_results(result_a, result_b)案例 3:用户反馈修正
场景: 用户对 Agent 的回复不满意,想修改输入重新执行
# 用户最初的输入
original_input = "Multiply 2 and 3"
# Agent 给出了回复,用户不满意
# 用户修改输入为 "Multiply 2 and 3, then add 5"
# 获取原始输入的 checkpoint
history = list(graph.get_state_history(thread))
original_checkpoint = history[-2]
# 分叉并修改输入
fork = graph.update_state(
original_checkpoint.config,
{"messages": [HumanMessage(
content="Multiply 2 and 3, then add 5",
id=original_checkpoint.values['messages'][0].id
)]}
)
# 重新执行
for event in graph.stream(None, fork):
print(event)完整案例代码(可直接运行)
以下是一个完整的、可以直接在 Jupyter Notebook 中运行的代码示例,演示 Time Travel 的三大功能:
# ============================================================
# LangGraph Time Travel 完整示例
# 演示:浏览历史、回放、分叉
# ============================================================
# --------------------------
# 1. 导入必要的库
# --------------------------
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import tools_condition, ToolNode
from IPython.display import Image, display
# --------------------------
# 2. 定义工具
# --------------------------
def multiply(a: int, b: int) -> int:
"""Multiply a and b.
Args:
a: first int
b: second int
"""
return a * b
def add(a: int, b: int) -> int:
"""Adds a and b.
Args:
a: first int
b: second int
"""
return a + b
tools = [add, multiply]
llm = ChatOpenAI(model="gpt-4o-mini")
llm_with_tools = llm.bind_tools(tools)
# --------------------------
# 3. 定义节点
# --------------------------
sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic.")
def assistant(state: MessagesState):
"""助手节点"""
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}
# --------------------------
# 4. 构建图(带 checkpointer)
# --------------------------
builder = StateGraph(MessagesState)
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "assistant")
builder.add_conditional_edges("assistant", tools_condition)
builder.add_edge("tools", "assistant")
# 关键:必须有 checkpointer 才能使用 Time Travel
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
# --------------------------
# 5. 可视化图结构
# --------------------------
print("📊 图结构可视化:")
display(Image(graph.get_graph().draw_mermaid_png()))
# --------------------------
# 6. 执行初始查询
# --------------------------
print("\n" + "=" * 60)
print("🚀 执行初始查询")
print("=" * 60)
thread = {"configurable": {"thread_id": "time-travel-demo"}}
initial_input = {"messages": [HumanMessage(content="Multiply 2 and 3")]}
print("\n执行: Multiply 2 and 3")
for event in graph.stream(initial_input, thread, stream_mode="values"):
event["messages"][-1].pretty_print()
# --------------------------
# 7. 浏览历史
# --------------------------
print("\n" + "=" * 60)
print("📜 浏览执行历史")
print("=" * 60)
all_states = list(graph.get_state_history(thread))
print(f"\n共有 {len(all_states)} 个历史状态(checkpoints):")
for i, state in enumerate(all_states):
step = state.metadata.get("step", "N/A")
next_node = state.next[0] if state.next else "END"
msg_count = len(state.values.get("messages", []))
checkpoint_id = state.config["configurable"]["checkpoint_id"][:8]
print(f" [{i}] Step {step}: → {next_node} | 消息数: {msg_count} | ID: {checkpoint_id}...")
# --------------------------
# 8. 查看特定历史状态
# --------------------------
print("\n" + "=" * 60)
print("🔍 查看特定历史状态")
print("=" * 60)
# 查看用户输入后、assistant 执行前的状态
to_inspect = all_states[-2] # 倒数第二个是初始输入后的状态
print(f"\n选择的历史状态 (Step {to_inspect.metadata.get('step', 'N/A')}):")
print(f" 下一个节点: {to_inspect.next}")
print(f" 消息内容: {to_inspect.values['messages'][0].content}")
print(f" Checkpoint ID: {to_inspect.config['configurable']['checkpoint_id'][:20]}...")
# --------------------------
# 9. 回放(Replaying)
# --------------------------
print("\n" + "=" * 60)
print("🔄 回放历史状态")
print("=" * 60)
to_replay = all_states[-2]
print(f"\n从 Step {to_replay.metadata.get('step', 'N/A')} 回放:")
print("(注意: 回放不修改状态,只是重新执行)")
for event in graph.stream(None, to_replay.config, stream_mode="values"):
event["messages"][-1].pretty_print()
# --------------------------
# 10. 分叉(Forking)
# --------------------------
print("\n" + "=" * 60)
print("🌿 分叉历史状态")
print("=" * 60)
to_fork = all_states[-2]
original_msg = to_fork.values["messages"][0]
print(f"\n原始消息: {original_msg.content}")
print(f"原始消息 ID: {original_msg.id}")
# 关键:保持相同的 ID 以覆盖消息
new_input = HumanMessage(
content="Multiply 5 and 5",
id=original_msg.id # 保持相同 ID!
)
print(f"\n修改为: {new_input.content}")
# 创建分叉
fork_config = graph.update_state(
to_fork.config,
{"messages": [new_input]}
)
print(f"\n新的 Checkpoint ID: {fork_config['configurable']['checkpoint_id'][:20]}...")
# 从分叉继续执行
print("\n从分叉状态继续执行:")
for event in graph.stream(None, fork_config, stream_mode="values"):
event["messages"][-1].pretty_print()
# --------------------------
# 11. 验证分叉结果
# --------------------------
print("\n" + "=" * 60)
print("✅ 验证分叉结果")
print("=" * 60)
# 获取当前状态
current_state = graph.get_state(thread)
print(f"\n当前最新消息:")
print(f" {current_state.values['messages'][-1].content}")
# 检查历史是否包含分叉
new_history = list(graph.get_state_history(thread))
print(f"\n历史状态数: {len(new_history)} (原: {len(all_states)})")
# --------------------------
# 12. 比较原始和分叉的结果
# --------------------------
print("\n" + "=" * 60)
print("📊 对比:原始 vs 分叉")
print("=" * 60)
# 找到原始执行的最终结果
original_final = all_states[0] # 原始执行的最终状态
print(f"\n原始执行:")
print(f" 输入: Multiply 2 and 3")
for msg in original_final.values["messages"]:
if isinstance(msg, AIMessage) and msg.content:
print(f" 结果: {msg.content}")
break
# 分叉后的结果
print(f"\n分叉执行:")
print(f" 输入: Multiply 5 and 5")
print(f" 结果: {current_state.values['messages'][-1].content}")
# --------------------------
# 13. 多分支探索示例
# --------------------------
print("\n" + "=" * 60)
print("🔀 多分支探索")
print("=" * 60)
# 从同一个基础状态创建多个分叉
base_state = all_states[-2]
base_msg_id = base_state.values["messages"][0].id
branches = [
("3 * 3", "Multiply 3 and 3"),
("7 * 7", "Multiply 7 and 7"),
("10 * 10", "Multiply 10 and 10"),
]
results = {}
for name, content in branches:
# 创建分叉
fork = graph.update_state(
base_state.config,
{"messages": [HumanMessage(content=content, id=base_msg_id)]}
)
# 执行并收集结果
final_msg = None
for event in graph.stream(None, fork, stream_mode="values"):
final_msg = event["messages"][-1]
if final_msg and isinstance(final_msg, AIMessage):
results[name] = final_msg.content
print("\n多分支执行结果:")
for name, result in results.items():
print(f" {name}: {result[:50]}...")
print("\n✨ Time Travel 演示完成!")运行结果示例:
📊 图结构可视化:
[显示图:START → assistant → tools → assistant → END]
============================================================
🚀 执行初始查询
============================================================
执行: Multiply 2 and 3
================================ Human Message =================================
Multiply 2 and 3
================================== Ai Message ==================================
Tool Calls:
multiply (call_xxx)
Args:
a: 2
b: 3
================================= Tool Message =================================
Name: multiply
6
================================== Ai Message ==================================
The result of multiplying 2 and 3 is 6.
============================================================
📜 浏览执行历史
============================================================
共有 5 个历史状态(checkpoints):
[0] Step 4: → END | 消息数: 4 | ID: abc12345...
[1] Step 3: → assistant | 消息数: 3 | ID: def67890...
[2] Step 2: → tools | 消息数: 2 | ID: ghi11111...
[3] Step 1: → assistant | 消息数: 1 | ID: jkl22222...
[4] Step 0: → assistant | 消息数: 1 | ID: mno33333...
============================================================
🌿 分叉历史状态
============================================================
原始消息: Multiply 2 and 3
修改为: Multiply 5 and 5
从分叉状态继续执行:
================================ Human Message =================================
Multiply 5 and 5
================================== Ai Message ==================================
Tool Calls:
multiply (call_yyy)
Args:
a: 5
b: 5
================================= Tool Message =================================
Name: multiply
25
================================== Ai Message ==================================
The result of multiplying 5 and 5 is 25.代码要点说明:
| 要点 | 说明 |
|---|---|
checkpointer=MemorySaver() | 必需!Time Travel 依赖 checkpointer 保存历史 |
graph.get_state_history(thread) | 获取完整历史(按时间倒序) |
graph.stream(None, checkpoint.config) | 从指定 checkpoint 回放或继续执行 |
graph.update_state(config, updates) | 修改历史状态创建分叉 |
| 保持 Message ID | 分叉时使用相同 ID 覆盖消息,而非追加 |
Time Travel 三大功能对比:
| 功能 | 操作 | 结果 |
|---|---|---|
| 浏览历史 | get_state_history(thread) | 查看所有 checkpoint |
| 回放 | stream(None, checkpoint.config) | 重新执行(不修改) |
| 分叉 | update_state() + stream() | 修改状态并执行 |
🔍 常见问题
Q1: 没有 checkpointer 可以使用 Time Travel 吗?
不可以。 Time Travel 完全依赖 checkpointer 保存的历史状态。
# ❌ 没有 checkpointer - 无法使用 Time Travel
graph = builder.compile()
# ✅ 有 checkpointer - 可以使用 Time Travel
graph = builder.compile(checkpointer=MemorySaver())生成的流程图:

Q2: 回放会产生副作用吗?
可能会。 如果节点中有副作用操作(如写入数据库、调用外部 API),回放时会再次执行这些操作。
解决方案:
- 在开发环境使用模拟数据
- 添加"回放模式"标志,跳过副作用操作
- 使用幂等操作
def my_node(state):
if state.get("replay_mode"):
return cached_result # 跳过实际操作
else:
return perform_real_operation() # 正常执行Q3: 分叉会影响原始历史吗?
不会。 分叉创建一个新的 checkpoint 分支,原始历史保持不变。
原始历史:A → B → C → D
↓ (分叉)
A → B' → C' → D'
两条分支独立存在,互不影响Q4: 如何清理历史 checkpoint?
目前 LangGraph 不提供自动清理功能,需要手动管理:
# 对于 SqliteSaver
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
# 手动删除数据库文件
import os
os.remove("checkpoints.db")最佳实践:
- 为不同任务使用不同的 thread_id
- 定期清理旧的 thread
- 在测试时使用 MemorySaver(自动清理)
Q5: 可以跨图共享 checkpoint 吗?
不可以。 Checkpoint 与特定的图结构绑定。如果修改了图结构(如添加节点、修改边),旧的 checkpoint 可能无法正确回放。
建议:
- 使用版本化的图(graph_v1, graph_v2)
- 为每个版本使用独立的 checkpointer
- 迁移重要的历史数据时要谨慎
📖 扩展阅读
🎊 总结
Time Travel 是 LangGraph 提供的强大调试和状态管理工具。通过 checkpointer 自动保存的历史状态,我们可以:
- 浏览历史:查看图执行的每一个步骤
- 回放:重新执行历史状态,重现问题
- 分叉:修改历史状态,探索不同的执行路径
这三大功能让我们能够像使用版本控制系统一样管理 Agent 的执行历史,极大地提升了开发和调试效率。
核心要点:
- Time Travel 依赖 checkpointer
- Thread ID 用于标识和隔离会话
- Message ID 用于消息覆盖(分叉时必需)
- 分叉不影响原始历史
- 适合复杂 Agent 的调试和测试
掌握 Time Travel,你就掌握了 LangGraph 开发的"时光机"!