Skip to content

LangGraph 多智能体协同研究详细解读

📚 概述

Multi-Agent Supervisor Pattern 是 Deep Research 系统的高级架构,用于处理复杂的多主题研究任务。本章讲解如何使用 Supervisor 协调多个 Research Agent 并行工作。

核心价值:

  • 🔀 上下文隔离 - 每个 Agent 专注单一主题
  • 并行加速 - 同时研究多个子主题
  • 🎯 质量提升 - 避免单 Agent 的上下文冲突
  • 🧩 灵活分解 - Supervisor 智能决定是否并行

🎯 核心问题:为什么需要多智能体?

单 Agent 的局限

问题场景:对比任务

python
User: "对比 OpenAI、Anthropic、Google 的 AI 安全方法"

单 Agent 执行:
  Search 1: "OpenAI AI safety approach"
  Search 2: "Anthropic AI safety Constitutional AI"
  Search 3: "Google AI safety research"
  Search 4: "OpenAI alignment research"  # 回到 OpenAI
  Search 5: "Anthropic interpretability"  # 回到 Anthropic
  ...

问题:
  ❌ 上下文混乱 - 三个主题的信息交织
  ❌ Context Clash - "OpenAI""Anthropic" 信息相互干扰
  ❌ 搜索低效 - 在主题间反复跳转
  ❌ token 浪费 - 所有信息都在一个上下文中

多 Agent 的优势

python
Supervisor 决策:
  "这是对比任务,分解为 3 个独立子主题"

Agent A 专注: OpenAI AI 安全
  Search 1: "OpenAI AI safety approach"
  Search 2: "OpenAI alignment team research"
  结论: 基于 RLHF 和 superalignment...

Agent B 专注: Anthropic AI 安全
  Search 1: "Anthropic Constitutional AI"
  Search 2: "Anthropic interpretability research"
  结论: 基于 Constitutional AI 和可解释性...

Agent C 专注: Google AI 安全
  Search 1: "Google AI safety research DeepMind"
  Search 2: "Google responsible AI principles"
  结论: 基于伦理审查和测试框架...

优势:
  ✅ 上下文清晰 - 每个 Agent 只关注一个主题
  ✅ 深度聚焦 - 可以深入挖掘细节
  ✅ 并行执行 - 总时间 = max(各 Agent 时间)
  ✅ 质量更高 - 避免信息混淆

🏗️ Supervisor Pattern 架构

整体流程

研究简报输入

┌──────────────────────────────────┐
│ Supervisor 分析                   │
│ - 这是对比任务吗?                │
│ - 需要分解为几个子主题?          │
│ - 每个子主题的研究重点?          │
└──────────────────────────────────┘

┌──────────────────────────────────┐
│ Supervisor 决策                   │
│                                  │
│ 选项 A: 单 Agent                  │
│  → 简单查询,无需并行             │
│                                  │
│ 选项 B: 多 Agent 并行             │
│  → 对比任务,分解为子主题         │
└──────────────────────────────────┘
    ↓ (选择 B)
┌────────┬────────┬────────┐
│ Agent A│ Agent B│ Agent C│  ← 并行执行
│ 主题 1 │ 主题 2 │ 主题 3 │
│        │        │        │
│ 研究中 │ 研究中 │ 研究中 │
│   ↓    │   ↓    │   ↓    │
│ 压缩   │ 压缩   │ 压缩   │
└────────┴────────┴────────┘

┌──────────────────────────────────┐
│ Supervisor 聚合                   │
│ - 收集所有 Agent 的研究结果       │
│ - 合并压缩结果和原始笔记          │
└──────────────────────────────────┘

最终研究成果

🔧 核心技术实现

1. 状态定义

python
class SupervisorState(TypedDict):
    """Supervisor 状态"""
    # Supervisor 的对话历史
    supervisor_messages: Annotated[Sequence[BaseMessage], add_messages]
    # 研究简报(来自 Scoping)
    research_brief: str
    # 压缩后的研究笔记(来自各 Agent)
    notes: Annotated[list[str], operator.add]
    # 研究迭代计数
    research_iterations: int
    # 原始笔记(未压缩)
    raw_notes: Annotated[list[str], operator.add]

