Skip to content

6.4 小结和复习

本章核心概念

🎯 小白总结:异步编程核心记忆卡

概念用途生活比喻
async def定义异步函数创建一个"可暂停"的任务
await等待操作完成等外卖时可以刷手机
gather()同时执行多个任务同时点咖啡和蛋糕
Semaphore限制并发数餐厅只有5个厨师
Queue任务排队取号机
流式响应边生成边返回打字机效果

1. 异步编程基础

  • async/await 语法:定义和调用异步函数
  • asyncio.gather():并发执行多个协程
  • asyncio.create_task():创建后台任务
  • 并发 vs 并行:理解 I/O 密集型任务的优化

🎯 小白注意:并发 ≠ 并行

  • 并发(Concurrent):交替执行多个任务(单核也能做到)
  • 并行(Parallel):真正同时执行(需要多核)

异步是并发,适合 I/O 密集型任务(等网络、等文件)。 多进程是并行,适合 CPU 密集型任务(复杂计算)。

2. 异步工具

  • 异步上下文管理器__aenter____aexit__
  • 异步迭代器__aiter____anext__
  • 异步工具类:实现 arun() 方法
  • 异步装饰器:包装异步函数

3. 实战技巧

  • 流式响应:实时生成和展示结果
  • 批处理系统:队列 + 工作协程 + Semaphore
  • 重试机制:指数退避策略
  • 性能监控:跟踪耗时和错误率

知识自测

🎯 小白提示:做题前的核心概念回顾

  1. 异步 = 不傻等:遇到等待就去做别的事
  2. I/O 密集型用异步:网络请求、文件读写、数据库查询
  3. CPU 密集型用多进程:复杂计算、图像处理
  4. 关键语法async def 定义、await 等待、gather() 并发

选择题

  1. 下面哪个不是使用异步编程的主要原因?

    • A. 提高 I/O 密集型任务的效率
    • B. 实现 CPU 密集型任务的并行计算
    • C. 减少等待时间
    • D. 提高系统吞吐量
  2. asyncio.gather()asyncio.wait() 的主要区别是?

    • A. gather 返回结果列表,wait 返回完成和待完成的集合
    • B. gather 更快
    • C. wait 更安全
    • D. 没有区别
  3. 异步上下文管理器需要实现哪些方法?

    • A. __enter____exit__
    • B. __aenter____aexit__
    • C. __init____del__
    • D. __call____await__
查看答案
  1. B - 异步编程主要用于 I/O 密集型任务,CPU 密集型任务需要多进程
  2. A - gather 返回结果列表,wait 返回 (done, pending) 集合
  3. B - 异步上下文管理器使用 __aenter____aexit__

高难度编程挑战

🎯 小白须知:这些挑战是干什么的?

这两个挑战是进阶练习,用来检验你对异步编程的深入理解。

  • 挑战 1(调度器):像医院的叫号系统——管理任务的优先级、等待、执行
  • 挑战 2(分布式):像外卖平台——多个骑手(Agent)协作完成多个订单

如果感觉太难,可以先跳过,学完后面的章节再回来挑战!

挑战 1:智能任务调度器(难度:⭐⭐⭐⭐)

🎯 小白理解指南:什么是"任务调度器"?

想象你是医院的导诊台,需要安排病人看病:

调度器功能医院比喻
优先级队列急诊优先、老人优先
任务依赖先抽血,再看化验结果
并发限制只有 5 个诊室
超时控制等太久就重新排队
重试机制检查失败,重做一次

这个调度器就是用代码实现这套"医院管理系统"!

需求: 实现一个智能的异步任务调度器,支持以下功能:

  1. 优先级队列(高优先级任务优先执行)
  2. 任务依赖(任务 B 依赖任务 A 的结果)
  3. 并发限制(最多同时执行 N 个任务)
  4. 超时控制(任务超时自动取消)
  5. 重试机制(失败任务自动重试)
  6. 实时监控(显示任务状态和进度)

代码框架

🎯 代码解读:关键概念速查

代码作用生活比喻
heapq优先级队列(自动排序)VIP 通道
TaskStatus任务状态枚举订单状态:待处理/进行中/已完成
@dataclass自动生成 __init__ 等方法简化代码的语法糖
Semaphore限制并发数只有 N 个窗口
dependencies任务依赖关系先做 A 才能做 B
python
import asyncio
from typing import List, Dict, Any, Optional, Callable, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import heapq  # 🎯 堆队列,自动按优先级排序

class TaskStatus(Enum):
    """任务状态——像订单的状态流转"""
    PENDING = "pending"      # 待处理
    WAITING = "waiting"      # 等待依赖(比如等化验结果)
    RUNNING = "running"      # 执行中
    COMPLETED = "completed"  # 已完成
    FAILED = "failed"        # 失败
    TIMEOUT = "timeout"      # 超时
    CANCELLED = "cancelled"  # 已取消

