Skip to content

LangGraph 人在环 (HITL) 详细解读

概述

Human-in-the-Loop (HITL),中文译为"人在环"或"人机协同",是 AI 工作流中一种让人类参与到 AI 决策过程中的机制。简单来说,就是让 AI 在关键步骤暂停,等待人类审核、确认或提供输入后,再继续执行。

想象一下:AI 就像一个勤劳的助手,HITL 机制让你可以在任何时候喊"停",检查它在做什么,给它反馈,然后让它继续干活。


术语表

术语名称通俗解释Python 语法重要程度
interrupt()LangGraph 1.0 新增的"暂停键",在代码里调用它就能暂停图的执行from langgraph.types import interrupt核心
Command恢复执行的"遥控器",用来告诉图继续执行并传入人类的输入from langgraph.types import Command核心
resumeCommand 的参数,存放人类提供的输入值Command(resume="your_input")核心
Checkpointer状态存档器,自动保存图执行的进度,就像游戏存档一样InMemorySaver()PostgresSaver()核心
thread_id会话标识符,用于区分不同的对话/用户,类似于聊天窗口ID{"configurable": {"thread_id": "123"}}核心
interrupt_before旧版静态断点,在指定节点执行前暂停(不推荐用于 HITL)graph.compile(interrupt_before=["node"])了解
interrupt_after旧版静态断点,在指定节点执行后暂停(不推荐用于 HITL)graph.compile(interrupt_after=["node"])了解

核心概念

为什么需要 HITL?

在实际应用中,完全自主的 AI Agent 存在风险:

  1. 安全性:AI 可能执行危险操作(如删除数据、转账)
  2. 准确性:AI 可能做出错误判断需要人工纠正
  3. 合规性:某些操作必须有人工审批记录
  4. 用户体验:需要在关键步骤获取用户确认或额外信息

LangGraph 1.0 的 interrupt() vs 旧版断点

特性interrupt() (推荐)interrupt_before/after (旧版)
灵活性可在代码任意位置调用只能在节点边界设置
传值直接返回人类输入值需要手动更新状态
条件控制可以放在 if 语句中无法条件触发
生产就绪设计用于生产环境主要用于调试

HITL 的三大应用场景

+-----------------------------------------------------------+
|                    Human-in-the-Loop                       |
+---------------+-------------------+-----------------------+
|   审批 Approve |   编辑 Edit       |   输入 Input          |
+---------------+-------------------+-----------------------+
| - 工具调用前确认| - 修改 AI 生成内容 | - 获取用户澄清信息     |
| - 敏感操作审批  | - 纠正错误判断    | - 多轮对话收集信息     |
| - 高风险动作拦截| - 调整参数设置    | - 动态补充上下文       |
+---------------+-------------------+-----------------------+

代码实现详解

示例一:最简单的 HITL(入门级)

这是最基础的 HITL 示例,展示如何暂停并等待用户输入:

python
"""
示例1:最简单的 HITL
目标:在工作流中暂停,等待用户确认后继续
"""
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt
from langgraph.checkpoint.memory import InMemorySaver

# 第一步:定义状态(图需要记住什么)
class State(TypedDict):
    message: str
    confirmed: bool

# 第二步:定义节点函数
def ask_confirmation(state: State):
    """这个节点会暂停,等待用户确认"""
    print(f"收到消息: {state['message']}")

    # 关键:调用 interrupt() 暂停执行
    # 括号里的字符串是提示信息,会返回给调用方
    user_response = interrupt("请确认是否继续?(yes/no)")

    # 用户恢复执行后,user_response 就是用户输入的值
    is_confirmed = user_response.lower() == "yes"
    return {"confirmed": is_confirmed}

def process_result(state: State):
    """根据用户确认结果处理"""
    if state["confirmed"]:
        print("用户已确认,继续执行...")
        return {"message": "操作已完成!"}
    else:
        print("用户取消,停止执行...")
        return {"message": "操作已取消"}

# 第三步:构建图
builder = StateGraph(State)
builder.add_node("ask", ask_confirmation)
builder.add_node("process", process_result)
builder.add_edge(START, "ask")
builder.add_edge("ask", "process")
builder.add_edge("process", END)

