YOLOv8 Combined Improvement: Replacing the Detection Head with Detect_DyHead While Adding the C2f-EMSC and C2f-EMSCP Modules

I. The Detect_DyHead Detection Head and the C2f-EMSC / C2f-EMSCP Modules

Detailed introductions and the full code are available in earlier posts:

Detect_DyHead:

(YOLOv8改进检测头Detect为Detect_Dyhead-CSDN博客 — "YOLOv8 improvement: changing the Detect head to Detect_DyHead", CSDN blog)

C2f-EMSC and C2f-EMSCP:

(YOLOv8改进之多尺度转换模块C2f-EMSC和C2f-EMSCP-CSDN博客 — "YOLOv8 improvement: the multi-scale conversion modules C2f-EMSC and C2f-EMSCP", CSDN blog)

II. Implementation

1. Fusing the detection head with the C2f modules

The ultralytics\ultralytics\nn\other_modules folder must contain kernel_warehouse.py, which the DyHead detection head depends on (the posts linked above include the full code of that file).
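
Pieced together from the imports used below, the expected layout of the other_modules folder is roughly the following (the __init__.py is an assumption on my part and is sketched after head.py below):

ultralytics/ultralytics/nn/other_modules/
├── __init__.py          # assumed; re-exports block.py and head.py (see the sketch below)
├── block.py             # DyHeadBlock, C2f_EMSC, C2f_EMSCP
├── head.py              # DetectAux, Detect_DyHead
└── kernel_warehouse.py  # Warehouse_Manager (full code in the posts linked above)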

The code in ultralytics\ultralytics\nn\other_modules\block.py is:

import torch
import torch.nn as nn
import torch.nn.functional as F
from ultralytics.nn.modules import Conv
from ..modules.block import *
from einops import rearrange

__all__ = ['DyHeadBlock', 'C2f_EMSC', 'C2f_EMSCP']


def autopad(k, p=None, d=1):  # kernel, padding, dilation
    """Pad to 'same' shape outputs."""
    if d > 1:
        k = d * (k - 1) + 1 if isinstance(k, int) else [d * (x - 1) + 1 for x in k]  # actual kernel-size
    if p is None:
        p = k // 2 if isinstance(k, int) else [x // 2 for x in k]  # auto-pad
    return p


##########     Add DyHead     ##########
try:
    from mmcv.cnn import build_activation_layer, build_norm_layer
    from mmcv.ops.modulated_deform_conv import ModulatedDeformConv2d
    from mmengine.model import constant_init, normal_init
except ImportError:
    pass


def _make_divisible(v, divisor, min_value=None):
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v


class swish(nn.Module):
    def forward(self, x):
        return x * torch.sigmoid(x)


class h_swish(nn.Module):
    def __init__(self, inplace=False):
        super(h_swish, self).__init__()
        self.inplace = inplace

    def forward(self, x):
        return x * F.relu6(x + 3.0, inplace=self.inplace) / 6.0


class h_sigmoid(nn.Module):
    def __init__(self, inplace=True, h_max=1):
        super(h_sigmoid, self).__init__()
        self.relu = nn.ReLU6(inplace=inplace)
        self.h_max = h_max

    def forward(self, x):
        return self.relu(x + 3) * self.h_max / 6


class DyReLU(nn.Module):
    def __init__(self, inp, reduction=4, lambda_a=1.0, K2=True, use_bias=True, use_spatial=False,
                 init_a=[1.0, 0.0], init_b=[0.0, 0.0]):
        super(DyReLU, self).__init__()
        self.oup = inp
        self.lambda_a = lambda_a * 2
        self.K2 = K2
        self.avg_pool = nn.AdaptiveAvgPool2d(1)

        self.use_bias = use_bias
        if K2:
            self.exp = 4 if use_bias else 2
        else:
            self.exp = 2 if use_bias else 1
        self.init_a = init_a
        self.init_b = init_b

        # determine squeeze
        if reduction == 4:
            squeeze = inp // reduction
        else:
            squeeze = _make_divisible(inp // reduction, 4)
        # print('reduction: {}, squeeze: {}/{}'.format(reduction, inp, squeeze))
        # print('init_a: {}, init_b: {}'.format(self.init_a, self.init_b))

        self.fc = nn.Sequential(
            nn.Linear(inp, squeeze),
            nn.ReLU(inplace=True),
            nn.Linear(squeeze, self.oup * self.exp),
            h_sigmoid()
        )
        if use_spatial:
            self.spa = nn.Sequential(
                nn.Conv2d(inp, 1, kernel_size=1),
                nn.BatchNorm2d(1),
            )
        else:
            self.spa = None

    def forward(self, x):
        if isinstance(x, list):
            x_in = x[0]
            x_out = x[1]
        else:
            x_in = x
            x_out = x
        b, c, h, w = x_in.size()
        y = self.avg_pool(x_in).view(b, c)
        y = self.fc(y).view(b, self.oup * self.exp, 1, 1)
        if self.exp == 4:
            a1, b1, a2, b2 = torch.split(y, self.oup, dim=1)
            a1 = (a1 - 0.5) * self.lambda_a + self.init_a[0]  # 1.0
            a2 = (a2 - 0.5) * self.lambda_a + self.init_a[1]
            b1 = b1 - 0.5 + self.init_b[0]
            b2 = b2 - 0.5 + self.init_b[1]
            out = torch.max(x_out * a1 + b1, x_out * a2 + b2)
        elif self.exp == 2:
            if self.use_bias:  # bias but not PL
                a1, b1 = torch.split(y, self.oup, dim=1)
                a1 = (a1 - 0.5) * self.lambda_a + self.init_a[0]  # 1.0
                b1 = b1 - 0.5 + self.init_b[0]
                out = x_out * a1 + b1
            else:
                a1, a2 = torch.split(y, self.oup, dim=1)
                a1 = (a1 - 0.5) * self.lambda_a + self.init_a[0]  # 1.0
                a2 = (a2 - 0.5) * self.lambda_a + self.init_a[1]
                out = torch.max(x_out * a1, x_out * a2)
        elif self.exp == 1:
            a1 = y
            a1 = (a1 - 0.5) * self.lambda_a + self.init_a[0]  # 1.0
            out = x_out * a1

        if self.spa:
            ys = self.spa(x_in).view(b, -1)
            ys = F.softmax(ys, dim=1).view(b, 1, h, w) * h * w
            ys = F.hardtanh(ys, 0, 3, inplace=True) / 3
            out = out * ys

        return out


class DyDCNv2(nn.Module):
    """ModulatedDeformConv2d with normalization layer used in DyHead.

    This module cannot be configured with `conv_cfg=dict(type='DCNv2')`
    because DyHead calculates offset and mask from middle-level feature.

    Args:
        in_channels (int): Number of input channels.
        out_channels (int): Number of output channels.
        stride (int | tuple[int], optional): Stride of the convolution.
            Default: 1.
        norm_cfg (dict, optional): Config dict for normalization layer.
            Default: dict(type='GN', num_groups=16, requires_grad=True).
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 stride=1,
                 norm_cfg=dict(type='GN', num_groups=16, requires_grad=True)):
        super().__init__()
        self.with_norm = norm_cfg is not None
        bias = not self.with_norm
        self.conv = ModulatedDeformConv2d(in_channels, out_channels, 3, stride=stride, padding=1, bias=bias)
        if self.with_norm:
            self.norm = build_norm_layer(norm_cfg, out_channels)[1]

    def forward(self, x, offset, mask):
        """Forward function."""
        x = self.conv(x.contiguous(), offset, mask)
        if self.with_norm:
            x = self.norm(x)
        return x


class DyHeadBlock(nn.Module):
    """DyHead Block with three types of attention.

    HSigmoid arguments in default act_cfg follow official code, not paper.
    https://github.com/microsoft/DynamicHead/blob/master/dyhead/dyrelu.py
    """

    def __init__(self,
                 in_channels,
                 norm_type='GN',
                 zero_init_offset=True,
                 act_cfg=dict(type='HSigmoid', bias=3.0, divisor=6.0)):
        super().__init__()
        self.zero_init_offset = zero_init_offset
        # (offset_x, offset_y, mask) * kernel_size_y * kernel_size_x
        self.offset_and_mask_dim = 3 * 3 * 3
        self.offset_dim = 2 * 3 * 3

        if norm_type == 'GN':
            norm_dict = dict(type='GN', num_groups=16, requires_grad=True)
        elif norm_type == 'BN':
            norm_dict = dict(type='BN', requires_grad=True)

        self.spatial_conv_high = DyDCNv2(in_channels, in_channels, norm_cfg=norm_dict)
        self.spatial_conv_mid = DyDCNv2(in_channels, in_channels)
        self.spatial_conv_low = DyDCNv2(in_channels, in_channels, stride=2)
        self.spatial_conv_offset = nn.Conv2d(in_channels, self.offset_and_mask_dim, 3, padding=1)
        self.scale_attn_module = nn.Sequential(
            nn.AdaptiveAvgPool2d(1), nn.Conv2d(in_channels, 1, 1),
            nn.ReLU(inplace=True), build_activation_layer(act_cfg))
        self.task_attn_module = DyReLU(in_channels)
        self._init_weights()

    def _init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                normal_init(m, 0, 0.01)
        if self.zero_init_offset:
            constant_init(self.spatial_conv_offset, 0)

    def forward(self, x):
        """Forward function."""
        outs = []
        for level in range(len(x)):
            # calculate offset and mask of DCNv2 from middle-level feature
            offset_and_mask = self.spatial_conv_offset(x[level])
            offset = offset_and_mask[:, :self.offset_dim, :, :]
            mask = offset_and_mask[:, self.offset_dim:, :, :].sigmoid()

            mid_feat = self.spatial_conv_mid(x[level], offset, mask)
            sum_feat = mid_feat * self.scale_attn_module(mid_feat)
            summed_levels = 1
            if level > 0:
                low_feat = self.spatial_conv_low(x[level - 1], offset, mask)
                sum_feat += low_feat * self.scale_attn_module(low_feat)
                summed_levels += 1
            if level < len(x) - 1:
                # this upsample order is weird, but faster than natural order
                # https://github.com/microsoft/DynamicHead/issues/25
                high_feat = F.interpolate(
                    self.spatial_conv_high(x[level + 1], offset, mask),
                    size=x[level].shape[-2:],
                    mode='bilinear',
                    align_corners=True)
                sum_feat += high_feat * self.scale_attn_module(high_feat)
                summed_levels += 1
            outs.append(self.task_attn_module(sum_feat / summed_levels))
        return outs


######     Add EMSConv and EMSConvP     ######
class EMSConv(nn.Module):
    # Efficient Multi-Scale Conv
    def __init__(self, channel=256, kernels=[3, 5]):
        super().__init__()
        self.groups = len(kernels)
        min_ch = channel // 4
        assert min_ch >= 16, f'channel must be greater than {64}, but got {channel}'

        self.convs = nn.ModuleList([])
        for ks in kernels:
            self.convs.append(Conv(c1=min_ch, c2=min_ch, k=ks))
        self.conv_1x1 = Conv(channel, channel, k=1)

    def forward(self, x):
        _, c, _, _ = x.size()
        x_cheap, x_group = torch.split(x, [c // 2, c // 2], dim=1)
        x_group = rearrange(x_group, 'bs (g ch) h w -> bs ch h w g', g=self.groups)
        x_group = torch.stack([self.convs[i](x_group[..., i]) for i in range(len(self.convs))])
        x_group = rearrange(x_group, 'g bs ch h w -> bs (g ch) h w')

        x = torch.cat([x_cheap, x_group], dim=1)
        x = self.conv_1x1(x)

        return x


class EMSConvP(nn.Module):
    # Efficient Multi-Scale Conv Plus
    def __init__(self, channel=256, kernels=[1, 3, 5, 7]):
        super().__init__()
        self.groups = len(kernels)
        min_ch = channel // self.groups
        assert min_ch >= 16, f'channel must be greater than {16 * self.groups}, but got {channel}'

        self.convs = nn.ModuleList([])
        for ks in kernels:
            self.convs.append(Conv(c1=min_ch, c2=min_ch, k=ks))
        self.conv_1x1 = Conv(channel, channel, k=1)

    def forward(self, x):
        x_group = rearrange(x, 'bs (g ch) h w -> bs ch h w g', g=self.groups)
        x_convs = torch.stack([self.convs[i](x_group[..., i]) for i in range(len(self.convs))])
        x_convs = rearrange(x_convs, 'g bs ch h w -> bs (g ch) h w')
        x_convs = self.conv_1x1(x_convs)

        return x_convs


class Bottleneck_EMSC(Bottleneck):
    def __init__(self, c1, c2, shortcut=True, g=1, k=(3, 3), e=0.5):
        super().__init__(c1, c2, shortcut, g, k, e)
        c_ = int(c2 * e)  # hidden channels
        self.cv1 = Conv(c1, c_, k[0], 1)
        self.cv2 = EMSConv(c2)


class C2f_EMSC(C2f):
    def __init__(self, c1, c2, n=1, shortcut=False, g=1, e=0.5):
        super().__init__(c1, c2, n, shortcut, g, e)
        self.m = nn.ModuleList(Bottleneck_EMSC(self.c, self.c, shortcut, g, k=(3, 3), e=1.0) for _ in range(n))


class Bottleneck_EMSCP(Bottleneck):
    def __init__(self, c1, c2, shortcut=True, g=1, k=(3, 3), e=0.5):
        super().__init__(c1, c2, shortcut, g, k, e)
        c_ = int(c2 * e)  # hidden channels
        self.cv1 = Conv(c1, c_, k[0], 1)
        self.cv2 = EMSConvP(c2)


class C2f_EMSCP(C2f):
    def __init__(self, c1, c2, n=1, shortcut=False, g=1, e=0.5):
        super().__init__(c1, c2, n, shortcut, g, e)
        self.m = nn.ModuleList(Bottleneck_EMSCP(self.c, self.c, shortcut, g, k=(3, 3), e=1.0) for _ in range(n))
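
Before wiring the modules into the network, a quick standalone shape check can catch channel mistakes early. A minimal sketch, assuming block.py is saved as above and einops is installed (the DyHead classes additionally need mmcv/mmengine, but EMSConv and C2f_EMSC do not):

import torch
from ultralytics.nn.other_modules.block import EMSConv, C2f_EMSC

x = torch.randn(1, 128, 40, 40)
print(EMSConv(channel=128)(x).shape)     # torch.Size([1, 128, 40, 40])
print(C2f_EMSC(128, 128, n=1)(x).shape)  # torch.Size([1, 128, 40, 40])
# Note the assert in EMSConv: each branch needs at least 16 channels, so very
# narrow layers (channel < 64 for EMSConv, c2 < 128 for C2f_EMSC) are rejected.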

The code in the detection-head file ultralytics\ultralytics\nn\other_modules\head.py is:

import math
import torch
import torch.nn as nn

from . import DyHeadBlock
from ..modules import Conv, DFL
from ultralytics.utils.tal import dist2bbox, make_anchors

__all__ = ['DetectAux', 'Detect_DyHead']


class DetectAux(nn.Module):
    """YOLOv8 Detect head with Aux Head for detection models."""
    dynamic = False  # force grid reconstruction
    export = False  # export mode
    shape = None
    anchors = torch.empty(0)  # init
    strides = torch.empty(0)  # init

    def __init__(self, nc=80, ch=()):  # detection layer
        super().__init__()
        self.nc = nc  # number of classes
        self.nl = len(ch) // 2  # number of detection layers
        self.reg_max = 16  # DFL channels (ch[0] // 16 to scale 4/8/12/16/20 for n/s/m/l/x)
        self.no = nc + self.reg_max * 4  # number of outputs per anchor
        self.stride = torch.zeros(self.nl)  # strides computed during build
        c2, c3 = max((16, ch[0] // 4, self.reg_max * 4)), max(ch[0], self.nc)  # channels
        self.cv2 = nn.ModuleList(
            nn.Sequential(Conv(x, c2, 3), Conv(c2, c2, 3), nn.Conv2d(c2, 4 * self.reg_max, 1)) for x in ch[:self.nl])
        self.cv3 = nn.ModuleList(
            nn.Sequential(Conv(x, c3, 3), Conv(c3, c3, 3), nn.Conv2d(c3, self.nc, 1)) for x in ch[:self.nl])
        self.dfl = DFL(self.reg_max) if self.reg_max > 1 else nn.Identity()

        self.cv4 = nn.ModuleList(
            nn.Sequential(Conv(x, c2, 3), Conv(c2, c2, 3), nn.Conv2d(c2, 4 * self.reg_max, 1)) for x in ch[self.nl:])
        self.cv5 = nn.ModuleList(
            nn.Sequential(Conv(x, c3, 3), Conv(c3, c3, 3), nn.Conv2d(c3, self.nc, 1)) for x in ch[self.nl:])
        self.dfl_aux = DFL(self.reg_max) if self.reg_max > 1 else nn.Identity()

    def forward(self, x):
        """Concatenates and returns predicted bounding boxes and class probabilities."""
        shape = x[0].shape  # BCHW
        for i in range(self.nl):
            x[i] = torch.cat((self.cv2[i](x[i]), self.cv3[i](x[i])), 1)
        if self.training:
            for i in range(self.nl, 2 * self.nl):
                x[i] = torch.cat((self.cv4[i - self.nl](x[i]), self.cv5[i - self.nl](x[i])), 1)
            return x
        elif self.dynamic or self.shape != shape:
            if hasattr(self, 'dfl_aux'):
                for i in range(self.nl, 2 * self.nl):
                    x[i] = torch.cat((self.cv4[i - self.nl](x[i]), self.cv5[i - self.nl](x[i])), 1)
            self.anchors, self.strides = (x.transpose(0, 1) for x in make_anchors(x[:self.nl], self.stride, 0.5))
            self.shape = shape

        x_cat = torch.cat([xi.view(shape[0], self.no, -1) for xi in x[:self.nl]], 2)
        if self.export and self.format in ('saved_model', 'pb', 'tflite', 'edgetpu', 'tfjs'):  # avoid TF FlexSplitV ops
            box = x_cat[:, :self.reg_max * 4]
            cls = x_cat[:, self.reg_max * 4:]
        else:
            box, cls = x_cat.split((self.reg_max * 4, self.nc), 1)
        dbox = dist2bbox(self.dfl(box), self.anchors.unsqueeze(0), xywh=True, dim=1) * self.strides
        y = torch.cat((dbox, cls.sigmoid()), 1)
        return y if self.export else (y, x[:self.nl])

    def bias_init(self):
        """Initialize Detect() biases, WARNING: requires stride availability."""
        m = self  # self.model[-1]  # Detect() module
        # cf = torch.bincount(torch.tensor(np.concatenate(dataset.labels, 0)[:, 0]).long(), minlength=nc) + 1
        # ncf = math.log(0.6 / (m.nc - 0.999999)) if cf is None else torch.log(cf / cf.sum())  # nominal class frequency
        for a, b, s in zip(m.cv2, m.cv3, m.stride):  # from
            a[-1].bias.data[:] = 1.0  # box
            b[-1].bias.data[:m.nc] = math.log(5 / m.nc / (640 / s) ** 2)  # cls (.01 objects, 80 classes, 640 img)
        for a, b, s in zip(m.cv4, m.cv5, m.stride):  # from
            a[-1].bias.data[:] = 1.0  # box
            b[-1].bias.data[:m.nc] = math.log(5 / m.nc / (640 / s) ** 2)  # cls (.01 objects, 80 classes, 640 img)

    def switch_to_deploy(self):
        del self.cv4, self.cv5, self.dfl_aux


class Detect_DyHead(nn.Module):
    """YOLOv8 Detect head with DyHead for detection models."""
    dynamic = False  # force grid reconstruction
    export = False  # export mode
    shape = None
    anchors = torch.empty(0)  # init
    strides = torch.empty(0)  # init

    def __init__(self, nc=80, hidc=256, block_num=2, ch=()):  # detection layer
        super().__init__()
        self.nc = nc  # number of classes
        self.nl = len(ch)  # number of detection layers
        self.reg_max = 16  # DFL channels (ch[0] // 16 to scale 4/8/12/16/20 for n/s/m/l/x)
        self.no = nc + self.reg_max * 4  # number of outputs per anchor
        self.stride = torch.zeros(self.nl)  # strides computed during build
        c2, c3 = max((16, ch[0] // 4, self.reg_max * 4)), max(ch[0], self.nc)  # channels
        self.conv = nn.ModuleList(nn.Sequential(Conv(x, hidc, 1)) for x in ch)
        self.dyhead = nn.Sequential(*[DyHeadBlock(hidc) for i in range(block_num)])
        self.cv2 = nn.ModuleList(
            nn.Sequential(Conv(hidc, c2, 3), Conv(c2, c2, 3), nn.Conv2d(c2, 4 * self.reg_max, 1)) for _ in ch)
        self.cv3 = nn.ModuleList(
            nn.Sequential(Conv(hidc, c3, 3), Conv(c3, c3, 3), nn.Conv2d(c3, self.nc, 1)) for _ in ch)
        self.dfl = DFL(self.reg_max) if self.reg_max > 1 else nn.Identity()

    def forward(self, x):
        """Concatenates and returns predicted bounding boxes and class probabilities."""
        for i in range(self.nl):
            x[i] = self.conv[i](x[i])
        x = self.dyhead(x)
        shape = x[0].shape  # BCHW
        for i in range(self.nl):
            x[i] = torch.cat((self.cv2[i](x[i]), self.cv3[i](x[i])), 1)
        if self.training:
            return x
        elif self.dynamic or self.shape != shape:
            self.anchors, self.strides = (x.transpose(0, 1) for x in make_anchors(x, self.stride, 0.5))
            self.shape = shape

        x_cat = torch.cat([xi.view(shape[0], self.no, -1) for xi in x], 2)
        if self.export and self.format in ('saved_model', 'pb', 'tflite', 'edgetpu', 'tfjs'):  # avoid TF FlexSplitV ops
            box = x_cat[:, :self.reg_max * 4]
            cls = x_cat[:, self.reg_max * 4:]
        else:
            box, cls = x_cat.split((self.reg_max * 4, self.nc), 1)
        dbox = dist2bbox(self.dfl(box), self.anchors.unsqueeze(0), xywh=True, dim=1) * self.strides
        y = torch.cat((dbox, cls.sigmoid()), 1)
        return y if self.export else (y, x)

    def bias_init(self):
        """Initialize Detect() biases, WARNING: requires stride availability."""
        m = self  # self.model[-1]  # Detect() module
        # cf = torch.bincount(torch.tensor(np.concatenate(dataset.labels, 0)[:, 0]).long(), minlength=nc) + 1
        # ncf = math.log(0.6 / (m.nc - 0.999999)) if cf is None else torch.log(cf / cf.sum())  # nominal class frequency
        for a, b, s in zip(m.cv2, m.cv3, m.stride):  # from
            a[-1].bias.data[:] = 1.0  # box
            b[-1].bias.data[:m.nc] = math.log(5 / m.nc / (640 / s) ** 2)  # cls (.01 objects, 80 classes, 640 img)
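
Since head.py does `from . import DyHeadBlock` and tasks.py (next step) does `from ultralytics.nn.other_modules import *`, the package's __init__.py has to re-export both files, with block.py imported first so that DyHeadBlock already exists when head.py loads. The original posts do not show this file, so treat the following as a minimal assumed sketch:

# ultralytics/ultralytics/nn/other_modules/__init__.py (assumed)
from .block import *  # DyHeadBlock, C2f_EMSC, C2f_EMSCP — must come before .head
from .head import *   # DetectAux, Detect_DyHead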

2. Modifying ultralytics\ultralytics\nn\tasks.py

The two posts mentioned in Section I give very detailed step-by-step instructions and code for these modifications.

The two posts touch different places in tasks.py, so work through both sets of steps and apply every modified location from each post. The final tasks.py code is:

# Ultralytics YOLO 🚀, AGPL-3.0 license

import contextlib
from copy import deepcopy
from pathlib import Path

from ultralytics.nn.other_modules import *

import torch
import torch.nn as nn

from ultralytics.nn.modules import (AIFI, C1, C2, C3, C3TR, SPP, SPPF, Bottleneck, BottleneckCSP, C2f, C3Ghost, C3x,
                                    Classify, Concat, Conv, Conv2, ConvTranspose, Detect, DWConv, DWConvTranspose2d,
                                    Focus, GhostBottleneck, GhostConv, HGBlock, HGStem, Pose, RepC3, RepConv,
                                    RTDETRDecoder, Segment)
from ultralytics.nn.other_modules.kernel_warehouse import Warehouse_Manager
from ultralytics.utils import DEFAULT_CFG_DICT, DEFAULT_CFG_KEYS, LOGGER, colorstr, emojis, yaml_load
from ultralytics.utils.checks import check_requirements, check_suffix, check_yaml
from ultralytics.utils.loss import v8ClassificationLoss, v8DetectionLoss, v8PoseLoss, v8SegmentationLoss
from ultralytics.utils.plotting import feature_visualization
from ultralytics.utils.torch_utils import (fuse_conv_and_bn, fuse_deconv_and_bn, initialize_weights, intersect_dicts,
                                           make_divisible, model_info, scale_img, time_sync)

try:
    import thop
except ImportError:
    thop = None


class BaseModel(nn.Module):
    """The BaseModel class serves as a base class for all the models in the Ultralytics YOLO family."""

    def forward(self, x, *args, **kwargs):
        """
        Forward pass of the model on a single scale. Wrapper for `_forward_once` method.

        Args:
            x (torch.Tensor | dict): The input image tensor or a dict including image tensor and gt labels.

        Returns:
            (torch.Tensor): The output of the network.
        """
        if isinstance(x, dict):  # for cases of training and validating while training.
            return self.loss(x, *args, **kwargs)
        return self.predict(x, *args, **kwargs)

    def predict(self, x, profile=False, visualize=False, augment=False):
        """
        Perform a forward pass through the network.

        Args:
            x (torch.Tensor): The input tensor to the model.
            profile (bool): Print the computation time of each layer if True, defaults to False.
            visualize (bool): Save the feature maps of the model if True, defaults to False.
            augment (bool): Augment image during prediction, defaults to False.

        Returns:
            (torch.Tensor): The last output of the model.
        """
        if augment:
            return self._predict_augment(x)
        return self._predict_once(x, profile, visualize)

    def _predict_once(self, x, profile=False, visualize=False):
        """
        Perform a forward pass through the network.

        Args:
            x (torch.Tensor): The input tensor to the model.
            profile (bool): Print the computation time of each layer if True, defaults to False.
            visualize (bool): Save the feature maps of the model if True, defaults to False.

        Returns:
            (torch.Tensor): The last output of the model.
        """
        y, dt = [], []  # outputs
        for m in self.model:
            if m.f != -1:  # if not from previous layer
                x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f]  # from earlier layers
            if profile:
                self._profile_one_layer(m, x, dt)
            if hasattr(m, 'backbone'):
                x = m(x)
                for _ in range(5 - len(x)):
                    x.insert(0, None)
                for i_idx, i in enumerate(x):
                    y.append(i if i_idx in self.save else None)
                x = x[-1]
            else:
                x = m(x)  # run
                y.append(x if m.i in self.save else None)  # save output
            if visualize:
                feature_visualization(x, m.type, m.i, save_dir=visualize)
        return x

    def _predict_augment(self, x):
        """Perform augmentations on input image x and return augmented inference."""
        LOGGER.warning(f'WARNING ⚠️ {self.__class__.__name__} does not support augmented inference yet. '
                       f'Reverting to single-scale inference instead.')
        return self._predict_once(x)

    def _profile_one_layer(self, m, x, dt):
        """
        Profile the computation time and FLOPs of a single layer of the model on a given input. Appends the results to
        the provided list.

        Args:
            m (nn.Module): The layer to be profiled.
            x (torch.Tensor): The input data to the layer.
            dt (list): A list to store the computation time of the layer.

        Returns:
            None
        """
        c = m == self.model[-1] and isinstance(x, list)  # is final layer list, copy input as inplace fix
        flops = thop.profile(m, inputs=[x.copy() if c else x], verbose=False)[0] / 1E9 * 2 if thop else 0  # FLOPs
        t = time_sync()
        for _ in range(10):
            m(x.copy() if c else x)
        dt.append((time_sync() - t) * 100)
        if m == self.model[0]:
            LOGGER.info(f"{'time (ms)':>10s} {'GFLOPs':>10s} {'params':>10s}  module")
        LOGGER.info(f'{dt[-1]:10.2f} {flops:10.2f} {m.np:10.0f}  {m.type}')
        if c:
            LOGGER.info(f"{sum(dt):10.2f} {'-':>10s} {'-':>10s}  Total")

    def fuse(self, verbose=True):
        """
        Fuse the `Conv2d()` and `BatchNorm2d()` layers of the model into a single layer, in order to improve the
        computation efficiency.

        Returns:
            (nn.Module): The fused model is returned.
        """
        if not self.is_fused():
            for m in self.model.modules():
                if isinstance(m, (Conv, Conv2, DWConv)) and hasattr(m, 'bn'):
                    if isinstance(m, Conv2):
                        m.fuse_convs()
                    m.conv = fuse_conv_and_bn(m.conv, m.bn)  # update conv
                    delattr(m, 'bn')  # remove batchnorm
                    m.forward = m.forward_fuse  # update forward
                if isinstance(m, ConvTranspose) and hasattr(m, 'bn'):
                    m.conv_transpose = fuse_deconv_and_bn(m.conv_transpose, m.bn)
                    delattr(m, 'bn')  # remove batchnorm
                    m.forward = m.forward_fuse  # update forward
                if isinstance(m, RepConv):
                    m.fuse_convs()
                    m.forward = m.forward_fuse  # update forward
            self.info(verbose=verbose)
        return self

    def is_fused(self, thresh=10):
        """
        Check if the model has less than a certain threshold of BatchNorm layers.

        Args:
            thresh (int, optional): The threshold number of BatchNorm layers. Default is 10.

        Returns:
            (bool): True if the number of BatchNorm layers in the model is less than the threshold, False otherwise.
        """
        bn = tuple(v for k, v in nn.__dict__.items() if 'Norm' in k)  # normalization layers, i.e. BatchNorm2d()
        return sum(isinstance(v, bn) for v in self.modules()) < thresh  # True if < 'thresh' BatchNorm layers in model

    def info(self, detailed=False, verbose=True, imgsz=640):
        """
        Prints model information.

        Args:
            detailed (bool): if True, prints out detailed information about the model. Defaults to False
            verbose (bool): if True, prints out the model information. Defaults to False
            imgsz (int): the size of the image that the model will be trained on. Defaults to 640
        """
        return model_info(self, detailed=detailed, verbose=verbose, imgsz=imgsz)

    def _apply(self, fn):
        """
        Applies a function to all the tensors in the model that are not parameters or registered buffers.

        Args:
            fn (function): the function to apply to the model

        Returns:
            (BaseModel): An updated BaseModel object.
        """
        self = super()._apply(fn)
        m = self.model[-1]  # Detect()
        if isinstance(m, (Detect, DetectAux, Detect_DyHead)):
            m.stride = fn(m.stride)
            m.anchors = fn(m.anchors)
            m.strides = fn(m.strides)
        return self

    def load(self, weights, verbose=True):
        """
        Load the weights into the model.

        Args:
            weights (dict | torch.nn.Module): The pre-trained weights to be loaded.
            verbose (bool, optional): Whether to log the transfer progress. Defaults to True.
        """
        model = weights['model'] if isinstance(weights, dict) else weights  # torchvision models are not dicts
        csd = model.float().state_dict()  # checkpoint state_dict as FP32
        csd = intersect_dicts(csd, self.state_dict())  # intersect
        self.load_state_dict(csd, strict=False)  # load
        if verbose:
            LOGGER.info(f'Transferred {len(csd)}/{len(self.model.state_dict())} items from pretrained weights')

    def loss(self, batch, preds=None):
        """
        Compute loss.

        Args:
            batch (dict): Batch to compute loss on
            preds (torch.Tensor | List[torch.Tensor]): Predictions.
        """
        if not hasattr(self, 'criterion'):
            self.criterion = self.init_criterion()
        preds = self.forward(batch['img']) if preds is None else preds
        return self.criterion(preds, batch)

    def init_criterion(self):
        """Initialize the loss criterion for the BaseModel."""
        raise NotImplementedError('compute_loss() needs to be implemented by task heads')


class DetectionModel(BaseModel):
    """YOLOv8 detection model."""

    def __init__(self, cfg='yolov8n.yaml', ch=3, nc=None, verbose=True):  # model, input channels, number of classes
        """Initialize the YOLOv8 detection model with the given config and parameters."""
        super().__init__()
        self.yaml = cfg if isinstance(cfg, dict) else yaml_model_load(cfg)  # cfg dict

        # Warehouse_Manager
        warehouse_manager_flag = self.yaml.get('Warehouse_Manager', False)
        self.warehouse_manager = None
        if warehouse_manager_flag:
            self.warehouse_manager = Warehouse_Manager(cell_num_ratio=self.yaml.get('Warehouse_Manager_Ratio', 1.0))

        # Define model
        ch = self.yaml['ch'] = self.yaml.get('ch', ch)  # input channels
        if nc and nc != self.yaml['nc']:
            LOGGER.info(f"Overriding model.yaml nc={self.yaml['nc']} with nc={nc}")
            self.yaml['nc'] = nc  # override YAML value
        self.model, self.save = parse_model(deepcopy(self.yaml), ch=ch, verbose=verbose,
                                            warehouse_manager=self.warehouse_manager)  # model, savelist
        self.names = {i: f'{i}' for i in range(self.yaml['nc'])}  # default names dict
        self.inplace = self.yaml.get('inplace', True)
        if warehouse_manager_flag:
            self.warehouse_manager.store()
            self.warehouse_manager.allocate(self)
            self.net_update_temperature(0)

        # Build strides
        m = self.model[-1]  # Detect()
        if isinstance(m, (Detect, DetectAux, Segment, Pose, Detect_DyHead)):
            s = 640  # 2x min stride
            m.inplace = self.inplace
            if isinstance(m, (DetectAux,)):
                forward = lambda x: self.forward(x)[:3]
            else:
                forward = lambda x: self.forward(x)[0] if isinstance(m, (Segment, Pose)) else self.forward(x)
            try:
                m.stride = torch.tensor([s / x.shape[-2] for x in forward(torch.zeros(2, ch, s, s))])  # forward
            except RuntimeError as e:
                if 'Not implemented on the CPU' in str(e):
                    self.model.to(torch.device('cuda'))
                    m.stride = torch.tensor([s / x.shape[-2] for x in
                                             forward(torch.zeros(2, ch, s, s).to(torch.device('cuda')))])  # forward
                else:
                    raise e
            self.stride = m.stride
            m.bias_init()  # only run once
        else:
            self.stride = torch.Tensor([32])  # default stride for i.e. RTDETR

        # Init weights, biases
        initialize_weights(self)
        if verbose:
            self.info()
            LOGGER.info('')

    def _predict_augment(self, x):
        """Perform augmentations on input image x and return augmented inference and train outputs."""
        img_size = x.shape[-2:]  # height, width
        s = [1, 0.83, 0.67]  # scales
        f = [None, 3, None]  # flips (2-ud, 3-lr)
        y = []  # outputs
        for si, fi in zip(s, f):
            xi = scale_img(x.flip(fi) if fi else x, si, gs=int(self.stride.max()))
            yi = super().predict(xi)[0]  # forward
            yi = self._descale_pred(yi, fi, si, img_size)
            y.append(yi)
        y = self._clip_augmented(y)  # clip augmented tails
        return torch.cat(y, -1), None  # augmented inference, train

    @staticmethod
    def _descale_pred(p, flips, scale, img_size, dim=1):
        """De-scale predictions following augmented inference (inverse operation)."""
        p[:, :4] /= scale  # de-scale
        x, y, wh, cls = p.split((1, 1, 2, p.shape[dim] - 4), dim)
        if flips == 2:
            y = img_size[0] - y  # de-flip ud
        elif flips == 3:
            x = img_size[1] - x  # de-flip lr
        return torch.cat((x, y, wh, cls), dim)

    def _clip_augmented(self, y):
        """Clip YOLO augmented inference tails."""
        nl = self.model[-1].nl  # number of detection layers (P3-P5)
        g = sum(4 ** x for x in range(nl))  # grid points
        e = 1  # exclude layer count
        i = (y[0].shape[-1] // g) * sum(4 ** x for x in range(e))  # indices
        y[0] = y[0][..., :-i]  # large
        i = (y[-1].shape[-1] // g) * sum(4 ** (nl - 1 - x) for x in range(e))  # indices
        y[-1] = y[-1][..., i:]  # small
        return y

    def init_criterion(self):
        """Initialize the loss criterion for the DetectionModel."""
        return v8DetectionLoss(self)

    def net_update_temperature(self, temp):
        for m in self.modules():
            if hasattr(m, "update_temperature"):
                m.update_temperature(temp)


class SegmentationModel(DetectionModel):
    """YOLOv8 segmentation model."""

    def __init__(self, cfg='yolov8n-seg.yaml', ch=3, nc=None, verbose=True):
        """Initialize YOLOv8 segmentation model with given config and parameters."""
        super().__init__(cfg=cfg, ch=ch, nc=nc, verbose=verbose)

    def init_criterion(self):
        """Initialize the loss criterion for the SegmentationModel."""
        return v8SegmentationLoss(self)


class PoseModel(DetectionModel):
    """YOLOv8 pose model."""

    def __init__(self, cfg='yolov8n-pose.yaml', ch=3, nc=None, data_kpt_shape=(None, None), verbose=True):
        """Initialize YOLOv8 Pose model."""
        if not isinstance(cfg, dict):
            cfg = yaml_model_load(cfg)  # load model YAML
        if any(data_kpt_shape) and list(data_kpt_shape) != list(cfg['kpt_shape']):
            LOGGER.info(f"Overriding model.yaml kpt_shape={cfg['kpt_shape']} with kpt_shape={data_kpt_shape}")
            cfg['kpt_shape'] = data_kpt_shape
        super().__init__(cfg=cfg, ch=ch, nc=nc, verbose=verbose)

    def init_criterion(self):
        """Initialize the loss criterion for the PoseModel."""
        return v8PoseLoss(self)


class ClassificationModel(BaseModel):
    """YOLOv8 classification model."""

    def __init__(self, cfg='yolov8n-cls.yaml', ch=3, nc=None, verbose=True):
        """Init ClassificationModel with YAML, channels, number of classes, verbose flag."""
        super().__init__()
        self._from_yaml(cfg, ch, nc, verbose)

    def _from_yaml(self, cfg, ch, nc, verbose):
        """Set YOLOv8 model configurations and define the model architecture."""
        self.yaml = cfg if isinstance(cfg, dict) else yaml_model_load(cfg)  # cfg dict

        # Define model
        ch = self.yaml['ch'] = self.yaml.get('ch', ch)  # input channels
        if nc and nc != self.yaml['nc']:
            LOGGER.info(f"Overriding model.yaml nc={self.yaml['nc']} with nc={nc}")
            self.yaml['nc'] = nc  # override YAML value
        elif not nc and not self.yaml.get('nc', None):
            raise ValueError('nc not specified. Must specify nc in model.yaml or function arguments.')
        self.model, self.save = parse_model(deepcopy(self.yaml), ch=ch, verbose=verbose)  # model, savelist
        self.stride = torch.Tensor([1])  # no stride constraints
        self.names = {i: f'{i}' for i in range(self.yaml['nc'])}  # default names dict
        self.info()

    @staticmethod
    def reshape_outputs(model, nc):
        """Update a TorchVision classification model to class count 'n' if required."""
        name, m = list((model.model if hasattr(model, 'model') else model).named_children())[-1]  # last module
        if isinstance(m, Classify):  # YOLO Classify() head
            if m.linear.out_features != nc:
                m.linear = nn.Linear(m.linear.in_features, nc)
        elif isinstance(m, nn.Linear):  # ResNet, EfficientNet
            if m.out_features != nc:
                setattr(model, name, nn.Linear(m.in_features, nc))
        elif isinstance(m, nn.Sequential):
            types = [type(x) for x in m]
            if nn.Linear in types:
                i = types.index(nn.Linear)  # nn.Linear index
                if m[i].out_features != nc:
                    m[i] = nn.Linear(m[i].in_features, nc)
            elif nn.Conv2d in types:
                i = types.index(nn.Conv2d)  # nn.Conv2d index
                if m[i].out_channels != nc:
                    m[i] = nn.Conv2d(m[i].in_channels, nc, m[i].kernel_size, m[i].stride, bias=m[i].bias is not None)

    def init_criterion(self):
        """Initialize the loss criterion for the ClassificationModel."""
        return v8ClassificationLoss()


class RTDETRDetectionModel(DetectionModel):
    """
    RTDETR (Real-time DEtection and Tracking using Transformers) Detection Model class.

    This class is responsible for constructing the RTDETR architecture, defining loss functions, and facilitating both
    the training and inference processes. RTDETR is an object detection and tracking model that extends from the
    DetectionModel base class.

    Attributes:
        cfg (str): The configuration file path or preset string. Default is 'rtdetr-l.yaml'.
        ch (int): Number of input channels. Default is 3 (RGB).
        nc (int, optional): Number of classes for object detection. Default is None.
        verbose (bool): Specifies if summary statistics are shown during initialization. Default is True.

    Methods:
        init_criterion: Initializes the criterion used for loss calculation.
        loss: Computes and returns the loss during training.
        predict: Performs a forward pass through the network and returns the output.
    """

    def __init__(self, cfg='rtdetr-l.yaml', ch=3, nc=None, verbose=True):
        """
        Initialize the RTDETRDetectionModel.

        Args:
            cfg (str): Configuration file name or path.
            ch (int): Number of input channels.
            nc (int, optional): Number of classes. Defaults to None.
            verbose (bool, optional): Print additional information during initialization. Defaults to True.
        """
        super().__init__(cfg=cfg, ch=ch, nc=nc, verbose=verbose)

    def init_criterion(self):
        """Initialize the loss criterion for the RTDETRDetectionModel."""
        from ultralytics.models.utils.loss import RTDETRDetectionLoss
        return RTDETRDetectionLoss(nc=self.nc, use_vfl=True)

    def loss(self, batch, preds=None):
        """
        Compute the loss for the given batch of data.

        Args:
            batch (dict): Dictionary containing image and label data.
            preds (torch.Tensor, optional): Precomputed model predictions. Defaults to None.

        Returns:
            (tuple): A tuple containing the total loss and main three losses in a tensor.
        """
        if not hasattr(self, 'criterion'):
            self.criterion = self.init_criterion()

        img = batch['img']
        # NOTE: preprocess gt_bbox and gt_labels to list.
        bs = len(img)
        batch_idx = batch['batch_idx']
        gt_groups = [(batch_idx == i).sum().item() for i in range(bs)]
        targets = {
            'cls': batch['cls'].to(img.device, dtype=torch.long).view(-1),
            'bboxes': batch['bboxes'].to(device=img.device),
            'batch_idx': batch_idx.to(img.device, dtype=torch.long).view(-1),
            'gt_groups': gt_groups}

        preds = self.predict(img, batch=targets) if preds is None else preds
        dec_bboxes, dec_scores, enc_bboxes, enc_scores, dn_meta = preds if self.training else preds[1]
        if dn_meta is None:
            dn_bboxes, dn_scores = None, None
        else:
            dn_bboxes, dec_bboxes = torch.split(dec_bboxes, dn_meta['dn_num_split'], dim=2)
            dn_scores, dec_scores = torch.split(dec_scores, dn_meta['dn_num_split'], dim=2)

        dec_bboxes = torch.cat([enc_bboxes.unsqueeze(0), dec_bboxes])  # (7, bs, 300, 4)
        dec_scores = torch.cat([enc_scores.unsqueeze(0), dec_scores])

        loss = self.criterion((dec_bboxes, dec_scores),
                              targets,
                              dn_bboxes=dn_bboxes,
                              dn_scores=dn_scores,
                              dn_meta=dn_meta)
        # NOTE: There are like 12 losses in RTDETR, backward with all losses but only show the main three losses.
        return sum(loss.values()), torch.as_tensor([loss[k].detach() for k in ['loss_giou', 'loss_class', 'loss_bbox']],
                                                   device=img.device)

    def predict(self, x, profile=False, visualize=False, batch=None, augment=False):
        """
        Perform a forward pass through the model.

        Args:
            x (torch.Tensor): The input tensor.
            profile (bool, optional): If True, profile the computation time for each layer. Defaults to False.
            visualize (bool, optional): If True, save feature maps for visualization. Defaults to False.
            batch (dict, optional): Ground truth data for evaluation. Defaults to None.
            augment (bool, optional): If True, perform data augmentation during inference. Defaults to False.

        Returns:
            (torch.Tensor): Model's output tensor.
        """
        y, dt = [], []  # outputs
        for m in self.model[:-1]:  # except the head part
            if m.f != -1:  # if not from previous layer
                x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f]  # from earlier layers
            if profile:
                self._profile_one_layer(m, x, dt)
            x = m(x)  # run
            y.append(x if m.i in self.save else None)  # save output
            if visualize:
                feature_visualization(x, m.type, m.i, save_dir=visualize)
        head = self.model[-1]
        x = head([y[j] for j in head.f], batch)  # head inference
        return x


class Ensemble(nn.ModuleList):
    """Ensemble of models."""

    def __init__(self):
        """Initialize an ensemble of models."""
        super().__init__()

    def forward(self, x, augment=False, profile=False, visualize=False):
        """Function generates the YOLO network's final layer."""
        y = [module(x, augment, profile, visualize)[0] for module in self]
        # y = torch.stack(y).max(0)[0]  # max ensemble
        # y = torch.stack(y).mean(0)  # mean ensemble
        y = torch.cat(y, 2)  # nms ensemble, y shape(B, HW, C)
        return y, None  # inference, train output


# Functions ------------------------------------------------------------------------------------------------------------


@contextlib.contextmanager
def temporary_modules(modules=None):
    """
    Context manager for temporarily adding or modifying modules in Python's module cache (`sys.modules`).

    This function can be used to change the module paths during runtime. It's useful when refactoring code,
    where you've moved a module from one location to another, but you still want to support the old import
    paths for backwards compatibility.

    Args:
        modules (dict, optional): A dictionary mapping old module paths to new module paths.

    Example:
        ```python
        with temporary_modules({'old.module.path': 'new.module.path'}):
            import old.module.path  # this will now import new.module.path
        ```

    Note:
        The changes are only in effect inside the context manager and are undone once the context manager exits.
        Be aware that directly manipulating `sys.modules` can lead to unpredictable results, especially in larger
        applications or libraries. Use this function with caution.
    """
    if not modules:
        modules = {}

    import importlib
    import sys
    try:
        # Set modules in sys.modules under their old name
        for old, new in modules.items():
            sys.modules[old] = importlib.import_module(new)

        yield
    finally:
        # Remove the temporary module paths
        for old in modules:
            if old in sys.modules:
                del sys.modules[old]


def torch_safe_load(weight):
    """
    This function attempts to load a PyTorch model with the torch.load() function. If a ModuleNotFoundError is raised,
    it catches the error, logs a warning message, and attempts to install the missing module via the
    check_requirements() function. After installation, the function again attempts to load the model using torch.load().

    Args:
        weight (str): The file path of the PyTorch model.

    Returns:
        (dict): The loaded PyTorch model.
    """
    from ultralytics.utils.downloads import attempt_download_asset

    check_suffix(file=weight, suffix='.pt')
    file = attempt_download_asset(weight)  # search online if missing locally
    try:
        with temporary_modules({
                'ultralytics.yolo.utils': 'ultralytics.utils',
                'ultralytics.yolo.v8': 'ultralytics.models.yolo',
                'ultralytics.yolo.data': 'ultralytics.data'}):  # for legacy 8.0 Classify and Pose models
            return torch.load(file, map_location='cpu'), file  # load

    except ModuleNotFoundError as e:  # e.name is missing module name
        if e.name == 'models':
            raise TypeError(
                emojis(f'ERROR ❌️ {weight} appears to be an Ultralytics YOLOv5 model originally trained '
                       f'with https://github.com/ultralytics/yolov5.\nThis model is NOT forwards compatible with '
                       f'YOLOv8 at https://github.com/ultralytics/ultralytics.'
                       f"\nRecommend fixes are to train a new model using the latest 'ultralytics' package or to "
                       f"run a command with an official YOLOv8 model, i.e. 'yolo predict model=yolov8n.pt'")) from e
        LOGGER.warning(f"WARNING ⚠️ {weight} appears to require '{e.name}', which is not in ultralytics requirements."
                       f"\nAutoInstall will run now for '{e.name}' but this feature will be removed in the future."
                       f"\nRecommend fixes are to train a new model using the latest 'ultralytics' package or to "
                       f"run a command with an official YOLOv8 model, i.e. 'yolo predict model=yolov8n.pt'")
        check_requirements(e.name)  # install missing module

        return torch.load(file, map_location='cpu'), file  # load


def attempt_load_weights(weights, device=None, inplace=True, fuse=False):
    """Loads an ensemble of models weights=[a,b,c] or a single model weights=[a] or weights=a."""
    ensemble = Ensemble()
    for w in weights if isinstance(weights, list) else [weights]:
        ckpt, w = torch_safe_load(w)  # load ckpt
        args = {**DEFAULT_CFG_DICT, **ckpt['train_args']} if 'train_args' in ckpt else None  # combined args
        model = (ckpt.get('ema') or ckpt['model']).to(device).float()  # FP32 model

        # Model compatibility updates
        model.args = args  # attach args to model
        model.pt_path = w  # attach *.pt file path to model
        model.task = guess_model_task(model)
        if not hasattr(model, 'stride'):
            model.stride = torch.tensor([32.])

        # Append
        ensemble.append(model.fuse().eval() if fuse and hasattr(model, 'fuse') else model.eval())  # model in eval mode

    # Module updates
    for m in ensemble.modules():
        t = type(m)
        if t in (nn.Hardswish, nn.LeakyReLU, nn.ReLU, nn.ReLU6, nn.SiLU, Detect, Segment, DetectAux, Detect_DyHead):
            m.inplace = inplace
        elif t is nn.Upsample and not hasattr(m, 'recompute_scale_factor'):
            m.recompute_scale_factor = None  # torch 1.11.0 compatibility

    # Return model
    if len(ensemble) == 1:
        return ensemble[-1]

    # Return ensemble
    LOGGER.info(f'Ensemble created with {weights}\n')
    for k in 'names', 'nc', 'yaml':
        setattr(ensemble, k, getattr(ensemble[0], k))
    ensemble.stride = ensemble[torch.argmax(torch.tensor([m.stride.max() for m in ensemble])).int()].stride
    assert all(ensemble[0].nc == m.nc for m in ensemble), f'Models differ in class counts {[m.nc for m in ensemble]}'
    return ensemble


def attempt_load_one_weight(weight, device=None, inplace=True, fuse=False):
    """Loads a single model weights."""
    ckpt, weight = torch_safe_load(weight)  # load ckpt
    args = {**DEFAULT_CFG_DICT, **(ckpt.get('train_args', {}))}  # combine model and default args, preferring model args
    model = (ckpt.get('ema') or ckpt['model']).to(device).float()  # FP32 model

    # Model compatibility updates
    model.args = {k: v for k, v in args.items() if k in DEFAULT_CFG_KEYS}  # attach args to model
    model.pt_path = weight  # attach *.pt file path to model
    model.task = guess_model_task(model)
    if not hasattr(model, 'stride'):
        model.stride = torch.tensor([32.])

    model = model.fuse().eval() if fuse and hasattr(model, 'fuse') else model.eval()  # model in eval mode

    # Module updates
    for m in model.modules():
        t = type(m)
        if t in (nn.Hardswish, nn.LeakyReLU, nn.ReLU, nn.ReLU6, nn.SiLU, Detect, Segment, DetectAux, Detect_DyHead):
            m.inplace = inplace
        elif t is nn.Upsample and not hasattr(m, 'recompute_scale_factor'):
            m.recompute_scale_factor = None  # torch 1.11.0 compatibility

    # Return model and ckpt
    return model, ckpt


def parse_model(d, ch, verbose=True, warehouse_manager=None):  # model_dict, input_channels(3)
    """Parse a YOLO model.yaml dictionary into a PyTorch model."""
    import ast

    # Args
    max_channels = float('inf')
    nc, act, scales = (d.get(x) for x in ('nc', 'activation', 'scales'))
    depth, width, kpt_shape = (d.get(x, 1.0) for x in ('depth_multiple', 'width_multiple', 'kpt_shape'))
    if scales:
        scale = d.get('scale')
        if not scale:
            scale = tuple(scales.keys())[0]
            LOGGER.warning(f"WARNING ⚠️ no model scale passed. Assuming scale='{scale}'.")
        depth, width, max_channels = scales[scale]

    if act:
        Conv.default_act = eval(act)  # redefine default activation, i.e. Conv.default_act = nn.SiLU()
        if verbose:
            LOGGER.info(f"{colorstr('activation:')} {act}")  # print

    if verbose:
        LOGGER.info(f"\n{'':>3}{'from':>20}{'n':>3}{'params':>10}  {'module':<45}{'arguments':<30}")
    ch = [ch]
    layers, save, c2 = [], [], ch[-1]  # layers, savelist, ch out
    is_backbone = False
    for i, (f, n, m, args) in enumerate(d['backbone'] + d['head']):  # from, number, module, args
        try:
            if m == 'node_mode':
                m = d[m]
                if len(args) > 0:
                    if args[0] == 'head_channel':
                        args[0] = int(d[args[0]])
            t = m
            m = getattr(torch.nn, m[3:]) if 'nn.' in m else globals()[m]  # get module
        except:
            pass
        for j, a in enumerate(args):
            if isinstance(a, str):
                with contextlib.suppress(ValueError):
                    try:
                        args[j] = locals()[a] if a in locals() else ast.literal_eval(a)
                    except:
                        args[j] = a

        n = n_ = max(round(n * depth), 1) if n > 1 else n  # depth gain
        if m in (Classify, Conv, ConvTranspose, GhostConv, Bottleneck, GhostBottleneck, SPP, SPPF, DWConv, Focus,
                 BottleneckCSP, C1, C2, C2f, C3, C3TR, C3Ghost, nn.ConvTranspose2d, DWConvTranspose2d, C3x, RepC3,
                 C2f_EMSC, C2f_EMSCP):
            if args[0] == 'head_channel':
                args[0] = d[args[0]]
            c1, c2 = ch[f], args[0]
            if c2 != nc:  # if c2 not equal to number of classes (i.e. for Classify() output)
                c2 = make_divisible(min(c2, max_channels) * width, 8)

            args = [c1, c2, *args[1:]]
            if m in (BottleneckCSP, C1, C2, C2f, C3, C3TR, C3Ghost, C3x, RepC3, C2f_EMSC, C2f_EMSCP):
                args.insert(2, n)  # number of repeats
                n = 1
        elif m is AIFI:
            args = [ch[f], *args]
        elif m in (HGStem, HGBlock):
            c1, cm, c2 = ch[f], args[0], args[1]
            args = [c1, cm, c2, *args[2:]]
            if m is HGBlock:
                args.insert(4, n)  # number of repeats
                n = 1
        elif m in (Detect, DetectAux, Pose, Detect_DyHead):
            args.append([ch[x] for x in f])
        elif m is nn.BatchNorm2d:
            args = [ch[f]]
        elif m is Concat:
            c2 = sum(ch[x] for x in f)
        elif m in (Detect, Segment, Pose):
            args.append([ch[x] for x in f])
            if m is Segment:
                args[2] = make_divisible(min(args[2], max_channels) * width, 8)
        elif m is RTDETRDecoder:  # special case, channels arg must be passed in index 1
            args.insert(1, [ch[x] for x in f])
        else:
            c2 = ch[f]

        if isinstance(c2, list):
            is_backbone = True
            m_ = m
            m_.backbone = True
        else:
            m_ = nn.Sequential(*(m(*args) for _ in range(n))) if n > 1 else m(*args)  # module
            t = str(m)[8:-2].replace('__main__.', '')  # module type
        m.np = sum(x.numel() for x in m_.parameters())  # number params
        m_.i, m_.f, m_.type = i + 4 if is_backbone else i, f, t  # attach index, 'from' index, type
        if verbose:
            LOGGER.info(f'{i:>3}{str(f):>20}{n_:>3}{m.np:10.0f}  {t:<45}{str(args):<30}')  # print
        save.extend(x % (i + 4 if is_backbone else i) for x in ([f] if isinstance(f, int) else f) if
                    x != -1)  # append to savelist
        layers.append(m_)
        if i == 0:
            ch = []
        if isinstance(c2, list):
            ch.extend(c2)
            for _ in range(5 - len(ch)):
                ch.insert(0, 0)
        else:
            ch.append(c2)
    return nn.Sequential(*layers), sorted(save)


def yaml_model_load(path):
    """Load a YOLOv8 model from a YAML file."""
    import re

    path = Path(path)
    if path.stem in (f'yolov{d}{x}6' for x in 'nsmlx' for d in (5, 8)):
        new_stem = re.sub(r'(\d+)([nslmx])6(.+)?$', r'\1\2-p6\3', path.stem)
        LOGGER.warning(f'WARNING ⚠️ Ultralytics YOLO P6 models now use -p6 suffix. Renaming {path.stem} to {new_stem}.')
        path = path.with_name(new_stem + path.suffix)

    unified_path = re.sub(r'(\d+)([nslmx])(.+)?$', r'\1\3', str(path))  # i.e. yolov8x.yaml -> yolov8.yaml
    yaml_file = check_yaml(unified_path, hard=False) or check_yaml(path)
    d = yaml_load(yaml_file)  # model dict
    d['scale'] = guess_model_scale(path)
    d['yaml_file'] = str(path)
    return d


def guess_model_scale(model_path):
    """
    Takes a path to a YOLO model's YAML file as input and extracts the size character of the model's scale. The function
    uses regular expression matching to find the pattern of the model scale in the YAML file name, which is denoted by
    n, s, m, l, or x. The function returns the size character of the model scale as a string.

    Args:
        model_path (str | Path): The path to the YOLO model's YAML file.

    Returns:
        (str): The size character of the model's scale, which can be n, s, m, l, or x.
    """
    with contextlib.suppress(AttributeError):
        import re
        return re.search(r'yolov\d+([nslmx])', Path(model_path).stem).group(1)  # n, s, m, l, or x
    return ''


def guess_model_task(model):
    """
    Guess the task of a PyTorch model from its architecture or configuration.

    Args:
        model (nn.Module | dict): PyTorch model or model configuration in YAML format.

    Returns:
        (str): Task of the model ('detect', 'segment', 'classify', 'pose').

    Raises:
        SyntaxError: If the task of the model could not be determined.
    """

    def cfg2task(cfg):
        """Guess from YAML dictionary."""
        m = cfg['head'][-1][-2].lower()  # output module name
        if m in ('classify', 'classifier', 'cls', 'fc'):
            return 'classify'
        if 'detect' in m:
            return 'detect'
        if 'segment' in m:
            return 'segment'
        if 'pose' in m:
            return 'pose'

    # Guess from model cfg
    if isinstance(model, dict):
        with contextlib.suppress(Exception):
            return cfg2task(model)

    # Guess from PyTorch model
    if isinstance(model, nn.Module):  # PyTorch model
        for x in 'model.args', 'model.model.args', 'model.model.model.args':
            with contextlib.suppress(Exception):
                return eval(x)['task']
        for x in 'model.yaml', 'model.model.yaml', 'model.model.model.yaml':
            with contextlib.suppress(Exception):
                return cfg2task(eval(x))

        for m in model.modules():
            if isinstance(m, (Detect, DetectAux, Detect_DyHead)):
                return 'detect'
            elif isinstance(m, Segment):
                return 'segment'
            elif isinstance(m, Classify):
                return 'classify'
            elif isinstance(m, Pose):
                return 'pose'

    # Guess from model filename
    if isinstance(model, (str, Path)):
        model = Path(model)
        if '-seg' in model.stem or 'segment' in model.parts:
            return 'segment'
        elif '-cls' in model.stem or 'classify' in model.parts:
            return 'classify'
        elif '-pose' in model.stem or 'pose' in model.parts:
            return 'pose'
        elif 'detect' in model.parts:
            return 'detect'

    # Unable to determine task from model
    LOGGER.warning("WARNING ⚠️ Unable to automatically guess model task, assuming 'task=detect'. "
                   "Explicitly define task for your model, i.e. 'task=detect', 'segment', 'classify', or 'pose'.")
    return 'detect'  # assume detect

3. Creating the yolov8+dyhead+c2fEMSC and yolov8+dyhead+c2fEMSCP YAML files

The most important part of combining the two improvements is the final training YAML file:

yolov8+dyhead+c2fEMSC:

# Ultralytics YOLO 🚀, AGPL-3.0 license
# YOLOv8 object detection model with P3-P5 outputs. For Usage examples see https://docs.ultralytics.com/tasks/detect

# Parameters
nc: 2  # number of classes
scales: # model compound scaling constants, i.e. 'model=yolov8n.yaml' will call yolov8.yaml with scale 'n'
  # [depth, width, max_channels]
  n: [0.33, 0.25, 1024]  # YOLOv8n summary: 225 layers,  3157200 parameters,  3157184 gradients,   8.9 GFLOPs
  s: [0.33, 0.50, 1024]  # YOLOv8s summary: 225 layers, 11166560 parameters, 11166544 gradients,  28.8 GFLOPs
  m: [0.67, 0.75, 768]   # YOLOv8m summary: 295 layers, 25902640 parameters, 25902624 gradients,  79.3 GFLOPs
  l: [1.00, 1.00, 512]   # YOLOv8l summary: 365 layers, 43691520 parameters, 43691504 gradients, 165.7 GFLOPs
  x: [1.00, 1.25, 512]   # YOLOv8x summary: 365 layers, 68229648 parameters, 68229632 gradients, 258.5 GFLOPs

# YOLOv8.0n backbone
backbone:
  # [from, repeats, module, args]
  - [-1, 1, Conv, [64, 3, 2]]  # 0-P1/2
  - [-1, 1, Conv, [128, 3, 2]]  # 1-P2/4
  - [-1, 3, C2f, [128, True]]
  - [-1, 1, Conv, [256, 3, 2]]  # 3-P3/8
  - [-1, 6, C2f, [256, True]]
  - [-1, 1, Conv, [512, 3, 2]]  # 5-P4/16
  - [-1, 6, C2f_EMSC, [512, True]]
  - [-1, 1, Conv, [1024, 3, 2]]  # 7-P5/32
  - [-1, 3, C2f_EMSC, [1024, True]]
  - [-1, 1, SPPF, [1024, 5]]  # 9

# YOLOv8.0n head
head:
  - [-1, 1, nn.Upsample, [None, 2, 'nearest']]
  - [[-1, 6], 1, Concat, [1]]  # cat backbone P4
  - [-1, 3, C2f_EMSC, [512]]  # 12

  - [-1, 1, nn.Upsample, [None, 2, 'nearest']]
  - [[-1, 4], 1, Concat, [1]]  # cat backbone P3
  - [-1, 3, C2f, [256]]  # 15 (P3/8-small)

  - [-1, 1, Conv, [256, 3, 2]]
  - [[-1, 12], 1, Concat, [1]]  # cat head P4
  - [-1, 3, C2f_EMSC, [512]]  # 18 (P4/16-medium)

  - [-1, 1, Conv, [512, 3, 2]]
  - [[-1, 9], 1, Concat, [1]]  # cat head P5
  - [-1, 3, C2f_EMSC, [1024]]  # 21 (P5/32-large)

  - [[15, 18, 21], 1, Detect_DyHead, [nc, 128, 1]]  # Detect(P3, P4, P5)
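
In the last line, `[nc, 128, 1]` maps onto the `Detect_DyHead(nc, hidc, block_num, ch)` constructor from head.py: each of the three input feature maps is first projected to hidc=128 channels, then run through block_num=1 DyHeadBlock before the decoupled box/class branches; parse_model appends the input-channel list `ch` automatically.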

yolov8+dyhead+c2fEMSCP:

# Ultralytics YOLO 🚀, AGPL-3.0 license
# YOLOv8 object detection model with P3-P5 outputs. For Usage examples see https://docs.ultralytics.com/tasks/detect

# Parameters
nc: 2  # number of classes
scales: # model compound scaling constants, i.e. 'model=yolov8n.yaml' will call yolov8.yaml with scale 'n'
  # [depth, width, max_channels]
  n: [0.33, 0.25, 1024]  # YOLOv8n summary: 225 layers,  3157200 parameters,  3157184 gradients,   8.9 GFLOPs
  s: [0.33, 0.50, 1024]  # YOLOv8s summary: 225 layers, 11166560 parameters, 11166544 gradients,  28.8 GFLOPs
  m: [0.67, 0.75, 768]   # YOLOv8m summary: 295 layers, 25902640 parameters, 25902624 gradients,  79.3 GFLOPs
  l: [1.00, 1.00, 512]   # YOLOv8l summary: 365 layers, 43691520 parameters, 43691504 gradients, 165.7 GFLOPs
  x: [1.00, 1.25, 512]   # YOLOv8x summary: 365 layers, 68229648 parameters, 68229632 gradients, 258.5 GFLOPs

# YOLOv8.0n backbone
backbone:
  # [from, repeats, module, args]
  - [-1, 1, Conv, [64, 3, 2]]  # 0-P1/2
  - [-1, 1, Conv, [128, 3, 2]]  # 1-P2/4
  - [-1, 3, C2f, [128, True]]
  - [-1, 1, Conv, [256, 3, 2]]  # 3-P3/8
  - [-1, 6, C2f, [256, True]]
  - [-1, 1, Conv, [512, 3, 2]]  # 5-P4/16
  - [-1, 6, C2f_EMSCP, [512, True]]
  - [-1, 1, Conv, [1024, 3, 2]]  # 7-P5/32
  - [-1, 3, C2f_EMSCP, [1024, True]]
  - [-1, 1, SPPF, [1024, 5]]  # 9

# YOLOv8.0n head
head:
  - [-1, 1, nn.Upsample, [None, 2, 'nearest']]
  - [[-1, 6], 1, Concat, [1]]  # cat backbone P4
  - [-1, 3, C2f_EMSCP, [512]]  # 12

  - [-1, 1, nn.Upsample, [None, 2, 'nearest']]
  - [[-1, 4], 1, Concat, [1]]  # cat backbone P3
  - [-1, 3, C2f, [256]]  # 15 (P3/8-small)

  - [-1, 1, Conv, [256, 3, 2]]
  - [[-1, 12], 1, Concat, [1]]  # cat head P4
  - [-1, 3, C2f_EMSCP, [512]]  # 18 (P4/16-medium)

  - [-1, 1, Conv, [512, 3, 2]]
  - [[-1, 9], 1, Concat, [1]]  # cat head P5
  - [-1, 3, C2f_EMSCP, [1024]]  # 21 (P5/32-large)

  - [[15, 18, 21], 1, Detect_DyHead, [nc, 128, 1]]  # Detect(P3, P4, P5)
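
Note that this config differs from the EMSC one only in that every C2f_EMSC entry is replaced by C2f_EMSCP; the head and scaling parameters are identical.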

III. Running and Testing

yolov8+dyhead+c2fEMSC: (model structure printout, showing C2f_EMSC and Detect_DyHead layers)

yolov8+dyhead+c2fEMSCP: (model structure printout, showing C2f_EMSCP and Detect_DyHead layers)
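
A minimal way to reproduce the check, assuming the modified ultralytics package is the one on the Python path (the YAML file names are placeholders for wherever the configs above were saved):

from ultralytics import YOLO

for cfg in ('yolov8-dyhead-EMSC.yaml', 'yolov8-dyhead-EMSCP.yaml'):
    model = YOLO(cfg)  # builds the model and prints the layer table
    model.info()       # the table should list Detect_DyHead and C2f_EMSC / C2f_EMSCP
    # model.train(data='your_data.yaml', epochs=100)  # training works the same as stock YOLOv8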

As the printouts show, the final model contains both the Detect_DyHead detection head and the C2f-EMSC or C2f-EMSCP modules.
