10.3 机器学习实战:Scikit-Learn 完全指南
从数据到智能:机器学习的本质
在掌握了 NumPy 和 Pandas 这些数据处理工具后,我们终于可以回答这个核心问题:如何让计算机从数据中学习?
💡 核心概念:机器学习的本质是从数据中自动发现模式,并用这些模式对未见过的数据做出预测。
传统编程:
规则 + 数据 → 结果

机器学习:
数据 + 结果 → 规则

Scikit-Learn 是 Python 中最成熟、最易用的机器学习库,被工业界广泛采用。它提供了:
- 统一的 API 设计(所有模型都有 fit() 和 predict() 方法)
- 丰富的算法实现(分类、回归、聚类、降维)
- 完善的数据预处理工具
- 强大的模型评估和选择功能
机器学习的三大范式
1. 监督学习(Supervised Learning)
定义:从有标签的数据中学习,预测新数据的标签。
训练数据:(特征, 标签) 对
例如:(房屋面积、位置、房龄, 房价)
(邮件内容, 是否垃圾邮件)

两大子类:
- 分类(Classification):预测离散标签(如:垃圾邮件/正常邮件)
- 回归(Regression):预测连续值(如:房价、温度)
2. 无监督学习(Unsupervised Learning)
定义:从无标签的数据中发现隐藏的结构或模式。
训练数据:只有特征,没有标签
例如:(用户浏览记录) → 用户分群
(文章内容) → 主题聚类

常见任务:
- 聚类(Clustering):将相似的数据点分组
- 降维(Dimensionality Reduction):压缩高维数据
- 异常检测(Anomaly Detection):发现异常数据点
3. 强化学习(Reinforcement Learning)
定义:通过与环境交互,学习最优策略以最大化累积奖励。
Agent → 执行动作 → 环境反馈奖励 → Agent 学习

经典应用:AlphaGo、自动驾驶、机器人控制
📌 本节重点:我们将深入监督学习和无监督学习,强化学习在后续章节介绍。
Scikit-Learn 的设计哲学
统一的 Estimator API
所有 Scikit-Learn 模型都遵循相同的接口:
from sklearn.base import BaseEstimator, ClassifierMixin
# 所有模型都继承自 BaseEstimator
class MyModel(BaseEstimator, ClassifierMixin):
def fit(self, X, y):
"""从训练数据中学习"""
# 学习参数
return self
def predict(self, X):
"""对新数据做预测"""
# 返回预测结果
return predictions
def score(self, X, y):
"""评估模型性能"""
# 返回准确率、R²等指标
return accuracy

这种统一的设计让我们可以轻松切换不同算法:
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
# 三个不同的模型,但 API 完全相同
models = [
LogisticRegression(),
RandomForestClassifier(),
SVC()
]
for model in models:
model.fit(X_train, y_train) # 训练
score = model.score(X_test, y_test) # 评估
print(f"{model.__class__.__name__}: {score:.4f}")第一个机器学习项目:鸢尾花分类
让我们用经典的 Iris 数据集开始:
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
from typing import Tuple
# 1. 加载数据
iris = load_iris()
X = iris.data # 特征:花萼长度、花萼宽度、花瓣长度、花瓣宽度
y = iris.target # 标签:0=setosa, 1=versicolor, 2=virginica
print(f"数据集形状: {X.shape}")
print(f"特征名称: {iris.feature_names}")
print(f"类别名称: {iris.target_names}")
# 转换为 DataFrame 便于分析
df = pd.DataFrame(X, columns=iris.feature_names)
df['species'] = iris.target_names[y]
print(df.head())
# 2. 数据分割(训练集 80%,测试集 20%)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
print(f"\n训练集: {X_train.shape}, 测试集: {X_test.shape}")
print(f"训练集类别分布: {np.bincount(y_train)}")
print(f"测试集类别分布: {np.bincount(y_test)}")
# 3. 特征标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
print(f"\n标准化前均值: {X_train[:, 0].mean():.4f}, 标准差: {X_train[:, 0].std():.4f}")
print(f"标准化后均值: {X_train_scaled[:, 0].mean():.4f}, 标准差: {X_train_scaled[:, 0].std():.4f}")
# 4. 训练模型
model = LogisticRegression(max_iter=200, random_state=42)
model.fit(X_train_scaled, y_train)
# 5. 预测
y_pred = model.predict(X_test_scaled)
# 6. 评估
accuracy = accuracy_score(y_test, y_pred)
print(f"\n准确率: {accuracy:.4f}")
# 详细的分类报告
print("\n分类报告:")
print(classification_report(y_test, y_pred, target_names=iris.target_names))

输出示例:
数据集形状: (150, 4)
训练集: (120, 4), 测试集: (30, 4)
准确率: 1.0000
分类报告:
precision recall f1-score support
setosa 1.00 1.00 1.00 10
versicolor 1.00 1.00 1.00 9
virginica 1.00 1.00 1.00 11
accuracy                          1.00        30

💡 关键步骤:
- 加载数据:使用内置数据集或读取文件
- 数据分割:训练集用于学习,测试集用于评估
- 特征缩放:让不同尺度的特征在同一量级
- 训练模型:调用 fit() 方法
- 预测评估:用 predict() 和评估指标
数据预处理:机器学习的 80% 工作
1. 特征缩放(Feature Scaling)
为什么需要缩放?
许多机器学习算法(如 KNN、SVM、神经网络)对特征的尺度敏感。考虑这个例子:
# 两个特征:房屋面积(平方米)和房间数
X = np.array([
[50, 2], # 50 平米,2 个房间
[150, 4], # 150 平米,4 个房间
])
# 如果直接计算欧氏距离,面积的影响会远大于房间数
# 因为面积的数值范围 [50, 150],而房间数只有 [2, 4]

标准化(Standardization)
from sklearn.preprocessing import StandardScaler
# 公式:z = (x - μ) / σ
# 结果:均值=0,标准差=1
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
print("原始数据:")
print(X)
print("\n标准化后:")
print(X_scaled)
print(f"均值: {X_scaled.mean(axis=0)}")
print(f"标准差: {X_scaled.std(axis=0)}")归一化(Normalization)
from sklearn.preprocessing import MinMaxScaler
# 公式:x' = (x - x_min) / (x_max - x_min)
# 结果:数据范围 [0, 1]
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)
print("归一化后:")
print(X_normalized)
print(f"最小值: {X_normalized.min(axis=0)}")
print(f"最大值: {X_normalized.max(axis=0)}")⚠️ 重要:必须先在训练集上
fit(),再对训练集和测试集都进行transform()。python# ❌ 错误做法 X_train = scaler.fit_transform(X_train) X_test = scaler.fit_transform(X_test) # 会导致数据泄露! # ✅ 正确做法 X_train = scaler.fit_transform(X_train) X_test = scaler.transform(X_test) # 使用训练集的统计量
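下面用一个极简示例直观对比缩放前后的欧氏距离(数值沿用上文的假设数据,仅作演示,不代表真实房价数据):

import numpy as np
from sklearn.preprocessing import StandardScaler

# 两个样本:[面积(平方米), 房间数]
samples = np.array([[50.0, 2.0],
                    [150.0, 4.0]])

# 缩放前:距离几乎完全由面积主导
print(f"缩放前距离: {np.linalg.norm(samples[0] - samples[1]):.2f}")  # ≈ 100.02

# 缩放后:两个特征的贡献处于同一量级
scaled = StandardScaler().fit_transform(samples)
print(f"缩放后距离: {np.linalg.norm(scaled[0] - scaled[1]):.2f}")    # ≈ 2.83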
2. 类别特征编码
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
import pandas as pd
# 示例数据
data = pd.DataFrame({
'city': ['Beijing', 'Shanghai', 'Beijing', 'Guangzhou', 'Shanghai'],
'size': ['S', 'M', 'L', 'M', 'S'],
'price': [100, 200, 150, 180, 120]
})
# 方法 1:标签编码(LabelEncoder)
# 注意:LabelEncoder 按字典序编码(此处 L=0, M=1, S=2),并不保证符合 S < M < L 的业务顺序,
# 它主要用于目标标签;真正有序的特征建议用 OrdinalEncoder(见本小节末尾的示例)
le = LabelEncoder()
data['size_encoded'] = le.fit_transform(data['size'])
print("标签编码:")
print(data[['size', 'size_encoded']])
# 方法 2:独热编码(One-Hot Encoding)
# 适用于无序类别(如城市)
data_onehot = pd.get_dummies(data, columns=['city'], prefix='city')
print("\n独热编码:")
print(data_onehot)
# 使用 Scikit-Learn 的 OneHotEncoder
from sklearn.preprocessing import OneHotEncoder
ohe = OneHotEncoder(sparse_output=False)
city_encoded = ohe.fit_transform(data[['city']])
print(f"\nOneHotEncoder 结果形状: {city_encoded.shape}")
print(ohe.get_feature_names_out())
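补充一个小示例:对于真正有序的特征(S < M < L),可以用 OrdinalEncoder 显式指定类别顺序(以下沿用上面的 data,仅作示意):

from sklearn.preprocessing import OrdinalEncoder

# 显式给出 S < M < L 的顺序,编码结果(0, 1, 2)才与业务含义一致
ordinal = OrdinalEncoder(categories=[['S', 'M', 'L']])
data['size_ordinal'] = ordinal.fit_transform(data[['size']]).ravel()
print(data[['size', 'size_encoded', 'size_ordinal']])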
3. 缺失值处理

from sklearn.impute import SimpleImputer
import numpy as np
# 创建带缺失值的数据
X = np.array([
[1, 2, np.nan],
[3, np.nan, 6],
[np.nan, 8, 9],
[4, 5, 6]
])
# 策略 1:用均值填充
imputer_mean = SimpleImputer(strategy='mean')
X_mean = imputer_mean.fit_transform(X)
print("均值填充:")
print(X_mean)
# 策略 2:用中位数填充
imputer_median = SimpleImputer(strategy='median')
X_median = imputer_median.fit_transform(X)
print("\n中位数填充:")
print(X_median)
# 策略 3:用最频繁值填充
imputer_frequent = SimpleImputer(strategy='most_frequent')
X_frequent = imputer_frequent.fit_transform(X)
print("\n众数填充:")
print(X_frequent)

4. 完整的预处理 Pipeline
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
# 定义数值特征和类别特征的处理流程
numeric_features = ['age', 'income', 'credit_score']
categorical_features = ['city', 'occupation']
# 数值特征 Pipeline
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
# 类别特征 Pipeline
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
# 组合所有预处理步骤
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
# 完整的 ML Pipeline
full_pipeline = Pipeline(steps=[
('preprocessor', preprocessor),
('classifier', LogisticRegression())
])
# 一步到位:预处理 + 训练
# full_pipeline.fit(X_train, y_train)
# y_pred = full_pipeline.predict(X_test)

分类算法详解
1. 逻辑回归(Logistic Regression)
虽然名字里有"回归",但逻辑回归是分类算法。
数学原理:
1. 线性组合:z = w₁x₁ + w₂x₂ + ... + b
2. Sigmoid 函数:σ(z) = 1 / (1 + e⁻ᶻ)
3. 预测概率:P(y=1|x) = σ(z)
4. 决策规则:如果 P(y=1|x) > 0.5,预测类别 1,否则类别 0
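在调用 Scikit-Learn 之前,可以先用 NumPy 直观感受 Sigmoid 函数如何把任意实数压缩到 (0, 1) 区间(简要示意,数值仅作演示):

import numpy as np

def sigmoid(z):
    """σ(z) = 1 / (1 + e^(-z)):把线性组合 z 映射为概率"""
    return 1 / (1 + np.exp(-z))

for z in [-4, -1, 0, 1, 4]:
    print(f"z = {z:+d} -> σ(z) = {sigmoid(z):.4f}")
# z = 0 时概率恰为 0.5,即默认的分类阈值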
from sklearn.linear_model import LogisticRegression
import matplotlib.pyplot as plt
# 生成二分类数据
from sklearn.datasets import make_classification
X, y = make_classification(
n_samples=200,
n_features=2,
n_informative=2,
n_redundant=0,
n_clusters_per_class=1,
random_state=42
)
# 训练逻辑回归
lr = LogisticRegression()
lr.fit(X, y)
# 可视化决策边界
def plot_decision_boundary(model, X, y):
h = 0.02 # 网格步长
x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
np.arange(y_min, y_max, h))
Z = model.predict(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
plt.contourf(xx, yy, Z, alpha=0.3)
plt.scatter(X[:, 0], X[:, 1], c=y, edgecolors='k')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.title('Decision Boundary')
plt.show()
plot_decision_boundary(lr, X, y)
# 预测概率
proba = lr.predict_proba(X[:5])
print("预测概率:")
print(proba)

2. 决策树(Decision Tree)
决策树通过一系列 if-else 规则进行预测。
from sklearn.tree import DecisionTreeClassifier, plot_tree
import matplotlib.pyplot as plt
# 训练决策树
dt = DecisionTreeClassifier(max_depth=3, random_state=42)
dt.fit(X, y)
# 可视化决策树
plt.figure(figsize=(20, 10))
plot_tree(dt, filled=True, feature_names=['Feature 1', 'Feature 2'])
plt.show()
# 特征重要性
importances = dt.feature_importances_
print(f"特征重要性: {importances}")关键超参数:
max_depth:树的最大深度(防止过拟合)min_samples_split:分裂内部节点所需的最小样本数min_samples_leaf:叶子节点所需的最小样本数
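下面的小实验(沿用前文划分好的 X_train / X_test / y_train / y_test,仅作示意)演示 max_depth 对过拟合的影响:树越深,训练集准确率越高,但测试集准确率未必随之提升:

from sklearn.tree import DecisionTreeClassifier

for depth in [1, 3, 5, 10, None]:
    dt = DecisionTreeClassifier(max_depth=depth, random_state=42)
    dt.fit(X_train, y_train)
    print(f"max_depth={str(depth):>4s}: "
          f"训练集 {dt.score(X_train, y_train):.3f}, "
          f"测试集 {dt.score(X_test, y_test):.3f}")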
3. 随机森林(Random Forest)
随机森林是集成学习的代表,通过构建多个决策树并投票。
from sklearn.ensemble import RandomForestClassifier
# 训练随机森林
rf = RandomForestClassifier(
n_estimators=100, # 树的数量
max_depth=10,
random_state=42
)
rf.fit(X_train, y_train)
# 评估
accuracy = rf.score(X_test, y_test)
print(f"准确率: {accuracy:.4f}")
# 特征重要性
feature_importances = pd.DataFrame({
'feature': iris.feature_names,
'importance': rf.feature_importances_
}).sort_values('importance', ascending=False)
print("\n特征重要性:")
print(feature_importances)

为什么随机森林效果好?
- Bagging:每棵树训练在不同的数据子集上(有放回抽样)
- 特征随机性:每次分裂只考虑部分特征
- 降低方差:多个模型平均可以减少过拟合
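可以用交叉验证粗略验证这一点(以下沿用前文的 X_train / y_train,结果仅作示意):

from sklearn.model_selection import cross_val_score
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier

single_tree = DecisionTreeClassifier(random_state=42)
forest = RandomForestClassifier(n_estimators=100, random_state=42)

for name, clf in [('单棵决策树', single_tree), ('随机森林', forest)]:
    scores = cross_val_score(clf, X_train, y_train, cv=5)
    print(f"{name}: {scores.mean():.4f} (+/- {scores.std():.4f})")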
4. 支持向量机(SVM)
SVM 寻找最优的决策边界(最大间隔超平面)。
from sklearn.svm import SVC
# 线性 SVM
svm_linear = SVC(kernel='linear', C=1.0)
svm_linear.fit(X_train, y_train)
# RBF 核 SVM(可以处理非线性问题)
svm_rbf = SVC(kernel='rbf', C=1.0, gamma='scale')
svm_rbf.fit(X_train, y_train)
# 对比性能
print(f"线性 SVM 准确率: {svm_linear.score(X_test, y_test):.4f}")
print(f"RBF SVM 准确率: {svm_rbf.score(X_test, y_test):.4f}")关键参数:
C:正则化参数(越大越倾向于拟合训练数据)kernel:核函数类型('linear', 'rbf', 'poly')gamma:RBF 核的参数(影响决策边界的"光滑程度")
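gamma 的影响可以用一个小实验体会:gamma 越大,决策边界越"弯曲",越容易过拟合(以下沿用前文的训练/测试集,取值仅作示意):

from sklearn.svm import SVC

for gamma in [0.01, 0.1, 1, 10]:
    svm = SVC(kernel='rbf', C=1.0, gamma=gamma)
    svm.fit(X_train, y_train)
    print(f"gamma={gamma:<5}: "
          f"训练集 {svm.score(X_train, y_train):.3f}, "
          f"测试集 {svm.score(X_test, y_test):.3f}")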
5. K近邻(K-Nearest Neighbors)
KNN 是最简单的算法之一:预测新样本时,找到最近的 K 个训练样本,用它们的标签投票。
from sklearn.neighbors import KNeighborsClassifier
# 训练 KNN
knn = KNeighborsClassifier(n_neighbors=5)
knn.fit(X_train, y_train)
# 预测
y_pred = knn.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"准确率: {accuracy:.4f}")
# 找到不同 K 值的最佳设置
k_values = range(1, 31)
accuracies = []
for k in k_values:
knn = KNeighborsClassifier(n_neighbors=k)
knn.fit(X_train, y_train)
accuracies.append(knn.score(X_test, y_test))
plt.plot(k_values, accuracies)
plt.xlabel('K')
plt.ylabel('Accuracy')
plt.title('KNN: Accuracy vs K')
plt.show()

回归算法
1. 线性回归(Linear Regression)
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression
# 生成回归数据
X, y = make_regression(n_samples=100, n_features=1, noise=10, random_state=42)
# 训练线性回归
lr = LinearRegression()
lr.fit(X, y)
# 参数
print(f"斜率: {lr.coef_[0]:.4f}")
print(f"截距: {lr.intercept_:.4f}")
# 预测
y_pred = lr.predict(X)
# R² 评分(决定系数)
from sklearn.metrics import r2_score, mean_squared_error
r2 = r2_score(y, y_pred)
mse = mean_squared_error(y, y_pred)
rmse = np.sqrt(mse)
print(f"R²: {r2:.4f}")
print(f"RMSE: {rmse:.4f}")
# 可视化
plt.scatter(X, y, alpha=0.5)
plt.plot(X, y_pred, color='red', linewidth=2)
plt.xlabel('X')
plt.ylabel('y')
plt.title('Linear Regression')
plt.show()

2. 正则化回归
Lasso(L1 正则化):
from sklearn.linear_model import Lasso
# Lasso 会将一些系数压缩到 0,实现特征选择
lasso = Lasso(alpha=0.1)
lasso.fit(X_train, y_train)

Ridge(L2 正则化):
from sklearn.linear_model import Ridge
# Ridge 会缩小所有系数,但不会变为 0
ridge = Ridge(alpha=1.0)
ridge.fit(X_train, y_train)

ElasticNet(L1 + L2):
from sklearn.linear_model import ElasticNet
# 结合了 Lasso 和 Ridge 的优点
elastic = ElasticNet(alpha=0.1, l1_ratio=0.5)
elastic.fit(X_train, y_train)
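下面用一个小例子对比 Lasso 与 Ridge 的系数:Lasso 会把部分系数压成精确的 0,从而起到特征选择的作用(数据为随机生成,参数仅作示意):

import numpy as np
from sklearn.linear_model import Lasso, Ridge
from sklearn.datasets import make_regression

# 10 个特征中只有 3 个真正有信息量
X_demo, y_demo = make_regression(n_samples=200, n_features=10,
                                 n_informative=3, noise=5, random_state=42)

lasso = Lasso(alpha=1.0).fit(X_demo, y_demo)
ridge = Ridge(alpha=1.0).fit(X_demo, y_demo)

print(f"Lasso 非零系数个数: {np.sum(lasso.coef_ != 0)} / 10")
print(f"Ridge 非零系数个数: {np.sum(ridge.coef_ != 0)} / 10")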
模型评估:不仅仅是准确率

分类评估指标
from sklearn.metrics import (
accuracy_score,
precision_score,
recall_score,
f1_score,
confusion_matrix,
roc_auc_score,
roc_curve
)
# 假设我们有预测结果
y_true = np.array([0, 1, 1, 0, 1, 1, 0, 0, 1, 0])
y_pred = np.array([0, 1, 0, 0, 1, 1, 0, 1, 1, 0])
# 1. 准确率(Accuracy)
accuracy = accuracy_score(y_true, y_pred)
print(f"准确率: {accuracy:.4f}")
# 2. 精确率(Precision):预测为正的样本中,真正为正的比例
precision = precision_score(y_true, y_pred)
print(f"精确率: {precision:.4f}")
# 3. 召回率(Recall):真正为正的样本中,被预测为正的比例
recall = recall_score(y_true, y_pred)
print(f"召回率: {recall:.4f}")
# 4. F1 分数:精确率和召回率的调和平均数
f1 = f1_score(y_true, y_pred)
print(f"F1 分数: {f1:.4f}")
# 5. 混淆矩阵
cm = confusion_matrix(y_true, y_pred)
print("\n混淆矩阵:")
print(cm)

混淆矩阵解释:

             预测为负     预测为正
实际为负      TN = 4      FP = 1
实际为正      FN = 1      TP = 4

- True Positive (TP):预测为正,实际为正
- True Negative (TN):预测为负,实际为负
- False Positive (FP):预测为正,实际为负(第一类错误)
- False Negative (FN):预测为负,实际为正(第二类错误)
# 计算各个指标
TN, FP, FN, TP = cm.ravel()
print(f"TP: {TP}, TN: {TN}, FP: {FP}, FN: {FN}")
print(f"精确率: {TP / (TP + FP):.4f}")
print(f"召回率: {TP / (TP + FN):.4f}")ROC 曲线和 AUC
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
# 生成数据
X, y = make_classification(n_samples=1000, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 训练模型
lr = LogisticRegression()
lr.fit(X_train, y_train)
# 预测概率
y_proba = lr.predict_proba(X_test)[:, 1]
# 计算 ROC 曲线
fpr, tpr, thresholds = roc_curve(y_test, y_proba)
auc = roc_auc_score(y_test, y_proba)
# 绘制 ROC 曲线
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {auc:.4f})')
plt.plot([0, 1], [0, 1], 'k--', label='Random Classifier')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend()
plt.show()

💡 AUC 解释:
- AUC = 1.0:完美分类器
- AUC = 0.5:随机猜测
- AUC < 0.5:比随机猜测还差(说明模型反了)
回归评估指标
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
y_true = np.array([3.0, -0.5, 2.0, 7.0])
y_pred = np.array([2.5, 0.0, 2.0, 8.0])
# 1. 平均绝对误差(MAE)
mae = mean_absolute_error(y_true, y_pred)
print(f"MAE: {mae:.4f}")
# 2. 均方误差(MSE)
mse = mean_squared_error(y_true, y_pred)
print(f"MSE: {mse:.4f}")
# 3. 均方根误差(RMSE)
rmse = np.sqrt(mse)
print(f"RMSE: {rmse:.4f}")
# 4. R² 分数(决定系数)
r2 = r2_score(y_true, y_pred)
print(f"R²: {r2:.4f}")交叉验证:更可靠的性能评估
问题:如果只分割一次训练集和测试集,结果可能不稳定(取决于随机分割)。
解决方案:K 折交叉验证(K-Fold Cross-Validation)
from sklearn.model_selection import cross_val_score, KFold
# K 折交叉验证
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(lr, X, y, cv=kfold, scoring='accuracy')
print(f"每折准确率: {scores}")
print(f"平均准确率: {scores.mean():.4f} (+/- {scores.std():.4f})")工作原理:
数据集分为 5 份:
第 1 折:[Test] [Train] [Train] [Train] [Train]
第 2 折:[Train] [Test] [Train] [Train] [Train]
第 3 折:[Train] [Train] [Test] [Train] [Train]
第 4 折:[Train] [Train] [Train] [Test] [Train]
第 5 折:[Train] [Train] [Train] [Train] [Test]
最终结果 = 5 次测试的平均值

分层 K 折交叉验证(适用于类别不平衡):
from sklearn.model_selection import StratifiedKFold
# 确保每折中各类别的比例与整体一致
skfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(lr, X, y, cv=skfold, scoring='accuracy')
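如果想一次性查看多个指标,可以改用 cross_validate(以下沿用上面的 lr、X、y 与 skfold,仅作简要示例):

from sklearn.model_selection import cross_validate

cv_results = cross_validate(
    lr, X, y,
    cv=skfold,
    scoring=['accuracy', 'f1', 'roc_auc'],
    return_train_score=True
)
print(f"测试集 accuracy: {cv_results['test_accuracy'].mean():.4f}")
print(f"测试集 f1: {cv_results['test_f1'].mean():.4f}")
print(f"测试集 roc_auc: {cv_results['test_roc_auc'].mean():.4f}")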
超参数调优

1. 网格搜索(Grid Search)
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC
# 定义参数网格
param_grid = {
'C': [0.1, 1, 10, 100],
'gamma': ['scale', 'auto', 0.001, 0.01, 0.1],
'kernel': ['rbf', 'linear']
}
# 网格搜索
grid_search = GridSearchCV(
SVC(),
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1, # 使用所有 CPU 核心
verbose=2
)
grid_search.fit(X_train, y_train)
# 最佳参数
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳准确率: {grid_search.best_score_:.4f}")
# 使用最佳模型预测
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)

2. 随机搜索(Random Search)
当参数空间很大时,随机搜索比网格搜索更高效。
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint
# 定义参数分布
param_distributions = {
'C': uniform(0.1, 100),
'gamma': uniform(0.001, 0.1),
'kernel': ['rbf', 'linear']
}
# 随机搜索
random_search = RandomizedSearchCV(
SVC(),
param_distributions,
n_iter=50, # 尝试 50 种随机组合
cv=5,
random_state=42,
n_jobs=-1
)
random_search.fit(X_train, y_train)
print(f"最佳参数: {random_search.best_params_}")无监督学习
1. K-Means 聚类
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
# 生成聚类数据
X, y_true = make_blobs(n_samples=300, centers=4, random_state=42)
# K-Means 聚类
kmeans = KMeans(n_clusters=4, random_state=42)
y_pred = kmeans.fit_predict(X)
# 可视化
plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis')
plt.scatter(kmeans.cluster_centers_[:, 0],
kmeans.cluster_centers_[:, 1],
s=300, c='red', marker='X', label='Centroids')
plt.legend()
plt.title('K-Means Clustering')
plt.show()
# 惯性(Inertia):样本到最近聚类中心的距离平方和
print(f"Inertia: {kmeans.inertia_:.2f}")肘部法则(Elbow Method):选择最佳 K 值
inertias = []
K_range = range(1, 11)
for k in K_range:
kmeans = KMeans(n_clusters=k, random_state=42)
kmeans.fit(X)
inertias.append(kmeans.inertia_)
plt.plot(K_range, inertias, marker='o')
plt.xlabel('Number of Clusters (K)')
plt.ylabel('Inertia')
plt.title('Elbow Method')
plt.show()
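除了肘部法则,还可以用轮廓系数(Silhouette Score)辅助选择 K:该值越接近 1,聚类的分离效果越好(以下沿用上面的 X,K 的取值范围仅作示意):

from sklearn.metrics import silhouette_score
from sklearn.cluster import KMeans

for k in range(2, 7):  # 轮廓系数要求至少 2 个簇
    labels = KMeans(n_clusters=k, random_state=42).fit_predict(X)
    score = silhouette_score(X, labels)
    print(f"K={k}: 轮廓系数 = {score:.3f}")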
2. 主成分分析(PCA)

PCA 用于降维,保留最重要的特征。
from sklearn.decomposition import PCA
from sklearn.datasets import load_digits
# 加载手写数字数据集(64 维)
digits = load_digits()
X = digits.data
y = digits.target
print(f"原始维度: {X.shape}")
# 降维到 2 维
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X)
print(f"降维后: {X_pca.shape}")
print(f"解释的方差比例: {pca.explained_variance_ratio_}")
# 可视化
plt.figure(figsize=(10, 8))
scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1], c=y, cmap='tab10', alpha=0.5)
plt.colorbar(scatter)
plt.xlabel('First Principal Component')
plt.ylabel('Second Principal Component')
plt.title('PCA of Digits Dataset')
plt.show()

选择合适的主成分数量:
# 保留 95% 的方差
pca_95 = PCA(n_components=0.95)
X_pca_95 = pca_95.fit_transform(X)
print(f"保留 95% 方差需要 {X_pca_95.shape[1]} 个主成分")综合实战:客户流失预测
综合实战:客户流失预测

让我们构建一个完整的机器学习项目:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from typing import Tuple, Dict, Any
import matplotlib.pyplot as plt
import seaborn as sns
class ChurnPredictionPipeline:
"""
客户流失预测完整流程
包含:
1. 数据加载和探索
2. 特征工程
3. 模型训练和评估
4. 超参数调优
5. 模型解释
"""
def __init__(self):
self.models = {}
self.best_model = None
self.scaler = StandardScaler()
self.label_encoders = {}
def generate_data(self, n_samples: int = 5000) -> pd.DataFrame:
"""生成模拟客户数据"""
np.random.seed(42)
data = {
'customer_id': range(1, n_samples + 1),
'age': np.random.randint(18, 70, n_samples),
'tenure_months': np.random.randint(1, 72, n_samples),
'monthly_charges': np.random.uniform(20, 120, n_samples),
'total_charges': np.random.uniform(100, 8000, n_samples),
'num_products': np.random.randint(1, 5, n_samples),
'has_internet': np.random.choice([0, 1], n_samples),
'has_phone': np.random.choice([0, 1], n_samples),
'contract_type': np.random.choice(['Month-to-month', 'One year', 'Two year'], n_samples),
'payment_method': np.random.choice(['Electronic', 'Mail', 'Bank', 'Credit'], n_samples),
'support_calls': np.random.randint(0, 10, n_samples),
}
df = pd.DataFrame(data)
# 基于特征生成流失标签(有一定的逻辑关系)
churn_prob = (
0.1 +
0.3 * (df['contract_type'] == 'Month-to-month').astype(int) +
0.2 * (df['tenure_months'] < 12).astype(int) +
0.1 * (df['support_calls'] > 5).astype(int) +
0.15 * (df['monthly_charges'] > 80).astype(int)
)
churn_prob = np.clip(churn_prob, 0, 1)
df['churn'] = np.random.binomial(1, churn_prob)
return df
def eda(self, df: pd.DataFrame) -> None:
"""探索性数据分析"""
print("=" * 60)
print("数据概览")
print("=" * 60)
print(f"数据集形状: {df.shape}")
print(f"\n流失率: {df['churn'].mean():.2%}")
print("\n数值特征统计:")
print(df.describe())
print("\n缺失值统计:")
print(df.isnull().sum())
# 可视化流失率
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# 1. 合同类型 vs 流失率
churn_by_contract = df.groupby('contract_type')['churn'].mean()
axes[0, 0].bar(churn_by_contract.index, churn_by_contract.values)
axes[0, 0].set_title('Churn Rate by Contract Type')
axes[0, 0].set_ylabel('Churn Rate')
# 2. 在网时长 vs 流失率
tenure_bins = [0, 12, 24, 36, 48, 72]
df['tenure_group'] = pd.cut(df['tenure_months'], bins=tenure_bins)
churn_by_tenure = df.groupby('tenure_group')['churn'].mean()
axes[0, 1].bar(range(len(churn_by_tenure)), churn_by_tenure.values)
axes[0, 1].set_title('Churn Rate by Tenure')
axes[0, 1].set_ylabel('Churn Rate')
axes[0, 1].set_xticklabels(churn_by_tenure.index, rotation=45)
# 3. 月费用分布
axes[1, 0].hist(df[df['churn'] == 0]['monthly_charges'], alpha=0.5, label='No Churn', bins=30)
axes[1, 0].hist(df[df['churn'] == 1]['monthly_charges'], alpha=0.5, label='Churn', bins=30)
axes[1, 0].set_title('Monthly Charges Distribution')
axes[1, 0].legend()
# 4. 相关性热图
numeric_cols = df.select_dtypes(include=[np.number]).columns
corr = df[numeric_cols].corr()
sns.heatmap(corr, annot=True, fmt='.2f', ax=axes[1, 1], cmap='coolwarm')
axes[1, 1].set_title('Feature Correlation')
plt.tight_layout()
plt.show()
def feature_engineering(self, df: pd.DataFrame) -> pd.DataFrame:
"""特征工程"""
df = df.copy()
# 1. 创建新特征
df['avg_monthly_charges'] = df['total_charges'] / (df['tenure_months'] + 1)
df['charges_to_products'] = df['monthly_charges'] / (df['num_products'] + 1)
df['tenure_x_products'] = df['tenure_months'] * df['num_products']
# 2. 分箱特征
df['age_group'] = pd.cut(df['age'], bins=[0, 30, 50, 100], labels=['young', 'middle', 'senior'])
df['tenure_group'] = pd.cut(df['tenure_months'], bins=[0, 12, 24, 72], labels=['new', 'medium', 'long'])
# 3. 是否高价值客户
df['is_high_value'] = (df['monthly_charges'] > df['monthly_charges'].median()).astype(int)
return df
def prepare_data(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""准备训练数据"""
# 删除不需要的列
df = df.drop(['customer_id'], axis=1)
# 编码类别特征
categorical_cols = df.select_dtypes(include=['object', 'category']).columns
for col in categorical_cols:
if col != 'churn':
le = LabelEncoder()
df[col] = le.fit_transform(df[col].astype(str))
self.label_encoders[col] = le
# 分离特征和目标
X = df.drop('churn', axis=1)
y = df['churn']
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# 标准化
X_train = self.scaler.fit_transform(X_train)
X_test = self.scaler.transform(X_test)
return X_train, X_test, y_train, y_test
def train_models(
self,
X_train: np.ndarray,
y_train: np.ndarray
) -> Dict[str, Any]:
"""训练多个模型"""
models = {
'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42)
}
print("\n" + "=" * 60)
print("训练模型(5 折交叉验证)")
print("=" * 60)
results = {}
for name, model in models.items():
scores = cross_val_score(model, X_train, y_train, cv=5, scoring='roc_auc')
results[name] = {
'model': model,
'cv_mean': scores.mean(),
'cv_std': scores.std()
}
print(f"{name:25s}: AUC = {scores.mean():.4f} (+/- {scores.std():.4f})")
# 训练完整模型
model.fit(X_train, y_train)
self.models = results
return results
def evaluate_model(
self,
model: Any,
X_test: np.ndarray,
y_test: np.ndarray,
model_name: str
) -> None:
"""评估单个模型"""
y_pred = model.predict(X_test)
y_proba = model.predict_proba(X_test)[:, 1]
print(f"\n{'=' * 60}")
print(f"{model_name} - 测试集评估")
print(f"{'=' * 60}")
# 分类报告
print("\n分类报告:")
print(classification_report(y_test, y_pred, target_names=['No Churn', 'Churn']))
# AUC
auc = roc_auc_score(y_test, y_proba)
print(f"\nROC AUC: {auc:.4f}")
# 混淆矩阵
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title(f'{model_name} - Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()
def tune_best_model(
self,
X_train: np.ndarray,
y_train: np.ndarray
) -> Any:
"""对最佳模型进行超参数调优"""
print("\n" + "=" * 60)
print("超参数调优(Random Forest)")
print("=" * 60)
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [10, 20, 30, None],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4]
}
rf = RandomForestClassifier(random_state=42)
grid_search = GridSearchCV(
rf,
param_grid,
cv=5,
scoring='roc_auc',
n_jobs=-1,
verbose=1
)
grid_search.fit(X_train, y_train)
print(f"\n最佳参数: {grid_search.best_params_}")
print(f"最佳 AUC: {grid_search.best_score_:.4f}")
self.best_model = grid_search.best_estimator_
return self.best_model
def feature_importance(self, feature_names: list) -> None:
"""分析特征重要性"""
if self.best_model is None:
print("请先训练模型")
return
importances = self.best_model.feature_importances_
indices = np.argsort(importances)[::-1]
plt.figure(figsize=(12, 8))
plt.title("Feature Importances")
plt.bar(range(len(importances)), importances[indices])
plt.xticks(range(len(importances)), [feature_names[i] for i in indices], rotation=90)
plt.tight_layout()
plt.show()
print("\nTop 10 重要特征:")
for i in range(min(10, len(importances))):
idx = indices[i]
print(f"{i+1:2d}. {feature_names[idx]:30s}: {importances[idx]:.4f}")
# 运行完整 Pipeline
pipeline = ChurnPredictionPipeline()
# 1. 生成数据
df = pipeline.generate_data(n_samples=5000)
# 2. 探索性分析
pipeline.eda(df)
# 3. 特征工程
df_engineered = pipeline.feature_engineering(df)
# 4. 准备数据
X_train, X_test, y_train, y_test = pipeline.prepare_data(df_engineered)
feature_names = [col for col in df_engineered.columns if col not in ['churn', 'customer_id']]
print(f"\n特征数量: {X_train.shape[1]}")
print(f"训练集大小: {X_train.shape[0]}")
print(f"测试集大小: {X_test.shape[0]}")
# 5. 训练多个模型
results = pipeline.train_models(X_train, y_train)
# 6. 评估所有模型
for name, result in results.items():
pipeline.evaluate_model(result['model'], X_test, y_test, name)
# 7. 超参数调优
best_model = pipeline.tune_best_model(X_train, y_train)
# 8. 最终评估
pipeline.evaluate_model(best_model, X_test, y_test, "Tuned Random Forest")
# 9. 特征重要性分析
pipeline.feature_importance(feature_names)

偏差-方差权衡(Bias-Variance Tradeoff)
这是机器学习中最重要的概念之一。
# 总误差 = 偏差² + 方差 + 不可约误差

高偏差(High Bias):模型太简单,欠拟合(underfitting)
- 例如:用线性模型拟合非线性数据
高方差(High Variance):模型太复杂,过拟合(overfitting)
- 例如:用高阶多项式拟合少量数据
示例:
from sklearn.linear_model import Ridge
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import make_pipeline
# 生成非线性数据
np.random.seed(0)
X = np.sort(np.random.rand(40, 1), axis=0)
y = np.sin(2 * np.pi * X).ravel() + np.random.normal(0, 0.1, 40)
# 测试不同复杂度的模型
degrees = [1, 4, 15]
plt.figure(figsize=(15, 5))
for i, degree in enumerate(degrees):
ax = plt.subplot(1, 3, i + 1)
# 创建多项式回归模型
model = make_pipeline(PolynomialFeatures(degree), Ridge())
model.fit(X, y)
# 绘制预测曲线
X_test = np.linspace(0, 1, 100).reshape(-1, 1)
y_pred = model.predict(X_test)
plt.scatter(X, y, color='blue', alpha=0.5)
plt.plot(X_test, y_pred, color='red', linewidth=2)
plt.title(f'Degree {degree}')
plt.ylim(-2, 2)
plt.show()

如何解决:
- 正则化:L1/L2 惩罚
- 更多数据:更多训练数据可以降低方差
- 特征选择:去除无关特征
- 集成方法:Bagging 降低方差,Boosting 降低偏差
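Scikit-Learn 的 validation_curve 可以直接画出"模型复杂度 vs 训练/验证得分"的曲线,帮助诊断偏差与方差(以下以决策树深度为例,数据与参数均为示意性假设):

import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import validation_curve
from sklearn.tree import DecisionTreeRegressor
from sklearn.datasets import make_regression

X_vc, y_vc = make_regression(n_samples=300, n_features=5, noise=20, random_state=42)

depths = range(1, 15)
train_scores, val_scores = validation_curve(
    DecisionTreeRegressor(random_state=42), X_vc, y_vc,
    param_name='max_depth', param_range=depths, cv=5
)

# 训练得分持续上升而验证得分下降,即出现了过拟合(高方差)
plt.plot(depths, train_scores.mean(axis=1), label='Train')
plt.plot(depths, val_scores.mean(axis=1), label='Validation')
plt.xlabel('max_depth')
plt.ylabel('R² Score')
plt.legend()
plt.title('Validation Curve')
plt.show()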
实用技巧和最佳实践
1. 处理类别不平衡
from sklearn.utils.class_weight import compute_class_weight
# 方法 1:类权重
class_weights = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
model = LogisticRegression(class_weight='balanced')
# 方法 2:过采样(SMOTE)
from imblearn.over_sampling import SMOTE
smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)
# 方法 3:欠采样
from imblearn.under_sampling import RandomUnderSampler
rus = RandomUnderSampler(random_state=42)
X_resampled, y_resampled = rus.fit_resample(X_train, y_train)

2. 特征选择
from sklearn.feature_selection import SelectKBest, f_classif, RFE
# 方法 1:单变量特征选择
selector = SelectKBest(f_classif, k=10)
X_selected = selector.fit_transform(X_train, y_train)
# 方法 2:递归特征消除(RFE)
rfe = RFE(estimator=RandomForestClassifier(), n_features_to_select=10)
X_rfe = rfe.fit_transform(X_train, y_train)
# 方法 3:基于模型的特征选择
from sklearn.feature_selection import SelectFromModel
selector = SelectFromModel(RandomForestClassifier(n_estimators=100))
selector.fit(X_train, y_train)
X_selected = selector.transform(X_train)

3. 模型持久化
import joblib
# 保存模型
joblib.dump(model, 'model.pkl')
# 加载模型
loaded_model = joblib.load('model.pkl')
predictions = loaded_model.predict(X_test)
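实际部署时,建议把预处理和模型一起打包成 Pipeline 再持久化,避免线上推理时漏掉同样的预处理步骤(以下为简要示意,文件名与模型选择均为假设):

import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression(max_iter=1000))
])
pipe.fit(X_train, y_train)

joblib.dump(pipe, 'churn_pipeline.pkl')          # 保存整个流程(含缩放参数)
loaded_pipe = joblib.load('churn_pipeline.pkl')  # 加载后可直接对原始特征做预测
print(loaded_pipe.predict(X_test)[:5])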
小结

在本节中,我们深入学习了:
✅ 机器学习三大范式
- 监督学习:分类和回归
- 无监督学习:聚类和降维
- 强化学习:策略学习
✅ Scikit-Learn 核心 API
- 统一的 Estimator 接口
- Pipeline 和 ColumnTransformer
- 模型持久化
✅ 数据预处理
- 特征缩放(标准化、归一化)
- 类别编码(Label Encoding、One-Hot Encoding)
- 缺失值处理
✅ 分类算法
- 逻辑回归、决策树、随机森林
- SVM、KNN
- 集成学习
✅ 模型评估
- 准确率、精确率、召回率、F1
- ROC 曲线和 AUC
- 交叉验证
✅ 超参数调优
- 网格搜索(Grid Search)
- 随机搜索(Random Search)
✅ 实战项目
- 客户流失预测完整流程
- 特征工程和模型解释
练习题
基础题
- 使用 Scikit-Learn 的 load_breast_cancer 数据集,训练一个逻辑回归模型并计算准确率
- 用网格搜索为决策树找到最佳的 max_depth 参数
- 绘制一个模型的 ROC 曲线
进阶题
- 实现一个完整的 Pipeline,包含:
- 数值特征标准化
- 类别特征 One-Hot 编码
- PCA 降维到 10 维
- 随机森林分类
- 对一个不平衡数据集,比较使用类权重、SMOTE、欠采样三种方法的效果
- 用递归特征消除(RFE)选择最重要的 5 个特征
挑战题
- 构建一个 Voting Classifier,结合逻辑回归、随机森林和 SVM,比较与单个模型的性能
- 实现一个自定义的 Transformer,可以在 Pipeline 中使用
- 用 K-Means 对 Iris 数据集聚类,然后用 PCA 可视化结果,并与真实标签对比
🔗 与 LangChain 的联系
Scikit-Learn 在 AI Agent 开发中的应用:
文本分类器:用于意图识别
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB

# 训练意图分类器
vectorizer = TfidfVectorizer()
classifier = MultinomialNB()

聚类:用于对话历史分组
异常检测:识别异常用户行为
推荐系统:基于相似度的文档推荐
在下一章,我们将学习深度学习框架 PyTorch 和 TensorFlow,它们将为我们打开神经网络的大门!
下一节:10.4 深度学习框架:PyTorch 与 TensorFlow
在下一节,我们将从传统机器学习迈向深度学习,学习 PyTorch 的张量操作、自动微分和神经网络构建。