MobileNetv2模型原理介绍
相比于传统的卷积神经网络,MobileNet网络使用深度可分离卷积(Depthwise Separable Convolution)的思想在准确率小幅度降低的前提下,大大减小了模型参数与运算量。并引入宽度系数α和分辨率系数β使模型满足不同应用场景的需求。
由于MobileNet网络中Relu激活函数处理低维特征信息时会存在大量的丢失,所以MobileNetV2网络提出使用倒残差结构(Inverted residual block)和Linear Bottlenecks来设计网络,以提高模型的准确率,且优化后的模型更小。
图中Inverted residual block结构是先使用1x1卷积进行升维,然后使用3x3的DepthWise卷积,最后使用1x1的卷积进行降维。
数据处理
数据准备
from download import download# 下载data_en数据集
url = "https://ascend-professional-construction-dataset.obs.cn-north-4.myhuaweicloud.com:443/MindStudio-pc/data_en.zip"
path = download(url, "./", kind="zip", replace=True)
数据加载
import math
import numpy as np
import os
import randomfrom matplotlib import pyplot as plt
from easydict import EasyDict
from PIL import Image
import numpy as np
import mindspore.nn as nn
from mindspore import ops as P
from mindspore.ops import add
from mindspore import Tensor
import mindspore.common.dtype as mstype
import mindspore.dataset as de
import mindspore.dataset.vision as C
import mindspore.dataset.transforms as C2
import mindspore as ms
from mindspore import set_context, nn, Tensor, load_checkpoint, save_checkpoint, export
from mindspore.train import Model
from mindspore.train import Callback, LossMonitor, ModelCheckpoint, CheckpointConfigos.environ['GLOG_v'] = '3' # Log level includes 3(ERROR), 2(WARNING), 1(INFO), 0(DEBUG).
os.environ['GLOG_logtostderr'] = '0' # 0:输出到文件,1:输出到屏幕
os.environ['GLOG_log_dir'] = '../../log' # 日志目录
os.environ['GLOG_stderrthreshold'] = '2' # 输出到目录也输出到屏幕:3(ERROR), 2(WARNING), 1(INFO), 0(DEBUG).
set_context(mode=ms.GRAPH_MODE, device_target="CPU", device_id=0) # 设置采用图模式执行,设备为Ascend#
配置后续训练、验证、推理用到的参数:
# 垃圾分类数据集标签,以及用于标签映射的字典。
garbage_classes = {'干垃圾': ['贝壳', '打火机', '旧镜子', '扫把', '陶瓷碗', '牙刷', '一次性筷子', '脏污衣服'],'可回收物': ['报纸', '玻璃制品', '篮球', '塑料瓶', '硬纸板', '玻璃瓶', '金属制品', '帽子', '易拉罐', '纸张'],'湿垃圾': ['菜叶', '橙皮', '蛋壳', '香蕉皮'],'有害垃圾': ['电池', '药片胶囊', '荧光灯', '油漆桶']
}class_cn = ['贝壳', '打火机', '旧镜子', '扫把', '陶瓷碗', '牙刷', '一次性筷子', '脏污衣服','报纸', '玻璃制品', '篮球', '塑料瓶', '硬纸板', '玻璃瓶', '金属制品', '帽子', '易拉罐', '纸张','菜叶', '橙皮', '蛋壳', '香蕉皮','电池', '药片胶囊', '荧光灯', '油漆桶']
class_en = ['Seashell', 'Lighter','Old Mirror', 'Broom','Ceramic Bowl', 'Toothbrush','Disposable Chopsticks','Dirty Cloth','Newspaper', 'Glassware', 'Basketball', 'Plastic Bottle', 'Cardboard','Glass Bottle', 'Metalware', 'Hats', 'Cans', 'Paper','Vegetable Leaf','Orange Peel', 'Eggshell','Banana Peel','Battery', 'Tablet capsules','Fluorescent lamp', 'Paint bucket']index_en = {'Seashell': 0, 'Lighter': 1, 'Old Mirror': 2, 'Broom': 3, 'Ceramic Bowl': 4, 'Toothbrush': 5, 'Disposable Chopsticks': 6, 'Dirty Cloth': 7,'Newspaper': 8, 'Glassware': 9, 'Basketball': 10, 'Plastic Bottle': 11, 'Cardboard': 12, 'Glass Bottle': 13, 'Metalware': 14, 'Hats': 15, 'Cans': 16, 'Paper': 17,'Vegetable Leaf': 18, 'Orange Peel': 19, 'Eggshell': 20, 'Banana Peel': 21,'Battery': 22, 'Tablet capsules': 23, 'Fluorescent lamp': 24, 'Paint bucket': 25}# 训练超参
config = EasyDict({"num_classes": 26,"image_height": 224,"image_width": 224,#"data_split": [0.9, 0.1],"backbone_out_channels":1280,"batch_size": 16,"eval_batch_size": 8,"epochs": 10,"lr_max": 0.05,"momentum": 0.9,"weight_decay": 1e-4,"save_ckpt_epochs": 1,"dataset_path": "./data_en","class_index": index_en,"pretrained_ckpt": "./mobilenetV2-200_1067.ckpt" # mobilenetV2-200_1067.ckpt
})
进行数据预处理:
def create_dataset(dataset_path, config, training=True, buffer_size=1000):"""create a train or eval datasetArgs:dataset_path(string): the path of dataset.config(struct): the config of train and eval in diffirent platform.Returns:train_dataset, val_dataset"""data_path = os.path.join(dataset_path, 'train' if training else 'test')ds = de.ImageFolderDataset(data_path, num_parallel_workers=4, class_indexing=config.class_index)resize_height = config.image_heightresize_width = config.image_widthnormalize_op = C.Normalize(mean=[0.485*255, 0.456*255, 0.406*255], std=[0.229*255, 0.224*255, 0.225*255])change_swap_op = C.HWC2CHW()type_cast_op = C2.TypeCast(mstype.int32)if training:crop_decode_resize = C.RandomCropDecodeResize(resize_height, scale=(0.08, 1.0), ratio=(0.75, 1.333))horizontal_flip_op = C.RandomHorizontalFlip(prob=0.5)color_adjust = C.RandomColorAdjust(brightness=0.4, contrast=0.4, saturation=0.4)train_trans = [crop_decode_resize, horizontal_flip_op, color_adjust, normalize_op, change_swap_op]train_ds = ds.map(input_columns="image", operations=train_trans, num_parallel_workers=4)train_ds = train_ds.map(input_columns="label", operations=type_cast_op, num_parallel_workers=4)train_ds = train_ds.shuffle(buffer_size=buffer_size)ds = train_ds.batch(config.batch_size, drop_remainder=True)else:decode_op = C.Decode()resize_op = C.Resize((int(resize_width/0.875), int(resize_width/0.875)))center_crop = C.CenterCrop(resize_width)eval_trans = [decode_op, resize_op, center_crop, normalize_op, change_swap_op]eval_ds = ds.map(input_columns="image", operations=eval_trans, num_parallel_workers=4)eval_ds = eval_ds.map(input_columns="label", operations=type_cast_op, num_parallel_workers=4)ds = eval_ds.batch(config.eval_batch_size, drop_remainder=True)return ds
MobileNetV2模型搭建
__all__ = ['MobileNetV2', 'MobileNetV2Backbone', 'MobileNetV2Head', 'mobilenet_v2']def _make_divisible(v, divisor, min_value=None):if min_value is None:min_value = divisornew_v = max(min_value, int(v + divisor / 2) // divisor * divisor)if new_v < 0.9 * v:new_v += divisorreturn new_vclass GlobalAvgPooling(nn.Cell):"""Global avg pooling definition.Args:Returns:Tensor, output tensor.Examples:>>> GlobalAvgPooling()"""def __init__(self):super(GlobalAvgPooling, self).__init__()def construct(self, x):x = P.mean(x, (2, 3))return xclass ConvBNReLU(nn.Cell):"""Convolution/Depthwise fused with Batchnorm and ReLU block definition.Args:in_planes (int): Input channel.out_planes (int): Output channel.kernel_size (int): Input kernel size.stride (int): Stride size for the first convolutional layer. Default: 1.groups (int): channel group. Convolution is 1 while Depthiwse is input channel. Default: 1.Returns:Tensor, output tensor.Examples:>>> ConvBNReLU(16, 256, kernel_size=1, stride=1, groups=1)"""def __init__(self, in_planes, out_planes, kernel_size=3, stride=1, groups=1):super(ConvBNReLU, self).__init__()padding = (kernel_size - 1) // 2in_channels = in_planesout_channels = out_planesif groups == 1:conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, pad_mode='pad', padding=padding)else:out_channels = in_planesconv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, pad_mode='pad',padding=padding, group=in_channels)layers = [conv, nn.BatchNorm2d(out_planes), nn.ReLU6()]self.features = nn.SequentialCell(layers)def construct(self, x):output = self.features(x)return outputclass InvertedResidual(nn.Cell):"""Mobilenetv2 residual block definition.Args:inp (int): Input channel.oup (int): Output channel.stride (int): Stride size for the first convolutional layer. Default: 1.expand_ratio (int): expand ration of input channelReturns:Tensor, output tensor.Examples:>>> ResidualBlock(3, 256, 1, 1)"""def __init__(self, inp, oup, stride, expand_ratio):super(InvertedResidual, self).__init__()assert stride in [1, 2]hidden_dim = int(round(inp * expand_ratio))self.use_res_connect = stride == 1 and inp == ouplayers = []if expand_ratio != 1:layers.append(ConvBNReLU(inp, hidden_dim, kernel_size=1))layers.extend([ConvBNReLU(hidden_dim, hidden_dim,stride=stride, groups=hidden_dim),nn.Conv2d(hidden_dim, oup, kernel_size=1,stride=1, has_bias=False),nn.BatchNorm2d(oup),])self.conv = nn.SequentialCell(layers)self.cast = P.Cast()def construct(self, x):identity = xx = self.conv(x)if self.use_res_connect:return P.add(identity, x)return xclass MobileNetV2Backbone(nn.Cell):"""MobileNetV2 architecture.Args:class_num (int): number of classes.width_mult (int): Channels multiplier for round to 8/16 and others. Default is 1.has_dropout (bool): Is dropout used. Default is falseinverted_residual_setting (list): Inverted residual settings. Default is Noneround_nearest (list): Channel round to . Default is 8Returns:Tensor, output tensor.Examples:>>> MobileNetV2(num_classes=1000)"""def __init__(self, width_mult=1., inverted_residual_setting=None, round_nearest=8,input_channel=32, last_channel=1280):super(MobileNetV2Backbone, self).__init__()block = InvertedResidual# setting of inverted residual blocksself.cfgs = inverted_residual_settingif inverted_residual_setting is None:self.cfgs = [# t, c, n, s[1, 16, 1, 1],[6, 24, 2, 2],[6, 32, 3, 2],[6, 64, 4, 2],[6, 96, 3, 1],[6, 160, 3, 2],[6, 320, 1, 1],]# building first layerinput_channel = _make_divisible(input_channel * width_mult, round_nearest)self.out_channels = _make_divisible(last_channel * max(1.0, width_mult), round_nearest)features = [ConvBNReLU(3, input_channel, stride=2)]# building inverted residual blocksfor t, c, n, s in self.cfgs:output_channel = _make_divisible(c * width_mult, round_nearest)for i in range(n):stride = s if i == 0 else 1features.append(block(input_channel, output_channel, stride, expand_ratio=t))input_channel = output_channelfeatures.append(ConvBNReLU(input_channel, self.out_channels, kernel_size=1))self.features = nn.SequentialCell(features)self._initialize_weights()def construct(self, x):x = self.features(x)return xdef _initialize_weights(self):"""Initialize weights.Args:Returns:None.Examples:>>> _initialize_weights()"""self.init_parameters_data()for _, m in self.cells_and_names():if isinstance(m, nn.Conv2d):n = m.kernel_size[0] * m.kernel_size[1] * m.out_channelsm.weight.set_data(Tensor(np.random.normal(0, np.sqrt(2. / n),m.weight.data.shape).astype("float32")))if m.bias is not None:m.bias.set_data(Tensor(np.zeros(m.bias.data.shape, dtype="float32")))elif isinstance(m, nn.BatchNorm2d):m.gamma.set_data(Tensor(np.ones(m.gamma.data.shape, dtype="float32")))m.beta.set_data(Tensor(np.zeros(m.beta.data.shape, dtype="float32")))@propertydef get_features(self):return self.featuresclass MobileNetV2Head(nn.Cell):"""MobileNetV2 architecture.Args:class_num (int): Number of classes. Default is 1000.has_dropout (bool): Is dropout used. Default is falseReturns:Tensor, output tensor.Examples:>>> MobileNetV2(num_classes=1000)"""def __init__(self, input_channel=1280, num_classes=1000, has_dropout=False, activation="None"):super(MobileNetV2Head, self).__init__()# mobilenet headhead = ([GlobalAvgPooling(), nn.Dense(input_channel, num_classes, has_bias=True)] if not has_dropout else[GlobalAvgPooling(), nn.Dropout(0.2), nn.Dense(input_channel, num_classes, has_bias=True)])self.head = nn.SequentialCell(head)self.need_activation = Trueif activation == "Sigmoid":self.activation = nn.Sigmoid()elif activation == "Softmax":self.activation = nn.Softmax()else:self.need_activation = Falseself._initialize_weights()def construct(self, x):x = self.head(x)if self.need_activation:x = self.activation(x)return xdef _initialize_weights(self):"""Initialize weights.Args:Returns:None.Examples:>>> _initialize_weights()"""self.init_parameters_data()for _, m in self.cells_and_names():if isinstance(m, nn.Dense):m.weight.set_data(Tensor(np.random.normal(0, 0.01, m.weight.data.shape).astype("float32")))if m.bias is not None:m.bias.set_data(Tensor(np.zeros(m.bias.data.shape, dtype="float32")))@propertydef get_head(self):return self.headclass MobileNetV2(nn.Cell):"""MobileNetV2 architecture.Args:class_num (int): number of classes.width_mult (int): Channels multiplier for round to 8/16 and others. Default is 1.has_dropout (bool): Is dropout used. Default is falseinverted_residual_setting (list): Inverted residual settings. Default is Noneround_nearest (list): Channel round to . Default is 8Returns:Tensor, output tensor.Examples:>>> MobileNetV2(backbone, head)"""def __init__(self, num_classes=1000, width_mult=1., has_dropout=False, inverted_residual_setting=None, \round_nearest=8, input_channel=32, last_channel=1280):super(MobileNetV2, self).__init__()self.backbone = MobileNetV2Backbone(width_mult=width_mult, \inverted_residual_setting=inverted_residual_setting, \round_nearest=round_nearest, input_channel=input_channel, last_channel=last_channel).get_featuresself.head = MobileNetV2Head(input_channel=self.backbone.out_channel, num_classes=num_classes, \has_dropout=has_dropout).get_headdef construct(self, x):x = self.backbone(x)x = self.head(x)return xclass MobileNetV2Combine(nn.Cell):"""MobileNetV2Combine architecture.Args:backbone (Cell): the features extract layers.head (Cell): the fully connected layers.Returns:Tensor, output tensor.Examples:>>> MobileNetV2(num_classes=1000)"""def __init__(self, backbone, head):super(MobileNetV2Combine, self).__init__(auto_prefix=False)self.backbone = backboneself.head = headdef construct(self, x):x = self.backbone(x)x = self.head(x)return xdef mobilenet_v2(backbone, head):return MobileNetV2Combine(backbone, head)
MobileNetV2模型的训练与测试
训练策略
采用cosine decay下降策略作为模型训练学习率下降策略
def cosine_decay(total_steps, lr_init=0.0, lr_end=0.0, lr_max=0.1, warmup_steps=0):"""Applies cosine decay to generate learning rate array.Args:total_steps(int): all steps in training.lr_init(float): init learning rate.lr_end(float): end learning ratelr_max(float): max learning rate.warmup_steps(int): all steps in warmup epochs.Returns:list, learning rate array."""lr_init, lr_end, lr_max = float(lr_init), float(lr_end), float(lr_max)decay_steps = total_steps - warmup_stepslr_all_steps = []inc_per_step = (lr_max - lr_init) / warmup_steps if warmup_steps else 0for i in range(total_steps):if i < warmup_steps:lr = lr_init + inc_per_step * (i + 1)else:cosine_decay = 0.5 * (1 + math.cos(math.pi * (i - warmup_steps) / decay_steps))lr = (lr_max - lr_end) * cosine_decay + lr_endlr_all_steps.append(lr)return lr_all_steps
加载ImageNet数据上预训练的MobileNetv2进行Fine-tuning,只训练最后修改的FC层,并在训练过程中保存Checkpoint。
def switch_precision(net, data_type):if ms.get_context('device_target') == "Ascend":net.to_float(data_type)for _, cell in net.cells_and_names():if isinstance(cell, nn.Dense):cell.to_float(ms.float32)
模型训练与测试
在训练MobileNetV2之前对MobileNetV2Backbone层的参数进行了固定,使其在训练过程中对该模块的权重参数不进行更新;只对MobileNetV2Head模块的参数进行更新。
损失函数采用SoftmaxCrossEntropyWithLogits损失函数。
from mindspore.amp import FixedLossScaleManager
import time
LOSS_SCALE = 1024train_dataset = create_dataset(dataset_path=config.dataset_path, config=config)
eval_dataset = create_dataset(dataset_path=config.dataset_path, config=config)
step_size = train_dataset.get_dataset_size()backbone = MobileNetV2Backbone() #last_channel=config.backbone_out_channels
# Freeze parameters of backbone. You can comment these two lines.
for param in backbone.get_parameters():param.requires_grad = False
# load parameters from pretrained model
load_checkpoint(config.pretrained_ckpt, backbone)head = MobileNetV2Head(input_channel=backbone.out_channels, num_classes=config.num_classes)
network = mobilenet_v2(backbone, head)# define loss, optimizer, and model
loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
loss_scale = FixedLossScaleManager(LOSS_SCALE, drop_overflow_update=False)
lrs = cosine_decay(config.epochs * step_size, lr_max=config.lr_max)
opt = nn.Momentum(network.trainable_params(), lrs, config.momentum, config.weight_decay, loss_scale=LOSS_SCALE)# 定义用于训练的train_loop函数。
def train_loop(model, dataset, loss_fn, optimizer):# 定义正向计算函数def forward_fn(data, label):logits = model(data)loss = loss_fn(logits, label)return loss# 定义微分函数,使用mindspore.value_and_grad获得微分函数grad_fn,输出loss和梯度。# 由于是对模型参数求导,grad_position 配置为None,传入可训练参数。grad_fn = ms.value_and_grad(forward_fn, None, optimizer.parameters)# 定义 one-step training函数def train_step(data, label):loss, grads = grad_fn(data, label)optimizer(grads)return losssize = dataset.get_dataset_size()model.set_train()for batch, (data, label) in enumerate(dataset.create_tuple_iterator()):loss = train_step(data, label)if batch % 10 == 0:loss, current = loss.asnumpy(), batchprint(f"loss: {loss:>7f} [{current:>3d}/{size:>3d}]")# 定义用于测试的test_loop函数。
def test_loop(model, dataset, loss_fn):num_batches = dataset.get_dataset_size()model.set_train(False)total, test_loss, correct = 0, 0, 0for data, label in dataset.create_tuple_iterator():pred = model(data)total += len(data)test_loss += loss_fn(pred, label).asnumpy()correct += (pred.argmax(1) == label).asnumpy().sum()test_loss /= num_batchescorrect /= totalprint(f"Test: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")print("============== Starting Training ==============")
# 由于时间问题,训练过程只进行了2个epoch ,可以根据需求调整。
epoch_begin_time = time.time()
epochs = 2
for t in range(epochs):begin_time = time.time()print(f"Epoch {t+1}\n-------------------------------")train_loop(network, train_dataset, loss, opt)ms.save_checkpoint(network, "save_mobilenetV2_model.ckpt")end_time = time.time()times = end_time - begin_timeprint(f"per epoch time: {times}s")test_loop(network, eval_dataset, loss)
epoch_end_time = time.time()
times = epoch_end_time - epoch_begin_time
print(f"total time: {times}s")
print("============== Training Success ==============")
模型推理
加载模型,使用load_checkpoint接口加载数据时,需要把数据传入给原始网络,而不能传递给带有优化器和损失函数的训练网络。
CKPT="save_mobilenetV2_model.ckpt"def image_process(image):"""Precess one image per time.Args:image: shape (H, W, C)"""mean=[0.485*255, 0.456*255, 0.406*255]std=[0.229*255, 0.224*255, 0.225*255]image = (np.array(image) - mean) / stdimage = image.transpose((2,0,1))img_tensor = Tensor(np.array([image], np.float32))return img_tensordef infer_one(network, image_path):image = Image.open(image_path).resize((config.image_height, config.image_width))logits = network(image_process(image))pred = np.argmax(logits.asnumpy(), axis=1)[0]print(image_path, class_en[pred])def infer():backbone = MobileNetV2Backbone(last_channel=config.backbone_out_channels)head = MobileNetV2Head(input_channel=backbone.out_channels, num_classes=config.num_classes)network = mobilenet_v2(backbone, head)load_checkpoint(CKPT, network)for i in range(91, 100):infer_one(network, f'data_en/test/Cardboard/000{i}.jpg')
infer()
导出AIR/GEIR/ONNX模型文件
backbone = MobileNetV2Backbone(last_channel=config.backbone_out_channels)
head = MobileNetV2Head(input_channel=backbone.out_channels, num_classes=config.num_classes)
network = mobilenet_v2(backbone, head)
load_checkpoint(CKPT, network)input = np.random.uniform(0.0, 1.0, size=[1, 3, 224, 224]).astype(np.float32)
# export(network, Tensor(input), file_name='mobilenetv2.air', file_format='AIR')
# export(network, Tensor(input), file_name='mobilenetv2.pb', file_format='GEIR')
export(network, Tensor(input), file_name='mobilenetv2.onnx', file_format='ONNX')
总结
MovileNetV2网络使用深度可分离卷积的思想在准确率小幅度降低的前提下,大大减小了模型参数与运算量,同时还突出使用倒残差结构和Linear Bottlenecks来设计网络,以提高模型准确率并使优化后的模型更小。