@dataclass  # 🎯 自动生成 __init__、__repr__ 等方法
class Task:
    """任务数据类"""
    id: str
    func: Callable
    args: tuple = field(default_factory=tuple)
    kwargs: dict = field(default_factory=dict)
    priority: int = 1  # 数字越小优先级越高
    dependencies: Set[str] = field(default_factory=set)
    timeout: float = 30.0
    max_retries: int = 3
    retry_delay: float = 1.0
    
    # 运行时状态
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[Exception] = None
    retries: int = 0
    created_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    
    def __lt__(self, other):
        """支持优先级队列"""
        return self.priority < other.priority

class SmartScheduler:
    """智能任务调度器"""
    
    def __init__(self, max_workers: int = 5):
        self.max_workers = max_workers
        # TODO: 实现以下功能
        # 1. 使用 heapq 实现优先级队列
        # 2. 实现依赖关系管理
        # 3. 使用 Semaphore 控制并发
        # 4. 实现超时和重试
        # 5. 实现实时监控
        pass
    
    async def add_task(self, task: Task) -> str:
        """添加任务"""
        # TODO: 实现任务添加逻辑
        pass
    
    async def process_task(self, task: Task) -> Task:
        """处理单个任务"""
        # TODO: 实现任务处理逻辑
        # - 检查依赖是否完成
        # - 执行任务(带超时)
        # - 处理重试
        pass
    
    async def run(self) -> Dict[str, Task]:
        """运行调度器"""
        # TODO: 实现主调度循环
        pass
    
    def get_stats(self) -> Dict[str, Any]:
        """获取统计信息"""
        # TODO: 返回任务状态统计
        pass

# 测试代码
async def test_scheduler():
    scheduler = SmartScheduler(max_workers=3)
    
    # 定义测试任务
    async def task_a():
        await asyncio.sleep(1)
        return "A完成"
    
    async def task_b(a_result: str):
        await asyncio.sleep(1)
        return f"B完成(依赖: {a_result})"
    
    async def task_c():
        await asyncio.sleep(0.5)
        return "C完成"
    
    async def task_fail():
        raise Exception("模拟失败")
    
    # 添加任务
    task_a_id = await scheduler.add_task(Task(
        id="task-a",
        func=task_a,
        priority=1
    ))
    
    task_b_id = await scheduler.add_task(Task(
        id="task-b",
        func=task_b,
        priority=2,
        dependencies={"task-a"}
    ))
    
    await scheduler.add_task(Task(
        id="task-c",
        func=task_c,
        priority=1
    ))
    
    await scheduler.add_task(Task(
        id="task-fail",
        func=task_fail,
        max_retries=2
    ))
    
    # 运行调度器
    results = await scheduler.run()
    
    # 打印结果
    print("\n=== 任务结果 ===")
    for task_id, task in results.items():
        print(f"{task_id}: {task.status.value} - {task.result or task.error}")
    
    print(f"\n=== 统计信息 ===")
    print(scheduler.get_stats())

# asyncio.run(test_scheduler())

评分标准

  • 优先级队列正确实现(20分)
  • 依赖关系正确处理(25分)
  • 并发控制和超时处理(20分)
  • 重试机制(15分)
  • 实时监控和统计(20分)

挑战 2:分布式 Agent 系统(难度:⭐⭐⭐⭐⭐)

🎯 小白理解指南:什么是"分布式 Agent 系统"?

想象你在运营一个外卖平台

系统组件外卖平台比喻
Agent外卖骑手(各有专长:送餐、取餐)
Coordinator调度中心(分配订单)
能力注册骑手登记自己会送什么区域
任务分发根据骑手位置分配订单
消息传递骑手和调度中心的对讲机
负载均衡不让一个骑手太忙
容错机制骑手请假,订单转给别人

为什么这么难? 因为需要同时考虑:

  • 多个 Agent 并发运行
  • Agent 之间的通信
  • 任务的智能分配
  • 故障的自动恢复

需求: 设计一个分布式 Agent 系统,支持多个 Agent 协作完成复杂任务。

核心功能

  1. Agent 注册与发现:Agent 可以注册自己的能力
  2. 任务分发:根据 Agent 能力自动分发任务
  3. 消息传递:Agent 之间可以异步通信
  4. 负载均衡:自动分配任务到空闲 Agent
  5. 容错机制:Agent 失败时自动重新分配任务
  6. 协调器:中央协调器管理所有 Agent

代码框架

python
import asyncio
from typing import Dict, List, Set, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
from abc import ABC, abstractmethod
import json

