🧠 Keras 3.x 完整教程
Keras 3 是最新的高级神经网络 API,支持多后端(TensorFlow、PyTorch、JAX),让深度学习开发和部署更加灵活。本教程将从零开始,系统讲解 Keras 的架构、核心类和高级应用。
Keras 3 核心特性:
- 🔄 多后端支持:可切换 TensorFlow、PyTorch 或 JAX 后端
- 🚀 性能优化:自动启用 XLA 编译加速
- 🎯 原生支持:支持原生 TensorFlow、PyTorch 和 JAX 工作流
- 📦 向后兼容:高度兼容 Keras 2.x API(绝大多数 tf.keras 代码可直接迁移)
- 🔧 灵活部署:支持多种模型格式导出
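多后端切换通过 `KERAS_BACKEND` 环境变量(或 `~/.keras/keras.json` 配置文件)完成,且必须在首次 `import keras` 之前设置。下面是一个最小示意(此处以 jax 为例,属演示假设,需已安装对应后端才能真正导入):

```python
import os

# 必须在 import keras 之前设置;可选值:"tensorflow"、"jax"、"torch"
os.environ["KERAS_BACKEND"] = "jax"

# import keras
# print(keras.backend.backend())  # 应输出所选后端名称
```

同一份模型代码在三个后端下行为一致,这是 Keras 3 多后端设计的核心卖点。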
📚 学习路径
一、Keras 架构图
Keras 采用清晰的三层架构设计,从用户应用到后端实现,层次分明,易于理解和扩展。
🏗️ Keras 3 架构总览
- 应用层:预训练模型 | 示例应用 | 用户项目
- API 层:Models | Layers | Losses | Optimizers | Metrics
- 后端层:TensorFlow | PyTorch | JAX
架构层次说明
| 层次 | 组件 | 功能 | 示例 |
|---|---|---|---|
| 应用层 | 预训练模型、示例 | 直接可用的深度学习应用 | keras.applications.VGG16 |
| API 层 | Models、Layers、Optimizers | 构建和训练模型的核心 API | keras.Model, keras.layers.Dense |
| 后端层 | TensorFlow/PyTorch/JAX | 底层张量运算和自动微分 | keras.ops.matmul |
二、核心类详解
Keras 的核心类构成了深度学习模型的基础,理解这些类是掌握 Keras 的关键。
2.1 Model 类 - 模型基类
📦 keras.Model
所有模型的基类,支持函数式 API 和自定义模型
- 主要方法:`call()`、`compile()`、`fit()`、`evaluate()`、`predict()`
- 属性:`layers`、`inputs`、`outputs`、`weights`
- 用途:构建复杂的有向无环图 (DAG) 模型
import keras
# 函数式 API 示例
inputs = keras.Input(shape=(784,))
x = keras.layers.Dense(128, activation='relu')(inputs)
outputs = keras.layers.Dense(10, activation='softmax')(x)
model = keras.Model(inputs=inputs, outputs=outputs)
2.2 Sequential 类 - 顺序模型
📋 keras.Sequential
简单的层堆叠模型,适用于线性结构
- 主要方法:`add()`、`build()`、`summary()`
- 限制:不支持多输入/多输出、共享层、非线性拓扑
- 用途:快速构建简单的前馈网络
from keras import Sequential, layers
model = Sequential([
layers.Input(shape=(784,)),
layers.Dense(128, activation='relu'),
layers.Dropout(0.2),
layers.Dense(10, activation='softmax')
])
2.3 Layer 类 - 层基类
🧱 keras.layers.Layer
所有层的基类,用于创建自定义层
- 主要方法:`build()`、`call()`、`add_weight()`
- 状态:可训练权重 (`trainable_weights`)
- 用途:创建自定义神经网络层
```python
import keras
import keras.ops as ops

class CustomLayer(keras.layers.Layer):
    def __init__(self, units=32):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer='glorot_uniform',
            trainable=True
        )
        self.b = self.add_weight(
            shape=(self.units,),
            initializer='zeros',
            trainable=True
        )

    def call(self, inputs):
        return ops.matmul(inputs, self.w) + self.b
```
2.4 Optimizer 类 - 优化器
⚡ keras.optimizers.Optimizer
更新模型权重以最小化损失函数
- 常用优化器:`Adam`、`SGD`、`RMSprop`、`Adagrad`
- 主要方法:`apply_gradients()`、`apply()`
- 关键参数:`learning_rate`、`momentum`、`weight_decay`
from keras import optimizers
# Adam 优化器(推荐默认选择)
optimizer = optimizers.Adam(
learning_rate=0.001,
beta_1=0.9,
beta_2=0.999,
epsilon=1e-7
)
# SGD 优化器(带 momentum)
optimizer = optimizers.SGD(
learning_rate=0.01,
momentum=0.9,
nesterov=True
)
2.5 Loss 类 - 损失函数
📉 keras.losses.Loss
计算预测值与真实值之间的差异
- 分类损失:`CategoricalCrossentropy`、`SparseCategoricalCrossentropy`、`BinaryCrossentropy`
- 回归损失:`MeanSquaredError`、`MeanAbsoluteError`、`Huber`
- 自定义损失:继承 `Loss` 类
```python
from keras import losses, ops

# 分类损失
loss_fn = losses.SparseCategoricalCrossentropy()

# 回归损失
loss_fn = losses.MeanSquaredError()

# 自定义损失
class DiceLoss(losses.Loss):
    def call(self, y_true, y_pred):
        intersection = ops.sum(y_true * y_pred)
        union = ops.sum(y_true) + ops.sum(y_pred)
        return 1 - (2. * intersection + 1) / (union + 1)
```
2.6 Metric 类 - 评估指标
📊 keras.metrics.Metric
评估模型性能,不参与训练
- 常用指标:`Accuracy`、`Precision`、`Recall`、`AUC`
- 主要方法:`update_state()`、`result()`、`reset_state()`
- 与损失区别:指标仅用于评估,不参与反向传播
```python
from keras import metrics

# 分类指标
metric = metrics.CategoricalAccuracy()

# 回归指标
metric = metrics.MeanAbsoluteError()

# 多指标评估(Precision/Recall 针对二分类/多标签;整数标签多分类场景通常只用准确率)
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
```
核心类关系图
(关系概览:Model 作为模型容器组合多个 Layer;`compile()` 将 Optimizer、Loss、Metric 关联到模型;训练过程中由 Callback 介入各阶段。)
三、Keras API 详细参考
深入讲解 Keras 的五大核心模块:Models、Layers、Losses、Optimizers、Metrics。
3.1 Models - 模型模块 📦
Models 是 Keras 的核心,用于组织和训练神经网络。
🌳 Models 类层次结构
(层次概览:`keras.Model` 基类 → `Sequential` 顺序模型、函数式 `Model`、子类化自定义模型。)
📋 Model 类详细 API
| 类别 | 方法/属性 | 说明 | 示例 |
|---|---|---|---|
| 构建 | `__init__()` | 初始化模型 | `super().__init__()` |
| 构建 | `build(input_shape)` | 构建模型权重 | `model.build((None, 784))` |
| 构建 | `add(layer)` | 添加层(Sequential) | `model.add(Dense(64))` |
| 构建 | `summary()` | 打印模型摘要 | `model.summary()` |
| 编译 | `compile()` | 配置训练参数 | `model.compile(optimizer, loss, metrics)` |
| 编译 | `optimizer` | 编译后关联的优化器 | `model.optimizer` |
| 编译 | `metrics` | 编译后关联的指标 | `model.metrics` |
| 训练 | `fit()` | 训练模型 | `model.fit(x, y, epochs=10)` |
| 训练 | `evaluate()` | 评估模型 | `model.evaluate(x_test, y_test)` |
| 训练 | `predict()` | 模型预测 | `model.predict(x)` |
| 训练 | `train_on_batch()` | 单批次训练 | `model.train_on_batch(x, y)` |
| 训练 | `test_on_batch()` | 单批次测试 | `model.test_on_batch(x, y)` |
| 预测 | `call(inputs)` | 前向传播 | `model(x)` |
| 预测 | `predict_on_batch()` | 单批次预测 | `model.predict_on_batch(x)` |
| 预测 | `get_layer()` | 获取指定层 | `model.get_layer('dense_1')` |
| 权重 | `get_weights()` | 获取权重 | `weights = model.get_weights()` |
| 权重 | `set_weights()` | 设置权重 | `model.set_weights(weights)` |
| 权重 | `save_weights()` | 保存权重(Keras 3 要求 `.weights.h5` 后缀) | `model.save_weights('model.weights.h5')` |
| 权重 | `load_weights()` | 加载权重 | `model.load_weights('model.weights.h5')` |
| 权重 | `trainable_weights` | 可训练权重 | `model.trainable_weights` |
| 配置 | `get_config()` | 获取配置 | `config = model.get_config()` |
| 配置 | `from_config()` | 从配置创建 | `Model.from_config(config)` |
| 配置 | `to_json()` | 导出 JSON | `json_str = model.to_json()` |
| 配置 | `save()` | 保存完整模型 | `model.save('model.keras')` |
💻 Models 使用示例
```python
import keras
from keras import layers

# ========== 1. Sequential 模型 ==========
seq_model = keras.Sequential([
    layers.Input(shape=(784,)),
    layers.Dense(128, activation='relu', name='dense_1'),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax', name='output')
], name='sequential_model')

# ========== 2. 函数式 API ==========
inputs = keras.Input(shape=(784,), name='input')
x = layers.Dense(128, activation='relu', name='dense_1')(inputs)
x = layers.Dropout(0.2, name='dropout')(x)
outputs = layers.Dense(10, activation='softmax', name='predictions')(x)
func_model = keras.Model(inputs=inputs, outputs=outputs, name='functional_model')

# ========== 3. 自定义模型 ==========
class CustomModel(keras.Model):
    def __init__(self, num_classes=10):
        super().__init__()
        self.dense1 = layers.Dense(128, activation='relu', name='dense_1')
        self.dropout = layers.Dropout(0.2, name='dropout')
        self.classifier = layers.Dense(num_classes, activation='softmax', name='output')

    def call(self, inputs, training=False):
        x = self.dense1(inputs)
        x = self.dropout(x, training=training)
        return self.classifier(x)

custom_model = CustomModel()

# ========== 4. 模型操作 ==========
# 编译(整数标签多分类下,精确率/召回率需单独处理,此处仅用准确率)
seq_model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# 查看模型结构
seq_model.summary()
seq_model.get_config()

# 获取特定层
dense_layer = seq_model.get_layer('dense_1')
print(f"层权重形状:{dense_layer.get_weights()[0].shape}")

# 保存和加载
seq_model.save('saved_model.keras')
loaded_model = keras.models.load_model('saved_model.keras')

# 权重操作
weights = seq_model.get_weights()
seq_model.set_weights(weights)
```
3.2 Layers - 层模块 🧱
Layers 是构建神经网络的基本单元,Keras 提供了丰富的内置层。
🌳 Layers 分类体系
(层次概览:`keras.layers.Layer` 基类派生出核心层、卷积层、池化层、RNN 层、嵌入层等内置层。)
📋 核心层详细 API
| 层类型 | 类名 | 关键参数 | 输入形状 | 输出形状 |
|---|---|---|---|---|
| Dense | `layers.Dense` | `units`, `activation`, `use_bias` | `(..., input_dim)` | `(..., units)` |
| Dropout | `layers.Dropout` | `rate`, `noise_shape` | 任意 | 同输入 |
| Activation | `layers.Activation` | `activation` | 任意 | 同输入 |
| Flatten | `layers.Flatten` | `data_format` | `(..., h, w, c)` | `(..., h*w*c)` |
| Reshape | `layers.Reshape` | `target_shape` | 任意 | `target_shape` |
| BatchNorm | `layers.BatchNormalization` | `axis`, `momentum`, `epsilon` | 任意 | 同输入 |
| LayerNorm | `layers.LayerNormalization` | `axis`, `epsilon` | 任意 | 同输入 |
| Embedding | `layers.Embedding` | `input_dim`, `output_dim` | `(batch, seq_len)` | `(batch, seq_len, dim)` |
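上表中 Dense 与 Embedding 的参数量可以直接由形状推出:Dense 为 input_dim × units 个权重加 units 个偏置,Embedding 为 input_dim × output_dim 的查找表。下面用纯 Python 验证这一计算(示意,不依赖 Keras):

```python
def dense_params(input_dim: int, units: int, use_bias: bool = True) -> int:
    """Dense 层参数量:权重矩阵 input_dim*units,外加 units 个偏置"""
    return input_dim * units + (units if use_bias else 0)

def embedding_params(input_dim: int, output_dim: int) -> int:
    """Embedding 层参数量:查找表 input_dim*output_dim,无偏置"""
    return input_dim * output_dim

# 对应本教程 MNIST 模型中 784 -> 128 的 Dense 层
print(dense_params(784, 128))        # 100480
print(embedding_params(10000, 128))  # 1280000
```

这些数字与 `model.summary()` 中对应层的 Param # 列一致,可用来快速核对模型规模。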
📋 卷积层详细 API
| 层类型 | 类名 | 关键参数 | 输入形状 | 输出形状 |
|---|---|---|---|---|
| Conv1D | `layers.Conv1D` | `filters`, `kernel_size`, `strides`, `padding` | `(batch, steps, channels)` | `(batch, new_steps, filters)` |
| Conv2D | `layers.Conv2D` | `filters`, `kernel_size`, `strides`, `padding` | `(batch, h, w, channels)` | `(batch, new_h, new_w, filters)` |
| Conv3D | `layers.Conv3D` | `filters`, `kernel_size`, `strides` | `(batch, d, h, w, c)` | `(batch, new_d, new_h, new_w, f)` |
| DepthwiseConv2D | `layers.DepthwiseConv2D` | `kernel_size`, `depth_multiplier` | `(batch, h, w, c)` | `(batch, new_h, new_w, c*multiplier)` |
| SeparableConv2D | `layers.SeparableConv2D` | `filters`, `kernel_size` | `(batch, h, w, c)` | `(batch, new_h, new_w, filters)` |
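表中 `new_h`、`new_w` 由 padding 与 stride 决定:`padding='valid'` 时为 ⌊(h − k)/s⌋ + 1,`padding='same'` 时为 ⌈h/s⌉。用纯 Python 验证(示意,不依赖 Keras):

```python
import math

def conv_output_size(size: int, kernel: int, stride: int = 1,
                     padding: str = 'valid') -> int:
    """计算单个空间维度上的卷积输出长度"""
    if padding == 'same':
        return math.ceil(size / stride)       # same:补零后仅由 stride 决定
    return (size - kernel) // stride + 1      # valid:不补零

# 28×28 输入、3×3 卷积
print(conv_output_size(28, 3, padding='valid'))           # 26
print(conv_output_size(28, 3, padding='same'))            # 28
print(conv_output_size(28, 3, stride=2, padding='same'))  # 14
```

池化层的输出尺寸遵循同一公式,把 `kernel` 换成 `pool_size` 即可。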
📋 池化层详细 API
| 层类型 | 类名 | 关键参数 | 功能 |
|---|---|---|---|
| MaxPooling1D | `layers.MaxPooling1D` | `pool_size`, `strides` | 1D 最大池化 |
| MaxPooling2D | `layers.MaxPooling2D` | `pool_size`, `strides` | 2D 最大池化 |
| AveragePooling2D | `layers.AveragePooling2D` | `pool_size`, `strides` | 2D 平均池化 |
| GlobalMaxPooling2D | `layers.GlobalMaxPooling2D` | `data_format` | 全局最大池化 |
| GlobalAveragePooling2D | `layers.GlobalAveragePooling2D` | `data_format` | 全局平均池化 |
📋 RNN 层详细 API
| 层类型 | 类名 | 关键参数 | 特点 |
|---|---|---|---|
| SimpleRNN | `layers.SimpleRNN` | `units`, `activation` | 基础 RNN,易梯度消失 |
| LSTM | `layers.LSTM` | `units`, `return_sequences` | 长短期记忆,处理长序列 |
| GRU | `layers.GRU` | `units`, `return_sequences` | LSTM 简化版,更快 |
| Bidirectional | `layers.Bidirectional` | `layer`, `merge_mode` | 双向 RNN |
| RNN | `layers.RNN` | `cell`, `return_sequences` | 自定义 RNN 单元 |
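LSTM 比 SimpleRNN 参数多的原因可以直接算出来:LSTM 有 4 组门(输入门、遗忘门、输出门、候选状态),每组各有一套输入权重、循环权重和偏置,故参数量为 4 × (input_dim × units + units² + units);GRU 只有 3 组门。下面用纯 Python 验证(示意;GRU 的 `reset_after` 偏置处理按 Keras 默认行为建模):

```python
def lstm_params(input_dim: int, units: int) -> int:
    """LSTM 参数量:4 组门,每组含输入权重、循环权重和偏置"""
    return 4 * (input_dim * units + units * units + units)

def gru_params(input_dim: int, units: int, reset_after: bool = True) -> int:
    """GRU 参数量:3 组门;Keras 默认 reset_after=True 时偏置数量翻倍"""
    bias = 2 * units if reset_after else units
    return 3 * (input_dim * units + units * units + bias)

# 本教程 IMDB 案例:Embedding 输出 128 维 -> LSTM(64)
print(lstm_params(128, 64))  # 49408
```

与 `model.summary()` 给出的层参数量对比,可以确认自己对循环层结构的理解是否正确。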
💻 Layers 使用示例
```python
import keras
from keras import layers
import math

# ========== 1. 核心层 ==========
inputs = layers.Input(shape=(100,))
x = layers.Dense(64, activation='relu', kernel_initializer='he_normal')(inputs)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.3)(x)

# ========== 2. 卷积层 ==========
# 1D 卷积(文本/序列)
conv1d = layers.Conv1D(
    filters=128,
    kernel_size=3,
    strides=1,
    padding='same',
    activation='relu',
    kernel_initializer='he_normal'
)

# 2D 卷积(图像)
conv2d = layers.Conv2D(
    filters=64,
    kernel_size=(3, 3),
    strides=(1, 1),
    padding='same',
    activation='relu',
    kernel_regularizer=keras.regularizers.l2(0.01)
)

# 深度可分离卷积
sep_conv = layers.SeparableConv2D(
    filters=128,
    kernel_size=(3, 3),
    padding='same',
    depth_multiplier=1
)

# ========== 3. 池化层 ==========
max_pool = layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2))
avg_pool = layers.AveragePooling2D(pool_size=(2, 2))
global_pool = layers.GlobalAveragePooling2D()

# ========== 4. RNN 层 ==========
# LSTM
lstm = layers.LSTM(
    units=128,
    return_sequences=True,
    dropout=0.2,
    recurrent_dropout=0.2
)

# 双向 LSTM
bi_lstm = layers.Bidirectional(
    layers.LSTM(64, return_sequences=True),
    merge_mode='concat'
)

# GRU
gru = layers.GRU(
    units=64,
    return_sequences=False,
    reset_after=True
)

# ========== 5. 自定义层 ==========
class GELU(layers.Layer):
    """GELU 激活函数(tanh 近似;Keras 也内置了 activation='gelu')"""
    def call(self, inputs):
        return 0.5 * inputs * (1 + keras.ops.tanh(
            math.sqrt(2 / math.pi) * (inputs + 0.044715 * keras.ops.power(inputs, 3))
        ))

# 使用自定义层
x = layers.Dense(128)(x)
x = GELU()(x)

# ========== 6. 完整模型示例 ==========
def create_cnn_block(filters, kernel_size=3, pool_size=2):
    """创建可复用的 CNN 块"""
    return keras.Sequential([
        layers.Conv2D(filters, kernel_size, padding='same', activation='relu'),
        layers.BatchNormalization(),
        layers.Conv2D(filters, kernel_size, padding='same', activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling2D(pool_size),
        layers.Dropout(0.25)
    ])

# 使用函数创建模型
inputs = layers.Input(shape=(224, 224, 3))
x = create_cnn_block(32)(inputs)
x = create_cnn_block(64)(x)
x = create_cnn_block(128)(x)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(256, activation='relu')(x)
outputs = layers.Dense(10, activation='softmax')(x)
model = keras.Model(inputs, outputs)
```
3.3 Losses - 损失函数 📉
损失函数衡量模型预测与真实值的差距,指导模型优化方向。
🌳 Losses 分类体系
(概览:分类损失 — Binary/Categorical/SparseCategorical Crossentropy、Hinge;回归损失 — MSE、MAE、Huber;相似度 — CosineSimilarity。)
📋 损失函数详细 API
| 损失类型 | 类名 | 公式 | 适用场景 |
|---|---|---|---|
| 分类交叉熵 | `CategoricalCrossentropy` | `-Σ y_true * log(y_pred)` | 多分类(one-hot 标签) |
| 稀疏分类交叉熵 | `SparseCategoricalCrossentropy` | `-log(y_pred[y_true])` | 多分类(整数标签) |
| 二元交叉熵 | `BinaryCrossentropy` | `-(y*log(p) + (1-y)*log(1-p))` | 二分类/多标签 |
| 均方误差 | `MeanSquaredError` | `(y_pred - y_true)²` | 回归问题 |
| 平均绝对误差 | `MeanAbsoluteError` | `\|y_pred - y_true\|` | 回归问题(鲁棒) |
| Huber 损失 | `Huber` | `0.5*e² (\|e\|≤δ), δ*\|e\|-0.5*δ² (\|e\|>δ)` | 回归(抗异常值) |
| Hinge 损失 | `Hinge` | `max(0, 1 - y_true * y_pred)` | SVM 风格分类 |
| 余弦相似度 | `CosineSimilarity` | `-cos(θ)`(Keras 返回负余弦相似度) | 向量相似度 |
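表中的公式可以脱离框架直接验证。下面用纯 Python 按公式手算单样本的分类交叉熵与 Huber 损失(示意,数值与公式定义一致):

```python
import math

def categorical_crossentropy(y_true, y_pred):
    """-Σ y_true * log(y_pred),单样本 one-hot 形式"""
    return -sum(t * math.log(p) for t, p in zip(y_true, y_pred))

def huber(y_true, y_pred, delta=1.0):
    """误差小于 delta 用二次项,否则用线性项(抗异常值)"""
    e = abs(y_pred - y_true)
    return 0.5 * e**2 if e <= delta else delta * e - 0.5 * delta**2

# one-hot 标签 [0,1,0],预测 [0.1, 0.8, 0.1] -> -log(0.8) ≈ 0.2231
print(round(categorical_crossentropy([0, 1, 0], [0.1, 0.8, 0.1]), 4))
# 小误差走二次项,大误差只线性增长
print(round(huber(3.0, 3.2), 4))
print(huber(0.0, 5.0))
```

手算一遍有助于理解 `from_logits` 之类参数的意义:上面的实现假设 `y_pred` 已是概率,若是 logits 则需先做 softmax。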
💻 Losses 使用示例
```python
import keras
from keras import losses, ops

# ========== 1. 分类损失 ==========
# 多分类(one-hot 标签)
cce = losses.CategoricalCrossentropy(from_logits=False, label_smoothing=0.1)
y_true = [[0, 1, 0], [0, 0, 1]]
y_pred = [[0.1, 0.8, 0.1], [0.2, 0.3, 0.5]]
loss_value = cce(y_true, y_pred)

# 多分类(整数标签)
sce = losses.SparseCategoricalCrossentropy(from_logits=False)
y_true_int = [1, 2]
loss_value = sce(y_true_int, y_pred)

# 二分类
bce = losses.BinaryCrossentropy(from_logits=False)
y_true_bin = [0, 1]
y_pred_bin = [0.1, 0.9]
loss_value = bce(y_true_bin, y_pred_bin)

# ========== 2. 回归损失 ==========
# MSE
mse = losses.MeanSquaredError()
y_true_reg = [1.0, 2.0, 3.0]
y_pred_reg = [1.1, 1.9, 3.2]
mse_value = mse(y_true_reg, y_pred_reg)

# MAE
mae = losses.MeanAbsoluteError()
mae_value = mae(y_true_reg, y_pred_reg)

# Huber(抗异常值)
huber = losses.Huber(delta=1.0)
huber_value = huber(y_true_reg, y_pred_reg)

# ========== 3. 自定义损失 ==========
class FocalLoss(losses.Loss):
    """Focal Loss - 处理类别不平衡(二分类形式)"""
    def __init__(self, alpha=0.25, gamma=2.0):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma

    def call(self, y_true, y_pred):
        bce = ops.binary_crossentropy(y_true, y_pred)
        p_t = y_true * y_pred + (1 - y_true) * (1 - y_pred)
        factor = self.alpha * ops.power(1 - p_t, self.gamma)
        return ops.mean(factor * bce)

class DiceLoss(losses.Loss):
    """Dice Loss - 图像分割"""
    def __init__(self, smooth=1.0):
        super().__init__()
        self.smooth = smooth

    def call(self, y_true, y_pred):
        intersection = ops.sum(y_true * y_pred, axis=[1, 2, 3])
        union = ops.sum(y_true, axis=[1, 2, 3]) + ops.sum(y_pred, axis=[1, 2, 3])
        dice = (2. * intersection + self.smooth) / (union + self.smooth)
        return ops.mean(1 - dice)

# 使用自定义损失(model 为前文构建的任意二分类模型)
model.compile(
    optimizer='adam',
    loss=FocalLoss(alpha=0.25, gamma=2.0),
    metrics=['accuracy']
)

# ========== 4. 损失函数选择指南 ==========
loss_guide = {
    '二分类': 'BinaryCrossentropy',
    '多分类(one-hot)': 'CategoricalCrossentropy',
    '多分类(整数标签)': 'SparseCategoricalCrossentropy',
    '回归(正态分布噪声)': 'MeanSquaredError',
    '回归(有异常值)': 'Huber 或 MeanAbsoluteError',
    '类别不平衡': 'FocalLoss(自定义)',
    '图像分割': 'DiceLoss(自定义)',
    '多标签分类': 'BinaryCrossentropy',
}
```
3.4 Optimizers - 优化器 ⚡
优化器根据损失函数的梯度更新模型权重,是训练过程的核心。
🌳 Optimizers 分类体系
(概览:经典梯度法 — SGD/Momentum/Nesterov;自适应方法 — Adagrad、Adadelta、RMSprop、Adam、AdamW、Nadam。)
📋 优化器详细 API
| 优化器 | 类名 | 关键参数 | 特点 | 适用场景 |
|---|---|---|---|---|
| SGD | `optimizers.SGD` | `learning_rate`, `momentum`, `nesterov` | 简单,需调参 | CNN、小数据集 |
| Adam | `optimizers.Adam` | `learning_rate`, `beta_1`, `beta_2`, `epsilon` | 自适应,收敛快 | 默认选择,RNN |
| AdamW | `optimizers.AdamW` | `learning_rate`, `weight_decay` | Adam + 权重衰减 | Transformer |
| RMSprop | `optimizers.RMSprop` | `learning_rate`, `rho`, `epsilon` | 适合非平稳目标 | RNN、LSTM |
| Adagrad | `optimizers.Adagrad` | `learning_rate`, `epsilon` | 适合稀疏数据 | NLP、推荐 |
| Adadelta | `optimizers.Adadelta` | `learning_rate`, `rho`, `epsilon` | 对初始 lr 不敏感 | 深度网络 |
| Nadam | `optimizers.Nadam` | `learning_rate`, `beta_1`, `beta_2` | Adam + Nesterov | 通用 |
💻 Optimizers 使用示例
```python
import keras
from keras import optimizers
from keras.optimizers import schedules

# ========== 1. 基础优化器 ==========
# SGD(带动量)
sgd = optimizers.SGD(
    learning_rate=0.01,
    momentum=0.9,
    nesterov=True,
    weight_decay=0.0001
)

# Adam(推荐默认)
adam = optimizers.Adam(
    learning_rate=0.001,
    beta_1=0.9,
    beta_2=0.999,
    epsilon=1e-7,
    weight_decay=0.01,  # Keras 3 所有优化器均支持 weight_decay
    amsgrad=False
)

# AdamW(Transformer 推荐)
adamw = optimizers.AdamW(
    learning_rate=0.001,
    weight_decay=0.05,
    beta_1=0.9,
    beta_2=0.95
)

# RMSprop(RNN 推荐)
rmsprop = optimizers.RMSprop(
    learning_rate=0.001,
    rho=0.9,
    epsilon=1e-7,
    centered=True
)

# ========== 2. 学习率调度器 ==========
# 分段常数衰减
piecewise_lr = schedules.PiecewiseConstantDecay(
    boundaries=[5000, 10000],
    values=[0.1, 0.01, 0.001]
)

# 指数衰减
exp_lr = schedules.ExponentialDecay(
    initial_learning_rate=0.001,
    decay_steps=1000,
    decay_rate=0.96,
    staircase=True
)

# 余弦退火
cosine_lr = schedules.CosineDecay(
    initial_learning_rate=0.001,
    decay_steps=10000,
    alpha=0.0  # 最小学习率比例
)

# 带重启的余弦退火
cosine_restart = schedules.CosineDecayRestarts(
    initial_learning_rate=0.001,
    first_decay_steps=1000,  # 首个退火周期长度(必填参数)
    t_mul=2.0,   # 每次重启后周期倍数
    m_mul=0.9,   # 每次重启后初始 lr 倍数
    alpha=0.0
)

# 使用调度器
optimizer = optimizers.SGD(learning_rate=piecewise_lr)

# ========== 3. 自定义优化器(示意性简化,非完整实现)==========
class Lookahead:
    """Lookahead 包装器思想:快权重走 k 步,慢权重向快权重插值 alpha"""
    def __init__(self, optimizer, k=5, alpha=0.5):
        self.optimizer = optimizer
        self.k = k
        self.alpha = alpha
        self.step_counter = 0
        self.slow_weights = None  # 首次调用时复制一份模型权重

    def apply(self, grads, variables):
        if self.slow_weights is None:
            self.slow_weights = [v.numpy() for v in variables]
        self.optimizer.apply(grads, variables)  # 快权重更新
        self.step_counter += 1
        if self.step_counter >= self.k:
            # 慢权重向快权重插值,并写回变量
            for slow, var in zip(self.slow_weights, variables):
                slow += self.alpha * (var.numpy() - slow)
                var.assign(slow)
            self.step_counter = 0

# ========== 4. 优化器选择指南 ==========
optimizer_guide = {
    'CNN 图像分类': 'Adam 或 SGD+Momentum',
    'RNN/LSTM': 'Adam 或 RMSprop',
    'Transformer': 'AdamW',
    '小数据集': 'SGD+Momentum',
    '大数据集': 'Adam',
    '稀疏数据': 'Adagrad 或 Adam',
    '追求泛化性能': 'SGD+Momentum',
    '快速原型': 'Adam',
}

# ========== 5. 完整训练示例 ==========
model.compile(
    optimizer=optimizers.AdamW(
        learning_rate=schedules.CosineDecay(0.001, 10000),
        weight_decay=0.05
    ),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
```
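各调度器的衰减曲线可以用纯 Python 按公式复现,便于在选定超参前直观检查学习率随步数的变化(示意,公式与余弦退火、指数衰减的标准定义一致):

```python
import math

def cosine_decay(step, initial_lr=0.001, decay_steps=10000, alpha=0.0):
    """余弦退火:从 initial_lr 平滑降到 alpha*initial_lr"""
    step = min(step, decay_steps)
    cosine = 0.5 * (1 + math.cos(math.pi * step / decay_steps))
    return initial_lr * ((1 - alpha) * cosine + alpha)

def exponential_decay(step, initial_lr=0.001, decay_steps=1000,
                      decay_rate=0.96, staircase=True):
    """指数衰减:每 decay_steps 步乘以 decay_rate"""
    exponent = step // decay_steps if staircase else step / decay_steps
    return initial_lr * decay_rate ** exponent

print(cosine_decay(0))                 # 起点:0.001
print(round(cosine_decay(5000), 6))    # 半程恰为一半:0.0005
print(round(cosine_decay(10000), 12))  # 退火结束:0.0
print(round(exponential_decay(2000), 7))  # 0.001 * 0.96^2
```

把这类函数画成曲线,能快速判断 `decay_steps` 是否与总训练步数匹配。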
3.5 Metrics - 评估指标 📊
评估指标用于衡量模型性能,与损失函数不同,指标不参与梯度计算。
🌳 Metrics 分类体系
(概览:分类指标 — Accuracy 系列、Precision、Recall、F1、AUC;回归指标 — MAE、MSE、RMSE、R²。)
📋 指标详细 API
| 指标类型 | 类名 | 公式 | 适用场景 |
|---|---|---|---|
| 准确率 | `CategoricalAccuracy` | `correct / total` | 分类(平衡数据) |
| 精确率 | `Precision` | `TP / (TP + FP)` | 关注查准率 |
| 召回率 | `Recall` | `TP / (TP + FN)` | 关注查全率 |
| F1 分数 | `F1Score` | `2 * P * R / (P + R)` | 平衡查准查全 |
| AUC | `AUC` | ROC 曲线下面积 | 不平衡数据 |
| MAE | `MeanAbsoluteError` | `\|y_pred - y_true\|` | 回归 |
| MSE | `MeanSquaredError` | `(y_pred - y_true)²` | 回归(惩罚大误差) |
| R² | `R2Score` | `1 - SS_res / SS_tot` | 回归拟合度 |
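表中前四个指标都由混淆矩阵的 TP/FP/FN 推出,可以脱离框架手算验证(示意,不依赖 Keras):

```python
def precision_recall_f1(y_true, y_pred):
    """由 0/1 标签与 0/1 预测计算精确率、召回率、F1"""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1

# 4 个正样本中找回 3 个,同时产生 1 个误报
p, r, f1 = precision_recall_f1([1, 1, 1, 1, 0, 0], [1, 1, 1, 0, 1, 0])
print(p, r, round(f1, 4))  # 0.75 0.75 0.75
```

理解了这组公式,就容易看出为什么不平衡数据上单看准确率会误导,而 AUC、F1 更可靠。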
💻 Metrics 使用示例
```python
import keras
from keras import metrics, ops

# ========== 1. 分类指标 ==========
# 准确率
acc = metrics.CategoricalAccuracy()
acc.update_state(y_true=[[0, 1, 0], [0, 0, 1]],
                 y_pred=[[0.1, 0.8, 0.1], [0.2, 0.3, 0.5]])
print(f"准确率:{float(acc.result()):.4f}")
acc.reset_state()

# 稀疏准确率
sparse_acc = metrics.SparseCategoricalAccuracy()
sparse_acc.update_state(y_true=[1, 2],
                        y_pred=[[0.1, 0.8, 0.1], [0.2, 0.3, 0.5]])

# 精确率、召回率(输入为 0/1 标签与概率,适用于二分类/多标签)
y_true_bin = [0, 1, 1, 0]
y_pred_bin = [0.1, 0.9, 0.6, 0.4]
precision = metrics.Precision()
recall = metrics.Recall()
precision.update_state(y_true_bin, y_pred_bin)
recall.update_state(y_true_bin, y_pred_bin)
print(f"精确率:{float(precision.result()):.4f}")
print(f"召回率:{float(recall.result()):.4f}")

# F1(Keras 3 中用 average 指定平均方式,标签需为 one-hot/多标签形式)
f1 = metrics.F1Score(average='macro')

# AUC(不平衡数据推荐)
auc = metrics.AUC(curve='ROC')
auc.update_state(y_true_bin, y_pred_bin)
print(f"AUC: {float(auc.result()):.4f}")

# PR AUC(正样本很少时)
pr_auc = metrics.AUC(curve='PR')

# ========== 2. 回归指标 ==========
# MAE
mae = metrics.MeanAbsoluteError()
mae.update_state([1, 2, 3], [1.1, 1.9, 3.2])
print(f"MAE: {float(mae.result()):.4f}")

# MSE
mse = metrics.MeanSquaredError()
mse.update_state([1, 2, 3], [1.1, 1.9, 3.2])
print(f"MSE: {float(mse.result()):.4f}")

# RMSE:Keras 内置,无需自定义
rmse = metrics.RootMeanSquaredError()

# R²(决定系数,Keras 3 中类名为 R2Score,输入为二维)
r2 = metrics.R2Score()
r2.update_state([[1.0], [2.0], [3.0]], [[1.1], [1.9], [3.2]])
print(f"R²: {float(r2.result()):.4f}")

# ========== 3. 多标签 / Top-K ==========
# 指定判定阈值的精确率(可传单个阈值或阈值列表)
ml_precision = metrics.Precision(thresholds=0.5)
# Top-K 准确率
top5_acc = metrics.SparseTopKCategoricalAccuracy(k=5)

# ========== 4. 混淆矩阵 ==========
# Keras 未内置 ConfusionMatrix 指标,通常在预测后用 sklearn 计算:
# from sklearn.metrics import confusion_matrix
# cm = confusion_matrix(y_true_classes, y_pred_classes)

# ========== 5. 自定义指标 ==========
class MCC(metrics.Metric):
    """Matthews 相关系数(二分类示意实现)"""
    def __init__(self, name='mcc'):
        super().__init__(name=name)
        self.tp = self.add_weight(name='tp', initializer='zeros')
        self.tn = self.add_weight(name='tn', initializer='zeros')
        self.fp = self.add_weight(name='fp', initializer='zeros')
        self.fn = self.add_weight(name='fn', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = ops.cast(y_true, 'float32')
        y_pred = ops.cast(ops.greater(y_pred, 0.5), 'float32')
        self.tp.assign_add(ops.sum(y_true * y_pred))
        self.tn.assign_add(ops.sum((1 - y_true) * (1 - y_pred)))
        self.fp.assign_add(ops.sum((1 - y_true) * y_pred))
        self.fn.assign_add(ops.sum(y_true * (1 - y_pred)))

    def result(self):
        num = self.tp * self.tn - self.fp * self.fn
        den = ops.sqrt((self.tp + self.fp) * (self.tp + self.fn) *
                       (self.tn + self.fp) * (self.tn + self.fn))
        return num / (den + keras.config.epsilon())

    def reset_state(self):
        for v in (self.tp, self.tn, self.fp, self.fn):
            v.assign(0.0)

# ========== 6. 模型中使用多个指标(二分类示例)==========
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=[
        'accuracy',                            # 简写
        metrics.Precision(name='precision'),
        metrics.Recall(name='recall'),
        metrics.AUC(name='auc'),
        MCC(name='mcc')
    ]
)

# 训练时查看所有指标
history = model.fit(x_train, y_train, validation_split=0.1)
print(history.history.keys())
# 形如:['loss', 'accuracy', 'precision', 'recall', 'auc', 'mcc', ...]
```
四、常用层 (Layers) 详解
Keras 提供了丰富的内置层,覆盖各种深度学习任务需求。
4.1 核心层
| 层类型 | 类名 | 参数 | 用途 |
|---|---|---|---|
| 全连接层 | `Dense` | `units`, `activation` | 标准神经网络层,每个神经元连接所有输入 |
| Dropout 层 | `Dropout` | `rate` | 随机丢弃神经元,防止过拟合 |
| 激活层 | `Activation` | `activation` | 应用激活函数 |
| 输入层 | `Input` | `shape`, `dtype` | 定义模型输入 |
4.2 卷积层 (CNN)
| 层类型 | 类名 | 参数 | 用途 |
|---|---|---|---|
| 2D 卷积 | `Conv2D` | `filters`, `kernel_size`, `strides` | 图像特征提取 |
| 2D 池化 | `MaxPooling2D` | `pool_size`, `strides` | 下采样,减少参数 |
| 全局池化 | `GlobalAveragePooling2D` | - | 将特征图转为向量 |
| BatchNorm | `BatchNormalization` | - | 批归一化,加速训练 |
4.3 循环层 (RNN)
| 层类型 | 类名 | 参数 | 用途 |
|---|---|---|---|
| LSTM | `LSTM` | `units`, `return_sequences` | 长短期记忆,处理序列 |
| GRU | `GRU` | `units` | 门控循环单元,LSTM 简化版 |
| SimpleRNN | `SimpleRNN` | `units` | 基础 RNN 层 |
4.4 嵌入层 (NLP)
| 层类型 | 类名 | 参数 | 用途 |
|---|---|---|---|
| 词嵌入 | `Embedding` | `input_dim`, `output_dim` | 将词索引转为稠密向量 |
| 位置编码 | 自定义层(Keras 无内置 `PositionalEncoding`) | - | Transformer 位置信息 |
层使用示例
from keras import Sequential, layers
# CNN 模型示例
cnn_model = Sequential([
layers.Input(shape=(224, 224, 3)),
# 卷积块 1
layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
layers.BatchNormalization(),
layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
layers.MaxPooling2D((2, 2)),
layers.Dropout(0.25),
# 卷积块 2
layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
layers.BatchNormalization(),
layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
layers.MaxPooling2D((2, 2)),
layers.Dropout(0.25),
# 分类头
layers.GlobalAveragePooling2D(),
layers.Dense(128, activation='relu'),
layers.Dropout(0.5),
layers.Dense(10, activation='softmax')
])
# RNN 模型示例
rnn_model = Sequential([
layers.Input(shape=(None, 128)), # 序列长度可变
layers.LSTM(64, return_sequences=True),
layers.Dropout(0.2),
layers.LSTM(32),
layers.Dense(10, activation='softmax')
])
五、模型构建方法
Keras 提供三种模型构建方式,从简单到复杂,满足不同需求。
5.1 Sequential 模型(最简单)
适用于简单的层堆叠,线性结构。
from keras import Sequential, layers
# 方法 1:直接传入层列表
model = Sequential([
layers.Input(shape=(784,)),
layers.Dense(128, activation='relu'),
layers.Dense(10, activation='softmax')
])
# 方法 2:使用 add() 逐个添加
model = Sequential()
model.add(layers.Input(shape=(784,)))
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dense(10, activation='softmax'))
model.summary()
5.2 函数式 API(推荐)
适用于复杂结构:多输入/多输出、共享层、残差连接等。
import keras
from keras import layers
# 单输入单输出
inputs = keras.Input(shape=(784,))
x = layers.Dense(128, activation='relu')(inputs)
x = layers.Dropout(0.2)(x)
outputs = layers.Dense(10, activation='softmax')(x)
model = keras.Model(inputs=inputs, outputs=outputs)
# 多输入多输出
input1 = keras.Input(shape=(100,))
input2 = keras.Input(shape=(50,))
x1 = layers.Dense(64, activation='relu')(input1)
x2 = layers.Dense(64, activation='relu')(input2)
# 合并输入
merged = layers.Concatenate()([x1, x2])
output1 = layers.Dense(10, activation='softmax', name='class_output')(merged)
output2 = layers.Dense(1, name='reg_output')(merged)
model = keras.Model(inputs=[input1, input2], outputs=[output1, output2])
5.3 自定义模型(最灵活)
继承 keras.Model 类,完全控制前向传播。
```python
import keras
from keras import layers, ops

class ResidualBlock(keras.Model):
    """自定义残差块(conv -> BN -> ReLU 模式,激活统一在 BN 之后)"""
    def __init__(self, filters):
        super().__init__()
        self.conv1 = layers.Conv2D(filters, 3, padding='same')
        self.conv2 = layers.Conv2D(filters, 3, padding='same')
        self.bn1 = layers.BatchNormalization()
        self.bn2 = layers.BatchNormalization()

    def call(self, inputs, training=False):
        x = self.conv1(inputs)
        x = self.bn1(x, training=training)
        x = ops.relu(x)
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        # 残差连接
        x = ops.add(inputs, x)
        return ops.relu(x)

class CustomCNN(keras.Model):
    """自定义 CNN 模型"""
    def __init__(self, num_classes=10):
        super().__init__()
        self.conv1 = layers.Conv2D(32, 3, activation='relu', padding='same')
        self.res_block = ResidualBlock(32)
        self.pool = layers.GlobalAveragePooling2D()
        self.dense = layers.Dense(num_classes, activation='softmax')

    def call(self, inputs, training=False):
        x = self.conv1(inputs)
        x = self.res_block(x, training=training)
        x = self.pool(x)
        return self.dense(x)

# 使用自定义模型
model = CustomCNN(num_classes=10)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
```
六、模型训练完整流程
从数据准备到模型评估,掌握完整的训练流程。
6.1 数据准备
import keras
import numpy as np
from keras.datasets import mnist
from keras.utils import to_categorical
# 加载数据
(x_train, y_train), (x_test, y_test) = mnist.load_data()
# 数据预处理
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
x_train = x_train.reshape(-1, 28, 28, 1)
x_test = x_test.reshape(-1, 28, 28, 1)
# 标签编码
y_train_cat = to_categorical(y_train, 10)
y_test_cat = to_categorical(y_test, 10)
print(f"训练集:{x_train.shape}, 测试集:{x_test.shape}")
6.2 编译模型
```python
from keras import Sequential, layers, optimizers, losses, metrics

# 构建模型
model = Sequential([
    layers.Input(shape=(28, 28, 1)),
    layers.Conv2D(32, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Conv2D(64, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(10, activation='softmax')
])

# 编译模型(整数标签 + softmax 输出,应使用 SparseCategoricalAccuracy)
model.compile(
    optimizer=optimizers.Adam(learning_rate=0.001),
    loss=losses.SparseCategoricalCrossentropy(),
    metrics=[metrics.SparseCategoricalAccuracy(name='accuracy')]
)
```
6.3 训练模型
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, TensorBoard
# 定义回调
callbacks = [
# 模型检查点
ModelCheckpoint(
'best_model.keras',
monitor='val_loss',
save_best_only=True,
mode='min',
verbose=1
),
# 早停
EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True
),
# 学习率衰减
ReduceLROnPlateau(
monitor='val_loss',
factor=0.5,
patience=5,
min_lr=1e-7,
verbose=1
),
# TensorBoard
TensorBoard(log_dir='./logs')
]
# 训练模型
history = model.fit(
x_train, y_train,
batch_size=128,
epochs=50,
validation_split=0.1,
callbacks=callbacks,
verbose=1
)
print(f"训练完成!最终准确率:{history.history['accuracy'][-1]:.4f}")
6.4 评估和预测
import numpy as np
# 评估模型
test_results = model.evaluate(x_test, y_test, verbose=0)
print(f"测试损失:{test_results[0]:.4f}")
print(f"测试准确率:{test_results[1]:.4f}")
# 预测
predictions = model.predict(x_test[:10])
predicted_classes = np.argmax(predictions, axis=1)
print(f"预测类别:{predicted_classes}")
print(f"真实类别:{y_test[:10]}")
# 详细分类报告
from sklearn.metrics import classification_report
y_pred = model.predict(x_test)
y_pred_classes = np.argmax(y_pred, axis=1)
print(classification_report(y_test, y_pred_classes))
七、高级主题
7.1 迁移学习
import keras
from keras import layers
# 加载预训练模型(不包括顶层)
base_model = keras.applications.ResNet50(
weights='imagenet',
include_top=False,
input_shape=(224, 224, 3)
)
# 冻结基础模型
base_model.trainable = False
# 添加自定义层
inputs = keras.Input(shape=(224, 224, 3))
x = keras.applications.resnet.preprocess_input(inputs)
x = base_model(x, training=False)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dropout(0.2)(x)
outputs = layers.Dense(100, activation='softmax')(x)
model = keras.Model(inputs, outputs)
# 编译和训练
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# 训练顶层
# model.fit(train_data, train_labels, epochs=10)
# 微调:解冻部分层
base_model.trainable = True
for layer in base_model.layers[:-20]:
    layer.trainable = False
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=1e-5),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
7.2 数据增强
from keras import Sequential, layers
# 数据增强层
data_augmentation = Sequential([
layers.RandomFlip('horizontal'),
layers.RandomRotation(0.1),
layers.RandomZoom(0.1),
layers.RandomContrast(0.1),
layers.RandomTranslation(0.1, 0.1),
], name='data_augmentation')
# 在模型中使用
model = Sequential([
layers.Input(shape=(224, 224, 3)),
data_augmentation,
layers.Rescaling(1./255),
layers.Conv2D(32, 3, activation='relu'),
# ... 更多层
])
7.3 自定义训练循环
```python
import keras
import tensorflow as tf  # 此示例以 TensorFlow 后端为例;其他后端需改用相应求导 API

# 获取优化器、损失和指标
optimizer = keras.optimizers.Adam()
loss_fn = keras.losses.SparseCategoricalCrossentropy()
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()

# 自定义训练步骤
@tf.function(jit_compile=True)  # XLA 加速
def train_step(x_batch, y_batch):
    with tf.GradientTape() as tape:
        predictions = model(x_batch, training=True)
        loss = loss_fn(y_batch, predictions)
    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))
    train_acc_metric.update_state(y_batch, predictions)
    return loss

# 训练循环
for epoch in range(epochs):
    for x_batch, y_batch in train_dataset:
        loss = train_step(x_batch, y_batch)
    acc = train_acc_metric.result()
    print(f"Epoch {epoch+1}: loss={float(loss):.4f}, acc={float(acc):.4f}")
    train_acc_metric.reset_state()
```
7.4 模型导出和部署
# 保存为 Keras 格式
model.save('model.keras')
loaded_model = keras.models.load_model('model.keras')
# 导出为 SavedModel(TensorFlow 部署)
model.export('saved_model/')
# 导出为 ONNX(跨平台部署)
# pip install tf2onnx
import tf2onnx
onnx_model, _ = tf2onnx.convert.from_keras(model)
# 序列化模型架构
config = model.get_config()
json_config = model.to_json()
# 从配置重建模型
reconstructed = keras.Model.from_config(config)
八、Keras 2 vs Keras 3
| 特性 | Keras 2.x | Keras 3.x |
|---|---|---|
| 后端支持 | 仅 TensorFlow | TensorFlow, PyTorch, JAX |
| 导入方式 | `from tensorflow import keras` | `import keras` |
| 输入层 | `input_shape` 参数 | `Input()` 层(推荐) |
| 模型格式 | `.h5` | `.keras` |
| 推理导出 | `model.save()` | `model.export()` |
| ops 模块 | 使用 `tf.*` | 使用 `keras.ops.*` |
| XLA 加速 | 手动配置 | 默认 `jit_compile="auto"` |
九、最佳实践
✅ 推荐做法:
- 使用 `Input()` 层明确定义输入
- 优先使用函数式 API 构建模型
- 添加 `BatchNormalization` 加速训练
- 使用 `Dropout` 防止过拟合
- 使用回调函数监控训练过程
- 保存最佳模型权重
- 使用混合精度训练加速(`keras.mixed_precision`)
❌ 避免做法:
- 避免在模型中硬编码维度
- 避免过大的学习率导致训练不稳定
- 层后紧跟 `BatchNormalization` 时可设 `use_bias=False`(BN 的偏移参数使 bias 冗余)
- 避免在推理/评估时忘记设置 `training=False`
十、实战案例
通过实际项目案例,掌握 Keras 在真实场景中的应用。
📝 案例 1:MNIST 手写数字识别 入门
使用 CNN 识别手写数字,准确率可达 99% 以上。
🌳 模型架构图
输入 28×28×1 → [Conv2D 32 滤波器 ×2 → MaxPool 2×2 → Dropout 0.25] → [Conv2D 64 滤波器 ×2 → MaxPool 2×2 → Dropout 0.25] → Flatten → Dense 128 → Dropout 0.5 → Dense 10 (softmax)
💻 完整代码
import keras
from keras import Sequential, layers
from keras.datasets import mnist
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
import numpy as np
# 1. 加载和预处理数据
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
x_train = x_train.reshape(-1, 28, 28, 1)
x_test = x_test.reshape(-1, 28, 28, 1)
# 2. 构建模型
model = Sequential([
layers.Input(shape=(28, 28, 1)),
# 卷积块 1
layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
layers.MaxPooling2D((2, 2)),
layers.Dropout(0.25),
# 卷积块 2
layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
layers.MaxPooling2D((2, 2)),
layers.Dropout(0.25),
# 分类头
layers.Flatten(),
layers.Dense(128, activation='relu'),
layers.Dropout(0.5),
layers.Dense(10, activation='softmax')
])
# 3. 编译模型
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# 4. 训练
callbacks = [
EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5)
]
history = model.fit(
x_train, y_train,
batch_size=128,
epochs=20,
validation_split=0.1,
callbacks=callbacks
)
# 5. 评估
test_loss, test_acc = model.evaluate(x_test, y_test)
print(f"测试准确率:{test_acc:.4f}")
# 6. 预测可视化
predictions = model.predict(x_test[:10])
for i, pred in enumerate(predictions):
    print(f"样本{i}: 预测={np.argmax(pred)}, 真实={y_test[i]}")
🎬 案例 2:IMDB 电影评论情感分析 中级
使用 LSTM 处理文本序列,判断评论是正面还是负面。
🌳 模型架构图
输入序列(长度 200)→ Embedding 10000×128 → SpatialDropout1D 0.2 → LSTM 64 单元 → Dense 32 → Dropout 0.3 → Dense 1 (sigmoid)
💻 完整代码
import keras
from keras import Sequential, layers
from keras.datasets import imdb
from keras.utils import pad_sequences  # Keras 3 中 pad_sequences 位于 keras.utils
# 1. 加载数据
max_words = 10000
max_len = 200
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_words)
x_train = pad_sequences(x_train, maxlen=max_len)
x_test = pad_sequences(x_test, maxlen=max_len)
# 2. 构建模型
model = Sequential([
layers.Input(shape=(max_len,)),
layers.Embedding(max_words, 128),
layers.SpatialDropout1D(0.2),
layers.LSTM(64, dropout=0.2, recurrent_dropout=0.2),
layers.Dense(32, activation='relu'),
layers.Dropout(0.3),
layers.Dense(1, activation='sigmoid')
])
# 3. Compile and train
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
history = model.fit(
x_train, y_train,
batch_size=128,
epochs=10,
validation_split=0.1,
callbacks=[
keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
]
)
# 4. Evaluate
test_loss, test_acc = model.evaluate(x_test, y_test)
print(f"Test accuracy: {test_acc:.4f}")
# 5. Predict on a new review (the model expects padded index sequences, not raw text)
word_index = imdb.get_word_index()  # IMDB indices are offset by 3: 0=pad, 1=start, 2=OOV
def predict_sentiment(text):
    tokens = text.lower().replace('!', ' ').replace('.', ' ').split()
    ids = [1] + [word_index[t] + 3 if t in word_index and word_index[t] + 3 < max_words else 2
                 for t in tokens]
    score = model.predict(pad_sequences([ids], maxlen=max_len), verbose=0)[0][0]
    return "positive" if score > 0.5 else "negative"
print(predict_sentiment("This movie was absolutely amazing!"))
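Turning raw text into the padded index sequences the model expects relies on the IMDB convention: index 0 is padding, 1 marks the sequence start, 2 is out-of-vocabulary, and real word ids are shifted by 3. A self-contained sketch of that encoding with a toy, hypothetical word index (real code would use `imdb.get_word_index()` and `pad_sequences`):

```python
# Toy IMDB-style encoder; the 4-word index is hypothetical, for illustration only.
def encode_review(text, word_index, max_len=10):
    tokens = text.lower().split()
    # 1 marks sequence start; unknown words map to 2; known ids shift by 3
    ids = [1] + [word_index[t] + 3 if t in word_index else 2 for t in tokens]
    ids = ids[-max_len:]                       # truncate from the front
    return [0] * (max_len - len(ids)) + ids    # left-pad with 0, like pad_sequences

toy_index = {'this': 1, 'movie': 2, 'was': 3, 'amazing': 4}
encoded = encode_review("This movie was absolutely amazing", toy_index)
print(encoded)  # [0, 0, 0, 0, 1, 4, 5, 6, 2, 7] -- "absolutely" became OOV (2)
```
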
🔄 Case 3: Transfer Learning Image Classification with ResNet (Advanced)
Use a pretrained ResNet50 for image classification, with support for custom classes.
🔄 Training Flow: freeze the base → train the new head → unfreeze the top layers → fine-tune at a low learning rate
💻 Complete Code
import keras
from keras import layers, applications, callbacks
# 1. Data preparation
img_size = 224
batch_size = 32
train_ds = keras.utils.image_dataset_from_directory(
    'data/train',
    image_size=(img_size, img_size),
    batch_size=batch_size
)
val_ds = keras.utils.image_dataset_from_directory(
    'data/val',
    image_size=(img_size, img_size),
    batch_size=batch_size
)
# Data augmentation
data_augmentation = keras.Sequential([
layers.RandomFlip('horizontal'),
layers.RandomRotation(0.1),
layers.RandomZoom(0.1),
layers.RandomContrast(0.1),
])
# 2. Build the model (functional API)
inputs = keras.Input(shape=(img_size, img_size, 3))
x = data_augmentation(inputs)
x = applications.resnet.preprocess_input(x)
# Load the pretrained base model
base_model = applications.ResNet50(
include_top=False,
weights='imagenet',
input_shape=(img_size, img_size, 3)
)
base_model.trainable = False  # freeze the base
x = base_model(x, training=False)
x = layers.GlobalAveragePooling2D()(x)
x = layers.BatchNormalization()(x)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(10, activation='softmax')(x)  # 10 classes
model = keras.Model(inputs, outputs)
# 3. Compile the model
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy', keras.metrics.Precision(), keras.metrics.Recall()]
)
# 4. Train the top layers
callbacks_list = [
callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-7),
callbacks.ModelCheckpoint('best_model.keras', save_best_only=True, monitor='val_loss', mode='min')
]
history = model.fit(
train_ds,
validation_data=val_ds,
epochs=20,
callbacks=callbacks_list
)
# 5. Fine-tuning: unfreeze the top layers
base_model.trainable = True
for layer in base_model.layers[:-40]:
layer.trainable = False
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=1e-5),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# Continue training
history_fine = model.fit(
train_ds,
validation_data=val_ds,
epochs=30,
callbacks=callbacks_list
)
# 6. Save and export
model.save('final_model.keras')
model.export('saved_model/')
print("Model trained and saved!")
📈 Case 4: Time Series Forecasting with LSTM (Intermediate)
Use an LSTM to forecast stock prices or any other time series.
💻 Complete Code
import keras
from keras import Sequential, layers
import numpy as np
import pandas as pd
# 1. Prepare time-series data
def create_sequences(data, seq_length=60):
    """Create sliding-window samples."""
X, y = [], []
for i in range(len(data) - seq_length):
X.append(data[i:i+seq_length])
y.append(data[i+seq_length])
return np.array(X), np.array(y)
# Example: generate synthetic data
np.random.seed(42)
data = np.cumsum(np.random.randn(1000)) + 100  # random walk
data = (data - data.mean()) / data.std()  # standardize
# Create sequences
seq_length = 60
X, y = create_sequences(data, seq_length)
# Train/test split
split = int(0.8 * len(X))
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]
# Reshape to [samples, timesteps, features]
X_train = X_train.reshape(-1, seq_length, 1)
X_test = X_test.reshape(-1, seq_length, 1)
# 2. Build the LSTM model
model = Sequential([
layers.Input(shape=(seq_length, 1)),
layers.LSTM(50, return_sequences=True),
layers.Dropout(0.2),
layers.LSTM(50),
layers.Dropout(0.2),
layers.Dense(25, activation='relu'),
    layers.Dense(1)  # regression output
])
# 3. Compile the model
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='mse',
metrics=['mae']
)
# 4. Train
history = model.fit(
X_train, y_train,
batch_size=32,
epochs=50,
validation_split=0.1,
callbacks=[
keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5)
]
)
# 5. Evaluate
test_loss, test_mae = model.evaluate(X_test, y_test)
print(f"Test MAE: {test_mae:.4f}")
# 6. Predict
predictions = model.predict(X_test)
# 7. Visualize
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 6))
plt.plot(y_test[:100], label='Actual')
plt.plot(predictions[:100].flatten(), label='Predicted', alpha=0.7)
plt.legend()
plt.title('Time Series Forecast')
plt.show()
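A quick shape sanity check for the `create_sequences` helper used above (pure NumPy, no model needed): 100 points with a window of 60 should yield 40 samples, and each target is the point right after its window.

```python
import numpy as np

def create_sequences(data, seq_length=60):
    # Same sliding-window helper as above: each window of seq_length points
    # predicts the single next point.
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i + seq_length])
        y.append(data[i + seq_length])
    return np.array(X), np.array(y)

data = np.arange(100, dtype='float32')
X, y = create_sequences(data, seq_length=60)
print(X.shape, y.shape)        # (40, 60) (40,)
X = X.reshape(-1, 60, 1)       # [samples, timesteps, features] for the LSTM
print(X.shape)                 # (40, 60, 1)
print(float(y[0]))             # 60.0 -- the point right after the first window
```
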
🔍 Case 5: Anomaly Detection with an Autoencoder (Advanced)
Use an autoencoder to detect anomalous data points, useful for fraud detection, fault diagnosis, and similar scenarios.
🌳 Model Architecture
Input features → 64 → 32 → 16 → 8 (bottleneck) → 16 → 32 → 64 → reconstructed input
💻 Complete Code
import keras
from keras import layers, Model
import numpy as np
from sklearn.preprocessing import StandardScaler
# 1. Prepare the data (train on normal samples only)
np.random.seed(42)
n_samples = 1000
n_features = 20
# Generate normal data
X_normal = np.random.randn(n_samples, n_features) * 0.5 + 1
# Generate anomalous data (for testing)
X_anomaly = np.random.randn(100, n_features) * 2 + 3
# Standardize
scaler = StandardScaler()
X_normal_scaled = scaler.fit_transform(X_normal)
X_anomaly_scaled = scaler.transform(X_anomaly)
# 2. Build the autoencoder
input_dim = X_normal.shape[1]
# Encoder
encoder_input = layers.Input(shape=(input_dim,))
x = layers.Dense(64, activation='relu')(encoder_input)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(32, activation='relu')(x)
x = layers.BatchNormalization()(x)
x = layers.Dense(16, activation='relu')(x)
encoded = layers.Dense(8, activation='relu')(x)  # bottleneck layer
# Decoder
x = layers.Dense(16, activation='relu')(encoded)
x = layers.BatchNormalization()(x)
x = layers.Dense(32, activation='relu')(x)
x = layers.BatchNormalization()(x)
x = layers.Dense(64, activation='relu')(x)
decoded = layers.Dense(input_dim, activation='linear')(x)
# Create the model
autoencoder = Model(encoder_input, decoded)
autoencoder.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='mse'
)
autoencoder.summary()
# 3. Train (on normal data only)
history = autoencoder.fit(
X_normal_scaled, X_normal_scaled,
batch_size=32,
epochs=100,
validation_split=0.1,
callbacks=[
keras.callbacks.EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True),
keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=10)
]
)
# 4. Compute reconstruction errors
reconstructed_normal = autoencoder.predict(X_normal_scaled)
reconstructed_anomaly = autoencoder.predict(X_anomaly_scaled)
# Per-sample MSE
mse_normal = np.mean(np.square(X_normal_scaled - reconstructed_normal), axis=1)
mse_anomaly = np.mean(np.square(X_anomaly_scaled - reconstructed_anomaly), axis=1)
# 5. Set the anomaly threshold
threshold = np.percentile(mse_normal, 95)  # 95th percentile
print(f"Anomaly threshold: {threshold:.4f}")
# 6. Anomaly detection
def is_anomaly(mse, threshold):
return mse > threshold
print(f"False positive rate on normal data: {np.mean(is_anomaly(mse_normal, threshold)):.2%}")
print(f"Detection rate on anomalous data: {np.mean(is_anomaly(mse_anomaly, threshold)):.2%}")
# 7. Visualize
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.hist(mse_normal, bins=50, alpha=0.7, label='Normal data')
plt.hist(mse_anomaly, bins=50, alpha=0.7, label='Anomalous data')
plt.axvline(threshold, color='red', linestyle='--', label=f'Threshold: {threshold:.4f}')
plt.xlabel('Reconstruction error (MSE)')
plt.ylabel('Count')
plt.legend()
plt.title('Autoencoder Anomaly Detection')
plt.show()
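The 95th-percentile threshold used above flags roughly 5% of normal samples by construction, while well-separated anomalies are almost all caught. A standalone NumPy sketch makes this concrete, using synthetic exponential stand-ins for the two error distributions (no autoencoder required):

```python
import numpy as np

rng = np.random.default_rng(0)
# Stand-ins for reconstruction errors: normal samples have small errors,
# anomalies are shifted well above them.
mse_normal = rng.exponential(scale=0.1, size=10_000)
mse_anomaly = rng.exponential(scale=0.1, size=1_000) + 1.0

threshold = np.percentile(mse_normal, 95)   # ~5% of normal data exceeds this
fpr = np.mean(mse_normal > threshold)       # false positive rate
detection = np.mean(mse_anomaly > threshold)
print(f"FPR = {fpr:.2%}, detection = {detection:.2%}")
```

The trade-off is explicit: lowering the percentile catches more anomalies at the cost of more false alarms on normal data.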
11. Model Deployment
Deploy a trained model to a production environment.
11.1 Saving and Loading
import keras
# Save the whole model (recommended)
model.save('my_model.keras')
# Load the model
loaded_model = keras.models.load_model('my_model.keras')
# Save weights only
model.save_weights('model.weights.h5')
# Load weights
model.load_weights('model.weights.h5')
# Save the architecture only
config = model.get_config()
json_config = model.to_json()
# Rebuild from the config
reconstructed = keras.Model.from_config(config)
11.2 Export Format Comparison
| Format | Use case | Advantages | Code |
|---|---|---|---|
| .keras | Native Keras format | Saves the full model and weights | model.save('model.keras') |
| SavedModel | TensorFlow deployment | Works with TF Serving and TF Lite | model.export('saved_model/') |
| ONNX | Cross-platform deployment | Supported by many inference engines | tf2onnx.convert.from_keras() |
| TorchScript | PyTorch backend | Deploys in the PyTorch ecosystem | torch.jit.trace() |
11.3 Deploying with TensorFlow Serving
# 1. Export as a SavedModel
model.export('saved_model/1')
# 2. Run TF Serving with Docker
# docker run -p 8501:8501 \
#   --mount type=bind,source=$(pwd)/saved_model,target=/models/my_model \
#   -e MODEL_NAME=my_model -t tensorflow/serving
# 3. Call the service
import requests
import json
data = json.dumps({
"signature_name": "serving_default",
    "instances": [[0.1, 0.2, 0.3, ...]]  # input data
})
response = requests.post(
'http://localhost:8501/v1/models/my_model:predict',
data=data
)
predictions = response.json()['predictions']
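An offline look at the JSON shapes the REST call above assumes: requests wrap a batch of inputs under `"instances"`, and the reply returns one prediction per instance. Stdlib `json` only, no server needed; the response values here are made up for illustration:

```python
import json

# Build the request body: a batch with one 3-feature sample (toy values)
request_body = json.dumps({
    "signature_name": "serving_default",
    "instances": [[0.1, 0.2, 0.3]],
})
print(request_body)

# A made-up response for a 3-class classifier, parsed the same way as above
response_body = '{"predictions": [[0.1, 0.7, 0.2]]}'
predictions = json.loads(response_body)["predictions"]
best_class = max(range(len(predictions[0])), key=lambda i: predictions[0][i])
print(predictions[0], best_class)  # [0.1, 0.7, 0.2] 1
```
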
11.4 Web App Deployment (Flask)
from flask import Flask, request, jsonify
import keras
import numpy as np
app = Flask(__name__)
# Load the model
model = keras.models.load_model('my_model.keras')
@app.route('/predict', methods=['POST'])
def predict():
try:
data = request.json['data']
data = np.array(data)
        # Preprocess
        data = data.reshape(1, -1)  # adjust to match your model
        # Predict
prediction = model.predict(data)[0]
return jsonify({
'prediction': prediction.tolist(),
'class': int(np.argmax(prediction))
})
except Exception as e:
return jsonify({'error': str(e)}), 400
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
12. Learning Resources
📚 Official Documentation
- Keras website: latest docs and API reference
- TensorFlow website: TensorFlow backend docs
- PyTorch website: PyTorch backend docs
🎓 Online Courses
- Deep Learning Specialization (Coursera)
- Fast.ai Practical Deep Learning
- TensorFlow Developer Certificate
💻 Hands-on Projects
- Kaggle competitions: practice deep learning
- GitHub open-source projects: learn from high-quality code
- Personal projects: solve real problems