Deep Neural Networks for Heavily Noisy Data: A Python Reproduction of the Deep Residual Shrinkage Network
In rotating machinery fault diagnosis, extracting sensitive features from vibration signals corrupted by strong noise has always been a core challenge. Classic deep learning models such as Convolutional Neural Networks (CNNs) perform well on clean laboratory datasets, but when confronted with complex industrial field data, redundant noise features can degrade model accuracy. To address this problem, the paper "Deep Residual Shrinkage Networks for Fault Diagnosis" proposed an innovative architecture: the Deep Residual Shrinkage Network (DRSN).
The core idea of DRSN is to integrate soft thresholding, a classic signal-processing denoising technique, into the residual network. By introducing an attention mechanism, the model adaptively learns a shrinkage threshold for each set of feature maps. As features propagate, values close to zero, treated as noise, are automatically set to zero while strong features are preserved. This design improves the model's robustness (i.e., resistance to interference) under strong noise and also achieves end-to-end adaptive feature extraction without relying on complex expert prior knowledge.
1. From Residual Blocks to Adaptive Shrinkage
The core component of DRSN is the Residual Shrinkage Building Unit with Channel-wise thresholds (RSBU-CW). Building on conventional residual learning, this module adds a parallel sub-network that computes the thresholds.
In the RSBU-CW module, the input features pass through two convolutions with batch normalization (BN) and then enter an attention branch. First, the absolute value is taken and global average pooling (GAP) compresses the spatial dimension, yielding the mean absolute value of each channel. Next, two fully connected layers followed by a Sigmoid activation learn a scaling factor α in the range (0, 1). The shrinkage threshold τ is computed as τ = α * average(abs(x)).
Given the threshold, the model applies the soft-thresholding operator to the feature map: y = sign(x) * max(abs(x) - τ, 0). This design lets the model set an independent threshold for each feature channel.
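The two formulas above can be checked numerically. The sketch below uses made-up illustrative values (`x` and `alpha` are not from the paper): it derives a channel threshold from a fixed scaling factor α and applies the soft-thresholding operator.

```python
import numpy as np

def soft_threshold(x, tau):
    """y = sign(x) * max(|x| - tau, 0): zero out weak features, shrink strong ones."""
    return np.sign(x) * np.maximum(np.abs(x) - tau, 0.0)

# Hypothetical features for one channel
x = np.array([-1.5, -0.2, 0.05, 0.4, 2.0])

# The attention branch would learn alpha in (0, 1); fixed here for illustration
alpha = 0.5
tau = alpha * np.mean(np.abs(x))   # tau = 0.5 * 0.83 = 0.415

y = soft_threshold(x, tau)
# Entries with |x| <= 0.415 become exactly zero; the rest shrink toward zero:
# y = [-1.085, 0.0, 0.0, 0.0, 1.585]
```

Because τ is a fraction of the channel's mean absolute amplitude, it stays in a sensible range regardless of feature scale, which is why the paper ties the learned α to the GAP statistic.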
Figure 1. Deep residual shrinkage network
2. Experimental Setup
To validate the performance of DRSN-CW, the standard benchmark in bearing diagnosis, the Case Western Reserve University (CWRU) bearing dataset, was selected. The experiment covers 10 classes: the normal condition plus inner-race, outer-race, and ball faults. Each sample is segmented with a sliding window of 1024 points.
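As a sanity check on the segmentation arithmetic: with a non-overlapping 1024-point window, a record of length N yields floor(N / 1024) samples. A minimal sketch (the record length here is a made-up example, not an actual CWRU file length):

```python
import numpy as np

window = 1024           # sample length used in the experiment
signal_length = 121265  # hypothetical raw record length; real CWRU files vary

# Non-overlapping windows: the stride equals the window length
starts = range(0, signal_length - window + 1, window)
segments = np.stack([np.arange(signal_length)[s:s + window] for s in starts])
# floor(121265 / 1024) = 118 samples, each of shape (1024,)
```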
Figure 2. Class partitioning
In the data-engineering module, beyond routine standardization, an online real-time augmentation pipeline was designed to simulate extreme industrial scenarios. It includes:
(1) Rolling shift: simulates uncertainty in the sensor's sampling start time.
(2) Transient impulse injection: simulates occasional knocks and bumps on the machine.
(3) Additive White Gaussian Noise (AWGN): noise at varying signal-to-noise ratios (SNR) is mixed in dynamically during training so that the model learns to keep its features consistent across environments. In particular, a test environment at an SNR of -8 dB was constructed, which counts as strong background interference in industrial diagnosis.
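For intuition on how harsh -8 dB is: from P_noise = P_signal / 10^(SNR/10), the injected noise carries roughly 10^0.8 ≈ 6.3 times the power of the signal itself. A minimal sketch of the same scaling rule used by `add_awgn` in the listing below (the random seed and stand-in signal are arbitrary):

```python
import numpy as np

rng = np.random.default_rng(0)
signal = rng.standard_normal(100_000)   # stand-in for a vibration record

snr_db = -8.0
signal_power = np.mean(signal ** 2)
# P_noise = P_signal / 10^(SNR/10): about 6.3x the signal power at -8 dB
noise_power = signal_power / (10 ** (snr_db / 10.0))
noise = rng.normal(0.0, np.sqrt(noise_power), signal.shape)

noisy = signal + noise
measured_snr = 10 * np.log10(signal_power / np.mean(noise ** 2))
# measured_snr is close to -8 dB by construction
```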
The complete TensorFlow code is as follows:
"""
项目名称:深度残差收缩网络 (DRSN-CW) - 旋转机械故障诊断复现
论文参考:Zhao, M., et al. "Deep Residual Shrinkage Networks for Fault Diagnosis," IEEE TII, 2020.
算法核心逻辑:
1. 软阈值化 (Soft Thresholding):通过非线性映射,将接近于零的噪声特征置为零,保留强特征。
2. 注意力机制 (Attention):利用小型子网络自动学习每个通道的收缩阈值,实现自适应去噪。
3. 残差学习 (Residual Learning):解决深层网络梯度消失问题,确保特征传递的稳定性。
"""
import os
import sys
import logging
import numpy as np
import scipy.io as sio
import tensorflow as tf
from tensorflow.keras import layers, Model, regularizers
from sklearn.model_selection import train_test_split as split_data
# =============================================================================
# 1. Environment and resource configuration
# =============================================================================
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s')
class GPUConfig:
    """
    Compute resource manager: initializes the TensorFlow runtime and
    configures hardware acceleration.
    """
    @staticmethod
    def init_tf():
        """
        Configure the compute backend:
        - Suppress verbose logging: silence non-critical system warnings.
        - On-demand GPU memory: keep TensorFlow from grabbing all GPU memory
          at startup, so the GPU can be shared with other processes.
        """
        os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
        physical_gpu_list = tf.config.list_physical_devices('GPU')
        if physical_gpu_list:
            try:
                for gpu_device in physical_gpu_list:
                    # Enable dynamic memory growth
                    tf.config.experimental.set_memory_growth(gpu_device, True)
                logging.info("GPU acceleration ready: %d device(s) detected, dynamic memory growth enabled.", len(physical_gpu_list))
            except RuntimeError as hardware_error:
                logging.warning("GPU backend configuration failed (possibly already in use): %s", hardware_error)
        else:
            logging.info("No GPU detected; falling back to CPU (training may be slow).")
# Run global initialization
GPUConfig.init_tf()
# =============================================================================
# 2. Data engineering module (ETL - Extract, Transform, Load)
# =============================================================================
class CWRULoader:
    """
    CWRU dataset parser: reads raw .mat vibration signals, segments them,
    and reconstructs feature arrays.
    """
    def __init__(self, dataset_root, window_size=1024):
        """
        :param dataset_root: root directory of the dataset
        :param window_size: sample length (also the window stride; typically 1024 or 2048)
        """
        self.base_directory = os.path.abspath(dataset_root)
        self.sample_length = window_size
        self.sampling_interval = window_size
    def _parse_mat_content(self, target_file):
        """
        Extract the drive-end (DE) time series from the MATLAB container.
        """
        try:
            storage = sio.loadmat(target_file)
            for identifier in storage.keys():
                # Match the drive-end accelerometer signal key
                if 'DE_time' in identifier:
                    return storage[identifier].flatten()
        except Exception as parse_error:
            logging.debug("Error reading file %s: %s", target_file, parse_error)
            return None
        return None
    def load_data(self, category_dictionary):
        """
        Build the training dataset.
        :param category_dictionary: mapping from class label to a list of file names.
        :return: (X_data, y_label) as NumPy arrays.
        """
        feature_collection, label_collection = [], []
        is_data_found = False
        for class_idx, name_list in category_dictionary.items():
            for filename in name_list:
                full_path = os.path.join(self.base_directory, "{0}.mat".format(filename))
                if not os.path.exists(full_path):
                    continue
                vibration_series = self._parse_mat_content(full_path)
                if vibration_series is None:
                    continue
                is_data_found = True
                # Non-overlapping sliding window: cut the long series into fixed-length chunks
                for pointer in range(0, len(vibration_series) - self.sample_length + 1, self.sampling_interval):
                    sub_sequence = vibration_series[pointer : pointer + self.sample_length]
                    feature_collection.append(sub_sequence)
                    label_collection.append(class_idx)
        if not is_data_found:
            raise FileNotFoundError("No CWRU .mat files found under the given path; please check it.")
        return np.array(feature_collection, dtype='float32'), np.array(label_collection, dtype='int32')
def add_awgn(signal_input, snr_value):
    """
    Additive White Gaussian Noise (AWGN) injection.
    Simulates background noise in real industrial settings to test model robustness.
    Formula: P_noise = P_signal / 10^(SNR/10)
    """
    signal_input = np.array(signal_input)
    random_engine = np.random.default_rng()
    # Accept either a fixed SNR or a (low, high) range to sample from
    target_snr = (snr_value if not isinstance(snr_value, (list, tuple))
                  else random_engine.uniform(snr_value[0], snr_value[1]))
    # Derive the noise standard deviation from the signal power
    signal_power = np.mean(np.square(signal_input), axis=1, keepdims=True)
    noise_variance = signal_power / (10 ** (target_snr / 10.0))
    noise_component = random_engine.normal(0, np.sqrt(noise_variance), signal_input.shape)
    return (signal_input + noise_component).astype('float32')
# =============================================================================
# 3. Neural network components (DRSN core)
# =============================================================================
class SoftThresholdOperator(layers.Layer):
    """
    Soft-thresholding operator (custom layer):
    the nonlinear core of DRSN; shrinks feature maps using the threshold tau.
    Formula: y = sign(x) * max(|x| - tau, 0)
    """
    def __init__(self, **kwargs):
        super(SoftThresholdOperator, self).__init__(**kwargs)
    def call(self, inputs):
        """
        x_conv: input feature map (Batch, Steps, Channels)
        tau: learned thresholds (Batch, Channels)
        """
        x_conv, tau = inputs
        # Expand the thresholds to match the spatial dimension of the feature map
        expanded_tau = tf.expand_dims(tau, axis=1)
        return tf.sign(x_conv) * tf.maximum(tf.abs(x_conv) - expanded_tau, 0.0)
class RSBU_CW(layers.Layer):
    """
    Residual Shrinkage Building Unit with Channel-wise thresholds:
    a residual block with channel-wise attention that generates an
    independent threshold for every channel.
    """
    def __init__(self, filters, kernel_size, strides=1, **kwargs):
        super(RSBU_CW, self).__init__(**kwargs)
        self.num_kernels = filters
        self.step_size = strides
        self.width = kernel_size
        self.weight_decay = regularizers.l2(1e-4)
        # Identity mapping path (residual shortcut)
        self.shortcut = None
        # Main branch: classic BN-ReLU-Conv ordering
        self.bn_alpha = layers.BatchNormalization()
        self.relu_alpha = layers.Activation('relu')
        self.conv_alpha = layers.Conv1D(filters, kernel_size, strides=strides, padding='same',
                                        kernel_initializer='he_normal', kernel_regularizer=self.weight_decay)
        self.bn_beta = layers.BatchNormalization()
        self.relu_beta = layers.Activation('relu')
        self.conv_beta = layers.Conv1D(filters, kernel_size, strides=1, padding='same',
                                       kernel_initializer='he_normal', kernel_regularizer=self.weight_decay)
        # Attention sub-network: computes channel-wise shrinkage thresholds
        self.gap = layers.GlobalAveragePooling1D()
        self.fc1 = layers.Dense(filters, kernel_initializer='he_normal')
        self.bn_gamma = layers.BatchNormalization()
        self.relu_gamma = layers.Activation('relu')
        self.fc2 = layers.Dense(filters, activation='sigmoid')  # normalized scaling factor
        self.threshold_op = SoftThresholdOperator()
    def build(self, input_dim):
        """
        Dynamic shortcut: when the stride is not 1 or the channel count changes,
        use a 1x1 convolution to align the residual branch.
        """
        if self.step_size != 1 or input_dim[-1] != self.num_kernels:
            self.shortcut = tf.keras.Sequential([
                layers.Conv1D(self.num_kernels, 1, strides=self.step_size, padding='same', use_bias=False),
                layers.BatchNormalization()
            ])
        super(RSBU_CW, self).build(input_dim)
    def call(self, layer_inputs):
        """
        Flow: feature extraction -> global channel statistics -> dynamic
        threshold computation -> soft-threshold denoising -> residual addition
        """
        identity = layer_inputs
        if self.shortcut is not None:
            identity = self.shortcut(layer_inputs)
        # Two convolution stages produce the intermediate feature map x_conv
        x_conv = self.bn_alpha(layer_inputs)
        x_conv = self.relu_alpha(x_conv)
        x_conv = self.conv_alpha(x_conv)
        x_conv = self.bn_beta(x_conv)
        x_conv = self.relu_beta(x_conv)
        x_conv = self.conv_beta(x_conv)
        # Mean absolute value per channel as the global statistic
        x_abs = tf.abs(x_conv)
        abs_mean = self.gap(x_abs)
        # The sub-network outputs alpha in (0, 1); threshold tau = alpha * abs_mean
        z = self.fc1(abs_mean)
        z = self.bn_gamma(z)
        z = self.relu_gamma(z)
        alpha = self.fc2(z)
        tau = tf.multiply(alpha, abs_mean)
        # Apply soft-threshold shrinkage, then fuse with the residual path
        # (plain addition, rather than creating a new layers.Add() on every call)
        denoised_output = self.threshold_op([x_conv, tau])
        return denoised_output + identity
class DRSN_CW(Model):
    """
    Full DRSN-CW architecture:
    stacks several RSBU modules in sequence, then classifies faults
    through a fully connected head.
    """
    def __init__(self, num_classes):
        super(DRSN_CW, self).__init__(name="Bearing_Fault_DRSN")
        self.weight_decay = regularizers.l2(1e-4)
        # Input stage: initial perception of the 1-D time series
        self.conv1 = layers.Conv1D(32, 15, strides=2, padding='same', kernel_initializer='he_normal', kernel_regularizer=self.weight_decay)
        self.bn1 = layers.BatchNormalization()
        self.relu1 = layers.Activation('relu')
        # Stack of shrinkage residual blocks (channels grow from 32 to 128)
        self.rsbu_blocks = [
            RSBU_CW(32, 5, strides=2),
            RSBU_CW(32, 5, strides=1),
            RSBU_CW(64, 5, strides=2),
            RSBU_CW(64, 5, strides=1),
            RSBU_CW(128, 5, strides=2),
            RSBU_CW(128, 5, strides=1)
        ]
        # Output head: pool, then map into the class space
        self.post_norm = layers.BatchNormalization()
        self.post_relu = layers.Activation('relu')
        self.gap_layer = layers.GlobalAveragePooling1D()
        self.classifier = layers.Dense(num_classes, activation='softmax', kernel_regularizer=self.weight_decay)
    def call(self, network_input):
        """
        End-to-end forward pass.
        """
x = self.conv1(network_input)
x = self.bn1(x)
x = self.relu1(x)
for block in self.rsbu_blocks:
x = block(x)
x = self.post_norm(x)
x = self.post_relu(x)
x = self.gap_layer(x)
return self.classifier(x)
# =============================================================================
# 4. Training, augmentation, and evaluation pipeline
# =============================================================================
def train_and_test(dataset_path, seq_len=1024):
    """
    Full-pipeline controller: data preprocessing, online augmentation, model
    training, and final evaluation in an extreme (-8 dB) noise environment.
    """
    # Fault classes (based on the CWRU file naming convention)
label_map = {
0: ['Normal_0', 'Normal_1', 'Normal_2', 'Normal_3'],
1: ['IR007_0', 'IR007_1', 'IR007_2', 'IR007_3'],
2: ['IR014_0', 'IR014_1', 'IR014_2', 'IR014_3'],
3: ['IR021_0', 'IR021_1', 'IR021_2', 'IR021_3'],
4: ['B007_0', 'B007_1', 'B007_2', 'B007_3'],
5: ['B014_0', 'B014_1', 'B014_2', 'B014_3'],
6: ['B021_0', 'B021_1', 'B021_2', 'B021_3'],
7: ['OR007@6_0', 'OR007@6_1', 'OR007@6_2', 'OR007@6_3'],
8: ['OR014@6_0', 'OR014@6_1', 'OR014@6_2', 'OR014@6_3'],
9: ['OR021@6_0', 'OR021@6_1', 'OR021@6_2', 'OR021@6_3']
}
data_engine = CWRULoader(dataset_root=dataset_path, window_size=seq_len)
    try:
        signals, labels = data_engine.load_data(label_map)
    except Exception as data_err:
        logging.error("Data loading failed: %s", data_err)
        return
    # Random split: 70% train, 15% validation, 15% test
train_x_pre, temp_x, train_y_pre, temp_y = split_data(
signals, labels, test_size=0.3, random_state=42
)
val_x_pre, test_x_pre, val_y_pre, test_y_pre = split_data(
temp_x, temp_y, test_size=0.5, random_state=42
)
    # Standardization: use the training-set mean and std to avoid test-set leakage
mu, sigma = np.mean(train_x_pre), np.std(train_x_pre)
def normalize(obs):
return ((obs - mu) / sigma).reshape(-1, seq_len, 1)
train_set_x = normalize(train_x_pre)
val_set_x = normalize(val_x_pre)
test_set_x = normalize(test_x_pre)
    # One-hot encode the labels
num_classes = len(label_map)
train_set_y = tf.keras.utils.to_categorical(train_y_pre, num_classes).astype('float32')
val_set_y = tf.keras.utils.to_categorical(val_y_pre, num_classes).astype('float32')
test_set_y = tf.keras.utils.to_categorical(test_y_pre, num_classes).astype('float32')
    # Test environment: inject very strong noise (-8 dB) to probe extreme industrial conditions
val_x_awgn = add_awgn(val_set_x, snr_value=-8)
test_x_awgn = add_awgn(test_set_x, snr_value=-8)
    def augment_batch(feat_batch, label_batch):
        """
        Online data augmentation:
        1. Rolling shift: simulates uncertainty in the sampling start time.
        2. Transient impulses: simulates occasional mechanical knocks.
        3. Mixed noise: raises the model's noise tolerance.
        """
        rand_gen = np.random.default_rng()
        augmented_x = feat_batch.copy()
        batch_n, steps_n, _ = augmented_x.shape
        # Random phase shift
        for sample_idx in range(batch_n):
            offset = rand_gen.integers(0, steps_n)
            augmented_x[sample_idx, :, 0] = np.roll(augmented_x[sample_idx, :, 0], offset)
        # Impulse noise injection (10% of batches)
        if rand_gen.random() > 0.9:
            for sample_idx in range(batch_n):
                if rand_gen.random() > 0.5:
                    num_spikes = rand_gen.integers(1, 3)
                    positions = rand_gen.integers(0, steps_n, num_spikes)
                    spike_mag = np.std(augmented_x[sample_idx]) * rand_gen.uniform(1.5, 2.5)
                    augmented_x[sample_idx, positions, 0] += spike_mag * rand_gen.choice([-1, 1], size=num_spikes)
        # Dynamic SNR mixing (50% of batches)
        if rand_gen.random() > 0.5:
            augmented_x = add_awgn(augmented_x, snr_value=(-8, 8))
        return augmented_x.astype(np.float32), label_batch.astype(np.float32)
    def _tensor_spec_binding(f_tensor, l_tensor):
        """ Explicitly bind shape information for tf.data """
        f_tensor.set_shape([None, seq_len, 1])
        l_tensor.set_shape([None, num_classes])
        return f_tensor, l_tensor
    # Build a high-throughput tf.data pipeline
training_pipeline = tf.data.Dataset.from_tensor_slices((train_set_x.astype('float32'), train_set_y))
training_pipeline = training_pipeline.shuffle(len(train_set_x)).batch(64)
training_pipeline = training_pipeline.map(
lambda x, y: tf.numpy_function(augment_batch, [x, y], [tf.float32, tf.float32]),
num_parallel_calls=tf.data.AUTOTUNE
).map(_tensor_spec_binding).prefetch(tf.data.AUTOTUNE)
    # Instantiate and compile the model
model_instance = DRSN_CW(num_classes=num_classes)
model_instance.compile(
optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
        loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.0),  # cross-entropy loss
metrics=['accuracy']
)
logging.info("诊断系统启动:分类数=%d, 序列长度=%d", num_classes, seq_len)
    # Dynamic learning-rate scheduling and early stopping
optimization_callbacks = [
tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=7, min_lr=1e-6, verbose=1),
tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)
]
    # Fit the model
model_instance.fit(
training_pipeline,
epochs=100,
validation_data=(val_x_awgn, val_set_y),
callbacks=optimization_callbacks,
verbose=2
)
    # Final performance check under a very low signal-to-noise ratio
    final_loss, final_acc = model_instance.evaluate(test_x_awgn, test_set_y, verbose=0)
    print("\n" + "=" * 50)
    print("Evaluation report (DRSN-CW)")
    print("Test condition: -8 dB SNR (strong noise interference)")
    print("Final accuracy: {0:.2f}%".format(final_acc * 100))
    print("=" * 50)
# =============================================================================
# Entry point
# =============================================================================
if __name__ == "__main__":
# 配置默认的数据搜索目录
DATA_PATH = os.path.join(os.getcwd(), 'data_path')
if not os.path.exists(DATA_PATH):
logging.warning("未找到默认数据目录: %s", DATA_PATH)
user_input_path = input("请输入 CWRU 原始数据集 (.mat) 所在的完整路径: ").strip()
if user_input_path:
DATA_PATH = user_input_path
else:
logging.critical("未提供有效路径,程序退出。")
sys.exit(0)
# 启动训练与测试
train_and_test(DATA_PATH, seq_len=1024)
3. Diagnostic Performance Under Strong Noise and Reproduction Summary
In the reproduction experiments, the Adam optimizer was used together with dynamic learning-rate scheduling (ReduceLROnPlateau). Even with -8 dB Gaussian noise injected, DRSN-CW maintained a test accuracy above 90%.
Figure 3. Experimental results
Original paper:
Title: Deep residual shrinkage networks for fault diagnosis
Journal: IEEE Transactions on Industrial Informatics, 2020, 16(7): 4681-4690.
DOI: 10.1109/TII.2019.2943898
https://ieeexplore.ieee.org/document/8850096
Reviewed and edited by Huang Yu