class AgentCapability(Enum):
    """Agent 能力"""
    SEARCH = "search"
    CALCULATE = "calculate"
    SUMMARIZE = "summarize"
    TRANSLATE = "translate"

class MessageType(Enum):
    """消息类型"""
    TASK_REQUEST = "task_request"
    TASK_RESULT = "task_result"
    AGENT_STATUS = "agent_status"
    HEARTBEAT = "heartbeat"

@dataclass
class Message:
    """消息数据类"""
    type: MessageType
    sender: str
    receiver: str
    content: Any
    timestamp: float = field(default_factory=lambda: asyncio.get_event_loop().time())

@dataclass
class AgentInfo:
    """Agent 信息"""
    id: str
    capabilities: Set[AgentCapability]
    status: str = "idle"  # idle, busy, offline
    tasks_completed: int = 0
    last_heartbeat: float = field(default_factory=lambda: asyncio.get_event_loop().time())

class BaseAgent(ABC):
    """Agent 基类"""
    
    def __init__(self, agent_id: str, capabilities: Set[AgentCapability]):
        self.id = agent_id
        self.capabilities = capabilities
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self.coordinator: Optional['Coordinator'] = None
        self.running = False
    
    async def connect(self, coordinator: 'Coordinator'):
        """连接到协调器"""
        self.coordinator = coordinator
        await coordinator.register_agent(self)
    
    @abstractmethod
    async def process_task(self, task: Dict[str, Any]) -> Any:
        """处理任务(子类实现)"""
        pass
    
    async def run(self):
        """运行 Agent"""
        # TODO: 实现 Agent 主循环
        # - 接收消息
        # - 处理任务
        # - 发送心跳
        pass
    
    async def send_message(self, message: Message):
        """发送消息"""
        # TODO: 实现消息发送
        pass

class Coordinator:
    """协调器"""
    
    def __init__(self):
        self.agents: Dict[str, AgentInfo] = {}
        self.message_bus: Dict[str, asyncio.Queue] = {}
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.results: Dict[str, Any] = {}
    
    async def register_agent(self, agent: BaseAgent):
        """注册 Agent"""
        # TODO: 实现 Agent 注册逻辑
        pass
    
    async def assign_task(self, task: Dict[str, Any]) -> str:
        """分配任务"""
        # TODO: 实现任务分配逻辑
        # - 根据能力选择 Agent
        # - 实现负载均衡
        pass
    
    async def monitor_agents(self):
        """监控 Agent 状态"""
        # TODO: 实现监控逻辑
        # - 检查心跳
        # - 处理 Agent 失败
        pass
    
    async def run(self):
        """运行协调器"""
        # TODO: 实现协调器主循环
        pass

# 具体 Agent 实现
class SearchAgent(BaseAgent):
    """搜索 Agent"""
    
    def __init__(self, agent_id: str):
        super().__init__(agent_id, {AgentCapability.SEARCH})
    
    async def process_task(self, task: Dict[str, Any]) -> Any:
        query = task.get("query")
        await asyncio.sleep(1)  # 模拟搜索
        return {"results": f"搜索结果: {query}"}

class CalculatorAgent(BaseAgent):
    """计算 Agent"""
    
    def __init__(self, agent_id: str):
        super().__init__(agent_id, {AgentCapability.CALCULATE})
    
    async def process_task(self, task: Dict[str, Any]) -> Any:
        expression = task.get("expression")
        await asyncio.sleep(0.5)
        return {"result": eval(expression)}

# 测试代码
async def test_distributed_system():
    # 创建协调器
    coordinator = Coordinator()
    
    # 创建 Agent
    agents = [
        SearchAgent("search-1"),
        SearchAgent("search-2"),
        CalculatorAgent("calc-1"),
    ]
    
    # 连接 Agent
    for agent in agents:
        await agent.connect(coordinator)
    
    # 提交任务
    tasks = [
        {"type": AgentCapability.SEARCH, "query": "Python async"},
        {"type": AgentCapability.CALCULATE, "expression": "10 + 20"},
        {"type": AgentCapability.SEARCH, "query": "LangChain"},
    ]
    
    # TODO: 运行系统并获取结果
    pass

# asyncio.run(test_distributed_system())

评分标准

  • Agent 注册与能力管理(20分)
  • 任务分发和负载均衡(25分)
  • 消息传递机制(20分)
  • 容错和故障恢复(20分)
  • 系统监控和统计(15分)

参考解决方案

挑战 1 参考实现(点击展开)
python
import asyncio
from typing import List, Dict, Any, Optional, Callable, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import heapq

class TaskStatus(Enum):
    PENDING = "pending"
    WAITING = "waiting"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    TIMEOUT = "timeout"
    CANCELLED = "cancelled"

