LangGraph 多智能体协同研究详细解读
📚 概述
Multi-Agent Supervisor Pattern 是 Deep Research 系统的高级架构,用于处理复杂的多主题研究任务。本章讲解如何使用 Supervisor 协调多个 Research Agent 并行工作。
核心价值:
- 🔀 上下文隔离 - 每个 Agent 专注单一主题
- ⚡ 并行加速 - 同时研究多个子主题
- 🎯 质量提升 - 避免单 Agent 的上下文冲突
- 🧩 灵活分解 - Supervisor 智能决定是否并行
🎯 核心问题:为什么需要多智能体?
单 Agent 的局限
问题场景:对比任务
python
User: "对比 OpenAI、Anthropic、Google 的 AI 安全方法"
单 Agent 执行:
Search 1: "OpenAI AI safety approach"
Search 2: "Anthropic AI safety Constitutional AI"
Search 3: "Google AI safety research"
Search 4: "OpenAI alignment research" # 回到 OpenAI
Search 5: "Anthropic interpretability" # 回到 Anthropic
...
问题:
❌ 上下文混乱 - 三个主题的信息交织
❌ Context Clash - "OpenAI" 和 "Anthropic" 信息相互干扰
❌ 搜索低效 - 在主题间反复跳转
❌ token 浪费 - 所有信息都在一个上下文中多 Agent 的优势
python
Supervisor 决策:
"这是对比任务,分解为 3 个独立子主题"
Agent A 专注: OpenAI AI 安全
Search 1: "OpenAI AI safety approach"
Search 2: "OpenAI alignment team research"
结论: 基于 RLHF 和 superalignment...
Agent B 专注: Anthropic AI 安全
Search 1: "Anthropic Constitutional AI"
Search 2: "Anthropic interpretability research"
结论: 基于 Constitutional AI 和可解释性...
Agent C 专注: Google AI 安全
Search 1: "Google AI safety research DeepMind"
Search 2: "Google responsible AI principles"
结论: 基于伦理审查和测试框架...
优势:
✅ 上下文清晰 - 每个 Agent 只关注一个主题
✅ 深度聚焦 - 可以深入挖掘细节
✅ 并行执行 - 总时间 = max(各 Agent 时间)
✅ 质量更高 - 避免信息混淆🏗️ Supervisor Pattern 架构
整体流程
研究简报输入
↓
┌──────────────────────────────────┐
│ Supervisor 分析 │
│ - 这是对比任务吗? │
│ - 需要分解为几个子主题? │
│ - 每个子主题的研究重点? │
└──────────────────────────────────┘
↓
┌──────────────────────────────────┐
│ Supervisor 决策 │
│ │
│ 选项 A: 单 Agent │
│ → 简单查询,无需并行 │
│ │
│ 选项 B: 多 Agent 并行 │
│ → 对比任务,分解为子主题 │
└──────────────────────────────────┘
↓ (选择 B)
┌────────┬────────┬────────┐
│ Agent A│ Agent B│ Agent C│ ← 并行执行
│ 主题 1 │ 主题 2 │ 主题 3 │
│ │ │ │
│ 研究中 │ 研究中 │ 研究中 │
│ ↓ │ ↓ │ ↓ │
│ 压缩 │ 压缩 │ 压缩 │
└────────┴────────┴────────┘
↓
┌──────────────────────────────────┐
│ Supervisor 聚合 │
│ - 收集所有 Agent 的研究结果 │
│ - 合并压缩结果和原始笔记 │
└──────────────────────────────────┘
↓
最终研究成果🔧 核心技术实现
1. 状态定义
python
class SupervisorState(TypedDict):
"""Supervisor 状态"""
# Supervisor 的对话历史
supervisor_messages: Annotated[Sequence[BaseMessage], add_messages]
# 研究简报(来自 Scoping)
research_brief: str
# 压缩后的研究笔记(来自各 Agent)
notes: Annotated[list[str], operator.add]
# 研究迭代计数
research_iterations: int
# 原始笔记(未压缩)
raw_notes: Annotated[list[str], operator.add]2. Supervisor 工具
python
@tool
class ConductResearch(BaseModel):
"""委托研究任务给子 Agent"""
research_topic: str = Field(
description="要研究的主题。应该是单一主题,并详细描述(至少一段话)"
)
@tool
class ResearchComplete(BaseModel):
"""标记研究完成"""
pass使用方式:
python
# Supervisor 调用 ConductResearch
Tool Call: ConductResearch(
research_topic="深入研究 OpenAI 的 AI 安全方法,包括..."
)
# 触发一个 Research Agent 执行3. Supervisor 决策节点(关键)
python
async def supervisor(state: SupervisorState) -> Command:
"""
Supervisor 的核心决策逻辑
分析研究简报,决定:
1. 是否需要分解任务?
2. 分解为几个子任务?
3. 每个子任务的具体内容?
"""
supervisor_messages = state.get("supervisor_messages", [])
# Prompt 包含决策启发式
system_message = lead_researcher_prompt.format(
date=get_today_str(),
max_concurrent_research_units=3, # 最多 3 个并行 Agent
max_researcher_iterations=6 # 每个 Agent 最多 6 次工具调用
)
messages = [SystemMessage(content=system_message)] + supervisor_messages
# LLM 决策
response = await supervisor_model_with_tools.ainvoke(messages)
return Command(
goto="supervisor_tools",
update={
"supervisor_messages": [response],
"research_iterations": state.get("research_iterations", 0) + 1
}
)Prompt 中的关键启发式:
python
"""
<Scaling Rules>
Simple fact-finding, lists, rankings: Use 1 sub-agent
Example: "List top 10 SF coffee shops" → 1 agent
Comparisons: Use 1 sub-agent per element
Example: "Compare OpenAI vs Anthropic vs DeepMind" → 3 agents
Delegate clear, distinct, non-overlapping subtopics
<Hard Limits>
- Bias towards single agent for simplicity
- Maximum {max_concurrent_research_units} parallel agents
- Stop after {max_researcher_iterations} tool calls if no progress
</Hard Limits>
"""4. 并行执行节点(核心技术)
python
import asyncio
async def supervisor_tools(state: SupervisorState) -> Command:
"""
执行 Supervisor 的决策:
- 调用 think_tool 进行反思
- 启动并行 Research Agents
- 聚合结果
"""
most_recent_message = state["supervisor_messages"][-1]
tool_calls = most_recent_message.tool_calls
# 分离不同类型的工具调用
think_tool_calls = [
tc for tc in tool_calls if tc["name"] == "think_tool"
]
conduct_research_calls = [
tc for tc in tool_calls if tc["name"] == "ConductResearch"
]
tool_messages = []
all_raw_notes = []
# 执行 think_tool(同步)
for tool_call in think_tool_calls:
observation = think_tool.invoke(tool_call["args"])
tool_messages.append(
ToolMessage(
content=observation,
name="think_tool",
tool_call_id=tool_call["id"]
)
)
# 并行执行 Research Agents(关键!)
if conduct_research_calls:
# 创建异步任务列表
research_tasks = [
researcher_agent.ainvoke({
"researcher_messages": [
HumanMessage(content=tc["args"]["research_topic"])
],
"research_topic": tc["args"]["research_topic"]
})
for tc in conduct_research_calls
]
# 并行执行所有任务
research_results = await asyncio.gather(*research_tasks)
# 格式化结果
for result, tool_call in zip(research_results, conduct_research_calls):
# 压缩的研究作为 ToolMessage 返回
tool_messages.append(
ToolMessage(
content=result.get("compressed_research", "Error"),
name="ConductResearch",
tool_call_id=tool_call["id"]
)
)
# 收集原始笔记
all_raw_notes.append("\n".join(result.get("raw_notes", [])))
# 决定下一步
if should_end(state):
return Command(
goto=END,
update={
"notes": get_notes_from_tool_calls(state["supervisor_messages"]),
"raw_notes": all_raw_notes
}
)
else:
return Command(
goto="supervisor",
update={
"supervisor_messages": tool_messages,
"raw_notes": all_raw_notes
}
)asyncio.gather 关键点:
python
# 串行执行(慢)
results = []
for task in tasks:
result = await agent.ainvoke(task) # 等待每个完成
results.append(result)
# 总时间 = sum(各 Agent 时间)
# 并行执行(快)
results = await asyncio.gather(*[
agent.ainvoke(task) for task in tasks
])
# 总时间 = max(各 Agent 时间)示例:3 个 Agent,每个需要 30 秒
- 串行:90 秒
- 并行:30 秒
💡 核心技术点详解
1. 上下文隔离(Context Isolation)
python
# Agent A 的上下文
{
"researcher_messages": [
HumanMessage("研究 OpenAI AI 安全方法"),
AIMessage("搜索..."),
ToolMessage("结果 A1"),
AIMessage("再搜索..."),
ToolMessage("结果 A2"),
...
]
}
# Agent B 的上下文(完全独立)
{
"researcher_messages": [
HumanMessage("研究 Anthropic AI 安全方法"),
AIMessage("搜索..."),
ToolMessage("结果 B1"),
...
]
}好处:
- ✅ 无干扰 - Agent A 的搜索结果不会影响 Agent B
- ✅ 深度聚焦 - 每个 Agent 可以深入单一主题
- ✅ 并行安全 - 不会相互覆盖状态
2. 研究结果聚合
python
def get_notes_from_tool_calls(messages: list[BaseMessage]) -> list[str]:
"""
从 Supervisor 的消息历史中提取研究笔记
当 Supervisor 调用 ConductResearch 时,
子 Agent 返回的 compressed_research 作为 ToolMessage.content
这个函数提取所有这些 ToolMessage 的内容
"""
return [
tool_msg.content
for tool_msg in filter_messages(messages, include_types="tool")
]执行流程:
Supervisor 调用 ConductResearch(topic="OpenAI 安全")
↓
Research Agent 执行
↓
返回: {
"compressed_research": "OpenAI 使用 RLHF...", ← 这个
"raw_notes": ["详细笔记 1", "详细笔记 2"]
}
↓
Supervisor 收到 ToolMessage(
content="OpenAI 使用 RLHF...", ← 压缩结果
tool_call_id="..."
)
↓
最终,get_notes_from_tool_calls 提取所有 ToolMessage.content
→ ["OpenAI 使用 RLHF...", "Anthropic 使用 Constitutional AI...", ...]3. 终止条件
python
def should_end(state):
"""决定是否结束 Supervisor 循环"""
iterations = state["research_iterations"]
last_message = state["supervisor_messages"][-1]
# 条件 1: 达到最大迭代次数
if iterations >= MAX_ITERATIONS:
return True
# 条件 2: 没有工具调用
if not last_message.tool_calls:
return True
# 条件 3: LLM 调用了 ResearchComplete
if any(tc["name"] == "ResearchComplete" for tc in last_message.tool_calls):
return True
return False🎭 实战示例
示例:对比 OpenAI vs Anthropic
python
研究简报: "对比 OpenAI 和 Anthropic 的 Deep Research 产品"
Supervisor 分析:
"这是对比任务,涉及两个不同产品"
Supervisor 决策:
Tool Call 1: think_tool(
"这是对比任务,应该为每个产品分配一个 Agent"
)
Tool Call 2: ConductResearch(
research_topic="深入研究 OpenAI Deep Research 产品的功能、
性能、使用体验、技术实现等方面..."
)
Tool Call 3: ConductResearch(
research_topic="深入研究 Anthropic (Claude) 的研究能力,
包括功能、性能、使用体验等..."
)
并行执行:
┌─────────────────┬─────────────────┐
│ Agent A │ Agent B │
├─────────────────┼─────────────────┤
│ OpenAI研究 │ Anthropic研究 │
│ Search 1... │ Search 1... │
│ think_tool... │ think_tool... │
│ Search 2... │ Search 2... │
│ Compress... │ Compress... │
└─────────────────┴─────────────────┘
↓ ↓
"OpenAI DR "Anthropic Claude
支持多轮迭代, 具有强大的分析能力,
深度搜索..." 可以..."
聚合结果:
notes = [
"OpenAI DR 支持多轮迭代...",
"Anthropic Claude 具有强大的分析能力..."
]📊 决策启发式
何时使用单 Agent?
| 任务类型 | 示例 | Agent 数量 |
|---|---|---|
| 事实查询 | "特斯拉 CEO 是谁?" | 1 |
| 列表 | "列出 SF 最佳咖啡店" | 1 |
| 排名 | "排名前 10 的 Python 库" | 1 |
| 总结 | "总结这篇文章" | 1 |
何时使用多 Agent?
| 任务类型 | 示例 | Agent 数量 |
|---|---|---|
| 对比 | "对比 A vs B vs C" | 3(每个一个) |
| 多维分析 | "分析特斯拉的技术、市场、财务" | 3(每个维度一个) |
| 并行研究 | "研究 AI 在医疗、教育、金融的应用" | 3(每个领域一个) |
💡 最佳实践
1. 子主题定义
python
# ❌ 不清晰的分解
Agent A: "研究 OpenAI"
Agent B: "研究 Anthropic"
# 问题:范围太广,可能重叠
# ✅ 清晰的分解
Agent A: "研究 OpenAI Deep Research 产品的功能、性能、
使用场景、技术实现、优缺点分析"
Agent B: "研究 Anthropic (Claude) 的深度研究能力,包括
功能特性、性能表现、使用体验、技术特点"
# 好处:明确、独立、无重叠2. 避免过度并行
python
# ❌ 过度分解
User: "研究 SF 最佳咖啡店"
Supervisor: 为每家咖啡店分配一个 Agent(10 个 Agent)
# 问题:过于复杂,反而降低效率
# ✅ 合理分解
User: "研究 SF 最佳咖啡店"
Supervisor: 使用 1 个 Agent(列表任务无需并行)3. 错误处理
python
async def supervisor_tools(state):
try:
results = await asyncio.gather(*research_tasks)
except Exception as e:
# 部分失败处理
results = await asyncio.gather(
*research_tasks,
return_exceptions=True # 收集异常而非抛出
)
# 过滤成功的结果
valid_results = [r for r in results if not isinstance(r, Exception)]
failed_count = len(results) - len(valid_results)
if failed_count > 0:
# 添加警告消息
tool_messages.append(
ToolMessage(
content=f"{failed_count} 个子研究失败,已忽略",
tool_call_id="error"
)
)🎓 核心知识点总结
Supervisor Pattern 优势
- 上下文隔离 - 避免信息混淆
- 并行加速 - 减少总执行时间
- 质量提升 - 每个 Agent 专注单一主题
- 灵活扩展 - 根据任务动态调整 Agent 数量
asyncio.gather 并行执行
python
# 并行执行多个异步任务
results = await asyncio.gather(*tasks)
# 关键点:
# - 所有任务同时开始
# - 等待所有任务完成
# - 返回结果列表(顺序与输入一致)决策启发式
- 简单任务 → 单 Agent
- 对比任务 → 每个对象一个 Agent
- 多维分析 → 每个维度一个 Agent
- 最大并发 → 3 个 Agent(可配置)
🚀 下一步
完成本节,你已经掌握了多智能体协同的核心技术。
下一章:9.5 完整系统集成 - 学习如何将 Scoping、Research、Supervisor 和 Report Generation 整合为端到端的 Deep Research 系统!