LangGraph 并行节点执行详细解读
📚 概述
本文档详细解读 LangGraph 中的并行节点执行(Parallelization)机制。这是构建高效多智能体系统的核心技术之一,能让多个任务同时执行,大幅提升性能。
📚 术语表
| 术语名称 | LangGraph 定义和解读 | Python 定义和说明 | 重要程度 |
|---|---|---|---|
| Parallelization(并行执行) | 让多个节点在同一时间步(step)内同时运行的执行模式 | 利用多核或异步机制同时执行多个任务,提升处理效率 | ⭐⭐⭐⭐⭐ |
| Fan-out(扇出) | 一个节点的输出分发到多个并行节点的模式 | 将单个数据源分发到多个处理单元的设计模式 | ⭐⭐⭐⭐⭐ |
| Fan-in(扇入) | 多个并行节点的输出汇聚到一个节点的模式 | 将多个处理结果合并到单一处理单元的设计模式 | ⭐⭐⭐⭐⭐ |
| Reducer | 定义如何合并多个并行更新的函数,通过 Annotated 指定 | 接收两个参数并返回合并结果的纯函数 | ⭐⭐⭐⭐⭐ |
| Annotated | Python 类型注解工具,用于为类型添加元数据(如 reducer) | from typing import Annotated,格式:Annotated[类型, 元数据] | ⭐⭐⭐⭐ |
| operator.add | Python 内置的加法运算符函数,常用作列表拼接的 reducer | operator.add([1, 2], [3, 4]) 返回 [1, 2, 3, 4] | ⭐⭐⭐⭐ |
| Step(执行步骤) | LangGraph 执行流程的基本单位,并行节点在同一 step 中执行 | 图执行的一个逻辑时刻,同一 step 的节点可并发运行 | ⭐⭐⭐⭐ |
| TypedDict | 定义字典结构和类型的 Python 工具类 | from typing_extensions import TypedDict,提供类型检查和 IDE 补全 | ⭐⭐⭐ |
| InvalidUpdateError | 当并行节点试图更新同一 state 键但未指定 reducer 时抛出的错误 | LangGraph 特定异常,提示需要使用 Annotated 和 reducer | ⭐⭐⭐⭐ |
| Callable Class(可调用类) | 实现 __call__ 方法的类,实例可以像函数一样调用 | 通过 def __call__(self, *args) 使对象可被调用 | ⭐⭐⭐ |
🎯 核心概念
什么是并行执行?
在 LangGraph 中,并行执行指的是让多个节点在同一时间步(step)内同时运行,而不是按顺序一个接一个执行。这种模式常被称为 Fan-out(扇出)和 Fan-in(扇入):
- Fan-out(扇出):一个节点的输出分发到多个并行节点
- Fan-in(扇入):多个并行节点的输出汇聚到一个节点
🔧 基础示例:线性执行 vs 并行执行
1. 线性执行图(Sequential)
首先看一个简单的线性执行示例:
from typing import Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
state: str
class ReturnNodeValue:
def __init__(self, node_secret: str):
self._value = node_secret
def __call__(self, state: State) -> Any:
print(f"Adding {self._value} to {state['state']}")
return {"state": [self._value]}
# 构建图
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# 线性流程:START → a → b → c → d → END
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("b", "c")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))执行结果:
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm B"]
Adding I'm D to ["I'm C"]
{'state': ["I'm D"]}分析: 每个节点按顺序执行,后一个节点会覆盖前一个节点的状态。
2. 并行执行图(Parallel)- 遇到问题!
现在让我们尝试让 b 和 c 并行执行:
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# 并行流程:a 扇出到 b 和 c,然后扇入到 d
builder.add_edge(START, "a")
builder.add_edge("a", "b") # a → b
builder.add_edge("a", "c") # a → c(并行)
builder.add_edge("b", "d") # b → d
builder.add_edge("c", "d") # c → d
builder.add_edge("d", END)
graph = builder.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))执行时会报错!
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
An error occurred: At key 'state': Can receive only one value per step.
Use an Annotated key to handle multiple values.为什么会报错?
因为在同一个时间步(step)内,b 和 c 都试图更新 state 这个键。LangGraph 不知道如何合并这两个更新,所以抛出 InvalidUpdateError 错误。
🔑 解决方案:使用 Reducer
什么是 Reducer?
Reducer 是一个函数,用于定义如何合并多个并行更新。在 Python 中,我们使用 Annotated 类型提示来指定 reducer。
Python 知识点:Annotated 类型提示
Annotated 是 Python 3.9+ 引入的类型提示工具,允许我们为类型添加元数据:
from typing import Annotated
import operator
# 语法:Annotated[类型, 元数据]
state: Annotated[list, operator.add]
# ^^^^^^^^ ^^^^ ^^^^^^^^^^^^^
# 关键字 类型 reducer 函数使用 operator.add 作为 Reducer
operator.add 是 Python 内置的加法运算符函数。对于列表,它执行列表拼接:
import operator
# operator.add 对列表的作用
result = operator.add([1, 2], [3, 4]) # 结果: [1, 2, 3, 4]修复后的代码:
import operator
from typing import Annotated
class State(TypedDict):
# 使用 operator.add 作为 reducer,让 state 支持追加
state: Annotated[list, operator.add]
# 构建图(结构同上)
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))执行结果:
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm D to ["I'm A", "I'm B", "I'm C"]
{'state': ["I'm A", "I'm B", "I'm C", "I'm D"]}✅ 成功! 现在 b 和 c 的更新都被保留了,它们的结果被拼接到 state 列表中。
⏳ 等待并行节点完成
不同长度的并行路径
当并行路径的长度不同时会发生什么?
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2")) # b 路径的额外节点
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2") # b → b2(更长的路径)
builder.add_edge(["b2", "c"], "d") # 等待 b2 和 c 都完成
builder.add_edge("d", END)
graph = builder.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))执行结果:
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]关键点:
b、b2和c都属于同一个执行步骤(step)- 虽然
c先完成,但图会等待b2也完成后,才进入d节点 - 这是 LangGraph 的自动同步机制
🎨 控制状态更新的顺序
问题:默认顺序不可控
在上面的例子中,虽然都是同一步,但状态更新的顺序是:["I'm A", "I'm B", "I'm C", "I'm B2"]
注意 "I'm C" 出现在 "I'm B2" 之前,即使 b2 是 b 的后续节点。
原因: LangGraph 根据图的拓扑结构决定更新顺序,我们无法直接控制。
解决方案:自定义 Reducer
我们可以编写自定义 reducer 来排序状态更新:
def sorting_reducer(left, right):
"""合并并排序列表中的值"""
# 确保 left 是列表
if not isinstance(left, list):
left = [left]
# 确保 right 是列表
if not isinstance(right, list):
right = [right]
# 合并并排序
return sorted(left + right, reverse=False)
class State(TypedDict):
# 使用自定义的 sorting_reducer
state: Annotated[list, sorting_reducer]执行结果:
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm B2", "I'm C"]
{'state': ["I'm A", "I'm B", "I'm B2", "I'm C", "I'm D"]}注意: 现在顺序变成了 ["I'm A", "I'm B", "I'm B2", "I'm C", "I'm D"],按字母顺序排列!
🔍 Reducer 函数详解
自定义 reducer 的工作原理:
def sorting_reducer(left, right):
# left: 当前状态的值
# right: 新节点返回的值
# 1. 标准化为列表
if not isinstance(left, list):
left = [left]
if not isinstance(right, list):
right = [right]
# 2. 合并并处理(这里是排序)
return sorted(left + right, reverse=False)其他 reducer 示例:
# 只保留最新的 N 个值
def keep_last_n(n=5):
def reducer(left, right):
combined = (left if isinstance(left, list) else [left]) + \
(right if isinstance(right, list) else [right])
return combined[-n:]
return reducer
# 去重
def unique_reducer(left, right):
combined = (left if isinstance(left, list) else [left]) + \
(right if isinstance(right, list) else [right])
return list(dict.fromkeys(combined)) # 保持顺序的去重🤖 实战案例:并行检索生成答案
场景
我们要构建一个问答系统,同时从两个来源获取信息:
- Wikipedia - 获取百科知识
- Web Search (Tavily) - 获取最新网络信息
然后让 LLM 基于这些信息生成答案。
State 定义
import operator
from typing import Annotated
from typing_extensions import TypedDict
class State(TypedDict):
question: str # 用户问题
answer: str # 生成的答案
context: Annotated[list, operator.add] # 检索到的上下文(支持并行追加)关键点:
context使用operator.add作为 reducer,允许两个检索节点并行写入
节点实现
1. Web 搜索节点
from langchain_community.tools import TavilySearchResults
def search_web(state):
"""从网络搜索获取文档"""
# 使用 Tavily 搜索
tavily_search = TavilySearchResults(max_results=3)
search_docs = tavily_search.invoke(state['question'])
# 格式化搜索结果
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document href="{doc["url"]}">\n{doc["content"]}\n</Document>'
for doc in search_docs
]
)
return {"context": [formatted_search_docs]}知识点:
- 返回
{"context": [formatted_search_docs]},注意context的值是一个列表 - 这样
operator.add可以将其拼接到现有的 context 中
2. Wikipedia 搜索节点
from langchain_community.document_loaders import WikipediaLoader
def search_wikipedia(state):
"""从 Wikipedia 获取文档"""
# 搜索 Wikipedia
search_docs = WikipediaLoader(
query=state['question'],
load_max_docs=2
).load()
# 格式化搜索结果
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
for doc in search_docs
]
)
return {"context": [formatted_search_docs]}3. 生成答案节点
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
llm = ChatOpenAI(model="gpt-5-nano", temperature=0)
def generate_answer(state):
"""基于上下文生成答案"""
# 获取状态
context = state["context"]
question = state["question"]
# 构建提示词
answer_template = """Answer the question {question} using this context: {context}"""
answer_instructions = answer_template.format(
question=question,
context=context
)
# 调用 LLM 生成答案
answer = llm.invoke([
SystemMessage(content=answer_instructions),
HumanMessage(content="Answer the question.")
])
return {"answer": answer}构建并行图
from langgraph.graph import StateGraph, START, END
builder = StateGraph(State)
# 添加节点
builder.add_node("search_web", search_web)
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("generate_answer", generate_answer)
# 并行流程
builder.add_edge(START, "search_wikipedia") # START → search_wikipedia
builder.add_edge(START, "search_web") # START → search_web(并行)
builder.add_edge("search_wikipedia", "generate_answer") # 汇聚
builder.add_edge("search_web", "generate_answer") # 汇聚
builder.add_edge("generate_answer", END)
graph = builder.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))流程图:
START
/ \
/ \
search_web search_wikipedia
\ /
\ /
generate_answer
|
END执行查询
result = graph.invoke({
"question": "How were Nvidia's Q2 2024 earnings"
})
print(result['answer'].content)输出示例:
Nvidia's Q2 2024 earnings were strong, showcasing record revenue and a robust
performance in its data center division. The company reported revenue of $30.0
billion, which was up 15% from the previous quarter and up 122% from a year ago.
GAAP earnings per diluted share were $0.67, up 12% from the previous quarter and
up 168% from a year ago, while non-GAAP earnings per diluted share were $0.68...🎓 核心知识点总结
LangGraph 特有概念
并行执行模式
- Fan-out:一个节点扇出到多个节点
- Fan-in:多个节点汇聚到一个节点
Reducer 机制
- 必须使用 reducer 处理并行更新同一 state 键的情况
operator.add用于列表拼接- 可以自定义 reducer 控制合并逻辑
执行步骤(Step)
- 并行节点在同一个 step 中执行
- 图会等待所有并行路径完成后再进入下一步
状态更新顺序
- 默认顺序由 LangGraph 根据图拓扑决定
- 可通过自定义 reducer 控制顺序
Python 特有知识点
TypedDictpythonfrom typing_extensions import TypedDict class State(TypedDict): field1: str field2: int用于定义字典的类型结构
Annotated类型提示pythonfrom typing import Annotated # 语法:Annotated[类型, 元数据...] field: Annotated[list, operator.add]为类型添加额外的元数据
operator.addpythonimport operator # 对列表执行拼接 operator.add([1, 2], [3, 4]) # [1, 2, 3, 4]可调用类(Callable Class)
pythonclass ReturnNodeValue: def __init__(self, value): self._value = value def __call__(self, state): # 实现 __call__ 方法 return {"state": [self._value]}实现
__call__方法使对象可以像函数一样调用
💡 最佳实践
1. 何时使用并行执行?
✅ 适用场景:
- 多个独立的数据源检索(如同时查询数据库、API、搜索引擎)
- 多个独立的处理任务(如同时进行文本分类、实体提取、情感分析)
- 需要多个 Agent 同时工作的场景
❌ 不适用场景:
- 任务之间有依赖关系
- 需要严格的执行顺序
- 资源受限(如 API 调用限制)
2. Reducer 选择指南
| 场景 | Reducer | 说明 |
|---|---|---|
| 收集所有结果 | operator.add | 拼接所有更新 |
| 需要排序 | 自定义 sorting_reducer | 合并后排序 |
| 去重 | 自定义 unique_reducer | 只保留唯一值 |
| 只保留最新 | 自定义 lambda left, right: right | 覆盖模式 |
| 保留最近 N 个 | 自定义 keep_last_n | 滑动窗口 |
3. 状态设计建议
class State(TypedDict):
# 输入字段(不会被并行更新)
question: str
# 并行收集的数据(使用 reducer)
search_results: Annotated[list, operator.add]
# 最终输出(单一节点更新)
answer: str🚀 进阶技巧
稳定排序(Stable Sorting)
对于需要精确控制顺序的场景,可以使用"临时字段 + 汇聚节点"模式:
class State(TypedDict):
question: str
temp_results: dict # 临时存储并行结果
final_results: list # 排序后的最终结果
def collect_results(state):
"""汇聚节点:合并并排序临时结果"""
results = state['temp_results']
# 按自定义顺序排序
ordered = [results['source1'], results['source2'], results['source3']]
return {"final_results": ordered, "temp_results": {}} # 清空临时字段详见 官方文档
📖 扩展阅读
- LangGraph 并行执行官方文档
- LangGraph State 和 Reducer 详解
- Python
typing模块文档
总结:并行执行是 LangGraph 的强大功能,通过合理使用 reducer 和图结构设计,可以构建高效的多任务、多智能体系统。掌握这个概念是进入高级 LangGraph 开发的关键一步!
完整案例代码(可直接运行)
以下是一个完整的并行执行示例,展示了如何并行检索多个数据源并生成答案:
"""
LangGraph 并行执行完整示例
演示 Fan-out/Fan-in 模式和 Reducer 机制
"""
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
# ========== 1. 状态定义 ==========
class State(TypedDict):
"""问答系统状态"""
question: str # 用户问题
context: Annotated[list, operator.add] # 检索到的上下文(支持并行追加)
answer: str # 生成的答案
# ========== 2. 初始化 LLM ==========
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# ========== 3. 定义节点函数 ==========
def search_wikipedia(state: State):
"""
模拟 Wikipedia 搜索
实际应用中可使用: from langchain_community.document_loaders import WikipediaLoader
"""
question = state["question"]
# 模拟搜索结果
wiki_result = f"""
<Document source="Wikipedia">
Wikipedia 搜索结果关于: {question}
LangGraph 是 LangChain 团队开发的用于构建复杂 AI 应用的框架。
它支持循环、分支、持久化等高级功能。
主要特点包括:状态管理、检查点、人机协作等。
</Document>
"""
print(f"🔍 Wikipedia 搜索完成")
return {"context": [wiki_result]}
def search_web(state: State):
"""
模拟网络搜索
实际应用中可使用: from langchain_community.tools import TavilySearchResults
"""
question = state["question"]
# 模拟搜索结果
web_result = f"""
<Document source="Web Search">
网络搜索结果关于: {question}
LangGraph 提供了构建 AI Agent 的完整解决方案。
支持多智能体协作、工具调用、记忆系统等功能。
广泛应用于 RAG、对话系统、自动化工作流等场景。
</Document>
"""
print(f"🌐 网络搜索完成")
return {"context": [web_result]}
def search_docs(state: State):
"""
模拟本地文档搜索
实际应用中可使用向量数据库如 Chroma、FAISS 等
"""
question = state["question"]
# 模拟搜索结果
docs_result = f"""
<Document source="Local Docs">
本地文档搜索结果关于: {question}
LangGraph 的核心概念包括:
1. StateGraph - 状态图,管理应用状态
2. Node - 节点,执行具体操作
3. Edge - 边,定义执行流程
4. Checkpointer - 检查点,保存状态历史
</Document>
"""
print(f"📚 本地文档搜索完成")
return {"context": [docs_result]}
def generate_answer(state: State):
"""基于检索到的上下文生成答案"""
question = state["question"]
context = state["context"]
# 合并所有上下文
combined_context = "\n\n".join(context)
# 构建提示词
system_prompt = f"""你是一个智能问答助手。请根据以下检索到的信息回答用户问题。
检索到的信息:
{combined_context}
要求:
1. 综合多个来源的信息
2. 给出准确、简洁的回答
3. 如果信息不足,请说明"""
# 调用 LLM 生成答案
response = llm.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=question)
])
print(f"✅ 答案生成完成")
return {"answer": response.content}
# ========== 4. 构建图 ==========
def build_parallel_qa_graph():
"""构建并行检索问答图"""
builder = StateGraph(State)
# 添加节点
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("search_web", search_web)
builder.add_node("search_docs", search_docs)
builder.add_node("generate_answer", generate_answer)
# 并行执行:START 扇出到三个搜索节点
builder.add_edge(START, "search_wikipedia")
builder.add_edge(START, "search_web")
builder.add_edge(START, "search_docs")
# Fan-in:三个搜索节点汇聚到生成答案节点
builder.add_edge("search_wikipedia", "generate_answer")
builder.add_edge("search_web", "generate_answer")
builder.add_edge("search_docs", "generate_answer")
# 结束
builder.add_edge("generate_answer", END)
return builder.compile()
# ========== 5. 主程序 ==========
if __name__ == "__main__":
# 构建图
graph = build_parallel_qa_graph()
# 可视化图结构
print("=" * 50)
print("📊 图结构可视化")
print("=" * 50)
try:
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
# 如果无法显示图片,打印 Mermaid 代码
print(graph.get_graph().draw_mermaid())
# 执行查询
print("\n" + "=" * 50)
print("🚀 执行并行检索查询")
print("=" * 50)
question = "什么是 LangGraph?它有哪些主要功能?"
print(f"\n📝 问题: {question}\n")
# 使用 stream 模式查看执行过程
print("⏳ 执行中...\n")
for step in graph.stream({"question": question}):
for node_name, output in step.items():
print(f"📌 节点 [{node_name}] 完成")
# 获取最终结果
result = graph.invoke({"question": question})
print("\n" + "=" * 50)
print("📋 最终结果")
print("=" * 50)
print(f"\n🔹 问题: {result['question']}")
print(f"\n🔹 检索到 {len(result['context'])} 个上下文源")
print(f"\n🔹 答案:\n{result['answer']}")运行结果示例
==================================================
📊 图结构可视化
==================================================
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
__start__([__start__]):::startclass;
search_wikipedia([search_wikipedia]):::otherclass;
search_web([search_web]):::otherclass;
search_docs([search_docs]):::otherclass;
generate_answer([generate_answer]):::otherclass;
__end__([__end__]):::endclass;
__start__ --> search_wikipedia;
__start__ --> search_web;
__start__ --> search_docs;
search_wikipedia --> generate_answer;
search_web --> generate_answer;
search_docs --> generate_answer;
generate_answer --> __end__;
==================================================
🚀 执行并行检索查询
==================================================
📝 问题: 什么是 LangGraph?它有哪些主要功能?
⏳ 执行中...
🔍 Wikipedia 搜索完成
📌 节点 [search_wikipedia] 完成
🌐 网络搜索完成
📌 节点 [search_web] 完成
📚 本地文档搜索完成
📌 节点 [search_docs] 完成
✅ 答案生成完成
📌 节点 [generate_answer] 完成
==================================================
📋 最终结果
==================================================
🔹 问题: 什么是 LangGraph?它有哪些主要功能?
🔹 检索到 3 个上下文源
🔹 答案:
LangGraph 是由 LangChain 团队开发的用于构建复杂 AI 应用的框架。
主要功能包括:
1. **状态管理**:通过 StateGraph 管理应用状态
2. **循环和分支**:支持复杂的控制流程
3. **检查点机制**:保存状态历史,支持恢复和调试
4. **人机协作**:支持人工干预和审核
5. **多智能体协作**:支持多个 AI Agent 协同工作
6. **工具调用**:集成外部工具和 API
7. **记忆系统**:支持长期记忆和上下文管理
LangGraph 广泛应用于 RAG 系统、对话系统、自动化工作流等场景。核心知识点回顾
| 概念 | 说明 | 代码示例 |
|---|---|---|
| Reducer | 合并并行节点的输出 | Annotated[list, operator.add] |
| Fan-out | 一个节点扇出到多个节点 | add_edge(START, "node1") × 3 |
| Fan-in | 多个节点汇聚到一个节点 | add_edge("node1", "final") × 3 |
| 并行执行 | 同一 step 内同时运行 | 三个搜索节点同时执行 |
| 自动同步 | 等待所有并行节点完成 | generate_answer 等待三个搜索完成 |