# 关键:必须有 checkpointer 才能使用 interrupt
memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)

# 第四步:执行图
config = {"configurable": {"thread_id": "user-123"}}

# 首次运行 - 会在 interrupt() 处暂停
print("=== 开始执行 ===")
for event in graph.stream({"message": "Hello World"}, config):
    print(event)

print("\n=== 图已暂停,等待用户输入 ===\n")

# 模拟用户输入 "yes",使用 Command 恢复执行
from langgraph.types import Command

print("=== 用户输入 yes,恢复执行 ===")
for event in graph.stream(Command(resume="yes"), config):
    print(event)

运行结果:

=== 开始执行 ===
收到消息: Hello World
{'__interrupt__': (Interrupt(value='请确认是否继续?(yes/no)'),)}

=== 图已暂停,等待用户输入 ===

=== 用户输入 yes,恢复执行 ===
用户已确认,继续执行...
{'process': {'message': '操作已完成!'}}

示例二:审批/拒绝模式(中级)

这个示例展示如何根据用户的审批结果,路由到不同的处理分支:

python
"""
示例2:审批/拒绝模式
目标:用户可以批准或拒绝操作,图根据结果走不同分支
"""
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

class WorkflowState(TypedDict):
    task: str
    user_decision: str
    status: str

def get_approval(state: WorkflowState):
    """暂停并等待用户审批"""
    print(f"待审批任务: {state['task']}")

    # 暂停等待用户输入
    decision = interrupt({
        "prompt": "请审批此任务",
        "task": state["task"],
        "options": ["approve", "reject"]
    })

    print(f"用户决定: {decision}")
    return {"user_decision": decision}

def router(state: WorkflowState) -> Command:
    """根据用户决定路由到不同节点"""
    decision = state.get("user_decision", "").strip().lower()

    if decision == "approve":
        return Command(goto="complete_task")
    else:
        return Command(goto="cancel_task")

def complete_task(state: WorkflowState):
    """批准后执行任务"""
    print("任务已批准并完成!")
    return {"status": "completed"}

def cancel_task(state: WorkflowState):
    """拒绝后取消任务"""
    print("任务已被拒绝并取消")
    return {"status": "cancelled"}

# 构建图
builder = StateGraph(WorkflowState)
builder.add_node("get_approval", get_approval)
builder.add_node("router", router)
builder.add_node("complete_task", complete_task)
builder.add_node("cancel_task", cancel_task)

builder.add_edge(START, "get_approval")
builder.add_edge("get_approval", "router")
builder.add_edge("complete_task", END)
builder.add_edge("cancel_task", END)

memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)

# 测试场景1:批准
print("\n" + "="*50)
print("场景1:用户批准任务")
print("="*50)

config1 = {"configurable": {"thread_id": "test-1"}}
initial_input = {"task": "部署生产环境新功能"}

# 运行到中断点
for event in graph.stream(initial_input, config1, stream_mode="values"):
    print(f"状态: {event}")

# 用户批准
print("\n--- 用户输入: approve ---\n")
for event in graph.stream(Command(resume="approve"), config1, stream_mode="values"):
    print(f"状态: {event}")

# 测试场景2:拒绝
print("\n" + "="*50)
print("场景2:用户拒绝任务")
print("="*50)

config2 = {"configurable": {"thread_id": "test-2"}}

for event in graph.stream(initial_input, config2, stream_mode="values"):
    print(f"状态: {event}")

print("\n--- 用户输入: reject ---\n")
for event in graph.stream(Command(resume="reject"), config2, stream_mode="values"):
    print(f"状态: {event}")

示例三:输入验证循环(中高级)

有时候用户的输入需要验证,无效输入需要重新请求。这个示例展示如何使用循环实现输入验证:

python
"""
示例3:输入验证循环
目标:持续请求用户输入,直到输入有效为止
"""
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

class FormState(TypedDict):
    name: str
    age: int
    email: str

def collect_age(state: FormState):
    """收集并验证年龄输入"""
    prompt = "请输入您的年龄(正整数):"

    while True:
        # 每次循环都会暂停等待输入
        answer = interrupt(prompt)

        # 验证输入
        try:
            age = int(answer)
            if age > 0 and age < 150:
                print(f"年龄验证通过: {age}")
                return {"age": age}
            else:
                prompt = f"'{answer}' 不是有效年龄,请输入1-150之间的数字:"
        except ValueError:
            prompt = f"'{answer}' 不是数字,请输入有效的年龄:"

