使用五个不同的网络分别进行预测,再对各模型输出的分类概率取平均,得到最终的分类结果。与单个模型相比,分类精度基本上可以提升约 10%。
1.导入基本库
import copy
import os

import torch
import torch.nn as nn
import torchvision.models as models
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from torchvision import datasets
from torchvision import transforms
from tqdm import tqdm
from transformers import AutoModelForImageClassification,AutoConfig
2.数据集准备
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Preprocessing: resize every image to 224x224, convert it to a tensor and
# normalise each channel with the mean/std commonly used together with
# ImageNet-pretrained torchvision backbones.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# One sub-directory per class under ./aug_datasets1.
full_dataset = datasets.ImageFolder(root='./aug_datasets1', transform=transform)

# 80 % of the samples are used for training, the remainder for validation.
dataset_size = len(full_dataset)
train_size = int(0.8 * dataset_size)
val_size = dataset_size - train_size

# A fixed seed keeps the split identical across runs, so every model is
# trained and validated on the same partition and a later evaluation of the
# ensemble does not see validation images that some model was trained on.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(
    full_dataset, [train_size, val_size], generator=split_generator
)

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)
3.定义不同模型与对应的训练策略
模型1 ResNet
class ResNet(nn.Module):
    """ResNet-50 backbone with a two-layer classification head.

    Args:
        num_classes: Size of the final linear layer's output.
        train: When true the backbone starts from the ``IMAGENET1K_V1``
            weights; otherwise it is created without pretrained weights
            (for example before loading a saved ``state_dict``).
    """

    def __init__(self, num_classes=21, train=True):
        super().__init__()
        # The file only binds the alias ``models`` (``import
        # torchvision.models as models``); the bare name ``torchvision`` is
        # undefined, so the weights enum must be reached through the alias.
        weights = models.ResNet50_Weights.IMAGENET1K_V1 if train else None
        self.resnet = models.resnet50(weights=weights)

        in_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Sequential(
            nn.Linear(in_features, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes),
        )
        self.resnet.to(device)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        return self.resnet(x)

    def _train_one_epoch(self, loader, criterion, optimizer):
        """Run one optimisation pass over ``loader``; return the mean batch loss."""
        self.train()
        running_loss = 0.0
        for images, labels in tqdm(loader):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            loss = criterion(self(images), labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        return running_loss / max(len(loader), 1)

    def _evaluate(self, loader, criterion):
        """Return ``(mean batch loss, accuracy in percent)`` over ``loader``."""
        self.eval()
        running_loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for images, labels in tqdm(loader):
                images, labels = images.to(device), labels.to(device)
                logits = self(images)
                running_loss += criterion(logits, labels).item()
                correct += (logits.argmax(dim=1) == labels).sum().item()
                total += labels.size(0)
        # An empty loader yields 0 % instead of a ZeroDivisionError.
        accuracy = 100 * correct / total if total else 0.0
        return running_loss / max(len(loader), 1), accuracy

    def startTrain(self, train_loader, val_loader, num_epochs=10,
                   save_dir='./saved/resnet'):
        """Fine-tune the network and checkpoint every new best validation accuracy.

        Args:
            train_loader: Iterable of ``(images, labels)`` training batches.
            val_loader: Iterable of ``(images, labels)`` validation batches.
            num_epochs: Number of training epochs.
            save_dir: Directory that receives ``model_weights_<acc>.pth``;
                created when missing.

        Returns:
            The best validation accuracy (in percent) that was reached.
        """
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.AdamW(self.parameters(), lr=1e-4, weight_decay=1e-4)
        # NOTE(review): T_max=50 is longer than the default 10 epochs, so only
        # the first part of the cosine curve is used -- confirm this is intended.
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)

        # torch.save does not create missing parent directories.
        os.makedirs(save_dir, exist_ok=True)

        best_acc = 0.0
        print("Training ResNet.....")
        for epoch in range(num_epochs):
            train_loss = self._train_one_epoch(train_loader, criterion, optimizer)
            print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss}")
            scheduler.step()

            val_loss, accuracy = self._evaluate(val_loader, criterion)
            print(f"Validation Loss: {val_loss}")
            print(f"Accuracy: {accuracy}%")

            if accuracy > best_acc:
                best_acc = accuracy
                torch.save(
                    self.state_dict(),
                    os.path.join(save_dir, 'model_weights_{}.pth'.format(best_acc)),
                )
        return best_acc
