Skip to content

6.3 实战:异步 Agent

🎯 小白理解指南:这一节讲什么?

前面学了异步的基础知识,这一节要真正用起来

我们会实现:

  1. 异步 LangChain Agent:让 AI 助手同时调用多个工具
  2. 流式响应:像 ChatGPT 那样一个字一个字"打印"出来
  3. 批处理系统:同时处理大量任务
  4. 重试机制:API 失败时自动重试
  5. 性能监控:追踪每个函数的执行时间

异步 LangChain Agent

🎯 小白理解指南:@tool 装饰器的异步版本

@tool 可以用在 async def 上,让工具变成异步的。 Agent 调用这个工具时,会用 await 等待结果,期间可以做其他事。

python
import asyncio
from typing import List, Dict, Any
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool

@tool
async def async_search(query: str) -> str:
    """异步搜索工具"""
    await asyncio.sleep(0.5)  # 模拟 API 延迟(实际会是真实的搜索请求)
    return f"搜索结果: {query}"

@tool
async def async_calculator(expression: str) -> str:
    """异步计算工具"""
    await asyncio.sleep(0.3)
    try:
        result = eval(expression)
        return f"计算结果: {result}"
    except Exception as e:
        return f"错误: {e}"

class AsyncResearchAgent:
    """异步研究 Agent"""
    
    def __init__(self, model: str = "gpt-4"):
        self.llm = ChatOpenAI(model=model, temperature=0)
        self.tools = [async_search, async_calculator]
        
        # 创建 prompt
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个研究助手,擅长搜索和计算。"),
            ("user", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        # 创建 agent
        agent = create_openai_tools_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=self.prompt
        )
        
        self.executor = AgentExecutor(
            agent=agent,
            tools=self.tools,
            verbose=True
        )
    
    async def arun(self, query: str) -> Dict[str, Any]:
        """异步执行查询"""
        result = await self.executor.ainvoke({"input": query})
        return result

async def main():
    agent = AsyncResearchAgent()
    
    # 并发执行多个查询
    queries = [
        "搜索 Python 异步编程最佳实践",
        "计算 123 + 456",
        "搜索 LangChain 最新版本"
    ]
    
    import time
    start = time.time()
    
    results = await asyncio.gather(*[
        agent.arun(query) for query in queries
    ])
    
    end = time.time()
    
    print(f"\n总耗时: {end - start:.2f}秒")
    for query, result in zip(queries, results):
        print(f"\n查询: {query}")
        print(f"结果: {result['output']}")

# 运行
if __name__ == "__main__":
    asyncio.run(main())

异步流式响应

🎯 小白理解指南:什么是"流式响应"?

用过 ChatGPT 吗?它的回答是**一个字一个字"打出来"**的,而不是等全部生成完再显示。

这就是"流式响应"(Streaming):

  • 传统方式:等 AI 想完所有内容,一次性返回(可能等很久)
  • 流式方式:AI 边想边返回,用户边看边等(体验更好)

在 Python 中,用 async for 来接收流式数据,用 yield 来发送流式数据。

python
import asyncio
from typing import AsyncIterator
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

class StreamingAgent:
    """流式响应 Agent"""

    def __init__(self, model: str = "gpt-4"):
        self.llm = ChatOpenAI(
            model=model,
            streaming=True,  # 🎯 开启流式模式
            temperature=0.7
        )

    async def astream(self, query: str) -> AsyncIterator[str]:
        """异步流式生成响应"""
        # 🎯 astream() 返回异步迭代器,每次 yield 一小块内容
        async for chunk in self.llm.astream([HumanMessage(content=query)]):
            if hasattr(chunk, 'content'):
                yield chunk.content  # 逐块返回,实现"打字机效果"

async def demo_streaming():
    """演示流式响应"""
    agent = StreamingAgent()
    
    print("用户: 解释什么是异步编程\n")
    print("Agent: ", end="", flush=True)
    
    async for token in agent.astream("解释什么是异步编程,用简单的语言"):
        print(token, end="", flush=True)
        await asyncio.sleep(0.05)  # 模拟打字机效果
    
    print("\n")

# 运行
asyncio.run(demo_streaming())

异步批处理系统

🎯 小白理解指南:什么是"批处理"?

想象你是餐厅服务员,有 100 个订单要处理:

  • 一个一个做:太慢了!
  • 100 个同时做:厨房忙不过来,会出错!
  • 批处理:同时做 5 个,完成 1 个再开始下一个,既快又稳定

异步批处理就是这个原理:

  • 用**队列(Queue)**存放待处理的任务
  • 用**工作协程(Worker)**来处理任务
  • 用**信号量(Semaphore)**限制同时处理的数量

