LangGraph Map-Reduce 详细解读
📚 概述
本文档详细解读 LangGraph 中的 Map-Reduce 模式。这是一种经典的分布式计算模式,在 LangGraph 中用于高效的任务分解和并行处理。通过 Map-Reduce,我们可以将复杂任务拆分成多个子任务并行执行,然后聚合结果。
🎯 核心概念
什么是 Map-Reduce?
Map-Reduce 是一种编程模型,包含两个阶段:
Map(映射)阶段
- 将大任务分解为多个小的子任务
- 并行处理每个子任务
- 每个子任务独立执行,互不影响
Reduce(归约)阶段
- 收集所有子任务的结果
- 对结果进行聚合、汇总或筛选
- 产生最终输出
📚 术语表
| 术语名称 | LangGraph 定义和解读 | Python 定义和说明 | 重要程度 |
|---|---|---|---|
| Map-Reduce | 分解任务并行处理(Map),然后聚合结果(Reduce)的编程模式 | 经典分布式计算模式,Google MapReduce 论文提出 | ⭐⭐⭐⭐⭐ |
| Send API | 动态创建并行任务的核心 API,格式:Send(节点名, 状态字典) | from langgraph.types import Send,运行时决定并行数量 | ⭐⭐⭐⭐⭐ |
| Map 阶段 | 将大任务分解为多个小任务并并行处理的阶段 | 每个子任务独立执行,互不影响,数量动态可变 | ⭐⭐⭐⭐⭐ |
| Reduce 阶段 | 收集所有子任务结果并聚合、汇总的阶段 | 使用 reducer(如 operator.add)合并并行结果 | ⭐⭐⭐⭐⭐ |
| 动态分发 | 根据运行时数据决定并行任务数量的机制 | 通过 Send 返回列表实现,列表长度即并行任务数 | ⭐⭐⭐⭐⭐ |
| 条件边 + Send | add_conditional_edges(源节点, 返回Send列表的函数, [目标节点]) | LangGraph 实现动态并行的关键,自动处理并行和聚合 | ⭐⭐⭐⭐⭐ |
| Pydantic BaseModel | 定义数据模型并支持自动验证的类 | from pydantic import BaseModel,用于 LLM 结构化输出 | ⭐⭐⭐⭐ |
| with_structured_output() | LangChain 方法,强制 LLM 返回符合指定模型的 JSON | llm.with_structured_output(Model).invoke(prompt) | ⭐⭐⭐⭐ |
| OverallState vs 局部State | 全局状态贯穿整个图,局部状态仅用于特定节点 | OverallState 收集所有结果,局部State 只包含必需字段 | ⭐⭐⭐⭐ |
| operator.add | Python 内置函数,用作列表拼接的 reducer | operator.add([1,2], [3,4]) 返回 [1,2,3,4] | ⭐⭐⭐⭐ |
经典应用场景
- 文档处理:分段处理长文档,然后汇总摘要
- 数据分析:并行分析多个数据源,聚合统计结果
- 内容生成:生成多个候选内容,选择最佳结果
- 多源查询:并行查询多个 API,合并结果
🎭 实战案例:笑话生成系统
我们将构建一个智能笑话生成系统,演示 Map-Reduce 的完整流程:
需求:
- Map 阶段:根据一个主题(如"动物"),生成多个子主题的笑话
- Reduce 阶段:从所有生成的笑话中,选出最好的一个
系统架构图
用户输入主题 "animals"
↓
[generate_topics] 生成子主题: [mammals, reptiles, birds]
↓
(Send API 动态分发)
↓
┌────────┬────────┬────────┐
↓ ↓ ↓ ↓
[joke-1] [joke-2] [joke-3] (Map 阶段:并行生成)
↓ ↓ ↓
└────────┴────────┴────────┘
↓
[best_joke] (Reduce 阶段:选择最佳)
↓
返回结果🔧 代码实现详解
1. 定义提示词和模型
from langchain_openai import ChatOpenAI
# 三个关键提示词
subjects_prompt = """Generate a list of 3 sub-topics that are all related to this overall topic: {topic}."""
joke_prompt = """Generate a joke about {subject}"""
best_joke_prompt = """Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one, starting 0 as the ID for the first joke. Jokes: \n\n {jokes}"""
# 初始化 LLM
model = ChatOpenAI(model="gpt-5-nano", temperature=0)说明:
subjects_prompt:将主题拆分为 3 个子主题joke_prompt:为单个子主题生成笑话best_joke_prompt:从多个笑话中选择最好的
2. 定义状态(State)
这是 Map-Reduce 的关键!我们需要两种状态:
全局状态(OverallState)
import operator
from typing import Annotated
from typing_extensions import TypedDict
from pydantic import BaseModel
class Subjects(BaseModel):
subjects: list[str]
class BestJoke(BaseModel):
id: int
class OverallState(TypedDict):
topic: str # 用户输入的主题
subjects: list # 生成的子主题列表
jokes: Annotated[list, operator.add] # 收集的笑话(支持并行追加)
best_selected_joke: str # 最终选出的最佳笑话关键点:
jokes使用operator.addreducer,允许多个并行节点同时写入- 这与之前学习的 parallelization 知识一致
局部状态(JokeState)
class JokeState(TypedDict):
subject: str # 单个笑话生成节点只需要知道主题
class Joke(BaseModel):
joke: strPython 知识点:Pydantic BaseModel
BaseModel 是 Pydantic 库的核心类,用于:
- 数据验证
- 类型检查
- 结构化输出
# 使用示例
class Joke(BaseModel):
joke: str
# LLM 会返回符合这个结构的数据
response = model.with_structured_output(Joke).invoke(prompt)
# response.joke 就是字符串类型的笑话内容3. Map 阶段:生成子主题
def generate_topics(state: OverallState):
prompt = subjects_prompt.format(topic=state["topic"])
response = model.with_structured_output(Subjects).invoke(prompt)
return {"subjects": response.subjects}功能: 将用户输入的主题(如 "animals")分解为 3 个子主题(如 ["mammals", "reptiles", "birds"])
4. 动态任务分发:Send API ⭐
这是 Map-Reduce 的核心魔法!
from langgraph.types import Send
def continue_to_jokes(state: OverallState):
# 为每个子主题创建一个 Send 任务
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]Send API 详解:
Send("generate_joke", {"subject": s})
# ^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^
# 目标节点名称 发送的状态数据重要特性:
- 动态并行化:自动为列表中的每个元素创建并行任务
- 状态灵活性:可以发送任意状态,不需要与
OverallState完全匹配 - 自动扩展:无论有 3 个还是 300 个子主题,都能自动并行处理
为什么强大?
传统方式需要预先知道有多少个并行任务,而 Send 可以根据运行时数据动态创建任务:
# 如果有 3 个主题,Send 会创建 3 个并行任务
subjects = ["mammals", "reptiles", "birds"]
# 相当于:
# - Send("generate_joke", {"subject": "mammals"})
# - Send("generate_joke", {"subject": "reptiles"})
# - Send("generate_joke", {"subject": "birds"})
# 如果有 10 个主题,自动创建 10 个并行任务!5. Map 阶段:并行生成笑话
def generate_joke(state: JokeState):
prompt = joke_prompt.format(subject=state["subject"])
response = model.with_structured_output(Joke).invoke(prompt)
return {"jokes": [response.joke]}关键细节:
- 输入:
JokeState(只包含subject) - 输出:
{"jokes": [response.joke]}(注意是列表!) - 返回的数据会被写回
OverallState的jokes字段 - 由于
jokes有operator.addreducer,多个并行节点的输出会自动拼接
执行流程示意:
generate_joke(subject="mammals") → 返回 {"jokes": ["joke1"]}
generate_joke(subject="reptiles") → 返回 {"jokes": ["joke2"]}
generate_joke(subject="birds") → 返回 {"jokes": ["joke3"]}
最终 OverallState.jokes = ["joke1", "joke2", "joke3"]6. Reduce 阶段:选择最佳笑话
def best_joke(state: OverallState):
# 将所有笑话合并为一个字符串
jokes = "\n\n".join(state["jokes"])
# 让 LLM 选择最好的笑话
prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
response = model.with_structured_output(BestJoke).invoke(prompt)
# 返回被选中的笑话
return {"best_selected_joke": state["jokes"][response.id]}功能:
- 接收所有并行生成的笑话
- 使用 LLM 评估并选择最佳笑话
BestJoke模型返回最佳笑话的索引(id)- 根据索引从
state["jokes"]中取出最佳笑话
7. 构建图
from langgraph.graph import END, StateGraph, START
# 创建图
graph = StateGraph(OverallState)
# 添加节点
graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)
# 添加边
graph.add_edge(START, "generate_topics")
# 条件边:使用 Send 动态分发任务
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
graph.add_edge("generate_joke", "best_joke")
graph.add_edge("best_joke", END)
# 编译
app = graph.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))LangGraph 知识点:条件边与 Send
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
# ^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^
# 源节点 条件函数 可能的目标节点continue_to_jokes返回Send对象列表- LangGraph 会自动为每个
Send创建一个到generate_joke的并行路径 - 所有并行任务完成后,自动进入
best_joke节点
8. 执行图
for s in app.stream({"topic": "animals"}):
print(s)输出示例:
{'generate_topics': {'subjects': ['mammals', 'reptiles', 'birds']}}
{'generate_joke': {'jokes': ["Why don't mammals ever get lost? Because they always follow their 'instincts'!"]}}
{'generate_joke': {'jokes': ["Why don't alligators like fast food? Because they can't catch it!"]}}
{'generate_joke': {'jokes': ["Why do birds fly south for the winter? Because it's too far to walk!"]}}
{'best_joke': {'best_selected_joke': "Why don't alligators like fast food? Because they can't catch it!"}}观察要点:
- 首先生成 3 个子主题
- 然后并行生成 3 个笑话(注意输出顺序可能不同)
- 最后选出最佳笑话
🎓 核心知识点总结
LangGraph 特有概念
1. Send API
作用: 动态创建并行任务
Send(node_name, state_dict)特点:
- 运行时动态决定并行数量
- 可以发送自定义状态(不必是完整的图状态)
- 自动处理并行执行和结果聚合
2. Map-Reduce 模式
| 阶段 | 作用 | 节点数量 | 执行方式 |
|---|---|---|---|
| Map | 分解任务,生成结果 | 动态(根据数据) | 并行 |
| Reduce | 聚合结果 | 1 个 | 串行(等待所有 Map 完成) |
3. 多状态设计
- OverallState:全局状态,贯穿整个图
- JokeState:局部状态,只用于特定节点
- 通过
Send可以在不同状态之间传递数据
4. 条件边 + Send
graph.add_conditional_edges(
"source_node", # 源节点
condition_function, # 返回 Send 列表的函数
["target_node"] # 目标节点(Send 指向的节点)
)Python 特有知识点
1. Pydantic BaseModel
from pydantic import BaseModel
class Joke(BaseModel):
joke: str
# 用于 LLM 结构化输出
response = model.with_structured_output(Joke).invoke(prompt)优势:
- 自动类型验证
- 清晰的数据结构
- 与 LangChain 无缝集成
2. TypedDict vs BaseModel
| 特性 | TypedDict | BaseModel |
|---|---|---|
| 类型检查 | 静态(IDE 提示) | 运行时验证 |
| 验证 | 无 | 有 |
| 用途 | 状态定义 | 数据模型、API 输出 |
| 性能 | 更快(无验证) | 稍慢(有验证) |
# TypedDict - 用于 State
class OverallState(TypedDict):
topic: str
# BaseModel - 用于结构化输出
class Joke(BaseModel):
joke: str3. 列表推导式 + Send
# 创建多个 Send 任务的优雅方式
[Send("generate_joke", {"subject": s}) for s in subjects]
# 等价于:
result = []
for s in subjects:
result.append(Send("generate_joke", {"subject": s}))💡 最佳实践
1. 何时使用 Map-Reduce?
✅ 适用场景:
- 需要对多个数据项执行相同操作(如批量翻译、批量摘要)
- 任务可以自然分解为独立子任务(如分段处理文档)
- 需要从多个候选结果中筛选(如生成多个答案选最佳)
- 数据量大,需要并行加速(如分析多个数据源)
❌ 不适用场景:
- 子任务之间有依赖关系
- 无法自然分解的任务
- 单一数据源的简单查询
2. Send API 使用技巧
技巧 1:动态控制并行数量
def continue_to_jokes(state: OverallState):
# 可以根据条件过滤
subjects = [s for s in state["subjects"] if len(s) > 3]
return [Send("generate_joke", {"subject": s}) for s in subjects]技巧 2:发送额外上下文
def continue_to_jokes(state: OverallState):
return [
Send("generate_joke", {
"subject": s,
"original_topic": state["topic"], # 传递额外信息
"style": "family-friendly"
})
for s in state["subjects"]
]技巧 3:条件性发送
def continue_to_jokes(state: OverallState):
sends = []
for i, s in enumerate(state["subjects"]):
if i < 5: # 最多只处理前 5 个
sends.append(Send("generate_joke", {"subject": s}))
return sends3. 状态设计原则
原则 1:最小化局部状态
# ✅ 好的设计 - 只包含必需字段
class JokeState(TypedDict):
subject: str
# ❌ 不好的设计 - 包含不需要的字段
class JokeState(TypedDict):
subject: str
topic: str # generate_joke 不需要这个
jokes: list # 也不需要这个原则 2:使用 Reducer 聚合结果
# Map 节点的输出字段必须有 reducer
class OverallState(TypedDict):
jokes: Annotated[list, operator.add] # ✅ 正确
# jokes: list # ❌ 错误!并行更新会冲突原则 3:清晰的状态流转
OverallState (完整状态)
↓
Send → JokeState (子集)
↓
返回 → OverallState.jokes (部分更新)🚀 进阶技巧
1. 多层 Map-Reduce
可以嵌套多个 Map-Reduce 层次:
主题
↓ Map
子主题 (Level 1)
↓ Map
详细主题 (Level 2)
↓ Reduce
汇总子主题
↓ Reduce
最终结果2. 带错误处理的 Map-Reduce
def generate_joke(state: JokeState):
try:
prompt = joke_prompt.format(subject=state["subject"])
response = model.with_structured_output(Joke).invoke(prompt)
return {"jokes": [response.joke]}
except Exception as e:
# 返回错误标记或默认值
return {"jokes": [f"Error generating joke for {state['subject']}"]}3. 限制并行度
虽然 Send 会自动并行,但有时需要限制同时执行的任务数(如 API 速率限制):
# 方法 1:在条件函数中限制
def continue_to_jokes(state: OverallState):
# 只发送前 N 个
max_parallel = 5
subjects = state["subjects"][:max_parallel]
return [Send("generate_joke", {"subject": s}) for s in subjects]
# 方法 2:分批处理(需要更复杂的图设计)📊 Map-Reduce vs 简单并行
| 特性 | Map-Reduce (Send) | 简单并行 (Fan-out) |
|---|---|---|
| 并行数量 | 动态(运行时决定) | 静态(设计时固定) |
| 状态传递 | 灵活(可自定义) | 固定(使用图状态) |
| 适用场景 | 列表处理、批量任务 | 固定数量的并行路径 |
| 复杂度 | 中等 | 低 |
| 扩展性 | 高 | 低 |
示例对比:
# 简单并行 - 固定 3 个路径
builder.add_edge("start", "task1")
builder.add_edge("start", "task2")
builder.add_edge("start", "task3")
# Map-Reduce - 动态 N 个任务
def send_tasks(state):
return [Send("task", {"data": d}) for d in state["data_list"]]
builder.add_conditional_edges("start", send_tasks, ["task"])🎯 实际应用案例
案例 1:文档摘要
# Map: 为每个段落生成摘要
def summarize_chunk(state: ChunkState):
summary = llm.invoke(f"Summarize: {state['chunk']}")
return {"summaries": [summary]}
# Reduce: 合并所有段落摘要
def combine_summaries(state: OverallState):
all_summaries = "\n".join(state["summaries"])
final = llm.invoke(f"Create final summary: {all_summaries}")
return {"final_summary": final}案例 2:多语言翻译
# Map: 翻译到多种语言
def send_to_translate(state):
languages = ["es", "fr", "de", "zh"]
return [Send("translate", {"lang": lang, "text": state["text"]})
for lang in languages]
# Reduce: 收集所有翻译
def collect_translations(state):
return {"all_translations": state["translations"]}案例 3:多角度分析
# Map: 从不同角度分析文本
def send_to_analyze(state):
perspectives = ["technical", "business", "user", "security"]
return [Send("analyze", {"perspective": p, "text": state["text"]})
for p in perspectives]
# Reduce: 综合所有分析
def synthesize(state):
combined = llm.invoke(f"Synthesize these analyses: {state['analyses']}")
return {"final_analysis": combined}📖 扩展阅读
🔍 常见问题
Q1: Send 和普通边有什么区别?
普通边: 静态路由,设计时确定
graph.add_edge("A", "B") # A 总是流向 BSend: 动态路由,运行时确定
# 根据数据动态创建多个并行任务
return [Send("B", data) for data in dynamic_data]Q2: 为什么 generate_joke 返回 {"jokes": [joke]} 而不是 {"jokes": joke}?
因为 OverallState.jokes 使用了 operator.add reducer,它期望操作列表:
# 正确 ✅
operator.add(["joke1"], ["joke2"]) # → ["joke1", "joke2"]
# 错误 ❌
operator.add("joke1", "joke2") # → "joke1joke2" (字符串拼接)Q3: 可以在 Send 中发送完全不同的状态吗?
可以!Send 发送的状态不需要与 OverallState 匹配:
# OverallState 有 topic, subjects, jokes
# 但 Send 可以发送任意结构
Send("generate_joke", {"subject": "cats", "style": "silly"})只要目标节点(generate_joke)能处理这个状态即可。
总结:Map-Reduce 是 LangGraph 中处理批量、并行任务的强大模式。通过 Send API,我们可以动态创建任意数量的并行任务,然后用 reducer 优雅地聚合结果。这是构建可扩展、高性能 AI 应用的关键技术!
完整案例代码(可直接运行)
以下是一个完整的 Map-Reduce 示例,展示了笑话生成系统的动态并行处理:
"""
LangGraph Map-Reduce 完整示例
演示 Send API 动态创建并行任务
场景:智能笑话生成系统
1. Map 阶段:根据主题生成多个子主题的笑话
2. Reduce 阶段:从所有笑话中选出最好的一个
"""
import operator
from typing import Annotated, List
from typing_extensions import TypedDict
from pydantic import BaseModel
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from langchain_openai import ChatOpenAI
# ========== 1. 数据模型定义 ==========
class Subjects(BaseModel):
"""子主题列表模型"""
subjects: List[str]
class Joke(BaseModel):
"""笑话模型"""
joke: str
class BestJoke(BaseModel):
"""最佳笑话选择模型"""
id: int # 最佳笑话的索引
# ========== 2. 状态定义 ==========
# 全局状态(贯穿整个图)
class OverallState(TypedDict):
topic: str # 用户输入的主题
subjects: List[str] # 生成的子主题列表
jokes: Annotated[List[str], operator.add] # 所有生成的笑话(支持并行追加)
best_selected_joke: str # 最终选出的最佳笑话
# 局部状态(只用于 Map 节点)
class JokeState(TypedDict):
subject: str # 单个笑话生成节点只需要知道子主题
# ========== 3. 初始化 LLM ==========
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
# ========== 4. 提示词模板 ==========
subjects_prompt = """请根据以下主题生成 3 个相关的子主题,用于创作笑话。
主题: {topic}
要求:
- 每个子主题应该是具体的、有趣的
- 子主题之间应该有一定差异性
- 适合创作幽默内容
请直接返回 3 个子主题。"""
joke_prompt = """请创作一个关于 "{subject}" 的笑话。
要求:
- 笑话要简短有趣
- 适合所有年龄段
- 尽量出人意料的结局
请直接返回笑话内容。"""
best_joke_prompt = """以下是几个关于 "{topic}" 的笑话,请选出最好的一个。
笑话列表:
{jokes}
请返回最好笑话的编号(从 0 开始)。"""
# ========== 5. 定义节点函数 ==========
def generate_topics(state: OverallState):
"""
生成子主题(Map 之前的准备步骤)
将用户输入的主题分解为多个子主题
"""
topic = state["topic"]
prompt = subjects_prompt.format(topic=topic)
response = llm.with_structured_output(Subjects).invoke(prompt)
print(f"🎯 主题: {topic}")
print(f"📝 生成了 {len(response.subjects)} 个子主题: {response.subjects}")
return {"subjects": response.subjects}
def continue_to_jokes(state: OverallState):
"""
动态任务分发函数(Map 的核心)
为每个子主题创建一个 Send 任务
"""
subjects = state["subjects"]
# ⭐ 关键:使用 Send API 动态创建并行任务
sends = [Send("generate_joke", {"subject": s}) for s in subjects]
print(f"📤 分发 {len(sends)} 个笑话生成任务")
return sends
def generate_joke(state: JokeState):
"""
生成单个笑话(Map 节点)
每个并行任务独立执行
"""
subject = state["subject"]
prompt = joke_prompt.format(subject=subject)
response = llm.with_structured_output(Joke).invoke(prompt)
print(f"😂 生成笑话 - 主题: {subject}")
# 返回列表!因为 jokes 字段使用 operator.add 作为 reducer
return {"jokes": [response.joke]}
def best_joke(state: OverallState):
"""
选择最佳笑话(Reduce 节点)
收集所有并行生成的笑话,选出最好的一个
"""
topic = state["topic"]
jokes = state["jokes"]
# 格式化笑话列表
jokes_formatted = "\n\n".join([
f"笑话 {i}:\n{joke}"
for i, joke in enumerate(jokes)
])
prompt = best_joke_prompt.format(topic=topic, jokes=jokes_formatted)
response = llm.with_structured_output(BestJoke).invoke(prompt)
# 获取被选中的笑话
selected_joke = jokes[response.id]
print(f"🏆 选出最佳笑话(编号 {response.id})")
return {"best_selected_joke": selected_joke}
# ========== 6. 构建图 ==========
def build_joke_generator():
"""构建笑话生成图"""
builder = StateGraph(OverallState)
# 添加节点
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_node("best_joke", best_joke)
# 添加边
builder.add_edge(START, "generate_topics")
# ⭐ 条件边:使用 Send 动态分发任务
builder.add_conditional_edges(
"generate_topics", # 源节点
continue_to_jokes, # 返回 Send 列表的函数
["generate_joke"] # 可能的目标节点
)
# 所有 Map 任务完成后,进入 Reduce 节点
builder.add_edge("generate_joke", "best_joke")
builder.add_edge("best_joke", END)
return builder.compile()
# ========== 7. 主程序 ==========
if __name__ == "__main__":
# 构建图
graph = build_joke_generator()
# 可视化图结构
print("=" * 60)
print("📊 图结构可视化")
print("=" * 60)
try:
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
print(graph.get_graph().draw_mermaid())
# 执行测试
print("\n" + "=" * 60)
print("🚀 执行笑话生成系统")
print("=" * 60 + "\n")
topic = "动物"
# 使用 stream 模式观察执行过程
print("⏳ 执行中...\n")
for step in graph.stream({"topic": topic}):
for node_name, output in step.items():
if node_name == "generate_joke":
print(f" 📌 笑话生成完成")
else:
print(f"📌 节点 [{node_name}] 完成")
# 获取最终结果
result = graph.invoke({"topic": topic})
# 显示结果
print("\n" + "=" * 60)
print("📋 最终结果")
print("=" * 60)
print(f"\n🎯 主题: {result['topic']}")
print(f"\n📝 子主题: {result['subjects']}")
print(f"\n😂 所有生成的笑话:")
for i, joke in enumerate(result['jokes']):
print(f"\n 笑话 {i}:")
print(f" {joke}")
print(f"\n🏆 最佳笑话:")
print(f" {result['best_selected_joke']}")
# 测试不同主题
print("\n" + "=" * 60)
print("🔄 测试其他主题")
print("=" * 60 + "\n")
for test_topic in ["科技", "美食"]:
print(f"主题: {test_topic}")
result = graph.invoke({"topic": test_topic})
print(f"最佳笑话: {result['best_selected_joke'][:50]}...\n")运行结果示例
============================================================
📊 图结构可视化
============================================================
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
__start__([__start__]):::startclass;
generate_topics([generate_topics]):::otherclass;
generate_joke([generate_joke]):::otherclass;
best_joke([best_joke]):::otherclass;
__end__([__end__]):::endclass;
__start__ --> generate_topics;
generate_topics -.-> generate_joke;
generate_joke --> best_joke;
best_joke --> __end__;
============================================================
🚀 执行笑话生成系统
============================================================
⏳ 执行中...
🎯 主题: 动物
📝 生成了 3 个子主题: ['猫', '狗', '大象']
📤 分发 3 个笑话生成任务
📌 节点 [generate_topics] 完成
😂 生成笑话 - 主题: 猫
📌 笑话生成完成
😂 生成笑话 - 主题: 狗
📌 笑话生成完成
😂 生成笑话 - 主题: 大象
📌 笑话生成完成
🏆 选出最佳笑话(编号 1)
📌 节点 [best_joke] 完成
============================================================
📋 最终结果
============================================================
🎯 主题: 动物
📝 子主题: ['猫', '狗', '大象']
😂 所有生成的笑话:
笑话 0:
为什么猫总是看起来很傲慢?因为它们知道每次你叫它们的名字,它们选择不回应!
笑话 1:
狗为什么不会用电脑?因为它们总是把"Bark"错打成"Bytes"!
笑话 2:
大象为什么从来不玩捉迷藏?因为它太大了,藏哪都被发现!
🏆 最佳笑话:
狗为什么不会用电脑?因为它们总是把"Bark"错打成"Bytes"!
============================================================
🔄 测试其他主题
============================================================
主题: 科技
最佳笑话: 为什么程序员总是穿灰色衣服?因为他们不想处理"颜色"异常!...
主题: 美食
最佳笑话: 为什么披萨从来不说谎?因为它总是"薄饼"坦白!...核心知识点回顾
| 概念 | 说明 | 代码示例 |
|---|---|---|
| Send API | 动态创建并行任务 | Send("node_name", {"key": value}) |
| 动态分发 | 运行时决定并行数量 | [Send(...) for item in items] |
| OverallState | 全局状态,包含所有字段 | jokes: Annotated[List[str], operator.add] |
| 局部 State | Map 节点只需要的字段 | class JokeState: subject: str |
| 条件边 + Send | 实现动态 Map | add_conditional_edges(src, send_func, [targets]) |
| Reducer | 合并并行结果 | operator.add 拼接列表 |
| Map 阶段 | 并行处理每个子任务 | generate_joke 节点 |
| Reduce 阶段 | 聚合所有结果 | best_joke 节点 |