Vision Transformer(ViT)简介
近些年,随着基于自注意(Self-Attention)结构的模型的发展,特别是Transformer模型的提出,极大地促进了自然语言处理模型的发展。由于Transformers的计算效率和可扩展性,它已经能够训练具有超过100B参数的空前规模的模型。
ViT则是自然语言处理和计算机视觉两个领域的融合结晶。在不依赖卷积操作的情况下,依然可以在图像分类任务上达到很好的效果。
模型结构
ViT模型的主体结构是基于Transformer模型的Encoder部分(部分结构顺序有调整,如:Normalization的位置与标准Transformer不同),其结构图[1]如下:
模型特点
ViT模型主要应用于图像分类领域。因此,其模型结构相较于传统的Transformer有以下几个特点:
- 数据集的原图像被划分为多个patch(图像块)后,将二维patch(不考虑channel)转换为一维向量,再加上类别向量与位置向量作为模型输入。
- 模型主体的Block结构是基于Transformer的Encoder结构,但是调整了Normalization的位置,其中,最主要的结构依然是Multi-head Attention结构。
- 模型在Blocks堆叠后接全连接层,接受类别向量的输出作为输入并用于分类。通常情况下,我们将最后的全连接层称为Head,Transformer Encoder部分为backbone。
环境准备与数据读取
开始实验之前,请确保本地已经安装了Python环境并安装了MindSpore。
首先我们需要下载本案例的数据集,可通过http://image-net.org下载完整的ImageNet数据集,本案例应用的数据集是从ImageNet中筛选出来的子集。
from download import downloaddataset_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/vit_imagenet_dataset.zip"
path = "./"path = download(dataset_url, path, kind="zip", replace=True)import osimport mindspore as ms
from mindspore.dataset import ImageFolderDataset
import mindspore.dataset.vision as transformsdata_path = './dataset/'
mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]
std = [0.229 * 255, 0.224 * 255, 0.225 * 255]dataset_train = ImageFolderDataset(os.path.join(data_path, "train"), shuffle=True)trans_train = [transforms.RandomCropDecodeResize(size=224,scale=(0.08, 1.0),ratio=(0.75, 1.333)),transforms.RandomHorizontalFlip(prob=0.5),transforms.Normalize(mean=mean, std=std),transforms.HWC2CHW()
]dataset_train = dataset_train.map(operations=trans_train, input_columns=["image"])
dataset_train = dataset_train.batch(batch_size=16, drop_remainder=True)
模型解析
下面将通过代码来细致剖析ViT模型的内部结构。
Transformer基本原理
Transformer模型源于2017年的一篇文章[2]。在这篇文章中提出的基于Attention机制的编码器-解码器型结构在自然语言处理领域获得了巨大的成功。模型结构如下图所示:
其主要结构为多个Encoder和Decoder模块所组成,其中Encoder和Decoder的详细结构如下图[2]所示:
Encoder与Decoder由许多结构组成,如:多头注意力(Multi-Head Attention)层,Feed Forward层,Normaliztion层,甚至残差连接(Residual Connection,图中的“Add”)。不过,其中最重要的结构是多头注意力(Multi-Head Attention)结构,该结构基于自注意力(Self-Attention)机制,是多个Self-Attention的并行组成。
所以,理解了Self-Attention就抓住了Transformer的核心。
Attention模块
以下是Self-Attention的解释,其核心内容是为输入向量的每个单词学习一个权重。通过给定一个任务相关的查询向量Query向量,计算Query和各个Key的相似性或者相关性得到注意力分布,即得到每个Key对应Value的权重系数,然后对Value进行加权求和得到最终的Attention数值。
在Self-Attention中:
- 最初的输入向量首先会经过Embedding层映射成Q(Query),K(Key),V(Value)三个向量,由于是并行操作,所以代码中是映射成为dim x 3的向量然后进行分割,换言之,如果你的输入向量为一个向量序列(𝑥1𝑥1,𝑥2𝑥2,𝑥3𝑥3),其中的𝑥1𝑥1,𝑥2𝑥2,𝑥3𝑥3都是一维向量,那么每一个一维向量都会经过Embedding层映射出Q,K,V三个向量,只是Embedding矩阵不同,矩阵参数也是通过学习得到的。这里大家可以认为,Q,K,V三个矩阵是发现向量之间关联信息的一种手段,需要经过学习得到,至于为什么是Q,K,V三个,主要是因为需要两个向量点乘以获得权重,又需要另一个向量来承载权重向加的结果,所以,最少需要3个矩阵。
- 自注意力机制的自注意主要体现在它的Q,K,V都来源于其自身,也就是该过程是在提取输入的不同顺序的向量的联系与特征,最终通过不同顺序向量之间的联系紧密性(Q与K乘积经过Softmax的结果)来表现出来。Q,K,V得到后就需要获取向量间权重,需要对Q和K进行点乘并除以维度的平方根,对所有向量的结果进行Softmax处理,通过公式(2)的操作,我们获得了向量之间的关系权重。
- 其最终输出则是通过V这个映射后的向量与Q,K经过Softmax结果进行weight sum获得,这个过程可以理解为在全局上进行自注意表示。每一组Q,K,V最后都有一个V输出,这是Self-Attention得到的最终结果,是当前向量在结合了它与其他向量关联权重后得到的结果。
通过下图可以整体把握Self-Attention的全部过程。
多头注意力机制就是将原本self-Attention处理的向量分割为多个Head进行处理,这一点也可以从代码中体现,这也是attention结构可以进行并行加速的一个方面。
总结来说,多头注意力机制在保持参数总量不变的情况下,将同样的query, key和value映射到原来的高维空间(Q,K,V)的不同子空间(Q_0,K_0,V_0)中进行自注意力的计算,最后再合并不同子空间中的注意力信息。
所以,对于同一个输入向量,多个注意力机制可以同时对其进行处理,即利用并行计算加速处理过程,又在处理的时候更充分的分析和利用了向量特征。下图展示了多头注意力机制,其并行能力的主要体现在下图中的𝑎1𝑎1和𝑎2𝑎2是同一个向量进行分割获得的。
以下是Multi-Head Attention代码,结合上文的解释,代码清晰的展现了这一过程。
from mindspore import nn, opsclass Attention(nn.Cell):def __init__(self,dim: int,num_heads: int = 8,keep_prob: float = 1.0,attention_keep_prob: float = 1.0):super(Attention, self).__init__()self.num_heads = num_headshead_dim = dim // num_headsself.scale = ms.Tensor(head_dim ** -0.5)self.qkv = nn.Dense(dim, dim * 3)self.attn_drop = nn.Dropout(p=1.0-attention_keep_prob)self.out = nn.Dense(dim, dim)self.out_drop = nn.Dropout(p=1.0-keep_prob)self.attn_matmul_v = ops.BatchMatMul()self.q_matmul_k = ops.BatchMatMul(transpose_b=True)self.softmax = nn.Softmax(axis=-1)def construct(self, x):"""Attention construct."""b, n, c = x.shapeqkv = self.qkv(x)qkv = ops.reshape(qkv, (b, n, 3, self.num_heads, c // self.num_heads))qkv = ops.transpose(qkv, (2, 0, 3, 1, 4))q, k, v = ops.unstack(qkv, axis=0)attn = self.q_matmul_k(q, k)attn = ops.mul(attn, self.scale)attn = self.softmax(attn)attn = self.attn_drop(attn)out = self.attn_matmul_v(attn, v)out = ops.transpose(out, (0, 2, 1, 3))out = ops.reshape(out, (b, n, c))out = self.out(out)out = self.out_drop(out)return out
接下来就利用Self-Attention来构建ViT模型中的TransformerEncoder部分,类似于构建了一个Transformer的编码器部分,如下图[1]所示:
-
ViT模型中的基础结构与标准Transformer有所不同,主要在于Normalization的位置是放在Self-Attention和Feed Forward之前,其他结构如Residual Connection,Feed Forward,Normalization都如Transformer中所设计。
-
从Transformer结构的图片可以发现,多个子encoder的堆叠就完成了模型编码器的构建,在ViT模型中,依然沿用这个思路,通过配置超参数num_layers,就可以确定堆叠层数。
-
Residual Connection,Normalization的结构可以保证模型有很强的扩展性(保证信息经过深层处理不会出现退化的现象,这是Residual Connection的作用),Normalization和dropout的应用可以增强模型泛化能力。
从以下源码中就可以清晰看到Transformer的结构。将TransformerEncoder结构和一个多层感知器(MLP)结合,就构成了ViT模型的backbone部分。
class TransformerEncoder(nn.Cell):def __init__(self,dim: int,num_layers: int,num_heads: int,mlp_dim: int,keep_prob: float = 1.,attention_keep_prob: float = 1.0,drop_path_keep_prob: float = 1.0,activation: nn.Cell = nn.GELU,norm: nn.Cell = nn.LayerNorm):super(TransformerEncoder, self).__init__()layers = []for _ in range(num_layers):normalization1 = norm((dim,))normalization2 = norm((dim,))attention = Attention(dim=dim,num_heads=num_heads,keep_prob=keep_prob,attention_keep_prob=attention_keep_prob)feedforward = FeedForward(in_features=dim,hidden_features=mlp_dim,activation=activation,keep_prob=keep_prob)layers.append(nn.SequentialCell([ResidualCell(nn.SequentialCell([normalization1, attention])),ResidualCell(nn.SequentialCell([normalization2, feedforward]))]))self.layers = nn.SequentialCell(layers)def construct(self, x):"""Transformer construct."""return self.layers(x)
ViT模型的输入
传统的Transformer结构主要用于处理自然语言领域的词向量(Word Embedding or Word Vector),词向量与传统图像数据的主要区别在于,词向量通常是一维向量进行堆叠,而图片则是二维矩阵的堆叠,多头注意力机制在处理一维词向量的堆叠时会提取词向量之间的联系也就是上下文语义,这使得Transformer在自然语言处理领域非常好用,而二维图片矩阵如何与一维词向量进行转化就成为了Transformer进军图像处理领域的一个小门槛。
在ViT模型中:
-
通过将输入图像在每个channel上划分为16个patch,这一步是通过卷积操作来完成的,当然也可以人工进行划分,但卷积操作也可以达到目的同时还可以进行一次而外的数据处理;*例如一幅输入224 x 224的图像,首先经过卷积处理得到16 x 16个patch,那么每一个patch的大小就是14 x 14。
-
再将每一个patch的矩阵拉伸成为一个一维向量,从而获得了近似词向量堆叠的效果。上一步得到的14 x 14的patch就转换为长度为196的向量。
这是图像输入网络经过的第一步处理。具体Patch Embedding的代码如下所示:
class PatchEmbedding(nn.Cell):MIN_NUM_PATCHES = 4def __init__(self,image_size: int = 224,patch_size: int = 16,embed_dim: int = 768,input_channels: int = 3):super(PatchEmbedding, self).__init__()self.image_size = image_sizeself.patch_size = patch_sizeself.num_patches = (image_size // patch_size) ** 2self.conv = nn.Conv2d(input_channels, embed_dim, kernel_size=patch_size, stride=patch_size, has_bias=True)def construct(self, x):"""Path Embedding construct."""x = self.conv(x)b, c, h, w = x.shapex = ops.reshape(x, (b, c, h * w))x = ops.transpose(x, (0, 2, 1))return x
输入图像在划分为patch之后,会经过pos_embedding 和 class_embedding两个过程。
-
class_embedding主要借鉴了BERT模型的用于文本分类时的思想,在每一个word vector之前增加一个类别值,通常是加在向量的第一位,上一步得到的196维的向量加上class_embedding后变为197维。
-
增加的class_embedding是一个可以学习的参数,经过网络的不断训练,最终以输出向量的第一个维度的输出来决定最后的输出类别;由于输入是16 x 16个patch,所以输出进行分类时是取 16 x 16个class_embedding进行分类。
-
pos_embedding也是一组可以学习的参数,会被加入到经过处理的patch矩阵中。
-
由于pos_embedding也是可以学习的参数,所以它的加入类似于全链接网络和卷积的bias。这一步就是创造一个长度维197的可训练向量加入到经过class_embedding的向量中。
实际上,pos_embedding总共有4种方案。但是经过作者的论证,只有加上pos_embedding和不加pos_embedding有明显影响,至于pos_embedding是一维还是二维对分类结果影响不大,所以,在我们的代码中,也是采用了一维的pos_embedding,由于class_embedding是加在pos_embedding之前,所以pos_embedding的维度会比patch拉伸后的维度加1。
总的而言,ViT模型还是利用了Transformer模型在处理上下文语义时的优势,将图像转换为一种“变种词向量”然后进行处理,而这样转换的意义在于,多个patch之间本身具有空间联系,这类似于一种“空间语义”,从而获得了比较好的处理效果。
整体构建ViT
以下代码构建了一个完整的ViT模型。
from mindspore.common.initializer import Normal
from mindspore.common.initializer import initializer
from mindspore import Parameterdef init(init_type, shape, dtype, name, requires_grad):"""Init."""initial = initializer(init_type, shape, dtype).init_data()return Parameter(initial, name=name, requires_grad=requires_grad)class ViT(nn.Cell):def __init__(self,image_size: int = 224,input_channels: int = 3,patch_size: int = 16,embed_dim: int = 768,num_layers: int = 12,num_heads: int = 12,mlp_dim: int = 3072,keep_prob: float = 1.0,attention_keep_prob: float = 1.0,drop_path_keep_prob: float = 1.0,activation: nn.Cell = nn.GELU,norm: Optional[nn.Cell] = nn.LayerNorm,pool: str = 'cls') -> None:super(ViT, self).__init__()self.patch_embedding = PatchEmbedding(image_size=image_size,patch_size=patch_size,embed_dim=embed_dim,input_channels=input_channels)num_patches = self.patch_embedding.num_patchesself.cls_token = init(init_type=Normal(sigma=1.0),shape=(1, 1, embed_dim),dtype=ms.float32,name='cls',requires_grad=True)self.pos_embedding = init(init_type=Normal(sigma=1.0),shape=(1, num_patches + 1, embed_dim),dtype=ms.float32,name='pos_embedding',requires_grad=True)self.pool = poolself.pos_dropout = nn.Dropout(p=1.0-keep_prob)self.norm = norm((embed_dim,))self.transformer = TransformerEncoder(dim=embed_dim,num_layers=num_layers,num_heads=num_heads,mlp_dim=mlp_dim,keep_prob=keep_prob,attention_keep_prob=attention_keep_prob,drop_path_keep_prob=drop_path_keep_prob,activation=activation,norm=norm)self.dropout = nn.Dropout(p=1.0-keep_prob)self.dense = nn.Dense(embed_dim, num_classes)def construct(self, x):"""ViT construct."""x = self.patch_embedding(x)cls_tokens = ops.tile(self.cls_token.astype(x.dtype), (x.shape[0], 1, 1))x = ops.concat((cls_tokens, x), axis=1)x += self.pos_embeddingx = self.pos_dropout(x)x = self.transformer(x)x = self.norm(x)x = x[:, 0]if self.training:x = self.dropout(x)x = self.dense(x)return x
整体流程图如下所示:
模型训练与推理
模型训练
模型开始训练前,需要设定损失函数,优化器,回调函数等。
完整训练ViT模型需要很长的时间,实际应用时建议根据项目需要调整epoch_size,当正常输出每个Epoch的step信息时,意味着训练正在进行,通过模型输出可以查看当前训练的loss值和时间等指标。
from mindspore.nn import LossBase
from mindspore.train import LossMonitor, TimeMonitor, CheckpointConfig, ModelCheckpoint
from mindspore import train# define super parameter
epoch_size = 10
momentum = 0.9
num_classes = 1000
resize = 224
step_size = dataset_train.get_dataset_size()# construct model
network = ViT()# load ckpt
vit_url = "https://download.mindspore.cn/vision/classification/vit_b_16_224.ckpt"
path = "./ckpt/vit_b_16_224.ckpt"vit_path = download(vit_url, path, replace=True)
param_dict = ms.load_checkpoint(vit_path)
ms.load_param_into_net(network, param_dict)# define learning rate
lr = nn.cosine_decay_lr(min_lr=float(0),max_lr=0.00005,total_step=epoch_size * step_size,step_per_epoch=step_size,decay_epoch=10)# define optimizer
network_opt = nn.Adam(network.trainable_params(), lr, momentum)# define loss function
class CrossEntropySmooth(LossBase):"""CrossEntropy."""def __init__(self, sparse=True, reduction='mean', smooth_factor=0., num_classes=1000):super(CrossEntropySmooth, self).__init__()self.onehot = ops.OneHot()self.sparse = sparseself.on_value = ms.Tensor(1.0 - smooth_factor, ms.float32)self.off_value = ms.Tensor(1.0 * smooth_factor / (num_classes - 1), ms.float32)self.ce = nn.SoftmaxCrossEntropyWithLogits(reduction=reduction)def construct(self, logit, label):if self.sparse:label = self.onehot(label, ops.shape(logit)[1], self.on_value, self.off_value)loss = self.ce(logit, label)return lossnetwork_loss = CrossEntropySmooth(sparse=True,reduction="mean",smooth_factor=0.1,num_classes=num_classes)# set checkpoint
ckpt_config = CheckpointConfig(save_checkpoint_steps=step_size, keep_checkpoint_max=100)
ckpt_callback = ModelCheckpoint(prefix='vit_b_16', directory='./ViT', config=ckpt_config)# initialize model
# "Ascend + mixed precision" can improve performance
ascend_target = (ms.get_context("device_target") == "Ascend")
if ascend_target:model = train.Model(network, loss_fn=network_loss, optimizer=network_opt, metrics={"acc"}, amp_level="O2")
else:model = train.Model(network, loss_fn=network_loss, optimizer=network_opt, metrics={"acc"}, amp_level="O0")# train model
model.train(epoch_size,dataset_train,callbacks=[ckpt_callback, LossMonitor(125), TimeMonitor(125)],dataset_sink_mode=False,)
模型验证
模型验证过程主要应用了ImageFolderDataset,CrossEntropySmooth和Model等接口。
ImageFolderDataset主要用于读取数据集。
CrossEntropySmooth是损失函数实例化接口。
Model主要用于编译模型。
与训练过程相似,首先进行数据增强,然后定义ViT网络结构,加载预训练模型参数。随后设置损失函数,评价指标等,编译模型后进行验证。本案例采用了业界通用的评价标准Top_1_Accuracy和Top_5_Accuracy评价指标来评价模型表现。
在本案例中,这两个指标代表了在输出的1000维向量中,以最大值或前5的输出值所代表的类别为预测结果时,模型预测的准确率。这两个指标的值越大,代表模型准确率越高。
dataset_val = ImageFolderDataset(os.path.join(data_path, "val"), shuffle=True)trans_val = [transforms.Decode(),transforms.Resize(224 + 32),transforms.CenterCrop(224),transforms.Normalize(mean=mean, std=std),transforms.HWC2CHW()
]dataset_val = dataset_val.map(operations=trans_val, input_columns=["image"])
dataset_val = dataset_val.batch(batch_size=16, drop_remainder=True)# construct model
network = ViT()# load ckpt
param_dict = ms.load_checkpoint(vit_path)
ms.load_param_into_net(network, param_dict)network_loss = CrossEntropySmooth(sparse=True,reduction="mean",smooth_factor=0.1,num_classes=num_classes)# define metric
eval_metrics = {'Top_1_Accuracy': train.Top1CategoricalAccuracy(),'Top_5_Accuracy': train.Top5CategoricalAccuracy()}if ascend_target:model = train.Model(network, loss_fn=network_loss, optimizer=network_opt, metrics=eval_metrics, amp_level="O2")
else:model = train.Model(network, loss_fn=network_loss, optimizer=network_opt, metrics=eval_metrics, amp_level="O0")# evaluate model
result = model.eval(dataset_val)
print(result)
从结果可以看出,由于我们加载了预训练模型参数,模型的Top_1_Accuracy和Top_5_Accuracy达到了很高的水平,实际项目中也可以以此准确率为标准。如果未使用预训练模型参数,则需要更多的epoch来训练。
模型推理
在进行模型推理之前,首先要定义一个对推理图片进行数据预处理的方法。该方法可以对我们的推理图片进行resize和normalize处理,这样才能与我们训练时的输入数据匹配。
本案例采用了一张Doberman的图片作为推理图片来测试模型表现,期望模型可以给出正确的预测结果。
dataset_infer = ImageFolderDataset(os.path.join(data_path, "infer"), shuffle=True)trans_infer = [transforms.Decode(),transforms.Resize([224, 224]),transforms.Normalize(mean=mean, std=std),transforms.HWC2CHW()
]dataset_infer = dataset_infer.map(operations=trans_infer,input_columns=["image"],num_parallel_workers=1)
dataset_infer = dataset_infer.batch(1)
接下来,我们将调用模型的predict方法进行模型。
在推理过程中,通过index2label就可以获取对应标签,再通过自定义的show_result接口将结果写在对应图片上。
import os
import pathlib
import cv2
import numpy as np
from PIL import Image
from enum import Enum
from scipy import ioclass Color(Enum):"""dedine enum color."""red = (0, 0, 255)green = (0, 255, 0)blue = (255, 0, 0)cyan = (255, 255, 0)yellow = (0, 255, 255)magenta = (255, 0, 255)white = (255, 255, 255)black = (0, 0, 0)def check_file_exist(file_name: str):"""check_file_exist."""if not os.path.isfile(file_name):raise FileNotFoundError(f"File `{file_name}` does not exist.")def color_val(color):"""color_val."""if isinstance(color, str):return Color[color].valueif isinstance(color, Color):return color.valueif isinstance(color, tuple):assert len(color) == 3for channel in color:assert 0 <= channel <= 255return colorif isinstance(color, int):assert 0 <= color <= 255return color, color, colorif isinstance(color, np.ndarray):assert color.ndim == 1 and color.size == 3assert np.all((color >= 0) & (color <= 255))color = color.astype(np.uint8)return tuple(color)raise TypeError(f'Invalid type for color: {type(color)}')def imread(image, mode=None):"""imread."""if isinstance(image, pathlib.Path):image = str(image)if isinstance(image, np.ndarray):passelif isinstance(image, str):check_file_exist(image)image = Image.open(image)if mode:image = np.array(image.convert(mode))else:raise TypeError("Image must be a `ndarray`, `str` or Path object.")return imagedef imwrite(image, image_path, auto_mkdir=True):"""imwrite."""if auto_mkdir:dir_name = os.path.abspath(os.path.dirname(image_path))if dir_name != '':dir_name = os.path.expanduser(dir_name)os.makedirs(dir_name, mode=777, exist_ok=True)image = Image.fromarray(image)image.save(image_path)def imshow(img, win_name='', wait_time=0):"""imshow"""cv2.imshow(win_name, imread(img))if wait_time == 0: # prevent from hanging if windows was closedwhile True:ret = cv2.waitKey(1)closed = cv2.getWindowProperty(win_name, cv2.WND_PROP_VISIBLE) < 1# if user closed window or if some key pressedif closed or ret != -1:breakelse:ret = cv2.waitKey(wait_time)def show_result(img: str,result: Dict[int, float],text_color: str = 'green',font_scale: float = 0.5,row_width: int = 20,show: bool = False,win_name: str = '',wait_time: int = 0,out_file: Optional[str] = None) -> None:"""Mark the prediction results on the picture."""img = imread(img, mode="RGB")img = img.copy()x, y = 0, row_widthtext_color = color_val(text_color)for k, v in result.items():if isinstance(v, float):v = f'{v:.2f}'label_text = f'{k}: {v}'cv2.putText(img, label_text, (x, y), cv2.FONT_HERSHEY_COMPLEX,font_scale, text_color)y += row_widthif out_file:show = Falseimwrite(img, out_file)if show:imshow(img, win_name, wait_time)def index2label():"""Dictionary output for image numbers and categories of the ImageNet dataset."""metafile = os.path.join(data_path, "ILSVRC2012_devkit_t12/data/meta.mat")meta = io.loadmat(metafile, squeeze_me=True)['synsets']nums_children = list(zip(*meta))[4]meta = [meta[idx] for idx, num_children in enumerate(nums_children) if num_children == 0]_, wnids, classes = list(zip(*meta))[:3]clssname = [tuple(clss.split(', ')) for clss in classes]wnid2class = {wnid: clss for wnid, clss in zip(wnids, clssname)}wind2class_name = sorted(wnid2class.items(), key=lambda x: x[0])mapping = {}for index, (_, class_name) in enumerate(wind2class_name):mapping[index] = class_name[0]return mapping# Read data for inference
for i, image in enumerate(dataset_infer.create_dict_iterator(output_numpy=True)):image = image["image"]image = ms.Tensor(image)prob = model.predict(image)label = np.argmax(prob.asnumpy(), axis=1)mapping = index2label()output = {int(label): mapping[int(label)]}print(output)show_result(img="./dataset/infer/n01440764/ILSVRC2012_test_00000279.JPEG",result=output,out_file="./dataset/infer/ILSVRC2012_test_00000279.JPEG")
推理过程完成后,在推理文件夹下可以找到图片的推理结果,可以看出预测结果是Doberman,与期望结果相同,验证了模型的准确性。
总结
本案例完成了一个ViT模型在ImageNet数据上进行训练,验证和推理的过程,其中,对关键的ViT模型结构和原理作了讲解。通过学习本案例,理解源码可以帮助用户掌握Multi-Head Attention,TransformerEncoder,pos_embedding等关键概念,如果要详细理解ViT的模型原理,建议基于源码更深层次的详细阅读。
SSD目标检测¶
模型简介
SSD,全称Single Shot MultiBox Detector,是Wei Liu在ECCV 2016上提出的一种目标检测算法。使用Nvidia Titan X在VOC 2007测试集上,SSD对于输入尺寸300x300的网络,达到74.3%mAP(mean Average Precision)以及59FPS;对于512x512的网络,达到了76.9%mAP ,超越当时最强的Faster RCNN(73.2%mAP)。具体可参考论文[1]。 SSD目标检测主流算法分成可以两个类型:
-
two-stage方法:RCNN系列
通过算法产生候选框,然后再对这些候选框进行分类和回归。
-
one-stage方法:YOLO和SSD
直接通过主干网络给出类别位置信息,不需要区域生成。
SSD是单阶段的目标检测算法,通过卷积神经网络进行特征提取,取不同的特征层进行检测输出,所以SSD是一种多尺度的检测方法。在需要检测的特征层,直接使用一个3 ×× 3卷积,进行通道的变换。SSD采用了anchor的策略,预设不同长宽比例的anchor,每一个输出特征层基于anchor预测多个检测框(4或者6)。采用了多尺度检测方法,浅层用于检测小目标,深层用于检测大目标。SSD的框架如下图:
模型结构
SSD采用VGG16作为基础模型,然后在VGG16的基础上新增了卷积层来获得更多的特征图以用于检测。SSD的网络结构如图所示。上面是SSD模型,下面是YOLO模型,可以明显看到SSD利用了多尺度的特征图做检测。
两种单阶段目标检测算法的比较:
SSD先通过卷积不断进行特征提取,在需要检测物体的网络,直接通过一个3 ×× 3卷积得到输出,卷积的通道数由anchor数量和类别数量决定,具体为(anchor数量*(类别数量+4))。
SSD对比了YOLO系列目标检测方法,不同的是SSD通过卷积得到最后的边界框,而YOLO对最后的输出采用全连接的形式得到一维向量,对向量进行拆解得到最终的检测框。
模型特点
-
多尺度检测
在SSD的网络结构图中我们可以看到,SSD使用了多个特征层,特征层的尺寸分别是38 ×× 38,19 ×× 19,10 ×× 10,5 ×× 5,3 ×× 3,1 ×× 1,一共6种不同的特征图尺寸。大尺度特征图(较靠前的特征图)可以用来检测小物体,而小尺度特征图(较靠后的特征图)用来检测大物体。多尺度检测的方式,可以使得检测更加充分(SSD属于密集检测),更能检测出小目标。
-
采用卷积进行检测
与YOLO最后采用全连接层不同,SSD直接采用卷积对不同的特征图来进行提取检测结果。对于形状为m ×× n ×× p的特征图,只需要采用3 ×× 3 ×× p这样比较小的卷积核得到检测值。
-
预设anchor
在YOLOv1中,直接由网络预测目标的尺寸,这种方式使得预测框的长宽比和尺寸没有限制,难以训练。在SSD中,采用预设边界框,我们习惯称它为anchor(在SSD论文中叫default bounding boxes),预测框的尺寸在anchor的指导下进行微调。
环境准备
本案例基于MindSpore实现,开始实验前,请确保本地已经安装了mindspore、download、pycocotools、opencv-python。
数据准备与处理
本案例所使用的数据集为COCO 2017。为了更加方便地保存和加载数据,本案例中在数据读取前首先将COCO数据集转换成MindRecord格式。使用MindSpore Record数据格式可以减少磁盘IO、网络IO开销,从而获得更好的使用体验和性能提升。 首先我们需要下载处理好的MindRecord格式的COCO数据集。 运行以下代码将数据集下载并解压到指定路径。
from download import downloaddataset_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/ssd_datasets.zip"
path = "./"
path = download(dataset_url, path, kind="zip", replace=True)
coco_root = "./datasets/"
anno_json = "./datasets/annotations/instances_val2017.json"train_cls = ['background', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus','train', 'truck', 'boat', 'traffic light', 'fire hydrant','stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog','horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra','giraffe', 'backpack', 'umbrella', 'handbag', 'tie','suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball','kite', 'baseball bat', 'baseball glove', 'skateboard','surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup','fork', 'knife', 'spoon', 'bowl', 'banana', 'apple','sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza','donut', 'cake', 'chair', 'couch', 'potted plant', 'bed','dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote','keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink','refrigerator', 'book', 'clock', 'vase', 'scissors','teddy bear', 'hair drier', 'toothbrush']train_cls_dict = {}
for i, cls in enumerate(train_cls):train_cls_dict[cls] = i
数据采样
为了使模型对于各种输入对象大小和形状更加鲁棒,SSD算法每个训练图像通过以下选项之一随机采样:
-
使用整个原始输入图像
-
采样一个区域,使采样区域和原始图片最小的交并比重叠为0.1,0.3,0.5,0.7或0.9
-
随机采样一个区域
每个采样区域的大小为原始图像大小的[0.3,1],长宽比在1/2和2之间。如果真实标签框中心在采样区域内,则保留两者重叠部分作为新图片的真实标注框。在上述采样步骤之后,将每个采样区域大小调整为固定大小,并以0.5的概率水平翻转。
import cv2
import numpy as npdef _rand(a=0., b=1.):return np.random.rand() * (b - a) + adef intersect(box_a, box_b):"""Compute the intersect of two sets of boxes."""max_yx = np.minimum(box_a[:, 2:4], box_b[2:4])min_yx = np.maximum(box_a[:, :2], box_b[:2])inter = np.clip((max_yx - min_yx), a_min=0, a_max=np.inf)return inter[:, 0] * inter[:, 1]def jaccard_numpy(box_a, box_b):"""Compute the jaccard overlap of two sets of boxes."""inter = intersect(box_a, box_b)area_a = ((box_a[:, 2] - box_a[:, 0]) *(box_a[:, 3] - box_a[:, 1]))area_b = ((box_b[2] - box_b[0]) *(box_b[3] - box_b[1]))union = area_a + area_b - interreturn inter / uniondef random_sample_crop(image, boxes):"""Crop images and boxes randomly."""height, width, _ = image.shapemin_iou = np.random.choice([None, 0.1, 0.3, 0.5, 0.7, 0.9])if min_iou is None:return image, boxesfor _ in range(50):image_t = imagew = _rand(0.3, 1.0) * widthh = _rand(0.3, 1.0) * height# aspect ratio constraint b/t .5 & 2if h / w < 0.5 or h / w > 2:continueleft = _rand() * (width - w)top = _rand() * (height - h)rect = np.array([int(top), int(left), int(top + h), int(left + w)])overlap = jaccard_numpy(boxes, rect)# dropout some boxesdrop_mask = overlap > 0if not drop_mask.any():continueif overlap[drop_mask].min() < min_iou and overlap[drop_mask].max() > (min_iou + 0.2):continueimage_t = image_t[rect[0]:rect[2], rect[1]:rect[3], :]centers = (boxes[:, :2] + boxes[:, 2:4]) / 2.0m1 = (rect[0] < centers[:, 0]) * (rect[1] < centers[:, 1])m2 = (rect[2] > centers[:, 0]) * (rect[3] > centers[:, 1])# mask in that both m1 and m2 are truemask = m1 * m2 * drop_mask# have any valid boxes? try again if notif not mask.any():continue# take only matching gt boxesboxes_t = boxes[mask, :].copy()boxes_t[:, :2] = np.maximum(boxes_t[:, :2], rect[:2])boxes_t[:, :2] -= rect[:2]boxes_t[:, 2:4] = np.minimum(boxes_t[:, 2:4], rect[2:4])boxes_t[:, 2:4] -= rect[:2]return image_t, boxes_treturn image, boxesdef ssd_bboxes_encode(boxes):"""Labels anchors with ground truth inputs."""def jaccard_with_anchors(bbox):"""Compute jaccard score a box and the anchors."""# Intersection bbox and volume.ymin = np.maximum(y1, bbox[0])xmin = np.maximum(x1, bbox[1])ymax = np.minimum(y2, bbox[2])xmax = np.minimum(x2, bbox[3])w = np.maximum(xmax - xmin, 0.)h = np.maximum(ymax - ymin, 0.)# Volumes.inter_vol = h * wunion_vol = vol_anchors + (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) - inter_voljaccard = inter_vol / union_volreturn np.squeeze(jaccard)pre_scores = np.zeros((8732), dtype=np.float32)t_boxes = np.zeros((8732, 4), dtype=np.float32)t_label = np.zeros((8732), dtype=np.int64)for bbox in boxes:label = int(bbox[4])scores = jaccard_with_anchors(bbox)idx = np.argmax(scores)scores[idx] = 2.0mask = (scores > matching_threshold)mask = mask & (scores > pre_scores)pre_scores = np.maximum(pre_scores, scores * mask)t_label = mask * label + (1 - mask) * t_labelfor i in range(4):t_boxes[:, i] = mask * bbox[i] + (1 - mask) * t_boxes[:, i]index = np.nonzero(t_label)# Transform to tlbr.bboxes = np.zeros((8732, 4), dtype=np.float32)bboxes[:, [0, 1]] = (t_boxes[:, [0, 1]] + t_boxes[:, [2, 3]]) / 2bboxes[:, [2, 3]] = t_boxes[:, [2, 3]] - t_boxes[:, [0, 1]]# Encode features.bboxes_t = bboxes[index]default_boxes_t = default_boxes[index]bboxes_t[:, :2] = (bboxes_t[:, :2] - default_boxes_t[:, :2]) / (default_boxes_t[:, 2:] * 0.1)tmp = np.maximum(bboxes_t[:, 2:4] / default_boxes_t[:, 2:4], 0.000001)bboxes_t[:, 2:4] = np.log(tmp) / 0.2bboxes[index] = bboxes_tnum_match = np.array([len(np.nonzero(t_label)[0])], dtype=np.int32)return bboxes, t_label.astype(np.int32), num_matchdef preprocess_fn(img_id, image, box, is_training):"""Preprocess function for dataset."""cv2.setNumThreads(2)def _infer_data(image, input_shape):img_h, img_w, _ = image.shapeinput_h, input_w = input_shapeimage = cv2.resize(image, (input_w, input_h))# When the channels of image is 1if len(image.shape) == 2:image = np.expand_dims(image, axis=-1)image = np.concatenate([image, image, image], axis=-1)return img_id, image, np.array((img_h, img_w), np.float32)def _data_aug(image, box, is_training, image_size=(300, 300)):ih, iw, _ = image.shapeh, w = image_sizeif not is_training:return _infer_data(image, image_size)# Random cropbox = box.astype(np.float32)image, box = random_sample_crop(image, box)ih, iw, _ = image.shape# Resize imageimage = cv2.resize(image, (w, h))# Flip image or notflip = _rand() < .5if flip:image = cv2.flip(image, 1, dst=None)# When the channels of image is 1if len(image.shape) == 2:image = np.expand_dims(image, axis=-1)image = np.concatenate([image, image, image], axis=-1)box[:, [0, 2]] = box[:, [0, 2]] / ihbox[:, [1, 3]] = box[:, [1, 3]] / iwif flip:box[:, [1, 3]] = 1 - box[:, [3, 1]]box, label, num_match = ssd_bboxes_encode(box)return image, box, label, num_matchreturn _data_aug(image, box, is_training, image_size=[300, 300])
数据集创建
from mindspore import Tensor
from mindspore.dataset import MindDataset
from mindspore.dataset.vision import Decode, HWC2CHW, Normalize, RandomColorAdjustdef create_ssd_dataset(mindrecord_file, batch_size=32, device_num=1, rank=0,is_training=True, num_parallel_workers=1, use_multiprocessing=True):"""Create SSD dataset with MindDataset."""dataset = MindDataset(mindrecord_file, columns_list=["img_id", "image", "annotation"], num_shards=device_num,shard_id=rank, num_parallel_workers=num_parallel_workers, shuffle=is_training)decode = Decode()dataset = dataset.map(operations=decode, input_columns=["image"])change_swap_op = HWC2CHW()# Computed from random subset of ImageNet training imagesnormalize_op = Normalize(mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],std=[0.229 * 255, 0.224 * 255, 0.225 * 255])color_adjust_op = RandomColorAdjust(brightness=0.4, contrast=0.4, saturation=0.4)compose_map_func = (lambda img_id, image, annotation: preprocess_fn(img_id, image, annotation, is_training))if is_training:output_columns = ["image", "box", "label", "num_match"]trans = [color_adjust_op, normalize_op, change_swap_op]else:output_columns = ["img_id", "image", "image_shape"]trans = [normalize_op, change_swap_op]dataset = dataset.map(operations=compose_map_func, input_columns=["img_id", "image", "annotation"],output_columns=output_columns, python_multiprocessing=use_multiprocessing,num_parallel_workers=num_parallel_workers)dataset = dataset.map(operations=trans, input_columns=["image"], python_multiprocessing=use_multiprocessing,num_parallel_workers=num_parallel_workers)dataset = dataset.batch(batch_size, drop_remainder=True)return dataset
模型构建
SSD的网络结构主要分为以下几个部分:
-
VGG16 Base Layer
-
Extra Feature Layer
-
Detection Layer
-
NMS
-
Anchor
Backbone Layer
输入图像经过预处理后大小固定为300×300,首先经过backbone,本案例中使用的是VGG16网络的前13个卷积层,然后分别将VGG16的全连接层fc6和fc7转换成3 ×× 3卷积层block6和1 ×× 1卷积层block7,进一步提取特征。 在block6中,使用了空洞数为6的空洞卷积,其padding也为6,这样做同样也是为了增加感受野的同时保持参数量与特征图尺寸的不变。
Extra Feature Layer
在VGG16的基础上,SSD进一步增加了4个深度卷积层,用于提取更高层的语义信息:
block8-11,用于更高语义信息的提取。block8的通道数为512,而block9、block10与block11的通道数都为256。从block7到block11,这5个卷积后输出特征图的尺寸依次为19×19、10×10、5×5、3×3和1×1。为了降低参数量,使用了1×1卷积先降低通道数为该层输出通道数的一半,再利用3×3卷积进行特征提取。
Anchor
SSD采用了PriorBox来进行区域生成。将固定大小宽高的PriorBox作为先验的感兴趣区域,利用一个阶段完成能够分类与回归。设计大量的密集的PriorBox保证了对整幅图像的每个地方都有检测。PriorBox位置的表示形式是以中心点坐标和框的宽、高(cx,cy,w,h)来表示的,同时都转换成百分比的形式。 PriorBox生成规则: SSD由6个特征层来检测目标,在不同特征层上,PriorBox的尺寸scale大小是不一样的,最低层的scale=0.1,最高层的scale=0.95,其他层的计算公式如下:
在某个特征层上其scale一定,那么会设置不同长宽比ratio的PriorBox,其长和宽的计算公式如下:
在ratio=1的时候,还会根据该特征层和下一个特征层计算一个特定scale的PriorBox(长宽比ratio=1),计算公式如下:
每个特征层的每个点都会以上述规则生成PriorBox,(cx,cy)由当前点的中心点来确定,由此每个特征层都生成大量密集的PriorBox,如下图:
SSD使用了第4、7、8、9、10和11这6个卷积层得到的特征图,这6个特征图尺寸越来越小,而其对应的感受野越来越大。6个特征图上的每一个点分别对应4、6、6、6、4、4个PriorBox。某个特征图上的一个点根据下采样率可以得到在原图的坐标,以该坐标为中心生成4个或6个不同大小的PriorBox,然后利用特征图的特征去预测每一个PriorBox对应类别与位置的预测量。例如:第8个卷积层得到的特征图大小为10×10×512,每个点对应6个PriorBox,一共有600个PriorBox。定义MultiBox类,生成多个预测框。
Detection Layer
SSD模型一共有6个预测特征图,对于其中一个尺寸为m*n,通道为p的预测特征图,假设其每个像素点会产生k个anchor,每个anchor会对应c个类别和4个回归偏移量,使用(4+c)k个尺寸为3x3,通道为p的卷积核对该预测特征图进行卷积操作,得到尺寸为m*n,通道为(4+c)m*k的输出特征图,它包含了预测特征图上所产生的每个anchor的回归偏移量和各类别概率分数。所以对于尺寸为m*n的预测特征图,总共会产生(4+c)k*m*n个结果。cls分支的输出通道数为k*class_num,loc分支的输出通道数为k*4。
from mindspore import nndef _make_layer(channels):in_channels = channels[0]layers = []for out_channels in channels[1:]:layers.append(nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3))layers.append(nn.ReLU())in_channels = out_channelsreturn nn.SequentialCell(layers)class Vgg16(nn.Cell):"""VGG16 module."""def __init__(self):super(Vgg16, self).__init__()self.b1 = _make_layer([3, 64, 64])self.b2 = _make_layer([64, 128, 128])self.b3 = _make_layer([128, 256, 256, 256])self.b4 = _make_layer([256, 512, 512, 512])self.b5 = _make_layer([512, 512, 512, 512])self.m1 = nn.MaxPool2d(kernel_size=2, stride=2, pad_mode='SAME')self.m2 = nn.MaxPool2d(kernel_size=2, stride=2, pad_mode='SAME')self.m3 = nn.MaxPool2d(kernel_size=2, stride=2, pad_mode='SAME')self.m4 = nn.MaxPool2d(kernel_size=2, stride=2, pad_mode='SAME')self.m5 = nn.MaxPool2d(kernel_size=3, stride=1, pad_mode='SAME')def construct(self, x):# block1x = self.b1(x)x = self.m1(x)# block2x = self.b2(x)x = self.m2(x)# block3x = self.b3(x)x = self.m3(x)# block4x = self.b4(x)block4 = xx = self.m4(x)# block5x = self.b5(x)x = self.m5(x)return block4, x
import mindspore as ms
import mindspore.nn as nn
import mindspore.ops as opsdef _last_conv2d(in_channel, out_channel, kernel_size=3, stride=1, pad_mod='same', pad=0):in_channels = in_channelout_channels = in_channeldepthwise_conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, pad_mode='same',padding=pad, group=in_channels)conv = nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=1, padding=0, pad_mode='same', has_bias=True)bn = nn.BatchNorm2d(in_channel, eps=1e-3, momentum=0.97,gamma_init=1, beta_init=0, moving_mean_init=0, moving_var_init=1)return nn.SequentialCell([depthwise_conv, bn, nn.ReLU6(), conv])class FlattenConcat(nn.Cell):"""FlattenConcat module."""def __init__(self):super(FlattenConcat, self).__init__()self.num_ssd_boxes = 8732def construct(self, inputs):output = ()batch_size = ops.shape(inputs[0])[0]for x in inputs:x = ops.transpose(x, (0, 2, 3, 1))output += (ops.reshape(x, (batch_size, -1)),)res = ops.concat(output, axis=1)return ops.reshape(res, (batch_size, self.num_ssd_boxes, -1))class MultiBox(nn.Cell):"""Multibox conv layers. Each multibox layer contains class conf scores and localization predictions."""def __init__(self):super(MultiBox, self).__init__()num_classes = 81out_channels = [512, 1024, 512, 256, 256, 256]num_default = [4, 6, 6, 6, 4, 4]loc_layers = []cls_layers = []for k, out_channel in enumerate(out_channels):loc_layers += [_last_conv2d(out_channel, 4 * num_default[k],kernel_size=3, stride=1, pad_mod='same', pad=0)]cls_layers += [_last_conv2d(out_channel, num_classes * num_default[k],kernel_size=3, stride=1, pad_mod='same', pad=0)]self.multi_loc_layers = nn.CellList(loc_layers)self.multi_cls_layers = nn.CellList(cls_layers)self.flatten_concat = FlattenConcat()def construct(self, inputs):loc_outputs = ()cls_outputs = ()for i in range(len(self.multi_loc_layers)):loc_outputs += (self.multi_loc_layers[i](inputs[i]),)cls_outputs += (self.multi_cls_layers[i](inputs[i]),)return self.flatten_concat(loc_outputs), self.flatten_concat(cls_outputs)class SSD300Vgg16(nn.Cell):"""SSD300Vgg16 module."""def __init__(self):super(SSD300Vgg16, self).__init__()# VGG16 backbone: block1~5self.backbone = Vgg16()# SSD blocks: block6~7self.b6_1 = nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=3, padding=6, dilation=6, pad_mode='pad')self.b6_2 = nn.Dropout(p=0.5)self.b7_1 = nn.Conv2d(in_channels=1024, out_channels=1024, kernel_size=1)self.b7_2 = nn.Dropout(p=0.5)# Extra Feature Layers: block8~11self.b8_1 = nn.Conv2d(in_channels=1024, out_channels=256, kernel_size=1, padding=1, pad_mode='pad')self.b8_2 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, pad_mode='valid')self.b9_1 = nn.Conv2d(in_channels=512, out_channels=128, kernel_size=1, padding=1, pad_mode='pad')self.b9_2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, pad_mode='valid')self.b10_1 = nn.Conv2d(in_channels=256, out_channels=128, kernel_size=1)self.b10_2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, pad_mode='valid')self.b11_1 = nn.Conv2d(in_channels=256, out_channels=128, kernel_size=1)self.b11_2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, pad_mode='valid')# boxesself.multi_box = MultiBox()def construct(self, x):# VGG16 backbone: block1~5block4, x = self.backbone(x)# SSD blocks: block6~7x = self.b6_1(x) # 1024x = self.b6_2(x)x = self.b7_1(x) # 1024x = self.b7_2(x)block7 = x# Extra Feature Layers: block8~11x = self.b8_1(x) # 256x = self.b8_2(x) # 512block8 = xx = self.b9_1(x) # 128x = self.b9_2(x) # 256block9 = xx = self.b10_1(x) # 128x = self.b10_2(x) # 256block10 = xx = self.b11_1(x) # 128x = self.b11_2(x) # 256block11 = x# boxesmulti_feature = (block4, block7, block8, block9, block10, block11)pred_loc, pred_label = self.multi_box(multi_feature)if not self.training:pred_label = ops.sigmoid(pred_label)pred_loc = pred_loc.astype(ms.float32)pred_label = pred_label.astype(ms.float32)return pred_loc, pred_label
损失函数
SSD算法的目标函数分为两部分:计算相应的预选框与目标类别的置信度误差(confidence loss, conf)以及相应的位置误差(locatization loss, loc):
其中:
N 是先验框的正样本数量;
c 为类别置信度预测值;
l 为先验框的所对应边界框的位置预测值;
g 为ground truth的位置参数
α 用以调整confidence loss和location loss之间的比例,默认为1。
对于位置损失函数
针对所有的正样本,采用 Smooth L1 Loss, 位置信息都是 encode 之后的位置信息。
对于置信度损失函数
置信度损失是多类置信度(c)上的softmax损失。
def class_loss(logits, label):"""Calculate category losses."""label = ops.one_hot(label, ops.shape(logits)[-1], Tensor(1.0, ms.float32), Tensor(0.0, ms.float32))weight = ops.ones_like(logits)pos_weight = ops.ones_like(logits)sigmiod_cross_entropy = ops.binary_cross_entropy_with_logits(logits, label, weight.astype(ms.float32), pos_weight.astype(ms.float32))sigmoid = ops.sigmoid(logits)label = label.astype(ms.float32)p_t = label * sigmoid + (1 - label) * (1 - sigmoid)modulating_factor = ops.pow(1 - p_t, 2.0)alpha_weight_factor = label * 0.75 + (1 - label) * (1 - 0.75)focal_loss = modulating_factor * alpha_weight_factor * sigmiod_cross_entropyreturn focal_loss
Metrics
在SSD中,训练过程是不需要用到非极大值抑制(NMS),但当进行检测时,例如输入一张图片要求输出框的时候,需要用到NMS过滤掉那些重叠度较大的预测框。
非极大值抑制的流程如下:
-
根据置信度得分进行排序
-
选择置信度最高的比边界框添加到最终输出列表中,将其从边界框列表中删除
-
计算所有边界框的面积
-
计算置信度最高的边界框与其它候选框的IoU
-
删除IoU大于阈值的边界框
-
重复上述过程,直至边界框列表为空
import json
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOevaldef apply_eval(eval_param_dict):net = eval_param_dict["net"]net.set_train(False)ds = eval_param_dict["dataset"]anno_json = eval_param_dict["anno_json"]coco_metrics = COCOMetrics(anno_json=anno_json,classes=train_cls,num_classes=81,max_boxes=100,nms_threshold=0.6,min_score=0.1)for data in ds.create_dict_iterator(output_numpy=True, num_epochs=1):img_id = data['img_id']img_np = data['image']image_shape = data['image_shape']output = net(Tensor(img_np))for batch_idx in range(img_np.shape[0]):pred_batch = {"boxes": output[0].asnumpy()[batch_idx],"box_scores": output[1].asnumpy()[batch_idx],"img_id": int(np.squeeze(img_id[batch_idx])),"image_shape": image_shape[batch_idx]}coco_metrics.update(pred_batch)eval_metrics = coco_metrics.get_metrics()return eval_metricsdef apply_nms(all_boxes, all_scores, thres, max_boxes):"""Apply NMS to bboxes."""y1 = all_boxes[:, 0]x1 = all_boxes[:, 1]y2 = all_boxes[:, 2]x2 = all_boxes[:, 3]areas = (x2 - x1 + 1) * (y2 - y1 + 1)order = all_scores.argsort()[::-1]keep = []while order.size > 0:i = order[0]keep.append(i)if len(keep) >= max_boxes:breakxx1 = np.maximum(x1[i], x1[order[1:]])yy1 = np.maximum(y1[i], y1[order[1:]])xx2 = np.minimum(x2[i], x2[order[1:]])yy2 = np.minimum(y2[i], y2[order[1:]])w = np.maximum(0.0, xx2 - xx1 + 1)h = np.maximum(0.0, yy2 - yy1 + 1)inter = w * hovr = inter / (areas[i] + areas[order[1:]] - inter)inds = np.where(ovr <= thres)[0]order = order[inds + 1]return keepclass COCOMetrics:"""Calculate mAP of predicted bboxes."""def __init__(self, anno_json, classes, num_classes, min_score, nms_threshold, max_boxes):self.num_classes = num_classesself.classes = classesself.min_score = min_scoreself.nms_threshold = nms_thresholdself.max_boxes = max_boxesself.val_cls_dict = {i: cls for i, cls in enumerate(classes)}self.coco_gt = COCO(anno_json)cat_ids = self.coco_gt.loadCats(self.coco_gt.getCatIds())self.class_dict = {cat['name']: cat['id'] for cat in cat_ids}self.predictions = []self.img_ids = []def update(self, batch):pred_boxes = batch['boxes']box_scores = batch['box_scores']img_id = batch['img_id']h, w = batch['image_shape']final_boxes = []final_label = []final_score = []self.img_ids.append(img_id)for c in range(1, self.num_classes):class_box_scores = box_scores[:, c]score_mask = class_box_scores > self.min_scoreclass_box_scores = class_box_scores[score_mask]class_boxes = pred_boxes[score_mask] * [h, w, h, w]if score_mask.any():nms_index = apply_nms(class_boxes, class_box_scores, self.nms_threshold, self.max_boxes)class_boxes = class_boxes[nms_index]class_box_scores = class_box_scores[nms_index]final_boxes += class_boxes.tolist()final_score += class_box_scores.tolist()final_label += [self.class_dict[self.val_cls_dict[c]]] * len(class_box_scores)for loc, label, score in zip(final_boxes, final_label, final_score):res = {}res['image_id'] = img_idres['bbox'] = [loc[1], loc[0], loc[3] - loc[1], loc[2] - loc[0]]res['score'] = scoreres['category_id'] = labelself.predictions.append(res)def get_metrics(self):with open('predictions.json', 'w') as f:json.dump(self.predictions, f)coco_dt = self.coco_gt.loadRes('predictions.json')E = COCOeval(self.coco_gt, coco_dt, iouType='bbox')E.params.imgIds = self.img_idsE.evaluate()E.accumulate()E.summarize()return E.stats[0]class SsdInferWithDecoder(nn.Cell):"""
SSD Infer wrapper to decode the bbox locations."""def __init__(self, network, default_boxes, ckpt_path):super(SsdInferWithDecoder, self).__init__()param_dict = ms.load_checkpoint(ckpt_path)ms.load_param_into_net(network, param_dict)self.network = networkself.default_boxes = default_boxesself.prior_scaling_xy = 0.1self.prior_scaling_wh = 0.2def construct(self, x):pred_loc, pred_label = self.network(x)default_bbox_xy = self.default_boxes[..., :2]default_bbox_wh = self.default_boxes[..., 2:]pred_xy = pred_loc[..., :2] * self.prior_scaling_xy * default_bbox_wh + default_bbox_xypred_wh = ops.exp(pred_loc[..., 2:] * self.prior_scaling_wh) * default_bbox_whpred_xy_0 = pred_xy - pred_wh / 2.0pred_xy_1 = pred_xy + pred_wh / 2.0pred_xy = ops.concat((pred_xy_0, pred_xy_1), -1)pred_xy = ops.maximum(pred_xy, 0)pred_xy = ops.minimum(pred_xy, 1)return pred_xy, pred_label
训练过程
(1)先验框匹配
在训练过程中,首先要确定训练图片中的ground truth(真实目标)与哪个先验框来进行匹配,与之匹配的先验框所对应的边界框将负责预测它。
SSD的先验框与ground truth的匹配原则主要有两点:
-
对于图片中每个ground truth,找到与其IOU最大的先验框,该先验框与其匹配,这样可以保证每个ground truth一定与某个先验框匹配。通常称与ground truth匹配的先验框为正样本,反之,若一个先验框没有与任何ground truth进行匹配,那么该先验框只能与背景匹配,就是负样本。
-
对于剩余的未匹配先验框,若某个ground truth的IOU大于某个阈值(一般是0.5),那么该先验框也与这个ground truth进行匹配。尽管一个ground truth可以与多个先验框匹配,但是ground truth相对先验框还是太少了,所以负样本相对正样本会很多。为了保证正负样本尽量平衡,SSD采用了hard negative mining,就是对负样本进行抽样,抽样时按照置信度误差(预测背景的置信度越小,误差越大)进行降序排列,选取误差的较大的top-k作为训练的负样本,以保证正负样本比例接近1:3。
注意点:
-
通常称与gt匹配的prior为正样本,反之,若某一个prior没有与任何一个gt匹配,则为负样本。
-
某个gt可以和多个prior匹配,而每个prior只能和一个gt进行匹配。
-
如果多个gt和某一个prior的IOU均大于阈值,那么prior只与IOU最大的那个进行匹配。
如上图所示,训练过程中的 prior boxes 和 ground truth boxes 的匹配,基本思路是:让每一个 prior box 回归并且到 ground truth box,这个过程的调控我们需要损失层的帮助,他会计算真实值和预测值之间的误差,从而指导学习的走向。
(2)损失函数
损失函数使用的是上文提到的位置损失函数和置信度损失函数的加权和。
(3)数据增强
使用之前定义好的数据增强方式,对创建好的数据增强方式进行数据增强。
模型训练时,设置模型训练的epoch次数为60,然后通过create_ssd_dataset类创建了训练集和验证集。batch_size大小为5,图像尺寸统一调整为300×300。损失函数使用位置损失函数和置信度损失函数的加权和,优化器使用Momentum,并设置初始学习率为0.001。回调函数方面使用了LossMonitor和TimeMonitor来监控训练过程中每个epoch结束后,损失值Loss的变化情况以及每个epoch、每个step的运行时间。设置每训练10个epoch保存一次模型。
import math
import itertools as itfrom mindspore import set_seedclass GeneratDefaultBoxes():"""Generate Default boxes for SSD, follows the order of (W, H, archor_sizes).`self.default_boxes` has a shape of [archor_sizes, H, W, 4], the last dimension is [y, x, h, w].`self.default_boxes_tlbr` has a shape as `self.default_boxes`, the last dimension is [y1, x1, y2, x2]."""def __init__(self):fk = 300 / np.array([8, 16, 32, 64, 100, 300])scale_rate = (0.95 - 0.1) / (len([4, 6, 6, 6, 4, 4]) - 1)scales = [0.1 + scale_rate * i for i in range(len([4, 6, 6, 6, 4, 4]))] + [1.0]self.default_boxes = []for idex, feature_size in enumerate([38, 19, 10, 5, 3, 1]):sk1 = scales[idex]sk2 = scales[idex + 1]sk3 = math.sqrt(sk1 * sk2)if idex == 0 and not [[2], [2, 3], [2, 3], [2, 3], [2], [2]][idex]:w, h = sk1 * math.sqrt(2), sk1 / math.sqrt(2)all_sizes = [(0.1, 0.1), (w, h), (h, w)]else:all_sizes = [(sk1, sk1)]for aspect_ratio in [[2], [2, 3], [2, 3], [2, 3], [2], [2]][idex]:w, h = sk1 * math.sqrt(aspect_ratio), sk1 / math.sqrt(aspect_ratio)all_sizes.append((w, h))all_sizes.append((h, w))all_sizes.append((sk3, sk3))assert len(all_sizes) == [4, 6, 6, 6, 4, 4][idex]for i, j in it.product(range(feature_size), repeat=2):for w, h in all_sizes:cx, cy = (j + 0.5) / fk[idex], (i + 0.5) / fk[idex]self.default_boxes.append([cy, cx, h, w])def to_tlbr(cy, cx, h, w):return cy - h / 2, cx - w / 2, cy + h / 2, cx + w / 2# For IoU calculationself.default_boxes_tlbr = np.array(tuple(to_tlbr(*i) for i in self.default_boxes), dtype='float32')self.default_boxes = np.array(self.default_boxes, dtype='float32')default_boxes_tlbr = GeneratDefaultBoxes().default_boxes_tlbr
default_boxes = GeneratDefaultBoxes().default_boxesy1, x1, y2, x2 = np.split(default_boxes_tlbr[:, :4], 4, axis=-1)
vol_anchors = (x2 - x1) * (y2 - y1)
matching_threshold = 0.5
from mindspore.common.initializer import initializer, TruncatedNormaldef init_net_param(network, initialize_mode='TruncatedNormal'):"""Init the parameters in net."""params = network.trainable_params()for p in params:if 'beta' not in p.name and 'gamma' not in p.name and 'bias' not in p.name:if initialize_mode == 'TruncatedNormal':p.set_data(initializer(TruncatedNormal(0.02), p.data.shape, p.data.dtype))else:p.set_data(initialize_mode, p.data.shape, p.data.dtype)def get_lr(global_step, lr_init, lr_end, lr_max, warmup_epochs, total_epochs, steps_per_epoch):""" generate learning rate array"""lr_each_step = []total_steps = steps_per_epoch * total_epochswarmup_steps = steps_per_epoch * warmup_epochsfor i in range(total_steps):if i < warmup_steps:lr = lr_init + (lr_max - lr_init) * i / warmup_stepselse:lr = lr_end + (lr_max - lr_end) * (1. + math.cos(math.pi * (i - warmup_steps) / (total_steps - warmup_steps))) / 2.if lr < 0.0:lr = 0.0lr_each_step.append(lr)current_step = global_steplr_each_step = np.array(lr_each_step).astype(np.float32)learning_rate = lr_each_step[current_step:]return learning_rate
import mindspore.dataset as ds
ds.config.set_enable_shared_mem(False)import timefrom mindspore.amp import DynamicLossScalerset_seed(1)# load data
mindrecord_dir = "./datasets/MindRecord_COCO"
mindrecord_file = "./datasets/MindRecord_COCO/ssd.mindrecord0"dataset = create_ssd_dataset(mindrecord_file, batch_size=5, rank=0, use_multiprocessing=True)
dataset_size = dataset.get_dataset_size()image, get_loc, gt_label, num_matched_boxes = next(dataset.create_tuple_iterator())# Network definition and initialization
network = SSD300Vgg16()
init_net_param(network)# Define the learning rate
lr = Tensor(get_lr(global_step=0 * dataset_size,lr_init=0.001, lr_end=0.001 * 0.05, lr_max=0.05,warmup_epochs=2, total_epochs=60, steps_per_epoch=dataset_size))# Define the optimizer
opt = nn.Momentum(filter(lambda x: x.requires_grad, network.get_parameters()), lr,0.9, 0.00015, float(1024))# Define the forward procedure
def forward_fn(x, gt_loc, gt_label, num_matched_boxes):pred_loc, pred_label = network(x)mask = ops.less(0, gt_label).astype(ms.float32)num_matched_boxes = ops.sum(num_matched_boxes.astype(ms.float32))# Positioning lossmask_loc = ops.tile(ops.expand_dims(mask, -1), (1, 1, 4))smooth_l1 = nn.SmoothL1Loss()(pred_loc, gt_loc) * mask_locloss_loc = ops.sum(ops.sum(smooth_l1, -1), -1)# Category lossloss_cls = class_loss(pred_label, gt_label)loss_cls = ops.sum(loss_cls, (1, 2))return ops.sum((loss_cls + loss_loc) / num_matched_boxes)grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters, has_aux=False)
loss_scaler = DynamicLossScaler(1024, 2, 1000)# Gradient updates
def train_step(x, gt_loc, gt_label, num_matched_boxes):loss, grads = grad_fn(x, gt_loc, gt_label, num_matched_boxes)opt(grads)return lossprint("=================== Starting Training =====================")
for epoch in range(60):network.set_train(True)begin_time = time.time()for step, (image, get_loc, gt_label, num_matched_boxes) in enumerate(dataset.create_tuple_iterator()):loss = train_step(image, get_loc, gt_label, num_matched_boxes)end_time = time.time()times = end_time - begin_timeprint(f"Epoch:[{int(epoch + 1)}/{int(60)}], "f"loss:{loss} , "f"time:{times}s ")
ms.save_checkpoint(network, "ssd-60_9.ckpt")
print("=================== Training Success =====================")
评估
自定义eval_net()类对训练好的模型进行评估,调用了上述定义的SsdInferWithDecoder类返回预测的坐标及标签,然后分别计算了在不同的IoU阈值、area和maxDets设置下的Average Precision(AP)和Average Recall(AR)。使用COCOMetrics类计算mAP。模型在测试集上的评估指标如下。
精确率(AP)和召回率(AR)的解释
-
TP:IoU>设定的阈值的检测框数量(同一Ground Truth只计算一次)。
-
FP:IoU<=设定的阈值的检测框,或者是检测到同一个GT的多余检测框的数量。
-
FN:没有检测到的GT的数量。
精确率(AP)和召回率(AR)的公式
-
精确率(Average Precision,AP):
精确率是将正样本预测正确的结果与正样本预测的结果和预测错误的结果的和的比值,主要反映出预测结果错误率。
-
召回率(Average Recall,AR):
召回率是正样本预测正确的结果与正样本预测正确的结果和正样本预测错误的和的比值,主要反映出来的是预测结果中的漏检率。
关于以下代码运行结果的输出指标
-
第一个值即为mAP(mean Average Precision), 即各类别AP的平均值。
-
第二个值是iou取0.5的mAP值,是voc的评判标准。
-
第三个值是评判较为严格的mAP值,可以反应算法框的位置精准程度;中间几个数为物体大小的mAP值。
对于AR看一下maxDets=10/100的mAR值,反应检出率,如果两者接近,说明对于这个数据集来说,不用检测出100个框,可以提高性能。
mindrecord_file = "./datasets/MindRecord_COCO/ssd_eval.mindrecord0"def ssd_eval(dataset_path, ckpt_path, anno_json):"""SSD evaluation."""batch_size = 1ds = create_ssd_dataset(dataset_path, batch_size=batch_size,is_training=False, use_multiprocessing=False)network = SSD300Vgg16()print("Load Checkpoint!")net = SsdInferWithDecoder(network, Tensor(default_boxes), ckpt_path)net.set_train(False)total = ds.get_dataset_size() * batch_sizeprint("\n========================================\n")print("total images num: ", total)eval_param_dict = {"net": net, "dataset": ds, "anno_json": anno_json}mAP = apply_eval(eval_param_dict)print("\n========================================\n")print(f"mAP: {mAP}")def eval_net():print("Start Eval!")ssd_eval(mindrecord_file, "./ssd-60_9.ckpt", anno_json)eval_net()
最后打卡今天的学习时间
心得
今天我学习了Vision Transformer图像分类、SSD目标检测。Vision Transformer(ViT)通过将图像划分为多个patch,并将每个patch转化为一维向量,结合自注意力机制进行图像分类,展现了在不依赖卷积操作情况下的强大性能。ViT的结构主要基于Transformer的Encoder,调整了Normalization的位置,并使用Multi-head Attention机制,最终通过全连接层进行分类。SSD(Single Shot MultiBox Detector)目标检测算法通过单阶段检测方法和多尺度检测策略,在VGG16基础上新增卷积层以获取更多特征图,使用anchor策略预设不同长宽比例的anchor进行检测。SSD结合了高精度和高速度,成为目标检测领域的重要算法。ViT和SSD都展示了在图像处理和目标检测中的创新应用,推动了计算机视觉技术的发展。