2. Supervisor 工具

python
@tool
class ConductResearch(BaseModel):
    """委托研究任务给子 Agent"""
    research_topic: str = Field(
        description="要研究的主题。应该是单一主题,并详细描述(至少一段话)"
    )

@tool
class ResearchComplete(BaseModel):
    """标记研究完成"""
    pass

使用方式:

python
# Supervisor 调用 ConductResearch
Tool Call: ConductResearch(
    research_topic="深入研究 OpenAI 的 AI 安全方法,包括..."
)

# 触发一个 Research Agent 执行

3. Supervisor 决策节点(关键)

python
async def supervisor(state: SupervisorState) -> Command:
    """
    Supervisor 的核心决策逻辑

    分析研究简报,决定:
    1. 是否需要分解任务?
    2. 分解为几个子任务?
    3. 每个子任务的具体内容?
    """
    supervisor_messages = state.get("supervisor_messages", [])

    # Prompt 包含决策启发式
    system_message = lead_researcher_prompt.format(
        date=get_today_str(),
        max_concurrent_research_units=3,  # 最多 3 个并行 Agent
        max_researcher_iterations=6        # 每个 Agent 最多 6 次工具调用
    )

    messages = [SystemMessage(content=system_message)] + supervisor_messages

    # LLM 决策
    response = await supervisor_model_with_tools.ainvoke(messages)

    return Command(
        goto="supervisor_tools",
        update={
            "supervisor_messages": [response],
            "research_iterations": state.get("research_iterations", 0) + 1
        }
    )

Prompt 中的关键启发式:

python
"""
<Scaling Rules>
Simple fact-finding, lists, rankings: Use 1 sub-agent
  Example: "List top 10 SF coffee shops" → 1 agent

Comparisons: Use 1 sub-agent per element
  Example: "Compare OpenAI vs Anthropic vs DeepMind" → 3 agents
  Delegate clear, distinct, non-overlapping subtopics

<Hard Limits>
- Bias towards single agent for simplicity
- Maximum {max_concurrent_research_units} parallel agents
- Stop after {max_researcher_iterations} tool calls if no progress
</Hard Limits>
"""

4. 并行执行节点(核心技术)

python
import asyncio

async def supervisor_tools(state: SupervisorState) -> Command:
    """
    执行 Supervisor 的决策:
    - 调用 think_tool 进行反思
    - 启动并行 Research Agents
    - 聚合结果
    """
    most_recent_message = state["supervisor_messages"][-1]
    tool_calls = most_recent_message.tool_calls

    # 分离不同类型的工具调用
    think_tool_calls = [
        tc for tc in tool_calls if tc["name"] == "think_tool"
    ]

    conduct_research_calls = [
        tc for tc in tool_calls if tc["name"] == "ConductResearch"
    ]

    tool_messages = []
    all_raw_notes = []

    # 执行 think_tool(同步)
    for tool_call in think_tool_calls:
        observation = think_tool.invoke(tool_call["args"])
        tool_messages.append(
            ToolMessage(
                content=observation,
                name="think_tool",
                tool_call_id=tool_call["id"]
            )
        )

    # 并行执行 Research Agents(关键!)
    if conduct_research_calls:
        # 创建异步任务列表
        research_tasks = [
            researcher_agent.ainvoke({
                "researcher_messages": [
                    HumanMessage(content=tc["args"]["research_topic"])
                ],
                "research_topic": tc["args"]["research_topic"]
            })
            for tc in conduct_research_calls
        ]

        # 并行执行所有任务
        research_results = await asyncio.gather(*research_tasks)

        # 格式化结果
        for result, tool_call in zip(research_results, conduct_research_calls):
            # 压缩的研究作为 ToolMessage 返回
            tool_messages.append(
                ToolMessage(
                    content=result.get("compressed_research", "Error"),
                    name="ConductResearch",
                    tool_call_id=tool_call["id"]
                )
            )

            # 收集原始笔记
            all_raw_notes.append("\n".join(result.get("raw_notes", [])))

    # 决定下一步
    if should_end(state):
        return Command(
            goto=END,
            update={
                "notes": get_notes_from_tool_calls(state["supervisor_messages"]),
                "raw_notes": all_raw_notes
            }
        )
    else:
        return Command(
            goto="supervisor",
            update={
                "supervisor_messages": tool_messages,
                "raw_notes": all_raw_notes
            }
        )

