Skip to content

高级路由与控制流

在上一节中,我们学习了 LangGraph 的基础概念:State、Node、Edge 和条件边。本节将深入探讨 LangGraph 中更高级的路由和控制流机制,包括:

  • 可复用的路由函数:返回布尔值或枚举的路由模式
  • Send 动态分发:并行处理多个任务
  • Command API:命令式控制流,显式指定下一步
  • Subgraph 子图:将复杂流程模块化

这些特性是 LangGraph 1.x 版本的核心能力,掌握它们能让你构建更复杂、更灵活的 AI Agent。


案例一:可复用的路由函数

案例概览

在之前的条件边示例中,路由函数返回的是节点名称字符串。但这样做有一个问题:如果节点名称改变,路由函数也要跟着改。

更好的做法是:路由函数返回布尔值或枚举,再用 mapping 映射到具体节点。这样路由逻辑和节点名称解耦,更易维护和复用。

核心概念:Boolean Routing

python
def routing_func(state: State) -> bool:
    if state["score"] > 50:
        return True     # 高分
    else:
        return False    # 低分

builder.add_conditional_edges(
    START,
    routing_func,
    {
        True: "node_high",   # True → 高分节点
        False: "node_low"    # False → 低分节点
    }
)

mapping 参数的作用

  • 将路由函数的返回值(True/False)映射到实际节点名称
  • 路由逻辑只关心"满足/不满足条件",不关心具体节点叫什么
  • 节点重命名时,只需修改 mapping,不需要改路由函数

知识点拆解

1. 定义 State

python
from typing import TypedDict
from langchain_core.runnables import RunnableConfig
from langgraph.constants import START, END
from langgraph.graph import StateGraph

class State(TypedDict):
    score: int

2. 定义节点

python
def node_high(state: State, config: RunnableConfig):
    # 高分路径:加 100 分
    return {"score": state["score"] + 100}

def node_low(state: State, config: RunnableConfig):
    # 低分路径:加 5 分
    return {"score": state["score"] + 5}

3. 路由函数返回布尔值

python
def routing_func(state: State) -> bool:
    if state["score"] > 50:
        return True     # True → 走高分路径
    else:
        return False    # False → 走低分路径

关键点

  • 路由函数的返回值类型是 bool,而不是字符串
  • 返回 TrueFalse 代表两种状态

4. 用 mapping 映射

python
builder.add_conditional_edges(
    START,
    routing_func,
    {
        True: "node_high",
        False: "node_low"
    }
)

优势

  • 路由函数可以复用(只要返回 True/False 的逻辑相同)
  • 不同 Graph 可以用同一个路由函数,映射到不同节点

运行示例

python
# 输入 score=30,30 <= 50,返回 False → node_low
print(graph.invoke({"score": 30}))
# 输出: {'score': 35}

# 输入 score=80,80 > 50,返回 True → node_high
print(graph.invoke({"score": 80}))
# 输出: {'score': 180}

案例一完整代码

python
from typing import TypedDict
from langchain_core.runnables import RunnableConfig
from langgraph.constants import START, END
from langgraph.graph import StateGraph


class State(TypedDict):
    score: int


def node_high(state: State, config: RunnableConfig):
    return {"score": state["score"] + 100}

def node_low(state: State, config: RunnableConfig):
    return {"score": state["score"] + 5}


def routing_func(state: State) -> bool:
    if state["score"] > 50:
        return True
    else:
        return False


builder = StateGraph(State)
builder.add_node("node_high", node_high)
builder.add_node("node_low", node_low)

builder.add_conditional_edges(
    START,
    routing_func,
    {
        True: "node_high",
        False: "node_low"
    }
)

builder.add_edge("node_high", END)
builder.add_edge("node_low", END)

graph = builder.compile()

# 测试
print(graph.invoke({"score": 30}))   # {'score': 35}
print(graph.invoke({"score": 80}))   # {'score': 180}

# 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))

案例二:Send 动态分发

案例概览

有时候,我们需要根据输入数据动态决定执行多少次。比如:

  • 收到一批文件,每个文件需要单独处理
  • 收到多个问题,每个问题需要单独回答
  • 收到多个任务,需要并行处理

LangGraph 的 Send 机制就是为此设计的:路由函数可以返回多个 Send 指令,让同一个节点被并行触发多次。

核心概念:Send

python
from langgraph.types import Send

def routing_func(state: State):
    result = []
    for item in state["items"]:
        # 每个 item 生成一个 Send 指令
        result.append(Send("processor", {"item": item}))
    return result

