目录
一 BiFPN(双向特征金字塔网络)
1 BiFPN
2 EfficientDet
二 使用BiFPN助力模型更优秀
1 整体修改
2 配置文件
3 训练
其他
一 BiFPN(双向特征金字塔网络)
BiFPN(双向特征金字塔网络, 2020)用于特征融合层。
官方论文地址:https://arxiv.org/pdf/1911.09070.pdf
在计算机视觉中,模型效率变得越来越重要。本文系统地研究了目标检测的神经网络架构选择,并提出了几个关键的优化方法来提高效率。首先,提出了一种加权双向特征金字塔网络(BiFPN),它可以实现简单快速的多尺度特征融合;其次,提出了一种复合缩放方法,该方法可以同时对所有骨干网络、特征网络和框/类预测网络的分辨率、深度和宽度进行统一缩放。基于这些优化和更好的主干,开发了一个新的对象检测器系列,称为EfficientDet,它在广泛的资源限制范围内始终实现比现有技术更高的效率。
代码地址:https://github.com/google/automl/tree/master/efficientdet
1 BiFPN
下图中的特征网络设计:
a FPN引入了自顶向下的路径来融合从3级到7级的多尺度特征 (p3 - p7);
b PANet在FPN之上增加了一条额外的自下而上的通路;
c NAS-FPN采用神经网络架构搜索(NAS)找到不规则的特征网络拓扑,然后重复应用同一块;
d 通过双向跨尺度连接和重复同一块的设计,提高了准确性和效率的权衡。
2 EfficientDet
下图是EfficientDet 的架构:
采用EfficientDet作为骨干网,采用BiFPN作为特征网,采用共享类/框预测网络。
提出的BiFPN作为特征网络,从骨干网中提取3-7级特征{P3, P4, P5, P6, P7},重复进行自顶向下和自底向上的双向特征融合。这些融合的特征被送到类和框网络中,分别产生对象的类别和边界框预测。类和框网络的权重在所有级别的特征上是共享的。
二 使用BiFPN助力模型更优秀
1 整体修改
① 添加BiFPN.py文件
在ultralytics/nn/modules目录下新建BiFPN.py文件,文件的内容如下:
import torch.nn as nn
import torch__all__ = ['Bi_FPN']class swish(nn.Module):def forward(self, x):return x * torch.sigmoid(x)class Bi_FPN(nn.Module):def __init__(self, length):super().__init__()self.weight = nn.Parameter(torch.ones(length, dtype=torch.float32), requires_grad=True)self.swish = swish()self.epsilon = 0.0001def forward(self, x):weights = self.weight / (torch.sum(self.swish(self.weight), dim=0) + self.epsilon) # 权重归一化处理weighted_feature_maps = [weights[i] * x[i] for i in range(len(x))]stacked_feature_maps = torch.stack(weighted_feature_maps, dim=0)result = torch.sum(stacked_feature_maps, dim=0)return result
② 修改ultralytics/nn/tasks.py文件
具体的修改内容如下图所示:
2 配置文件
说明:以下配置文件的内容来源于网络,非个人设计。
yolov8_BiFPN.yaml 的内容如下所示:
# Ultralytics YOLO 🚀, AGPL-3.0 license
# YOLOv8 object detection model with P3-P5 outputs. For Usage examples see https://docs.ultralytics.com/tasks/detect# Parameters
nc: 80 # number of classes
scales: # model compound scaling constants, i.e. 'model=yolov8n.yaml' will call yolov8.yaml with scale 'n'# [depth, width, max_channels]n: [0.33, 0.25, 1024] # YOLOv8n summary: 225 layers, 3157200 parameters, 3157184 gradients, 8.9 GFLOPss: [0.33, 0.50, 1024] # YOLOv8s summary: 225 layers, 11166560 parameters, 11166544 gradients, 28.8 GFLOPsm: [0.67, 0.75, 768] # YOLOv8m summary: 295 layers, 25902640 parameters, 25902624 gradients, 79.3 GFLOPsl: [1.00, 1.00, 512] # YOLOv8l summary: 365 layers, 43691520 parameters, 43691504 gradients, 165.7 GFLOPsx: [1.00, 1.25, 512] # YOLOv8x summary: 365 layers, 68229648 parameters, 68229632 gradients, 258.5 GFLOPs# YOLOv8.0n backbone
backbone:# [from, repeats, module, args]- [-1, 1, Conv, [64, 3, 2]] # 0-P1/2- [-1, 1, Conv, [128, 3, 2]] # 1-P2/4- [-1, 3, C2f, [128, True]]- [-1, 1, Conv, [256, 3, 2]] # 3-P3/8- [-1, 6, C2f, [256, True]]- [-1, 1, Conv, [512, 3, 2]] # 5-P4/16- [-1, 6, C2f, [512, True]]- [-1, 1, Conv, [1024, 3, 2]] # 7-P5/32- [-1, 3, C2f, [1024, True]]- [-1, 1, SPPF, [1024, 5]] # 9# YOLOv8.0n head
head:- [4, 1, Conv, [256]] # 10-P3/8- [6, 1, Conv, [256]] # 11-P4/16- [9, 1, Conv, [256]] # 12-P5/32- [-1, 1, nn.Upsample, [None, 2, "nearest"]]- [[-1, 11], 1, Bi_FPN, []] # 14- [-1, 3, C2f, [256]] # 15-P4/16- [-1, 1, nn.Upsample, [None, 2, "nearest"]]- [[-1, 10], 1, Bi_FPN, []] # 17- [-1, 3, C2f, [256]] # 18-P3/8- [1, 1, Conv, [256, 3, 2]] - [[-1, 10, 18], 1, Bi_FPN, []] # 20- [-1, 3, C2f, [256]] # 21-P3/8- [-1, 1, Conv, [256, 3, 2]] # 22 P3->P4- [[-1, 11, 15], 1, Bi_FPN, []] # 23- [-1, 3, C2f, [512]] # 24-P4/16- [-1, 1, Conv, [256, 3, 2]] # 25 P4->P5- [[-1, 12], 1, Bi_FPN, []] # 26- [-1, 3, C2f, [1024]] # 27-P5/32- [[21, 24, 27], 1, Detect, [nc]] # Detect(P3, P4, P5)
3 训练
上述修改完毕后,开始训练吧!🌺🌺🌺
训练示例:
yolo task=detect mode=train model=cfg/models/v8/yolov8_BiFPN.yaml data=cfg/datasets/coco128.yaml epochs=100 batch=16 device=cpu project=yolov8
其他
如果觉得替换部分内容不方便的话,可以直接复制下述文件对应替换原始py文件的内容:
- 修改后的task.py
# Ultralytics YOLO 🚀, AGPL-3.0 licenseimport contextlib
from copy import deepcopy
from pathlib import Pathimport torch
import torch.nn as nnfrom ultralytics.nn.modules import (AIFI,C1,C2,C3,C3TR,OBB,SPP,SPPELAN,SPPF,ADown,Bottleneck,BottleneckCSP,C2f,C2fAttn,C3Ghost,C3x,CBFuse,CBLinear,Classify,Concat,Conv,Conv2,ConvTranspose,Detect,DWConv,DWConvTranspose2d,Focus,GhostBottleneck,GhostConv,HGBlock,HGStem,ImagePoolingAttn,Pose,RepC3,RepConv,RepNCSPELAN4,ResNetLayer,RTDETRDecoder,Segment,Silence,WorldDetect,
)
from ultralytics.utils import DEFAULT_CFG_DICT, DEFAULT_CFG_KEYS, LOGGER, colorstr, emojis, yaml_load
from ultralytics.utils.checks import check_requirements, check_suffix, check_yaml
from ultralytics.utils.loss import v8ClassificationLoss, v8DetectionLoss, v8OBBLoss, v8PoseLoss, v8SegmentationLoss
from ultralytics.utils.plotting import feature_visualization
from ultralytics.utils.torch_utils import (fuse_conv_and_bn,fuse_deconv_and_bn,initialize_weights,intersect_dicts,make_divisible,model_info,scale_img,time_sync,
)
from .modules.DynamicHead import Detect_DynamicHead #导入DynamicHead.py中的检测头
from .modules.OREPA import OREPA #导入OREPA
from .modules.BiFPN import Bi_FPN #导入Bi_FPN
try:import thop
except ImportError:thop = Noneclass BaseModel(nn.Module):"""The BaseModel class serves as a base class for all the models in the Ultralytics YOLO family."""def forward(self, x, *args, **kwargs):"""Forward pass of the model on a single scale. Wrapper for `_forward_once` method.Args:x (torch.Tensor | dict): The input image tensor or a dict including image tensor and gt labels.Returns:(torch.Tensor): The output of the network."""if isinstance(x, dict): # for cases of training and validating while training.return self.loss(x, *args, **kwargs)return self.predict(x, *args, **kwargs)def predict(self, x, profile=False, visualize=False, augment=False, embed=None):"""Perform a forward pass through the network.Args:x (torch.Tensor): The input tensor to the model.profile (bool): Print the computation time of each layer if True, defaults to False.visualize (bool): Save the feature maps of the model if True, defaults to False.augment (bool): Augment image during prediction, defaults to False.embed (list, optional): A list of feature vectors/embeddings to return.Returns:(torch.Tensor): The last output of the model."""if augment:return self._predict_augment(x)return self._predict_once(x, profile, visualize, embed)def _predict_once(self, x, profile=False, visualize=False, embed=None):"""Perform a forward pass through the network.Args:x (torch.Tensor): The input tensor to the model.profile (bool): Print the computation time of each layer if True, defaults to False.visualize (bool): Save the feature maps of the model if True, defaults to False.embed (list, optional): A list of feature vectors/embeddings to return.Returns:(torch.Tensor): The last output of the model."""y, dt, embeddings = [], [], [] # outputsfor m in self.model:if m.f != -1: # if not from previous layerx = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f] # from earlier layersif profile:self._profile_one_layer(m, x, dt)x = m(x) # runy.append(x if m.i in self.save else None) # save outputif visualize:feature_visualization(x, m.type, m.i, save_dir=visualize)if embed and m.i in embed:embeddings.append(nn.functional.adaptive_avg_pool2d(x, (1, 1)).squeeze(-1).squeeze(-1)) # flattenif m.i == max(embed):return torch.unbind(torch.cat(embeddings, 1), dim=0)return xdef _predict_augment(self, x):"""Perform augmentations on input image x and return augmented inference."""LOGGER.warning(f"WARNING ⚠️ {self.__class__.__name__} does not support augmented inference yet. "f"Reverting to single-scale inference instead.")return self._predict_once(x)def _profile_one_layer(self, m, x, dt):"""Profile the computation time and FLOPs of a single layer of the model on a given input. Appends the results tothe provided list.Args:m (nn.Module): The layer to be profiled.x (torch.Tensor): The input data to the layer.dt (list): A list to store the computation time of the layer.Returns:None"""c = m == self.model[-1] and isinstance(x, list) # is final layer list, copy input as inplace fixflops = thop.profile(m, inputs=[x.copy() if c else x], verbose=False)[0] / 1e9 * 2 if thop else 0 # FLOPst = time_sync()for _ in range(10):m(x.copy() if c else x)dt.append((time_sync() - t) * 100)if m == self.model[0]:LOGGER.info(f"{'time (ms)':>10s} {'GFLOPs':>10s} {'params':>10s} module")LOGGER.info(f"{dt[-1]:10.2f} {flops:10.2f} {m.np:10.0f} {m.type}")if c:LOGGER.info(f"{sum(dt):10.2f} {'-':>10s} {'-':>10s} Total")def fuse(self, verbose=True):"""Fuse the `Conv2d()` and `BatchNorm2d()` layers of the model into a single layer, in order to improve thecomputation efficiency.Returns:(nn.Module): The fused model is returned."""if not self.is_fused():for m in self.model.modules():if isinstance(m, (Conv, Conv2, DWConv)) and hasattr(m, "bn"):if isinstance(m, Conv2):m.fuse_convs()m.conv = fuse_conv_and_bn(m.conv, m.bn) # update convdelattr(m, "bn") # remove batchnormm.forward = m.forward_fuse # update forwardif isinstance(m, ConvTranspose) and hasattr(m, "bn"):m.conv_transpose = fuse_deconv_and_bn(m.conv_transpose, m.bn)delattr(m, "bn") # remove batchnormm.forward = m.forward_fuse # update forwardif isinstance(m, RepConv):m.fuse_convs()m.forward = m.forward_fuse # update forwardself.info(verbose=verbose)return selfdef is_fused(self, thresh=10):"""Check if the model has less than a certain threshold of BatchNorm layers.Args:thresh (int, optional): The threshold number of BatchNorm layers. Default is 10.Returns:(bool): True if the number of BatchNorm layers in the model is less than the threshold, False otherwise."""bn = tuple(v for k, v in nn.__dict__.items() if "Norm" in k) # normalization layers, i.e. BatchNorm2d()return sum(isinstance(v, bn) for v in self.modules()) < thresh # True if < 'thresh' BatchNorm layers in modeldef info(self, detailed=False, verbose=True, imgsz=640):"""Prints model information.Args:detailed (bool): if True, prints out detailed information about the model. Defaults to Falseverbose (bool): if True, prints out the model information. Defaults to Falseimgsz (int): the size of the image that the model will be trained on. Defaults to 640"""return model_info(self, detailed=detailed, verbose=verbose, imgsz=imgsz)def _apply(self, fn):"""Applies a function to all the tensors in the model that are not parameters or registered buffers.Args:fn (function): the function to apply to the modelReturns:(BaseModel): An updated BaseModel object."""self = super()._apply(fn)m = self.model[-1] # Detect()if isinstance(m, (Detect, Detect_DynamicHead)): # includes all Detect subclasses like Segment, Pose, OBB, WorldDetectm.stride = fn(m.stride)m.anchors = fn(m.anchors)m.strides = fn(m.strides)return selfdef load(self, weights, verbose=True):"""Load the weights into the model.Args:weights (dict | torch.nn.Module): The pre-trained weights to be loaded.verbose (bool, optional): Whether to log the transfer progress. Defaults to True."""model = weights["model"] if isinstance(weights, dict) else weights # torchvision models are not dictscsd = model.float().state_dict() # checkpoint state_dict as FP32csd = intersect_dicts(csd, self.state_dict()) # intersectself.load_state_dict(csd, strict=False) # loadif verbose:LOGGER.info(f"Transferred {len(csd)}/{len(self.model.state_dict())} items from pretrained weights")def loss(self, batch, preds=None):"""Compute loss.Args:batch (dict): Batch to compute loss onpreds (torch.Tensor | List[torch.Tensor]): Predictions."""if not hasattr(self, "criterion"):self.criterion = self.init_criterion()preds = self.forward(batch["img"]) if preds is None else predsreturn self.criterion(preds, batch)def init_criterion(self):"""Initialize the loss criterion for the BaseModel."""raise NotImplementedError("compute_loss() needs to be implemented by task heads")class DetectionModel(BaseModel):"""YOLOv8 detection model."""def __init__(self, cfg="yolov8n.yaml", ch=3, nc=None, verbose=True): # model, input channels, number of classes"""Initialize the YOLOv8 detection model with the given config and parameters."""super().__init__()self.yaml = cfg if isinstance(cfg, dict) else yaml_model_load(cfg) # cfg dict# Define modelch = self.yaml["ch"] = self.yaml.get("ch", ch) # input channelsif nc and nc != self.yaml["nc"]:LOGGER.info(f"Overriding model.yaml nc={self.yaml['nc']} with nc={nc}")self.yaml["nc"] = nc # override YAML valueself.model, self.save = parse_model(deepcopy(self.yaml), ch=ch, verbose=verbose) # model, savelistself.names = {i: f"{i}" for i in range(self.yaml["nc"])} # default names dictself.inplace = self.yaml.get("inplace", True)# Build stridesm = self.model[-1] # Detect()if isinstance(m, (Detect, Detect_DynamicHead)): # includes all Detect subclasses like Segment, Pose, OBB, WorldDetects = 256 # 2x min stridem.inplace = self.inplaceforward = lambda x: self.forward(x)[0] if isinstance(m, (Segment, Pose, OBB)) else self.forward(x)m.stride = torch.tensor([s / x.shape[-2] for x in forward(torch.zeros(1, ch, s, s))]) # forwardself.stride = m.stridem.bias_init() # only run onceelse:self.stride = torch.Tensor([32]) # default stride for i.e. RTDETR# Init weights, biasesinitialize_weights(self)if verbose:self.info()LOGGER.info("")def _predict_augment(self, x):"""Perform augmentations on input image x and return augmented inference and train outputs."""img_size = x.shape[-2:] # height, widths = [1, 0.83, 0.67] # scalesf = [None, 3, None] # flips (2-ud, 3-lr)y = [] # outputsfor si, fi in zip(s, f):xi = scale_img(x.flip(fi) if fi else x, si, gs=int(self.stride.max()))yi = super().predict(xi)[0] # forwardyi = self._descale_pred(yi, fi, si, img_size)y.append(yi)y = self._clip_augmented(y) # clip augmented tailsreturn torch.cat(y, -1), None # augmented inference, train@staticmethoddef _descale_pred(p, flips, scale, img_size, dim=1):"""De-scale predictions following augmented inference (inverse operation)."""p[:, :4] /= scale # de-scalex, y, wh, cls = p.split((1, 1, 2, p.shape[dim] - 4), dim)if flips == 2:y = img_size[0] - y # de-flip udelif flips == 3:x = img_size[1] - x # de-flip lrreturn torch.cat((x, y, wh, cls), dim)def _clip_augmented(self, y):"""Clip YOLO augmented inference tails."""nl = self.model[-1].nl # number of detection layers (P3-P5)g = sum(4**x for x in range(nl)) # grid pointse = 1 # exclude layer counti = (y[0].shape[-1] // g) * sum(4**x for x in range(e)) # indicesy[0] = y[0][..., :-i] # largei = (y[-1].shape[-1] // g) * sum(4 ** (nl - 1 - x) for x in range(e)) # indicesy[-1] = y[-1][..., i:] # smallreturn ydef init_criterion(self):"""Initialize the loss criterion for the DetectionModel."""return v8DetectionLoss(self)class OBBModel(DetectionModel):"""YOLOv8 Oriented Bounding Box (OBB) model."""def __init__(self, cfg="yolov8n-obb.yaml", ch=3, nc=None, verbose=True):"""Initialize YOLOv8 OBB model with given config and parameters."""super().__init__(cfg=cfg, ch=ch, nc=nc, verbose=verbose)def init_criterion(self):"""Initialize the loss criterion for the model."""return v8OBBLoss(self)class SegmentationModel(DetectionModel):"""YOLOv8 segmentation model."""def __init__(self, cfg="yolov8n-seg.yaml", ch=3, nc=None, verbose=True):"""Initialize YOLOv8 segmentation model with given config and parameters."""super().__init__(cfg=cfg, ch=ch, nc=nc, verbose=verbose)def init_criterion(self):"""Initialize the loss criterion for the SegmentationModel."""return v8SegmentationLoss(self)class PoseModel(DetectionModel):"""YOLOv8 pose model."""def __init__(self, cfg="yolov8n-pose.yaml", ch=3, nc=None, data_kpt_shape=(None, None), verbose=True):"""Initialize YOLOv8 Pose model."""if not isinstance(cfg, dict):cfg = yaml_model_load(cfg) # load model YAMLif any(data_kpt_shape) and list(data_kpt_shape) != list(cfg["kpt_shape"]):LOGGER.info(f"Overriding model.yaml kpt_shape={cfg['kpt_shape']} with kpt_shape={data_kpt_shape}")cfg["kpt_shape"] = data_kpt_shapesuper().__init__(cfg=cfg, ch=ch, nc=nc, verbose=verbose)def init_criterion(self):"""Initialize the loss criterion for the PoseModel."""return v8PoseLoss(self)class ClassificationModel(BaseModel):"""YOLOv8 classification model."""def __init__(self, cfg="yolov8n-cls.yaml", ch=3, nc=None, verbose=True):"""Init ClassificationModel with YAML, channels, number of classes, verbose flag."""super().__init__()self._from_yaml(cfg, ch, nc, verbose)def _from_yaml(self, cfg, ch, nc, verbose):"""Set YOLOv8 model configurations and define the model architecture."""self.yaml = cfg if isinstance(cfg, dict) else yaml_model_load(cfg) # cfg dict# Define modelch = self.yaml["ch"] = self.yaml.get("ch", ch) # input channelsif nc and nc != self.yaml["nc"]:LOGGER.info(f"Overriding model.yaml nc={self.yaml['nc']} with nc={nc}")self.yaml["nc"] = nc # override YAML valueelif not nc and not self.yaml.get("nc", None):raise ValueError("nc not specified. Must specify nc in model.yaml or function arguments.")self.model, self.save = parse_model(deepcopy(self.yaml), ch=ch, verbose=verbose) # model, savelistself.stride = torch.Tensor([1]) # no stride constraintsself.names = {i: f"{i}" for i in range(self.yaml["nc"])} # default names dictself.info()@staticmethoddef reshape_outputs(model, nc):"""Update a TorchVision classification model to class count 'n' if required."""name, m = list((model.model if hasattr(model, "model") else model).named_children())[-1] # last moduleif isinstance(m, Classify): # YOLO Classify() headif m.linear.out_features != nc:m.linear = nn.Linear(m.linear.in_features, nc)elif isinstance(m, nn.Linear): # ResNet, EfficientNetif m.out_features != nc:setattr(model, name, nn.Linear(m.in_features, nc))elif isinstance(m, nn.Sequential):types = [type(x) for x in m]if nn.Linear in types:i = types.index(nn.Linear) # nn.Linear indexif m[i].out_features != nc:m[i] = nn.Linear(m[i].in_features, nc)elif nn.Conv2d in types:i = types.index(nn.Conv2d) # nn.Conv2d indexif m[i].out_channels != nc:m[i] = nn.Conv2d(m[i].in_channels, nc, m[i].kernel_size, m[i].stride, bias=m[i].bias is not None)def init_criterion(self):"""Initialize the loss criterion for the ClassificationModel."""return v8ClassificationLoss()class RTDETRDetectionModel(DetectionModel):"""RTDETR (Real-time DEtection and Tracking using Transformers) Detection Model class.This class is responsible for constructing the RTDETR architecture, defining loss functions, and facilitating boththe training and inference processes. RTDETR is an object detection and tracking model that extends from theDetectionModel base class.Attributes:cfg (str): The configuration file path or preset string. Default is 'rtdetr-l.yaml'.ch (int): Number of input channels. Default is 3 (RGB).nc (int, optional): Number of classes for object detection. Default is None.verbose (bool): Specifies if summary statistics are shown during initialization. Default is True.Methods:init_criterion: Initializes the criterion used for loss calculation.loss: Computes and returns the loss during training.predict: Performs a forward pass through the network and returns the output."""def __init__(self, cfg="rtdetr-l.yaml", ch=3, nc=None, verbose=True):"""Initialize the RTDETRDetectionModel.Args:cfg (str): Configuration file name or path.ch (int): Number of input channels.nc (int, optional): Number of classes. Defaults to None.verbose (bool, optional): Print additional information during initialization. Defaults to True."""super().__init__(cfg=cfg, ch=ch, nc=nc, verbose=verbose)def init_criterion(self):"""Initialize the loss criterion for the RTDETRDetectionModel."""from ultralytics.models.utils.loss import RTDETRDetectionLossreturn RTDETRDetectionLoss(nc=self.nc, use_vfl=True)def loss(self, batch, preds=None):"""Compute the loss for the given batch of data.Args:batch (dict): Dictionary containing image and label data.preds (torch.Tensor, optional): Precomputed model predictions. Defaults to None.Returns:(tuple): A tuple containing the total loss and main three losses in a tensor."""if not hasattr(self, "criterion"):self.criterion = self.init_criterion()img = batch["img"]# NOTE: preprocess gt_bbox and gt_labels to list.bs = len(img)batch_idx = batch["batch_idx"]gt_groups = [(batch_idx == i).sum().item() for i in range(bs)]targets = {"cls": batch["cls"].to(img.device, dtype=torch.long).view(-1),"bboxes": batch["bboxes"].to(device=img.device),"batch_idx": batch_idx.to(img.device, dtype=torch.long).view(-1),"gt_groups": gt_groups,}preds = self.predict(img, batch=targets) if preds is None else predsdec_bboxes, dec_scores, enc_bboxes, enc_scores, dn_meta = preds if self.training else preds[1]if dn_meta is None:dn_bboxes, dn_scores = None, Noneelse:dn_bboxes, dec_bboxes = torch.split(dec_bboxes, dn_meta["dn_num_split"], dim=2)dn_scores, dec_scores = torch.split(dec_scores, dn_meta["dn_num_split"], dim=2)dec_bboxes = torch.cat([enc_bboxes.unsqueeze(0), dec_bboxes]) # (7, bs, 300, 4)dec_scores = torch.cat([enc_scores.unsqueeze(0), dec_scores])loss = self.criterion((dec_bboxes, dec_scores), targets, dn_bboxes=dn_bboxes, dn_scores=dn_scores, dn_meta=dn_meta)# NOTE: There are like 12 losses in RTDETR, backward with all losses but only show the main three losses.return sum(loss.values()), torch.as_tensor([loss[k].detach() for k in ["loss_giou", "loss_class", "loss_bbox"]], device=img.device)def predict(self, x, profile=False, visualize=False, batch=None, augment=False, embed=None):"""Perform a forward pass through the model.Args:x (torch.Tensor): The input tensor.profile (bool, optional): If True, profile the computation time for each layer. Defaults to False.visualize (bool, optional): If True, save feature maps for visualization. Defaults to False.batch (dict, optional): Ground truth data for evaluation. Defaults to None.augment (bool, optional): If True, perform data augmentation during inference. Defaults to False.embed (list, optional): A list of feature vectors/embeddings to return.Returns:(torch.Tensor): Model's output tensor."""y, dt, embeddings = [], [], [] # outputsfor m in self.model[:-1]: # except the head partif m.f != -1: # if not from previous layerx = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f] # from earlier layersif profile:self._profile_one_layer(m, x, dt)x = m(x) # runy.append(x if m.i in self.save else None) # save outputif visualize:feature_visualization(x, m.type, m.i, save_dir=visualize)if embed and m.i in embed:embeddings.append(nn.functional.adaptive_avg_pool2d(x, (1, 1)).squeeze(-1).squeeze(-1)) # flattenif m.i == max(embed):return torch.unbind(torch.cat(embeddings, 1), dim=0)head = self.model[-1]x = head([y[j] for j in head.f], batch) # head inferencereturn xclass WorldModel(DetectionModel):"""YOLOv8 World Model."""def __init__(self, cfg="yolov8s-world.yaml", ch=3, nc=None, verbose=True):"""Initialize YOLOv8 world model with given config and parameters."""self.txt_feats = torch.randn(1, nc or 80, 512) # features placeholderself.clip_model = None # CLIP model placeholdersuper().__init__(cfg=cfg, ch=ch, nc=nc, verbose=verbose)def set_classes(self, text, batch=80, cache_clip_model=True):"""Set classes in advance so that model could do offline-inference without clip model."""try:import clipexcept ImportError:check_requirements("git+https://github.com/ultralytics/CLIP.git")import clipif (not getattr(self, "clip_model", None) and cache_clip_model): # for backwards compatibility of models lacking clip_model attributeself.clip_model = clip.load("ViT-B/32")[0]model = self.clip_model if cache_clip_model else clip.load("ViT-B/32")[0]device = next(model.parameters()).devicetext_token = clip.tokenize(text).to(device)txt_feats = [model.encode_text(token).detach() for token in text_token.split(batch)]txt_feats = txt_feats[0] if len(txt_feats) == 1 else torch.cat(txt_feats, dim=0)txt_feats = txt_feats / txt_feats.norm(p=2, dim=-1, keepdim=True)self.txt_feats = txt_feats.reshape(-1, len(text), txt_feats.shape[-1])self.model[-1].nc = len(text)def predict(self, x, profile=False, visualize=False, txt_feats=None, augment=False, embed=None):"""Perform a forward pass through the model.Args:x (torch.Tensor): The input tensor.profile (bool, optional): If True, profile the computation time for each layer. Defaults to False.visualize (bool, optional): If True, save feature maps for visualization. Defaults to False.txt_feats (torch.Tensor): The text features, use it if it's given. Defaults to None.augment (bool, optional): If True, perform data augmentation during inference. Defaults to False.embed (list, optional): A list of feature vectors/embeddings to return.Returns:(torch.Tensor): Model's output tensor."""txt_feats = (self.txt_feats if txt_feats is None else txt_feats).to(device=x.device, dtype=x.dtype)if len(txt_feats) != len(x):txt_feats = txt_feats.repeat(len(x), 1, 1)ori_txt_feats = txt_feats.clone()y, dt, embeddings = [], [], [] # outputsfor m in self.model: # except the head partif m.f != -1: # if not from previous layerx = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f] # from earlier layersif profile:self._profile_one_layer(m, x, dt)if isinstance(m, C2fAttn):x = m(x, txt_feats)elif isinstance(m, WorldDetect):x = m(x, ori_txt_feats)elif isinstance(m, ImagePoolingAttn):txt_feats = m(x, txt_feats)else:x = m(x) # runy.append(x if m.i in self.save else None) # save outputif visualize:feature_visualization(x, m.type, m.i, save_dir=visualize)if embed and m.i in embed:embeddings.append(nn.functional.adaptive_avg_pool2d(x, (1, 1)).squeeze(-1).squeeze(-1)) # flattenif m.i == max(embed):return torch.unbind(torch.cat(embeddings, 1), dim=0)return xdef loss(self, batch, preds=None):"""Compute loss.Args:batch (dict): Batch to compute loss on.preds (torch.Tensor | List[torch.Tensor]): Predictions."""if not hasattr(self, "criterion"):self.criterion = self.init_criterion()if preds is None:preds = self.forward(batch["img"], txt_feats=batch["txt_feats"])return self.criterion(preds, batch)class Ensemble(nn.ModuleList):"""Ensemble of models."""def __init__(self):"""Initialize an ensemble of models."""super().__init__()def forward(self, x, augment=False, profile=False, visualize=False):"""Function generates the YOLO network's final layer."""y = [module(x, augment, profile, visualize)[0] for module in self]# y = torch.stack(y).max(0)[0] # max ensemble# y = torch.stack(y).mean(0) # mean ensembley = torch.cat(y, 2) # nms ensemble, y shape(B, HW, C)return y, None # inference, train output# Functions ------------------------------------------------------------------------------------------------------------@contextlib.contextmanager
def temporary_modules(modules=None):"""Context manager for temporarily adding or modifying modules in Python's module cache (`sys.modules`).This function can be used to change the module paths during runtime. It's useful when refactoring code,where you've moved a module from one location to another, but you still want to support the old importpaths for backwards compatibility.Args:modules (dict, optional): A dictionary mapping old module paths to new module paths.Example:```pythonwith temporary_modules({'old.module.path': 'new.module.path'}):import old.module.path # this will now import new.module.path```Note:The changes are only in effect inside the context manager and are undone once the context manager exits.Be aware that directly manipulating `sys.modules` can lead to unpredictable results, especially in largerapplications or libraries. Use this function with caution."""if not modules:modules = {}import importlibimport systry:# Set modules in sys.modules under their old namefor old, new in modules.items():sys.modules[old] = importlib.import_module(new)yieldfinally:# Remove the temporary module pathsfor old in modules:if old in sys.modules:del sys.modules[old]def torch_safe_load(weight):"""This function attempts to load a PyTorch model with the torch.load() function. If a ModuleNotFoundError is raised,it catches the error, logs a warning message, and attempts to install the missing module via thecheck_requirements() function. After installation, the function again attempts to load the model using torch.load().Args:weight (str): The file path of the PyTorch model.Returns:(dict): The loaded PyTorch model."""from ultralytics.utils.downloads import attempt_download_assetcheck_suffix(file=weight, suffix=".pt")file = attempt_download_asset(weight) # search online if missing locallytry:with temporary_modules({"ultralytics.yolo.utils": "ultralytics.utils","ultralytics.yolo.v8": "ultralytics.models.yolo","ultralytics.yolo.data": "ultralytics.data",}): # for legacy 8.0 Classify and Pose modelsckpt = torch.load(file, map_location="cpu")except ModuleNotFoundError as e: # e.name is missing module nameif e.name == "models":raise TypeError(emojis(f"ERROR ❌️ {weight} appears to be an Ultralytics YOLOv5 model originally trained "f"with https://github.com/ultralytics/yolov5.\nThis model is NOT forwards compatible with "f"YOLOv8 at https://github.com/ultralytics/ultralytics."f"\nRecommend fixes are to train a new model using the latest 'ultralytics' package or to "f"run a command with an official YOLOv8 model, i.e. 'yolo predict model=yolov8n.pt'")) from eLOGGER.warning(f"WARNING ⚠️ {weight} appears to require '{e.name}', which is not in ultralytics requirements."f"\nAutoInstall will run now for '{e.name}' but this feature will be removed in the future."f"\nRecommend fixes are to train a new model using the latest 'ultralytics' package or to "f"run a command with an official YOLOv8 model, i.e. 'yolo predict model=yolov8n.pt'")check_requirements(e.name) # install missing moduleckpt = torch.load(file, map_location="cpu")if not isinstance(ckpt, dict):# File is likely a YOLO instance saved with i.e. torch.save(model, "saved_model.pt")LOGGER.warning(f"WARNING ⚠️ The file '{weight}' appears to be improperly saved or formatted. "f"For optimal results, use model.save('filename.pt') to correctly save YOLO models.")ckpt = {"model": ckpt.model}return ckpt, file # loaddef attempt_load_weights(weights, device=None, inplace=True, fuse=False):"""Loads an ensemble of models weights=[a,b,c] or a single model weights=[a] or weights=a."""ensemble = Ensemble()for w in weights if isinstance(weights, list) else [weights]:ckpt, w = torch_safe_load(w) # load ckptargs = {**DEFAULT_CFG_DICT, **ckpt["train_args"]} if "train_args" in ckpt else None # combined argsmodel = (ckpt.get("ema") or ckpt["model"]).to(device).float() # FP32 model# Model compatibility updatesmodel.args = args # attach args to modelmodel.pt_path = w # attach *.pt file path to modelmodel.task = guess_model_task(model)if not hasattr(model, "stride"):model.stride = torch.tensor([32.0])# Appendensemble.append(model.fuse().eval() if fuse and hasattr(model, "fuse") else model.eval()) # model in eval mode# Module updatesfor m in ensemble.modules():if hasattr(m, "inplace"):m.inplace = inplaceelif isinstance(m, nn.Upsample) and not hasattr(m, "recompute_scale_factor"):m.recompute_scale_factor = None # torch 1.11.0 compatibility# Return modelif len(ensemble) == 1:return ensemble[-1]# Return ensembleLOGGER.info(f"Ensemble created with {weights}\n")for k in "names", "nc", "yaml":setattr(ensemble, k, getattr(ensemble[0], k))ensemble.stride = ensemble[int(torch.argmax(torch.tensor([m.stride.max() for m in ensemble])))].strideassert all(ensemble[0].nc == m.nc for m in ensemble), f"Models differ in class counts {[m.nc for m in ensemble]}"return ensembledef attempt_load_one_weight(weight, device=None, inplace=True, fuse=False):"""Loads a single model weights."""ckpt, weight = torch_safe_load(weight) # load ckptargs = {**DEFAULT_CFG_DICT, **(ckpt.get("train_args", {}))} # combine model and default args, preferring model argsmodel = (ckpt.get("ema") or ckpt["model"]).to(device).float() # FP32 model# Model compatibility updatesmodel.args = {k: v for k, v in args.items() if k in DEFAULT_CFG_KEYS} # attach args to modelmodel.pt_path = weight # attach *.pt file path to modelmodel.task = guess_model_task(model)if not hasattr(model, "stride"):model.stride = torch.tensor([32.0])model = model.fuse().eval() if fuse and hasattr(model, "fuse") else model.eval() # model in eval mode# Module updatesfor m in model.modules():if hasattr(m, "inplace"):m.inplace = inplaceelif isinstance(m, nn.Upsample) and not hasattr(m, "recompute_scale_factor"):m.recompute_scale_factor = None # torch 1.11.0 compatibility# Return model and ckptreturn model, ckptdef parse_model(d, ch, verbose=True): # model_dict, input_channels(3)"""Parse a YOLO model.yaml dictionary into a PyTorch model."""import ast# Argsmax_channels = float("inf")nc, act, scales = (d.get(x) for x in ("nc", "activation", "scales"))depth, width, kpt_shape = (d.get(x, 1.0) for x in ("depth_multiple", "width_multiple", "kpt_shape"))if scales:scale = d.get("scale")if not scale:scale = tuple(scales.keys())[0]LOGGER.warning(f"WARNING ⚠️ no model scale passed. Assuming scale='{scale}'.")depth, width, max_channels = scales[scale]if act:Conv.default_act = eval(act) # redefine default activation, i.e. Conv.default_act = nn.SiLU()if verbose:LOGGER.info(f"{colorstr('activation:')} {act}") # printif verbose:LOGGER.info(f"\n{'':>3}{'from':>20}{'n':>3}{'params':>10} {'module':<45}{'arguments':<30}")ch = [ch]layers, save, c2 = [], [], ch[-1] # layers, savelist, ch outfor i, (f, n, m, args) in enumerate(d["backbone"] + d["head"]): # from, number, module, argsm = getattr(torch.nn, m[3:]) if "nn." in m else globals()[m] # get modulefor j, a in enumerate(args):if isinstance(a, str):with contextlib.suppress(ValueError):args[j] = locals()[a] if a in locals() else ast.literal_eval(a)n = n_ = max(round(n * depth), 1) if n > 1 else n # depth gainif m in {Classify,Conv,ConvTranspose,GhostConv,Bottleneck,GhostBottleneck,SPP,SPPF,DWConv,Focus,BottleneckCSP,C1,C2,C2f,RepNCSPELAN4,ADown,SPPELAN,C2fAttn,C3,C3TR,C3Ghost,nn.ConvTranspose2d,DWConvTranspose2d,C3x,RepC3,}:c1, c2 = ch[f], args[0]if c2 != nc: # if c2 not equal to number of classes (i.e. for Classify() output)c2 = make_divisible(min(c2, max_channels) * width, 8)if m is C2fAttn:args[1] = make_divisible(min(args[1], max_channels // 2) * width, 8) # embed channelsargs[2] = int(max(round(min(args[2], max_channels // 2 // 32)) * width, 1) if args[2] > 1 else args[2]) # num headsargs = [c1, c2, *args[1:]]if m in {BottleneckCSP, C1, C2, C2f, C2fAttn, C3, C3TR, C3Ghost, C3x, RepC3}:args.insert(2, n) # number of repeatsn = 1elif m is AIFI:args = [ch[f], *args]elif m in {OREPA}:args = [ch[f], *args]elif m in {HGStem, HGBlock}:c1, cm, c2 = ch[f], args[0], args[1]args = [c1, cm, c2, *args[2:]]if m is HGBlock:args.insert(4, n) # number of repeatsn = 1elif m in {Bi_FPN}: #注册Bi_FPN模块args = [len([ch[x] for x in f])]elif m is ResNetLayer:c2 = args[1] if args[3] else args[1] * 4elif m is nn.BatchNorm2d:args = [ch[f]]elif m is Concat:c2 = sum(ch[x] for x in f)elif m in {Detect, WorldDetect, Segment, Pose, OBB, ImagePoolingAttn, Detect_DynamicHead}:args.append([ch[x] for x in f])if m is Segment:args[2] = make_divisible(min(args[2], max_channels) * width, 8)elif m is RTDETRDecoder: # special case, channels arg must be passed in index 1args.insert(1, [ch[x] for x in f])elif m is CBLinear:c2 = args[0]c1 = ch[f]args = [c1, c2, *args[1:]]elif m is CBFuse:c2 = ch[f[-1]]else:c2 = ch[f]m_ = nn.Sequential(*(m(*args) for _ in range(n))) if n > 1 else m(*args) # modulet = str(m)[8:-2].replace("__main__.", "") # module typem.np = sum(x.numel() for x in m_.parameters()) # number paramsm_.i, m_.f, m_.type = i, f, t # attach index, 'from' index, typeif verbose:LOGGER.info(f"{i:>3}{str(f):>20}{n_:>3}{m.np:10.0f} {t:<45}{str(args):<30}") # printsave.extend(x % i for x in ([f] if isinstance(f, int) else f) if x != -1) # append to savelistlayers.append(m_)if i == 0:ch = []ch.append(c2)return nn.Sequential(*layers), sorted(save)def yaml_model_load(path):"""Load a YOLOv8 model from a YAML file."""import repath = Path(path)if path.stem in (f"yolov{d}{x}6" for x in "nsmlx" for d in (5, 8)):new_stem = re.sub(r"(\d+)([nslmx])6(.+)?$", r"\1\2-p6\3", path.stem)LOGGER.warning(f"WARNING ⚠️ Ultralytics YOLO P6 models now use -p6 suffix. Renaming {path.stem} to {new_stem}.")path = path.with_name(new_stem + path.suffix)unified_path = re.sub(r"(\d+)([nslmx])(.+)?$", r"\1\3", str(path)) # i.e. yolov8x.yaml -> yolov8.yamlyaml_file = check_yaml(unified_path, hard=False) or check_yaml(path)d = yaml_load(yaml_file) # model dictd["scale"] = guess_model_scale(path)d["yaml_file"] = str(path)return ddef guess_model_scale(model_path):"""Takes a path to a YOLO model's YAML file as input and extracts the size character of the model's scale. The functionuses regular expression matching to find the pattern of the model scale in the YAML file name, which is denoted byn, s, m, l, or x. The function returns the size character of the model scale as a string.Args:model_path (str | Path): The path to the YOLO model's YAML file.Returns:(str): The size character of the model's scale, which can be n, s, m, l, or x."""with contextlib.suppress(AttributeError):import rereturn re.search(r"yolov\d+([nslmx])", Path(model_path).stem).group(1) # n, s, m, l, or xreturn ""def guess_model_task(model):"""Guess the task of a PyTorch model from its architecture or configuration.Args:model (nn.Module | dict): PyTorch model or model configuration in YAML format.Returns:(str): Task of the model ('detect', 'segment', 'classify', 'pose').Raises:SyntaxError: If the task of the model could not be determined."""def cfg2task(cfg):"""Guess from YAML dictionary."""m = cfg["head"][-1][-2].lower() # output module nameif m in {"classify", "classifier", "cls", "fc"}:return "classify"if m == "detect":return "detect"if m == "segment":return "segment"if m == "pose":return "pose"if m == "obb":return "obb"else:return "detect"# Guess from model cfgif isinstance(model, dict):with contextlib.suppress(Exception):return cfg2task(model)# Guess from PyTorch modelif isinstance(model, nn.Module): # PyTorch modelfor x in "model.args", "model.model.args", "model.model.model.args":with contextlib.suppress(Exception):return eval(x)["task"]for x in "model.yaml", "model.model.yaml", "model.model.model.yaml":with contextlib.suppress(Exception):return cfg2task(eval(x))for m in model.modules():if isinstance(m, Segment):return "segment"elif isinstance(m, Classify):return "classify"elif isinstance(m, Pose):return "pose"elif isinstance(m, OBB):return "obb"elif isinstance(m, (Detect, WorldDetect, Detect_DynamicHead)):return "detect"# Guess from model filenameif isinstance(model, (str, Path)):model = Path(model)if "-seg" in model.stem or "segment" in model.parts:return "segment"elif "-cls" in model.stem or "classify" in model.parts:return "classify"elif "-pose" in model.stem or "pose" in model.parts:return "pose"elif "-obb" in model.stem or "obb" in model.parts:return "obb"elif "detect" in model.parts:return "detect"# Unable to determine task from modelLOGGER.warning("WARNING ⚠️ Unable to automatically guess model task, assuming 'task=detect'. ""Explicitly define task for your model, i.e. 'task=detect', 'segment', 'classify','pose' or 'obb'.")return "detect" # assume detect
到此,本文分享的内容就结束啦!遇见便是缘,感恩遇见!!!💛 💙 💜 ❤️ 💚