def collect_email(state: FormState):
    """收集并验证邮箱输入"""
    prompt = "请输入您的邮箱地址:"

    while True:
        answer = interrupt(prompt)

        # 简单的邮箱验证
        if "@" in answer and "." in answer:
            print(f"邮箱验证通过: {answer}")
            return {"email": answer}
        else:
            prompt = f"'{answer}' 不是有效邮箱,请重新输入(需包含@和.):"

def show_summary(state: FormState):
    """显示收集结果"""
    print("\n" + "="*40)
    print("信息收集完成!")
    print(f"   姓名: {state['name']}")
    print(f"   年龄: {state['age']}")
    print(f"   邮箱: {state['email']}")
    print("="*40)
    return state

# 构建图
builder = StateGraph(FormState)
builder.add_node("collect_age", collect_age)
builder.add_node("collect_email", collect_email)
builder.add_node("show_summary", show_summary)

builder.add_edge(START, "collect_age")
builder.add_edge("collect_age", "collect_email")
builder.add_edge("collect_email", "show_summary")
builder.add_edge("show_summary", END)

memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)

# 执行示例
config = {"configurable": {"thread_id": "form-123"}}
initial = {"name": "张三", "age": 0, "email": ""}

print("=== 开始收集信息 ===\n")

# 第一次运行,会在第一个 interrupt 处暂停
for event in graph.stream(initial, config, stream_mode="values"):
    if "__interrupt__" in str(event):
        print(f"系统提示: {event}")

# 模拟用户输入无效年龄
print("\n用户输入: 'abc' (无效)")
for event in graph.stream(Command(resume="abc"), config, stream_mode="values"):
    if "__interrupt__" in str(event):
        print(f"系统提示: {event}")

# 再次输入无效年龄
print("\n用户输入: '-5' (无效)")
for event in graph.stream(Command(resume="-5"), config, stream_mode="values"):
    if "__interrupt__" in str(event):
        print(f"系统提示: {event}")

# 输入有效年龄,进入下一步
print("\n用户输入: '25' (有效)")
for event in graph.stream(Command(resume="25"), config, stream_mode="values"):
    if "__interrupt__" in str(event):
        print(f"系统提示: {event}")

# 输入无效邮箱
print("\n用户输入: 'invalid' (无效邮箱)")
for event in graph.stream(Command(resume="invalid"), config, stream_mode="values"):
    if "__interrupt__" in str(event):
        print(f"系统提示: {event}")

# 输入有效邮箱
print("\n用户输入: 'zhangsan@example.com' (有效)")
for event in graph.stream(Command(resume="zhangsan@example.com"), config, stream_mode="values"):
    print(f"最终状态: {event}")

示例四:工具调用审批(高级)

这是最常见的生产场景:AI Agent 决定调用某个工具前,需要人工审批:

python
"""
示例4:工具调用审批
目标:AI Agent 在调用工具前需要人工审批
"""
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

# 定义工具
@tool
def send_email(to: str, subject: str, body: str) -> str:
    """发送邮件给指定收件人"""
    # 实际应用中这里会调用邮件服务
    return f"邮件已发送给 {to},主题: {subject}"

@tool
def delete_file(filename: str) -> str:
    """删除指定文件(危险操作)"""
    return f"文件 {filename} 已删除"

@tool
def get_weather(city: str) -> str:
    """获取城市天气(安全操作)"""
    return f"{city}今天天气晴朗,温度25度C"

# 定义哪些工具需要审批
TOOLS_REQUIRING_APPROVAL = {"send_email", "delete_file"}

# 状态定义
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]