Send 的作用

  • Send(node_name, private_state) 表示"向 node_name 发送一个带私有状态的执行请求"
  • 返回多个 Send,LangGraph 会并行执行这些请求
  • 每个 Send 携带的是私有状态(PrivateState),不是全局 State

知识点拆解

1. 定义全局 State 和私有 State

python
from typing import TypedDict, Annotated, List
from operator import add

class State(TypedDict):
    items: Annotated[List[str], add]  # 全局状态,自动累加

class PrivateState(TypedDict):
    item: str  # 私有状态,只传给单个节点执行

两种 State 的区别

类型作用生命周期
State(全局)所有节点共享,会被持久化整个 Graph 执行期间
PrivateState(私有)只传给单次节点执行单次节点执行

2. 定义处理节点

python
def processor(state: PrivateState) -> State:
    res = state["item"] + " ✓"
    return {"items": [res]}  # 会 append 到全局 items

注意

  • 节点接收的是 PrivateState(私有状态)
  • 节点返回的是 State(全局状态更新)

3. 路由函数返回 Send 列表

python
def routing_func(state: State):
    result = []
    for item in state["items"]:
        result.append(Send("processor", {"item": item}))
    return result

执行过程

  1. 假设输入 items = ["苹果", "香蕉", "橙子", "葡萄"]
  2. routing_func 生成 4 个 Send 指令
  3. LangGraph 并行执行 4 次 processor
  4. 每次执行返回一个处理后的结果
  5. 结果被 append 到全局 items

运行示例

python
result = graph.invoke({"items": ["苹果", "香蕉", "橙子", "葡萄"]})
print(result)
# {'items': ['苹果', '香蕉', '橙子', '葡萄', '苹果 ✓', '香蕉 ✓', '橙子 ✓', '葡萄 ✓']}

案例二完整代码

python
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph
from langgraph.types import Send
from langgraph.constants import START, END
from operator import add


class State(TypedDict):
    items: Annotated[List[str], add]


class PrivateState(TypedDict):
    item: str


def processor(state: PrivateState) -> State:
    res = state["item"] + " ✓"
    return {"items": [res]}


def routing_func(state: State):
    result = []
    for item in state["items"]:
        result.append(Send("processor", {"item": item}))
    return result


builder = StateGraph(State)
builder.add_node("processor", processor)
builder.add_conditional_edges(START, routing_func, ["processor"])
builder.add_edge("processor", END)

graph = builder.compile()

# 测试
result = graph.invoke({"items": ["苹果", "香蕉", "橙子", "葡萄"]})
print(result)

# 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))

Send 进阶:多目标分发

Send 不仅可以向同一个节点发送多次,还可以向不同节点分发

python
def smart_router(state: State):
    sends = []
    for task in state["tasks"]:
        if task["type"] == "text":
            sends.append(Send("text_processor", {"task": task}))
        elif task["type"] == "image":
            sends.append(Send("image_processor", {"task": task}))
        else:
            sends.append(Send("default_processor", {"task": task}))
    return sends

应用场景

  • 根据任务类型分发到不同的处理器
  • 实现 Map-Reduce 模式:先并行处理,再汇总结果
  • 构建多 Agent 协作系统

Send vs 条件边

特性Send条件边
触发数量可以触发多次只能走一条路
并行能力天然支持并行串行执行
状态传递可传私有状态共享全局状态
典型场景批量处理、fan-out分支选择

案例三:Command API

案例概览

在前面的例子中,节点通过返回 State 更新来控制数据流,而流程走向由 Edge 决定。

但有时候,我们希望节点自己决定下一步去哪。LangGraph 1.x 引入的 Command API 就提供了这种能力。

核心概念:Command

python
from langgraph.types import Command

def node(state: State):
    return Command(
        goto=END,                    # 显式指定下一步去哪
        update={"tasks": [...]}      # 更新 State
    )

Command 的两个核心参数

  • goto:显式指定下一个节点(覆盖边的自动路由)
  • update:要更新的状态字段

知识点拆解

1. 普通返回 vs Command 返回

普通返回(之前的方式):

python
def node(state: State):
    return {"tasks": ["完成"]}  # 只更新状态,走默认边

Command 返回(命令式控制):

python
def node(state: State):
    return Command(
        goto=END,                  # 覆盖默认边,直接去 END
        update={"tasks": ["完成"]}  # 同时更新状态
    )

2. Command 的优势

