一,类模块定义:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import Tensorclass ResBlock(nn.Module):def __init__(self, inchannel, outchannel, stride=1) -> None:super(ResBlock, self).__init__()# 这里定义了残差块内连续的2个卷积层self.conv1 = nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1, bias=False)self.bn1 = nn.BatchNorm2d(outchannel)self.relu = nn.ReLU(inplace=True)self.conv2 = nn.Conv2d(outchannel, outchannel, kernel_size=3, stride=1, padding=1, bias=False)self.bn2 = nn.BatchNorm2d(outchannel)self.downsample = nn.Sequential()if stride != 1 or inchannel != outchannel:# shortcut,这里为了跟2个卷积层的结果结构一致,要做处理self.downsample = nn.Sequential(nn.Conv2d(inchannel, outchannel, kernel_size=1, stride=stride, bias=False),nn.BatchNorm2d(outchannel))def forward(self, x):out = self.conv1(x)out = self.bn1(out)out = self.relu(out)out = self.conv2(out)out = self.bn2(out)out = out + self.downsample(x)out = self.relu(out)return outclass ResNet18(nn.Module):def __init__(self, ResBlock, num_classes=1000) -> None:super(ResNet18, self).__init__()self.inchannel = 64self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)self.bn1 = nn.BatchNorm2d(64)self.relu = nn.ReLU(inplace=True)self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1, ceil_mode=False)self.layer1 = self.make_layer(ResBlock, 64, 2, stride=1)self.layer2 = self.make_layer(ResBlock, 128, 2, stride=2)self.layer3 = self.make_layer(ResBlock, 256, 2, stride=2)self.layer4 = self.make_layer(ResBlock, 512, 2, stride=2)self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))self.fc = nn.Linear(512, num_classes)def forward(self, x: Tensor) -> Tensor:out = self.conv1(x)out = self.bn1(out)out = self.relu(out)out = self.maxpool(out)out = self.layer1(out)out = self.layer2(out)out = self.layer3(out)out = self.layer4(out)out = self.avgpool(out)out = out.view(out.size(0), -1)out = self.fc(out)return outdef make_layer(self, block, channels, num_blocks, stride):strides = [stride] + [1] * (num_blocks - 1)layers = []for stride in strides:layers.append(block(self.inchannel, channels, stride))self.inchannel = channelsreturn nn.Sequential(*layers)if __name__ == '__main__':model = ResNet18(ResBlock)print(model)
二,对比Pytorch官方提供的预训练模型 加载xxx.pht文件
# 方案一: 使用官方自带的resnet18加载预训练模型
from torchvision import models# 当 xxx.pth预训练模型不存在时,可以联网直接下载
# model = models.resnet18(weights=ResNet18_Weights.DEFAULT) # 载入预训练模型
model = models.resnet18()# 加载与训练模型
weights_dict = torch.load('C:\\Users\\torch\\hub\\checkpoints\\resnet18-f37072fd.pth')model.load_state_dict(weights_dict, strict=True)
print(model)# 方案二: 使用自定义的ResNet18加载预训练模型
model = ResNet18(ResBlock)
weights_dict = torch.load('C:\\Users\\torch\\hub\\checkpoints\\resnet18-f37072fd.pth')model.load_state_dict(weights_dict, strict=True)
print(model)
三,用自定义的ResNet18记载Pytorch官网提供的预训练模型,训练自己的图像分类数据,完整代码
import matplotlib.pyplot as plt
from torchvision.models import ResNet18_Weightsimport warnings
warnings.filterwarnings("ignore") # 忽略烦人的红色提示import time
import os
from tqdm import tqdm
import pandas as pd
import numpy as np
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F# 导入训练需使用的工具包
from torchvision import models
import torch.optim as optim
from torch.optim import lr_schedulerfrom sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import roc_auc_score''' 运行一个 batch 的训练,返回当前 batch 的训练日志 '''
log_train = {}
def train_one_batch(images, labels, epoch, batch_idx):# 获得一个 batch 的数据和标注images = images.to(device)labels = labels.to(device)# images = [32, 3, 224, 224]outputs = model(images) # 输入模型,执行前向预测(mat1 and mat2 shapes cannot be multiplied (32x25088 and 512x30))loss = criterion(outputs, labels) # 计算当前 batch 中,每个样本的平均交叉熵损失函数值# 优化更新权重optimizer.zero_grad()loss.backward()optimizer.step()# 获取当前 batch 的标签类别和预测类别_, preds = torch.max(outputs, 1) # 获得当前 batch 所有图像的预测类别preds = preds.cpu().numpy()loss = loss.detach().cpu().numpy()outputs = outputs.detach().cpu().numpy()labels = labels.detach().cpu().numpy()log_train['epoch'] = epochlog_train['batch'] = batch_idx# 计算分类评估指标log_train['train_loss'] = losslog_train['train_accuracy'] = accuracy_score(labels, preds)log_train['train_precision'] = precision_score(labels, preds, average='macro')log_train['train_recall'] = recall_score(labels, preds, average='macro')log_train['train_f1-score'] = f1_score(labels, preds, average='macro')return log_train''' 在整个测试集上评估,返回分类评估指标日志 '''
def evaluate_testset(epoch):loss_list = []labels_list = []preds_list = []with torch.no_grad():for images, labels in test_loader: # 生成一个 batch 的数据和标注images = images.to(device)labels = labels.to(device)outputs = model(images) # 输入模型,执行前向预测loss = criterion(outputs, labels) # 由 logit,计算当前 batch 中,每个样本的平均交叉熵损失函数值# 获取整个测试集的标签类别和预测类别_, preds = torch.max(outputs, 1) # 获得当前 batch 所有图像的预测类别preds = preds.cpu().numpy()loss = loss.detach().cpu().numpy()outputs = outputs.detach().cpu().numpy()labels = labels.detach().cpu().numpy()loss_list.append(loss)labels_list.extend(labels)preds_list.extend(preds)log_test = {}log_test['epoch'] = epoch# 计算分类评估指标log_test['test_loss'] = np.mean(loss_list)log_test['test_accuracy'] = accuracy_score(labels_list, preds_list)log_test['test_precision'] = precision_score(labels_list, preds_list, average='macro')log_test['test_recall'] = recall_score(labels_list, preds_list, average='macro')log_test['test_f1-score'] = f1_score(labels_list, preds_list, average='macro')return log_testdef saveLog():# 训练日志-训练集df_train_log = pd.DataFrame()log_train = {}log_train['epoch'] = 0log_train['batch'] = 0images, labels = next(iter(train_loader))log_train.update(train_one_batch(images, labels, 0, 0))df_train_log = df_train_log.append(log_train, ignore_index=True)# 训练日志-测试集df_test_log = pd.DataFrame()log_test = {}log_test['epoch'] = 0log_test.update(evaluate_testset(0))df_test_log = df_test_log.append(log_test, ignore_index=True)return df_train_log, df_test_logif __name__ == '__main__':ntime = time.strftime('%Y%m%d_%H%M%S', time.localtime(time.time()))ntime = str(ntime)device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')print('device', device)from torchvision import transforms# 训练集图像预处理:缩放裁剪、图像增强、转 Tensor、归一化train_transform = transforms.Compose([transforms.RandomResizedCrop(224),transforms.RandomHorizontalFlip(),transforms.ToTensor(),transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])# 测试集图像预处理-RCTN:缩放、裁剪、转 Tensor、归一化test_transform = transforms.Compose([transforms.Resize(256),transforms.CenterCrop(224),transforms.ToTensor(),transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])])# 数据集文件夹路径dataset_dir = 'D:\\dl_workspace\\datasets\\fruit30_split'train_path = os.path.join(dataset_dir, 'train')test_path = os.path.join(dataset_dir, 'val')print('训练集路径', train_path)print('测试集路径', test_path)from torchvision import datasetstrain_dataset = datasets.ImageFolder(train_path, train_transform) # 载入训练集test_dataset = datasets.ImageFolder(test_path, test_transform) # 载入测试集# 各类别名称class_names = train_dataset.classesn_class = len(class_names)train_dataset.class_to_idx # 映射关系:类别 到 索引号idx_to_labels = {y: x for x, y in train_dataset.class_to_idx.items()} # 映射关系:索引号 到 类别# 保存为本地的 npy 文件# np.save('idx_to_labels.npy', idx_to_labels)# np.save('labels_to_idx.npy', train_dataset.class_to_idx)from torch.utils.data import DataLoaderBATCH_SIZE = 256# 训练集的数据加载器train_loader = DataLoader(train_dataset,batch_size=BATCH_SIZE,shuffle=True,num_workers=4)# 测试集的数据加载器test_loader = DataLoader(test_dataset,batch_size=BATCH_SIZE,shuffle=False,num_workers=4)from Utils import pyutils# 只微调训练模型最后一层(全连接分类层)# model = models.resnet18(weights=ResNet18_Weights.DEFAULT) # 载入预训练模型# model = models.resnet18()# print(model)# print('pymodel:', pyutils.getOrderedDictKeys(model.state_dict()))from ResNet18_Model import ResNet18, ResBlock, ResNetmodel = ResNet18(ResBlock)# 给自定义模型,加载预训练模型权重,(strict=False 可以看到具有相同网络层名称的网络被初始化,不具有的网络层的参数不会被初始化)weights_dict = torch.load('C:\\Users\\Administrator/.cache\\torch\\hub\\checkpoints\\resnet18-f37072fd.pth')model.load_state_dict(weights_dict, strict=True)# 修改全连接层,使得全连接层的输出与当前数据集类别数对应(新建的层默认 requires_grad=True)# 只微调训练最后一层全连接层的参数,其它层冻结(1000分类改成30分类)model.fc = nn.Linear(model.fc.in_features, n_class)optimizer = optim.Adam(model.fc.parameters())print(model)# 训练配置model = model.to(device)criterion = nn.CrossEntropyLoss() # 交叉熵损失函数EPOCHS = 30 # 训练轮次 Epoch(训练集当中所有的训练数据扫一遍算作一个epoch)'''学习率的降低优化策略,每经过5个epoch,学习率降低为原来的一半(lr = lr*gamma)'''lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5) # 学习率降低策略# df_train_log, df_test_log = saveLog()df_train_log = pd.DataFrame()df_test_log = pd.DataFrame()epoch = 0batch_idx = 0best_test_accuracy = 0# 运行训练for epoch in range(1, EPOCHS + 1):print(f'Epoch {epoch}/{EPOCHS}')## 训练阶段model.train()for images, labels in tqdm(train_loader): # 获得一个 batch 的数据和标注batch_idx += 1log_train = train_one_batch(images, labels, epoch, batch_idx)df_train_log = df_train_log.append(log_train, ignore_index=True)# wandb.log(log_train)lr_scheduler.step() # 学习率优化策略,跟新学习率## 测试阶段model.eval() # 将模型的模式从训练模式改成评估模式log_test = evaluate_testset(epoch) # 在整个测试集上评估,并且返回测试结果df_test_log = df_test_log.append(log_test, ignore_index=True)# wandb.log(log_test)# 保存最新的最佳模型文件if log_test['test_accuracy'] > best_test_accuracy:# 删除旧的最佳模型文件(如有)old_best_checkpoint_path = 'checkpoint/best-{:.3f}.pth'.format(best_test_accuracy)if os.path.exists(old_best_checkpoint_path):os.remove(old_best_checkpoint_path)# 保存新的最佳模型文件best_test_accuracy = log_test['test_accuracy']new_best_checkpoint_path = './checkpoint/{0}_best-{1:.3f}.pth'.format(ntime, log_test['test_accuracy'])torch.save(model, new_best_checkpoint_path)print('保存新的最佳模型', './checkpoint/{0}_best-{1:.3f}.pth'.format(ntime, best_test_accuracy))best_test_accuracy = log_test['test_accuracy']print(f'测试准确率: {best_test_accuracy} / {epoch}')df_train_log.to_csv('训练日志-训练集-{0}.csv'.format(ntime), index=False)df_test_log.to_csv('训练日志-测试集-{0}.csv'.format(ntime), index=False)# 测试集上的准确率为 87.662 %