# 创建带审批功能的工具节点
def tool_node_with_approval(state: AgentState):
    """执行工具调用,敏感工具需要审批"""
    last_message = state["messages"][-1]

    results = []
    for tool_call in last_message.tool_calls:
        tool_name = tool_call["name"]
        tool_args = tool_call["args"]

        # 检查是否需要审批
        if tool_name in TOOLS_REQUIRING_APPROVAL:
            print(f"\n敏感操作需要审批: {tool_name}")
            print(f"   参数: {tool_args}")

            # 暂停等待审批
            approval = interrupt({
                "type": "tool_approval",
                "tool": tool_name,
                "args": tool_args,
                "message": f"是否允许执行 {tool_name}?(approve/reject)"
            })

            if approval.lower() != "approve":
                results.append({
                    "tool_call_id": tool_call["id"],
                    "content": f"操作被用户拒绝: {tool_name}"
                })
                continue

            print(f"用户已批准 {tool_name}")

        # 执行工具
        tool_map = {
            "send_email": send_email,
            "delete_file": delete_file,
            "get_weather": get_weather
        }

        result = tool_map[tool_name].invoke(tool_args)
        results.append({
            "tool_call_id": tool_call["id"],
            "content": result
        })

    from langchain_core.messages import ToolMessage
    return {"messages": [ToolMessage(**r) for r in results]}

# 创建 Agent
tools = [send_email, delete_file, get_weather]
llm = ChatOpenAI(model="gpt-4").bind_tools(tools)

def agent(state: AgentState):
    """Agent 节点:调用 LLM 决定下一步"""
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

# 构建图
builder = StateGraph(AgentState)
builder.add_node("agent", agent)
builder.add_node("tools", tool_node_with_approval)

builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", tools_condition)
builder.add_edge("tools", "agent")

memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)

# 测试
config = {"configurable": {"thread_id": "agent-test"}}

print("=== 测试1:查询天气(不需要审批)===")
for event in graph.stream(
    {"messages": [("user", "北京今天天气怎么样?")]},
    config,
    stream_mode="values"
):
    if event["messages"]:
        print(event["messages"][-1])

print("\n=== 测试2:发送邮件(需要审批)===")
config2 = {"configurable": {"thread_id": "agent-test-2"}}

for event in graph.stream(
    {"messages": [("user", "帮我发一封邮件给 boss@company.com,主题是请假申请")]},
    config2,
    stream_mode="values"
):
    if "__interrupt__" in str(event):
        print(f"\n等待审批...")
        break
    if event["messages"]:
        print(event["messages"][-1])

# 用户审批
print("\n用户输入: approve")
for event in graph.stream(Command(resume="approve"), config2, stream_mode="values"):
    if event["messages"]:
        print(event["messages"][-1])

示例五:多步骤工作流 + 状态编辑(高级)

这个示例展示如何在暂停时让用户编辑图的状态:

python
"""
示例5:多步骤工作流 + 状态编辑
目标:用户可以在中断时查看并修改 Agent 的工作状态
"""
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

class DocumentState(TypedDict):
    title: str
    content: str
    summary: str
    approved: bool

def generate_summary(state: DocumentState):
    """AI 生成摘要"""
    # 模拟 AI 生成摘要
    summary = f"这是关于'{state['title']}'的摘要:{state['content'][:50]}..."
    print(f"AI 生成摘要: {summary}")
    return {"summary": summary}

def review_summary(state: DocumentState):
    """人工审核摘要,可以编辑"""
    print("\n" + "="*50)
    print("请审核 AI 生成的摘要:")
    print(f"   标题: {state['title']}")
    print(f"   摘要: {state['summary']}")
    print("="*50)

    # 暂停等待用户反馈
    feedback = interrupt({
        "type": "review",
        "current_summary": state["summary"],
        "options": [
            "approve - 批准当前摘要",
            "edit:新摘要内容 - 修改摘要",
            "reject - 拒绝"
        ]
    })

    feedback = feedback.strip()

    if feedback.lower() == "approve":
        print("摘要已批准")
        return {"approved": True}
    elif feedback.lower().startswith("edit:"):
        new_summary = feedback[5:].strip()
        print(f"摘要已修改为: {new_summary}")
        return {"summary": new_summary, "approved": True}
    else:
        print("摘要被拒绝")
        return {"approved": False}

def finalize(state: DocumentState):
    """最终处理"""
    if state["approved"]:
        print(f"\n文档处理完成!最终摘要: {state['summary']}")
    else:
        print("\n文档处理未完成,摘要被拒绝")
    return state