模型2 EfficientNet
class EfficientNet(nn.Module):
    """EfficientNet-B2 backbone with a two-layer classification head.

    Args:
        num_classes: Size of the final linear layer's output.
        train: When true the backbone starts from the ``IMAGENET1K_V1``
            weights; otherwise it is created without pretrained weights
            (for example before loading a saved ``state_dict``).
    """

    def __init__(self, num_classes=21, train=True):
        super().__init__()
        # The file only binds the alias ``models`` (``import
        # torchvision.models as models``); the bare name ``torchvision`` is
        # undefined, so the weights enum must be reached through the alias.
        weights = models.EfficientNet_B2_Weights.IMAGENET1K_V1 if train else None
        self.effnet = models.efficientnet_b2(weights=weights)

        # classifier[1] is read for its input width before the whole
        # classifier is replaced by the new head.
        in_features = self.effnet.classifier[1].in_features
        self.effnet.classifier = nn.Sequential(
            nn.Linear(in_features, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes),
        )
        self.effnet.to(device)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        return self.effnet(x)

    def _train_one_epoch(self, loader, criterion, optimizer):
        """Run one optimisation pass over ``loader``; return the mean batch loss."""
        self.train()
        running_loss = 0.0
        for images, labels in tqdm(loader):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            loss = criterion(self(images), labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        return running_loss / max(len(loader), 1)

    def _evaluate(self, loader, criterion):
        """Return ``(mean batch loss, accuracy in percent)`` over ``loader``."""
        self.eval()
        running_loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for images, labels in tqdm(loader):
                images, labels = images.to(device), labels.to(device)
                logits = self(images)
                running_loss += criterion(logits, labels).item()
                correct += (logits.argmax(dim=1) == labels).sum().item()
                total += labels.size(0)
        # An empty loader yields 0 % instead of a ZeroDivisionError.
        accuracy = 100 * correct / total if total else 0.0
        return running_loss / max(len(loader), 1), accuracy

    def startTrain(self, train_loader, val_loader, num_epochs=10,
                   save_dir='./saved/efficientnet'):
        """Fine-tune the network and checkpoint every new best validation accuracy.

        Args:
            train_loader: Iterable of ``(images, labels)`` training batches.
            val_loader: Iterable of ``(images, labels)`` validation batches.
            num_epochs: Number of training epochs.
            save_dir: Directory that receives ``model_weights_<acc>.pth``;
                created when missing.

        Returns:
            The best validation accuracy (in percent) that was reached.
        """
        # Plain cross-entropy (an earlier comment mentioned focal loss, but
        # no focal loss is implemented here).
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.AdamW(self.parameters(), lr=1e-4, weight_decay=1e-4)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5)

        # torch.save does not create missing parent directories.
        os.makedirs(save_dir, exist_ok=True)

        best_acc = 0.0
        print("Training EfficientNet.....")
        for epoch in range(num_epochs):
            train_loss = self._train_one_epoch(train_loader, criterion, optimizer)
            print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss}")
            # NOTE(review): the plateau scheduler monitors the *training*
            # loss; monitoring the validation loss is the more common choice
            # -- confirm which one is intended.
            scheduler.step(train_loss)

            val_loss, accuracy = self._evaluate(val_loader, criterion)
            print(f"Validation Loss: {val_loss}")
            print(f"Accuracy: {accuracy}%")

            if accuracy > best_acc:
                best_acc = accuracy
                torch.save(
                    self.state_dict(),
                    os.path.join(save_dir, 'model_weights_{}.pth'.format(best_acc)),
                )
        return best_acc
