6.3 Double Texting - 详细解读
一、概述
1.1 本节简介
在真实的生产环境中,用户经常会快速连续发送多条消息,而不是耐心等待 AI 回复完成。这种行为被称为 Double Texting(双击发送/连发消息)。
典型场景:
用户:"帮我添加一个待办事项..."
↓ (AI 正在处理)
用户:"哦,等等,还要添加另一个..."
↓ (第一个还没完成!)
用户:"算了,改成..."如果系统不能优雅地处理这种情况,会导致:
- 请求冲突
- 数据不一致
- 用户体验差
- 系统崩溃
1.2 为什么 Double Texting 是个问题?
问题的本质:并发访问同一个线程
Thread (线程)
↓
Run 1 正在执行
↓ (修改状态)
Run 2 也想开始
↓ (也要修改状态)
冲突!可能的后果:
- 数据竞争(Race Condition)
- 状态不一致
- 部分更新丢失
- 执行顺序混乱
1.3 四种处理策略
LangGraph Platform 提供了四种策略来处理 double texting:
处理策略
├── 1. Reject(拒绝)
│ └── 拒绝新请求,返回错误
│
├── 2. Enqueue(排队)
│ └── 将新请求加入队列,顺序执行
│
├── 3. Interrupt(中断)
│ └── 中断当前运行,保存进度,执行新请求
│
└── 4. Rollback(回滚)
└── 删除第一个运行,只执行第二个1.4 策略对比速览
| 策略 | 第一个请求 | 第二个请求 | 用户看到 | 适用场景 |
|---|---|---|---|---|
| Reject | ✅ 完成 | ❌ 被拒绝 | 只有第一个结果 | 严格顺序控制 |
| Enqueue | ✅ 完成 | ✅ 完成 | 两个结果都有 | 所有请求都重要 |
| Interrupt | ⚠️ 中断但保存 | ✅ 完成 | 第二个结果 + 第一个历史 | 用户纠正/补充 |
| Rollback | ❌ 删除 | ✅ 完成 | 只有第二个结果 | 用户完全改变主意 |
1.5 学习目标
通过本节学习,你将掌握:
理解 Double Texting 问题
- 什么是 double texting
- 为什么需要处理
- 实际应用场景
四种策略的使用
- Reject:如何拒绝并发请求
- Enqueue:如何排队处理
- Interrupt:如何中断和继续
- Rollback:如何回滚重来
策略选择
- 不同场景选择不同策略
- 权衡利弊
- 最佳实践
📚 术语表
| 术语名称 | LangGraph 定义和解读 | Python 定义和说明 | 重要程度 |
|---|---|---|---|
| Double Texting | 用户快速连续发送多条消息,第二条请求在第一条未完成时到达,导致并发访问同一线程的问题。 | 生产环境常见的并发场景,如果处理不当会导致数据竞争、状态不一致等问题。 | ⭐⭐⭐⭐⭐ |
| multitask_strategy | 处理并发请求的策略参数,在 runs.create() 中指定,包括 reject/enqueue/interrupt/rollback 四种。 | 字符串参数,默认为 None(允许并发),设置策略后控制第二个请求的行为。 | ⭐⭐⭐⭐⭐ |
| Reject 策略 | 拒绝新的并发请求,返回 HTTP 409 Conflict 错误,直到当前运行完成。 | 使用 multitask_strategy="reject",适合严格顺序控制和资源受限场景,需客户端处理 409 错误并重试。 | ⭐⭐⭐⭐⭐ |
| Enqueue 策略 | 将新请求加入队列,当前运行完成后自动按顺序执行,所有请求都会被处理。 | 使用 multitask_strategy="enqueue",两个运行都完成,历史包含两个完整对话,适合批量操作。 | ⭐⭐⭐⭐⭐ |
| Interrupt 策略 | 中断当前运行但保存已完成的工作和输入消息,立即执行新请求。 | 使用 multitask_strategy="interrupt",第一个运行状态变为 interrupted,适合用户纠正/补充。 | ⭐⭐⭐⭐⭐ |
| Rollback 策略 | 完全删除第一个运行(包括输入消息),只执行第二个请求,就像第一个从未发生。 | 使用 multitask_strategy="rollback",第一个运行无法查询(404),历史最干净,适合用户完全改变主意。 | ⭐⭐⭐⭐⭐ |
| Race Condition | 数据竞争,多个运行同时修改同一线程状态时,可能导致部分更新丢失或状态不一致。 | 并发编程中的经典问题,LangGraph 通过 multitask_strategy 提供四种解决方案。 | ⭐⭐⭐⭐ |
| HTTP 409 Conflict | 表示资源冲突的 HTTP 状态码,Reject 策略会返回此错误,表示线程正忙。 | 客户端应捕获此异常 httpx.HTTPStatusError,实现等待重试或提示用户。 | ⭐⭐⭐⭐ |
| interrupted 状态 | 运行被中断的特殊状态,表示运行未完成但工作已保存,可通过 runs.get() 查询。 | 与 success/error/pending 不同,只在 Interrupt 策略中出现,表示部分执行。 | ⭐⭐⭐⭐ |
| Queue Length | 等待执行的运行数量,队列过长时可能需要动态调整策略或限流。 | 通过 runs.list() 获取 pending 状态的运行数量,用于实现自适应策略。 | ⭐⭐⭐ |
| Idempotency | 幂等性,操作可以安全重复执行而不产生副作用,对于有副作用的操作应避免使用 Interrupt/Rollback。 | 设计原则,确保操作(如发送邮件、扣款)只执行一次,或多次执行结果相同。 | ⭐⭐⭐⭐ |
| Exponential Backoff | 指数退避,重试时等待时间指数增长(1秒、2秒、4秒...),避免频繁重试造成资源浪费。 | Python 实现:await asyncio.sleep(2 ** attempt),常用于 Reject 策略的重试逻辑。 | ⭐⭐⭐ |
二、环境准备
2.1 安装 SDK
%%capture --no-stderr
%pip install -U langgraph_sdk2.2 连接到部署
from langgraph_sdk import get_client
url_for_cli_deployment = "http://localhost:8123"
client = get_client(url=url_for_cli_deployment)前提条件:
- 部署已启动(
docker compose up) task_maistro图已部署- API 可访问
三、策略 1:Reject(拒绝)
3.1 概念
Reject 策略:拒绝任何新的并发请求,直到当前运行完成。
用户请求 1
↓
Run 1 开始执行
↓ (执行中...)
用户请求 2
↓
❌ 拒绝!返回 409 Conflict
↓
Run 1 继续执行
↓
Run 1 完成
↓
现在可以接受新请求3.2 实现代码
import httpx
from langchain_core.messages import HumanMessage
# 创建线程
thread = await client.threads.create()
# 准备两个请求
user_input_1 = "Add a ToDo to follow-up with DI Repairs."
user_input_2 = "Add a ToDo to mount dresser to the wall."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro"
# 创建第一个运行
run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_1)]},
config=config,
)
# 尝试创建第二个运行(会被拒绝)
try:
await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_2)]},
config=config,
multitask_strategy="reject", # 关键参数!
)
except httpx.HTTPStatusError as e:
print("Failed to start concurrent run", e)输出:
Failed to start concurrent run Client error '409 Conflict' for url 'http://localhost:8123/threads/2b58630e-00fd-4c35-afad-a6b59e9b9104/runs'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/4093.3 代码详解
关键参数:
multitask_strategy="reject"这个参数告诉服务器:
- 如果线程上已有运行在执行
- 拒绝这个新请求
- 返回 HTTP 409 Conflict
HTTP 状态码 409:
- 含义:Conflict(冲突)
- 场景:资源状态冲突
- 处理:客户端应该等待并重试
3.4 验证第一个运行完成
from langchain_core.messages import convert_to_messages
# 等待第一个运行完成
await client.runs.join(thread["thread_id"], run["run_id"])
# 获取线程状态
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
m.pretty_print()输出:
================================ Human Message =================================
Add a ToDo to follow-up with DI Repairs.
================================== Ai Message ==================================
Tool Calls:
UpdateMemory (call_6xqHubCPNufS0bg4tbUxC0FU)
Call ID: call_6xqHubCPNufS0bg4tbUxC0FU
Args:
update_type: todo
================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Follow-up with DI Repairs', 'time_to_complete': 30, 'deadline': None, ...}
================================== Ai Message ==================================
I've added a task to follow-up with DI Repairs to your ToDo list.观察:
- ✅ 第一个请求成功完成
- ❌ 第二个请求被拒绝,没有在历史中
- 线程只包含第一个请求的内容
3.5 Reject 策略的特点
优点:
- ✅ 简单明了
- ✅ 避免冲突
- ✅ 保持执行顺序
- ✅ 资源使用可控
缺点:
- ❌ 用户的第二个请求丢失
- ❌ 需要客户端处理 409 错误
- ❌ 用户体验可能不好(请求被拒绝)
3.6 使用场景
适合 Reject 的场景:
严格顺序要求
银行转账:必须逐个处理,不能并发资源受限
昂贵的 API 调用:避免浪费资源明确告知用户等待
UI 显示"正在处理中,请稍候..." 禁用发送按钮
客户端处理示例:
async def create_run_with_retry(client, thread_id, assistant_id, input, config, max_retries=3):
"""创建运行,如果遇到 409 则重试"""
for attempt in range(max_retries):
try:
return await client.runs.create(
thread_id,
assistant_id,
input=input,
config=config,
multitask_strategy="reject"
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 409 and attempt < max_retries - 1:
print(f"Conflict, retrying... (attempt {attempt + 1})")
await asyncio.sleep(1) # 等待 1 秒
else:
raise四、策略 2:Enqueue(排队)
4.1 概念
Enqueue 策略:将新请求加入队列,等当前运行完成后按顺序执行。
用户请求 1
↓
Run 1 开始执行
↓
用户请求 2
↓
✅ 接受并加入队列
↓
Run 1 完成
↓
Run 2 自动开始
↓
Run 2 完成
↓
两个请求都处理完成4.2 实现代码
# 创建新线程
thread = await client.threads.create()
# 准备两个请求
user_input_1 = "Send Erik his t-shirt gift this weekend."
user_input_2 = "Get cash and pay nanny for 2 weeks. Do this by Friday."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro"
# 创建第一个运行
first_run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_1)]},
config=config,
)
# 创建第二个运行(会被排队)
second_run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_2)]},
config=config,
multitask_strategy="enqueue", # 排队策略
)
# 等待第二个运行完成(会自动等第一个完成)
await client.runs.join(thread["thread_id"], second_run["run_id"])
# 获取线程状态
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
m.pretty_print()输出:
================================ Human Message =================================
Send Erik his t-shirt gift this weekend.
================================== Ai Message ==================================
Tool Calls:
UpdateMemory (call_svTeXPmWGTLY8aQ8EifjwHAa)
Call ID: call_svTeXPmWGTLY8aQ8EifjwHAa
Args:
update_type: todo
================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Send Erik his t-shirt gift', 'deadline': '2024-11-19T23:59:00', ...}
================================== Ai Message ==================================
I've updated your ToDo list to send Erik his t-shirt gift this weekend.
================================ Human Message =================================
Get cash and pay nanny for 2 weeks. Do this by Friday.
================================== Ai Message ==================================
Tool Calls:
UpdateMemory (call_Cq0Tfn6yqccHH8n0DOucz5OQ)
Call ID: call_Cq0Tfn6yqccHH8n0DOucz5OQ
Args:
update_type: todo
================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Get cash and pay nanny for 2 weeks', 'deadline': '2024-11-17T23:59:00', ...}
Document af1fe011-f3c5-4c1c-b98b-181869bc2944 updated:
Plan: Update the deadline for sending Erik his t-shirt gift to this weekend, which is by 2024-11-17.
================================== Ai Message ==================================
I've updated your ToDo list to ensure you get cash and pay the nanny for 2 weeks by Friday.4.3 代码详解
关键参数:
multitask_strategy="enqueue"执行流程:
1. first_run 创建并开始执行
↓
2. second_run 创建,自动加入队列
↓ (等待...)
3. first_run 完成
↓
4. second_run 自动从队列中取出并执行
↓
5. second_run 完成关键点:
client.runs.join(thread_id, second_run_id)会等待直到 second_run 完成- 这隐式地也等待了 first_run 完成
- 两个请求的消息都出现在线程历史中
4.4 观察结果
线程包含两个完整的对话轮次:
- 第一轮:添加 Erik 礼物待办
- 第二轮:添加付保姆钱待办
注意:第二个请求还更新了第一个待办的截止日期!
Document af1fe011-f3c5-4c1c-b98b-181869bc2944 updated:
Plan: Update the deadline for sending Erik his t-shirt gift to this weekend, which is by 2024-11-17.这说明:
- 第二个运行可以访问第一个运行的结果
- Store 中的数据在两个运行之间共享
- AI 能够关联两个请求的上下文
4.5 Enqueue 策略的特点
优点:
- ✅ 所有请求都会被处理
- ✅ 保持执行顺序
- ✅ 避免数据冲突
- ✅ 用户无感知(不需要处理错误)
缺点:
- ⚠️ 第二个请求需要等待
- ⚠️ 如果第一个请求很慢,第二个会延迟
- ⚠️ 队列可能会变长
4.6 使用场景
适合 Enqueue 的场景:
批量操作
python用户快速输入: "添加待办:买牛奶" "添加待办:打电话给妈妈" "添加待办:预约牙医" → 全部排队,依次处理相关请求
python用户: "总结我的待办" "先做哪个?" → 第二个请求依赖第一个的结果不能丢失任何请求
python客户服务:所有用户消息都必须回复 表单提交:所有数据都必须保存
实际应用示例:
# 聊天机器人示例
async def handle_user_messages(client, thread_id, messages):
"""处理用户的多条消息,使用 enqueue 确保都被处理"""
runs = []
for message in messages:
run = await client.runs.create(
thread_id,
"chatbot",
input={"messages": [HumanMessage(content=message)]},
config=config,
multitask_strategy="enqueue"
)
runs.append(run)
# 等待所有运行完成
for run in runs:
await client.runs.join(thread_id, run["run_id"])
return runs五、策略 3:Interrupt(中断)
5.1 概念
Interrupt 策略:中断当前运行,但保存已完成的工作,然后执行新请求。
用户请求 1
↓
Run 1 开始执行
↓ (执行了一部分...)
↓ (已完成的工作被保存)
用户请求 2
↓
❗ 中断 Run 1
↓
保存 Run 1 的进度
↓
Run 2 开始执行
↓
Run 2 完成
↓
Run 1 的部分工作和 Run 2 的完整工作都在历史中5.2 实现代码
import asyncio
# 创建新线程
thread = await client.threads.create()
# 准备两个请求
user_input_1 = "Give me a summary of my ToDos due tomrrow."
user_input_2 = "Never mind, create a ToDo to Order Ham for Thanksgiving by next Friday."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro"
# 创建第一个运行
interrupted_run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_1)]},
config=config,
)
# 等待一秒,让第一个运行执行一部分
await asyncio.sleep(1)
# 创建第二个运行(会中断第一个)
second_run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_2)]},
config=config,
multitask_strategy="interrupt", # 中断策略
)
# 等待第二个运行完成
await client.runs.join(thread["thread_id"], second_run["run_id"])
# 获取线程状态
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
m.pretty_print()输出:
================================ Human Message =================================
Give me a summary of my ToDos due tomrrow.
================================ Human Message =================================
Never mind, create a ToDo to Order Ham for Thanksgiving by next Friday.
================================== Ai Message ==================================
Tool Calls:
UpdateMemory (call_Rk80tTSJzik2oY44tyUWk8FM)
Call ID: call_Rk80tTSJzik2oY44tyUWk8FM
Args:
update_type: todo
================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Order Ham for Thanksgiving', 'deadline': '2024-11-22T23:59:59', ...}
================================== Ai Message ==================================
I've added the task "Order Ham for Thanksgiving" to your ToDo list.5.3 代码详解
关键点 1:await asyncio.sleep(1)
await asyncio.sleep(1)- 等待 1 秒
- 给第一个运行一些执行时间
- 确保第一个运行已经开始(可能部分完成)
如果不等待,第一个运行可能还没开始就被中断了。
关键点 2:multitask_strategy="interrupt"
multitask_strategy="interrupt"- 中断当前正在执行的运行
- 保存已完成的工作
- 开始执行新运行
5.4 验证中断状态
# 确认第一个运行被中断
status = (await client.runs.get(thread["thread_id"], interrupted_run["run_id"]))["status"]
print(status)输出:
interrupted运行状态说明:
interrupted:运行被中断- 与
success、error、pending不同 - 表示运行没有完成,但工作被保存
5.5 观察结果
线程包含:
- ✅ 第一个请求的输入(Human Message)
- ❌ 第一个请求的输出(被中断,没有完成)
- ✅ 第二个请求的完整对话
解释:
第一个运行:
- 输入消息被保存(已经加入 state)
- AI 响应未生成(被中断)
第二个运行:
- 从当前 state 继续
- 看到了第一个请求的输入
- 生成完整响应5.6 Interrupt vs Enqueue
关键区别:
| 对比点 | Enqueue | Interrupt |
|---|---|---|
| 第一个运行 | ✅ 完整执行 | ⚠️ 被中断 |
| 第一个结果 | ✅ 有完整响应 | ❌ 没有响应 |
| 第二个运行 | ✅ 在第一个之后 | ✅ 立即执行 |
| 历史记录 | 两个完整对话 | 一个完整 + 一个部分 |
| 执行时间 | 第一个 + 第二个 | 约等于第二个 |
5.7 Interrupt 策略的特点
优点:
- ✅ 快速响应新请求
- ✅ 保存第一个请求的输入(上下文保留)
- ✅ 避免浪费时间在过时的请求上
- ✅ 用户能立即看到新的响应
缺点:
- ⚠️ 第一个请求没有完整结果
- ⚠️ 第一个请求的部分工作可能浪费
- ⚠️ 可能导致状态不一致(如果有副作用)
5.8 使用场景
适合 Interrupt 的场景:
用户纠正错误
用户:"预订飞往纽约的机票" ↓ (AI 正在搜索...) 用户:"等等,我是说洛杉矶!" ↓ (中断,立即改为洛杉矶)用户补充信息
用户:"总结我的待办" ↓ (AI 正在整理...) 用户:"只看明天到期的" ↓ (中断,重新筛选)用户改变主意
用户:"给我推荐一些餐厅" ↓ (AI 正在搜索...) 用户:"算了,我想知道电影院" ↓ (中断,改为搜索电影院)优先级变化
用户:"分析这个大文件" ↓ (处理中...) 用户:"快速查一下今天的天气" ↓ (中断大任务,先回答天气)
不适合 Interrupt 的场景:
- ❌ 有副作用的操作(如已发送邮件、已扣款)
- ❌ 不可逆的操作
- ❌ 需要所有请求都完整执行的场景
六、策略 4:Rollback(回滚)
6.1 概念
Rollback 策略:完全删除第一个运行,就像它从未发生过,只执行第二个请求。
用户请求 1
↓
Run 1 开始执行
↓
用户请求 2
↓
❌ 删除 Run 1(包括输入)
↓
Run 2 开始执行
↓
Run 2 完成
↓
线程中只有 Run 2,Run 1 完全消失6.2 实现代码
# 创建新线程
thread = await client.threads.create()
# 准备两个请求
user_input_1 = "Add a ToDo to call to make appointment at Yoga."
user_input_2 = "Actually, add a ToDo to drop by Yoga in person on Sunday."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro"
# 创建第一个运行
rolled_back_run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_1)]},
config=config,
)
# 创建第二个运行(会回滚第一个)
second_run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_2)]},
config=config,
multitask_strategy="rollback", # 回滚策略
)
# 等待第二个运行完成
await client.runs.join(thread["thread_id"], second_run["run_id"])
# 获取线程状态
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
m.pretty_print()输出:
================================ Human Message =================================
Actually, add a ToDo to drop by Yoga in person on Sunday.
================================== Ai Message ==================================
It looks like the task "Drop by Yoga in person" is already on your ToDo list
with a deadline of November 19, 2024. Would you like me to update the deadline
to the upcoming Sunday instead?6.3 代码详解
关键参数:
multitask_strategy="rollback"结果观察:
- ❌ 第一个请求完全消失
- ❌ 连输入消息都没有
- ✅ 只有第二个请求的完整对话
6.4 验证回滚
# 确认第一个运行被删除
try:
await client.runs.get(thread["thread_id"], rolled_back_run["run_id"])
except httpx.HTTPStatusError as _:
print("Original run was correctly deleted")输出:
Original run was correctly deleted尝试获取第一个运行的信息会抛出 HTTP 404 错误,因为它已经被完全删除了。
6.5 Rollback vs Interrupt
关键区别:
| 对比点 | Interrupt | Rollback |
|---|---|---|
| 第一个输入 | ✅ 保留 | ❌ 删除 |
| 第一个状态 | 保存 | 删除 |
| 线程历史 | 包含第一个输入 | 只有第二个请求 |
| Run 记录 | status = interrupted | 完全不存在 |
| 可恢复性 | 可以查看被中断的运行 | 无法查看第一个运行 |
直观对比:
Interrupt:
Thread History
├── Human: "总结待办" (第一个请求的输入)
└── Human: "创建新待办" (第二个请求)
└── AI: "已创建"
Rollback:
Thread History
└── Human: "创建新待办" (只有第二个请求)
└── AI: "已创建"6.6 Rollback 策略的特点
优点:
- ✅ 历史最干净
- ✅ 就像第一个请求从未发生
- ✅ 避免混淆(没有未完成的请求)
- ✅ 节省存储空间
缺点:
- ❌ 第一个请求的上下文完全丢失
- ❌ 无法追踪用户改变主意的历史
- ❌ 不可逆(无法恢复第一个请求)
- ❌ 可能丢失有价值的信息
6.7 使用场景
适合 Rollback 的场景:
用户完全改变主意
用户:"预订去巴黎的酒店" ↓ 用户:"不对,改成东京" ↓ (完全不同的请求,旧的没用了)输入错误
用户:"添加待办:买牛奶...123ABC" ↓ (误触发送,输入未完成) 用户:"添加待办:买牛奶和面包" ↓ (完整正确的输入)测试/实验
用户:"试试这个功能" ↓ (只是测试) 用户:"正式开始工作" ↓ (真正的任务)用户想要"重新开始"
聊天机器人: 用户:"你好" 用户:"不对,直接说正事" ↓ (用户不想要寒暄)
不适合 Rollback 的场景:
- ❌ 第一个请求的上下文对第二个请求有用
- ❌ 需要审计追踪(audit trail)
- ❌ 第一个请求可能有副作用
七、策略对比和选择
7.1 详细对比表
| 特性 | Reject | Enqueue | Interrupt | Rollback |
|---|---|---|---|---|
| 第一个运行 | ✅ 完成 | ✅ 完成 | ⚠️ 中断 | ❌ 删除 |
| 第二个运行 | ❌ 拒绝 | ✅ 排队后完成 | ✅ 立即完成 | ✅ 立即完成 |
| 第一个输入 | ✅ 保留 | ✅ 保留 | ✅ 保留 | ❌ 删除 |
| 第一个输出 | ✅ 完整 | ✅ 完整 | ❌ 无 | ❌ 无 |
| 响应时间 | 快(只一个) | 慢(两个顺序) | 中(第二个立即) | 中(第二个立即) |
| 用户体验 | 需处理错误 | 无感知等待 | 快速响应 | 最干净 |
| 数据完整性 | ✅ 最高 | ✅ 高 | ⚠️ 中 | ⚠️ 低 |
| 副作用处理 | ✅ 安全 | ✅ 安全 | ⚠️ 需注意 | ❌ 不安全 |
| 可追溯性 | ✅ 完整 | ✅ 完整 | ✅ 中(有interrupted状态) | ❌ 无 |
7.2 决策树
用户发送第二条消息
↓
第一条消息还在处理中?
↓
YES
↓
问:第一条消息重要吗?
↓
YES → 问:能等吗?
↓ ↓
YES NO
↓ ↓
Enqueue Interrupt
(排队) (中断)
NO → 问:需要第一条的上下文吗?
↓ ↓
YES NO
↓ ↓
Interrupt Rollback
(中断) (回滚)
问:系统资源紧张吗?
↓
YES
↓
Reject
(拒绝)7.3 场景选择指南
场景 1:聊天机器人
情况:用户在聊天
用户:"你好"
用户:"帮我查天气"推荐:Enqueue
- 原因:两条消息都是独立的,都应该回复
- 用户期望:收到两条回复
场景 2:搜索/查询
情况:用户在搜索
用户:"搜索巴黎酒店"
用户:"不对,搜索东京酒店"推荐:Rollback
- 原因:用户改变了搜索目标
- 第一个搜索结果无用
场景 3:表单填写
情况:用户在填写表单
用户:"我的名字是..."
用户:"等等,我再想想"推荐:Interrupt
- 原因:保留部分填写的内容
- 用户可能继续填写
场景 4:交易操作
情况:用户在转账
用户:"转账1000元到账户A"
用户:"不对,转到账户B"推荐:Reject
- 原因:金融交易不能有任何不确定性
- 必须等第一个完全处理完
- 或者UI层面禁止快速连发
场景 5:文件处理
情况:用户上传文件处理
用户:"分析这个大文件"
用户:"等等,先看这个小文件"推荐:Interrupt
- 原因:大文件处理耗时,小文件优先级更高
- 节省资源
7.4 Python 实现辅助函数
async def create_run_with_strategy(
client,
thread_id,
assistant_id,
input,
config,
scenario="chat"
):
"""根据场景自动选择策略"""
strategy_map = {
"chat": "enqueue", # 聊天:全部处理
"search": "rollback", # 搜索:最新优先
"form": "interrupt", # 表单:保留进度
"transaction": "reject", # 交易:严格顺序
"file": "interrupt" # 文件:可中断
}
strategy = strategy_map.get(scenario, "enqueue")
try:
return await client.runs.create(
thread_id,
assistant_id,
input=input,
config=config,
multitask_strategy=strategy
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 409:
# Reject 策略返回 409,等待后重试
await asyncio.sleep(1)
return await create_run_with_strategy(
client, thread_id, assistant_id, input, config, scenario
)
else:
raise八、高级主题
8.1 动态策略选择
根据运行时条件动态选择策略:
async def adaptive_create_run(client, thread_id, assistant_id, input, config):
"""自适应策略选择"""
# 检查当前运行状态
runs = await client.runs.list(thread_id)
active_runs = [r for r in runs if r["status"] in ["pending", "running"]]
if not active_runs:
# 没有活跃运行,直接创建
strategy = None
elif len(active_runs) == 1:
# 一个活跃运行
run = active_runs[0]
# 检查运行时长
created_at = datetime.fromisoformat(run["created_at"])
elapsed = datetime.now(timezone.utc) - created_at
if elapsed.seconds < 5:
# 刚开始不久,可以回滚
strategy = "rollback"
elif elapsed.seconds < 30:
# 执行中,中断
strategy = "interrupt"
else:
# 执行很久了,等待或拒绝
strategy = "enqueue"
else:
# 多个活跃运行,拒绝
strategy = "reject"
return await client.runs.create(
thread_id,
assistant_id,
input=input,
config=config,
multitask_strategy=strategy
)8.2 用户反馈机制
让用户选择如何处理:
async def create_run_with_user_choice(client, thread_id, assistant_id, input, config):
"""让用户选择策略"""
# 检查是否有活跃运行
runs = await client.runs.list(thread_id)
active_runs = [r for r in runs if r["status"] in ["pending", "running"]]
if active_runs:
# 询问用户
print("检测到正在处理的请求,你想要:")
print("1. 等待当前请求完成后处理 (Enqueue)")
print("2. 取消当前请求,立即处理新请求 (Interrupt)")
print("3. 取消当前请求并删除 (Rollback)")
print("4. 取消新请求 (Reject)")
choice = input("请选择 (1-4): ")
strategy_map = {
"1": "enqueue",
"2": "interrupt",
"3": "rollback",
"4": "reject"
}
strategy = strategy_map.get(choice, "enqueue")
else:
strategy = None
if strategy == "reject":
print("新请求已取消")
return None
return await client.runs.create(
thread_id,
assistant_id,
input=input,
config=config,
multitask_strategy=strategy
)8.3 监控和日志
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def create_run_with_logging(
client, thread_id, assistant_id, input, config, strategy
):
"""带详细日志的运行创建"""
logger.info(f"Creating run with strategy: {strategy}")
logger.info(f"Thread: {thread_id}")
logger.info(f"Input: {input}")
try:
run = await client.runs.create(
thread_id,
assistant_id,
input=input,
config=config,
multitask_strategy=strategy
)
logger.info(f"Run created: {run['run_id']}")
logger.info(f"Status: {run['status']}")
return run
except httpx.HTTPStatusError as e:
if e.response.status_code == 409:
logger.warning(f"Conflict (409): Run rejected by {strategy} strategy")
else:
logger.error(f"HTTP error: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise九、最佳实践
9.1 选择策略的原则
1. 默认使用 Enqueue
# 对于大多数场景,enqueue 是最安全的选择
strategy = "enqueue"2. 明确用户意图时使用 Rollback
# 用户明确说"不对"、"算了"、"改成"
if "不对" in user_input or "算了" in user_input:
strategy = "rollback"3. 资源受限时使用 Reject
# API 调用次数有限、费用昂贵
if expensive_operation:
strategy = "reject"4. 用户体验优先时使用 Interrupt
# 响应速度重要,旧请求可以中断
if user_priority == "speed":
strategy = "interrupt"9.2 错误处理模板
async def robust_create_run(
client, thread_id, assistant_id, input, config, strategy="enqueue", max_retries=3
):
"""健壮的运行创建"""
for attempt in range(max_retries):
try:
return await client.runs.create(
thread_id,
assistant_id,
input=input,
config=config,
multitask_strategy=strategy
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 409:
# Conflict - 根据策略处理
if strategy == "reject":
if attempt < max_retries - 1:
logger.info(f"Retry attempt {attempt + 1}/{max_retries}")
await asyncio.sleep(2 ** attempt) # 指数退避
continue
else:
logger.error("Max retries reached")
raise
else:
# 其他策略不应该返回 409
logger.error(f"Unexpected 409 with strategy {strategy}")
raise
else:
logger.error(f"HTTP {e.response.status_code}: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise9.3 测试不同策略
async def test_all_strategies(client, assistant_id, config):
"""测试所有策略"""
strategies = ["reject", "enqueue", "interrupt", "rollback"]
for strategy in strategies:
print(f"\n=== Testing {strategy} ===")
# 创建线程
thread = await client.threads.create()
# 第一个请求
run1 = await client.runs.create(
thread["thread_id"],
assistant_id,
input={"messages": [HumanMessage(content="First request")]},
config=config
)
# 等待一点时间
await asyncio.sleep(0.5)
# 第二个请求
try:
run2 = await client.runs.create(
thread["thread_id"],
assistant_id,
input={"messages": [HumanMessage(content="Second request")]},
config=config,
multitask_strategy=strategy
)
print(f"Run 2 created: {run2['run_id']}")
except httpx.HTTPStatusError as e:
print(f"Run 2 rejected: {e.response.status_code}")
# 等待完成
await asyncio.sleep(2)
# 检查结果
state = await client.threads.get_state(thread["thread_id"])
messages = state["values"]["messages"]
print(f"Total messages: {len(messages)}")
# 检查第一个运行状态
run1_status = (await client.runs.get(thread["thread_id"], run1["run_id"]))["status"]
print(f"Run 1 status: {run1_status}")十、常见问题和解决方案
10.1 问题:Enqueue 队列过长
现象:用户发送很多消息,队列积压
解决方案:
# 检查队列长度
async def get_queue_length(client, thread_id):
runs = await client.runs.list(thread_id)
pending_runs = [r for r in runs if r["status"] in ["pending"]]
return len(pending_runs)
# 动态调整策略
async def smart_create_run(client, thread_id, assistant_id, input, config):
queue_length = await get_queue_length(client, thread_id)
if queue_length > 5:
# 队列过长,改用 rollback
strategy = "rollback"
else:
strategy = "enqueue"
return await client.runs.create(
thread_id, assistant_id, input=input, config=config,
multitask_strategy=strategy
)10.2 问题:Interrupt 导致副作用丢失
现象:运行被中断,但已经发送了邮件
解决方案:
# 方案 1:使用 Enqueue 替代 Interrupt
# 对于有副作用的操作,不要使用 interrupt
# 方案 2:幂等性设计
# 确保操作可以安全重试10.3 问题:Rollback 丢失重要上下文
现象:用户的第一条消息包含重要信息
解决方案:
# 方案 1:在 UI 层面确认
# "确定要取消之前的请求吗?"
# 方案 2:使用 Interrupt 替代 Rollback
# 保留第一个请求的输入
# 方案 3:合并两个请求
async def merge_requests(client, thread_id, assistant_id, request1, request2, config):
"""合并两个请求"""
merged_input = {
"messages": [
HumanMessage(content=f"Context: {request1}\n\nCurrent request: {request2}")
]
}
return await client.runs.create(
thread_id,
assistant_id,
input=merged_input,
config=config,
multitask_strategy="rollback"
)十一、总结
11.1 核心要点
Double Texting 是生产环境的常见问题
- 用户不会等待
- 必须优雅处理并发请求
四种策略各有适用场景
- Reject:严格顺序控制
- Enqueue:所有请求都重要
- Interrupt:快速响应优先
- Rollback:最新请求优先
选择策略的关键因素
- 用户意图
- 请求重要性
- 响应速度要求
- 副作用考虑
没有万能策略
- 根据场景选择
- 可以动态调整
- 需要权衡利弊
11.2 快速参考
# Reject - 拒绝新请求
await client.runs.create(..., multitask_strategy="reject")
# 返回 409 Conflict
# Enqueue - 排队处理
await client.runs.create(..., multitask_strategy="enqueue")
# 两个请求都完成
# Interrupt - 中断当前
await client.runs.create(..., multitask_strategy="interrupt")
# 保存第一个输入,执行第二个
# Rollback - 回滚删除
await client.runs.create(..., multitask_strategy="rollback")
# 删除第一个,只执行第二个11.3 决策速查表
| 你的需求 | 推荐策略 |
|---|---|
| 所有请求都必须处理 | Enqueue |
| 新请求优先级更高 | Interrupt 或 Rollback |
| 第一个请求不能中断 | Reject 或 Enqueue |
| 需要快速响应 | Interrupt 或 Rollback |
| 有副作用操作 | Reject 或 Enqueue |
| 用户明确取消第一个 | Rollback |
| 保留所有历史 | Enqueue 或 Interrupt |
| 历史最干净 | Rollback |
11.4 下节预告
在 6.4-assistant 中,我们将学习:
- 什么是 Assistants(助手)
- 如何创建和管理 assistants
- 配置不同的 assistants
- Assistants 的版本控制
- 实际应用场景
文档版本:1.0 最后更新:2024-11-05 作者:AI Assistant 基于:LangChain Academy Module-6 Lesson 6.3
恭喜你掌握了 Double Texting 的所有处理策略!现在你可以构建能优雅处理并发请求的生产级应用了。🎉