这样既能利用异步的速度优势,又不会同时发起太多请求导致系统崩溃。

python
import asyncio
from typing import List, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class Task:
    """任务数据类"""
    id: str
    query: str
    priority: int = 1
    created_at: datetime = field(default_factory=datetime.now)
    result: Any = None
    status: str = "pending"  # pending, processing, completed, failed

class AsyncBatchProcessor:
    """异步批处理器"""

    def __init__(self, max_workers: int = 5):
        self.max_workers = max_workers  # 最多同时处理 5 个任务
        self.queue: asyncio.Queue = asyncio.Queue()  # 任务队列
        self.results: Dict[str, Task] = {}  # 存储结果
        # 🎯 Semaphore(信号量):限制并发数的"门票"
        # 同一时间最多有 max_workers 个任务能拿到"门票"执行
        self.semaphore = asyncio.Semaphore(max_workers)
    
    async def add_task(self, task: Task):
        """添加任务到队列"""
        await self.queue.put(task)
        self.results[task.id] = task
    
    async def process_task(self, task: Task) -> Task:
        """处理单个任务"""
        async with self.semaphore:  # 限制并发数
            task.status = "processing"
            print(f"处理任务 {task.id}: {task.query}")
            
            try:
                # 模拟处理(实际场景中调用 LLM)
                await asyncio.sleep(task.priority * 0.5)
                task.result = f"结果: {task.query}"
                task.status = "completed"
            except Exception as e:
                task.status = "failed"
                task.result = str(e)
            
            return task
    
    async def worker(self):
        """工作协程"""
        while True:
            task = await self.queue.get()
            if task is None:  # 停止信号
                break
            
            await self.process_task(task)
            self.queue.task_done()
    
    async def process_all(self, tasks: List[Task]) -> List[Task]:
        """批量处理所有任务"""
        # 添加所有任务
        for task in tasks:
            await self.add_task(task)
        
        # 启动工作协程
        workers = [
            asyncio.create_task(self.worker())
            for _ in range(self.max_workers)
        ]
        
        # 等待所有任务完成
        await self.queue.join()
        
        # 停止工作协程
        for _ in range(self.max_workers):
            await self.queue.put(None)
        
        await asyncio.gather(*workers)
        
        return list(self.results.values())

async def demo_batch_processing():
    """演示批处理"""
    processor = AsyncBatchProcessor(max_workers=3)
    
    # 创建任务
    tasks = [
        Task(id=f"task-{i}", query=f"查询 {i}", priority=i % 3 + 1)
        for i in range(10)
    ]
    
    import time
    start = time.time()
    
    results = await processor.process_all(tasks)
    
    end = time.time()
    
    print(f"\n处理完成,耗时: {end - start:.2f}秒")
    print(f"成功: {sum(1 for t in results if t.status == 'completed')}")
    print(f"失败: {sum(1 for t in results if t.status == 'failed')}")

# 运行
asyncio.run(demo_batch_processing())

异步重试与错误处理

🎯 小白理解指南:为什么需要"重试机制"?

调用外部 API 经常会失败:

  • 网络抖动
  • 服务器暂时过载
  • API 限流(调用太频繁)

这些失败通常是暂时的,过一会儿再试就好了。

**指数退避(Exponential Backoff)**是最佳实践:

  • 第 1 次失败:等 1 秒再试
  • 第 2 次失败:等 2 秒再试
  • 第 3 次失败:等 4 秒再试
  • ...

等待时间越来越长,给服务器喘息的时间。

python
import asyncio
from typing import Callable, Any, TypeVar
from functools import wraps

T = TypeVar('T')

def async_retry(
    max_retries: int = 3,      # 最多重试 3 次
    delay: float = 1.0,         # 初始等待 1 秒
    backoff: float = 2.0,       # 每次等待时间翻倍
    exceptions: tuple = (Exception,)  # 捕获哪些异常
):
    """异步重试装饰器"""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            current_delay = delay
            last_exception = None

            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        print(f"重试 {attempt + 1}/{max_retries}: {e}")
                        await asyncio.sleep(current_delay)
                        current_delay *= backoff  # 等待时间翻倍

            raise last_exception  # 全部重试失败,抛出最后的异常

        return wrapper
    return decorator

class APIError(Exception):
    """API 错误"""
    pass

class AsyncAPIClient:
    """异步 API 客户端"""
    
    def __init__(self):
        self.call_count = 0
    
    @async_retry(max_retries=3, delay=0.5, exceptions=(APIError,))
    async def call_api(self, endpoint: str) -> Dict[str, Any]:
        """调用 API(带重试)"""
        self.call_count += 1
        
        # 模拟前两次失败
        if self.call_count < 3:
            raise APIError(f"API 调用失败 (尝试 {self.call_count})")
        
        # 第三次成功
        await asyncio.sleep(0.1)
        return {"status": "success", "data": endpoint}