模型3 DenseNet
class DenseNet(nn.Module):
    """DenseNet-121 backbone with a batch-normalised classification head.

    Args:
        num_classes: Size of the final linear layer's output; also stored
            as ``self.num_classes``.
        train: When true the backbone starts from the ``IMAGENET1K_V1``
            weights; otherwise it is created without pretrained weights
            (for example before loading a saved ``state_dict``).
    """

    def __init__(self, num_classes=21, train=True):
        super().__init__()
        self.num_classes = num_classes
        # The file only binds the alias ``models`` (``import
        # torchvision.models as models``); the bare name ``torchvision`` is
        # undefined, so the weights enum must be reached through the alias.
        weights = models.DenseNet121_Weights.IMAGENET1K_V1 if train else None
        self.densenet = models.densenet121(weights=weights)

        in_features = self.densenet.classifier.in_features
        # NOTE(review): BatchNorm1d cannot process a training batch with a
        # single sample; verify the train loader never yields one.
        self.densenet.classifier = nn.Sequential(
            nn.BatchNorm1d(in_features),
            nn.Linear(in_features, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes),
        )
        self.densenet.to(device)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        return self.densenet(x)

    def _train_one_epoch(self, loader, criterion, optimizer):
        """Run one optimisation pass over ``loader``; return the mean batch loss."""
        self.train()
        running_loss = 0.0
        for images, labels in tqdm(loader):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            loss = criterion(self(images), labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        return running_loss / max(len(loader), 1)

    def _evaluate(self, loader, criterion):
        """Return ``(mean batch loss, accuracy in percent)`` over ``loader``."""
        self.eval()
        running_loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for images, labels in tqdm(loader):
                images, labels = images.to(device), labels.to(device)
                logits = self(images)
                running_loss += criterion(logits, labels).item()
                correct += (logits.argmax(dim=1) == labels).sum().item()
                total += labels.size(0)
        # An empty loader yields 0 % instead of a ZeroDivisionError.
        accuracy = 100 * correct / total if total else 0.0
        return running_loss / max(len(loader), 1), accuracy

    def startTrain(self, train_loader, val_loader, num_epochs=10,
                   save_dir='./saved/densenet'):
        """Fine-tune the network and checkpoint every new best validation accuracy.

        Args:
            train_loader: Iterable of ``(images, labels)`` training batches.
            val_loader: Iterable of ``(images, labels)`` validation batches.
            num_epochs: Number of training epochs.
            save_dir: Directory that receives ``model_weights_<acc>.pth``;
                created when missing.

        Returns:
            The best validation accuracy (in percent) that was reached.
        """
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-4)
        # NOTE(review): T_max=50 is longer than the default 10 epochs, so only
        # the first part of the cosine curve is used -- confirm this is intended.
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)

        # torch.save does not create missing parent directories.
        os.makedirs(save_dir, exist_ok=True)

        best_acc = 0.0
        print("Training DenseNet.....")
        for epoch in range(num_epochs):
            train_loss = self._train_one_epoch(train_loader, criterion, optimizer)
            print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss}")
            scheduler.step()

            val_loss, accuracy = self._evaluate(val_loader, criterion)
            print(f"Validation Loss: {val_loss}")
            print(f"Accuracy: {accuracy}%")

            if accuracy > best_acc:
                best_acc = accuracy
                torch.save(
                    self.state_dict(),
                    os.path.join(save_dir, 'model_weights_{}.pth'.format(best_acc)),
                )
        return best_acc
