基于自编码器的心电信号异常检测(PyTorch)

代码较为简单,很容易读懂。

# Importing necessary libraries for TensorFlow, pandas, numpy, and matplotlib
import tensorflow as tf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import copy# Importing the PyTorch library
import torch# Importing additional libraries for data manipulation, visualization, and machine learning
import copy
import seaborn as sns
from pylab import rcParams
from matplotlib import rc
from sklearn.model_selection import train_test_split# Importing PyTorch modules for neural network implementation
from torch import nn, optim
import torch.nn.functional as F
import torch.nn as nn# Ignoring warnings to enhance code cleanliness
import warnings
warnings.filterwarnings('ignore')
# Load the ECG dataset. The CSV has no header row; it holds 141 columns,
# the last of which is renamed to 'target' further down and used as the label.
df = pd.read_csv(
    'http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv',
    header=None,
)
df.head().T

df.describe()

# Missing values per column.
df.isna().sum()
# Output:
# 0      0
# 1      0
# 2      0
# 3      0
# 4      0
#       ..
# 136    0
# 137    0
# 138    0
# 139    0
# 140    0
# Length: 141, dtype: int64

# Column dtypes.
df.dtypes
# Output:
# 0      float64
# 1      float64
# 2      float64
# 3      float64
# 4      float64
#         ...
# 136    float64
# 137    float64
# 138    float64
# 139    float64
# 140    float64
# Length: 141, dtype: object

# Give the last column a readable name so it can be addressed as df.target.
new_columns = list(df.columns)
new_columns[-1] = 'target'
df.columns = new_columns

# Class balance of the label column.
df.target.value_counts()
# Output:
# 1.0    2919
# 0.0    2079
# Name: target, dtype: int64
# Bar chart of how many rows carry each label value.
value_counts = df['target'].value_counts()

# Plotting
plt.figure(figsize=(8, 6))
value_counts.plot(kind='bar', color='skyblue')
plt.title('Value Counts of Target Column')
plt.xlabel('Target Values')
plt.ylabel('Count')

# Display the count values on top of the bars
for i, count in enumerate(value_counts):
    plt.text(i, count + 0.1, str(count), ha='center', va='bottom')

plt.show()

# Distinct label values present in the dataset.
classes = df.target.unique()


def plot_ecg(data, class_name, ax, n_steps=10):
    """Draw a smoothed trace with a rolling one-standard-deviation band.

    Args:
        data: Signal values; wrapped in ``pd.DataFrame(data)`` and read back
            through column 0, so a 1-D sequence is expected.
        class_name: Value used as the subplot title.
        ax: Axes-like object providing ``plot``, ``fill_between`` and
            ``set_title``.
        n_steps: Window length of the rolling mean and standard deviation.
    """
    time_series_df = pd.DataFrame(data)

    # min_periods=1 keeps the first n_steps - 1 points instead of dropping
    # them; the very first standard deviation is still NaN (single sample).
    rolling = time_series_df.rolling(window=n_steps, min_periods=1)
    smooth_data = rolling.mean()
    deviation = rolling.std()

    # Band of +/- one rolling standard deviation around the smoothed signal.
    upper_bound = smooth_data + deviation
    lower_bound = smooth_data - deviation

    ax.plot(smooth_data, color='black', linewidth=2)
    ax.fill_between(
        time_series_df.index,
        lower_bound[0],
        upper_bound[0],
        color='black',
        alpha=0.2,
    )
    ax.set_title(class_name)
