目录
1 数据处理
1.1 数据集介绍
1.2 数据读取
1.3 构造Dataset类
2 模型构建
3 模型训练
4 模型评价
5 模型预测
6 什么是预训练模型和迁移学习
7 比较“使用预训练模型”和“不使用预训练模型”的效果。
总结
在本实践中,我们实践一个更通用的图像分类任务。
图像分类(Image Classification)是计算机视觉中的一个基础任务,将图像的语义将不同图像划分到不同类别。很多任务也可以转换为图像分类任务。比如人脸检测就是判断一个区域内是否有人脸,可以看作一个二分类的图像分类任务。
- 数据集:CIFAR-10数据集,
- 网络:ResNet18模型,
- 损失函数:交叉熵损失,
- 优化器:Adam优化器,Adam优化器的介绍参考NNDL第7.2.4.3节。
- 评价指标:准确率
引入头文件:
import os
import pickle
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
from PIL import Image
import torch.nn.functional as F
import torch.optim as opt
from nndl import RunnerV3, Accuracy
from nndl import plot
nndl.py
import torch
import matplotlib.pyplot as plt
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")class RunnerV3(object):def __init__(self, model, optimizer, loss_fn, metric, **kwargs):self.model = modelself.optimizer = optimizerself.loss_fn = loss_fnself.metric = metric # 只用于计算评价指标# 记录训练过程中的评价指标变化情况self.dev_scores = []# 记录训练过程中的损失函数变化情况self.train_epoch_losses = [] # 一个epoch记录一次lossself.train_step_losses = [] # 一个step记录一次lossself.dev_losses = []# 记录全局最优指标self.best_score = 0def train(self, train_loader, dev_loader=None, **kwargs):# 将模型切换为训练模式self.model.train()# 传入训练轮数,如果没有传入值则默认为0num_epochs = kwargs.get("num_epochs", 0)# 传入log打印频率,如果没有传入值则默认为100log_steps = kwargs.get("log_steps", 100)# 评价频率eval_steps = kwargs.get("eval_steps", 0)# 传入模型保存路径,如果没有传入值则默认为"best_model.pdparams"save_path = kwargs.get("save_path", "best_model.pdparams")custom_print_log = kwargs.get("custom_print_log", None)# 训练总的步数num_training_steps = num_epochs * len(train_loader)if eval_steps:if self.metric is None:raise RuntimeError('Error: Metric can not be None!')if dev_loader is None:raise RuntimeError('Error: dev_loader can not be None!')# 运行的step数目global_step = 0# 进行num_epochs轮训练for epoch in range(num_epochs):# 用于统计训练集的损失total_loss = 0for step, data in enumerate(train_loader):X, y = data# 获取模型预测logits = self.model(X.to(device))loss = self.loss_fn(logits, y.long().to(device)) # 默认求meantotal_loss += loss# 训练过程中,每个step的loss进行保存self.train_step_losses.append((global_step, loss.item()))if log_steps and global_step % log_steps == 0:print(f"[Train] epoch: {epoch}/{num_epochs}, step: {global_step}/{num_training_steps}, loss: {loss.item():.5f}")# 梯度反向传播,计算每个参数的梯度值loss.backward()if custom_print_log:custom_print_log(self)# 小批量梯度下降进行参数更新self.optimizer.step()# 梯度归零self.optimizer.zero_grad()# 判断是否需要评价if eval_steps > 0 and global_step > 0 and \(global_step % eval_steps == 0 or global_step == (num_training_steps - 1)):dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)print(f"[Evaluate] dev score: {dev_score:.5f}, dev loss: {dev_loss:.5f}")# 将模型切换为训练模式self.model.train()# 如果当前指标为最优指标,保存该模型if dev_score > self.best_score:self.save_model(save_path)print(f"[Evaluate] best accuracy performence has been updated: {self.best_score:.5f} --> {dev_score:.5f}")self.best_score = dev_scoreglobal_step += 1# 当前epoch 训练loss累计值trn_loss = (total_loss / len(train_loader)).item()# epoch粒度的训练loss保存self.train_epoch_losses.append(trn_loss)print("[Train] Training done!")# 模型评估阶段,使用'torch.no_grad()'控制不计算和存储梯度@torch.no_grad()def evaluate(self, dev_loader, **kwargs):assert self.metric is not None# 将模型设置为评估模式self.model.eval()global_step = kwargs.get("global_step", -1)# 用于统计训练集的损失total_loss = 0# 重置评价self.metric.reset()# 遍历验证集每个批次for batch_id, data in enumerate(dev_loader):X, y = data# 计算模型输出logits = self.model(X.to(device))# 计算损失函数loss = self.loss_fn(logits, y.long().to(device)).item()# 累积损失total_loss += loss# 累积评价self.metric.update(logits, y.to(device))dev_loss = (total_loss / len(dev_loader))dev_score = self.metric.accumulate()# 记录验证集lossif global_step != -1:self.dev_losses.append((global_step, dev_loss))self.dev_scores.append(dev_score)return dev_score, dev_loss# 模型评估阶段,使用'torch.no_grad()'控制不计算和存储梯度@torch.no_grad()def predict(self, x, **kwargs):# 将模型设置为评估模式self.model.eval()# 运行模型前向计算,得到预测值logits = self.model(x.to(device))return logitsdef save_model(self, save_path):torch.save(self.model.state_dict(), save_path)def load_model(self, model_path):state_dict = torch.load(model_path)self.model.load_state_dict(state_dict)class Accuracy():def __init__(self, is_logist=True):# 用于统计正确的样本个数self.num_correct = 0# 用于统计样本的总数self.num_count = 0self.is_logist = is_logistdef update(self, outputs, labels):# 判断是二分类任务还是多分类任务,shape[1]=1时为二分类任务,shape[1]>1时为多分类任务if outputs.shape[1] == 1: # 二分类outputs = torch.squeeze(outputs, dim=-1)if self.is_logist:# logist判断是否大于0preds = torch.tensor((outputs >= 0), dtype=torch.float32)else:# 如果不是logist,判断每个概率值是否大于0.5,当大于0.5时,类别为1,否则类别为0preds = torch.tensor((outputs >= 0.5), dtype=torch.float32)else:# 多分类时,使用'torch.argmax'计算最大元素索引作为类别preds = torch.argmax(outputs, dim=1)# 获取本批数据中预测正确的样本个数labels = torch.squeeze(labels, dim=-1)batch_correct = torch.sum((preds == labels).float()).clone().detach()batch_count = len(labels)# 更新num_correct 和 num_countself.num_correct += batch_correctself.num_count += batch_countdef accumulate(self):# 使用累计的数据,计算总的指标if self.num_count == 0:return 0return self.num_correct / self.num_countdef reset(self):# 重置正确的数目和总数self.num_correct = 0self.num_count = 0def name(self):return "Accuracy"def plot(runner, fig_name):plt.figure(figsize=(10, 5))plt.subplot(1, 2, 1)train_items = runner.train_step_losses[::30]train_steps = [x[0] for x in train_items]train_losses = [x[1] for x in train_items]plt.plot(train_steps, train_losses, color='#8E004D', label="Train loss")if runner.dev_losses[0][0] != -1:dev_steps = [x[0] for x in runner.dev_losses]dev_losses = [x[1] for x in runner.dev_losses]plt.plot(dev_steps, dev_losses, color='#E20079', linestyle='--', label="Dev loss")# 绘制坐标轴和图例plt.ylabel("loss", fontsize='x-large')plt.xlabel("step", fontsize='x-large')plt.legend(loc='upper right', fontsize='x-large')plt.subplot(1, 2, 2)# 绘制评价准确率变化曲线dev_scores_cpu = [t.cpu() for t in runner.dev_scores]if runner.dev_losses[0][0] != -1:plt.plot(dev_steps, dev_scores_cpu,color='#E20079', linestyle="--", label="Dev accuracy")else:plt.plot(list(range(len(runner.dev_scores))), dev_scores_cpu,color='#E20079', linestyle="--", label="Dev accuracy")# 绘制坐标轴和图例plt.ylabel("score", fontsize='x-large')plt.xlabel("step", fontsize='x-large')plt.legend(loc='lower right', fontsize='x-large')plt.savefig(fig_name)plt.show()
1 数据处理
1.1 数据集介绍
CIFAR-10数据集包含了10种不同的类别、共60,000张图像,其中每个类别的图像都是6000张,图像大小均为 像素。CIFAR-10数据集的示例如图所示。
1.2 数据读取
在本实验中,将原始训练集拆分成了train_set、dev_set两个部分,分别包括40 000条和10 000条样本。将data_batch_1到data_batch_4作为训练集,data_batch_5作为验证集,test_batch作为测试集。
最终的数据集构成为:
- 训练集:40 000条样本。
- 验证集:10 000条样本。
- 测试集:10 000条样本。
读取一个batch数据的代码如下所示:
def load_cifar10_batch(folder_path, batch_id=1, mode='train'):if mode == 'test':file_path = os.path.join(folder_path, 'test_batch')else:file_path = os.path.join(folder_path, 'data_batch_' + str(batch_id))# 加载数据集文件with open(file_path, 'rb') as batch_file:batch = pickle.load(batch_file, encoding='latin1')imgs = batch['data'].reshape((len(batch['data']), 3, 32, 32)) / 255.labels = batch['labels']return np.array(imgs, dtype='float32'), np.array(labels)imgs_batch, labels_batch = load_cifar10_batch(folder_path=r'C:\Users\29134\PycharmProjects\pythonProject\DL\实验13\cifar-10-batches-py',batch_id=1, mode='train')
查看数据的维度:
# 打印一下每个batch中X和y的维度
print("batch of imgs shape: ", imgs_batch.shape, "batch of labels shape: ", labels_batch.shape)
可视化观察其中的一张样本图像和对应的标签,代码如下所示:
image, label = imgs_batch[1], labels_batch[1]
print("The label in the picture is {}".format(label))
plt.figure(figsize=(2, 2))
plt.imshow(image.transpose(1, 2, 0))
plt.savefig('cnn-car.pdf')
1.3 构造Dataset类
class CIFAR10Dataset(Dataset):def __init__(self, folder_path=r'C:\Users\29134\PycharmProjects\pythonProject\DL\实验13\cifar-10-batches-py',mode='train'):if mode == 'train':# 加载batch1-batch4作为训练集self.imgs, self.labels = load_cifar10_batch(folder_path=folder_path, batch_id=1, mode='train')for i in range(2, 5):imgs_batch, labels_batch = load_cifar10_batch(folder_path=folder_path, batch_id=i, mode='train')self.imgs, self.labels = np.concatenate([self.imgs, imgs_batch]), np.concatenate([self.labels, labels_batch])elif mode == 'dev':# 加载batch5作为验证集self.imgs, self.labels = load_cifar10_batch(folder_path=folder_path, batch_id=5, mode='dev')elif mode == 'test':# 加载测试集self.imgs, self.labels = load_cifar10_batch(folder_path=folder_path, mode='test')self.transforms = transforms.Compose([transforms.Resize(32), transforms.ToTensor(),transforms.Normalize(mean=[0.4914, 0.4822, 0.4465],std=[0.2023, 0.1994, 0.2010])])def __getitem__(self, idx):img, label = self.imgs[idx], self.labels[idx]img = self.transforms(Image.fromarray((img.reshape([32,32,3]) * 255).astype('uint8')))return img, labeldef __len__(self):return len(self.imgs)train_dataset = CIFAR10Dataset(folder_path=r'C:\Users\29134\PycharmProjects\pythonProject\DL\实验13\cifar-10-batches-py', mode='train')
dev_dataset = CIFAR10Dataset(folder_path=r'C:\Users\29134\PycharmProjects\pythonProject\DL\实验13\cifar-10-batches-py',mode='dev')
test_dataset = CIFAR10Dataset(folder_path=r'C:\Users\29134\PycharmProjects\pythonProject\DL\实验13\cifar-10-batches-py',mode='test')
2 模型构建
使用pytorch中的Resnet18进行图像分类实验。
resnet18_model = resnet18(pretrained=True)
3 模型训练
复用RunnerV3类,实例化RunnerV3类,并传入训练配置。
使用训练集和验证集进行模型训练,共训练30个epoch。
在实验中,保存准确率最高的模型作为最佳模型。代码实现如下:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")# 学习率大小
lr = 0.001
# 批次大小
batch_size = 64
# 加载数据
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader = DataLoader(dev_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
# 定义网络
model = resnet18_model.to(device)
# 定义优化器,这里使用Adam优化器以及l2正则化策略,相关内容在7.3.3.2和7.6.2中会进行详细介绍
optimizer = opt.Adam(lr=lr, params=model.parameters(), weight_decay=0.005)
# 定义损失函数
loss_fn = F.cross_entropy
loss_fn = loss_fn
# 定义评价指标
metric = Accuracy(is_logist=True)
# 实例化RunnerV3
runner = RunnerV3(model, optimizer, loss_fn, metric)
# 启动训练
log_steps = 3000
eval_steps = 3000
runner.train(train_loader, dev_loader, num_epochs=30, log_steps=log_steps,eval_steps=eval_steps, save_path="best_model.pdparams")
可视化观察训练集与验证集的准确率及损失变化情况。
plot(runner, fig_name='cnn-loss4.pdf')
4 模型评价
使用测试数据对在训练过程中保存的最佳模型进行评价,观察模型在测试集上的准确率以及损失情况。代码实现如下:
# 加载最优模型
runner.load_model('best_model.pdparams')
# 模型评价
score, loss = runner.evaluate(test_loader)
print("[Test] accuracy/loss: {:.4f}/{:.4f}".format(score, loss))
5 模型预测
同样地,也可以使用保存好的模型,对测试集中的数据进行模型预测,观察模型效果,具体代码实现如下:
#获取测试集中的一个batch的数据
X, label = next(iter(test_loader))
logits = runner.predict(X,dim=1)
#多分类,使用softmax计算预测概率
pred = F.softmax(logits)
# print(pred)
#获取概率最大的类别
pred_class = torch.argmax(pred[2][0]).cpu().numpy()
label = label[2].item()
#输出真实类别与预测类别
print("The true category is {} and the predicted category is {}".format(label, pred_class))
#可视化图片
plt.figure(figsize=(2, 2))
imgs, labels = load_cifar10_batch(folder_path=r'C:\Users\29134\PycharmProjects\pythonProject\DL\实验13\cifar-10-batches-py',mode='test')
plt.imshow(imgs[2].transpose(1,2,0))
plt.savefig('cnn-test-vis.pdf')
6 什么是预训练模型和迁移学习
什么是预训练呢?举例子进行简单的介绍
假设已有A训练集,先用A对网络进行预训练,在A任务上学会网络参数,然后保存以备后用,当来一个新的任务B,采取相同的网络结构,网络参数初始化的时候可以加载A学习好的参数,其他的高层参数随机初始化,之后用B任务的训练数据来训练网络,当加载的参数保持不变时,称为"frozen",当加载的参数随着B任务的训练进行不断的改变,称为“fine-tuning”,即更好地把参数进行调整使得更适合当前的B任务
对于迁移学习,就是将在任务A上学习的网络参数加载到B的过程叫做迁移学习
为什么会有预训练,大概概括为以下几点:
- 数据效率:大规模数据在深度学习中扮演着至关重要的角色,然而获取标注数据是一项昂贵和耗时的工作。预训练可以大大提升训练模型的数据量提高准确性,大大提高泛化能力。
- 知识共享:在预训练模型中,已经学习到的知识可以被迁移到新的任务中,从而减少了针对新任务的训练成本和时间。
- 解决梯度消失问题:通过预训练模型,可以使得初始的权重参数更加合理,从而缓解了这些问题。
7 比较“使用预训练模型”和“不使用预训练模型”的效果。
通过实验结果我们可以发现,使用预训练模型收敛速度更快,更稳定(可视化误差结果),准确率更高(运行结果)。
总结
最开始没用gpu跑,跑了两天,都快跑麻了,因为总是跑半天报一个错,跑半个点报个错后来改为了gpu,效果就好了很多,博客会在过几天详细更新(因为我流了,发烧好几天先把作业内容写完了,展开部分之后再写)