ECG Signal Anomaly Detection with an Autoencoder (Python)

The dataset comes from the PTB Diagnostic ECG Database (the ptbdb_* files of the Kaggle "ECG Heartbeat Categorization Dataset") and contains 14,552 ECG beat records in two classes, normal and abnormal heartbeats, sampled at 125 Hz.

import numpy as np
np.set_printoptions(suppress=True)
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.io import arff
from sklearn.model_selection import train_test_split
import matplotlib
matplotlib.rcParams["figure.figsize"] = (6, 4)
plt.style.use("ggplot")
import tensorflow as tf
from tensorflow import data
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.metrics import mae
from tensorflow.keras import layers
from tensorflow import keras
from sklearn.metrics import accuracy_score, recall_score, precision_score, confusion_matrix, f1_score, classification_report
import os
isGPU = tf.config.list_physical_devices('GPU')
directory_path = r'ECG Heartbeat Categorization Dataset'
for dirname, _, filenames in os.walk(directory_path):
    for filename in filenames:
        print(os.path.join(dirname, filename))
normal_df = pd.read_csv("ECG Heartbeat Categorization Dataset/ptbdb_normal.csv").iloc[:, :-1]
anomaly_df = pd.read_csv("ECG Heartbeat Categorization Dataset/ptbdb_abnormal.csv").iloc[:, :-1]
print("Shape of Normal data", normal_df.shape)
print("Shape of Abnormal data", anomaly_df.shape)
Shape of Normal data (4045, 187)
Shape of Abnormal data (10505, 187)
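
A note on the .iloc[:, :-1] above: in this distribution of the files, the last (188th) column holds the class label, which is redundant here because each CSV contains a single class. A quick sanity check, assuming your files follow the standard Kaggle layout:

# Hypothetical check: the last column should be a constant class label per file.
normal_labels = pd.read_csv("ECG Heartbeat Categorization Dataset/ptbdb_normal.csv").iloc[:, -1]
anomaly_labels = pd.read_csv("ECG Heartbeat Categorization Dataset/ptbdb_abnormal.csv").iloc[:, -1]
print(normal_labels.unique(), anomaly_labels.unique())   # expect [0.] and [1.]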
def plot_sample(normal, anomaly):
    # Pick two random cases and show the normal and anomalous beats side by side.
    index = np.random.randint(0, len(normal), 2)
    fig, ax = plt.subplots(1, 2, sharey=True, figsize=(10, 4))
    ax[0].plot(normal.iloc[index[0], :].values, label=f"Case {index[0]}")
    ax[0].plot(normal.iloc[index[1], :].values, label=f"Case {index[1]}")
    ax[0].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)
    ax[0].set_title("Normal")
    ax[1].plot(anomaly.iloc[index[0], :].values, label=f"Case {index[0]}")
    ax[1].plot(anomaly.iloc[index[1], :].values, label=f"Case {index[1]}")
    ax[1].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)
    ax[1].set_title("Anomaly")
    plt.tight_layout()
    plt.show()
plot_sample(normal_df, anomaly_df)

CLASS_NAMES = ["Normal", "Anomaly"]normal_df_copy = normal_df.copy()
anomaly_df_copy = anomaly_df.copy()
print(anomaly_df_copy.columns.equals(normal_df_copy.columns))
normal_df_copy = normal_df_copy.set_axis(range(1, 188), axis=1)
anomaly_df_copy = anomaly_df_copy.set_axis(range(1, 188), axis=1)
normal_df_copy = normal_df_copy.assign(target = CLASS_NAMES[0])
anomaly_df_copy = anomaly_df_copy.assign(target = CLASS_NAMES[1])
df = pd.concat((normal_df_copy, anomaly_df_copy))
def plot_smoothed_mean(data, class_name="normal", step_size=5, ax=None):
    df = pd.DataFrame(data)
    roll_df = df.rolling(step_size)
    smoothed_mean = roll_df.mean().dropna().reset_index(drop=True)
    smoothed_std = roll_df.std().dropna().reset_index(drop=True)
    margin = 3 * smoothed_std
    lower_bound = (smoothed_mean - margin).values.flatten()
    upper_bound = (smoothed_mean + margin).values.flatten()
    ax.plot(smoothed_mean.index, smoothed_mean)
    ax.fill_between(smoothed_mean.index, lower_bound, y2=upper_bound, alpha=0.3, color="red")
    ax.set_title(class_name, fontsize=9)
fig, axes = plt.subplots(1, 2, figsize=(8, 4), sharey=True)
axes = axes.flatten()
for i, label in enumerate(CLASS_NAMES, start=1):
    data_group = df.groupby("target")
    data = data_group.get_group(label).mean(axis=0, numeric_only=True).to_numpy()
    plot_smoothed_mean(data, class_name=label, step_size=20, ax=axes[i-1])
fig.suptitle("Plot of smoothed mean for each class", y=0.95, weight="bold")
plt.tight_layout()

normal_df.drop("target", axis=1, errors="ignore", inplace=True)
normal = normal_df.to_numpy()
anomaly_df.drop("target", axis=1, errors="ignore", inplace=True)
anomaly = anomaly_df.to_numpy()

Only the normal beats are used to fit the autoencoder: the model learns to reconstruct normal morphology, so anomalous beats should come back with a noticeably larger reconstruction error.

X_train, X_test = train_test_split(normal, test_size=0.15, random_state=45, shuffle=True)
print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}, anomaly shape: {anomaly.shape}")
Train shape: (3438, 187), Test shape: (607, 187), anomaly shape: (10505, 187)
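
There is no scaling step before training; as far as I can tell, the Kaggle ptbdb files are already min-max normalized per beat, so the raw values can feed the network directly. A quick check in case your copy of the data differs (if the values are not in [0, 1], add a scaler before the split):

# Expect values already in [0, 1]; otherwise normalize before training.
print(f"normal range: [{normal.min():.3f}, {normal.max():.3f}], "
      f"anomaly range: [{anomaly.min():.3f}, {anomaly.max():.3f}]")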
class AutoEncoder(Model):
    def __init__(self, input_dim, latent_dim):
        super(AutoEncoder, self).__init__()
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.encoder = tf.keras.Sequential([
            layers.Input(shape=(input_dim,)),
            layers.Reshape((input_dim, 1)),  # Reshape to 3D for Conv1D
            layers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
            layers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
            layers.Conv1D(latent_dim, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
        ])
        # Previously, I was using UpSampling. I am trying Transposed Convolution this time around.
        self.decoder = tf.keras.Sequential([
            layers.Conv1DTranspose(latent_dim, 3, strides=1, activation='relu', padding="same"),
#             layers.UpSampling1D(2),
            layers.BatchNormalization(),
            layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
#             layers.UpSampling1D(2),
            layers.BatchNormalization(),
            layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
#             layers.UpSampling1D(2),
            layers.BatchNormalization(),
            layers.Flatten(),
            layers.Dense(input_dim)  # project the flattened features back to the original length
        ])

    def call(self, X):
        encoded = self.encoder(X)
        decoded = self.decoder(encoded)
        return decoded

input_dim = X_train.shape[-1]
latent_dim = 32

model = AutoEncoder(input_dim, latent_dim)
model.build((None, input_dim))
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), loss="mae")
model.summary()
Model: "auto_encoder"
_________________________________________________________________Layer (type)                Output Shape              Param #   
=================================================================sequential (Sequential)     (None, 24, 32)            63264     sequential_1 (Sequential)   (None, 187)               640603    =================================================================
Total params: 703867 (2.69 MB)
Trainable params: 702715 (2.68 MB)
Non-trainable params: 1152 (4.50 KB)
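
The encoder output shape (None, 24, 32) follows from the three stride-2 poolings with padding="same": 187 → 94 → 47 → 24 time steps, with latent_dim = 32 channels. Note that the decoder never re-expands the time axis (the transposed convolutions use strides=1 and the UpSampling layers are commented out); it is the final Flatten + Dense pair that maps the features back to 187 samples. A quick shape check:

# The latent representation is a (24, 32) feature map, not a flat vector.
latent = model.encoder(X_train[:1])
print(latent.shape)                   # (1, 24, 32)
print(model.decoder(latent).shape)    # (1, 187)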
epochs = 100
batch_size = 128
early_stopping = EarlyStopping(patience=10, min_delta=1e-3, monitor="val_loss", restore_best_weights=True)

history = model.fit(X_train, X_train, epochs=epochs, batch_size=batch_size,
                    validation_split=0.1, callbacks=[early_stopping])

plt.plot(history.history['loss'], label="Training loss")
plt.plot(history.history['val_loss'], label="Validation loss", ls="--")
plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
plt.title("Training loss")
plt.ylabel("Loss")
plt.xlabel("Epoch")
plt.show()

train_mae = model.evaluate(X_train, X_train, verbose=0)
test_mae = model.evaluate(X_test, X_test, verbose=0)
anomaly_mae = model.evaluate(anomaly_df, anomaly_df, verbose=0)

print("Training dataset error: ", train_mae)
print("Testing dataset error: ", test_mae)
print("Anormaly dataset error: ", anomaly_mae)

Training dataset error:  0.014224529266357422
Testing dataset error:  0.01488062646239996
Anomaly dataset error:  0.043484628200531006
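
On the held-out normal beats the reconstruction error stays close to the training error, while the anomalous beats reconstruct roughly three times worse; that gap is what the threshold below will exploit. A quick sanity check:

# Anomalous beats should reconstruct noticeably worse than held-out normal ones.
print(f"anomaly / test error ratio: {anomaly_mae / test_mae:.1f}x")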
def predict(model, X):
    pred = model.predict(X, verbose=False)
    loss = mae(pred, X)  # per-sample MAE between input and reconstruction
    return pred, loss

_, train_loss = predict(model, X_train)
_, test_loss = predict(model, X_test)
_, anomaly_loss = predict(model, anomaly)
threshold = np.mean(train_loss) + np.std(train_loss)  # threshold for separating normal from anomalous reconstructions

bins = 40
plt.figure(figsize=(9, 5), dpi=100)
sns.histplot(np.clip(train_loss, 0, 0.5), bins=bins, kde=True, label="Train Normal")
sns.histplot(np.clip(test_loss, 0, 0.5), bins=bins, kde=True, label="Test Normal")
sns.histplot(np.clip(anomaly_loss, 0, 0.5), bins=bins, kde=True, label="anomaly")

ax = plt.gca()  # Get the current Axes
ylim = ax.get_ylim()
plt.vlines(threshold, 0, ylim[-1], color="k", ls="--")
plt.annotate(f"Threshold: {threshold:.3f}", xy=(threshold, ylim[-1]), xytext=(threshold+0.009, ylim[-1]),arrowprops=dict(facecolor='black', shrink=0.05), fontsize=9)
plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
plt.show()
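
Mean plus one standard deviation is only one heuristic for placing the cutoff; if the training losses are skewed, a high quantile of the training-loss distribution is a common alternative. A minimal sketch (the 95th percentile here is an arbitrary choice, not tuned):

# Hypothetical alternative: a quantile-based threshold on the training losses.
alt_threshold = np.percentile(train_loss, 95)
print(f"mean + std: {threshold:.4f}, 95th percentile: {alt_threshold:.4f}")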

def plot_examples(model, data, ax, title):
    pred, loss = predict(model, data)
    ax.plot(data.flatten(), label="Actual")
    ax.plot(pred[0], label="Predicted")
    ax.fill_between(range(1, 188), data.flatten(), pred[0], alpha=0.3, color="r")
    ax.legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=7)
#               bbox_to_anchor=(0, 0, 0.8, 0.25))
    ax.set_title(f"{title} (loss: {loss[0]:.3f})", fontsize=9.5)
fig, axes = plt.subplots(2, 5, sharey=True, sharex=True, figsize=(12, 6))
random_indexes = np.random.randint(0, len(X_train), size=5)

for i, idx in enumerate(random_indexes):
    data = X_train[[idx]]
    plot_examples(model, data, ax=axes[0, i], title="Normal")

for i, idx in enumerate(random_indexes):
    data = anomaly[[idx]]
    plot_examples(model, data, ax=axes[1, i], title="anomaly")

plt.tight_layout()
fig.suptitle("Sample plots (Actual vs Reconstructed by the CNN autoencoder)", y=1.04, weight="bold")
fig.savefig("autoencoder.png")
plt.show()

Model Evaluation

def evaluate_model(model, data):
    pred, loss = predict(model, data)
    if id(data) == id(anomaly):  # anomalous data: a correct call is loss above the threshold
        accuracy = np.sum(loss > threshold) / len(data)
    else:                        # normal data: a correct call is loss at or below the threshold
        accuracy = np.sum(loss <= threshold) / len(data)
    return f"Accuracy: {accuracy:.2%}"
print("Training", evaluate_model(model, X_train))
print("Testing", evaluate_model(model, X_test))
print("Anomaly", evaluate_model(model, anomaly))
Training Accuracy: 88.66%
Testing Accuracy: 84.51%
Anomaly Accuracy: 77.34%
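
Because these accuracies all depend on the chosen threshold, a threshold-free summary such as ROC-AUC over the raw reconstruction losses is a useful complement. A sketch reusing the losses computed earlier (treating the anomaly class as positive):

from sklearn.metrics import roc_auc_score

# Higher reconstruction loss should indicate "more anomalous" (positive class = 1).
scores = np.concatenate((train_loss.numpy(), test_loss.numpy(), anomaly_loss.numpy()))
is_anomaly = np.concatenate((np.zeros(len(train_loss) + len(test_loss), dtype=int),
                             np.ones(len(anomaly_loss), dtype=int)))
print(f"ROC-AUC: {roc_auc_score(is_anomaly, scores):.4f}")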
def prepare_labels(model, train, test, anomaly, threshold=threshold):
    # Label convention: 1 = Normal, 0 = Anomaly.
    ytrue = np.concatenate((np.ones(len(train) + len(test), dtype=int), np.zeros(len(anomaly), dtype=int)))
    _, train_loss = predict(model, train)
    _, test_loss = predict(model, test)
    _, anomaly_loss = predict(model, anomaly)
    train_pred = (train_loss <= threshold).numpy().astype(int)
    test_pred = (test_loss <= threshold).numpy().astype(int)
    anomaly_pred = (anomaly_loss <= threshold).numpy().astype(int)
    ypred = np.concatenate((train_pred, test_pred, anomaly_pred))
    return ytrue, ypred
def plot_confusion_matrix(model, train, test, anomaly, threshold=threshold):
    ytrue, ypred = prepare_labels(model, train, test, anomaly, threshold=threshold)
    accuracy = accuracy_score(ytrue, ypred)
    precision = precision_score(ytrue, ypred)
    recall = recall_score(ytrue, ypred)
    f1 = f1_score(ytrue, ypred)
    print(f"""\
Accuracy: {accuracy:.2%}
Precision: {precision:.2%}
Recall: {recall:.2%}
f1: {f1:.2%}\n""")
    cm = confusion_matrix(ytrue, ypred)
    cm_norm = confusion_matrix(ytrue, ypred, normalize="true")
    data = np.array([f"{count}\n({pct:.2%})" for count, pct in zip(cm.ravel(), cm_norm.ravel())]).reshape(cm.shape)
    labels = ["Anomaly", "Normal"]  # class 0 = Anomaly, class 1 = Normal
    plt.figure(figsize=(5, 4))
    sns.heatmap(cm, annot=data, fmt="", xticklabels=labels, yticklabels=labels)
    plt.ylabel("Actual")
    plt.xlabel("Predicted")
    plt.title("Confusion Matrix", weight="bold")
    plt.tight_layout()
plot_confusion_matrix(model, X_train, X_test, anomaly, threshold=threshold)
Accuracy: 80.32%
Precision: 59.94%
Recall: 88.03%
f1: 71.32%

ytrue, ypred = prepare_labels(model, X_train, X_test, anomaly, threshold=threshold)
print(classification_report(ytrue, ypred, target_names=CLASS_NAMES[::-1]))  # class 0 = Anomaly, class 1 = Normal

              precision    recall  f1-score   support

     Anomaly       0.94      0.77      0.85     10505
      Normal       0.60      0.88      0.71      4045

    accuracy                           0.80     14550
   macro avg       0.77      0.83      0.78     14550
weighted avg       0.85      0.80      0.81     14550
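
The low precision on the Normal row suggests the fixed mean-plus-std cutoff lets many anomalies through. One option is to sweep candidate thresholds and pick the one that maximizes F1; a sketch (in practice the sweep should use a validation split rather than the full data, to avoid tuning on the test set):

# Hypothetical threshold sweep over the combined reconstruction losses.
losses = np.concatenate((train_loss.numpy(), test_loss.numpy(), anomaly_loss.numpy()))
is_normal = np.concatenate((np.ones(len(train_loss) + len(test_loss), dtype=int),
                            np.zeros(len(anomaly_loss), dtype=int)))
best_t = max(np.linspace(losses.min(), losses.max(), 200),
             key=lambda t: f1_score(is_normal, (losses <= t).astype(int)))
print(f"F1-optimal threshold: {best_t:.4f} (vs. mean + std: {threshold:.4f})")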