asyncio.gather 关键点:

python
# 串行执行(慢)
results = []
for task in tasks:
    result = await agent.ainvoke(task)  # 等待每个完成
    results.append(result)
# 总时间 = sum(各 Agent 时间)

# 并行执行(快)
results = await asyncio.gather(*[
    agent.ainvoke(task) for task in tasks
])
# 总时间 = max(各 Agent 时间)

示例:3 个 Agent,每个需要 30 秒

  • 串行:90 秒
  • 并行:30 秒

💡 核心技术点详解

1. 上下文隔离(Context Isolation)

python
# Agent A 的上下文
{
    "researcher_messages": [
        HumanMessage("研究 OpenAI AI 安全方法"),
        AIMessage("搜索..."),
        ToolMessage("结果 A1"),
        AIMessage("再搜索..."),
        ToolMessage("结果 A2"),
        ...
    ]
}

# Agent B 的上下文(完全独立)
{
    "researcher_messages": [
        HumanMessage("研究 Anthropic AI 安全方法"),
        AIMessage("搜索..."),
        ToolMessage("结果 B1"),
        ...
    ]
}

好处:

  • 无干扰 - Agent A 的搜索结果不会影响 Agent B
  • 深度聚焦 - 每个 Agent 可以深入单一主题
  • 并行安全 - 不会相互覆盖状态

2. 研究结果聚合

python
def get_notes_from_tool_calls(messages: list[BaseMessage]) -> list[str]:
    """
    从 Supervisor 的消息历史中提取研究笔记

    当 Supervisor 调用 ConductResearch 时,
    子 Agent 返回的 compressed_research 作为 ToolMessage.content

    这个函数提取所有这些 ToolMessage 的内容
    """
    return [
        tool_msg.content
        for tool_msg in filter_messages(messages, include_types="tool")
    ]

执行流程:

Supervisor 调用 ConductResearch(topic="OpenAI 安全")

Research Agent 执行

返回: {
    "compressed_research": "OpenAI 使用 RLHF...",  ← 这个
    "raw_notes": ["详细笔记 1", "详细笔记 2"]
}

Supervisor 收到 ToolMessage(
    content="OpenAI 使用 RLHF...",  ← 压缩结果
    tool_call_id="..."
)

最终,get_notes_from_tool_calls 提取所有 ToolMessage.content
→ ["OpenAI 使用 RLHF...", "Anthropic 使用 Constitutional AI...", ...]

3. 终止条件

python
def should_end(state):
    """决定是否结束 Supervisor 循环"""
    iterations = state["research_iterations"]
    last_message = state["supervisor_messages"][-1]

    # 条件 1: 达到最大迭代次数
    if iterations >= MAX_ITERATIONS:
        return True

    # 条件 2: 没有工具调用
    if not last_message.tool_calls:
        return True

    # 条件 3: LLM 调用了 ResearchComplete
    if any(tc["name"] == "ResearchComplete" for tc in last_message.tool_calls):
        return True

    return False

🎭 实战示例

示例:对比 OpenAI vs Anthropic

python
研究简报: "对比 OpenAI 和 Anthropic 的 Deep Research 产品"

Supervisor 分析:
  "这是对比任务,涉及两个不同产品"

Supervisor 决策:
  Tool Call 1: think_tool(
      "这是对比任务,应该为每个产品分配一个 Agent"
  )

  Tool Call 2: ConductResearch(
      research_topic="深入研究 OpenAI Deep Research 产品的功能、
                     性能、使用体验、技术实现等方面..."
  )

  Tool Call 3: ConductResearch(
      research_topic="深入研究 Anthropic (Claude) 的研究能力,
                     包括功能、性能、使用体验等..."
  )

