6.3 实战:异步 Agent
🎯 小白理解指南:这一节讲什么?
前面学了异步的基础知识,这一节要真正用起来!
我们会实现:
- 异步 LangChain Agent:让 AI 助手同时调用多个工具
- 流式响应:像 ChatGPT 那样一个字一个字"打印"出来
- 批处理系统:同时处理大量任务
- 重试机制:API 失败时自动重试
- 性能监控:追踪每个函数的执行时间
异步 LangChain Agent
🎯 小白理解指南:@tool 装饰器的异步版本
@tool可以用在async def上,让工具变成异步的。 Agent 调用这个工具时,会用await等待结果,期间可以做其他事。
python
import asyncio
from typing import List, Dict, Any
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
@tool
async def async_search(query: str) -> str:
"""异步搜索工具"""
await asyncio.sleep(0.5) # 模拟 API 延迟(实际会是真实的搜索请求)
return f"搜索结果: {query}"
@tool
async def async_calculator(expression: str) -> str:
"""异步计算工具"""
await asyncio.sleep(0.3)
try:
result = eval(expression)
return f"计算结果: {result}"
except Exception as e:
return f"错误: {e}"
class AsyncResearchAgent:
"""异步研究 Agent"""
def __init__(self, model: str = "gpt-4"):
self.llm = ChatOpenAI(model=model, temperature=0)
self.tools = [async_search, async_calculator]
# 创建 prompt
self.prompt = ChatPromptTemplate.from_messages([
("system", "你是一个研究助手,擅长搜索和计算。"),
("user", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
# 创建 agent
agent = create_openai_tools_agent(
llm=self.llm,
tools=self.tools,
prompt=self.prompt
)
self.executor = AgentExecutor(
agent=agent,
tools=self.tools,
verbose=True
)
async def arun(self, query: str) -> Dict[str, Any]:
"""异步执行查询"""
result = await self.executor.ainvoke({"input": query})
return result
async def main():
agent = AsyncResearchAgent()
# 并发执行多个查询
queries = [
"搜索 Python 异步编程最佳实践",
"计算 123 + 456",
"搜索 LangChain 最新版本"
]
import time
start = time.time()
results = await asyncio.gather(*[
agent.arun(query) for query in queries
])
end = time.time()
print(f"\n总耗时: {end - start:.2f}秒")
for query, result in zip(queries, results):
print(f"\n查询: {query}")
print(f"结果: {result['output']}")
# 运行
if __name__ == "__main__":
asyncio.run(main())异步流式响应
🎯 小白理解指南:什么是"流式响应"?
用过 ChatGPT 吗?它的回答是**一个字一个字"打出来"**的,而不是等全部生成完再显示。
这就是"流式响应"(Streaming):
- 传统方式:等 AI 想完所有内容,一次性返回(可能等很久)
- 流式方式:AI 边想边返回,用户边看边等(体验更好)
在 Python 中,用
async for来接收流式数据,用yield来发送流式数据。
python
import asyncio
from typing import AsyncIterator
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
class StreamingAgent:
"""流式响应 Agent"""
def __init__(self, model: str = "gpt-4"):
self.llm = ChatOpenAI(
model=model,
streaming=True, # 🎯 开启流式模式
temperature=0.7
)
async def astream(self, query: str) -> AsyncIterator[str]:
"""异步流式生成响应"""
# 🎯 astream() 返回异步迭代器,每次 yield 一小块内容
async for chunk in self.llm.astream([HumanMessage(content=query)]):
if hasattr(chunk, 'content'):
yield chunk.content # 逐块返回,实现"打字机效果"
async def demo_streaming():
"""演示流式响应"""
agent = StreamingAgent()
print("用户: 解释什么是异步编程\n")
print("Agent: ", end="", flush=True)
async for token in agent.astream("解释什么是异步编程,用简单的语言"):
print(token, end="", flush=True)
await asyncio.sleep(0.05) # 模拟打字机效果
print("\n")
# 运行
asyncio.run(demo_streaming())异步批处理系统
🎯 小白理解指南:什么是"批处理"?
想象你是餐厅服务员,有 100 个订单要处理:
- 一个一个做:太慢了!
- 100 个同时做:厨房忙不过来,会出错!
- 批处理:同时做 5 个,完成 1 个再开始下一个,既快又稳定
异步批处理就是这个原理:
- 用**队列(Queue)**存放待处理的任务
- 用**工作协程(Worker)**来处理任务
- 用**信号量(Semaphore)**限制同时处理的数量
这样既能利用异步的速度优势,又不会同时发起太多请求导致系统崩溃。
python
import asyncio
from typing import List, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class Task:
"""任务数据类"""
id: str
query: str
priority: int = 1
created_at: datetime = field(default_factory=datetime.now)
result: Any = None
status: str = "pending" # pending, processing, completed, failed
class AsyncBatchProcessor:
"""异步批处理器"""
def __init__(self, max_workers: int = 5):
self.max_workers = max_workers # 最多同时处理 5 个任务
self.queue: asyncio.Queue = asyncio.Queue() # 任务队列
self.results: Dict[str, Task] = {} # 存储结果
# 🎯 Semaphore(信号量):限制并发数的"门票"
# 同一时间最多有 max_workers 个任务能拿到"门票"执行
self.semaphore = asyncio.Semaphore(max_workers)
async def add_task(self, task: Task):
"""添加任务到队列"""
await self.queue.put(task)
self.results[task.id] = task
async def process_task(self, task: Task) -> Task:
"""处理单个任务"""
async with self.semaphore: # 限制并发数
task.status = "processing"
print(f"处理任务 {task.id}: {task.query}")
try:
# 模拟处理(实际场景中调用 LLM)
await asyncio.sleep(task.priority * 0.5)
task.result = f"结果: {task.query}"
task.status = "completed"
except Exception as e:
task.status = "failed"
task.result = str(e)
return task
async def worker(self):
"""工作协程"""
while True:
task = await self.queue.get()
if task is None: # 停止信号
break
await self.process_task(task)
self.queue.task_done()
async def process_all(self, tasks: List[Task]) -> List[Task]:
"""批量处理所有任务"""
# 添加所有任务
for task in tasks:
await self.add_task(task)
# 启动工作协程
workers = [
asyncio.create_task(self.worker())
for _ in range(self.max_workers)
]
# 等待所有任务完成
await self.queue.join()
# 停止工作协程
for _ in range(self.max_workers):
await self.queue.put(None)
await asyncio.gather(*workers)
return list(self.results.values())
async def demo_batch_processing():
"""演示批处理"""
processor = AsyncBatchProcessor(max_workers=3)
# 创建任务
tasks = [
Task(id=f"task-{i}", query=f"查询 {i}", priority=i % 3 + 1)
for i in range(10)
]
import time
start = time.time()
results = await processor.process_all(tasks)
end = time.time()
print(f"\n处理完成,耗时: {end - start:.2f}秒")
print(f"成功: {sum(1 for t in results if t.status == 'completed')}")
print(f"失败: {sum(1 for t in results if t.status == 'failed')}")
# 运行
asyncio.run(demo_batch_processing())异步重试与错误处理
🎯 小白理解指南:为什么需要"重试机制"?
调用外部 API 经常会失败:
- 网络抖动
- 服务器暂时过载
- API 限流(调用太频繁)
这些失败通常是暂时的,过一会儿再试就好了。
**指数退避(Exponential Backoff)**是最佳实践:
- 第 1 次失败:等 1 秒再试
- 第 2 次失败:等 2 秒再试
- 第 3 次失败:等 4 秒再试
- ...
等待时间越来越长,给服务器喘息的时间。
python
import asyncio
from typing import Callable, Any, TypeVar
from functools import wraps
T = TypeVar('T')
def async_retry(
max_retries: int = 3, # 最多重试 3 次
delay: float = 1.0, # 初始等待 1 秒
backoff: float = 2.0, # 每次等待时间翻倍
exceptions: tuple = (Exception,) # 捕获哪些异常
):
"""异步重试装饰器"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
current_delay = delay
last_exception = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_retries - 1:
print(f"重试 {attempt + 1}/{max_retries}: {e}")
await asyncio.sleep(current_delay)
current_delay *= backoff # 等待时间翻倍
raise last_exception # 全部重试失败,抛出最后的异常
return wrapper
return decorator
class APIError(Exception):
"""API 错误"""
pass
class AsyncAPIClient:
"""异步 API 客户端"""
def __init__(self):
self.call_count = 0
@async_retry(max_retries=3, delay=0.5, exceptions=(APIError,))
async def call_api(self, endpoint: str) -> Dict[str, Any]:
"""调用 API(带重试)"""
self.call_count += 1
# 模拟前两次失败
if self.call_count < 3:
raise APIError(f"API 调用失败 (尝试 {self.call_count})")
# 第三次成功
await asyncio.sleep(0.1)
return {"status": "success", "data": endpoint}
async def demo_retry():
"""演示重试机制"""
client = AsyncAPIClient()
try:
result = await client.call_api("/chat")
print(f"成功: {result}")
except APIError as e:
print(f"最终失败: {e}")
# 运行
asyncio.run(demo_retry())性能监控
🎯 小白理解指南:为什么需要"性能监控"?
当你的 Agent 跑起来后,你需要知道:
- 每个函数执行多久?(是否太慢?)
- 哪个函数调用最频繁?
- 有多少请求失败了?
性能监控就是给函数装个"计时器",自动记录这些数据。
有了这些数据,你就能找到瓶颈,针对性地优化。
python
import asyncio
import time
from typing import Callable, Any
from functools import wraps
from dataclasses import dataclass, field
from collections import defaultdict
@dataclass
class PerformanceMetrics:
"""性能指标——记录一个函数的执行统计"""
total_calls: int = 0 # 总调用次数
total_time: float = 0.0 # 总耗时
min_time: float = float('inf') # 最短耗时
max_time: float = 0.0 # 最长耗时
errors: int = 0 # 错误次数
@property
def avg_time(self) -> float:
"""平均耗时"""
return self.total_time / self.total_calls if self.total_calls > 0 else 0.0
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.metrics: Dict[str, PerformanceMetrics] = defaultdict(PerformanceMetrics)
def track(self, func: Callable) -> Callable:
"""跟踪函数性能"""
@wraps(func)
async def wrapper(*args, **kwargs) -> Any:
func_name = func.__name__
start = time.time()
try:
result = await func(*args, **kwargs)
elapsed = time.time() - start
# 更新指标
metrics = self.metrics[func_name]
metrics.total_calls += 1
metrics.total_time += elapsed
metrics.min_time = min(metrics.min_time, elapsed)
metrics.max_time = max(metrics.max_time, elapsed)
return result
except Exception as e:
elapsed = time.time() - start
self.metrics[func_name].errors += 1
raise
return wrapper
def report(self) -> str:
"""生成性能报告"""
lines = ["\n=== 性能报告 ==="]
for func_name, metrics in self.metrics.items():
lines.append(f"\n{func_name}:")
lines.append(f" 调用次数: {metrics.total_calls}")
lines.append(f" 平均耗时: {metrics.avg_time:.4f}秒")
lines.append(f" 最小耗时: {metrics.min_time:.4f}秒")
lines.append(f" 最大耗时: {metrics.max_time:.4f}秒")
lines.append(f" 错误次数: {metrics.errors}")
return "\n".join(lines)
# 使用示例
monitor = PerformanceMonitor()
class MonitoredAgent:
"""带监控的 Agent"""
@monitor.track
async def process_query(self, query: str) -> str:
"""处理查询"""
await asyncio.sleep(0.1 + len(query) * 0.01)
return f"处理完成: {query}"
@monitor.track
async def batch_process(self, queries: List[str]) -> List[str]:
"""批量处理"""
results = await asyncio.gather(*[
self.process_query(q) for q in queries
])
return results
async def demo_monitoring():
"""演示性能监控"""
agent = MonitoredAgent()
# 单次调用
await agent.process_query("测试查询")
# 批量调用
queries = [f"查询 {i}" for i in range(10)]
await agent.batch_process(queries)
# 打印报告
print(monitor.report())
# 运行
asyncio.run(demo_monitoring())关键要点
🎯 小白总结:这一节学了什么?
技术 解决什么问题 核心 API 异步 Agent 同时调用多个 API await,gather()流式响应 实时显示 AI 输出 async for,yield批处理 控制并发数量 Queue,Semaphore重试机制 处理临时失败 async_retry装饰器性能监控 找出性能瓶颈 time.time(), 装饰器
- 并发控制:使用
Semaphore限制并发数,避免资源耗尽 - 错误处理:实现重试机制和异常处理
- 流式响应:提供实时反馈,改善用户体验
- 批处理:使用队列和工作协程处理大量任务
- 性能监控:跟踪关键指标,优化性能
下一节:6.4 小结和复习