async def demo_retry():
    """演示重试机制"""
    client = AsyncAPIClient()
    
    try:
        result = await client.call_api("/chat")
        print(f"成功: {result}")
    except APIError as e:
        print(f"最终失败: {e}")

# 运行
asyncio.run(demo_retry())

性能监控

🎯 小白理解指南:为什么需要"性能监控"?

当你的 Agent 跑起来后,你需要知道:

  • 每个函数执行多久?(是否太慢?)
  • 哪个函数调用最频繁?
  • 有多少请求失败了?

性能监控就是给函数装个"计时器",自动记录这些数据。

有了这些数据,你就能找到瓶颈,针对性地优化。

python
import asyncio
import time
from typing import Callable, Any
from functools import wraps
from dataclasses import dataclass, field
from collections import defaultdict

@dataclass
class PerformanceMetrics:
    """性能指标——记录一个函数的执行统计"""
    total_calls: int = 0       # 总调用次数
    total_time: float = 0.0    # 总耗时
    min_time: float = float('inf')  # 最短耗时
    max_time: float = 0.0      # 最长耗时
    errors: int = 0            # 错误次数

    @property
    def avg_time(self) -> float:
        """平均耗时"""
        return self.total_time / self.total_calls if self.total_calls > 0 else 0.0

class PerformanceMonitor:
    """性能监控器"""
    
    def __init__(self):
        self.metrics: Dict[str, PerformanceMetrics] = defaultdict(PerformanceMetrics)
    
    def track(self, func: Callable) -> Callable:
        """跟踪函数性能"""
        @wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            func_name = func.__name__
            start = time.time()
            
            try:
                result = await func(*args, **kwargs)
                elapsed = time.time() - start
                
                # 更新指标
                metrics = self.metrics[func_name]
                metrics.total_calls += 1
                metrics.total_time += elapsed
                metrics.min_time = min(metrics.min_time, elapsed)
                metrics.max_time = max(metrics.max_time, elapsed)
                
                return result
            
            except Exception as e:
                elapsed = time.time() - start
                self.metrics[func_name].errors += 1
                raise
        
        return wrapper
    
    def report(self) -> str:
        """生成性能报告"""
        lines = ["\n=== 性能报告 ==="]
        
        for func_name, metrics in self.metrics.items():
            lines.append(f"\n{func_name}:")
            lines.append(f"  调用次数: {metrics.total_calls}")
            lines.append(f"  平均耗时: {metrics.avg_time:.4f}秒")
            lines.append(f"  最小耗时: {metrics.min_time:.4f}秒")
            lines.append(f"  最大耗时: {metrics.max_time:.4f}秒")
            lines.append(f"  错误次数: {metrics.errors}")
        
        return "\n".join(lines)

# 使用示例
monitor = PerformanceMonitor()

class MonitoredAgent:
    """带监控的 Agent"""
    
    @monitor.track
    async def process_query(self, query: str) -> str:
        """处理查询"""
        await asyncio.sleep(0.1 + len(query) * 0.01)
        return f"处理完成: {query}"
    
    @monitor.track
    async def batch_process(self, queries: List[str]) -> List[str]:
        """批量处理"""
        results = await asyncio.gather(*[
            self.process_query(q) for q in queries
        ])
        return results

async def demo_monitoring():
    """演示性能监控"""
    agent = MonitoredAgent()
    
    # 单次调用
    await agent.process_query("测试查询")
    
    # 批量调用
    queries = [f"查询 {i}" for i in range(10)]
    await agent.batch_process(queries)
    
    # 打印报告
    print(monitor.report())

# 运行
asyncio.run(demo_monitoring())

关键要点

🎯 小白总结:这一节学了什么?

技术解决什么问题核心 API
异步 Agent同时调用多个 APIawait, gather()
流式响应实时显示 AI 输出async for, yield
批处理控制并发数量Queue, Semaphore
重试机制处理临时失败async_retry 装饰器
性能监控找出性能瓶颈time.time(), 装饰器
  1. 并发控制:使用 Semaphore 限制并发数,避免资源耗尽
  2. 错误处理:实现重试机制和异常处理
  3. 流式响应:提供实时反馈,改善用户体验
  4. 批处理:使用队列和工作协程处理大量任务
  5. 性能监控:跟踪关键指标,优化性能

下一节:6.4 小结和复习

基于 MIT 许可证发布。内容版权归作者所有。