模型4 ResNeXt
class ResNeXt(nn.Module):
    """ResNeXt-50 (32x4d) backbone with a batch-normalised classification head.

    Args:
        num_classes: Size of the final linear layer's output.
        train: When true the backbone starts from the ``IMAGENET1K_V1``
            weights; otherwise it is created without pretrained weights
            (for example before loading a saved ``state_dict``).
    """

    def __init__(self, num_classes=21, train=True):
        super().__init__()
        # The file only binds the alias ``models`` (``import
        # torchvision.models as models``); the bare name ``torchvision`` is
        # undefined, so the weights enum must be reached through the alias.
        weights = models.ResNeXt50_32X4D_Weights.IMAGENET1K_V1 if train else None
        self.resnext50 = models.resnext50_32x4d(weights=weights)

        in_features = self.resnext50.fc.in_features
        # NOTE(review): BatchNorm1d cannot process a training batch with a
        # single sample; verify the train loader never yields one.
        self.resnext50.fc = nn.Sequential(
            nn.BatchNorm1d(in_features),
            nn.Linear(in_features, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes),
        )
        # Moving the wrapper also moves the wrapped backbone.
        self.to(device)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        return self.resnext50(x)

    def _train_one_epoch(self, loader, criterion, optimizer, scheduler):
        """Run one optimisation pass over ``loader``; return the mean batch loss.

        ``scheduler`` is advanced once after every optimiser step, which is
        how a one-cycle schedule is meant to be driven.
        """
        self.train()
        running_loss = 0.0
        for images, labels in tqdm(loader):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            loss = criterion(self(images), labels)
            loss.backward()
            optimizer.step()
            scheduler.step()
            running_loss += loss.item()
        return running_loss / max(len(loader), 1)

    def _evaluate(self, loader, criterion):
        """Return ``(mean batch loss, accuracy in percent)`` over ``loader``."""
        self.eval()
        running_loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for images, labels in tqdm(loader):
                images, labels = images.to(device), labels.to(device)
                logits = self(images)
                running_loss += criterion(logits, labels).item()
                correct += (logits.argmax(dim=1) == labels).sum().item()
                total += labels.size(0)
        # An empty loader yields 0 % instead of a ZeroDivisionError.
        accuracy = 100 * correct / total if total else 0.0
        return running_loss / max(len(loader), 1), accuracy

    def startTrain(self, train_loader, val_loader, num_epochs=10,
                   save_dir='./saved/se-resnext'):
        """Fine-tune the network and checkpoint every new best validation accuracy.

        Args:
            train_loader: Sized iterable of ``(images, labels)`` training
                batches; its length sets the one-cycle schedule length.
            val_loader: Iterable of ``(images, labels)`` validation batches.
            num_epochs: Number of training epochs.
            save_dir: Directory that receives ``model_weights_<acc>.pth``;
                created when missing.

        Returns:
            The best validation accuracy (in percent) that was reached.
        """
        optimizer = torch.optim.AdamW(self.parameters(), lr=1e-4)
        # The schedule spans exactly the steps that are really executed
        # (num_epochs * batches per epoch).  Previously it was sized for 30
        # epochs and stepped once per epoch with the training loss passed as
        # the epoch index, so the learning rate never followed the cycle.
        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer,
            max_lr=5e-4,
            epochs=num_epochs,
            steps_per_epoch=len(train_loader),
        )
        criterion = nn.CrossEntropyLoss()

        # torch.save does not create missing parent directories.
        os.makedirs(save_dir, exist_ok=True)

        best_acc = 0.0
        print("Training ResNeXt.....")
        for epoch in range(num_epochs):
            train_loss = self._train_one_epoch(
                train_loader, criterion, optimizer, scheduler
            )
            print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss}")

            val_loss, accuracy = self._evaluate(val_loader, criterion)
            print(f"Validation Loss: {val_loss}")
            print(f"Accuracy: {accuracy}%")

            if accuracy > best_acc:
                best_acc = accuracy
                torch.save(
                    self.state_dict(),
                    os.path.join(save_dir, 'model_weights_{}.pth'.format(best_acc)),
                )
        return best_acc