# 构建图
builder = StateGraph(DocumentState)
builder.add_node("generate", generate_summary)
builder.add_node("review", review_summary)
builder.add_node("finalize", finalize)

builder.add_edge(START, "generate")
builder.add_edge("generate", "review")
builder.add_edge("review", "finalize")
builder.add_edge("finalize", END)

memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)

# 测试场景:用户编辑摘要
config = {"configurable": {"thread_id": "doc-1"}}

print("=== 开始文档处理 ===\n")
for event in graph.stream({
    "title": "LangGraph 入门指南",
    "content": "LangGraph 是一个用于构建有状态、多角色应用的框架...",
    "summary": "",
    "approved": False
}, config, stream_mode="values"):
    pass

# 用户选择编辑
print("\n用户输入: edit:LangGraph 是构建 AI Agent 的强大框架")
for event in graph.stream(
    Command(resume="edit:LangGraph 是构建 AI Agent 的强大框架"),
    config,
    stream_mode="values"
):
    pass

完整案例代码(可直接运行)

以下是一个完整的、可以直接在 Jupyter Notebook 中运行的代码示例,演示 LangGraph 1.0 HITL 的核心功能:

python
# ============================================================
# LangGraph 1.0 Human-in-the-Loop (HITL) 完整示例
# 演示:interrupt()、Command、InMemorySaver 协同工作
# ============================================================

# --------------------------
# 1. 导入必要的库
# --------------------------
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
from IPython.display import Image, display

# --------------------------
# 2. 定义状态
# --------------------------
class TaskState(TypedDict):
    task_name: str
    task_params: dict
    user_decision: str
    result: str

# --------------------------
# 3. 定义节点
# --------------------------
def prepare_task(state: TaskState):
    """准备任务,展示将要执行的操作"""
    print(f"\n📋 准备执行任务: {state['task_name']}")
    print(f"   参数: {state['task_params']}")
    return state

def get_user_approval(state: TaskState):
    """暂停等待用户审批"""
    print("\n⏸️ 需要用户审批...")

    # 关键:使用 interrupt() 暂停执行
    decision = interrupt({
        "type": "approval_request",
        "task": state["task_name"],
        "params": state["task_params"],
        "prompt": "请选择: approve (批准) / reject (拒绝) / modify:新参数 (修改)"
    })

    print(f"✅ 收到用户反馈: {decision}")
    return {"user_decision": decision}

def process_decision(state: TaskState) -> Command:
    """根据用户决定路由到不同分支"""
    decision = state["user_decision"].strip().lower()

    if decision == "approve":
        return Command(goto="execute_task")
    elif decision.startswith("modify:"):
        # 提取修改后的参数
        new_param = decision[7:].strip()
        return Command(goto="execute_task", update={"task_params": {"value": new_param}})
    else:
        return Command(goto="cancel_task")

def execute_task(state: TaskState):
    """执行任务"""
    print(f"\n🚀 执行任务: {state['task_name']}")
    print(f"   使用参数: {state['task_params']}")
    result = f"任务 '{state['task_name']}' 执行成功,参数: {state['task_params']}"
    print(f"   结果: {result}")
    return {"result": result}

def cancel_task(state: TaskState):
    """取消任务"""
    print(f"\n❌ 任务已取消: {state['task_name']}")
    return {"result": "任务已取消"}

# --------------------------
# 4. 构建图
# --------------------------
builder = StateGraph(TaskState)

builder.add_node("prepare", prepare_task)
builder.add_node("approval", get_user_approval)
builder.add_node("router", process_decision)
builder.add_node("execute_task", execute_task)
builder.add_node("cancel_task", cancel_task)

builder.add_edge(START, "prepare")
builder.add_edge("prepare", "approval")
builder.add_edge("approval", "router")
builder.add_edge("execute_task", END)
builder.add_edge("cancel_task", END)

# 关键:必须有 Checkpointer 才能使用 interrupt
memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)

# --------------------------
# 5. 可视化图结构
# --------------------------
print("📊 图结构可视化:")
display(Image(graph.get_graph().draw_mermaid_png()))

# --------------------------
# 6. 场景 1:用户批准
# --------------------------
print("\n" + "=" * 60)
print("📗 场景 1:用户批准任务")
print("=" * 60)