# Plotting setup: one subplot per class, three per row.
n_cols = 3
# Ceiling division, so a class count that is a multiple of three does not
# produce an extra, completely empty row.
n_rows = max(1, -(-len(classes) // n_cols))
fig, axs = plt.subplots(
    nrows=n_rows,
    ncols=n_cols,
    sharey=True,
    figsize=(14, 8),
    squeeze=False,  # always a 2-D array of axes, whatever the grid size
)

# Plot the mean signal of each class.
for i, cls in enumerate(classes):
    ax = axs.flat[i]
    data = df[df.target == cls].drop(labels='target', axis=1).mean(axis=0).to_numpy()
    plot_ecg(data, cls, ax)  # Using 'cls' directly as class name

# Remove every axis that did not receive a class, then tidy the layout.
for unused_ax in axs.flat[len(classes):]:
    fig.delaxes(unused_ax)
fig.tight_layout()
plt.show()

# Rows labelled 1 form the "normal" set; every other row is treated as an
# anomaly. The label column is dropped so that only signal values remain.
normal_df = df.loc[df['target'] == 1].drop(columns='target')
normal_df.shape
# Output: (2919, 140)

anomaly_df = df.loc[df['target'] != 1].drop(columns='target')
anomaly_df.shape
# Output: (2079, 140)
# Splitting the Dataset
#
# Initial train/validation split:
# 15% of 'normal_df' is held out; random_state=42 keeps the split reproducible.
train_df, val_df = train_test_split(
    normal_df,
    test_size=0.15,
    random_state=42,
)

# Further split of the held-out part into validation and test sets:
# 33% of it becomes the test set, using the same random_state for consistency.
# NOTE: the fraction is 0.33, not 0.30 -- the evaluation output recorded
# further down reports 145 normal test sequences, which is what 33% of the
# 438 held-out rows yields (30% would give 132).
val_df, test_df = train_test_split(
    val_df,
    test_size=0.33,
    random_state=42,
)
# Function to Create a Dataset
def create_dataset(df):
    """Convert a DataFrame of equal-length sequences into per-row tensors.

    Args:
        df: DataFrame with one sequence per row and one time step per column.
            All values must be castable to float32.

    Returns:
        A tuple ``(dataset, seq_len, n_features)`` where ``dataset`` is a list
        holding one float32 tensor of shape ``(seq_len, 1)`` per row,
        ``seq_len`` is the number of columns and ``n_features`` is always 1
        (each time step carries a single value).
    """
    values = df.astype(np.float32).to_numpy()
    # Read the dimensions from the array itself rather than from a stacked
    # tensor, so an empty frame yields an empty dataset instead of an error.
    _, seq_len = values.shape
    n_features = 1

    # torch.tensor copies each row, so the tensors do not alias the DataFrame.
    dataset = [torch.tensor(row).unsqueeze(1) for row in values]
    return dataset, seq_len, n_features
# Create the training dataset from train_df; its sequence length and feature
# count are reused later to size the model.
train_dataset, seq_len, n_features = create_dataset(train_df)

# Create the validation dataset from val_df
val_dataset, _, _ = create_dataset(val_df)

# Create the test dataset for normal cases from test_df
test_normal_dataset, _, _ = create_dataset(test_df)

# Create the test dataset for anomalous cases from anomaly_df
test_anomaly_dataset, _, _ = create_dataset(anomaly_df)

Implementation of LSTM-Based Autoencoder for ECG Anomaly Detection

class Encoder(nn.Module):
    """Two-layer LSTM encoder that compresses one sequence into one vector.

    Args:
        seq_len: Number of time steps per sequence.
        n_features: Number of values per time step.
        embedding_dim: Size of the returned embedding. The first LSTM layer
            uses twice as many hidden units.
    """

    def __init__(self, seq_len, n_features, embedding_dim=64):
        super().__init__()
        self.seq_len, self.n_features = seq_len, n_features
        self.embedding_dim, self.hidden_dim = embedding_dim, 2 * embedding_dim

        # Attribute names rnn1 / rnn2 are part of the state-dict keys and must
        # stay stable so previously saved checkpoints keep loading.
        self.rnn1 = nn.LSTM(
            input_size=n_features,
            hidden_size=self.hidden_dim,
            num_layers=1,
            batch_first=True,
        )
        self.rnn2 = nn.LSTM(
            input_size=self.hidden_dim,
            hidden_size=embedding_dim,
            num_layers=1,
            batch_first=True,
        )

    def forward(self, x):
        """Encode a single sequence.

        Args:
            x: Tensor holding ``seq_len * n_features`` values; it is reshaped
                to a batch of one, ``(1, seq_len, n_features)``.

        Returns:
            Tensor of shape ``(1, embedding_dim)``: the final hidden state of
            the second LSTM layer.
        """
        x = x.reshape((1, self.seq_len, self.n_features))
        x, _ = self.rnn1(x)
        _, (hidden_n, _) = self.rnn2(x)
        # hidden_n is (num_layers=1, batch=1, embedding_dim). Its size does
        # not depend on n_features, so reshaping by n_features (as the
        # original did) only worked for univariate input.
        return hidden_n.reshape((1, self.embedding_dim))
class Decoder(nn.Module):
    """Two-layer LSTM decoder that expands one embedding into a sequence.

    Args:
        seq_len: Number of time steps to reconstruct.
        input_dim: Size of the embedding fed to the decoder. The second LSTM
            layer uses twice as many hidden units.
        n_features: Number of values produced per time step.
    """

    def __init__(self, seq_len, input_dim=64, n_features=1):
        super().__init__()
        self.seq_len, self.input_dim = seq_len, input_dim
        self.hidden_dim, self.n_features = 2 * input_dim, n_features

        # Attribute names are part of the state-dict keys; keep them stable so
        # previously saved checkpoints keep loading.
        self.rnn1 = nn.LSTM(
            input_size=input_dim,
            hidden_size=input_dim,
            num_layers=1,
            batch_first=True,
        )
        self.rnn2 = nn.LSTM(
            input_size=input_dim,
            hidden_size=self.hidden_dim,
            num_layers=1,
            batch_first=True,
        )
        self.output_layer = nn.Linear(self.hidden_dim, n_features)

    def forward(self, x):
        """Decode a single embedding.

        Args:
            x: Tensor holding ``input_dim`` values (for example of shape
                ``(1, input_dim)``).

        Returns:
            Tensor of shape ``(seq_len, n_features)``.
        """
        # Feed the same embedding at every time step of a batch of one. The
        # original repeated and reshaped by n_features, which only produced
        # consistent shapes when n_features == 1.
        x = x.reshape((1, 1, self.input_dim)).repeat(1, self.seq_len, 1)
        x, _ = self.rnn1(x)
        x, _ = self.rnn2(x)
        x = x.reshape((self.seq_len, self.hidden_dim))
        return self.output_layer(x)
class Autoencoder(nn.Module):
    """Sequence autoencoder: an Encoder followed by a Decoder.

    Args:
        seq_len: Number of time steps per sequence.
        n_features: Number of values per time step.
        embedding_dim: Size of the bottleneck embedding.
        device: Device the sub-modules are placed on. When omitted, CUDA is
            used if available and the CPU otherwise, so the class no longer
            depends on a module-level ``device`` variable being defined
            before the first instantiation.
    """

    def __init__(self, seq_len, n_features, embedding_dim=64, device=None):
        super().__init__()
        if device is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.encoder = Encoder(seq_len, n_features, embedding_dim).to(device)
        self.decoder = Decoder(seq_len, embedding_dim, n_features).to(device)

    def forward(self, x):
        """Return the reconstruction of sequence ``x``."""
        x = self.encoder(x)
        x = self.decoder(x)
        return x
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Autoencoder with a 128-dimensional bottleneck, placed on the chosen device
# (Module.to returns the module itself, so the call can be chained).
model = Autoencoder(seq_len, n_features, 128).to(device)

Training and Visualization of ECG Autoencoder Model

def plot_input_reconstruction(model, dataset, epoch):
    """Plot the first sequence of ``dataset`` against its reconstruction.

    The model is switched to evaluation mode and left in that mode.

    Args:
        model: Module mapping a sequence tensor to its reconstruction.
        dataset: Indexable collection of sequence tensors; only element 0 is
            used.
        epoch: Value shown in the plot title.

    Raises:
        ValueError: If the input or the reconstruction is not 1-dimensional
            after squeezing.
    """
    model = model.eval()

    # Use the device the model lives on instead of a module-level variable.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    # Take the first sequence from the dataset
    seq_true = dataset[0].to(target_device)

    # The forward pass must run under no_grad: .numpy() refuses tensors that
    # require grad, and the original ran the model outside this block.
    with torch.no_grad():
        seq_pred = model(seq_true)

    # Squeeze the sequences to ensure they are 1-dimensional
    input_sequence = seq_true.squeeze().cpu().numpy()
    reconstruction_sequence = seq_pred.squeeze().cpu().numpy()

    # Validate before creating the figure so a failure leaves no empty figure.
    if input_sequence.ndim != 1 or reconstruction_sequence.ndim != 1:
        raise ValueError("Input and reconstruction sequences must be 1-dimensional after squeezing.")

    # Plotting the sequences
    plt.figure(figsize=(10, 5))
    plt.plot(input_sequence, label='Input Sequence', color='black')
    plt.plot(reconstruction_sequence, label='Reconstruction Sequence', color='red')
    plt.fill_between(
        range(len(input_sequence)),
        input_sequence,
        reconstruction_sequence,
        color='gray',
        alpha=0.5,
    )
    plt.title(f'Input vs Reconstruction - Epoch {epoch}')
    plt.legend()
    plt.show()


# Re-import kept from the original notebook cell that followed this function.
import torch
import numpy as np
import copy


def train_model(model, train_dataset, val_dataset, n_epochs, save_path):
    """Train ``model`` to reconstruct its input, one sequence per step.

    Uses Adam (lr=1e-3, weight_decay=1e-4) with a summed L1 reconstruction
    loss. After each epoch the mean validation loss is computed; whenever it
    improves, the weights are remembered and written to ``save_path``. On the
    first epoch and on every fifth epoch the first validation sequence is
    plotted against its reconstruction.

    Args:
        model: Module mapping a sequence tensor to its reconstruction.
        train_dataset: Iterable of training sequence tensors.
        val_dataset: Iterable of validation sequence tensors.
        n_epochs: Number of passes over ``train_dataset``.
        save_path: File the best weights (state dict) are saved to.

    Returns:
        ``(model, history)``: the model in evaluation mode with the best
        weights loaded, and a dict with the per-epoch mean losses under the
        keys ``'train'`` and ``'val'``.
    """
    # Use the device the model lives on instead of a module-level variable.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
    criterion = torch.nn.L1Loss(reduction='sum').to(target_device)
    history = {'train': [], 'val': []}

    # deepcopy: state_dict() returns references to the live parameters, which
    # keep changing during training.
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')

    for epoch in range(1, n_epochs + 1):
        model.train()
        train_losses = []
        for seq_true in train_dataset:
            optimizer.zero_grad()
            seq_true = seq_true.to(target_device)
            seq_pred = model(seq_true)
            loss = criterion(seq_pred, seq_true)
            loss.backward()
            optimizer.step()
            train_losses.append(loss.item())

        val_losses = []
        model.eval()
        with torch.no_grad():
            for seq_true in val_dataset:
                seq_true = seq_true.to(target_device)
                seq_pred = model(seq_true)
                loss = criterion(seq_pred, seq_true)
                val_losses.append(loss.item())

        train_loss = np.mean(train_losses)
        val_loss = np.mean(val_losses)
        history['train'].append(train_loss)
        history['val'].append(val_loss)

        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            # Save the best model weights
            print("Saving best model")
            torch.save(best_model_wts, save_path)

        print(f'Epoch {epoch}: train loss {train_loss} val loss {val_loss}')

        if epoch == 1 or epoch % 5 == 0:
            plot_input_reconstruction(model, val_dataset, epoch)

    # Load the best model weights before returning
    model.load_state_dict(best_model_wts)
    return model.eval(), history
# Checkpoint file for the best weights found during training.
save_path = 'best_model.pth'  # Replace with your actual path

# Train for 100 epochs; afterwards `model` holds the best weights and
# `history` the per-epoch mean training and validation losses.
model, history = train_model(
    model=model,
    train_dataset=train_dataset,
    val_dataset=val_dataset,
    n_epochs=100,
    save_path=save_path,
)

# Training and validation loss per epoch.
ax = plt.figure().gca()
ax.plot(history['train'], label='Train Loss', color='black')
ax.plot(history['val'], label='Val Loss', color='red')
ax.set_ylabel('Loss')
ax.set_xlabel('Epoch')
# Build the legend from the line labels; the previous explicit list
# ['train', 'test'] mislabelled the validation curve as "test".
ax.legend()
ax.set_title('Loss over training epochs')
plt.show()

ECG Anomaly Detection Model Evaluation and Visualization

# Rebuild the network and restore the best weights saved during training.
model = Autoencoder(seq_len, n_features, 128)
# map_location lets a checkpoint written on a GPU machine load on a CPU-only
# one (and vice versa).
model.load_state_dict(torch.load('best_model.pth', map_location=device))
model = model.to(device)
model.eval()
# Output:
# Autoencoder(
#   (encoder): Encoder(
#     (rnn1): LSTM(1, 256, batch_first=True)
#     (rnn2): LSTM(256, 128, batch_first=True)
#   )
#   (decoder): Decoder(
#     (rnn1): LSTM(128, 128, batch_first=True)
#     (rnn2): LSTM(128, 256, batch_first=True)
#     (output_layer): Linear(in_features=256, out_features=1, bias=True)
#   )
# )
def predict(model, dataset):
    """Reconstruct every sequence in ``dataset`` and measure the error.

    The model is switched to evaluation mode and no gradients are tracked.

    Args:
        model: Module mapping a sequence tensor to its reconstruction.
        dataset: Iterable of sequence tensors.

    Returns:
        ``(predictions, losses)``: per sequence, the reconstruction as a
        flattened NumPy array and the summed absolute error as a float.
    """
    # Use the device the model lives on instead of a module-level variable.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    criterion = nn.L1Loss(reduction='sum').to(target_device)
    predictions, losses = [], []

    model = model.eval()
    with torch.no_grad():
        for seq_true in dataset:
            seq_true = seq_true.to(target_device)
            seq_pred = model(seq_true)
            loss = criterion(seq_pred, seq_true)
            predictions.append(seq_pred.cpu().numpy().flatten())
            losses.append(loss.item())

    return predictions, losses
# Visualising train loss: distribution of the reconstruction error on the
# training set, used to pick the anomaly threshold.
_, losses = predict(model, train_dataset)
# histplot replaces the deprecated distplot; stat='density' keeps the
# density-normalised histogram that distplot drew together with the KDE.
sns.histplot(losses, bins=50, kde=True, stat='density', color='black', label='Train')

# Reconstruction-loss cut-off: a sequence whose loss is at or below this
# value is classified as normal, anything above it as an anomaly.
Threshold = 25

predictions, pred_losses = predict(model, test_normal_dataset)
# histplot replaces the deprecated distplot; stat='density' keeps the
# density-normalised histogram that distplot drew together with the KDE.
sns.histplot(pred_losses, bins=50, kde=True, stat='density', color='black')

# Compare against the named threshold rather than a repeated literal.
correct = sum(l <= Threshold for l in pred_losses)
print(f'Correct normal predictions: {correct}/{len(test_normal_dataset)}')
# Output: Correct normal predictions: 141/145
# Evaluate on as many anomalous sequences as there are normal test sequences.
anomaly_dataset = test_anomaly_dataset[:len(test_normal_dataset)]
predictions, pred_losses = predict(model, anomaly_dataset)
# histplot replaces the deprecated distplot; stat='density' keeps the
# density-normalised histogram that distplot drew together with the KDE.
sns.histplot(pred_losses, bins=50, kde=True, stat='density', color='red')

# An anomaly is detected when its loss exceeds the threshold defined above.
correct = sum(l > Threshold for l in pred_losses)
print(f'Correct anomaly predictions: {correct}/{len(anomaly_dataset)}')
# Output: Correct anomaly predictions: 145/145

def plot_prediction(data, model, title, ax):
    """Plot one sequence and its reconstruction on ``ax``.

    Args:
        data: Sequence tensor to reconstruct.
        model: Module passed on to ``predict``.
        title: Prefix of the subplot title; the reconstruction loss, rounded
            to two decimals, is appended.
        ax: Axes-like object providing ``plot``, ``set_title`` and ``legend``.
    """
    predictions, pred_losses = predict(model, [data])

    # Convert explicitly so plotting does not rely on implicit tensor-to-array
    # conversion (which fails for tensors that are not on the CPU).
    true_values = data.detach().cpu().numpy().flatten()

    ax.plot(true_values, label='true', color='black')
    ax.plot(predictions[0], label='reconstructed', color='red')
    ax.set_title(f'{title} (loss: {np.around(pred_losses[0], 2)})')
    ax.legend()
# 2 x 4 grid: top row shows normal test sequences, bottom row anomalies.
fig, axs = plt.subplots(
    nrows=2,
    ncols=4,
    sharey=True,
    sharex=True,
    figsize=(22, 8),
)

for i, data in enumerate(test_normal_dataset[:4]):
    plot_prediction(data, model, title='Normal', ax=axs[0, i])

for i, data in enumerate(test_anomaly_dataset[:4]):
    plot_prediction(data, model, title='Anomaly', ax=axs[1, i])

fig.tight_layout()

工学博士,担任《Mechanical System and Signal Processing》《中国电机工程学报》《控制与决策》等期刊审稿专家,擅长领域:现代信号处理,机器学习,深度学习,数字孪生,时间序列分析,设备缺陷检测、设备异常检测、设备智能故障诊断与健康管理PHM等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/29124.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

内网穿透的原理:实现远程访问的技术揭秘

内网穿透的原理:实现远程访问的技术揭秘 引言 内网穿透是一种允许外部网络访问内网服务的技术。这对于远程办公、访问家庭服务器或进行开发测试非常有用。本文将探讨内网穿透的工作原理及其实现方式。 基础知识 内网(LAN):局域…

ml307A模块连接阿里云(详细版)

1、需要的信息 MQTT连接参数、订阅或发布的主题、服务器地址、端口1883 服务器地址: alFMz7jnArW.iot-as-mqtt.cn-shanghai.aliyuncs.com 注:重要的信息阿里云信息大家不要透露,写完笔记会及时删除产品及设备,大家用自己的信息…

linux精通 4.1

2.1.3 http服务器实现 目的 reactor应用——webserver webclient 每次上课前 看大纲down code 复习: 不行啊 编译给的代码报错啊 给的最新的不是0430那一版就不行啊 reactor.c:(.text+0x254): relocation truncated to fit: R_X86_64_PC32 against symbol begin de…

python 光伏相关packages

除了PVLIB之外,还有一些其他光伏(太阳能光伏)相关的Python包和工具。这些包提供了各种功能,从光伏系统建模和仿真到数据处理和可视化。以下是一些常见的光伏相关Python包: SolarPy: SolarPy 是一个专注于太阳能数据分析…

待学习记录清单

1、大模型相关的知识和周边知识,相关的论文阅读; 论文0:https://arxiv.org/abs/1706.03762 论文1:https://arxiv.org/pdf/2306.07962 论文2:https://arxiv.org/abs/2405.20323 2代码链接:https://github.co…

《QT实用小工具·七十一》基于Qt+Qml开发的文件传输工具

1、概述 源码放在文章末尾 该项目基于Qt+QML实现了文件传输的功能,可以在局域网环境下使用(热点),扫描使用UDP,传输使用TCP,每一个文件传输使用独立的线程进行处理,高效便捷。 开发环境 使用Qt/Qml开发 QT版本…

(自用)关于程序的一些概念3:程序中的“选择“

前言 学习的基本过程有理解→总结→应用这几个步骤.总结的目的大概是概括出大体的一种思路,一些必然和必不然,整理出"概念",并以概念指导应用 引入 尝试做一些和编程有关的概念总结.为了满足那个很朴素的想法:总结出概念,编程的思路就水到渠成地来了.---就好像学了单…

Android --- 异步操作

同步和异步的差异 同步:在发生某件事后什么也不做,直到该事件完成后,再继续进行 异步:在某件事发生后,可以在等待他完成的时候去处理其他事件,等到该事件发生完成后,再回过头来处理它。 异步…

C#心跳机制服务器

控制台应用项目 Program.cs internal class Program {static Server server;static void Main(string[] args){Server server = new Server(IPAddress.Any,3333);server.Start();// 除了服务器监听方法,监听客户连接的方法,扫描客户端是否在线的方法//如…

Linux服务器上激活conda环境conda: error: argument COMMAND: invalid choice: ‘activate‘

正常我们使用如下流程: 创建环境:conda create -n 环境名称 激活环境:conda activate 环境名称 但是,在Linux服务器上,使用conda activate 环境名称,出现如上图所示的报错。conda: error: argument CO…

2.1 嵌入式八股文(三)

一、C++中类成员的访问权限? C++通过public、protected、private 三个关键字来控制成员变量和成员函数的访问权限,它们分别表示公有的、受保护的、私有的,被称为成员访问限定符。在类的内部(定义类的代码内部),…

Excel 常用技巧(六)

Microsoft Excel 是微软为 Windows、macOS、Android 和 iOS 开发的电子表格软件,可以用来制作电子表格、完成许多复杂的数据运算,进行数据的分析和预测,并且具有强大的制作图表的功能。由于 Excel 具有十分友好的人机界面和强大的计算功能…

CMSIS-RTOS2简介

本文介绍CMSIS-RTOS2。 1.引入 CMSIS-RTOS2在基于Arm Cortex处理器的设备上运行的实时操作系统内核上指定了通用RTOS接口。应用程序和中间件组件可以使用CMSIS-RTOS2 API在各种软件生态系统中实现更好的代码重用和更简单的集成。 CMSIS-RTOS2还指定了RTOS内核使用的标准OS T…

windows上安装redis,并且用pycharm联通调用测试

在 Windows 上启动 Redis,官网版本不支持windows直接安装,你可以按照以下步骤进行操作: 使用Github Redis 版本启动 Redis 如果你想使用 Redis 在 Windows 上启动 Redis,以下是基本的步骤: 下载 Redis: 访…

回溯算法2(c++)

棋盘问题 题目描述 在一个给定形状的棋盘(形状可能是不规则的)上面摆放棋子,棋子没有区别。 要求摆放时任意的两个棋子不能放在棋盘中的同一行或者同一列,请编程求解对于给定形状和大小的棋盘,摆放 k个棋子的所有可…

自定义starter并发布maven私服

一、搭建nexus私服 nexus就是maven的私有服务器,这个搭建教程可以在网络上找到很多,这里就不赘述了。搭建完成之后再进行下一步 二、本地maven的setting配置文件中配置nexus的用户名和密码 <servers><server><id>nexus-releases<…

索引在手,查询无忧:MySQL索引简介

在数据库的世界里,MySQL作为一款广泛使用的关系型数据库管理系统。在DB-Engines的2024年5月的数据库管理系统流行度排名中得分1084,仅次于老大哥Oracle,足以说明MySQL在全球数据库市场中占有重要地位,当然MySQL在2009年被Oracle公司收…

解决使用Jmeter进行测试时出现“302“,‘‘401“等用户未登录的问题

使用 JMeter 压力测试时解决登录问题的两种方法 在使用 JMeter 进行压力测试时,可能会遇到程序存在安全验证,必须登录后才能对里面的具体方法进行测试: 如果遇到登录问题,通常是因为 JMeter 无法模拟用户的登录状态,导…

表单中的常用元素

10.图像形式上传文件 <input type="image"> 定义图像形式的提交。 src 属性和 alt 属性必须与 <input type="image"> 结合使用。 <input type="image" src="img/l.jpg" alt="submit"/> 11.下拉列表框 <select> 标签定义下拉列表框…

JS中splice怎么使用

在JavaScript中,splice() 是一个数组方法,用于添加/删除项目,并返回被删除的项目。这个方法会改变原始数组。 splice() 方法的基本语法如下: array.splice(start[, deleteCount[, item1[, item2[, ...]]]]) start(必…