特性普通返回Command 返回
状态更新✅ 支持✅ 支持
控制流向❌ 只能走预定义的边✅ 可以动态决定
条件跳转需要条件边直接在节点内实现

3. 使用场景

  • 早期终止:处理完成后直接跳到 END
  • 错误处理:出错时跳到错误处理节点
  • 循环控制:满足条件时跳回某个节点

运行示例

python
result = graph.invoke(
    {"tasks": ["学习Python", "写代码", "测试程序", "部署上线"]}
)
print(result)
# {'tasks': ['学习Python', '写代码', '测试程序', '部署上线',
#            '学习Python [完成]', '写代码 [完成]', '测试程序 [完成]', '部署上线 [完成]']}

案例三完整代码

python
from operator import add
from typing import TypedDict, Annotated

from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import Command


class State(TypedDict):
    tasks: Annotated[list[str], add]


def task_processor(state: State):
    completed_tasks = []
    for task in state["tasks"]:
        completed_tasks.append(task + " [完成]")

    return Command(
        goto=END,
        update={"tasks": completed_tasks}
    )


builder = StateGraph(State)
builder.add_node("task_processor", task_processor)
builder.add_edge(START, "task_processor")

graph = builder.compile()

# 测试
result = graph.invoke(
    {"tasks": ["学习Python", "写代码", "测试程序", "部署上线"]}
)
print(result)

# 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))

案例四:Command 循环控制

案例概览

Command 不仅可以跳到 END,还可以跳到任意节点,甚至跳回自己实现循环。

这在需要重试、迭代处理的场景中非常有用。

核心概念:动态 goto

python
def step_two(state: State):
    current_step = state["step"]

    if current_step < 3:
        next_node = "step_two"  # 循环回自己
    else:
        next_node = END         # 结束

    return Command(
        goto=next_node,
        update={"step": current_step + 1}
    )

执行流程图解

初始状态: step=0

step_one:
  - 设置 step=1
  - goto="step_two"

step_two (第1次):
  - step=1 < 3,继续循环
  - 设置 step=2
  - goto="step_two"

step_two (第2次):
  - step=2 < 3,继续循环
  - 设置 step=3
  - goto="step_two"

step_two (第3次):
  - step=3 >= 3,结束
  - goto=END

结束

案例四完整代码

python
from operator import add
from typing import TypedDict, Annotated

from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import Command


class State(TypedDict):
    logs: Annotated[list[str], add]
    step: int


def step_one(state: State):
    return Command(
        goto="step_two",
        update={
            "logs": ["步骤一:初始化完成"],
            "step": 1
        }
    )


def step_two(state: State):
    current_step = state["step"]

    if current_step < 3:
        next_node = "step_two"
    else:
        next_node = END

    return Command(
        goto=next_node,
        update={
            "logs": [f"步骤二:处理中... (第{current_step + 1}次)"],
            "step": current_step + 1
        }
    )


builder = StateGraph(State)
builder.add_node("step_one", step_one)
builder.add_node("step_two", step_two)
builder.add_edge(START, "step_one")

graph = builder.compile()

# 测试
result = graph.invoke({"logs": ["流程启动"], "step": 0})
print("执行日志:")
for log in result["logs"]:
    print(f"  - {log}")


# 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))

# 输出:
#   - 流程启动
#   - 步骤一:初始化完成
#   - 步骤二:处理中... (第2次)
#   - 步骤二:处理中... (第3次)
#   - 步骤二:处理中... (第4次)

案例五:Subgraph 子图嵌套

案例概览

当 Graph 变得复杂时,我们可以将其拆分成多个子图,然后在父图中调用。

这就像编程中的"函数封装":

  • 子图:封装一个独立的处理流程
  • 父图:组合多个子图,形成完整流程

核心概念:Subgraph 作为节点

python
# 1. 构建子图
subgraph_builder = StateGraph(State)
subgraph_builder.add_node("sub_processor", sub_processor)
subgraph_builder.add_edge(START, "sub_processor")
subgraph_builder.add_edge("sub_processor", END)
subgraph = subgraph_builder.compile()

# 2. 在父图中使用子图作为节点
builder = StateGraph(State)
builder.add_node("subgraph_node", subgraph)  # 子图作为节点
builder.add_edge(START, "subgraph_node")
builder.add_edge("subgraph_node", END)
graph = builder.compile()

知识点拆解

1. 子图的独立性

子图是一个完整的 Graph

  • 有自己的 START 和 END
  • 可以独立 compile 和 invoke
  • 可以单独测试

