7.3 Hands-On: A Multi-API Integration System
🎯 Beginner's view: why do we need "multi-API integration"?
In practice, you may need several AI services at the same time:
| Scenario | Solution |
| --- | --- |
| OpenAI is too expensive | Send some tasks to a cheaper model |
| OpenAI is down | Fail over to Anthropic automatically |
| You want a comparison | Ask several AIs at once and see who answers best |

This section's architecture:
```
        ┌─────────────────────────────┐
        │       MultiLLMManager       │
        │     (multi-API manager)     │
        └──────────────┬──────────────┘
                       │
   ┌───────────────────┼───────────────────┐
   │                   │                   │
   ▼                   ▼                   ▼
┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│   OpenAI    │  │  Anthropic  │  │   Google    │
│   Client    │  │   Client    │  │   Client    │
└─────────────┘  └─────────────┘  └─────────────┘
```

The core idea: a unified interface + smart routing + automatic fallback.
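To make that goal concrete, here is a sketch of how the finished system is used. Every name in it (MultiLLMManager, SmartRouter, Message, APIProvider) is defined later in this section, so treat it as a preview rather than something to run now:

```python
# Preview: the end state this section builds toward (all classes defined below).
import asyncio

manager = MultiLLMManager()    # discovers providers from environment variables
router = SmartRouter(manager)  # tracks stats, picks a provider, falls back on failure

messages = [Message(role="user", content="Hello!")]

# One unified call: the router tries the preferred provider first, then falls back.
response = asyncio.run(
    router.chat_completion_with_fallback(messages, preferred=APIProvider.OPENAI)
)
print(response.provider.value, response.content)
```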
Designing a Unified API Interface
🎯 Beginner's view: what is a "unified interface"?
Every AI vendor shapes its API differently:
- OpenAI: `response["choices"][0]["message"]["content"]`
- Anthropic: `response["content"][0]["text"]`

Rewriting your code every time you switch APIs? Far too painful.
The unified-interface approach:
```python
# The call looks the same no matter which API sits behind it
response = client.chat_completion(messages)
print(response.content)  # one unified format!
```

This is "programming to an interface": hide the differing implementations behind one shared interface.
```python
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum


class APIProvider(Enum):
    """API providers."""
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GOOGLE = "google"
    DEEPSEEK = "deepseek"


@dataclass
class Message:
    """Unified message format."""
    role: str  # system, user, or assistant
    content: str


@dataclass
class CompletionResponse:
    """Unified response format."""
    content: str
    model: str
    provider: APIProvider
    usage: Dict[str, int]
    raw_response: Dict[str, Any]


class BaseLLMClient(ABC):
    """Abstract base class for LLM clients."""

    def __init__(self, api_key: str, model: str):
        self.api_key = api_key
        self.model = model

    @abstractmethod
    def chat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Synchronous chat completion."""
        pass

    @abstractmethod
    async def achat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Asynchronous chat completion."""
        pass
```
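Before implementing real vendors, it helps to see how small this contract actually is. The stub below is a hypothetical example (MockLLMClient is not part of this section's system) that satisfies BaseLLMClient by echoing the last user message, which is handy for offline unit tests:

```python
class MockLLMClient(BaseLLMClient):
    """Hypothetical stub client: echoes the last user message, no network needed."""

    def chat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        last_user = next(m.content for m in reversed(messages) if m.role == "user")
        return CompletionResponse(
            content=f"echo: {last_user}",
            model=self.model,
            provider=APIProvider.OPENAI,  # arbitrary placeholder tag
            usage={"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
            raw_response={},
        )

    async def achat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        return self.chat_completion(messages, temperature, max_tokens)
```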
OpenAI Client Implementation

```python
import requests
import httpx
from typing import List, Dict


class OpenAIClient(BaseLLMClient):
    """OpenAI API client."""

    def __init__(self, api_key: str, model: str = "gpt-4"):
        super().__init__(api_key, model)
        self.base_url = "https://api.openai.com/v1"

    def _headers(self) -> Dict[str, str]:
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

    def _convert_messages(self, messages: List[Message]) -> List[Dict[str, str]]:
        """Convert to the OpenAI wire format."""
        return [
            {"role": msg.role, "content": msg.content}
            for msg in messages
        ]

    def chat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Synchronous call."""
        url = f"{self.base_url}/chat/completions"
        payload = {
            "model": self.model,
            "messages": self._convert_messages(messages),
            "temperature": temperature,
            "max_tokens": max_tokens
        }
        response = requests.post(
            url,
            headers=self._headers(),
            json=payload,
            timeout=30
        )
        response.raise_for_status()
        data = response.json()
        return CompletionResponse(
            content=data["choices"][0]["message"]["content"],
            model=data["model"],
            provider=APIProvider.OPENAI,
            usage=data["usage"],
            raw_response=data
        )

    async def achat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Asynchronous call."""
        url = f"{self.base_url}/chat/completions"
        payload = {
            "model": self.model,
            "messages": self._convert_messages(messages),
            "temperature": temperature,
            "max_tokens": max_tokens
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(
                url,
                headers=self._headers(),
                json=payload,
                timeout=30
            )
            response.raise_for_status()
            data = response.json()
        return CompletionResponse(
            content=data["choices"][0]["message"]["content"],
            model=data["model"],
            provider=APIProvider.OPENAI,
            usage=data["usage"],
            raw_response=data
        )
```
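A minimal smoke test of the client on its own, assuming OPENAI_API_KEY is set in your environment (a combined example covering several providers follows later in this section):

```python
import os

client = OpenAIClient(api_key=os.environ["OPENAI_API_KEY"])
resp = client.chat_completion([Message(role="user", content="Say hello in five words.")])
print(resp.content)
print(resp.usage)  # OpenAI-style keys: prompt_tokens, completion_tokens, total_tokens
```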
Anthropic Client Implementation

```python
class AnthropicClient(BaseLLMClient):
    """Anthropic API client."""

    def __init__(self, api_key: str, model: str = "claude-3-5-sonnet-20241022"):
        super().__init__(api_key, model)
        self.base_url = "https://api.anthropic.com/v1"

    def _headers(self) -> Dict[str, str]:
        return {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json"
        }

    def _convert_messages(self, messages: List[Message]) -> tuple:
        """Convert to the Anthropic format (system prompt split out from the messages)."""
        system = None
        converted_messages = []
        for msg in messages:
            if msg.role == "system":
                system = msg.content
            else:
                converted_messages.append({
                    "role": msg.role,
                    "content": msg.content
                })
        return system, converted_messages

    def chat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Synchronous call."""
        url = f"{self.base_url}/messages"
        system, converted_messages = self._convert_messages(messages)
        payload = {
            "model": self.model,
            "messages": converted_messages,
            "max_tokens": max_tokens,
            "temperature": temperature
        }
        if system:
            payload["system"] = system
        response = requests.post(
            url,
            headers=self._headers(),
            json=payload,
            timeout=30
        )
        response.raise_for_status()
        data = response.json()
        return CompletionResponse(
            content=data["content"][0]["text"],
            model=data["model"],
            provider=APIProvider.ANTHROPIC,
            # Map Anthropic's usage fields onto the OpenAI-style names
            usage={
                "prompt_tokens": data["usage"]["input_tokens"],
                "completion_tokens": data["usage"]["output_tokens"],
                "total_tokens": data["usage"]["input_tokens"] + data["usage"]["output_tokens"]
            },
            raw_response=data
        )

    async def achat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Asynchronous call."""
        url = f"{self.base_url}/messages"
        system, converted_messages = self._convert_messages(messages)
        payload = {
            "model": self.model,
            "messages": converted_messages,
            "max_tokens": max_tokens,
            "temperature": temperature
        }
        if system:
            payload["system"] = system
        async with httpx.AsyncClient() as client:
            response = await client.post(
                url,
                headers=self._headers(),
                json=payload,
                timeout=30
            )
            response.raise_for_status()
            data = response.json()
        return CompletionResponse(
            content=data["content"][0]["text"],
            model=data["model"],
            provider=APIProvider.ANTHROPIC,
            usage={
                "prompt_tokens": data["usage"]["input_tokens"],
                "completion_tokens": data["usage"]["output_tokens"],
                "total_tokens": data["usage"]["input_tokens"] + data["usage"]["output_tokens"]
            },
            raw_response=data
        )
```
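The APIProvider enum also lists GOOGLE and DEEPSEEK, which this section never implements. DeepSeek's chat API is advertised as OpenAI-compatible, so one plausible shortcut is to subclass OpenAIClient and swap the endpoint. The sketch below rests on that compatibility assumption; the base URL and default model name should be checked against the vendor's current docs:

```python
class DeepSeekClient(OpenAIClient):
    """Sketch: DeepSeek via the OpenAI-compatible wire format (assumed endpoint)."""

    def __init__(self, api_key: str, model: str = "deepseek-chat"):
        super().__init__(api_key, model)
        self.base_url = "https://api.deepseek.com/v1"  # assumption: verify in vendor docs
```

One caveat: the inherited methods would still tag responses with provider=APIProvider.OPENAI, so a real implementation should also override how the CompletionResponse provider field is filled in.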
The Multi-API Manager

```python
import os
import asyncio
from typing import Optional, Dict, List


class MultiLLMManager:
    """Manager for multiple LLM providers."""

    def __init__(self):
        self.clients: Dict[APIProvider, BaseLLMClient] = {}
        self._initialize_clients()

    def _initialize_clients(self):
        """Initialize every client whose credentials are configured."""
        # OpenAI
        if openai_key := os.getenv("OPENAI_API_KEY"):
            self.clients[APIProvider.OPENAI] = OpenAIClient(
                api_key=openai_key,
                model=os.getenv("OPENAI_MODEL", "gpt-4")
            )
        # Anthropic
        if anthropic_key := os.getenv("ANTHROPIC_API_KEY"):
            self.clients[APIProvider.ANTHROPIC] = AnthropicClient(
                api_key=anthropic_key,
                model=os.getenv("ANTHROPIC_MODEL", "claude-3-5-sonnet-20241022")
            )

    def get_client(self, provider: APIProvider) -> Optional[BaseLLMClient]:
        """Return the client for a provider, or None."""
        return self.clients.get(provider)

    def list_providers(self) -> List[APIProvider]:
        """List every configured provider."""
        return list(self.clients.keys())

    def chat_completion(
        self,
        messages: List[Message],
        provider: APIProvider = APIProvider.OPENAI,
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Complete a chat with the given provider."""
        client = self.get_client(provider)
        if not client:
            raise ValueError(f"Provider {provider.value} is not configured")
        return client.chat_completion(messages, temperature, max_tokens)

    async def achat_completion(
        self,
        messages: List[Message],
        provider: APIProvider = APIProvider.OPENAI,
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Asynchronous chat completion."""
        client = self.get_client(provider)
        if not client:
            raise ValueError(f"Provider {provider.value} is not configured")
        return await client.achat_completion(messages, temperature, max_tokens)

    async def parallel_completion(
        self,
        messages: List[Message],
        providers: Optional[List[APIProvider]] = None,
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> Dict[APIProvider, CompletionResponse]:
        """Call several providers concurrently."""
        if providers is None:
            providers = self.list_providers()
        tasks = []
        valid_providers = []
        for provider in providers:
            if provider in self.clients:
                tasks.append(
                    self.achat_completion(messages, provider, temperature, max_tokens)
                )
                valid_providers.append(provider)
        # Failed calls come back as exception objects and are filtered out below
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return {
            provider: result
            for provider, result in zip(valid_providers, results)
            if not isinstance(result, Exception)
        }


# Usage example
manager = MultiLLMManager()

# List the configured providers
print(f"Available providers: {[p.value for p in manager.list_providers()]}")

# Call OpenAI
messages = [
    Message(role="system", content="You are a helpful assistant."),
    Message(role="user", content="Explain what an API is")
]
try:
    response = manager.chat_completion(messages, provider=APIProvider.OPENAI)
    print(f"\n{response.provider.value} response:")
    print(response.content)
    print(f"Token usage: {response.usage}")
except Exception as e:
    print(f"Error: {e}")

# Concurrent calls to every provider
async def demo_parallel():
    responses = await manager.parallel_completion(messages)
    for provider, response in responses.items():
        print(f"\n{provider.value} response:")
        print(response.content[:200] + "...")
        print(f"Tokens: {response.usage}")

# asyncio.run(demo_parallel())
```
The Smart Routing System

🎯 Beginner's view: what is "smart routing"?

Imagine you run a food-delivery platform and have to assign each order to a courier:

- The naive way: always use the first courier (who may already be swamped)
- The smart way: check who is nearby, highly rated, and still has capacity

What does smart routing do here?

1. Track each API's performance (success rate, speed, cost)
2. Automatically pick the best API
3. On failure, automatically switch to a backup API (fallback)

Key metrics:

| Metric | Meaning |
| --- | --- |
| Error rate | Fraction of calls that fail |
| Average latency | How fast responses come back |
| Total tokens | How much has been consumed |

With these metrics, the router automatically picks the API that is both fast and reliable.
```python
import time
from typing import Optional, List, Dict, Any


class SmartRouter:
    """Smart API router."""

    def __init__(self, manager: MultiLLMManager):
        self.manager = manager
        self.usage_stats: Dict[APIProvider, Dict[str, Any]] = {
            provider: {
                "calls": 0,
                "errors": 0,
                "total_tokens": 0,
                "avg_latency": 0.0
            }
            for provider in APIProvider
        }

    def _update_stats(
        self,
        provider: APIProvider,
        success: bool,
        tokens: int = 0,
        latency: float = 0.0
    ):
        """Update usage statistics."""
        stats = self.usage_stats[provider]
        stats["calls"] += 1
        if not success:
            stats["errors"] += 1
            return  # don't let the default latency=0.0 of a failed call skew the average
        stats["total_tokens"] += tokens
        # Exponential moving average of latency: 90% history, 10% newest sample
        if stats["avg_latency"] == 0:
            stats["avg_latency"] = latency
        else:
            stats["avg_latency"] = stats["avg_latency"] * 0.9 + latency * 0.1

    def select_provider(
        self,
        preferred: Optional[APIProvider] = None,
        fallback: bool = True
    ) -> APIProvider:
        """Pick a provider based on observed performance."""
        available_providers = self.manager.list_providers()
        if not available_providers:
            raise ValueError("No API providers are configured")
        # Honor the preferred provider if it is available
        if preferred and preferred in available_providers:
            return preferred
        # Otherwise pick the provider with the lowest score
        best_provider = None
        best_score = float('inf')
        for provider in available_providers:
            stats = self.usage_stats[provider]
            if stats["calls"] == 0:
                # Never tried this provider: give it a chance first
                return provider
            # Score = error rate (weighted x100) + average latency in seconds; lower is better
            error_rate = stats["errors"] / stats["calls"]
            score = error_rate * 100 + stats["avg_latency"]
            if score < best_score:
                best_score = score
                best_provider = provider
        return best_provider or available_providers[0]

    async def chat_completion_with_fallback(
        self,
        messages: List[Message],
        preferred: Optional[APIProvider] = None,
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Chat completion with automatic fallback."""
        providers = self.manager.list_providers()
        # Move the preferred provider to the front of the queue
        if preferred and preferred in providers:
            providers.remove(preferred)
            providers.insert(0, preferred)
        last_error = None
        for provider in providers:
            try:
                start = time.time()
                response = await self.manager.achat_completion(
                    messages,
                    provider=provider,
                    temperature=temperature,
                    max_tokens=max_tokens
                )
                latency = time.time() - start
                # Record the success
                self._update_stats(
                    provider,
                    success=True,
                    tokens=response.usage.get("total_tokens", 0),
                    latency=latency
                )
                return response
            except Exception as e:
                print(f"{provider.value} failed: {e}, falling back...")
                self._update_stats(provider, success=False)
                last_error = e
                continue
        # Every provider failed
        raise RuntimeError(f"All API providers failed. Last error: {last_error}")

    def get_stats(self) -> Dict[str, Any]:
        """Build a statistics report."""
        return {
            provider.value: stats
            for provider, stats in self.usage_stats.items()
            if stats["calls"] > 0
        }


# Usage example
async def demo_smart_router():
    manager = MultiLLMManager()
    router = SmartRouter(manager)
    messages = [
        Message(role="user", content="Hello, introduce yourself")
    ]
    # Route five requests through the smart router
    for i in range(5):
        try:
            response = await router.chat_completion_with_fallback(
                messages,
                preferred=APIProvider.OPENAI
            )
            print(f"\nRequest {i+1}:")
            print(f"Provider: {response.provider.value}")
            print(f"Response: {response.content[:100]}...")
        except Exception as e:
            print(f"Request failed: {e}")
    # Print the statistics
    print("\n=== Statistics ===")
    for provider, stats in router.get_stats().items():
        print(f"\n{provider}:")
        print(f"  Calls: {stats['calls']}")
        print(f"  Errors: {stats['errors']}")
        print(f"  Avg latency: {stats['avg_latency']:.2f}s")
        print(f"  Total tokens: {stats['total_tokens']}")

# asyncio.run(demo_smart_router())
```
Cost Management

🎯 Beginner's view: why manage API cost?

AI APIs are metered: you pay for exactly what you use.

| Model | Input price (per 1K tokens) | Output price (per 1K tokens) |
| --- | --- | --- |
| GPT-4 | $0.03 | $0.06 |
| GPT-3.5 | $0.001 | $0.002 |
| Claude 3.5 | $0.003 | $0.015 |

For example, at these rates a single GPT-4 call with 1,000 input tokens and 500 output tokens costs $0.03 + $0.03 = $0.06.

What happens without cost controls?

A bug triggers an infinite loop of API calls
↓
A few hundred dollars burned overnight
↓
A nasty shock when the credit-card bill arrives

What can a cost tracker do?

- Record what every call costs
- Aggregate daily and monthly totals
- Enforce a budget cap and alert when you approach it
```python
from dataclasses import dataclass
from typing import Dict, Any
from datetime import datetime


@dataclass
class PricingConfig:
    """Pricing configuration."""
    input_price_per_1k: float   # input token price (per 1,000 tokens)
    output_price_per_1k: float  # output token price (per 1,000 tokens)


# Per-provider pricing (illustrative numbers)
PRICING = {
    APIProvider.OPENAI: {
        "gpt-4": PricingConfig(input_price_per_1k=0.03, output_price_per_1k=0.06),
        "gpt-3.5-turbo": PricingConfig(input_price_per_1k=0.001, output_price_per_1k=0.002),
    },
    APIProvider.ANTHROPIC: {
        "claude-3-5-sonnet-20241022": PricingConfig(input_price_per_1k=0.003, output_price_per_1k=0.015),
    }
}


class CostTracker:
    """Cost tracker."""

    def __init__(self):
        self.daily_costs: Dict[str, float] = {}
        self.monthly_limit: float = 100.0  # monthly budget in USD

    def calculate_cost(
        self,
        provider: APIProvider,
        model: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        """Compute the cost of a single call."""
        pricing = PRICING.get(provider, {}).get(model)
        if not pricing:
            return 0.0  # unknown model is treated as free, so keep PRICING up to date
        input_cost = (input_tokens / 1000) * pricing.input_price_per_1k
        output_cost = (output_tokens / 1000) * pricing.output_price_per_1k
        return input_cost + output_cost

    def track_usage(
        self,
        provider: APIProvider,
        model: str,
        usage: Dict[str, int]
    ) -> float:
        """Record one call's usage and return its cost."""
        cost = self.calculate_cost(
            provider,
            model,
            usage.get("prompt_tokens", 0),
            usage.get("completion_tokens", 0)
        )
        today = datetime.now().strftime("%Y-%m-%d")
        self.daily_costs[today] = self.daily_costs.get(today, 0.0) + cost
        return cost

    def get_monthly_cost(self) -> float:
        """Total cost so far this month."""
        now = datetime.now()
        month_start = now.replace(day=1).strftime("%Y-%m-%d")
        # ISO date strings compare correctly as plain strings
        return sum(
            cost for date, cost in self.daily_costs.items()
            if date >= month_start
        )

    def check_budget(self) -> Dict[str, Any]:
        """Report the current budget status."""
        monthly_cost = self.get_monthly_cost()
        remaining = self.monthly_limit - monthly_cost
        usage_percent = (monthly_cost / self.monthly_limit) * 100
        return {
            "monthly_cost": monthly_cost,
            "monthly_limit": self.monthly_limit,
            "remaining": remaining,
            "usage_percent": usage_percent,
            "alert": usage_percent > 80
        }


# Usage example
tracker = CostTracker()

# Track one call
usage = {"prompt_tokens": 100, "completion_tokens": 500}
cost = tracker.track_usage(APIProvider.OPENAI, "gpt-4", usage)
print(f"Cost of this call: ${cost:.4f}")

# Check the budget
budget_status = tracker.check_budget()
print(f"\nCost this month: ${budget_status['monthly_cost']:.2f}")
print(f"Budget remaining: ${budget_status['remaining']:.2f}")
print(f"Usage: {budget_status['usage_percent']:.1f}%")
if budget_status['alert']:
    print("⚠️ Warning: over 80% of the monthly budget has been used!")
```
Key Takeaways

- Unified interface: an abstract base class hides the differences between provider APIs
- Smart routing: the best provider is chosen automatically from performance and availability
- Fallback strategy: when the primary provider fails, requests switch to a backup automatically
- Cost management: API spending is tracked and capped
- Monitoring: success rate, latency, and other key metrics are recorded per provider
Next section: 7.4 Summary and Review