公共社区环境生活垃圾基本上是我们每个人每天几乎都无法避免的一个问题,公共环境下垃圾投放点都会有固定的值班时间,但是考虑到实际扔垃圾的无规律性,往往会出现在无人值守的时段内垃圾堆放垃圾桶溢出等问题,有些容易扩散的垃圾比如:碎纸屑、泡沫粒等等,一旦遇上大风天气往往就会被吹得遍地都是给垃圾清理工作带来负担。
本文的主要目的及时想要探索分析通过接入社区实时视频流数据来对公共环境下的垃圾投放点进行自动化的智能分析计算,当探测到异常问题比如:随意堆放垃圾、垃圾桶溢出等问题的时候结合一些人工业务预设的规则来自动通过短信等形式推送事件给相关的工作人员来进行及时的处置这一方案的可行性,博文主要是侧重对检测模型的开发实现,业务规则需要到具体的项目中去细化,这块就不作为文本的实践内容。
在前文中,我们已经陆续开发了相关的实践项目,感兴趣的话可以自行移步阅读即可:
《助力打造清洁环境,基于YOLOv3开发构建公共场景下垃圾堆放垃圾桶溢出检测识别系统》
《助力打造清洁环境,基于YOLOv4开发构建公共场景下垃圾堆放垃圾桶溢出检测识别系统》
《助力打造清洁环境,基于YOLOv5全系列模型【n/s/m/l/x】开发构建公共场景下垃圾堆放垃圾桶溢出检测识别系统》
《助力打造清洁环境,基于美团最新YOLOv6-4.0开发构建公共场景下垃圾堆放垃圾桶溢出检测识别系统》
《助力打造清洁环境,基于YOLOv7开发构建公共场景下垃圾堆放垃圾桶溢出检测识别系统》
《助力打造清洁环境,基于轻量级YOLOv8开发构建公共场景下垃圾堆放垃圾桶溢出检测识别系统》
本文主要是想要基于DETR模型来开发实践性质的项目,首先看下实例效果:
DETR (DEtection TRansformer) 是一种基于Transformer架构的端到端目标检测模型。与传统的基于区域提议的目标检测方法(如Faster R-CNN)不同,DETR采用了全新的思路,将目标检测问题转化为一个序列到序列的问题,通过Transformer模型实现目标检测和目标分类的联合训练。
DETR的工作流程如下:
输入图像通过卷积神经网络(CNN)提取特征图。
特征图作为编码器输入,经过一系列的编码器层得到图像特征的表示。
目标检测问题被建模为一个序列到序列的转换任务,其中编码器的输出作为解码器的输入。
解码器使用自注意力机制(self-attention)对编码器的输出进行处理,以获取目标的位置和类别信息。
最终,DETR通过一个线性层和softmax函数对解码器的输出进行分类,并通过一个线性层预测目标框的坐标。
DETR的优点包括:
端到端训练:DETR模型能够直接从原始图像到目标检测结果进行端到端训练,避免了传统目标检测方法中复杂的区域提议生成和特征对齐的过程,简化了模型的设计和训练流程。
不受固定数量的目标限制:DETR可以处理变长的输入序列,因此不受固定数量目标的限制。这使得DETR能够同时检测图像中的多个目标,并且不需要设置预先确定的目标数量。
全局上下文信息:DETR通过Transformer的自注意力机制,能够捕捉到图像中不同位置的目标之间的关系,提供了更大范围的上下文信息。这有助于提高目标检测的准确性和鲁棒性。
然而,DETR也存在一些缺点:
计算复杂度高:由于DETR采用了Transformer模型,它在处理大尺寸图像时需要大量的计算资源,导致其训练和推理速度相对较慢。
对小目标的检测性能较差:DETR模型在处理小目标时容易出现性能下降的情况。这是因为Transformer模型在处理小尺寸目标时可能会丢失细节信息,导致难以准确地定位和分类小目标。
接下来看下我们自己构建的数据集:
官方项目地址在这里,如下所示:
可以看到目前已经收获了超过1.2w的star量,还是很不错的了。
DETR整体数据流程示意图如下所示:
官方也提供了对应的预训练模型,可以自行使用:
name | backbone | schedule | inf_time | box AP | url | size | |
---|---|---|---|---|---|---|---|
0 | DETR | R50 | 500 | 0.036 | 42.0 | model | logs | 159Mb |
1 | DETR-DC5 | R50 | 500 | 0.083 | 43.3 | model | logs | 159Mb |
2 | DETR | R101 | 500 | 0.050 | 43.5 | model | logs | 232Mb |
3 | DETR-DC5 | R101 | 500 | 0.097 | 44.9 | model | logs | 232Mb |
COCO panoptic val5k models:
name | backbone | box AP | segm AP | PQ | url | size | |
---|---|---|---|---|---|---|---|
0 | DETR | R50 | 38.8 | 31.1 | 43.4 | download | 165Mb |
1 | DETR-DC5 | R50 | 40.2 | 31.9 | 44.6 | download | 165Mb |
2 | DETR | R101 | 40.1 | 33 | 45.1 | download | 237Mb |
如果对如何使用DETR模型来开发构建自己的个性化目标检测模型有疑问的话,可以参考我的超详细教程文章,如下:
《DETR (DEtection TRansformer)基于自建数据集开发构建目标检测模型超详细教程》
本文选择的预训练官方权重是detr-r50-e632da11.pth,首先需要基于官方的预训练权重开发能够用于自己的 个性化数据集的权重,如下所示:
pretrained_weights = torch.load("./weights/detr-r50-e632da11.pth")
num_class = 3 + 1
pretrained_weights["model"]["class_embed.weight"].resize_(num_class+1,256)
pretrained_weights["model"]["class_embed.bias"].resize_(num_class+1)
torch.save(pretrained_weights,'./weights/detr_r50_%d.pth'%num_class)
因为这里我的类别数量为3,所以num_class修改为:3+1,根据自己的实际情况修改即可。生成后如下所示:
终端执行:
python main.py --dataset_file "coco" --coco_path "/0000" --epoch 100 --lr=1e-4 --batch_size=32 --num_workers=0 --output_dir="outputs" --resume="weights/detr_r50_4.pth"
即可启动训练。训练启动如下:
DETR模型的训练依旧是很耗费算力资源。
等待漫长的训练过程完成后,我们来对结果进行可视化,核心代码如下:
【日志可视化】
def plot_logs(logs, fields=('class_error', 'loss_bbox_unscaled', 'mAP'), ewm_col=0, log_name='log.txt'):'''Function to plot specific fields from training log(s). Plots both training and test results.:: Inputs - logs = list containing Path objects, each pointing to individual dir with a log file- fields = which results to plot from each log file - plots both training and test for each field.- ewm_col = optional, which column to use as the exponential weighted smoothing of the plots- log_name = optional, name of log file if different than default 'log.txt'.:: Outputs - matplotlib plots of results in fields, color coded for each log file.- solid lines are training results, dashed lines are test results.'''func_name = "plot_utils.py::plot_logs"# verify logs is a list of Paths (list[Paths]) or single Pathlib object Path,# convert single Path to list to avoid 'not iterable' errorif not isinstance(logs, list):if isinstance(logs, PurePath):logs = [logs]print(f"{func_name} info: logs param expects a list argument, converted to list[Path].")else:raise ValueError(f"{func_name} - invalid argument for logs parameter.\n \Expect list[Path] or single Path obj, received {type(logs)}")# Quality checks - verify valid dir(s), that every item in list is Path object, and that log_name exists in each dirfor i, dir in enumerate(logs):if not isinstance(dir, PurePath):raise ValueError(f"{func_name} - non-Path object in logs argument of {type(dir)}: \n{dir}")if not dir.exists():raise ValueError(f"{func_name} - invalid directory in logs argument:\n{dir}")# verify log_name existsfn = Path(dir / log_name)if not fn.exists():print(f"-> missing {log_name}. Have you gotten to Epoch 1 in training?")print(f"--> full path of missing log file: {fn}")return# load log file(s) and plotdfs = [pd.read_json(Path(p) / log_name, lines=True) for p in logs]fig, axs = plt.subplots(ncols=len(fields), figsize=(16, 5))for df, color in zip(dfs, sns.color_palette(n_colors=len(logs))):for j, field in enumerate(fields):if field == 'mAP':coco_eval = pd.DataFrame(np.stack(df.test_coco_eval_bbox.dropna().values)[:, 1]).ewm(com=ewm_col).mean()axs[j].plot(coco_eval, c=color)else:df.interpolate().ewm(com=ewm_col).mean().plot(y=[f'train_{field}', f'test_{field}'],ax=axs[j],color=[color] * 2,style=['-', '--'])for ax, field in zip(axs, fields):ax.legend([Path(p).name for p in logs])ax.set_title(field)
结果如下所示:
【precision recall可视化】
def plot_precision_recall(files, naming_scheme='iter'):if naming_scheme == 'exp_id':# name becomes exp_idnames = [f.parts[-3] for f in files]elif naming_scheme == 'iter':names = [f.stem for f in files]else:raise ValueError(f'not supported {naming_scheme}')fig, axs = plt.subplots(ncols=2, figsize=(16, 5))for f, color, name in zip(files, sns.color_palette("Blues", n_colors=len(files)), names):data = torch.load(f)# precision is n_iou, n_points, n_cat, n_area, max_detprecision = data['precision']recall = data['params'].recThrsscores = data['scores']# take precision for all classes, all areas and 100 detectionsprecision = precision[0, :, :, 0, -1].mean(1)scores = scores[0, :, :, 0, -1].mean(1)prec = precision.mean()rec = data['recall'][0, :, 0, -1].mean()print(f'{naming_scheme} {name}: mAP@50={prec * 100: 05.1f}, ' +f'score={scores.mean():0.3f}, ' +f'f1={2 * prec * rec / (prec + rec + 1e-8):0.3f}')axs[0].plot(recall, precision, c=color)axs[1].plot(recall, scores, c=color)axs[0].set_title('Precision / Recall')axs[0].legend(names)axs[1].set_title('Scores / Recall')axs[1].legend(names)return fig, axs
结果如下所示:
这里我们对其计算了F1值,代码如下:
def F1(P,R):"""F1值"""return 2*P*R/(P+R)
结果如下:
感兴趣的话都可以自行尝试下。