Skip to content

7.2 HTTP 请求与 API 调用

🎯 小白理解:什么是 HTTP 请求和 API?

想象你去餐厅吃饭:

你(客户端)          服务员(API)           厨房(服务器)
     │                    │                     │
     │──── 点菜单 ────────→│                     │
     │    (HTTP 请求)      │──── 转达订单 ────→│
     │                    │                     │
     │                    │←──── 做好的菜 ─────│
     │←──── 上菜 ─────────│                     │
          (HTTP 响应)

关键概念

概念餐厅比喻编程解释
API服务员程序之间通信的接口
HTTP 请求你的点菜单你发给服务器的指令
HTTP 响应上的菜服务器返回的数据
GET 请求问"有什么菜?"获取数据
POST 请求"我要点这些菜"发送数据

OpenAI API 就是这样:你发请求说"请回答这个问题",它返回 AI 的回答。

requests 库基础

🎯 小白理解requests 是 Python 最流行的 HTTP 库,用来"发送请求、接收响应"。

安装

bash
pip install requests

基本请求

🎯 小白理解:HTTP 请求的四种常用方法

方法用途类比
GET获取数据查看菜单
POST发送数据下订单
PUT更新数据修改订单
DELETE删除数据取消订单
python
import requests
from typing import Dict, Any

# GET 请求
response = requests.get("https://api.github.com/users/python")
print(response.status_code)  # 200
print(response.json())  # 解析 JSON 响应

# POST 请求
payload = {"name": "Agent", "status": "active"}
response = requests.post(
    "https://api.example.com/agents",
    json=payload,
    headers={"Content-Type": "application/json"}
)

# 带参数的 GET 请求
params = {"q": "python", "per_page": 10}
response = requests.get("https://api.github.com/search/repositories", params=params)

# 带认证的请求
headers = {"Authorization": "Bearer YOUR_TOKEN"}
response = requests.get("https://api.example.com/data", headers=headers)

OpenAI API 调用

🎯 小白理解:调用 OpenAI API 的流程

1. 准备请求
   ├── API Key(身份证明)
   ├── 模型名称(gpt-4)
   └── 消息内容(你的问题)

2. 发送 POST 请求到 OpenAI 服务器

3. 接收响应
   └── AI 的回答在 response["choices"][0]["message"]["content"]

消息格式

python
messages = [
    {"role": "system", "content": "你是一个助手"},  # 系统设定
    {"role": "user", "content": "你好"},           # 用户说的话
    {"role": "assistant", "content": "你好!"}     # AI 的回复
]

这就是 ChatGPT 的工作原理!

python
import os
import requests
from typing import List, Dict, Any, Optional

class OpenAIClient:
    """OpenAI API 客户端"""
    
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        self.base_url = "https://api.openai.com/v1"
        
        if not self.api_key:
            raise ValueError("API Key 未配置")
    
    def _headers(self) -> Dict[str, str]:
        """生成请求头"""
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
    
    def chat_completion(
        self,
        messages: List[Dict[str, str]],
        model: str = "gpt-4",
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> Dict[str, Any]:
        """调用 Chat Completion API"""
        url = f"{self.base_url}/chat/completions"
        
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens
        }
        
        response = requests.post(
            url,
            headers=self._headers(),
            json=payload,
            timeout=30
        )
        
        response.raise_for_status()  # 抛出 HTTP 错误
        return response.json()
    
    def get_models(self) -> List[str]:
        """获取可用模型列表"""
        url = f"{self.base_url}/models"
        
        response = requests.get(
            url,
            headers=self._headers(),
            timeout=10
        )
        
        response.raise_for_status()
        data = response.json()
        
        return [model["id"] for model in data["data"]]

# 使用示例
client = OpenAIClient()

# 发送消息
messages = [
    {"role": "system", "content": "你是一个有用的助手。"},
    {"role": "user", "content": "解释什么是 API"}
]

result = client.chat_completion(messages)
print(result["choices"][0]["message"]["content"])

# 获取模型列表
models = client.get_models()
print(f"可用模型: {len(models)} 个")

异步 HTTP 请求(httpx)

🎯 小白理解:为什么需要"异步"请求?

同步请求(一个一个来):

请求1 ──等待──→ 响应1
                  请求2 ──等待──→ 响应2
                                    请求3 ──等待──→ 响应3
总时间: 3秒 + 3秒 + 3秒 = 9秒

异步请求(同时发出):

请求1 ──等待──→ 响应1
请求2 ──等待──→ 响应2  (同时进行!)
请求3 ──等待──→ 响应3
总时间: 约 3秒

类比:同步 = 一个服务员,一次只能服务一桌;异步 = 一个服务员,可以同时记录多桌的订单