config1 = {"configurable": {"thread_id": "hitl-demo-1"}}
initial_input = {
    "task_name": "发送邮件",
    "task_params": {"to": "user@example.com", "subject": "测试"},
    "user_decision": "",
    "result": ""
}

# 执行到 interrupt 处暂停
print("\n▶️ 开始执行(会在审批处暂停)...")
for event in graph.stream(initial_input, config1, stream_mode="values"):
    pass

# 检查暂停状态
state1 = graph.get_state(config1)
print(f"\n📍 当前暂停于: {state1.next}")
if state1.tasks and state1.tasks[0].interrupts:
    interrupt_info = state1.tasks[0].interrupts[0]
    print(f"📌 中断信息: {interrupt_info.value}")

# 用户批准,使用 Command(resume=) 恢复执行
print("\n👤 用户输入: approve")
for event in graph.stream(Command(resume="approve"), config1, stream_mode="values"):
    pass

print(f"\n📊 最终结果: {graph.get_state(config1).values['result']}")

# --------------------------
# 7. 场景 2:用户拒绝
# --------------------------
print("\n" + "=" * 60)
print("📕 场景 2:用户拒绝任务")
print("=" * 60)

config2 = {"configurable": {"thread_id": "hitl-demo-2"}}

for event in graph.stream(initial_input, config2, stream_mode="values"):
    pass

print("\n👤 用户输入: reject")
for event in graph.stream(Command(resume="reject"), config2, stream_mode="values"):
    pass

print(f"\n📊 最终结果: {graph.get_state(config2).values['result']}")

# --------------------------
# 8. 场景 3:用户修改参数
# --------------------------
print("\n" + "=" * 60)
print("📘 场景 3:用户修改参数后执行")
print("=" * 60)

config3 = {"configurable": {"thread_id": "hitl-demo-3"}}

for event in graph.stream(initial_input, config3, stream_mode="values"):
    pass

print("\n👤 用户输入: modify:新的收件人")
for event in graph.stream(Command(resume="modify:新的收件人"), config3, stream_mode="values"):
    pass

final_state = graph.get_state(config3).values
print(f"\n📊 最终结果: {final_state['result']}")
print(f"📊 修改后参数: {final_state['task_params']}")

# --------------------------
# 9. 演示:输入验证循环
# --------------------------
print("\n" + "=" * 60)
print("🔄 演示:输入验证循环(多次 interrupt)")
print("=" * 60)

class ValidationState(TypedDict):
    name: str
    age: int

def collect_age(state: ValidationState):
    """收集年龄,验证直到有效"""
    prompt = "请输入您的年龄(1-150的整数):"

    while True:
        answer = interrupt(prompt)

        try:
            age = int(answer)
            if 1 <= age <= 150:
                print(f"✅ 年龄验证通过: {age}")
                return {"age": age}
            else:
                prompt = f"'{answer}' 不在有效范围(1-150),请重新输入:"
        except ValueError:
            prompt = f"'{answer}' 不是数字,请输入有效年龄:"

def show_result(state: ValidationState):
    """显示结果"""
    print(f"\n📊 收集完成!{state['name']} 的年龄是 {state['age']} 岁")
    return state

# 构建验证图
val_builder = StateGraph(ValidationState)
val_builder.add_node("collect_age", collect_age)
val_builder.add_node("show_result", show_result)
val_builder.add_edge(START, "collect_age")
val_builder.add_edge("collect_age", "show_result")
val_builder.add_edge("show_result", END)

val_memory = InMemorySaver()
val_graph = val_builder.compile(checkpointer=val_memory)

config4 = {"configurable": {"thread_id": "validation-demo"}}

# 首次执行
print("\n▶️ 开始收集年龄...")
for event in val_graph.stream({"name": "张三", "age": 0}, config4, stream_mode="values"):
    pass

# 输入无效值
print("\n👤 用户输入: 'abc' (无效)")
for event in val_graph.stream(Command(resume="abc"), config4, stream_mode="values"):
    pass

# 再次输入无效值
print("\n👤 用户输入: '-5' (无效)")
for event in val_graph.stream(Command(resume="-5"), config4, stream_mode="values"):
    pass

