
🧠 The Complete Keras 3.x Tutorial

Keras 3 is the latest high-level neural-network API. It supports multiple backends (TensorFlow, PyTorch, JAX), which makes deep-learning development and deployment far more flexible. This tutorial starts from scratch and works through Keras's architecture, core classes, and advanced applications.

Keras 3 highlights:

  • 🔄 Multi-backend: switch between TensorFlow, PyTorch, and JAX
  • 🚀 Performance: XLA compilation enabled by default where the backend supports it
  • 🎯 Native workflows: plugs into native TensorFlow, PyTorch, and JAX pipelines
  • 📦 Backward compatible: largely compatible with the Keras 2.x API
  • 🔧 Flexible deployment: exports to multiple model formats
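Backend switching from the list above is driven by a single environment variable that must be set before keras is imported. A minimal sketch (the keras import is left commented so the snippet stands on its own):

```python
import os

# Keras 3 reads KERAS_BACKEND at import time; valid values are
# "tensorflow", "jax", and "torch".
os.environ["KERAS_BACKEND"] = "jax"

# The import must come *after* the variable is set:
# import keras
# print(keras.backend.backend())  # would report "jax"
```

Setting the variable after `import keras` has no effect; restart the process to change backends.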

📚 Learning Path

  1. Architecture: understand Keras's three-layer architecture
  2. Core classes: master Model, Layer, Optimizer
  3. Basic models: Sequential and the functional API
  4. Training: compile, fit, evaluate
  5. Advanced topics: customization, callbacks, deployment

1. The Keras Architecture

Keras uses a clean three-layer architecture. From user applications down to the backend implementations, the layers are clearly separated and easy to extend.

🏗️ Keras 3 architecture overview

  • 📱 Application layer (Applications): pretrained models | example applications | user projects
  • 🔌 API layer (Keras API): Models | Layers | Losses | Optimizers | Metrics
  • ⚙️ Backend layer: TensorFlow | PyTorch | JAX

Architecture layers at a glance

| Layer | Components | Role | Example |
|---|---|---|---|
| Application layer | pretrained models, examples | ready-to-use deep-learning applications | keras.applications.VGG16 |
| API layer | Models, Layers, Optimizers | the core API for building and training models | keras.Model, keras.layers.Dense |
| Backend layer | TensorFlow / PyTorch / JAX | low-level tensor ops and automatic differentiation | keras.ops.matmul |

2. Core Classes in Detail

The core classes are the foundation of every Keras model; understanding them is the key to mastering Keras.

2.1 Model: the Model Base Class

📦 keras.Model

Base class of all models; supports the functional API and custom subclasses

  • Key methods: call(), compile(), fit(), evaluate(), predict()
  • Attributes: layers, inputs, outputs, weights
  • Use: building complex directed-acyclic-graph (DAG) models
import keras

# Functional API example
inputs = keras.Input(shape=(784,))
x = keras.layers.Dense(128, activation='relu')(inputs)
outputs = keras.layers.Dense(10, activation='softmax')(x)
model = keras.Model(inputs=inputs, outputs=outputs)

2.2 Sequential: the Sequential Model

📋 keras.Sequential

A simple stack of layers, suited to linear topologies

  • Key methods: add(), build(), summary()
  • Limitations: no multiple inputs/outputs, shared layers, or non-linear topologies
  • Use: quickly building simple feed-forward networks
from keras import Sequential, layers

model = Sequential([
    layers.Input(shape=(784,)),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])

2.3 Layer: the Layer Base Class

🧱 keras.layers.Layer

Base class of every layer; subclass it to build custom layers

  • Key methods: build(), call(), add_weight()
  • State: trainable weights (trainable_weights)
  • Use: creating custom neural-network layers
import keras
import keras.ops as ops

class CustomLayer(keras.layers.Layer):
    def __init__(self, units=32):
        super().__init__()
        self.units = units
    
    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer='glorot_uniform',
            trainable=True
        )
        self.b = self.add_weight(
            shape=(self.units,),
            initializer='zeros',
            trainable=True
        )
    
    def call(self, inputs):
        return ops.matmul(inputs, self.w) + self.b

2.4 Optimizer: Optimizers

keras.optimizers.Optimizer

Updates the model weights to minimize the loss function

  • Common optimizers: Adam, SGD, RMSprop, Adagrad
  • Key methods: apply_gradients(), apply()
  • Key parameters: learning_rate, momentum, weight_decay
from keras import optimizers

# Adam optimizer (a sensible default)
optimizer = optimizers.Adam(
    learning_rate=0.001,
    beta_1=0.9,
    beta_2=0.999,
    epsilon=1e-7
)

# SGD optimizer (with momentum)
optimizer = optimizers.SGD(
    learning_rate=0.01,
    momentum=0.9,
    nesterov=True
)

2.5 Loss: Loss Functions

📉 keras.losses.Loss

Measures the gap between predictions and ground truth

  • Classification losses: CategoricalCrossentropy, SparseCategoricalCrossentropy, BinaryCrossentropy
  • Regression losses: MSE, MAE, Huber
  • Custom losses: subclass Loss
from keras import losses
import keras.ops as ops

# Classification loss
loss_fn = losses.SparseCategoricalCrossentropy()

# Regression loss
loss_fn = losses.MeanSquaredError()

# Custom loss
class DiceLoss(losses.Loss):
    def call(self, y_true, y_pred):
        intersection = ops.sum(y_true * y_pred)
        union = ops.sum(y_true) + ops.sum(y_pred)
        return 1 - (2. * intersection + 1) / (union + 1)

2.6 Metric: Evaluation Metrics

📊 keras.metrics.Metric

Evaluates model performance; takes no part in training

  • Common metrics: Accuracy, Precision, Recall, AUC
  • Key methods: update_state(), result(), reset_state()
  • Versus losses: metrics are never used for backpropagation
from keras import metrics

# Classification metric
metric = metrics.CategoricalAccuracy()

# Regression metric
metric = metrics.MeanAbsoluteError()

# Tracking several metrics at once
# (note: Precision/Recall are defined for binary or multi-label targets)
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy', metrics.Precision(), metrics.Recall()]
)

Core class relationships

A Model (the container) is built from Layer objects and trained by combining an Optimizer, a Loss, Metrics, and Callbacks.

3. Keras API Reference

A closer look at the five core modules: Models, Layers, Losses, Optimizers, Metrics.

3.1 Models: the Model Module 📦

Models are the heart of Keras: they organize and train neural networks.

🌳 Models class hierarchy

keras.Model (base class) → Sequential (linear stacks), functional models (DAGs), and custom subclassed models.

📋 Model API reference

| Category | Method / attribute | Purpose | Example |
|---|---|---|---|
| Building | __init__() | initialize the model | super().__init__() |
| | build(input_shape) | create the model's weights | model.build((None, 784)) |
| | add(layer) | append a layer (Sequential) | model.add(Dense(64)) |
| | summary() | print a model summary | model.summary() |
| Compiling | compile() | configure training | model.compile(optimizer, loss, metrics) |
| | optimizer | the attached optimizer | model.optimizer |
| Training | fit() | train the model | model.fit(x, y, epochs=10) |
| | evaluate() | evaluate the model | model.evaluate(x_test, y_test) |
| | train_on_batch() | train on one batch | model.train_on_batch(x, y) |
| | test_on_batch() | test on one batch | model.test_on_batch(x, y) |
| Inference | call(inputs) | forward pass | model(x) |
| | predict() | batch prediction | model.predict(x) |
| | predict_on_batch() | predict on one batch | model.predict_on_batch(x) |
| | get_layer() | fetch a layer by name | model.get_layer('dense_1') |
| Weights | get_weights() | read the weights | weights = model.get_weights() |
| | set_weights() | write the weights | model.set_weights(weights) |
| | save_weights() | save weights | model.save_weights('model.weights.h5') |
| | load_weights() | load weights | model.load_weights('model.weights.h5') |
| | trainable_weights | trainable weights | model.trainable_weights |
| Config | get_config() | read the config | config = model.get_config() |
| | from_config() | rebuild from a config | Model.from_config(config) |
| | to_json() | export JSON | json_str = model.to_json() |
| | save() | save the whole model | model.save('model.keras') |

💻 Models usage examples

import keras
from keras import layers, models

# ========== 1. Sequential model ==========
seq_model = keras.Sequential([
    layers.Input(shape=(784,)),
    layers.Dense(128, activation='relu', name='dense_1'),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax', name='output')
], name='sequential_model')

# ========== 2. Functional API ==========
inputs = keras.Input(shape=(784,), name='input')
x = layers.Dense(128, activation='relu', name='dense_1')(inputs)
x = layers.Dropout(0.2, name='dropout')(x)
outputs = layers.Dense(10, activation='softmax', name='predictions')(x)
func_model = keras.Model(inputs=inputs, outputs=outputs, name='functional_model')

# ========== 3. Subclassed model ==========
class CustomModel(keras.Model):
    def __init__(self, num_classes=10):
        super().__init__()
        self.dense1 = layers.Dense(128, activation='relu', name='dense_1')
        self.dropout = layers.Dropout(0.2, name='dropout')
        self.classifier = layers.Dense(num_classes, activation='softmax', name='output')
    
    def call(self, inputs, training=False):
        x = self.dense1(inputs)
        x = self.dropout(x, training=training)
        return self.classifier(x)

custom_model = CustomModel()

# ========== 4. Working with models ==========
# Compile
seq_model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# Inspect the architecture
seq_model.summary()
seq_model.get_config()

# Fetch a specific layer
dense_layer = seq_model.get_layer('dense_1')
print(f"layer weight shape: {dense_layer.get_weights()[0].shape}")

# Save and load
seq_model.save('saved_model.keras')
loaded_model = keras.models.load_model('saved_model.keras')

# Weight round-trip
weights = seq_model.get_weights()
seq_model.set_weights(weights)

3.2 Layers: the Layer Module 🧱

Layers are the building blocks of neural networks, and Keras ships a large set of them.

🌳 Layer taxonomy

  • Core: Dense, Activation, Dropout
  • Convolution: Conv1D, Conv2D, Conv3D
  • Pooling: MaxPool1D, MaxPool2D, AvgPool2D
  • Recurrent: SimpleRNN, LSTM, GRU
  • Normalization / regularization: BatchNormalization, LayerNormalization, Dropout

📋 Core layer API

| Layer | Class | Key parameters | Input shape | Output shape |
|---|---|---|---|---|
| Dense | layers.Dense | units, activation, use_bias | (..., input_dim) | (..., units) |
| Dropout | layers.Dropout | rate, noise_shape | any | same as input |
| Activation | layers.Activation | activation | any | same as input |
| Flatten | layers.Flatten | data_format | (..., h, w, c) | (..., h*w*c) |
| Reshape | layers.Reshape | target_shape | any | target_shape |
| BatchNorm | layers.BatchNormalization | axis, momentum, epsilon | any | same as input |
| LayerNorm | layers.LayerNormalization | axis, epsilon | any | same as input |
| Embedding | layers.Embedding | input_dim, output_dim | (batch, seq_len) | (batch, seq_len, dim) |

📋 Convolution layer API

| Layer | Class | Key parameters | Input shape | Output shape |
|---|---|---|---|---|
| Conv1D | layers.Conv1D | filters, kernel_size, strides, padding | (batch, steps, channels) | (batch, new_steps, filters) |
| Conv2D | layers.Conv2D | filters, kernel_size, strides, padding | (batch, h, w, channels) | (batch, new_h, new_w, filters) |
| Conv3D | layers.Conv3D | filters, kernel_size, strides | (batch, d, h, w, c) | (batch, new_d, new_h, new_w, f) |
| DepthwiseConv2D | layers.DepthwiseConv2D | kernel_size, depth_multiplier | (batch, h, w, c) | (batch, new_h, new_w, c*multiplier) |
| SeparableConv2D | layers.SeparableConv2D | filters, kernel_size | (batch, h, w, c) | (batch, new_h, new_w, filters) |

📋 Pooling layer API

| Layer | Class | Key parameters | Purpose |
|---|---|---|---|
| MaxPooling1D | layers.MaxPooling1D | pool_size, strides | 1-D max pooling |
| MaxPooling2D | layers.MaxPooling2D | pool_size, strides | 2-D max pooling |
| AveragePooling2D | layers.AveragePooling2D | pool_size, strides | 2-D average pooling |
| GlobalMaxPooling2D | layers.GlobalMaxPooling2D | data_format | global max pooling |
| GlobalAveragePooling2D | layers.GlobalAveragePooling2D | data_format | global average pooling |

📋 RNN layer API

| Layer | Class | Key parameters | Notes |
|---|---|---|---|
| SimpleRNN | layers.SimpleRNN | units, activation | basic RNN, prone to vanishing gradients |
| LSTM | layers.LSTM | units, return_sequences | long short-term memory, handles long sequences |
| GRU | layers.GRU | units, return_sequences | simplified LSTM, faster |
| Bidirectional | layers.Bidirectional | layer, merge_mode | bidirectional wrapper |
| RNN | layers.RNN | cell, return_sequences | runs a custom RNN cell |

💻 Layers usage examples

import keras
from keras import layers
import keras.ops as ops

# ========== 1. Core layers ==========
inputs = layers.Input(shape=(100,))
x = layers.Dense(64, kernel_initializer='he_normal')(inputs)  # no activation yet
x = layers.BatchNormalization()(x)
x = layers.Activation('relu')(x)  # Dense -> BatchNorm -> activation ordering
x = layers.Dropout(0.3)(x)

# ========== 2. Convolution layers ==========
# 1-D convolution (text / sequences)
conv1d = layers.Conv1D(
    filters=128,
    kernel_size=3,
    strides=1,
    padding='same',
    activation='relu',
    kernel_initializer='he_normal'
)

# 2-D convolution (images)
conv2d = layers.Conv2D(
    filters=64,
    kernel_size=(3, 3),
    strides=(1, 1),
    padding='same',
    activation='relu',
    kernel_regularizer=keras.regularizers.l2(0.01)
)

# Depthwise-separable convolution
sep_conv = layers.SeparableConv2D(
    filters=128,
    kernel_size=(3, 3),
    padding='same',
    depth_multiplier=1
)

# ========== 3. Pooling layers ==========
max_pool = layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2))
avg_pool = layers.AveragePooling2D(pool_size=(2, 2))
global_pool = layers.GlobalAveragePooling2D()

# ========== 4. RNN layers ==========
# LSTM
lstm = layers.LSTM(
    units=128,
    return_sequences=True,
    dropout=0.2,
    recurrent_dropout=0.2
)

# Bidirectional LSTM
bi_lstm = layers.Bidirectional(
    layers.LSTM(64, return_sequences=True),
    merge_mode='concat'
)

# GRU
gru = layers.GRU(
    units=64,
    return_sequences=False,
    reset_after=True
)

# ========== 5. Custom layers ==========
import math

class GELU(layers.Layer):
    """Tanh approximation of GELU (keras.activations.gelu is also built in)."""
    def call(self, inputs):
        return 0.5 * inputs * (1 + ops.tanh(
            math.sqrt(2 / math.pi) * (inputs + 0.044715 * ops.power(inputs, 3))
        ))

# Using the custom layer
x = layers.Dense(128)(inputs)
x = GELU()(x)

# ========== 6. A complete model ==========
def create_cnn_block(filters, kernel_size=3, pool_size=2):
    """Create a reusable CNN block."""
    return keras.Sequential([
        layers.Conv2D(filters, kernel_size, padding='same', activation='relu'),
        layers.BatchNormalization(),
        layers.Conv2D(filters, kernel_size, padding='same', activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling2D(pool_size),
        layers.Dropout(0.25)
    ])

# Assemble the model from the helper blocks
inputs = layers.Input(shape=(224, 224, 3))
x = create_cnn_block(32)(inputs)
x = create_cnn_block(64)(x)
x = create_cnn_block(128)(x)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(256, activation='relu')(x)
outputs = layers.Dense(10, activation='softmax')(x)
model = keras.Model(inputs, outputs)

3.3 Losses: Loss Functions 📉

Loss functions measure the gap between predictions and targets and steer the direction of optimization.

🌳 Loss taxonomy

  • Classification: CategoricalCrossentropy, SparseCategoricalCrossentropy, BinaryCrossentropy
  • Regression: MSE, MAE, Huber
  • Ranking: ranking losses
  • Contrastive: contrastive and triplet losses

📋 Loss function API

| Loss | Class | Formula | Typical use |
|---|---|---|---|
| Categorical crossentropy | CategoricalCrossentropy | -Σ y_true * log(y_pred) | multiclass (one-hot labels) |
| Sparse categorical crossentropy | SparseCategoricalCrossentropy | -log(y_pred[y_true]) | multiclass (integer labels) |
| Binary crossentropy | BinaryCrossentropy | -(y*log(p) + (1-y)*log(1-p)) | binary / multi-label |
| Mean squared error | MeanSquaredError | (y_pred - y_true)² | regression |
| Mean absolute error | MeanAbsoluteError | \|y_pred - y_true\| | regression (robust) |
| Huber loss | Huber | 0.5*e² if \|e\| ≤ δ, else δ*\|e\| - 0.5*δ² | regression (outlier-resistant) |
| Hinge loss | Hinge | max(0, 1 - y_true * y_pred) | SVM-style classification |
| Cosine similarity | CosineSimilarity | 1 - cos(θ) | vector similarity |
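The Huber formula in the table is piecewise; a plain-Python rendering of a single-value version makes the switchover at |e| = δ explicit (illustrative only, not the Keras implementation, which works on tensors):

```python
def huber(y_true, y_pred, delta=1.0):
    """Huber loss for one prediction: quadratic near zero, linear in the tails."""
    e = abs(y_pred - y_true)
    if e <= delta:
        return 0.5 * e ** 2               # small errors: behaves like MSE
    return delta * e - 0.5 * delta ** 2   # large errors: behaves like MAE

print(huber(0.0, 0.5))  # quadratic region -> 0.125
print(huber(0.0, 3.0))  # linear tail -> 2.5
```

The two branches meet at |e| = δ with matching value and slope, which is exactly what makes Huber robust without MAE's kink at zero.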

💻 Losses usage examples

import keras
from keras import losses
import keras.ops as ops

# ========== 1. Classification losses ==========
# Multiclass (one-hot labels)
cce = losses.CategoricalCrossentropy(from_logits=False, label_smoothing=0.1)
y_true = [[0, 1, 0], [0, 0, 1]]
y_pred = [[0.1, 0.8, 0.1], [0.2, 0.3, 0.5]]
loss_value = cce(y_true, y_pred)

# Multiclass (integer labels)
sce = losses.SparseCategoricalCrossentropy(from_logits=False)
y_true_int = [1, 2]
loss_value = sce(y_true_int, y_pred)

# Binary classification
bce = losses.BinaryCrossentropy(from_logits=False)
y_true_bin = [0, 1]
y_pred_bin = [0.1, 0.9]
loss_value = bce(y_true_bin, y_pred_bin)

# ========== 2. Regression losses ==========
# MSE
mse = losses.MeanSquaredError()
y_true_reg = [1.0, 2.0, 3.0]
y_pred_reg = [1.1, 1.9, 3.2]
mse_value = mse(y_true_reg, y_pred_reg)

# MAE
mae = losses.MeanAbsoluteError()
mae_value = mae(y_true_reg, y_pred_reg)

# Huber (robust to outliers)
huber = losses.Huber(delta=1.0)
huber_value = huber(y_true_reg, y_pred_reg)

# ========== 3. Custom losses ==========
class FocalLoss(losses.Loss):
    """Focal loss: down-weights easy examples to counter class imbalance."""
    def __init__(self, alpha=0.25, gamma=2.0):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
    
    def call(self, y_true, y_pred):
        bce = ops.binary_crossentropy(y_true, y_pred)
        p_t = y_true * y_pred + (1 - y_true) * (1 - y_pred)
        factor = self.alpha * ops.power(1 - p_t, self.gamma)
        return ops.mean(factor * bce)

class DiceLoss(losses.Loss):
    """Dice Loss - 图像分割"""
    def __init__(self, smooth=1.0):
        super().__init__()
        self.smooth = smooth
    
    def call(self, y_true, y_pred):
        intersection = ops.sum(y_true * y_pred, axis=[1, 2, 3])
        union = ops.sum(y_true, axis=[1, 2, 3]) + ops.sum(y_pred, axis=[1, 2, 3])
        dice = (2. * intersection + self.smooth) / (union + self.smooth)
        return ops.mean(1 - dice)

# Attach a custom loss
model.compile(
    optimizer='adam',
    loss=FocalLoss(alpha=0.25, gamma=2.0),
    metrics=['accuracy']
)

# ========== 4. Choosing a loss function ==========
loss_guide = {
    'binary classification': 'BinaryCrossentropy',
    'multiclass (one-hot labels)': 'CategoricalCrossentropy',
    'multiclass (integer labels)': 'SparseCategoricalCrossentropy',
    'regression (roughly Gaussian noise)': 'MeanSquaredError',
    'regression (with outliers)': 'Huber or MeanAbsoluteError',
    'class imbalance': 'FocalLoss',
    'image segmentation': 'DiceLoss',
    'multi-label classification': 'BinaryCrossentropy',
}

3.4 Optimizers ⚡

Optimizers update the model weights from the gradients of the loss; they are the engine of training.

🌳 Optimizer taxonomy

  • First-order: SGD, SGD + momentum
  • Adaptive: Adam, AdamW, RMSprop, Adagrad
  • Second-order: L-BFGS-style methods (not shipped with Keras)

📋 Optimizer API

| Optimizer | Class | Key parameters | Traits | Typical use |
|---|---|---|---|---|
| SGD | optimizers.SGD | learning_rate, momentum, nesterov | simple, needs tuning | CNNs, small datasets |
| Adam | optimizers.Adam | learning_rate, beta_1, beta_2, epsilon | adaptive, fast convergence | sensible default, RNNs |
| AdamW | optimizers.AdamW | learning_rate, weight_decay | Adam + decoupled weight decay | Transformers |
| RMSprop | optimizers.RMSprop | learning_rate, rho, epsilon | suits recurrent nets | RNN, LSTM |
| Adagrad | optimizers.Adagrad | learning_rate, epsilon | suits sparse data | NLP, recommenders |
| Adadelta | optimizers.Adadelta | learning_rate, rho, epsilon | little LR tuning needed | deep networks |
| Nadam | optimizers.Nadam | learning_rate, beta_1, beta_2 | Adam + Nesterov momentum | general purpose |

💻 Optimizers usage examples

import keras
from keras import optimizers
from keras.optimizers import schedules

# ========== 1. Basic optimizers ==========
# SGD (with momentum)
sgd = optimizers.SGD(
    learning_rate=0.01,
    momentum=0.9,
    nesterov=True,
    weight_decay=0.0001
)

# Adam (a sensible default)
adam = optimizers.Adam(
    learning_rate=0.001,
    beta_1=0.9,
    beta_2=0.999,
    epsilon=1e-7,
    weight_decay=0.01,  # supported in Keras 3
    amsgrad=False
)

# AdamW (recommended for Transformers)
adamw = optimizers.AdamW(
    learning_rate=0.001,
    weight_decay=0.05,
    beta_1=0.9,
    beta_2=0.95
)

# RMSprop (recommended for RNNs)
rmsprop = optimizers.RMSprop(
    learning_rate=0.001,
    rho=0.9,
    epsilon=1e-7,
    centered=True
)

# ========== 2. Learning-rate schedules ==========
# Piecewise-constant decay
piecewise_lr = schedules.PiecewiseConstantDecay(
    boundaries=[5000, 10000],
    values=[0.1, 0.01, 0.001]
)

# Exponential decay
exp_lr = schedules.ExponentialDecay(
    initial_learning_rate=0.001,
    decay_steps=1000,
    decay_rate=0.96,
    staircase=True
)

# Cosine decay
cosine_lr = schedules.CosineDecay(
    initial_learning_rate=0.001,
    decay_steps=10000,
    alpha=0.0  # floor of the schedule, as a fraction of the initial LR
)

# Cosine decay with warm restarts
cosine_restart = schedules.CosineDecayRestarts(
    initial_learning_rate=0.001,
    first_decay_steps=1000,  # length of the first cycle (required)
    t_mul=2.0,  # each cycle lasts twice as long
    m_mul=0.9,  # each restart peaks at 90% of the previous peak LR
    alpha=0.0
)

# Attach a schedule to an optimizer
optimizer = optimizers.SGD(learning_rate=piecewise_lr)

# ========== 3. Custom optimizers (sketch) ==========
class Lookahead(optimizers.Optimizer):
    """Simplified sketch of a Lookahead wrapper. A complete version must
    also create and maintain the slow-weight copies as optimizer state."""
    def __init__(self, optimizer, k=5, alpha=0.5, **kwargs):
        super().__init__(learning_rate=optimizer.learning_rate, **kwargs)
        self.inner = optimizer
        self.k = k          # sync interval, in steps
        self.alpha = alpha  # slow-weight interpolation factor
        self.step_counter = 0

    def update_step(self, gradient, variable, learning_rate):
        self.inner.update_step(gradient, variable, learning_rate)
        self.step_counter += 1
        if self.step_counter >= self.k:
            # A real implementation pulls each slow weight toward its
            # fast counterpart here: slow += alpha * (fast - slow)
            self.step_counter = 0

# ========== 4. Choosing an optimizer ==========
optimizer_guide = {
    'CNN image classification': 'Adam or SGD + momentum',
    'RNN/LSTM': 'Adam or RMSprop',
    'Transformer': 'AdamW',
    'small datasets': 'SGD + momentum',
    'large datasets': 'Adam',
    'sparse data': 'Adagrad or Adam',
    'best generalization': 'SGD',
    'quick prototyping': 'Adam',
}

# ========== 5. A complete training setup ==========
model.compile(
    optimizer=optimizers.AdamW(
        learning_rate=schedules.CosineDecay(0.001, 10000),
        weight_decay=0.05
    ),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
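The schedules above are all closed-form functions of the step counter. For instance, ExponentialDecay computes lr = initial_lr * decay_rate^(step / decay_steps), flooring the exponent when staircase=True; a plain-Python mirror of that formula (illustrative, not the Keras code itself):

```python
def exponential_decay(step, initial_lr=0.001, decay_steps=1000,
                      decay_rate=0.96, staircase=True):
    """Mirror of the ExponentialDecay formula for one step value."""
    exponent = step / decay_steps
    if staircase:
        exponent = step // decay_steps  # count whole decay periods only
    return initial_lr * decay_rate ** exponent

print(exponential_decay(0))     # start of training -> 0.001
print(exponential_decay(2000))  # two full periods -> 0.001 * 0.96**2
```

With staircase=True the learning rate stays flat within each period and drops by a factor of decay_rate at every period boundary.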

3.5 Metrics 📊

Metrics quantify model performance. Unlike loss functions, they take no part in gradient computation.

🌳 Metric taxonomy

  • Classification: Accuracy, Precision, Recall, F1Score
  • Regression: MAE, MSE, RMSE
  • Probability / ranking: AUC (ROC), AUC (PR)

📋 Metric API

| Metric | Class | Formula | Typical use |
|---|---|---|---|
| Accuracy | CategoricalAccuracy | correct / total | classification (balanced data) |
| Precision | Precision | TP / (TP + FP) | when false positives are costly |
| Recall | Recall | TP / (TP + FN) | when false negatives are costly |
| F1 score | F1Score | 2 * P * R / (P + R) | balancing precision and recall |
| AUC | AUC | area under the ROC curve | imbalanced data |
| MAE | MeanAbsoluteError | \|y_pred - y_true\| | regression |
| MSE | MeanSquaredError | (y_pred - y_true)² | regression (penalizes large errors) |
| R² | R2Score | 1 - SS_res / SS_tot | regression goodness of fit |
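The precision, recall, and F1 formulas in the table compose directly from the confusion-matrix counts. A small pure-Python check of that composition (illustrative; in practice Keras accumulates TP/FP/FN across batches via update_state()):

```python
def precision_recall_f1(tp, fp, fn):
    """Compute the three classification metrics from raw counts."""
    precision = tp / (tp + fp)      # of the predicted positives, how many were right
    recall = tp / (tp + fn)         # of the actual positives, how many were found
    f1 = 2 * precision * recall / (precision + recall)  # harmonic mean
    return precision, recall, f1

p, r, f1 = precision_recall_f1(tp=8, fp=2, fn=2)
print(p, r, f1)  # 8/10 each way, so all three come out to 0.8
```

Because F1 is a harmonic mean, it only gets close to 1 when precision and recall are both high; one good score cannot mask a poor one.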

💻 Metrics usage examples

import keras
from keras import metrics
import keras.ops as ops
import numpy as np

# ========== 1. Classification metrics ==========
# Accuracy
acc = metrics.CategoricalAccuracy()
acc.update_state(y_true=[[0, 1, 0], [0, 0, 1]],
                 y_pred=[[0.1, 0.8, 0.1], [0.2, 0.3, 0.5]])
print(f"accuracy: {float(acc.result()):.4f}")
acc.reset_state()

# Sparse accuracy
sparse_acc = metrics.SparseCategoricalAccuracy()
sparse_acc.update_state(y_true=[1, 2],
                        y_pred=[[0.1, 0.8, 0.1], [0.2, 0.3, 0.5]])

# Precision, recall, F1
precision = metrics.Precision()
recall = metrics.Recall()
f1 = metrics.F1Score(average='macro')

precision.update_state(y_true, y_pred)
recall.update_state(y_true, y_pred)

print(f"精确率:{precision.result().numpy():.4f}")
print(f"召回率:{recall.result().numpy():.4f}")

# AUC (recommended for imbalanced data)
auc = metrics.AUC(curve='ROC')
auc.update_state(y_true, y_pred)
print(f"AUC: {float(auc.result()):.4f}")

# PR AUC (when positives are rare)
pr_auc = metrics.AUC(curve='PR')

# ========== 2. Regression metrics ==========
# MAE
mae = metrics.MeanAbsoluteError()
mae.update_state([1, 2, 3], [1.1, 1.9, 3.2])
print(f"MAE: {float(mae.result()):.4f}")

# MSE
mse = metrics.MeanSquaredError()
mse.update_state([1, 2, 3], [1.1, 1.9, 3.2])
print(f"MSE: {float(mse.result()):.4f}")

# RMSE (custom subclass)
class RMSE(metrics.MeanSquaredError):
    def result(self):
        return ops.sqrt(super().result())

rmse = RMSE()

# R² (coefficient of determination)
r2 = metrics.R2Score()
r2.update_state(y_true, y_pred)
print(f"R²: {float(r2.result()):.4f}")

# ========== 3. Multi-label metrics ==========
# Multi-label precision at a fixed decision threshold
ml_precision = metrics.Precision(
    thresholds=0.5,
    class_id=None  # average over all classes
)

# Top-K accuracy
top5_acc = metrics.SparseTopKCategoricalAccuracy(k=5)

# ========== 4. Confusion matrix ==========
# Keras has no built-in confusion-matrix metric; scikit-learn covers it:
from sklearn.metrics import confusion_matrix
y_pred_classes = np.argmax(y_pred, axis=1)
conf_matrix = confusion_matrix(y_true, y_pred_classes)
print(f"confusion matrix:\n{conf_matrix}")

# ========== 5. Custom metrics ==========
class MatthewsCorrelationCoefficient(metrics.Metric):
    """MCC, a balanced binary-classification metric (sketch)."""
    def __init__(self, name='mcc'):
        super().__init__(name=name)
        # Track TP, TN, FP, FN as metric state here,
        # updated batch by batch.

    def update_state(self, y_true, y_pred, sample_weight=None):
        ...  # update the counts from this batch (implementation omitted)

    def result(self):
        # MCC = (TP*TN - FP*FN) / sqrt((TP+FP)(TP+FN)(TN+FP)(TN+FN))
        ...  # implementation omitted

    def reset_state(self):
        ...  # zero the counts

# ========== 6. Multiple metrics on one model ==========
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=[
        'accuracy',  # shorthand string
        metrics.Precision(name='precision'),
        metrics.Recall(name='recall'),
        metrics.AUC(name='auc'),
        metrics.F1Score(average='macro', name='f1')
    ]
)

# Inspect every metric during training
history = model.fit(x_train, y_train, validation_split=0.1)
print(history.history.keys())
# e.g. ['loss', 'accuracy', 'precision', 'recall', 'auc', ...]

4. Common Layers in Detail

Keras ships a rich set of built-in layers that covers most deep-learning tasks.

4.1 Core Layers

| Layer | Class | Parameters | Purpose |
|---|---|---|---|
| Fully connected | Dense | units, activation | standard layer; every unit sees all inputs |
| Dropout | Dropout | rate | randomly drops units to curb overfitting |
| Activation | Activation | activation | applies an activation function |
| Input | Input | shape, dtype | declares the model input |

4.2 Convolution Layers (CNN)

| Layer | Class | Parameters | Purpose |
|---|---|---|---|
| 2-D convolution | Conv2D | filters, kernel_size, strides | image feature extraction |
| 2-D max pooling | MaxPooling2D | pool_size, strides | downsampling, fewer parameters |
| Global pooling | GlobalAveragePooling2D | - | collapses feature maps into a vector |
| BatchNorm | BatchNormalization | - | batch normalization, faster training |

4.3 Recurrent Layers (RNN)

| Layer | Class | Parameters | Purpose |
|---|---|---|---|
| LSTM | LSTM | units, return_sequences | long short-term memory for sequences |
| GRU | GRU | units | gated recurrent unit, a lighter LSTM |
| SimpleRNN | SimpleRNN | units | basic recurrent layer |

4.4 Embedding Layers (NLP)

| Layer | Class | Parameters | Purpose |
|---|---|---|---|
| Word embedding | Embedding | input_dim, output_dim | maps word indices to dense vectors |
| Positional encoding | (custom) | - | Transformer position information; not built into Keras, usually written as a small custom layer |
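Since positional encoding is not built in, the sinusoidal scheme from the Transformer paper is the usual choice to implement yourself. A pure-Python sketch of the table such a layer would precompute (a real layer would add this table to the embedding output; function name and shapes here are illustrative):

```python
import math

def sinusoidal_positions(seq_len, dim):
    """pe[pos][i]: sin on even indices, cos on odd, as in 'Attention Is All You Need'."""
    table = []
    for pos in range(seq_len):
        row = []
        for i in range(dim):
            # Each index pair (2k, 2k+1) shares one frequency.
            angle = pos / (10000 ** (2 * (i // 2) / dim))
            row.append(math.sin(angle) if i % 2 == 0 else math.cos(angle))
        table.append(row)
    return table

pe = sinusoidal_positions(seq_len=4, dim=8)
print(pe[0][:2])  # position 0 -> [sin(0), cos(0)] = [0.0, 1.0]
```

The frequencies form a geometric progression, so nearby positions get similar rows while distant positions stay distinguishable.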

Layer usage examples

from keras import Sequential, layers

# CNN model example
cnn_model = Sequential([
    layers.Input(shape=(224, 224, 3)),
    
    # Convolution block 1
    layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
    layers.BatchNormalization(),
    layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),
    
    # Convolution block 2
    layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
    layers.BatchNormalization(),
    layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),
    
    # Classification head
    layers.GlobalAveragePooling2D(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(10, activation='softmax')
])

# RNN model example
rnn_model = Sequential([
    layers.Input(shape=(None, 128)),  # variable-length sequences
    layers.LSTM(64, return_sequences=True),
    layers.Dropout(0.2),
    layers.LSTM(32),
    layers.Dense(10, activation='softmax')
])

5. Ways to Build Models

Keras offers three ways to build models, from the simplest to the fully flexible.

5.1 Sequential Models (simplest)

Suited to plain stacks of layers with a linear topology.

from keras import Sequential, layers

# Option 1: pass a list of layers
model = Sequential([
    layers.Input(shape=(784,)),
    layers.Dense(128, activation='relu'),
    layers.Dense(10, activation='softmax')
])

# Option 2: add layers one at a time with add()
model = Sequential()
model.add(layers.Input(shape=(784,)))
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dense(10, activation='softmax'))

model.summary()

5.2 The Functional API (recommended)

Suited to complex topologies: multiple inputs/outputs, shared layers, residual connections, and so on.

import keras
from keras import layers

# Single input, single output
inputs = keras.Input(shape=(784,))
x = layers.Dense(128, activation='relu')(inputs)
x = layers.Dropout(0.2)(x)
outputs = layers.Dense(10, activation='softmax')(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# Multiple inputs, multiple outputs
input1 = keras.Input(shape=(100,))
input2 = keras.Input(shape=(50,))

x1 = layers.Dense(64, activation='relu')(input1)
x2 = layers.Dense(64, activation='relu')(input2)

# Merge the two branches
merged = layers.Concatenate()([x1, x2])
output1 = layers.Dense(10, activation='softmax', name='class_output')(merged)
output2 = layers.Dense(1, name='reg_output')(merged)

model = keras.Model(inputs=[input1, input2], outputs=[output1, output2])

5.3 Subclassed Models (most flexible)

Subclass keras.Model for full control over the forward pass.

import keras
from keras import layers
import keras.ops as ops

class ResidualBlock(keras.Model):
    """自定义残差块"""
    def __init__(self, filters):
        super().__init__()
        self.conv1 = layers.Conv2D(filters, 3, padding='same', activation='relu')
        self.conv2 = layers.Conv2D(filters, 3, padding='same', activation='relu')
        self.bn1 = layers.BatchNormalization()
        self.bn2 = layers.BatchNormalization()
    
    def call(self, inputs, training=False):
        x = self.conv1(inputs)
        x = self.bn1(x, training=training)
        x = ops.relu(x)
        
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        
        # Residual connection
        x = ops.add(inputs, x)
        return ops.relu(x)

class CustomCNN(keras.Model):
    """自定义 CNN 模型"""
    def __init__(self, num_classes=10):
        super().__init__()
        self.conv1 = layers.Conv2D(32, 3, activation='relu', padding='same')
        self.res_block = ResidualBlock(32)
        self.pool = layers.GlobalAveragePooling2D()
        self.dense = layers.Dense(num_classes, activation='softmax')
    
    def call(self, inputs, training=False):
        x = self.conv1(inputs)
        x = self.res_block(x, training=training)
        x = self.pool(x)
        return self.dense(x)

# Using the custom model
model = CustomCNN(num_classes=10)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

6. The Complete Training Workflow

From data preparation to evaluation: the full training pipeline.

6.1 Data Preparation

import keras
import numpy as np
from keras.datasets import mnist
from keras.utils import to_categorical

# Load the data
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Preprocess: scale to [0, 1] and add a channel axis
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
x_train = x_train.reshape(-1, 28, 28, 1)
x_test = x_test.reshape(-1, 28, 28, 1)

# One-hot encode the labels (only needed with categorical, not sparse, losses)
y_train_cat = to_categorical(y_train, 10)
y_test_cat = to_categorical(y_test, 10)

print(f"训练集:{x_train.shape}, 测试集:{x_test.shape}")

6.2 Compile the Model

from keras import Sequential, layers, optimizers, losses, metrics

# Build the model
model = Sequential([
    layers.Input(shape=(28, 28, 1)),
    layers.Conv2D(32, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Conv2D(64, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(10, activation='softmax')
])

# Compile the model
model.compile(
    optimizer=optimizers.Adam(learning_rate=0.001),
    loss=losses.SparseCategoricalCrossentropy(),
    # For integer labels vs. softmax probabilities the right accuracy
    # metric is SparseCategoricalAccuracy (plain Accuracy compares raw values).
    metrics=[metrics.SparseCategoricalAccuracy(name='accuracy')]
)

6.3 Train the Model

from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, TensorBoard

# Define callbacks
callbacks = [
    # Model checkpointing
    ModelCheckpoint(
        'best_model.keras',
        monitor='val_loss',
        save_best_only=True,
        mode='min',
        verbose=1
    ),
    # Early stopping
    EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True
    ),
    # Learning-rate reduction on plateau
    ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-7,
        verbose=1
    ),
    # TensorBoard
    TensorBoard(log_dir='./logs')
]

# Train the model
history = model.fit(
    x_train, y_train,
    batch_size=128,
    epochs=50,
    validation_split=0.1,
    callbacks=callbacks,
    verbose=1
)

print(f"训练完成!最终准确率:{history.history['accuracy'][-1]:.4f}")

6.4 Evaluate and Predict

import numpy as np

# Evaluate the model
test_results = model.evaluate(x_test, y_test, verbose=0)
print(f"test loss: {test_results[0]:.4f}")
print(f"test accuracy: {test_results[1]:.4f}")

# Predict
predictions = model.predict(x_test[:10])
predicted_classes = np.argmax(predictions, axis=1)

print(f"预测类别:{predicted_classes}")
print(f"真实类别:{y_test[:10]}")

# Detailed classification report
from sklearn.metrics import classification_report
y_pred = model.predict(x_test)
y_pred_classes = np.argmax(y_pred, axis=1)
print(classification_report(y_test, y_pred_classes))

7. Advanced Topics

7.1 Transfer Learning

import keras
from keras import layers

# Load a pretrained model (without its classification head)
base_model = keras.applications.ResNet50(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224, 3)
)

# Freeze the base model
base_model.trainable = False

# Add a custom head
inputs = keras.Input(shape=(224, 224, 3))
x = keras.applications.resnet.preprocess_input(inputs)
x = base_model(x, training=False)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dropout(0.2)(x)
outputs = layers.Dense(100, activation='softmax')(x)

model = keras.Model(inputs, outputs)

# Compile and train
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# Train the head first
# model.fit(train_data, train_labels, epochs=10)

# Fine-tune: unfreeze only the top layers
base_model.trainable = True
for layer in base_model.layers[:-20]:
    layer.trainable = False

model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-5),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

7.2 Data Augmentation

from keras import Sequential, layers

# Data-augmentation layers
data_augmentation = Sequential([
    layers.RandomFlip('horizontal'),
    layers.RandomRotation(0.1),
    layers.RandomZoom(0.1),
    layers.RandomContrast(0.1),
    layers.RandomTranslation(0.1, 0.1),
], name='data_augmentation')

# Use inside a model
model = Sequential([
    layers.Input(shape=(224, 224, 3)),
    data_augmentation,
    layers.Rescaling(1./255),
    layers.Conv2D(32, 3, activation='relu'),
    # ... more layers
])

7.3 Custom Training Loops (TensorFlow backend)

import keras
import tensorflow as tf  # this example assumes the TensorFlow backend

# Training objects
optimizer = keras.optimizers.Adam()
loss_fn = keras.losses.SparseCategoricalCrossentropy()
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()

# Custom training step. tf.GradientTape is backend-specific; on the
# other backends you would use torch.autograd or jax.grad instead.
@tf.function(jit_compile=True)  # XLA acceleration
def train_step(x_batch, y_batch):
    with tf.GradientTape() as tape:
        predictions = model(x_batch, training=True)
        loss = loss_fn(y_batch, predictions)

    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))
    train_acc_metric.update_state(y_batch, predictions)
    return loss

# Training loop
for epoch in range(epochs):
    for x_batch, y_batch in train_dataset:
        loss = train_step(x_batch, y_batch)

    acc = train_acc_metric.result()
    print(f"Epoch {epoch+1}: loss={loss:.4f}, acc={acc:.4f}")
    train_acc_metric.reset_state()

7.4 Exporting and Deploying Models

# Save in the native Keras format
model.save('model.keras')
loaded_model = keras.models.load_model('model.keras')

# Export a TensorFlow SavedModel (for TF Serving and similar tooling)
model.export('saved_model/')

# Export to ONNX (cross-platform deployment)
# pip install tf2onnx
import tf2onnx
onnx_model, _ = tf2onnx.convert.from_keras(model)

# Serialize the architecture only
config = model.get_config()
json_config = model.to_json()

# Rebuild the model from its config
reconstructed = keras.Model.from_config(config)

8. Keras 2 vs Keras 3

| Feature | Keras 2.x | Keras 3.x |
|---|---|---|
| Backends | TensorFlow only | TensorFlow, PyTorch, JAX |
| Import | from tensorflow import keras | import keras |
| Input layer | input_shape argument | Input() layer (preferred) |
| Model format | .h5 | .keras |
| Export API | model.save() | model.export() |
| ops module | tf.* | keras.ops.* |
| XLA | manual setup | enabled by default where supported |

9. Best Practices

✅ Do:

  • Declare inputs explicitly with an Input() layer
  • Prefer the functional API for building models
  • Add BatchNormalization to speed up training
  • Use Dropout to curb overfitting
  • Monitor training with callbacks
  • Save the best model weights
  • Speed up training with mixed precision (keras.mixed_precision)
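The mixed-precision item above is a one-line global policy switch. A sketch (assumes hardware that benefits from float16 math, such as recent GPUs/TPUs):

```python
import keras

# Compute in float16 while keeping variables in float32.
keras.mixed_precision.set_global_policy("mixed_float16")

# Keep the final classifier in float32 for numerical stability.
head = keras.layers.Dense(10, activation="softmax", dtype="float32")
```

Layers created after the policy is set run their heavy math in half precision; only the output layer usually needs the explicit float32 override.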

❌ Avoid:

  • Hard-coding dimensions inside the model
  • Learning rates so large that training becomes unstable
  • Keeping a redundant bias in layers immediately followed by BatchNormalization
  • Forgetting training=False at inference time

10. Hands-On Cases

Work through real projects to see how Keras is applied in practice.

📝 Case 1: MNIST Handwritten-Digit Recognition (beginner)

A CNN that recognizes handwritten digits, typically reaching about 99% accuracy.

🌳 Model architecture

Input 28×28×1 → [Conv2D 32 → Conv2D 32 → MaxPooling 2×2 → Dropout 0.25] → [Conv2D 64 → Conv2D 64 → MaxPooling 2×2 → Dropout 0.25] → Flatten → Dense 128 → Dropout 0.5 → Output (10 classes)

💻 Full code
import keras
from keras import Sequential, layers
from keras.datasets import mnist
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
import numpy as np

# 1. Load and preprocess the data
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
x_train = x_train.reshape(-1, 28, 28, 1)
x_test = x_test.reshape(-1, 28, 28, 1)

# 2. 构建模型
model = Sequential([
    layers.Input(shape=(28, 28, 1)),
    
    # 卷积块 1
    layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
    layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),
    
    # 卷积块 2
    layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
    layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),
    
    # 分类头
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(10, activation='softmax')
])

# 3. 编译模型
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# 4. 训练
callbacks = [
    EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5)
]

history = model.fit(
    x_train, y_train,
    batch_size=128,
    epochs=20,
    validation_split=0.1,
    callbacks=callbacks
)

# 5. 评估
test_loss, test_acc = model.evaluate(x_test, y_test)
print(f"测试准确率:{test_acc:.4f}")

# 6. 预测可视化
predictions = model.predict(x_test[:10])
for i, pred in enumerate(predictions):
    print(f"样本{i}: 预测={np.argmax(pred)}, 真实={y_test[i]}")

🎬 案例 2:IMDB 电影评论情感分析 中级

使用 LSTM 处理文本序列,判断评论是正面还是负面。

🌳 模型架构图
输入 (序列长度 200) → Embedding (10000×128) → SpatialDropout1D (0.2) → LSTM (64 单元, dropout=0.2) → Dense (32 神经元) → Dropout (0.3) → 输出 (1 神经元, sigmoid)
💻 完整代码
import keras
from keras import Sequential, layers
from keras.datasets import imdb
from keras.utils import pad_sequences  # Keras 3 中 pad_sequences 位于 keras.utils

# 1. 加载数据
max_words = 10000
max_len = 200

(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_words)
x_train = pad_sequences(x_train, maxlen=max_len)
x_test = pad_sequences(x_test, maxlen=max_len)

# 2. 构建模型
model = Sequential([
    layers.Input(shape=(max_len,)),
    layers.Embedding(max_words, 128),
    layers.SpatialDropout1D(0.2),
    layers.LSTM(64, dropout=0.2, recurrent_dropout=0.2),
    layers.Dense(32, activation='relu'),
    layers.Dropout(0.3),
    layers.Dense(1, activation='sigmoid')
])

# 3. 编译和训练
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

history = model.fit(
    x_train, y_train,
    batch_size=128,
    epochs=10,
    validation_split=0.1,
    callbacks=[
        keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
    ]
)

# 4. 评估
test_loss, test_acc = model.evaluate(x_test, y_test)
print(f"测试准确率:{test_acc:.4f}")

# 5. 预测新评论
word_index = imdb.get_word_index()

def predict_sentiment(text):
    # 简化分词:按空格切分;索引整体偏移 3(0/1/2 为保留标记),超出词表的词记为未知词 2
    tokens = [word_index.get(w, max_words) + 3 for w in text.lower().split()]
    tokens = [t if t < max_words else 2 for t in tokens]
    padded = pad_sequences([tokens], maxlen=max_len)
    score = model.predict(padded, verbose=0)[0][0]
    print(f"评论:{text[:50]}...")
    return "正面" if score > 0.5 else "负面"

print(predict_sentiment("This movie was absolutely amazing!"))

🔄 案例 3:ResNet 迁移学习图像分类 高级

使用预训练 ResNet50 进行图像分类,支持自定义类别。

🔄 训练流程图
开始 → 加载预训练 ResNet50 → 冻结基础模型 → 添加自定义分类头 → 训练顶层 → 解冻部分层 → 微调训练 → 保存模型
💻 完整代码
import keras
from keras import layers, applications, callbacks

# 1. 数据准备
img_size = 224
batch_size = 32

train_ds = keras.utils.image_dataset_from_directory(
    'data/train',
    image_size=(img_size, img_size),
    batch_size=batch_size
)

val_ds = keras.utils.image_dataset_from_directory(
    'data/val',
    image_size=(img_size, img_size),
    batch_size=batch_size
)

# 数据增强
data_augmentation = keras.Sequential([
    layers.RandomFlip('horizontal'),
    layers.RandomRotation(0.1),
    layers.RandomZoom(0.1),
    layers.RandomContrast(0.1),
])

# 2. 构建模型(使用函数式 API)
inputs = keras.Input(shape=(img_size, img_size, 3))
x = data_augmentation(inputs)
x = applications.resnet.preprocess_input(x)

# 加载预训练模型
base_model = applications.ResNet50(
    include_top=False,
    weights='imagenet',
    input_shape=(img_size, img_size, 3)
)
base_model.trainable = False  # 冻结

x = base_model(x, training=False)
x = layers.GlobalAveragePooling2D()(x)
x = layers.BatchNormalization()(x)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(10, activation='softmax')(x)  # 10 个类别

model = keras.Model(inputs, outputs)

# 3. 编译模型
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy', keras.metrics.Precision(), keras.metrics.Recall()]
)

# 4. 训练顶层
callbacks_list = [
    callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
    callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-7),
    callbacks.ModelCheckpoint('best_model.keras', save_best_only=True, monitor='val_loss', mode='min')
]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=20,
    callbacks=callbacks_list
)

# 5. 微调:解冻部分层
base_model.trainable = True
for layer in base_model.layers[:-40]:
    layer.trainable = False

model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-5),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# 继续训练
history_fine = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=30,
    callbacks=callbacks_list
)

# 6. 保存和导出
model.save('final_model.keras')
model.export('saved_model/')
print("模型训练完成并保存!")

📈 案例 4:LSTM 时间序列预测 中级

使用 LSTM 预测股票价格或任何时间序列数据。

💻 完整代码
import keras
from keras import Sequential, layers
import numpy as np
import pandas as pd

# 1. 准备时间序列数据
def create_sequences(data, seq_length=60):
    """创建序列样本"""
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i+seq_length])
        y.append(data[i+seq_length])
    return np.array(X), np.array(y)

# 示例:生成模拟数据
np.random.seed(42)
data = np.cumsum(np.random.randn(1000)) + 100  # 随机游走
data = (data - data.mean()) / data.std()  # 标准化

# 创建序列
seq_length = 60
X, y = create_sequences(data, seq_length)

# 划分训练测试集
split = int(0.8 * len(X))
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

# 重塑为 [samples, timesteps, features]
X_train = X_train.reshape(-1, seq_length, 1)
X_test = X_test.reshape(-1, seq_length, 1)

# 2. 构建 LSTM 模型
model = Sequential([
    layers.Input(shape=(seq_length, 1)),
    layers.LSTM(50, return_sequences=True),
    layers.Dropout(0.2),
    layers.LSTM(50),
    layers.Dropout(0.2),
    layers.Dense(25, activation='relu'),
    layers.Dense(1)  # 回归输出
])

# 3. 编译模型
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss='mse',
    metrics=['mae']
)

# 4. 训练
history = model.fit(
    X_train, y_train,
    batch_size=32,
    epochs=50,
    validation_split=0.1,
    callbacks=[
        keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5)
    ]
)

# 5. 评估
test_loss, test_mae = model.evaluate(X_test, y_test)
print(f"测试 MAE: {test_mae:.4f}")

# 6. 预测
predictions = model.predict(X_test)

# 7. 可视化
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 6))
plt.plot(y_test[:100], label='真实值')
plt.plot(predictions[:100].flatten(), label='预测值', alpha=0.7)
plt.legend()
plt.title('时间序列预测')
plt.show()

🔍 案例 5:自编码器异常检测 高级

使用自编码器检测异常数据点,适用于欺诈检测、故障诊断等场景。

🌳 模型架构图
输入 (原始特征) → 编码器:Dense (64) → Dense (32) → Dense (16) → 瓶颈:潜在空间 (8 维) → 解码器:Dense (16) → Dense (32) → Dense (64) → 输出 (重构输入)
💻 完整代码
import keras
from keras import layers, Model
import numpy as np
from sklearn.preprocessing import StandardScaler

# 1. 准备数据(使用正常数据训练)
np.random.seed(42)
n_samples = 1000
n_features = 20

# 生成正常数据
X_normal = np.random.randn(n_samples, n_features) * 0.5 + 1

# 生成异常数据(用于测试)
X_anomaly = np.random.randn(100, n_features) * 2 + 3

# 标准化
scaler = StandardScaler()
X_normal_scaled = scaler.fit_transform(X_normal)
X_anomaly_scaled = scaler.transform(X_anomaly)

# 2. 构建自编码器
input_dim = X_normal.shape[1]

# 编码器
encoder_input = layers.Input(shape=(input_dim,))
x = layers.Dense(64, activation='relu')(encoder_input)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(32, activation='relu')(x)
x = layers.BatchNormalization()(x)
x = layers.Dense(16, activation='relu')(x)
encoded = layers.Dense(8, activation='relu')(x)  # 瓶颈层

# 解码器
x = layers.Dense(16, activation='relu')(encoded)
x = layers.BatchNormalization()(x)
x = layers.Dense(32, activation='relu')(x)
x = layers.BatchNormalization()(x)
x = layers.Dense(64, activation='relu')(x)
decoded = layers.Dense(input_dim, activation='linear')(x)

# 创建模型
autoencoder = Model(encoder_input, decoded)
autoencoder.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss='mse'
)

autoencoder.summary()

# 3. 训练(只用正常数据)
history = autoencoder.fit(
    X_normal_scaled, X_normal_scaled,
    batch_size=32,
    epochs=100,
    validation_split=0.1,
    callbacks=[
        keras.callbacks.EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=10)
    ]
)

# 4. 计算重构误差
reconstructed_normal = autoencoder.predict(X_normal_scaled)
reconstructed_anomaly = autoencoder.predict(X_anomaly_scaled)

# 计算 MSE 误差
mse_normal = np.mean(np.square(X_normal_scaled - reconstructed_normal), axis=1)
mse_anomaly = np.mean(np.square(X_anomaly_scaled - reconstructed_anomaly), axis=1)

# 5. 设置异常阈值
threshold = np.percentile(mse_normal, 95)  # 95% 分位数
print(f"异常阈值:{threshold:.4f}")

# 6. 异常检测
def is_anomaly(mse, threshold):
    return mse > threshold

print(f"正常数据误报率:{np.mean(is_anomaly(mse_normal, threshold)):.2%}")
print(f"异常数据检出率:{np.mean(is_anomaly(mse_anomaly, threshold)):.2%}")

# 7. 可视化
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
plt.hist(mse_normal, bins=50, alpha=0.7, label='正常数据')
plt.hist(mse_anomaly, bins=50, alpha=0.7, label='异常数据')
plt.axvline(threshold, color='red', linestyle='--', label=f'阈值:{threshold:.4f}')
plt.xlabel('重构误差 (MSE)')
plt.ylabel('频数')
plt.legend()
plt.title('自编码器异常检测')
plt.show()

十一、模型部署

将训练好的模型部署到生产环境。

11.1 保存和加载

import keras

# 保存整个模型(推荐)
model.save('my_model.keras')

# 加载模型
loaded_model = keras.models.load_model('my_model.keras')

# 保存权重
model.save_weights('model.weights.h5')

# 加载权重
model.load_weights('model.weights.h5')

# 仅保存架构
config = model.get_config()
json_config = model.to_json()

# 从配置重建
reconstructed = keras.Model.from_config(config)

11.2 导出格式对比

格式 | 用途 | 优点 | 代码
.keras | Keras 原生格式 | 完整保存模型和权重 | model.save('model.keras')
SavedModel | TensorFlow 部署 | 支持 TF Serving、TF Lite | model.export('saved_model/')
ONNX | 跨平台部署 | 支持多种推理引擎 | tf2onnx.convert.from_keras()
TorchScript | PyTorch 生态部署 | 可脱离 Python 运行 | torch.jit.trace(model, x).save('model.pt')(需 PyTorch 后端)

11.3 TensorFlow Serving 部署

# 1. 导出为 SavedModel
model.export('saved_model/1')

# 2. 使用 Docker 运行 TF Serving
# docker run -p 8501:8501 \
#   --mount type=bind,source=$(pwd)/saved_model,target=/models/my_model \
#   -e MODEL_NAME=my_model -t tensorflow/serving

# 3. 调用服务
import requests
import json

data = json.dumps({
    "signature_name": "serving_default",
    "instances": [[0.1, 0.2, 0.3, ...]]  # 输入数据
})

response = requests.post(
    'http://localhost:8501/v1/models/my_model:predict',
    data=data
)
predictions = response.json()['predictions']

11.4 Web 应用部署(Flask)

from flask import Flask, request, jsonify
import keras
import numpy as np

app = Flask(__name__)

# 加载模型
model = keras.models.load_model('my_model.keras')

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.json['data']
        data = np.array(data)
        
        # 预处理
        data = data.reshape(1, -1)  # 根据模型调整
        
        # 预测
        prediction = model.predict(data)[0]
        
        return jsonify({
            'prediction': prediction.tolist(),
            'class': int(np.argmax(prediction))
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 400

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

十二、学习资源

📚 官方文档

  • Keras 官方文档(keras.io)
  • Keras API 参考(keras.io/api)

🎓 在线课程

  • Deep Learning Specialization (Coursera)
  • Fast.ai Practical Deep Learning
  • TensorFlow Developer Certificate

💻 实战项目

  • Kaggle 竞赛 - 实践深度学习
  • GitHub 开源项目 - 学习优秀代码
  • 个人项目 - 解决实际问题