什么时候用异步?

  • 需要同时调用多个 API
  • 需要同时处理多个用户的请求
  • 需要提高程序响应速度

安装

bash
pip install httpx

异步 API 客户端

python
import asyncio
import httpx
from typing import List, Dict, Any
import os

class AsyncOpenAIClient:
    """异步 OpenAI API 客户端"""
    
    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        self.base_url = "https://api.openai.com/v1"
        self.client = httpx.AsyncClient(
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            },
            timeout=30.0
        )
    
    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        model: str = "gpt-4",
        temperature: float = 0.7
    ) -> Dict[str, Any]:
        """异步调用 Chat API"""
        url = f"{self.base_url}/chat/completions"
        
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature
        }
        
        response = await self.client.post(url, json=payload)
        response.raise_for_status()
        return response.json()
    
    async def batch_completion(
        self,
        queries: List[str],
        system_prompt: str = "你是一个有用的助手。"
    ) -> List[str]:
        """批量处理查询"""
        tasks = []
        
        for query in queries:
            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": query}
            ]
            tasks.append(self.chat_completion(messages))
        
        results = await asyncio.gather(*tasks)
        
        return [
            result["choices"][0]["message"]["content"]
            for result in results
        ]
    
    async def close(self):
        """关闭客户端"""
        await self.client.aclose()

# 使用示例
async def demo_async_client():
    client = AsyncOpenAIClient()
    
    try:
        # 并发处理多个查询
        queries = [
            "什么是 Python?",
            "什么是异步编程?",
            "什么是 LangChain?"
        ]
        
        import time
        start = time.time()
        
        results = await client.batch_completion(queries)
        
        end = time.time()
        
        print(f"处理 {len(queries)} 个查询,耗时: {end - start:.2f}\n")
        
        for query, result in zip(queries, results):
            print(f"Q: {query}")
            print(f"A: {result[:100]}...\n")
    
    finally:
        await client.close()

# 运行
# asyncio.run(demo_async_client())

错误处理与重试

🎯 小白理解:为什么 API 调用需要"重试"?

网络请求可能失败,原因有很多:

错误类型含义应对策略
401身份验证失败检查 API Key
429请求太频繁等一会儿再试
500服务器内部错误过会儿重试
Timeout请求超时重试或增加超时时间

指数退避策略

第1次失败 → 等 1 秒 → 重试
第2次失败 → 等 2 秒 → 重试
第3次失败 → 等 4 秒 → 重试
第4次失败 → 放弃,报错

等待时间翻倍,避免一直"骚扰"已经很忙的服务器。

python
import requests
import time
from typing import Dict, Any, Optional, Callable
from functools import wraps

class APIError(Exception):
    """API 错误基类"""
    pass

class RateLimitError(APIError):
    """速率限制错误"""
    pass

class AuthenticationError(APIError):
    """认证错误"""
    pass

def retry_on_error(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    exceptions: tuple = (requests.RequestException,)
):
    """重试装饰器"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            last_exception = None
            
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    
                    if attempt < max_retries - 1:
                        print(f"重试 {attempt + 1}/{max_retries}: {e}")
                        time.sleep(delay)
                        delay *= backoff_factor
            
            raise last_exception
        
        return wrapper
    return decorator

class RobustAPIClient:
    """健壮的 API 客户端"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1"
    
    def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
        """处理响应和错误"""
        if response.status_code == 200:
            return response.json()
        
        elif response.status_code == 401:
            raise AuthenticationError("API Key 无效")
        
        elif response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 60))
            raise RateLimitError(f"速率限制,请等待 {retry_after} 秒")
        
        elif response.status_code >= 500:
            raise APIError(f"服务器错误: {response.status_code}")
        
        else:
            raise APIError(f"请求失败: {response.status_code} - {response.text}")
    
    @retry_on_error(max_retries=3, exceptions=(requests.RequestException, RateLimitError))
    def chat_completion(
        self,
        messages: List[Dict[str, str]],
        model: str = "gpt-4"
    ) -> Dict[str, Any]:
        """调用 Chat API(带重试)"""
        url = f"{self.base_url}/chat/completions"
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": model,
            "messages": messages
        }
        
        try:
            response = requests.post(
                url,
                headers=headers,
                json=payload,
                timeout=30
            )
            
            return self._handle_response(response)
        
        except requests.Timeout:
            raise APIError("请求超时")
        
        except requests.ConnectionError:
            raise APIError("连接失败")

# 使用示例
try:
    client = RobustAPIClient(api_key=os.getenv("OPENAI_API_KEY"))
    
    messages = [
        {"role": "user", "content": "Hello"}
    ]
    
    result = client.chat_completion(messages)
    print(result)