并行执行:
  ┌─────────────────┬─────────────────┐
  │ Agent A         │ Agent B         │
  ├─────────────────┼─────────────────┤
  │ OpenAI研究      │ Anthropic研究   │
  │ Search 1...     │ Search 1...
  │ think_tool...   │ think_tool...
  │ Search 2...     │ Search 2...
  │ Compress...     │ Compress...
  └─────────────────┴─────────────────┘
      ↓                   ↓
  "OpenAI DR      "Anthropic Claude
   支持多轮迭代,   具有强大的分析能力,
   深度搜索..."    可以..."

聚合结果:
  notes = [
      "OpenAI DR 支持多轮迭代...",
      "Anthropic Claude 具有强大的分析能力..."
  ]

📊 决策启发式

何时使用单 Agent?

任务类型示例Agent 数量
事实查询"特斯拉 CEO 是谁?"1
列表"列出 SF 最佳咖啡店"1
排名"排名前 10 的 Python 库"1
总结"总结这篇文章"1

何时使用多 Agent?

任务类型示例Agent 数量
对比"对比 A vs B vs C"3(每个一个)
多维分析"分析特斯拉的技术、市场、财务"3(每个维度一个)
并行研究"研究 AI 在医疗、教育、金融的应用"3(每个领域一个)

💡 最佳实践

1. 子主题定义

python
# ❌ 不清晰的分解
Agent A: "研究 OpenAI"
Agent B: "研究 Anthropic"
# 问题:范围太广,可能重叠

# ✅ 清晰的分解
Agent A: "研究 OpenAI Deep Research 产品的功能、性能、
          使用场景、技术实现、优缺点分析"
Agent B: "研究 Anthropic (Claude) 的深度研究能力,包括
          功能特性、性能表现、使用体验、技术特点"
# 好处:明确、独立、无重叠

2. 避免过度并行

python
# ❌ 过度分解
User: "研究 SF 最佳咖啡店"
Supervisor: 为每家咖啡店分配一个 Agent(10 个 Agent)
# 问题:过于复杂,反而降低效率

# ✅ 合理分解
User: "研究 SF 最佳咖啡店"
Supervisor: 使用 1 个 Agent(列表任务无需并行)

3. 错误处理

python
async def supervisor_tools(state):
    try:
        results = await asyncio.gather(*research_tasks)
    except Exception as e:
        # 部分失败处理
        results = await asyncio.gather(
            *research_tasks,
            return_exceptions=True  # 收集异常而非抛出
        )

        # 过滤成功的结果
        valid_results = [r for r in results if not isinstance(r, Exception)]
        failed_count = len(results) - len(valid_results)

        if failed_count > 0:
            # 添加警告消息
            tool_messages.append(
                ToolMessage(
                    content=f"{failed_count} 个子研究失败,已忽略",
                    tool_call_id="error"
                )
            )

🎓 核心知识点总结

Supervisor Pattern 优势

  1. 上下文隔离 - 避免信息混淆
  2. 并行加速 - 减少总执行时间
  3. 质量提升 - 每个 Agent 专注单一主题
  4. 灵活扩展 - 根据任务动态调整 Agent 数量

asyncio.gather 并行执行

python
# 并行执行多个异步任务
results = await asyncio.gather(*tasks)

# 关键点:
# - 所有任务同时开始
# - 等待所有任务完成
# - 返回结果列表(顺序与输入一致)

决策启发式

  • 简单任务 → 单 Agent
  • 对比任务 → 每个对象一个 Agent
  • 多维分析 → 每个维度一个 Agent
  • 最大并发 → 3 个 Agent(可配置)

🚀 下一步

完成本节,你已经掌握了多智能体协同的核心技术。

下一章:9.5 完整系统集成 - 学习如何将 Scoping、Research、Supervisor 和 Report Generation 整合为端到端的 Deep Research 系统!

基于 MIT 许可证发布。内容版权归作者所有。