2. 父图调用子图

当父图执行到 subgraph_node 时:

  1. 将当前 State 传给子图
  2. 子图执行完整流程(START → ... → END)
  3. 子图的输出状态合并回父图

3. State 共享

父图和子图通常使用相同的 State 类型

  • 子图可以读取父图传入的状态
  • 子图的更新会反映到父图

运行示例

python
result = graph.invoke({"logs": ["[父图] 开始执行"]})
print("执行日志:")
for log in result["logs"]:
    print(f"  - {log}")

# 输出:
#   - [父图] 开始执行
#   - [父图] 开始执行(子图收到的初始状态)
#   - [子图] 数据处理完成

案例五完整代码

python
from operator import add
from typing import TypedDict, Annotated

from langgraph.constants import END, START
from langgraph.graph import StateGraph


class State(TypedDict):
    logs: Annotated[list[str], add]


def sub_processor(state: State) -> dict:
    return {"logs": ["[子图] 数据处理完成"]}


# 构建子图
subgraph_builder = StateGraph(State)
subgraph_builder.add_node("sub_processor", sub_processor)
subgraph_builder.add_edge(START, "sub_processor")
subgraph_builder.add_edge("sub_processor", END)
subgraph = subgraph_builder.compile()


# 构建父图
builder = StateGraph(State)
builder.add_node("subgraph_node", subgraph)
builder.add_edge(START, "subgraph_node")
builder.add_edge("subgraph_node", END)
graph = builder.compile()


# 测试
result = graph.invoke({"logs": ["[父图] 开始执行"]})
print("执行日志:")
for log in result["logs"]:
    print(f"  - {log}")


# 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))


深入理解:四种边的本质区别

LangGraph 提供了四种控制流程的方式,它们各有特点:

1. 普通边(Normal Edge)

python
builder.add_edge("node_a", "node_b")

特点:无条件跳转,适合固定流程。

2. 条件边(Conditional Edge)

python
builder.add_conditional_edges(
    "node_a",
    routing_func,
    {"case1": "node_b", "case2": "node_c"}
)

特点:根据状态动态选择一条路径。

3. Send 边

python
def router(state):
    return [Send("worker", {"task": t}) for t in state["tasks"]]

特点:可以触发多个并行执行,每个执行携带独立的私有状态。

4. Command

python
def node(state):
    return Command(goto="next_node", update={"key": "value"})

特点:在节点内部决定流向,覆盖预定义的边。

选择指南

需要做决策吗?
├── 否 → 普通边
└── 是 → 决策点在哪?
         ├── 边上(路由函数)
         │    ├── 选一条路 → 条件边
         │    └── 多条并行 → Send
         └── 节点内部 → Command

组合使用示例

实际项目中,这些机制经常组合使用:

python
def smart_processor(state: State):
    # 根据状态决定行为
    if state["error_count"] > 3:
        # 使用 Command 直接跳转到错误处理
        return Command(goto="error_handler", update={"status": "failed"})

    if state["needs_review"]:
        # 正常返回状态,让条件边决定下一步
        return {"processed": True}

    # 使用 Command 跳过审核直接完成
    return Command(goto=END, update={"processed": True})

本章总结

本节介绍了 LangGraph 的高级路由和控制流机制:

机制核心特点适用场景
Boolean Routing路由函数返回布尔/枚举,用 mapping 映射可复用的路由逻辑
Send动态生成多个执行指令,并行处理批量处理、Map-Reduce
Command节点内显式指定下一步动态流程控制、循环
Subgraph子图作为节点,模块化设计复杂流程拆分

选择指南

  • 简单条件分支 → 用普通条件边
  • 可复用的分支逻辑 → 用 Boolean Routing
  • 并行处理多个任务 → 用 Send
  • 节点内动态决定流向 → 用 Command
  • 复杂流程模块化 → 用 Subgraph

思考题

  1. 如果 Send 返回的是空列表 [],会发生什么?
  2. Command 的 goto 可以跳到还没执行过的节点吗?
  3. 子图和父图可以使用不同的 State 类型吗?如果可以,怎么做?
  4. 如何用 Command 实现一个最多重试 3 次的错误处理机制?

下一步

掌握了这些高级特性后,你已经可以构建相当复杂的 AI Agent 了。接下来的章节会介绍:

  • Human-in-the-loop:人机协作
  • Persistence:状态持久化
  • Streaming:流式输出

这些特性将让你的 Agent 更加实用和强大!

基于 MIT 许可证发布。内容版权归作者所有。