except AuthenticationError as e:
    print(f"认证错误: {e}")

except RateLimitError as e:
    print(f"速率限制: {e}")

except APIError as e:
    print(f"API 错误: {e}")

流式响应处理

🎯 小白理解:什么是"流式响应"?

普通响应:等 AI 写完整篇文章,一次性返回

用户提问 ──────────────────────→ [等待 10 秒] ──→ 一次性显示全部回答

流式响应:AI 边写边返回,像打字一样逐字显示

用户提问 → "你" → "好" → "," → "我" → "是" → "AI" → ...

流式响应的好处

  1. 体验更好:用户不用干等,能看到 AI 在"思考"
  2. 感觉更快:第一个字很快就出现了
  3. ChatGPT 就是这样做的!

技术原理:使用 SSE(Server-Sent Events),服务器持续发送小块数据。

python
import requests
import json
from typing import Iterator, Dict, Any

class StreamingAPIClient:
    """流式 API 客户端"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1"
    
    def chat_completion_stream(
        self,
        messages: List[Dict[str, str]],
        model: str = "gpt-4"
    ) -> Iterator[str]:
        """流式调用 Chat API"""
        url = f"{self.base_url}/chat/completions"
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": model,
            "messages": messages,
            "stream": True  # 启用流式响应
        }
        
        with requests.post(
            url,
            headers=headers,
            json=payload,
            stream=True,
            timeout=30
        ) as response:
            response.raise_for_status()
            
            for line in response.iter_lines():
                if not line:
                    continue
                
                # 解码并解析 SSE 格式
                line = line.decode('utf-8')
                
                if line.startswith("data: "):
                    data = line[6:]  # 去掉 "data: " 前缀
                    
                    if data == "[DONE]":
                        break
                    
                    try:
                        chunk = json.loads(data)
                        delta = chunk["choices"][0]["delta"]
                        
                        if "content" in delta:
                            yield delta["content"]
                    
                    except json.JSONDecodeError:
                        continue

# 使用示例
client = StreamingAPIClient(api_key=os.getenv("OPENAI_API_KEY"))

messages = [
    {"role": "user", "content": "写一首关于 Python 的诗"}
]

print("Agent: ", end="", flush=True)

for chunk in client.chat_completion_stream(messages):
    print(chunk, end="", flush=True)

print()

API 速率限制管理

python
import time
from collections import deque
from typing import Callable, Any
from functools import wraps

class RateLimiter:
    """速率限制器"""
    
    def __init__(self, max_calls: int, period: float):
        """
        Args:
            max_calls: 时间窗口内最大调用次数
            period: 时间窗口(秒)
        """
        self.max_calls = max_calls
        self.period = period
        self.calls = deque()
    
    def __call__(self, func: Callable) -> Callable:
        """装饰器"""
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            now = time.time()
            
            # 清理过期的调用记录
            while self.calls and self.calls[0] < now - self.period:
                self.calls.popleft()
            
            # 检查是否超过限制
            if len(self.calls) >= self.max_calls:
                sleep_time = self.period - (now - self.calls[0])
                if sleep_time > 0:
                    print(f"速率限制,等待 {sleep_time:.2f} 秒...")
                    time.sleep(sleep_time)
                    # 重新清理
                    now = time.time()
                    while self.calls and self.calls[0] < now - self.period:
                        self.calls.popleft()
            
            # 记录调用
            self.calls.append(now)
            
            return func(*args, **kwargs)
        
        return wrapper

# 使用示例
class APIClientWithRateLimit:
    """带速率限制的 API 客户端"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        # 限制: 每分钟最多 20 次调用
        self.rate_limiter = RateLimiter(max_calls=20, period=60)
    
    @property
    def chat_completion(self):
        """返回带速率限制的方法"""
        return self.rate_limiter(self._chat_completion)
    
    def _chat_completion(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
        """实际的 API 调用"""
        print(f"调用 API: {time.time():.2f}")
        # 实际 API 调用逻辑
        return {"status": "success"}

# 测试
client = APIClientWithRateLimit(api_key="test")

# 快速连续调用
for i in range(25):
    result = client.chat_completion([{"role": "user", "content": f"消息 {i}"}])

关键要点

  1. 错误处理:使用 try-except 处理所有 HTTP 错误
  2. 重试机制:实现指数退避策略
  3. 超时设置:始终设置合理的超时时间
  4. 速率限制:遵守 API 提供商的速率限制
  5. 流式响应:对于长文本生成,使用流式 API

下一节:7.3 实战:多 API 集成系统

基于 MIT 许可证发布。内容版权归作者所有。