
10.7 Hands-On Project: Building an End-to-End AI Application

From Experiment to Production: The Complete ML Lifecycle

So far we have studied the individual components of AI/ML. Now let's put everything together and build a production-grade, end-to-end AI application.

💡 Key idea: training a model in a Jupyter notebook is only the beginning. The real challenge is deploying the model to production, then maintaining and improving it continuously.

The Complete ML Lifecycle

1. Problem definition → establish business goals and success metrics
2. Data collection → build the data pipeline
3. Exploratory analysis → EDA and feature engineering
4. Model training → experiment tracking and version management
5. Model evaluation → offline evaluation and A/B testing
6. Model deployment → API serving and inference optimization
7. Monitoring and operations → performance monitoring and model updates
8. Feedback iteration → collect user feedback and improve continuously

In this section we will build an intelligent customer-service QA system that touches every one of these stages.


Project Overview: An Intelligent Customer-Service QA System

1. Business Requirements

Goal: build an intelligent customer-service system that can:

  • Understand user questions
  • Retrieve relevant documents from a knowledge base
  • Generate accurate, natural answers
  • Handle multi-turn conversations
  • Sustain high concurrency (1000+ QPS)

Technology Stack

  • Data processing: Pandas, NumPy
  • Vector database: Chroma
  • Models: Sentence Transformers + GPT
  • Backend framework: FastAPI
  • Experiment tracking: MLflow
  • Containerization: Docker
  • Monitoring: Prometheus + Grafana
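
The deployment phase later copies a requirements.txt into the Docker image, but the file itself is never shown in this section. Here is a plausible version covering the stack above (PyPI package names only; in production you would pin exact versions):

text
# requirements.txt -- covers the libraries used in this section
pandas
numpy
sentence-transformers
chromadb
transformers
torch
datasets
mlflow
fastapi
uvicorn
pydantic
prometheus-client
psutil
GPUtil
scipy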

2. System Architecture

User request

FastAPI service

    ├→ Question understanding (NLU)
    ├→ Retrieval augmentation (RAG)
    │   ├→ Vector retrieval (Chroma)
    │   └→ Re-ranking (Cross-Encoder)
    ├→ Answer generation (LLM)
    └→ Logging

Return answer

Monitoring and analytics

Phase 1: Data Preparation and Feature Engineering

1. Data Collection

python
import pandas as pd
import numpy as np
from typing import List, Dict
import json

class DataCollector:
    """
    Data collector

    Responsibilities:
    - Gather data from multiple sources
    - Clean and normalize it
    - Build the knowledge base
    """

    def __init__(self):
        self.knowledge_base = []

    def collect_from_faq(self, faq_file: str) -> List[Dict]:
        """Collect entries from an FAQ file"""
        # Sample FAQ data (a real project would read it from faq_file)
        faq_data = [
            {
                "question": "How do I reset my password?",
                "answer": "Click the 'Forgot password' link on the login page and enter your registered email; we will send a reset link to that address.",
                "category": "Account management",
                "keywords": ["password", "reset", "recover"]
            },
            {
                "question": "How do I contact customer support?",
                "answer": "You can reach us via:\n1. Live chat (weekdays 9:00-18:00)\n2. Email: support@example.com\n3. Hotline: 400-123-4567",
                "category": "Customer service",
                "keywords": ["support", "contact", "help"]
            },
            {
                "question": "How long until my order ships?",
                "answer": "Orders normally ship within 48 hours. Shipping may be delayed around holidays; please check the tracking information for details.",
                "category": "Shipping",
                "keywords": ["shipping", "order", "logistics"]
            }
        ]

        return faq_data

    def collect_from_docs(self, doc_dir: str) -> List[Dict]:
        """Collect entries from a directory of documents"""
        # A real project would parse PDF, Markdown, and similar formats
        doc_data = [
            {
                "title": "Product User Guide",
                "content": "This product is an intelligent customer-service system that supports natural-language QA...",
                "source": "user_manual.pdf",
                "page": 1
            }
        ]
        return doc_data

    def clean_and_structure(self, raw_data: List[Dict]) -> pd.DataFrame:
        """Clean and structure the raw data"""
        df = pd.DataFrame(raw_data)

        # Cleaning
        df = df.dropna(subset=['question', 'answer'])
        df['question'] = df['question'].str.strip()
        df['answer'] = df['answer'].str.strip()

        # Add metadata
        df['doc_id'] = range(len(df))
        df['created_at'] = pd.Timestamp.now()

        return df

# Usage
collector = DataCollector()
faq_data = collector.collect_from_faq("faq.json")
df = collector.clean_and_structure(faq_data)

print(df.head())

2. Building the Vector Index

python
from sentence_transformers import SentenceTransformer
import chromadb

class VectorIndexBuilder:
    """
    Vector index builder

    Responsibilities:
    - Generate embeddings with Sentence Transformers
    - Store them in a Chroma vector database
    - Support incremental updates
    """

    def __init__(
        self,
        model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
        persist_directory: str = "./chroma_db"
    ):
        # Load the embedding model
        self.embedding_model = SentenceTransformer(model_name)

        # Initialize a persistent Chroma client (chromadb >= 0.4;
        # the old Settings(chroma_db_impl=...) API has been removed)
        self.chroma_client = chromadb.PersistentClient(path=persist_directory)

        # Create or fetch the collection
        self.collection = self.chroma_client.get_or_create_collection(
            name="knowledge_base",
            metadata={"description": "FAQ and documentation"}
        )

    def build_index(self, df: pd.DataFrame) -> None:
        """Build the vector index"""
        # Generate embeddings
        questions = df['question'].tolist()
        embeddings = self.embedding_model.encode(questions, show_progress_bar=True)

        # Prepare the documents
        documents = []
        metadatas = []
        ids = []

        for idx, row in df.iterrows():
            doc_id = f"doc_{row['doc_id']}"
            ids.append(doc_id)

            # Combine question and answer into a single document
            doc = f"Question: {row['question']}\nAnswer: {row['answer']}"
            documents.append(doc)

            # Metadata
            metadata = {
                "question": row['question'],
                "answer": row['answer'],
                "category": row.get('category', 'general')
            }
            metadatas.append(metadata)

        # Add to Chroma
        self.collection.add(
            embeddings=embeddings.tolist(),
            documents=documents,
            metadatas=metadatas,
            ids=ids
        )

        print(f"Indexed {len(documents)} documents")

    def search(self, query: str, top_k: int = 5) -> List[Dict]:
        """
        Semantic search

        Args:
            query: query text
            top_k: number of results to return

        Returns:
            list of search results
        """
        # Embed the query
        query_embedding = self.embedding_model.encode([query])[0]

        # Retrieve
        results = self.collection.query(
            query_embeddings=[query_embedding.tolist()],
            n_results=top_k
        )

        # Parse the results
        retrieved_docs = []
        for i in range(len(results['ids'][0])):
            doc = {
                'id': results['ids'][0][i],
                'document': results['documents'][0][i],
                'metadata': results['metadatas'][0][i],
                'distance': results['distances'][0][i] if 'distances' in results else None
            }
            retrieved_docs.append(doc)

        return retrieved_docs

# Usage
indexer = VectorIndexBuilder()
indexer.build_index(df)

# Test retrieval
query = "What should I do if I forgot my password?"
results = indexer.search(query, top_k=3)

print(f"\nQuery: {query}\n")
for i, result in enumerate(results):
    print(f"{i+1}. {result['metadata']['question']}")
    # Rough similarity score (Chroma returns distances, lower is closer)
    print(f"   similarity: {1 - result['distance']:.4f}")
    print()
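
The class docstring promises incremental updates; here is a minimal sketch using Chroma's upsert (available in chromadb >= 0.4), which overwrites entries whose ids already exist and inserts the rest. Note that ids must stay stable across runs for this to deduplicate correctly:

python
def upsert_documents(indexer: VectorIndexBuilder, new_df: pd.DataFrame) -> None:
    """Incrementally add or update documents without rebuilding the index."""
    embeddings = indexer.embedding_model.encode(new_df['question'].tolist())
    indexer.collection.upsert(
        # Reuse the stable doc ids so existing entries are overwritten
        ids=[f"doc_{i}" for i in new_df['doc_id']],
        embeddings=embeddings.tolist(),
        documents=[f"Question: {q}\nAnswer: {a}"
                   for q, a in zip(new_df['question'], new_df['answer'])],
        metadatas=[{"question": q, "answer": a}
                   for q, a in zip(new_df['question'], new_df['answer'])]
    )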

Phase 2: Model Training and Experiment Management

1. Experiment Tracking (MLflow)

python
import mlflow
import mlflow.pytorch
from typing import Dict, Any

class ExperimentTracker:
    """
    Experiment tracker

    Responsibilities:
    - Record hyperparameters and metrics
    - Version models
    - Visualize experiment results
    """

    def __init__(self, experiment_name: str = "qa_system"):
        mlflow.set_experiment(experiment_name)
        self.experiment_name = experiment_name

    def log_params(self, params: Dict[str, Any]) -> None:
        """Log hyperparameters"""
        mlflow.log_params(params)

    def log_metrics(self, metrics: Dict[str, float], step: int = None) -> None:
        """Log metrics"""
        mlflow.log_metrics(metrics, step=step)

    def log_model(self, model, artifact_path: str = "model") -> None:
        """Log a model artifact"""
        mlflow.pytorch.log_model(model, artifact_path)

    def start_run(self, run_name: str = None):
        """Start an experiment run"""
        return mlflow.start_run(run_name=run_name)

# Usage
tracker = ExperimentTracker()

with tracker.start_run(run_name="baseline_model"):
    # Log hyperparameters
    params = {
        "embedding_model": "all-MiniLM-L6-v2",
        "top_k": 5,
        "temperature": 0.7
    }
    tracker.log_params(params)

    # ... train the model ...
    # Log metrics
    metrics = {
        "accuracy": 0.85,
        "f1_score": 0.82,
        "latency_ms": 150
    }
    tracker.log_metrics(metrics)
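
Beyond logging individual runs, MLflow can promote a logged model into its Model Registry for versioned serving. A minimal sketch (the registered name qa_intent_model is an assumption):

python
import mlflow

# Register the model artifact from the most recent run under a named
# registry entry; each registration bumps the version number.
run_id = mlflow.last_active_run().info.run_id
mlflow.register_model(f"runs:/{run_id}/model", "qa_intent_model")

# Later, load a specific registered version for serving.
model = mlflow.pyfunc.load_model("models:/qa_intent_model/1")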

2. Model Fine-Tuning

python
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer
)
from datasets import Dataset
from typing import Any, Dict, List
import torch

class IntentClassifier:
    """
    Intent classifier

    Responsibilities:
    - Recognize user intent (greeting, question, complaint, ...)
    - Fine-tune a BERT base model
    - Support multi-class classification
    """

    def __init__(self, model_name: str = "bert-base-chinese", num_labels: int = 5):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            num_labels=num_labels
        )

        self.label_map = {
            0: "greeting",
            1: "question",
            2: "complaint",
            3: "feedback",
            4: "other"
        }

    def train(
        self,
        train_texts: List[str],
        train_labels: List[int],
        output_dir: str = "./intent_model"
    ):
        """Train the model"""
        # Build the dataset
        train_dataset = Dataset.from_dict({
            "text": train_texts,
            "label": train_labels
        })

        def tokenize_function(examples):
            return self.tokenizer(
                examples["text"],
                padding="max_length",
                truncation=True,
                max_length=128
            )

        train_dataset = train_dataset.map(tokenize_function, batched=True)

        # Training arguments (periodic evaluation is disabled here because
        # no eval_dataset is passed to the Trainer below; see the
        # compute_metrics sketch after this listing for how to enable it)
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=3,
            per_device_train_batch_size=16,
            save_steps=500,
            logging_steps=100
        )

        # Trainer
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
        )

        # Train
        trainer.train()

        # Save the model
        self.model.save_pretrained(output_dir)
        self.tokenizer.save_pretrained(output_dir)

    def predict(self, text: str) -> Dict[str, Any]:
        """Predict the intent of a single utterance"""
        inputs = self.tokenizer(
            text,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=128
        )

        with torch.no_grad():
            outputs = self.model(**inputs)
            logits = outputs.logits
            probabilities = torch.softmax(logits, dim=-1)

        pred_label = probabilities.argmax().item()
        pred_score = probabilities.max().item()

        return {
            "intent": self.label_map[pred_label],
            "confidence": pred_score
        }

# Usage (assuming training data is available)
# classifier = IntentClassifier()
# classifier.train(train_texts, train_labels)
# result = classifier.predict("How do I reset my password?")
# print(result)
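
The TrainingArguments above leave periodic evaluation disabled because no eval set is wired in. If you hold out labeled data, Trainer accepts an eval_dataset plus a compute_metrics callback; a sketch assuming scikit-learn is installed:

python
import numpy as np
from sklearn.metrics import accuracy_score, f1_score

def compute_metrics(eval_pred):
    """Convert raw logits into accuracy and macro-F1 for Trainer's eval loop."""
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=-1)
    return {
        "accuracy": accuracy_score(labels, preds),
        "f1_macro": f1_score(labels, preds, average="macro"),
    }

# trainer = Trainer(model=..., args=..., train_dataset=train_dataset,
#                   eval_dataset=eval_dataset, compute_metrics=compute_metrics)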

Phase 3: Building the RAG System

1. Retrieval-Augmented Generation (RAG)

python
from transformers import AutoTokenizer, AutoModelForCausalLM
from typing import Any, Dict, List
import torch

class RAGSystem:
    """
    Retrieval-augmented generation system

    Pipeline:
    1. Semantic retrieval of relevant documents
    2. Build a context prompt
    3. Generate the answer with an LLM
    """

    def __init__(
        self,
        retriever: VectorIndexBuilder,
        model_name: str = "gpt2"
    ):
        self.retriever = retriever

        # Load the generation model
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(model_name)

        # Set the pad token
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model.to(self.device)

    def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        """Retrieve relevant documents"""
        return self.retriever.search(query, top_k=top_k)

    def build_prompt(self, query: str, context_docs: List[Dict]) -> str:
        """Build the prompt"""
        # Assemble the context
        context = "\n\n".join([
            f"Document {i+1}:\n{doc['document']}"
            for i, doc in enumerate(context_docs)
        ])

        # Build the prompt
        prompt = f"""Answer the user's question based on the documents below. If the documents contain no relevant information, say so honestly.

{context}

User question: {query}

Answer:"""

        return prompt

    def generate(
        self,
        prompt: str,
        max_new_tokens: int = 200,
        temperature: float = 0.7
    ) -> str:
        """Generate an answer"""
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=1024
        ).to(self.device)

        with torch.no_grad():
            outputs = self.model.generate(
                inputs['input_ids'],
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                top_p=0.9,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )

        generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        # Keep only the answer (strip the prompt prefix)
        answer = generated_text[len(prompt):].strip()

        return answer

    def answer_question(self, query: str) -> Dict[str, Any]:
        """
        Full QA pipeline

        Returns:
            {
                'query': the original question,
                'answer': the generated answer,
                'retrieved_docs': the retrieved documents,
                'confidence': a confidence score
            }
        """
        # 1. Retrieve
        retrieved_docs = self.retrieve(query, top_k=3)

        # 2. Build the prompt
        prompt = self.build_prompt(query, retrieved_docs)

        # 3. Generate the answer
        answer = self.generate(prompt)

        # 4. Confidence heuristic based on the best retrieval distance
        if retrieved_docs:
            confidence = 1 - retrieved_docs[0]['distance']
        else:
            confidence = 0.0

        return {
            'query': query,
            'answer': answer,
            'retrieved_docs': retrieved_docs,
            'confidence': confidence
        }

# Usage
# rag_system = RAGSystem(indexer)
# result = rag_system.answer_question("How do I reset my password?")
# print(result['answer'])
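
The architecture diagram in the project overview includes a Cross-Encoder re-ranking stage that RAGSystem skips. A minimal sketch using sentence-transformers' CrossEncoder (the checkpoint shown is a common public English model; substitute one trained for your language):

python
from typing import Dict, List
from sentence_transformers import CrossEncoder

def rerank(query: str, docs: List[Dict], top_k: int = 3) -> List[Dict]:
    """Re-score retrieved docs jointly with the query and keep the best ones.

    Cross-encoders read the query/document pair together, so they are
    slower but usually more accurate than the bi-encoder used for
    first-stage retrieval.
    """
    reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
    scores = reranker.predict([(query, d["document"]) for d in docs])
    ranked = sorted(zip(docs, scores), key=lambda x: x[1], reverse=True)
    return [doc for doc, _ in ranked[:top_k]]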

Phase 4: Deploying the API Service

1. FastAPI Service

python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, List, Optional
import uvicorn
import time

app = FastAPI(
    title="Customer Service QA API",
    description="A RAG-based question-answering system",
    version="1.0.0"
)

# Request / response models
class QuestionRequest(BaseModel):
    question: str
    top_k: Optional[int] = 3
    temperature: Optional[float] = 0.7

class QuestionResponse(BaseModel):
    answer: str
    confidence: float
    retrieved_docs: List[Dict]
    latency_ms: float

# System initialization (in a real app, load these at startup; see the
# lifespan sketch after this listing)
# indexer = VectorIndexBuilder()
# rag_system = RAGSystem(indexer)

@app.get("/")
async def root():
    """Health check"""
    return {"status": "healthy", "message": "Customer service API is running"}

@app.post("/ask", response_model=QuestionResponse)
async def ask_question(request: QuestionRequest):
    """
    QA endpoint

    Args:
        request: the request containing the question

    Returns:
        the answer plus related information
    """
    try:
        start_time = time.time()

        # Call the RAG system
        # result = rag_system.answer_question(request.question)

        # Mock response (replace with the real result above)
        result = {
            'answer': "This is a mock answer.",
            'confidence': 0.95,
            'retrieved_docs': []
        }

        latency_ms = (time.time() - start_time) * 1000

        return QuestionResponse(
            answer=result['answer'],
            confidence=result['confidence'],
            retrieved_docs=result['retrieved_docs'],
            latency_ms=latency_ms
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/feedback")
async def submit_feedback(
    question: str,
    answer: str,
    rating: int,
    comment: Optional[str] = None
):
    """
    User feedback endpoint

    Collects user ratings of answers so the model can be improved.
    """
    # Assemble the feedback record
    feedback_data = {
        "question": question,
        "answer": answer,
        "rating": rating,
        "comment": comment,
        "timestamp": time.time()
    }

    # In production, persist this to a database
    # db.save_feedback(feedback_data)

    return {"status": "success", "message": "Thanks for your feedback!"}

# Run the service
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
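
The initialization comment above notes that models should be loaded at startup rather than per request; FastAPI's lifespan hook (FastAPI >= 0.93) is one way to do that. A minimal sketch reusing the classes from earlier phases:

python
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load heavy objects once, before the first request is served.
    app.state.indexer = VectorIndexBuilder()
    app.state.rag_system = RAGSystem(app.state.indexer)
    yield
    # Teardown (close DB connections, flush logs) would go here.

app = FastAPI(title="Customer Service QA API", lifespan=lifespan)

# Handlers then read the shared instance, e.g.:
# result = request.app.state.rag_system.answer_question(question)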

2. Dockerfile

dockerfile
# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the source code
COPY . .

# Pre-download models (optional; they can also be fetched at runtime)
# RUN python -c "from transformers import AutoModel; AutoModel.from_pretrained('bert-base-chinese')"

# Expose the port
EXPOSE 8000

# Start command
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
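
To build and run the image locally (the qa-api tag is an arbitrary example):

bash
# Build the image and start the service on port 8000
docker build -t qa-api .
docker run -p 8000:8000 qa-api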

3. Docker Compose

yaml
# docker-compose.yml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MODEL_NAME=bert-base-chinese
      - CHROMA_PERSIST_DIR=/data/chroma_db
    volumes:
      - ./data:/data
    restart: unless-stopped

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-storage:/var/lib/grafana

volumes:
  grafana-storage:
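
The compose file mounts a prometheus.yml that is not shown above; a minimal scrape configuration matching these services (the job name is an assumption):

yaml
# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "qa-api"
    metrics_path: /metrics
    static_configs:
      # The compose service name "api" resolves on the shared network
      - targets: ["api:8000"]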

Phase 5: Monitoring and Optimization

1. Performance Monitoring

python
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from fastapi import Request, Response
import time

# Metric definitions
REQUEST_COUNT = Counter('api_requests_total', 'Total API requests', ['method', 'endpoint', 'status'])
REQUEST_LATENCY = Histogram('api_request_latency_seconds', 'Request latency', ['endpoint'])
ACTIVE_USERS = Gauge('active_users', 'Number of active users')
MODEL_CONFIDENCE = Histogram('model_confidence', 'Model confidence scores')

@app.middleware("http")
async def monitor_requests(request: Request, call_next):
    """Request-monitoring middleware"""
    start_time = time.time()

    # Handle the request
    response = await call_next(request)

    # Record metrics
    latency = time.time() - start_time
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    REQUEST_LATENCY.labels(endpoint=request.url.path).observe(latency)

    return response

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    # Return the exposition text with the correct content type
    # (returning generate_latest() directly would be JSON-encoded)
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
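
Note that the MODEL_CONFIDENCE histogram above is defined but never fed. A sketch of wiring it into the QA path (assumes the rag_system global from the FastAPI section has been initialized; the endpoint name is an arbitrary example):

python
@app.post("/ask_monitored")
async def ask_monitored(request: QuestionRequest):
    """Like /ask, but also records the model's confidence in Prometheus."""
    result = rag_system.answer_question(request.question)
    MODEL_CONFIDENCE.observe(result['confidence'])  # feeds the histogram above
    return result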

2. Model Performance Profiling

python
import psutil
import GPUtil
import numpy as np
from typing import Any, Dict

class ModelProfiler:
    """
    Model performance profiler

    Measures:
    - CPU/GPU utilization
    - Memory usage
    - Inference latency
    """

    @staticmethod
    def get_system_stats() -> Dict[str, Any]:
        """Collect system resource usage"""
        # CPU
        cpu_percent = psutil.cpu_percent(interval=1)

        # Memory
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        memory_used_gb = memory.used / (1024 ** 3)

        # GPU (if available)
        gpu_stats = []
        try:
            gpus = GPUtil.getGPUs()
            for gpu in gpus:
                gpu_stats.append({
                    'name': gpu.name,
                    'load': gpu.load * 100,
                    'memory_used_mb': gpu.memoryUsed,
                    'memory_total_mb': gpu.memoryTotal,
                    'temperature': gpu.temperature
                })
        except Exception:
            pass

        return {
            'cpu_percent': cpu_percent,
            'memory_percent': memory_percent,
            'memory_used_gb': round(memory_used_gb, 2),
            'gpu_stats': gpu_stats
        }

    @staticmethod
    def profile_inference(model, input_data, num_runs: int = 100):
        """
        Profile inference performance

        Returns:
            mean latency, percentile latencies, and throughput
        """
        import time

        latencies = []

        for _ in range(num_runs):
            start = time.time()
            _ = model(input_data)
            latency = time.time() - start
            latencies.append(latency)

        return {
            'mean_latency_ms': np.mean(latencies) * 1000,
            'p50_latency_ms': np.percentile(latencies, 50) * 1000,
            'p95_latency_ms': np.percentile(latencies, 95) * 1000,
            'p99_latency_ms': np.percentile(latencies, 99) * 1000,
            'throughput_qps': 1 / np.mean(latencies)
        }

# Usage
profiler = ModelProfiler()
stats = profiler.get_system_stats()
print(stats)

Phase 6: Continuous Improvement

1. A/B Testing Framework

python
import hashlib
import random
import numpy as np
from typing import Dict

class ABTestingFramework:
    """
    A/B testing framework

    Responsibilities:
    - Traffic allocation
    - Metric collection
    - Statistical analysis
    """

    def __init__(self):
        self.experiments = {}

    def create_experiment(
        self,
        experiment_id: str,
        variants: Dict[str, float]
    ):
        """
        Create an experiment

        Args:
            experiment_id: experiment ID
            variants: variants and their traffic shares, e.g. {'A': 0.5, 'B': 0.5}
        """
        self.experiments[experiment_id] = {
            'variants': variants,
            'results': {variant: [] for variant in variants}
        }

    def assign_variant(self, experiment_id: str, user_id: str) -> str:
        """Assign a variant to a user"""
        experiment = self.experiments[experiment_id]
        variants = list(experiment['variants'].keys())
        weights = list(experiment['variants'].values())

        # Deterministic per-user assignment: seed a local RNG with a
        # stable hash (Python's built-in hash() is salted per process,
        # so hashlib is used instead)
        seed = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        rng = random.Random(seed)
        variant = rng.choices(variants, weights=weights)[0]

        return variant

    def record_result(
        self,
        experiment_id: str,
        variant: str,
        metric_value: float
    ):
        """Record an experiment observation"""
        self.experiments[experiment_id]['results'][variant].append(metric_value)

    def analyze_results(self, experiment_id: str) -> Dict:
        """Analyze the experiment results"""
        from scipy import stats

        experiment = self.experiments[experiment_id]
        results = experiment['results']

        # Summary statistics
        analysis = {}
        for variant, values in results.items():
            if len(values) > 0:
                analysis[variant] = {
                    'mean': np.mean(values),
                    'std': np.std(values),
                    'count': len(values)
                }

        # t-test (when there are exactly two variants)
        if len(results) == 2:
            variant_names = list(results.keys())
            values_a = results[variant_names[0]]
            values_b = results[variant_names[1]]

            if len(values_a) > 0 and len(values_b) > 0:
                t_stat, p_value = stats.ttest_ind(values_a, values_b)
                analysis['t_test'] = {
                    't_statistic': t_stat,
                    'p_value': p_value,
                    'significant': p_value < 0.05
                }

        return analysis

# Usage
ab_test = ABTestingFramework()
ab_test.create_experiment('model_v2', {'v1': 0.5, 'v2': 0.5})

# Per-request assignment
user_id = "user_123"
variant = ab_test.assign_variant('model_v2', user_id)
print(f"User {user_id} assigned to variant: {variant}")

# Record an observation
ab_test.record_result('model_v2', variant, 0.95)  # satisfaction score

# Analyze the results
# results = ab_test.analyze_results('model_v2')
# print(results)

2. Model Update Strategy

python
import random
import time

class ModelVersionManager:
    """
    Model version manager

    Responsibilities:
    - Version switching
    - Canary releases
    - Rollback mechanism
    """

    def __init__(self):
        self.models = {}
        self.active_version = None

    def register_model(self, version: str, model):
        """Register a model version"""
        self.models[version] = {
            'model': model,
            'deployed_at': time.time(),
            'request_count': 0,
            'error_count': 0
        }

    def set_active_version(self, version: str):
        """Set the active version"""
        if version not in self.models:
            raise ValueError(f"Model version {version} does not exist")
        self.active_version = version

    def get_model(self, version: str = None):
        """Fetch a model (optionally a specific version)"""
        if version is None:
            version = self.active_version

        if version not in self.models:
            raise ValueError(f"Model version {version} does not exist")

        self.models[version]['request_count'] += 1
        return self.models[version]['model']

    def canary_deployment(
        self,
        new_version: str,
        canary_percent: float = 0.1
    ):
        """
        Canary deployment

        Args:
            new_version: the candidate version
            canary_percent: share of traffic routed to it
        """
        if random.random() < canary_percent:
            return self.get_model(new_version)
        else:
            return self.get_model(self.active_version)

# Usage
version_manager = ModelVersionManager()
# version_manager.register_model('v1.0', model_v1)
# version_manager.register_model('v1.1', model_v2)
# version_manager.set_active_version('v1.0')

# Canary deployment
# model = version_manager.canary_deployment('v1.1', canary_percent=0.1)
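
The docstring above lists a rollback mechanism that the class leaves unimplemented; a minimal sketch built on the counters it already tracks (the 5% error-rate threshold is an arbitrary assumption):

python
def rollback_if_unhealthy(manager: ModelVersionManager,
                          candidate: str,
                          fallback: str,
                          max_error_rate: float = 0.05) -> str:
    """Promote the candidate version, or roll back if it errors too often."""
    stats = manager.models[candidate]
    if stats['request_count'] == 0:
        return manager.active_version  # no traffic yet, nothing to judge
    error_rate = stats['error_count'] / stats['request_count']
    if error_rate > max_error_rate:
        manager.set_active_version(fallback)   # roll back
    else:
        manager.set_active_version(candidate)  # promote
    return manager.active_version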

Summary

In this section we built a complete end-to-end AI application:

Data pipeline

  • Data collection and cleaning
  • Vector index construction
  • Incremental update mechanism

Model training

  • Experiment tracking (MLflow)
  • Model fine-tuning
  • Hyperparameter logging

RAG system

  • Semantic retrieval
  • Context construction
  • Answer generation

API service

  • FastAPI backend
  • Docker containerization
  • Health checks

Monitoring and operations

  • Prometheus metrics
  • Performance profiling
  • Request logging

Continuous improvement

  • A/B testing
  • Model updates
  • Feedback loop

Exercises

Basics

  1. Deploy a simple FastAPI service that exposes a text-classification endpoint
  2. Containerize your application with Docker
  3. Add basic logging

Intermediate

  1. Implement a complete RAG system, including retrieval and generation
  2. Add Prometheus monitoring and Grafana dashboards
  3. Implement model version management and canary releases

Challenges

  1. Build a chatbot that supports multi-turn dialogue
  2. Implement distributed inference (multiple GPUs / machines)
  3. Design and implement a complete A/B testing system

Next section: 10.8 Chapter Summary and Review

The final section wraps up everything in this chapter, with comprehensive exercises and suggested learning paths!