模型5 SwinTransformer
class SwinTransformer(nn.Module):
    """Swin Transformer V2 image classifier with a dropout + linear head.

    Args:
        num_classes: Size of the final linear layer's output.
        train: When true, load the pretrained snapshot from
            ``PRETRAINED_DIR``, replace its head and freeze everything
            except the selected encoder stages and the head.  When false,
            rebuild the fine-tuned model from ``SAVE_DIR``.

    NOTE(review): the snapshot name ends in ``-256`` while the file's
    preprocessing resizes images to 224x224 -- confirm the model accepts
    that input size.
    """

    # Local snapshot of the pretrained checkpoint used as the starting point.
    PRETRAINED_DIR = './swinv2-tiny-patch4-window16-256/models--microsoft--swinv2-tiny-patch4-window16-256/snapshots/f4d3075206f2ad5eda586c30d6b4d0500f312421/'
    # Directory written by ``save_pretrained`` during training and read back
    # when the model is created with ``train=False``.
    SAVE_DIR = './saved/swin-transformer/'

    def __init__(self, num_classes=21, train=True):
        super().__init__()
        if train:
            self.vit = AutoModelForImageClassification.from_pretrained(self.PRETRAINED_DIR)
            self.vit.classifier = self._make_head(
                self.vit.classifier.in_features, num_classes
            )

            # Freeze everything, then re-enable the tail of the encoder and
            # the new head.
            for param in self.vit.parameters():
                param.requires_grad = False
            # NOTE(review): the original comment spoke of "the last two
            # layers" while the slice is [-4:]; if the encoder has four
            # stages this unfreezes all of them -- confirm the intent.
            for param in self.vit.swinv2.encoder.layers[-4:].parameters():
                param.requires_grad = True
            for param in self.vit.classifier.parameters():
                param.requires_grad = True
        else:
            # The checkpoint was saved with the Sequential head, so its keys
            # are ``classifier.1.*``.  Loading it through ``from_pretrained``
            # would build a plain Linear head (``classifier.*``) and leave it
            # randomly initialised.  Rebuild the same head first and load the
            # saved state dict explicitly instead.
            config = AutoConfig.from_pretrained(self.SAVE_DIR)
            config.num_labels = num_classes
            self.vit = AutoModelForImageClassification.from_config(config)
            self.vit.classifier = self._make_head(
                self.vit.classifier.in_features, num_classes
            )
            # ``save_pretrained(..., safe_serialization=False)`` writes the
            # weights as ``pytorch_model.bin``.
            weights_path = os.path.join(self.SAVE_DIR, 'pytorch_model.bin')
            self.vit.load_state_dict(torch.load(weights_path, map_location='cpu'))
        self.vit.to(device)

    @staticmethod
    def _make_head(in_features, num_classes):
        """Build the dropout + linear classification head."""
        return nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(in_features, num_classes),
        )

    def forward(self, x):
        """Return the wrapped model's output object; logits are in ``.logits``."""
        return self.vit(x)

    def _evaluate(self, loader, criterion):
        """Return ``(mean batch loss, accuracy in percent)`` over ``loader``."""
        self.eval()
        running_loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for images, labels in tqdm(loader):
                images, labels = images.to(device), labels.to(device)
                logits = self(images).logits
                running_loss += criterion(logits, labels).item()
                correct += (logits.argmax(dim=1) == labels).sum().item()
                total += labels.size(0)
        # An empty loader yields 0 % instead of a ZeroDivisionError.
        accuracy = 100 * correct / total if total else 0.0
        return running_loss / max(len(loader), 1), accuracy

    def _fit_stage(self, train_loader, val_loader, criterion, optimizer,
                   scheduler, num_epochs, best_acc):
        """Train for ``num_epochs`` and keep the best-performing weights.

        The model is saved to ``SAVE_DIR`` whenever the validation accuracy
        exceeds ``best_acc``.  When the stage ends, the best weights seen
        (or the weights the stage started with, if nothing improved) are
        loaded back into the model.

        Returns:
            The updated best validation accuracy in percent.
        """
        best_weights = copy.deepcopy(self.state_dict())
        for epoch in range(num_epochs):
            self.train()
            running_loss = 0.0
            for images, labels in tqdm(train_loader):
                images, labels = images.to(device), labels.to(device)
                optimizer.zero_grad()
                loss = criterion(self(images).logits, labels)
                loss.backward()
                optimizer.step()
                # One-cycle schedules advance once per optimiser step.
                scheduler.step()
                running_loss += loss.item()
            train_loss = running_loss / max(len(train_loader), 1)
            print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss}")

            val_loss, accuracy = self._evaluate(val_loader, criterion)
            print(f"Validation Loss: {val_loss}")
            print(f"Accuracy: {accuracy}%")

            if accuracy > best_acc:
                best_acc = accuracy
                best_weights = copy.deepcopy(self.state_dict())
                self.vit.save_pretrained(self.SAVE_DIR, safe_serialization=False)

        self.load_state_dict(best_weights)
        return best_acc

    def startTrain(self, train_loader, val_loader, num_epochs_stage1=10,
                   num_epochs_stage2=10):
        """Fine-tune in two stages and checkpoint the best model.

        Stage 1 trains only the parameters that currently require gradients;
        stage 2 unfreezes the whole network and continues with a much
        smaller learning rate.

        Args:
            train_loader: Sized iterable of ``(images, labels)`` training
                batches; its length sets the one-cycle schedule length.
            val_loader: Iterable of ``(images, labels)`` validation batches.
            num_epochs_stage1: Epochs for stage 1.
            num_epochs_stage2: Epochs for stage 2.

        Returns:
            The best validation accuracy (in percent) across both stages.
        """
        # Plain cross-entropy (an earlier comment mentioned label smoothing,
        # but none is configured here).
        criterion = nn.CrossEntropyLoss()

        print("Training SwinTransformer.....")
        print("===== Stage 1 Training =====")
        trainable = [p for p in self.parameters() if p.requires_grad]
        optimizer_stage1 = torch.optim.AdamW(trainable, lr=1e-3)
        scheduler_stage1 = torch.optim.lr_scheduler.OneCycleLR(
            optimizer_stage1,
            max_lr=1e-3,
            epochs=num_epochs_stage1,
            steps_per_epoch=len(train_loader),
        )
        best_acc = self._fit_stage(
            train_loader, val_loader, criterion,
            optimizer_stage1, scheduler_stage1, num_epochs_stage1, best_acc=0.0,
        )

        print("===== Stage 2 Training =====")
        for param in self.parameters():
            param.requires_grad = True
        optimizer_stage2 = torch.optim.Adam(self.parameters(), lr=1e-6)
        scheduler_stage2 = torch.optim.lr_scheduler.OneCycleLR(
            optimizer_stage2,
            max_lr=5e-6,
            epochs=num_epochs_stage2,
            steps_per_epoch=len(train_loader),
        )
        # The stage-1 best accuracy is carried over so that a weaker stage-2
        # epoch cannot overwrite the better stage-1 checkpoint on disk.
        best_acc = self._fit_stage(
            train_loader, val_loader, criterion,
            optimizer_stage2, scheduler_stage2, num_epochs_stage2, best_acc=best_acc,
        )
        return best_acc