@dataclass
class Task:
    id: str
    func: Callable
    args: tuple = field(default_factory=tuple)
    kwargs: dict = field(default_factory=dict)
    priority: int = 1
    dependencies: Set[str] = field(default_factory=set)
    timeout: float = 30.0
    max_retries: int = 3
    retry_delay: float = 1.0
    
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[Exception] = None
    retries: int = 0
    created_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    
    def __lt__(self, other):
        return self.priority < other.priority

class SmartScheduler:
    def __init__(self, max_workers: int = 5):
        self.max_workers = max_workers
        self.semaphore = asyncio.Semaphore(max_workers)
        self.tasks: Dict[str, Task] = {}
        self.pending_queue: List[Task] = []
        self.completed_dependencies: Dict[str, Any] = {}
        self.running_tasks: Set[str] = set()
    
    async def add_task(self, task: Task) -> str:
        self.tasks[task.id] = task
        heapq.heappush(self.pending_queue, task)
        return task.id
    
    def _can_run(self, task: Task) -> bool:
        """检查任务是否可以运行"""
        return all(dep in self.completed_dependencies for dep in task.dependencies)
    
    async def _execute_with_timeout(self, task: Task) -> Any:
        """执行任务(带超时)"""
        # 注入依赖结果
        if task.dependencies:
            dep_results = {dep: self.completed_dependencies[dep] 
                          for dep in task.dependencies}
            task.kwargs['dependencies'] = dep_results
        
        return await asyncio.wait_for(
            task.func(*task.args, **task.kwargs),
            timeout=task.timeout
        )
    
    async def process_task(self, task: Task) -> Task:
        async with self.semaphore:
            task.status = TaskStatus.RUNNING
            task.started_at = datetime.now()
            self.running_tasks.add(task.id)
            
            while task.retries <= task.max_retries:
                try:
                    task.result = await self._execute_with_timeout(task)
                    task.status = TaskStatus.COMPLETED
                    task.completed_at = datetime.now()
                    self.completed_dependencies[task.id] = task.result
                    break
                
                except asyncio.TimeoutError:
                    task.status = TaskStatus.TIMEOUT
                    task.error = Exception("任务超时")
                    break
                
                except Exception as e:
                    task.error = e
                    task.retries += 1
                    
                    if task.retries <= task.max_retries:
                        await asyncio.sleep(task.retry_delay * task.retries)
                    else:
                        task.status = TaskStatus.FAILED
                        break
            
            self.running_tasks.remove(task.id)
            return task
    
    async def run(self) -> Dict[str, Task]:
        workers = []
        
        while self.pending_queue or workers:
            # 启动可运行的任务
            ready_tasks = []
            remaining = []
            
            while self.pending_queue:
                task = heapq.heappop(self.pending_queue)
                if self._can_run(task):
                    ready_tasks.append(task)
                else:
                    task.status = TaskStatus.WAITING
                    remaining.append(task)
            
            # 重新加入队列
            for task in remaining:
                heapq.heappush(self.pending_queue, task)
            
            # 启动新任务
            for task in ready_tasks:
                workers.append(asyncio.create_task(self.process_task(task)))
            
            # 等待任务完成
            if workers:
                done, workers = await asyncio.wait(
                    workers,
                    return_when=asyncio.FIRST_COMPLETED
                )
            
            await asyncio.sleep(0.1)  # 短暂休眠,避免busy loop
        
        return self.tasks
    
    def get_stats(self) -> Dict[str, Any]:
        status_counts = {}
        for status in TaskStatus:
            status_counts[status.value] = sum(
                1 for t in self.tasks.values() if t.status == status
            )
        
        return {
            "total": len(self.tasks),
            "status": status_counts,
            "completed": sum(1 for t in self.tasks.values() 
                           if t.status == TaskStatus.COMPLETED),
            "failed": sum(1 for t in self.tasks.values() 
                         if t.status in [TaskStatus.FAILED, TaskStatus.TIMEOUT])
        }

学习资源

🎯 小白学习建议:异步编程的学习路线

  1. 先理解概念:搞清楚"为什么要异步"比"怎么写"更重要
  2. 多写多练:光看不练永远学不会,试着改改示例代码
  3. 从简单开始:先用 gather() 并发几个任务,再学复杂的
  4. 调试技巧:多用 print() 看执行顺序,理解并发行为

推荐阅读

进阶主题

  • asyncio 内部实现原理
  • uvloop 高性能事件循环
  • 异步数据库操作(asyncpg, motor)
  • WebSocket 和 Server-Sent Events

下一章:Module 7 - 环境管理与 API 集成

基于 MIT 许可证发布。内容版权归作者所有。