38实战Kaggle
比赛:图像分类 (CIFAR-10
)
比赛链接:CIFAR-10 - Object Recognition in Images | Kaggle
导入包
import os
import glob
import pandas as pd
import numpy as np
import torch
import torchvision
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt
from torch import nn
from d2l import torch as d2l
import liliPytorch as lp
import csv
预处理:数据集分析
# 获取精简数据集
#@save
d2l.DATA_HUB['cifar10_tiny'] = (d2l.DATA_URL + 'kaggle_cifar10_tiny.zip','2068874e4b9a9f0fb07ebe0ad2b29754449ccacd')
# 如果使用完整的Kaggle竞赛的数据集,设置demo为False
demo = True
if demo:data_dir = d2l.download_extract('cifar10_tiny')
else:data_dir = '../data/cifar-10/'train_path = '../data/kaggle_cifar10_tiny/train.csv'
file_path = '../data/kaggle_cifar10_tiny/'# 读取数据
train_data = pd.read_csv(train_path)
# 查看数据
print(train_data['label'].value_counts())
# """
# label
# automobile 112
# frog 107
# truck 103
# horse 102
# airplane 102
# deer 99
# bird 99
# ship 99
# cat 92
# dog 85
# """
1.数据处理与加载
train_path = '../data/kaggle_cifar10_tiny/train.csv'
test_path = '../data/kaggle_cifar10_tiny/test.csv'
file_path = '../data/kaggle_cifar10_tiny/'# 统计label种类,并排序
cifar_labels = sorted(list(set(train_data['label'])))
# 将label对应编号
labels_to_num = dict(zip(cifar_labels, range(len(cifar_labels))))
# print(labels_to_num)
"""
{'airplane': 0, 'automobile': 1, 'bird': 2, 'cat': 3, 'deer': 4,
'dog': 5, 'frog': 6, 'horse': 7, 'ship': 8, 'truck': 9}
"""
# 将编号对应label,用于后续预测
num_to_labels = {value : key for key, value in labels_to_num.items()}
# print(num_to_labels)
"""
{0: 'airplane', 1: 'automobile', 2: 'bird', 3: 'cat', 4: 'deer',
5: 'dog', 6: 'frog', 7: 'horse', 8: 'ship', 9: 'truck'}
"""def get_image_filenames(folder_path, extensions=['.png', '.jpg', '.jpeg']):# 获取指定文件夹中的所有图片文件image_files = []for ext in extensions:image_files.extend(glob.glob(os.path.join(folder_path, f'*{ext}')))# 返回图片文件名列表return [os.path.basename(image) for image in image_files]def save_filenames_to_csv(filenames, csv_path):with open(csv_path, mode='w', newline='', encoding='utf-8') as file:writer = csv.writer(file)# 写入CSV的第一行writer.writerow(['id'])# 写入每个文件名for filename in filenames:writer.writerow([filename])# 获取测试图片名
test_images_path = '../data/kaggle_cifar10_tiny/test'
image_filenames = get_image_filenames(test_images_path)
# 保存到CSV文件
save_filenames_to_csv(image_filenames, file_path + 'test.csv') class CifarDataset(Dataset):def __init__(self, csv_path, file_path, mode='train', valid_ratio=0.2, resize_height=224, resize_width=224):"""初始化 LeavesDataset 对象。参数:csv_path (str): 包含图像路径和标签的 CSV 文件路径。file_path (str): 图像文件所在目录的路径。mode (str, optional): 数据集的模式。可以是 'train', 'valid' 或 'test'。默认值为 'train'。valid_ratio (float, optional): 用于验证的数据比例。默认值为 0.2。resize_height (int, optional): 调整图像高度的大小。默认值为 224。resize_width (int, optional): 调整图像宽度的大小。默认值为 224。"""# 存储图像调整大小的高度和宽度self.resize_height = resize_heightself.resize_width = resize_width# 存储图像文件路径和模式(train/valid/test)if mode == 'train' or mode == 'valid':self.file_path = file_path + 'train/'else:self.file_path = file_path + 'test/'self.mode = mode# 读取包含图像路径和标签的 CSV 文件self.data_info = pd.read_csv(csv_path, header=0)# 获取样本总数self.data_len = len(self.data_info.index)# 计算训练集样本数self.train_len = int(self.data_len * (1 - valid_ratio))# 根据模式处理数据if self.mode == 'train':# 训练模式下的图像和标签self.train_img = np.asarray(self.data_info.iloc[0:self.train_len, 0])self.train_label = np.asarray(self.data_info.iloc[0:self.train_len, 1])self.image_arr = self.train_imgself.label_arr = self.train_labelelif self.mode == 'valid':# 验证模式下的图像和标签self.valid_img = np.asarray(self.data_info.iloc[self.train_len:, 0])self.valid_label = np.asarray(self.data_info.iloc[self.train_len:, 1])self.image_arr = self.valid_imgself.label_arr = self.valid_labelelif self.mode == 'test':# 测试模式下的图像self.test_img = np.asarray(self.data_info.iloc[:, 0])self.image_arr = self.test_img# 获取图像数组的长度self.len_image = len(self.image_arr)print(f'扫描所有 {mode} 数据,共 {self.len_image} 张图像')def __getitem__(self, idx):"""获取指定索引的图像和标签。参数:idx (int): 标签文本对应编号的索引返回:如果是测试模式,返回图像张量;否则返回图像张量和标签。"""# 打开图像文件if self.mode == 'test':self.img = Image.open(self.file_path + str(self.image_arr[idx]))else :self.img = Image.open(self.file_path + str(self.image_arr[idx]) + '.png')if self.mode == 'train':# 训练模式下的数据增强trans =torchvision.transforms.Compose([torchvision.transforms.Resize((self.resize_height, self.resize_width)),torchvision.transforms.RandomHorizontalFlip(p=0.5),torchvision.transforms.RandomVerticalFlip(p=0.5),torchvision.transforms.RandomResizedCrop(32, scale=(0.64, 1.0),ratio=(1.0, 1.0)),torchvision.transforms.RandomRotation(degrees=30),# torchvision.transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),# torchvision.transforms.RandomResizedCrop(size=self.resize_height, scale=(0.8, 1.0)),torchvision.transforms.ToTensor(),torchvision.transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])self.img = trans(self.img)else:# 验证和测试模式下的简单处理trans = torchvision.transforms.Compose([torchvision.transforms.Resize((self.resize_height, self.resize_width)),torchvision.transforms.ToTensor(),torchvision.transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])self.img = trans(self.img)if self.mode == 'test':return self.imgelse:# 获取标签文本对应的编号self.label = labels_to_num[self.label_arr[idx]]return self.img, self.labeldef __call__(self, idx):"""使对象可以像函数一样被调用。参数:idx (int):标签文本对应编号的索引返回:调用 __getitem__ 方法并返回结果。"""return self.__getitem__(idx)def __len__(self):"""获取数据集的长度。返回:数据集中图像的数量。"""return self.len_imagetrain_dataset = CifarDataset(train_path,file_path, mode='train', valid_ratio=0.1, resize_height=40, resize_width=40)
valid_dataset = CifarDataset(train_path, file_path, mode='valid',valid_ratio=0.1, resize_height=40, resize_width=40)
test_dataset = CifarDataset(test_path, file_path, mode='test',valid_ratio=0.1, resize_height=40, resize_width=40)batch_size = 32
train_iter = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True, num_workers=0)
valid_iter = torch.utils.data.DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, drop_last=True, num_workers=0)
test_iter = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=False, num_workers=0)
2.模型训练
def train_batch(net, X, y, loss, trainer, devices):"""使用多GPU训练一个小批量数据。参数:net: 神经网络模型。X: 输入数据,张量或张量列表。y: 标签数据。loss: 损失函数。trainer: 优化器。devices: GPU设备列表。返回:train_loss_sum: 当前批次的训练损失和。train_acc_sum: 当前批次的训练准确度和。"""# 如果输入数据X是列表类型if isinstance(X, list):# 将列表中的每个张量移动到第一个GPU设备X = [x.to(devices[0]) for x in X]else:X = X.to(devices[0])# 如果X不是列表,直接将X移动到第一个GPU设备y = y.to(devices[0])# 将标签数据y移动到第一个GPU设备net.train() # 设置网络为训练模式trainer.zero_grad()# 梯度清零pred = net(X) # 前向传播,计算预测值l = loss(pred, y) # 计算损失l.sum().backward()# 反向传播,计算梯度trainer.step() # 更新模型参数train_loss_sum = l.sum()# 计算当前批次的总损失train_acc_sum = d2l.accuracy(pred, y)# 计算当前批次的总准确度return train_loss_sum, train_acc_sum# 返回训练损失和与准确度和def train(net, train_iter, valid_iter, num_epochs, lr, wd, devices, lr_period, lr_decay,param_group=True):# trainer = torch.optim.SGD(net.parameters(), lr=lr, momentum=0.9,weight_decay=wd)trainer = torch.optim.Adam(net.parameters(), lr=lr,weight_decay=wd)scheduler = torch.optim.lr_scheduler.StepLR(trainer, lr_period, lr_decay)loss = nn.CrossEntropyLoss(reduction="none")num_batches, timer = len(train_iter), d2l.Timer()legend = ['train loss', 'train acc']if valid_iter is not None:legend.append('valid acc')animator = lp.Animator(xlabel='epoch', xlim=[1, num_epochs],legend=legend)net = nn.DataParallel(net, device_ids=devices).to(devices[0])for epoch in range(num_epochs):net.train()metric = lp.Accumulator(3)for i, (features, labels) in enumerate(train_iter):timer.start()l, acc = train_batch(net, features, labels,loss, trainer, devices)metric.add(l, acc, labels.shape[0])timer.stop()train_l = metric[0] / metric[2] # 计算训练损失train_acc = metric[1] / metric[2] # 计算训练准确率if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:animator.add(epoch + (i + 1) / num_batches,(train_l , train_acc,None))if valid_iter is not None:valid_acc = d2l.evaluate_accuracy_gpu(net, valid_iter)animator.add(epoch + 1, (None, None, valid_acc))scheduler.step()print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, 'f'valid_acc {valid_acc:.3f}')measures = (f'train loss {metric[0] / metric[2]:.3f}, 'f'train acc {metric[1] / metric[2]:.3f}')if valid_iter is not None:measures += f', valid acc {valid_acc:.3f}'print(measures + f'\n{metric[2] * num_epochs / timer.sum():.1f}'f' examples/sec on {str(devices)}')
3.定义超参数
# 定义模型
net = d2l.resnet18(len(cifar_labels),3)
devices, num_epochs, lr, wd = d2l.try_all_gpus(), 100, 3e-4, 5e-4
lr_period, lr_decay = 4, 0.9
train(net, train_iter, valid_iter, num_epochs, lr, wd, devices, lr_period, lr_decay)
plt.show()
# train loss 0.153, train acc 0.955, valid acc 0.469
# 873.5 examples/sec on [device(type='cuda', index=0)]
4.模型预测
# 针对测试集进行分类预测
def predict(net, data_loader, devices):"""使用模型进行预测参数:net (torch.nn.Module): 要进行预测的模型data_loader (torch.utils.data.DataLoader): 数据加载器,用于提供待预测的数据devices (list): 计算设备列表(CPU或GPU)返回:all_preds (list): 包含所有预测结果的列表"""all_preds = [] # 存储所有预测结果net.to(devices[0]) # 将模型移动到指定设备net.eval() # 设置模型为评估模式with torch.no_grad(): # 在不需要计算梯度的上下文中进行for X in data_loader: # 遍历数据加载器X = X.to(devices[0]) # 将数据移动到指定设备outputs = net(X) # 前向传播,计算模型输出_, preds = torch.max(outputs, 1) # 获取预测结果all_preds.extend(preds.cpu().numpy()) # 将预测结果添加到列表中return all_preds # 返回所有预测结果# 调用预测函数
predictions = predict(net, test_iter, devices)
# 映射预测结果到标签
mapped_predictions = [num_to_labels[int(i)] for i in predictions]
# 读取测试数据
test_data = pd.read_csv(test_path)
# 将预测结果添加到测试数据中
test_data['label'] = pd.Series(mapped_predictions)
# 创建提交文件
submission = pd.concat([test_data['id'], test_data['label']], axis=1)
# 保存提交文件
submission.to_csv(file_path + 'submission.csv', index=False)