4.分别训练,然后得到权重
# Train the five ensemble members one after another on the same loaders.
swinTransformer = SwinTransformer()
swinTransformer.startTrain(train_dataloader, val_dataloader)

efficientNet = EfficientNet()
efficientNet.startTrain(train_dataloader, val_dataloader)

resNet = ResNet()
resNet.startTrain(train_dataloader, val_dataloader)

resNeXt = ResNeXt()
resNeXt.startTrain(train_dataloader, val_dataloader)

denseNet = DenseNet()
denseNet.startTrain(train_dataloader, val_dataloader)
5.构建集成分类模型
import torch
import torchvision.transforms as transforms
import torch.nn as nn
from torchvision import datasets
from torchvision import transforms
from tqdm import tqdm
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from tqdm import tqdm
from PIL import Image


def remove_prefix_from_state_dict(state_dict, prefix='resnext.'):
    """Return a copy of ``state_dict`` with ``prefix`` renamed to ``resnext50.``.

    Despite its name the function does not simply strip the prefix: every
    key that starts with ``prefix`` has that prefix replaced by
    ``"resnext50."``.  All other keys are copied unchanged and the input
    mapping is not modified.

    Args:
        state_dict: Mapping from parameter names to values.
        prefix: Leading key fragment to replace.

    Returns:
        A new ``dict`` with the renamed keys.
    """
    return {
        ("resnext50." + key[len(prefix):] if key.startswith(prefix) else key): value
        for key, value in state_dict.items()
    }


# Ensemble model definition
class EnsembleModel:
    """Soft-voting ensemble that averages the class probabilities of five models.

    Every model is switched to evaluation mode on construction.  The first
    four models must return a logits tensor when called; ``swinTransformer``
    must return an object that exposes the logits as ``.logits``.
    """

    def __init__(self, efficientNet, resNet, resNeXt, denseNet, swinTransformer):
        super().__init__()
        self.efficientNet = efficientNet.eval()
        self.resNet = resNet.eval()
        self.resNeXt = resNeXt.eval()
        self.denseNet = denseNet.eval()
        self.swinTransformer = swinTransformer.eval()

    def predict(self, x):
        """Return the mean of the five models' softmax probabilities for ``x``.

        Args:
            x: Input batch accepted by all five models.

        Returns:
            Tensor of averaged class probabilities (softmax is taken over
            dimension 1 of each model's logits).
        """
        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            probabilities = [
                torch.softmax(self.efficientNet(x), dim=1),
                torch.softmax(self.resNet(x), dim=1),
                torch.softmax(self.resNeXt(x), dim=1),
                torch.softmax(self.denseNet(x), dim=1),
                torch.softmax(self.swinTransformer(x).logits, dim=1),
            ]
            avg_pred = sum(probabilities) / len(probabilities)
        return avg_pred
这样,通过对多个模型的预测概率取平均,集成模型就可以获得比单个模型更好的分类性能。