文章目录
- 一、前言
- 二、前期工作
- 1. 介绍
- 2. 加载和预处理数据
- 二、构建训练和验证集
- 三、数据增强
- 四、数据可视化
- 五、构建3D卷积神经网络模型
- 六、训练模型
- 七、可视化模型性能
- 八、对单次 CT 扫描进行预测
一、前言
我的环境:
- 语言环境:Python3.6.5
- 编译器:jupyter notebook
- 深度学习环境:TensorFlow2.4.1
往期精彩内容:
- 卷积神经网络(CNN)实现mnist手写数字识别
- 卷积神经网络(CNN)多种图片分类的实现
- 卷积神经网络(CNN)衣服图像分类的实现
- 卷积神经网络(CNN)鲜花识别
- 卷积神经网络(CNN)天气识别
- 卷积神经网络(VGG-16)识别海贼王草帽一伙
- 卷积神经网络(ResNet-50)鸟类识别
- 卷积神经网络(AlexNet)鸟类识别
- 卷积神经网络(CNN)识别验证码
来自专栏:机器学习与深度学习算法推荐
二、前期工作
1. 介绍
本案例将展示通过构建 3D 卷积神经网络 (CNN) 来预测计算机断层扫描 (CT) 中病毒性肺炎是否存在。 2D 的 CNN 通常用于处理 RGB 图像(3 个通道)。 3D 的 CNN 仅仅是 3D 等价物,我们可以将 3D 图像简单理解成 2D 图像的叠加。3D 的 CNN 可以理解成是学习立体数据的强大模型。
import os,zipfile
import numpy as np
from tensorflow import keras
from tensorflow.keras import layersimport tensorflow as tf
gpus = tf.config.list_physical_devices("GPU")if gpus:tf.config.experimental.set_memory_growth(gpus[0], True) #设置GPU显存用量按需使用tf.config.set_visible_devices([gpus[0]],"GPU")# 打印显卡信息,确认GPU可用
print(gpus)
2. 加载和预处理数据
数据文件是 Nifti,扩展名为 .nii。我使用nibabel
包来读取文件,你可以通过 pip install nibabel
来安装 nibabel
包。
数据预处理步骤:
- 首先将体积旋转 90 度,确保方向是固定的
- 将 HU 值缩放到 0 和 1 之间。
- 调整宽度、高度和深度。
我定义了几个辅助函数来完成处理数据,这些功能将在构建训练和验证数据集时使用。
import nibabel as nib
from scipy import ndimagedef read_nifti_file(filepath):# 读取文件scan = nib.load(filepath)# 获取数据scan = scan.get_fdata()return scandef normalize(volume):"""归一化"""min = -1000max = 400volume[volume < min] = minvolume[volume > max] = maxvolume = (volume - min) / (max - min)volume = volume.astype("float32")return volumedef resize_volume(img):"""修改图像大小"""# Set the desired depthdesired_depth = 64desired_width = 128desired_height = 128# Get current depthcurrent_depth = img.shape[-1]current_width = img.shape[0]current_height = img.shape[1]# Compute depth factordepth = current_depth / desired_depthwidth = current_width / desired_widthheight = current_height / desired_heightdepth_factor = 1 / depthwidth_factor = 1 / widthheight_factor = 1 / height# 旋转img = ndimage.rotate(img, 90, reshape=False)# 数据调整img = ndimage.zoom(img, (width_factor, height_factor, depth_factor), order=1)return imgdef process_scan(path):# 读取文件volume = read_nifti_file(path)# 归一化volume = normalize(volume)# 调整尺寸 width, height and depthvolume = resize_volume(volume)return volume
读取CT扫描文件的路径
# “CT-0”文件夹中是正常肺组织的CT扫描
normal_scan_paths = [os.path.join(os.getcwd(), "MosMedData/CT-0", x)for x in os.listdir("MosMedData/CT-0")
]# “CT-23”文件夹中是患有肺炎的人的CT扫描
abnormal_scan_paths = [os.path.join(os.getcwd(), "MosMedData/CT-23", x)for x in os.listdir("MosMedData/CT-23")
]print("CT scans with normal lung tissue: " + str(len(normal_scan_paths)))
print("CT scans with abnormal lung tissue: " + str(len(abnormal_scan_paths)))
CT scans with normal lung tissue: 100
CT scans with abnormal lung tissue: 100
# 读取数据并进行预处理
abnormal_scans = np.array([process_scan(path) for path in abnormal_scan_paths])
normal_scans = np.array([process_scan(path) for path in normal_scan_paths])# 标签数字化
abnormal_labels = np.array([1 for _ in range(len(abnormal_scans))])
normal_labels = np.array([0 for _ in range(len(normal_scans))])
二、构建训练和验证集
从类目录中读取扫描并分配标签。对扫描进行下采样以具有 128x128x64 的形状。将原始 HU 值重新调整到 0 到 1 的范围内。最后,将数据集拆分为训练和验证子集。
# 按照7:3的比例划分训练集、验证集
x_train = np.concatenate((abnormal_scans[:70], normal_scans[:70]), axis=0)
y_train = np.concatenate((abnormal_labels[:70], normal_labels[:70]), axis=0)
x_val = np.concatenate((abnormal_scans[70:], normal_scans[70:]), axis=0)
y_val = np.concatenate((abnormal_labels[70:], normal_labels[70:]), axis=0)
print("Number of samples in train and validation are %d and %d."% (x_train.shape[0], x_val.shape[0])
)
Number of samples in train and validation are 140 and 60.
三、数据增强
CT扫描也通过在训练期间在随机角度旋转来增强数据。由于数据存储在Rank-3的形状(样本,高度,宽度,深度)中,因此我们在轴4处添加大小1的尺寸,以便能够对数据执行3D卷积。因此,新形状(样品,高度,宽度,深度,1)。在那里有不同类型的预处理和增强技术,这个例子显示了一些简单的开始。
import random
from scipy import ndimage@tf.function
def rotate(volume):"""不同程度上进行旋转"""def scipy_rotate(volume):# 定义一些旋转角度angles = [-20, -10, -5, 5, 10, 20]# 随机选择一个角度angle = random.choice(angles)volume = ndimage.rotate(volume, angle, reshape=False)volume[volume < 0] = 0volume[volume > 1] = 1return volumeaugmented_volume = tf.numpy_function(scipy_rotate, [volume], tf.float32)return augmented_volumedef train_preprocessing(volume, label):volume = rotate(volume)volume = tf.expand_dims(volume, axis=3)return volume, labeldef validation_preprocessing(volume, label):volume = tf.expand_dims(volume, axis=3)return volume, label
在定义训练和验证数据加载器的同时,训练数据将进行不同角度的随机旋转。训练和验证数据都已重新调整为具有 0 到 1 之间的值。
# 定义数据加载器
train_loader = tf.data.Dataset.from_tensor_slices((x_train, y_train))
validation_loader = tf.data.Dataset.from_tensor_slices((x_val, y_val))batch_size = 2train_dataset = (train_loader.shuffle(len(x_train)).map(train_preprocessing).batch(batch_size).prefetch(2)
)validation_dataset = (validation_loader.shuffle(len(x_val)).map(validation_preprocessing).batch(batch_size).prefetch(2)
)
四、数据可视化
import matplotlib.pyplot as pltdata = train_dataset.take(1)
images, labels = list(data)[0]
images = images.numpy()
image = images[0]
print("Dimension of the CT scan is:", image.shape)
plt.imshow(np.squeeze(image[:, :, 30]), cmap="gray")
Dimension of the CT scan is: (128, 128, 64, 1)
def plot_slices(num_rows, num_columns, width, height, data):"""Plot a montage of 20 CT slices"""data = np.rot90(np.array(data))data = np.transpose(data)data = np.reshape(data, (num_rows, num_columns, width, height))rows_data, columns_data = data.shape[0], data.shape[1]heights = [slc[0].shape[0] for slc in data]widths = [slc.shape[1] for slc in data[0]]fig_width = 12.0fig_height = fig_width * sum(heights) / sum(widths)f, axarr = plt.subplots(rows_data,columns_data,figsize=(fig_width, fig_height),gridspec_kw={"height_ratios": heights},)for i in range(rows_data):for j in range(columns_data):axarr[i, j].imshow(data[i][j], cmap="gray")axarr[i, j].axis("off")plt.subplots_adjust(wspace=0, hspace=0, left=0, right=1, bottom=0, top=1)plt.show()# Visualize montage of slices.
# 4 rows and 10 columns for 100 slices of the CT scan.
plot_slices(4, 10, 128, 128, image[:, :, :40])
五、构建3D卷积神经网络模型
为了使模型更容易理解,我将其构建成块。
def get_model(width=128, height=128, depth=64):"""构建 3D 卷积神经网络模型"""inputs = keras.Input((width, height, depth, 1))x = layers.Conv3D(filters=64, kernel_size=3, activation="relu")(inputs)x = layers.MaxPool3D(pool_size=2)(x)x = layers.BatchNormalization()(x)x = layers.Conv3D(filters=64, kernel_size=3, activation="relu")(x)x = layers.MaxPool3D(pool_size=2)(x)x = layers.BatchNormalization()(x)x = layers.Conv3D(filters=128, kernel_size=3, activation="relu")(x)x = layers.MaxPool3D(pool_size=2)(x)x = layers.BatchNormalization()(x)x = layers.Conv3D(filters=256, kernel_size=3, activation="relu")(x)x = layers.MaxPool3D(pool_size=2)(x)x = layers.BatchNormalization()(x)x = layers.GlobalAveragePooling3D()(x)x = layers.Dense(units=512, activation="relu")(x)x = layers.Dropout(0.3)(x)outputs = layers.Dense(units=1, activation="sigmoid")(x)# 定义模型model = keras.Model(inputs, outputs, name="3dcnn")return model# 构建模型
model = get_model(width=128, height=128, depth=64)
model.summary()
六、训练模型
# 设置动态学习率
initial_learning_rate = 1e-4
lr_schedule = keras.optimizers.schedules.ExponentialDecay(initial_learning_rate, decay_steps=30, decay_rate=0.96, staircase=True
)
# 编译
model.compile(loss="binary_crossentropy",optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),metrics=["acc"],
)
# 保存模型
checkpoint_cb = keras.callbacks.ModelCheckpoint("3d_image_classification.h5", save_best_only=True
)
# 定义早停策略
early_stopping_cb = keras.callbacks.EarlyStopping(monitor="val_acc", patience=15)epochs = 100
model.fit(train_dataset,validation_data=validation_dataset,epochs=epochs,shuffle=True,verbose=2,callbacks=[checkpoint_cb, early_stopping_cb],
)
七、可视化模型性能
fig, ax = plt.subplots(1, 2, figsize=(20, 3))
ax = ax.ravel()for i, metric in enumerate(["acc", "loss"]):ax[i].plot(model.history.history[metric])ax[i].plot(model.history.history["val_" + metric])ax[i].set_title("Model {}".format(metric))ax[i].set_xlabel("epochs")ax[i].set_ylabel(metric)ax[i].legend(["train", "val"])
八、对单次 CT 扫描进行预测
# 加载模型
model.load_weights("3d_image_classification.h5")
prediction = model.predict(np.expand_dims(x_val[0], axis=0))[0]
scores = [1 - prediction[0], prediction[0]]class_names = ["normal", "abnormal"]
for score, name in zip(scores, class_names):print("This model is %.2f percent confident that CT scan is %s"% ((100 * score), name))
This model is 27.88 percent confident that CT scan is normal
This model is 72.12 percent confident that CT scan is abnormal