Deep Neural Networks for Strongly Noisy Data: A Python Reimplementation of the Deep Residual Shrinkage Network

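In rotating machinery fault diagnosis, extracting sensitive features from vibration signals corrupted by strong noise has long been a core difficulty. Classic deep learning models such as convolutional neural networks (CNNs) perform well on clean laboratory datasets, but on complex field measurements from industry, redundant noise-related features can lower model accuracy. To address this, the paper "Deep Residual Shrinkage Networks for Fault Diagnosis" proposes a new architecture, the deep residual shrinkage network (DRSN).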

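The core idea of DRSN is to integrate soft thresholding, a classic signal-denoising technique, into a residual network. Through an attention mechanism, the model adaptively learns a shrinkage threshold for each group of feature maps. As features propagate, those close to zero, which are treated as noise, are set to zero, while strong features are kept. This structure helps improve robustness (resistance to interference) under strong noise and also gives end-to-end adaptive feature extraction without relying on elaborate expert prior knowledge.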

1. From residual blocks to adaptive shrinkage

The core component of DRSN is the residual shrinkage building unit with channel-wise thresholds (RSBU-CW). On top of conventional residual learning, this module adds a parallel sub-network that computes the thresholds.

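In an RSBU-CW module, the input features pass through two convolutions with batch normalization (BN) and then enter an attention branch. First, taking absolute values and applying global average pooling (GAP) compresses the spatial dimension, which gives the mean absolute value of each channel. Next, two fully connected layers followed by a sigmoid activation learn a scaling factor α with values between 0 and 1. The shrinkage threshold τ is computed as: τ = α * average(abs(x)).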
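
The threshold formula can be illustrated with a small NumPy sketch. This is only an illustration under stated assumptions, not the article's TensorFlow layer: the function name `channel_thresholds` is hypothetical, and the scaling factor `alpha` is passed in as a fixed array, whereas in RSBU-CW it comes from the two fully connected layers and the sigmoid.

```python
import numpy as np

def channel_thresholds(x, alpha):
    """Per-channel thresholds tau = alpha * mean(|x|).

    x:     feature map of shape (batch, steps, channels)
    alpha: scaling factors in (0, 1), shape (batch, channels);
           assumed given here (learned by a sub-network in RSBU-CW)
    """
    abs_mean = np.mean(np.abs(x), axis=1)  # global average pooling of |x| over steps
    return alpha * abs_mean                # shape (batch, channels)

# Toy feature map: 1 sample, 4 time steps, 2 channels
x = np.array([[[1.0, -4.0],
               [-1.0, 4.0],
               [3.0, 0.0],
               [-3.0, 0.0]]])
alpha = np.array([[0.5, 0.25]])
tau = channel_thresholds(x, alpha)
print(tau)  # both channels have mean |x| = 2.0, so tau is [[1.0, 0.5]]
```

Because α lies between 0 and 1, τ is non-negative and never exceeds the channel's mean absolute value, so the threshold cannot grow large enough to zero out an entire channel that contains any non-zero values.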

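Once the threshold is available, the model applies the soft-thresholding operator to the feature map: y = sign(x) * max(abs(x) - τ, 0). This design lets the model set a separate threshold for each feature channel.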
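
A minimal NumPy sketch of the soft-thresholding formula is shown here for intuition. The function name `soft_threshold` and the toy values are assumptions made for illustration; the article's own implementation is the TensorFlow layer in the listing.

```python
import numpy as np

def soft_threshold(x, tau):
    """Soft thresholding: y = sign(x) * max(|x| - tau, 0).

    x:   feature map of shape (batch, steps, channels)
    tau: thresholds of shape (batch, channels), broadcast over steps
    """
    tau = tau[:, np.newaxis, :]  # (batch, 1, channels)
    return np.sign(x) * np.maximum(np.abs(x) - tau, 0.0)

# Toy feature map: 1 sample, 3 time steps, 2 channels
x = np.array([[[0.3, -2.0],
               [-0.8, 0.4],
               [1.5, 1.0]]])
tau = np.array([[1.0, 0.5]])  # one threshold per channel
y = soft_threshold(x, tau)
print(y)  # entries with |x| <= tau become zero; the rest move toward zero by tau
```

In this example the first channel (threshold 1.0) keeps only the value 1.5, shrunk to 0.5, while the second channel (threshold 0.5) maps -2.0 to -1.5 and 1.0 to 0.5 and zeroes 0.4.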

Figure 1. Deep residual shrinkage network

2. Experimental setup

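To evaluate DRSN-CW, the Case Western Reserve University (CWRU) bearing dataset, a standard benchmark in bearing fault diagnosis, was used. The experiment covers 10 class labels: the normal state plus inner-race, outer-race, and ball faults, each at three fault sizes (the 007, 014, and 021 file groups in the code). Each sample is cut from the signal with a sliding window of 1024 sampling points; in the code the windows do not overlap.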
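
The windowing step can be sketched as follows. This is a simplified stand-in that assumes a synthetic record rather than real CWRU data; the helper name `segment_signal` is hypothetical, and the stride equals the window length so that windows do not overlap.

```python
import numpy as np

def segment_signal(series, window_size=1024, stride=1024):
    """Cut a 1-D series into fixed-length windows; trailing leftovers are dropped."""
    starts = range(0, len(series) - window_size + 1, stride)
    if len(starts) == 0:
        # Record shorter than one window: return an empty sample array
        return np.empty((0, window_size), dtype=series.dtype)
    return np.stack([series[s:s + window_size] for s in starts])

# Synthetic stand-in for one vibration record (not CWRU data)
record = np.arange(5000, dtype=np.float32)
windows = segment_signal(record)
print(windows.shape)  # (4, 1024): 4096 points are used, the last 904 are dropped
```

A stride smaller than the window length would yield overlapping samples and therefore more training data from the same record, at the cost of correlated samples across the train/test split.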

Figure 2. Class division

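In the data engineering module, besides routine standardization, an online real-time augmentation pipeline was designed to simulate extreme industrial conditions. It includes:

(1) Circular shift (rolling shift): simulates uncertainty in the moment at which the sensor starts sampling.

(2) Transient impulse injection: simulates occasional knocks and bumps of the machine.

(3) Additive white Gaussian noise (AWGN): during training, noise at different signal-to-noise ratios (SNRs) is mixed in dynamically so that the model keeps its features as consistent as possible across environments. In addition, a test environment with an SNR of -8 dB was built, which corresponds to fairly strong background noise in industrial diagnosis.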
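
To give a feel for what -8 dB means, the sketch below adds AWGN to a synthetic tone and measures the resulting SNR; it also shows a circular shift on a tiny array. The helper name `add_noise_at_snr`, the 12 kHz sampling rate, and the 100 Hz tone are assumptions chosen for illustration, not values taken from the article.

```python
import numpy as np

def add_noise_at_snr(signal, snr_db, rng):
    """Add white Gaussian noise so that P_signal / P_noise = 10^(snr_db / 10)."""
    signal_power = np.mean(signal ** 2)
    noise_power = signal_power / (10 ** (snr_db / 10.0))
    noise = rng.normal(0.0, np.sqrt(noise_power), signal.shape)
    return signal + noise, noise

rng = np.random.default_rng(0)           # fixed seed, for repeatability only
t = np.arange(100_000) / 12_000.0        # assumed 12 kHz sampling rate
clean = np.sin(2 * np.pi * 100.0 * t)    # synthetic 100 Hz tone, not bearing data
noisy, noise = add_noise_at_snr(clean, snr_db=-8, rng=rng)

power_ratio = np.mean(noise ** 2) / np.mean(clean ** 2)
measured_snr = 10 * np.log10(np.mean(clean ** 2) / np.mean(noise ** 2))
print(round(power_ratio, 2), round(measured_snr, 2))  # close to 6.31 and -8.0

# Circular shift: samples pushed off the end wrap around to the start
shifted = np.roll(np.array([1, 2, 3, 4, 5]), 2)
print(shifted)  # [4 5 1 2 3]
```

At -8 dB the noise power is 10^0.8, or roughly 6.3 times the signal power, which is why this setting is a demanding test.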

The full TensorFlow code is as follows:

 

"""
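Project: Deep Residual Shrinkage Network (DRSN-CW), reproduction for rotating machinery fault diagnosis
Reference: Zhao, M., et al. "Deep Residual Shrinkage Networks for Fault Diagnosis," IEEE TII, 2020.

Core ideas of the algorithm:
1. Soft thresholding: a nonlinear mapping that sets near-zero (noise-like) features to zero and keeps strong features.
2. Attention: a small sub-network learns the shrinkage threshold of each channel, giving adaptive denoising.
3. Residual learning: eases vanishing gradients in deep networks and keeps feature propagation stable.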
"""

import os
import sys
import logging
import numpy as np
import scipy.io as sio
import tensorflow as tf
from tensorflow.keras import layers, Model, regularizers
from sklearn.model_selection import train_test_split as split_data

# =============================================================================
# 1. Environment and resource configuration
# =============================================================================

logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s')

class GPUConfig:
    """
    Compute resource manager: initializes the TensorFlow runtime and configures hardware acceleration.
    """
    
    @staticmethod
    def init_tf():
        """
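        Configure the compute backend:
        - Suppress redundant logs: reduce non-critical system warnings.
        - Allocate GPU memory on demand: keep TensorFlow from reserving all GPU memory at startup,
          so the GPU can be shared with other processes.
        """
        # Note: TensorFlow reads this variable at import time, so setting it here
        # (after `import tensorflow`) may not silence messages emitted during import.
        os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'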
        
        physical_gpu_list = tf.config.list_physical_devices('GPU')
        if physical_gpu_list:
            try:
                for gpu_device in physical_gpu_list:
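                    # Enable dynamic GPU memory growth
                    tf.config.experimental.set_memory_growth(gpu_device, True)
                logging.info("GPU acceleration ready: detected {0} device(s), dynamic memory growth enabled.".format(len(physical_gpu_list)))
            except RuntimeError as hardware_error:
                logging.warning("GPU backend configuration failed (the device may already be in use): %s", hardware_error)
        else:
            logging.info("No GPU detected; the CPU will be used (training may be slow).")

# Run global initialization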
GPUConfig.init_tf()

# =============================================================================
# 2. Data engineering (ETL: Extract, Transform, Load)
# =============================================================================

class CWRULoader:
    """
    CWRU dataset parser: reads raw .mat vibration signals, segments them, and reshapes them into samples.
    """
    
    def __init__(self, dataset_root, window_size=1024):
        """
        :param dataset_root: root directory of the dataset
        :param window_size: sample length, also used as the window stride (typically 1024 or 2048)
        """
        self.base_directory = os.path.abspath(dataset_root)
        self.sample_length = window_size
        self.sampling_interval = window_size 

    def _parse_mat_content(self, target_file):
        """
        Extract the drive-end (DE) time series from a MATLAB .mat file.
        """
        try:
            storage = sio.loadmat(target_file)
            for identifier in storage.keys():
                # Match the key of the drive-end accelerometer signal
                if 'DE_time' in identifier:
                    return storage[identifier].flatten()
        except Exception as parse_error:
            logging.debug("Error reading file %s: %s", target_file, parse_error)
            return None
        return None

    def load_data(self, category_dictionary):
        """
        Build the dataset.
        :param category_dictionary: dictionary mapping each label to a list of file names.
        :return: NumPy arrays (X_data, y_label).
        """
        feature_collection, label_collection = [], []
        is_data_found = False
        
        for class_idx, name_list in category_dictionary.items():
            for filename in name_list:
                full_path = os.path.join(self.base_directory, "{0}.mat".format(filename))
                if not os.path.exists(full_path):
                    continue
                
                vibration_series = self._parse_mat_content(full_path)
                if vibration_series is None:
                    continue
                
                is_data_found = True
                # Non-overlapping sliding window: cut the long series into fixed-length samples
                for pointer in range(0, len(vibration_series) - self.sample_length + 1, self.sampling_interval):
                    sub_sequence = vibration_series[pointer : pointer + self.sample_length]
                    feature_collection.append(sub_sequence)
                    label_collection.append(class_idx)
        
        if not is_data_found:
            raise FileNotFoundError("No CWRU .mat files were found under the given path; please check the path.")
            
        return np.array(feature_collection, dtype='float32'), np.array(label_collection, dtype='int32')

def add_awgn(signal_input, snr_value):
    """
    Additive white Gaussian noise (AWGN) injection.
    Simulates background noise in real industrial settings to test model robustness.
    Formula: P_noise = P_signal / 10^(SNR/10)
    """
    signal_input = np.array(signal_input)
    random_engine = np.random.default_rng()
    
    # Accept either a fixed SNR or a (low, high) range that is sampled uniformly
    if isinstance(snr_value, (list, tuple)):
        target_snr = random_engine.uniform(snr_value[0], snr_value[1])
    else:
        target_snr = snr_value
    
    # Compute the per-sample signal power and derive the noise standard deviation
    signal_power = np.mean(np.square(signal_input), axis=1, keepdims=True)
    noise_variance = signal_power / (10 ** (target_snr / 10.0))
    noise_component = random_engine.normal(0, np.sqrt(noise_variance), signal_input.shape)
    
    return (signal_input + noise_component).astype('float32')

# =============================================================================
# 3. Neural network components (DRSN core)
# =============================================================================

class SoftThresholdOperator(layers.Layer):
    """
    Soft-thresholding operator (custom layer):
    the nonlinear core of DRSN, which shrinks the feature map using the threshold tau.
    Formula: y = sign(x) * max(|x| - tau, 0)
    """
    def __init__(self, **kwargs):
        super(SoftThresholdOperator, self).__init__(**kwargs)

    def call(self, inputs):
        """
        x_conv: input feature map (Batch, Steps, Channels)
        tau: learned thresholds (Batch, Channels)
        """
        x_conv, tau = inputs
        # Expand the thresholds so they broadcast over the step dimension of the feature map
        expanded_tau = tf.expand_dims(tau, axis=1)
        return tf.sign(x_conv) * tf.maximum(tf.abs(x_conv) - expanded_tau, 0.0)

class RSBU_CW(layers.Layer):
    """
    Residual shrinkage building unit with channel-wise thresholds (RSBU-CW):
    a residual block with a channel attention mechanism that produces a separate threshold for each channel.
    """
    def __init__(self, filters, kernel_size, strides=1, **kwargs):
        super(RSBU_CW, self).__init__(**kwargs)
        self.num_kernels = filters
        self.step_size = strides
        self.width = kernel_size
        self.weight_decay = regularizers.l2(1e-4)

        # Identity mapping path (residual shortcut)
        self.shortcut = None
        
        # Main branch: the classic BN-ReLU-Conv (pre-activation) ordering
        self.bn_alpha = layers.BatchNormalization()
        self.relu_alpha = layers.Activation('relu')
        self.conv_alpha = layers.Conv1D(filters, kernel_size, strides=strides, padding='same', 
                                       kernel_initializer='he_normal', kernel_regularizer=self.weight_decay)
        
        self.bn_beta = layers.BatchNormalization()
        self.relu_beta = layers.Activation('relu')
        self.conv_beta = layers.Conv1D(filters, kernel_size, strides=1, padding='same', 
                                      kernel_initializer='he_normal', kernel_regularizer=self.weight_decay)
        
        # Attention sub-network: computes the channel-wise shrinkage thresholds
        self.gap = layers.GlobalAveragePooling1D()
        self.fc1 = layers.Dense(filters, kernel_initializer='he_normal')
        self.bn_gamma = layers.BatchNormalization()
        self.relu_gamma = layers.Activation('relu')
        self.fc2 = layers.Dense(filters, activation='sigmoid') # scaling factor alpha in (0, 1)
        self.threshold_op = SoftThresholdOperator()

    def build(self, input_dim):
        """
        Adjust the shortcut dynamically: when the stride is not 1 or the channel count changes, use a 1x1 convolution to align the residual.
        """
        if self.step_size != 1 or input_dim[-1] != self.num_kernels:
            self.shortcut = tf.keras.Sequential([
                layers.Conv1D(self.num_kernels, 1, strides=self.step_size, padding='same', use_bias=False),
                layers.BatchNormalization()
            ])
        super(RSBU_CW, self).build(input_dim)

    def call(self, layer_inputs):
        """
        Flow: feature extraction -> per-channel global statistics -> dynamic threshold computation -> soft-threshold denoising -> residual addition
        """
        identity = layer_inputs
        if self.shortcut is not None:
            identity = self.shortcut(layer_inputs)

        # Two convolutions produce the intermediate feature map x_conv
        x_conv = self.bn_alpha(layer_inputs)
        x_conv = self.relu_alpha(x_conv)
        x_conv = self.conv_alpha(x_conv)
        x_conv = self.bn_beta(x_conv)
        x_conv = self.relu_beta(x_conv)
        x_conv = self.conv_beta(x_conv)

        # Use the mean absolute value of each channel as the global statistic
        x_abs = tf.abs(x_conv)
        abs_mean = self.gap(x_abs)
        
        # The sub-network outputs alpha in (0, 1); the threshold is tau = alpha * abs_mean
        z = self.fc1(abs_mean)
        z = self.bn_gamma(z)
        z = self.relu_gamma(z)
        alpha = self.fc2(z)
        
        tau = tf.multiply(alpha, abs_mean)
        
        # Apply soft-threshold shrinkage, then add the residual
        denoised_output = self.threshold_op([x_conv, tau])
        return layers.Add()([denoised_output, identity])

class DRSN_CW(Model):
    """
    Full DRSN-CW architecture:
    stacks several RSBU modules in sequence and ends with a fully connected layer for fault classification.
    """
    def __init__(self, num_classes):
        super(DRSN_CW, self).__init__(name="Bearing_Fault_DRSN")
        self.weight_decay = regularizers.l2(1e-4)
        
        # Input stage: first convolution over the 1-D time series
        self.conv1 = layers.Conv1D(32, 15, strides=2, padding='same', kernel_initializer='he_normal', kernel_regularizer=self.weight_decay)
        self.bn1 = layers.BatchNormalization()
        self.relu1 = layers.Activation('relu')
        
        # Sequence of residual shrinkage blocks (channel count grows from 32 to 128)
        self.rsbu_blocks = [
            RSBU_CW(32, 5, strides=2),
            RSBU_CW(32, 5, strides=1),
            RSBU_CW(64, 5, strides=2),
            RSBU_CW(64, 5, strides=1),
            RSBU_CW(128, 5, strides=2),
            RSBU_CW(128, 5, strides=1)
        ]
        
        # Output head: pool over time, then map to the class space
        self.post_norm = layers.BatchNormalization()
        self.post_relu = layers.Activation('relu')
        self.gap_layer = layers.GlobalAveragePooling1D()
        self.classifier = layers.Dense(num_classes, activation='softmax', kernel_regularizer=self.weight_decay)

    def call(self, network_input):
        """
        End-to-end forward pass.
        """
        x = self.conv1(network_input)
        x = self.bn1(x)
        x = self.relu1(x)
        
        for block in self.rsbu_blocks:
            x = block(x)
            
        x = self.post_norm(x)
        x = self.post_relu(x)
        x = self.gap_layer(x)
        return self.classifier(x)

# =============================================================================
# 4. Training, augmentation, and evaluation
# =============================================================================

def train_and_test(dataset_path, seq_len=1024):
    """
    End-to-end controller: data preprocessing, online augmentation, model training, and evaluation in an extreme (-8 dB) environment.
    """
    
    # Fault classes (based on the CWRU file naming convention)
    label_map = {
        0: ['Normal_0', 'Normal_1', 'Normal_2', 'Normal_3'],
        1: ['IR007_0', 'IR007_1', 'IR007_2', 'IR007_3'],
        2: ['IR014_0', 'IR014_1', 'IR014_2', 'IR014_3'],
        3: ['IR021_0', 'IR021_1', 'IR021_2', 'IR021_3'],
        4: ['B007_0', 'B007_1', 'B007_2', 'B007_3'],
        5: ['B014_0', 'B014_1', 'B014_2', 'B014_3'],
        6: ['B021_0', 'B021_1', 'B021_2', 'B021_3'],
        7: ['OR007@6_0', 'OR007@6_1', 'OR007@6_2', 'OR007@6_3'],
        8: ['OR014@6_0', 'OR014@6_1', 'OR014@6_2', 'OR014@6_3'],
        9: ['OR021@6_0', 'OR021@6_1', 'OR021@6_2', 'OR021@6_3']
    }
    
    data_engine = CWRULoader(dataset_root=dataset_path, window_size=seq_len)
    
    try:
        signals, labels = data_engine.load_data(label_map)
    except Exception as data_err:
        logging.error("Failed to load data: %s", data_err)
        return

    # Random split: 70% training, 15% validation, 15% test
    train_x_pre, temp_x, train_y_pre, temp_y = split_data(
        signals, labels, test_size=0.3, random_state=42
    )
    val_x_pre, test_x_pre, val_y_pre, test_y_pre = split_data(
        temp_x, temp_y, test_size=0.5, random_state=42
    )
    
    # Standardization: use the training-set mean and standard deviation to avoid leaking test information
    mu, sigma = np.mean(train_x_pre), np.std(train_x_pre)
    
    def normalize(obs):
        return ((obs - mu) / sigma).reshape(-1, seq_len, 1)

    train_set_x = normalize(train_x_pre)
    val_set_x = normalize(val_x_pre)
    test_set_x = normalize(test_x_pre)
    
    # One-hot encode the labels
    num_classes = len(label_map)
    train_set_y = tf.keras.utils.to_categorical(train_y_pre, num_classes).astype('float32')
    val_set_y = tf.keras.utils.to_categorical(val_y_pre, num_classes).astype('float32')
    test_set_y = tf.keras.utils.to_categorical(test_y_pre, num_classes).astype('float32')

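    # Evaluation environment: inject strong noise (-8 dB) into the validation and test sets to check performance under extreme industrial conditions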
    val_x_awgn = add_awgn(val_set_x, snr_value=-8)
    test_x_awgn = add_awgn(test_set_x, snr_value=-8)

    def augment_batch(feat_batch, label_batch):
        """
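        Online data augmentation:
        1. Circular shift: simulates uncertainty in the sampling start time.
        2. Transient impulses: simulate occasional knocks of the machine.
        3. Mixed noise: improves the model's tolerance to noise.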
        """
        rand_gen = np.random.default_rng()
        augmented_x = feat_batch.copy()
        batch_n, steps_n, _ = augmented_x.shape

        # Random circular shift of each sample
        for sample_idx in range(batch_n):
            offset = rand_gen.integers(0, steps_n)
            augmented_x[sample_idx, :, 0] = np.roll(augmented_x[sample_idx, :, 0], offset)

        # Impulse noise injection (10% of batches; within such a batch, each sample is hit with probability 0.5)
        if rand_gen.random() > 0.9: 
            for sample_idx in range(batch_n):
                if rand_gen.random() > 0.5: 
                    num_spikes = rand_gen.integers(1, 3) 
                    positions = rand_gen.integers(0, steps_n, num_spikes)
                    spike_mag = np.std(augmented_x[sample_idx]) * rand_gen.uniform(1.5, 2.5) 
                    augmented_x[sample_idx, positions, 0] += spike_mag * rand_gen.choice([-1, 1], size=num_spikes)

        # Dynamic SNR mixing (50% probability per batch; one SNR drawn from [-8, 8] dB for the whole batch)
        if rand_gen.random() > 0.5: 
            augmented_x = add_awgn(augmented_x, snr_value=(-8, 8))

        return augmented_x.astype(np.float32), label_batch.astype(np.float32)

    def _tensor_spec_binding(f_tensor, l_tensor):
        """ Explicitly attach shape information for tf.data """
        f_tensor.set_shape([None, seq_len, 1])
        l_tensor.set_shape([None, num_classes])
        return f_tensor, l_tensor

    # Build a high-throughput input pipeline with tf.data
    training_pipeline = tf.data.Dataset.from_tensor_slices((train_set_x.astype('float32'), train_set_y))
    training_pipeline = training_pipeline.shuffle(len(train_set_x)).batch(64)
    training_pipeline = training_pipeline.map(
        lambda x, y: tf.numpy_function(augment_batch, [x, y], [tf.float32, tf.float32]),
        num_parallel_calls=tf.data.AUTOTUNE
    ).map(_tensor_spec_binding).prefetch(tf.data.AUTOTUNE)

    # Instantiate and compile the model
    model_instance = DRSN_CW(num_classes=num_classes)
    model_instance.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3), 
        loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.0), # cross-entropy loss
        metrics=['accuracy']
    )

    logging.info("Diagnosis system started: classes=%d, sequence length=%d", num_classes, seq_len)
    
    # Dynamic learning-rate reduction and early stopping
    optimization_callbacks = [
        tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=7, min_lr=1e-6, verbose=1),
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)
    ]

    # Fit the model
    model_instance.fit(
        training_pipeline,
        epochs=100,
        validation_data=(val_x_awgn, val_set_y),
        callbacks=optimization_callbacks,
        verbose=2
    )

    # Final evaluation under a very low SNR
    final_loss, final_acc = model_instance.evaluate(test_x_awgn, test_set_y, verbose=0)
    print("\n" + "="*50)
    print("Model evaluation report (DRSN-CW)")
    print("Evaluation condition: -8 dB SNR (strong noise interference)")
    print("Final classification accuracy: {0:.2f}%".format(final_acc * 100))
    print("="*50)

# =============================================================================
# Program entry point
# =============================================================================

if __name__ == "__main__":
    # Default directory to search for the data
    DATA_PATH = os.path.join(os.getcwd(), 'data_path')
    
    if not os.path.exists(DATA_PATH):
        logging.warning("Default data directory not found: %s", DATA_PATH)
        user_input_path = input("Enter the full path to the raw CWRU dataset (.mat files): ").strip()
        if user_input_path:
            DATA_PATH = user_input_path
        else:
            logging.critical("No valid path was provided; exiting.")
            sys.exit(1)

    # Start training and testing
    train_and_test(DATA_PATH, seq_len=1024)
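
As a sanity check on the architecture in the listing, the feature-map shape after each stage can be traced by hand. The sketch below assumes the standard rule that a 1-D convolution with padding='same' produces ceil(length / stride) output steps; the stage names are hypothetical labels, while the channel counts and strides are copied from the DRSN_CW class.

```python
import math

def same_padding_length(length, stride):
    """Output length of a 1-D convolution with padding='same'."""
    return math.ceil(length / stride)

# (label, output channels, stride) for each stage, copied from the DRSN_CW class
stages = [("conv1", 32, 2),
          ("rsbu_1", 32, 2), ("rsbu_2", 32, 1),
          ("rsbu_3", 64, 2), ("rsbu_4", 64, 1),
          ("rsbu_5", 128, 2), ("rsbu_6", 128, 1)]

length = 1024  # input sequence length used in the article
trace = []
for name, channels, stride in stages:
    length = same_padding_length(length, stride)
    trace.append((name, length, channels))
    print(f"{name}: ({length}, {channels})")
```

Under these assumptions a 1024-point input is reduced to 64 steps with 128 channels before global average pooling, which then yields a 128-dimensional vector for the final classifier.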

 

3. Diagnostic performance under strong noise and reproduction summary

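In the reproduction experiment, training used the Adam optimizer together with a dynamic learning-rate reduction strategy (ReduceLROnPlateau). With Gaussian noise injected at an SNR of -8 dB, DRSN-CW still kept a test accuracy above 90%.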

Figure 3. Experimental results

Original paper:

Title: Deep residual shrinkage networks for fault diagnosis

Journal: IEEE Transactions on Industrial Informatics. 2020, 16(7): 4681-4690.

DOI: 10.1109/TII.2019.2943898

https://ieeexplore.ieee.org/document/8850096

Reviewing editor: Huang Yu
