Skip to content

3.6 Streaming

本节介绍 LangChain 中的流式输出机制。


为什么需要 Streaming?

"Streaming is crucial for enhancing the responsiveness of applications built on LLMs."

流式输出对于提升 LLM 应用的响应性至关重要:

  • 更好的用户体验 - 实时看到输出,无需等待完整响应
  • 进度可见 - 了解 Agent 正在执行什么操作
  • 快速反馈 - 发现问题可以及早中断

四种流式模式

LangChain 提供四种流式输出模式:

1. Agent 进度 (stream_mode="updates")

每个 Agent 步骤后发送事件:

python
from langchain.agents import create_agent

agent = create_agent("gpt-4o", tools=[get_weather, search])

for event in agent.stream(
    {"messages": [{"role": "user", "content": "北京天气怎么样?"}]},
    stream_mode="updates"
):
    print(event)

输出示例:

python
# 事件 1: LLM 决定调用工具
{"llm": {"tool_calls": [{"name": "get_weather", "args": {"city": "北京"}}]}}

# 事件 2: 工具执行结果
{"tools": {"get_weather": "北京:晴,25度"}}

# 事件 3: LLM 生成最终回复
{"llm": {"content": "北京今天天气晴朗,温度25度,非常适合外出。"}}

2. LLM Token (stream_mode="messages")

逐 token 流式输出:

python
for event in agent.stream(
    {"messages": [{"role": "user", "content": "讲个故事"}]},
    stream_mode="messages"
):
    if hasattr(event, "content"):
        print(event.content, end="", flush=True)

输出:

从|前|有|一|座|山|,|山|里|有|座|庙|...

3. 自定义更新 (stream_mode="custom")

在工具中发送自定义信号:

python
from langchain.agents import get_stream_writer, ToolRuntime
from langchain_core.tools import tool

@tool
def analyze_data(data: str, runtime: ToolRuntime) -> str:
    """分析数据"""
    writer = get_stream_writer()

    writer("正在加载数据...")
    # 处理步骤 1

    writer("正在分析数据...")
    # 处理步骤 2

    writer("正在生成报告...")
    # 处理步骤 3

    return "分析完成"

# 接收自定义更新
for event in agent.stream(
    {"messages": [...]},
    stream_mode="custom"
):
    print(f"进度: {event}")

输出:

进度: 正在加载数据...
进度: 正在分析数据...
进度: 正在生成报告...

4. 组合模式

同时使用多种流式模式:

python
for event in agent.stream(
    {"messages": [{"role": "user", "content": "分析数据并总结"}]},
    stream_mode=["updates", "custom"]
):
    if "custom" in event:
        print(f"[进度] {event['custom']}")
    elif "llm" in event:
        print(f"[LLM] {event['llm']}")
    elif "tools" in event:
        print(f"[工具] {event['tools']}")

模型级流式输出

直接对模型使用流式输出:

python
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o")

# 同步流式
for chunk in model.stream("讲一个关于 AI 的故事"):
    print(chunk.content, end="", flush=True)

异步流式

python
async def stream_response():
    model = ChatOpenAI(model="gpt-4o")

    async for chunk in model.astream("讲一个故事"):
        print(chunk.content, end="", flush=True)

import asyncio
asyncio.run(stream_response())

处理流式工具调用

工具调用也可以流式输出:

python
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o")
model_with_tools = model.bind_tools([get_weather])

for chunk in model_with_tools.stream("北京和上海天气怎么样?"):
    # 检查是否有工具调用
    if chunk.tool_call_chunks:
        for tool_chunk in chunk.tool_call_chunks:
            print(f"工具: {tool_chunk.get('name', '')}")
            print(f"参数片段: {tool_chunk.get('args', '')}")

    # 检查是否有文本内容
    if chunk.content:
        print(chunk.content, end="")

禁用流式输出

在特定场景下禁用流式:

python
from langchain_openai import ChatOpenAI

# 创建模型时禁用
model = ChatOpenAI(model="gpt-4o", streaming=False)

# 或在调用时禁用
response = model.invoke("你好", config={"streaming": False})

多 Agent 系统中选择性禁用:

python
# 只让主 Agent 流式输出,子 Agent 不流式
main_agent = create_agent("gpt-4o", tools=[...], streaming=True)
sub_agent = create_agent("gpt-3.5-turbo", tools=[...], streaming=False)

完整流式示例

python
from langchain.agents import create_agent, get_stream_writer, ToolRuntime
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

@tool
def process_file(filename: str, runtime: ToolRuntime) -> str:
    """处理文件"""
    writer = get_stream_writer()

    writer(f"正在读取文件: {filename}")
    # 模拟处理
    import time
    time.sleep(1)

    writer("正在解析内容...")
    time.sleep(1)

    writer("处理完成!")
    return f"文件 {filename} 处理完成,共 100 行"

# 创建 Agent
agent = create_agent(
    ChatOpenAI(model="gpt-4o"),
    tools=[process_file],
    system_prompt="你是文件处理助手"
)

# 流式执行
print("开始处理...\n")

for event in agent.stream(
    {"messages": [{"role": "user", "content": "处理 data.csv 文件"}]},
    stream_mode=["updates", "custom", "messages"]
):
    # 自定义进度
    if "custom" in event:
        print(f"[进度] {event['custom']}")

    # Agent 步骤更新
    elif "updates" in event:
        update = event["updates"]
        if "llm" in update:
            print(f"[思考] {update['llm'].get('content', '')[:50]}...")
        elif "tools" in update:
            print(f"[工具结果] {update['tools']}")

    # Token 流
    elif "messages" in event:
        msg = event["messages"]
        if hasattr(msg, "content") and msg.content:
            print(msg.content, end="", flush=True)

print("\n\n处理结束!")

流式事件结构

updates 模式事件

python
{
    "llm": {
        "content": "...",
        "tool_calls": [...],
        "response_metadata": {...}
    }
}

# 或

{
    "tools": {
        "tool_name": "工具返回结果"
    }
}

messages 模式事件

python
AIMessageChunk(
    content="token内容",
    id="msg_xxx",
    response_metadata={...}
)

custom 模式事件

python
"自定义消息字符串"
# 或任何可序列化对象

最佳实践

场景推荐模式
聊天应用messages - 逐字显示回复
复杂任务updates - 显示执行步骤
长时间处理custom - 显示进度更新
调试开发["updates", "messages"] - 完整信息

注意事项

  1. 网络开销 - 流式会增加网络请求次数
  2. 错误处理 - 流式过程中的错误需要特别处理
  3. 取消支持 - 考虑支持用户中断长时间流式
  4. 缓冲区 - 某些环境需要刷新缓冲区才能实时显示

上一节3.5 Short-term Memory

下一节3.7 Structured Output

基于 MIT 许可证发布。内容版权归作者所有。