# 输入有效值
print("\n👤 用户输入: '25' (有效)")
for event in val_graph.stream(Command(resume="25"), config4, stream_mode="values"):
    pass

print("\n✨ HITL 演示完成!")

运行结果示例:

📊 图结构可视化:
[显示图:START → prepare → approval → router → execute_task/cancel_task → END]

============================================================
📗 场景 1:用户批准任务
============================================================

▶️ 开始执行(会在审批处暂停)...

📋 准备执行任务: 发送邮件
   参数: {'to': 'user@example.com', 'subject': '测试'}

⏸️ 需要用户审批...

📍 当前暂停于: ('approval',)
📌 中断信息: {'type': 'approval_request', 'task': '发送邮件', ...}

👤 用户输入: approve
✅ 收到用户反馈: approve

🚀 执行任务: 发送邮件
   使用参数: {'to': 'user@example.com', 'subject': '测试'}
   结果: 任务 '发送邮件' 执行成功...

📊 最终结果: 任务 '发送邮件' 执行成功...

============================================================
📕 场景 2:用户拒绝任务
============================================================
...
📊 最终结果: 任务已取消

============================================================
🔄 演示:输入验证循环(多次 interrupt)
============================================================

👤 用户输入: 'abc' (无效)
[系统等待新输入...]

👤 用户输入: '25' (有效)
✅ 年龄验证通过: 25

📊 收集完成!张三 的年龄是 25 岁

代码要点说明:

要点说明
interrupt(payload)暂停执行,payload 返回给调用方(必须可 JSON 序列化)
Command(resume=value)恢复执行,将 value 作为 interrupt 的返回值
Command(goto="node")路由到指定节点
Command(goto="node", update={...})路由并同时更新状态
InMemorySaver()内存 Checkpointer(开发用)
thread_id区分不同会话/用户

HITL 三大模式:

模式说明实现
审批 (Approve)用户确认后才执行interrupt() → 用户输入 → Command(resume=)
编辑 (Edit)用户修改 AI 结果interrupt() → 用户修改 → Command(update={...})
输入 (Input)收集用户信息while True: interrupt() 循环验证

生产环境最佳实践

1. 使用持久化 Checkpointer

开发时使用 InMemorySaver,生产环境使用持久化存储:

python
# 开发环境
from langgraph.checkpoint.memory import InMemorySaver
memory = InMemorySaver()

# 生产环境(推荐 PostgreSQL)
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async with AsyncPostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/db"
) as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

2. 避免常见错误

python
# 错误:不要用 try-except 包裹 interrupt
def bad_node(state):
    try:
        result = interrupt("prompt")  # interrupt 使用异常机制!
    except:
        pass  # 这会吞掉 interrupt 异常

# 正确:直接调用
def good_node(state):
    result = interrupt("prompt")
    return {"data": result}

# 错误:interrupt 前的副作用不是幂等的
def bad_side_effect(state):
    send_notification("开始处理")  # 恢复时会重复发送!
    result = interrupt("confirm?")
    return {"result": result}

# 正确:确保副作用幂等或放在 interrupt 之后
def good_side_effect(state):
    result = interrupt("confirm?")
    send_notification("处理完成")  # 在 interrupt 之后
    return {"result": result}

3. interrupt 的 payload 必须可 JSON 序列化

python
# 错误:传递函数或复杂对象
result = interrupt(lambda x: x)  # 函数不能 JSON 序列化

# 正确:使用字典、列表、字符串等基本类型
result = interrupt({
    "type": "approval",
    "message": "请确认",
    "options": ["yes", "no"]
})

参考资料

官方文档

教程与指南

中文资源

设计模式


小结

概念一句话总结
HITL让人类参与 AI 决策的机制
interrupt()LangGraph 1.0 的"暂停键",随时暂停等待输入
Command(resume=)恢复执行并传入人类输入的"遥控器"
Checkpointer保存执行进度的"存档器",必须配置
thread_id区分不同会话的唯一标识

核心心智模型:把 interrupt() 想象成游戏中的暂停键,Command(resume=) 是继续键,Checkpointer 是自动存档功能。有了这三个,你的 AI Agent 就可以在任何时候暂停、等待人类指示、然后继续执行了!

基于 MIT 许可证发布。内容版权归作者所有。