前言
本文我将介绍我和我的团队自主研发设计的一款AI产品的成果展示——“基于视频AI识别技术的煤矿安全生产管理系统”。
这款产品是目前我在创业阶段和几位矿业大学的博士共同从架构设计、开发到交付的全过程中首次在博客频道发布, 我之前一直想写但没有机会来整理这套系统的架构, 因此我也特别感谢CSDN平台提供了这个机会,让我有了一定的动力来整理并分享这套系统的架构。
本文主要介绍的是系统的萌芽,系统的架构设计,业务模块的划分,容灾恢复等多个方面来介绍这款产品。
我相信这将为系统架构设计,业务模块划分,容灾业务处理能力感到困惑的技术开发人员也能提供有价值的思路。
人工智能在煤矿安全检测领域的发展趋势
近年来,在中美两国的AI发展中,仍然存在一定的差异。美国在AI基础研究和技术创新方面处于领先地位,而且他们拥有丰富的资金和人才支持。而中国则在应用层面迅速追赶,特别是在工业应用和智能化建设方面取得了显著进展, 中国的AI发展更注重产业协同,推动技术与实际应用的结合,以实现更高效的生产和管理。
随着AI技术在国内的迅猛发展,AI正在重塑各个行业,尤其是在煤矿安全检测领域。煤矿业作为国民经济发展的关键支柱型产业,其智能化建设对于促进矿山安全稳健发展、确保国家能源资源安全具有举足轻重的意义。近年来,我国矿山智能化建设步伐加快,成效显著,然而仍面临发展不均衡、不充分及不协调等挑战,亟需进一步优化与提升。为深入贯彻落实《中共中央办公厅 国务院办公厅关于进一步加强矿山安全生产工作的意见》 ,大家可以参考关于印发《关于深入推进矿山智能化建设 促进矿山安全发展的指导意见》的通知 查看具体政策意见。除此之外,国家能源局也出台了关于进一步加快煤矿智能化建设促进煤炭高质量发展的通知
明确要求加强新一代通信技术、人工智能(AI)、数据中心等信息基础设施建设,鼓励具备条件的地方建设煤炭工业互联网平台,逐步实现煤矿生产、经营、管理等数据的智能分析和统一管理。推广AI视频智能监控、井下高精度定位、露天矿边坡监测预警等系统,强化关键区域、重点岗位的实时监控。
可见未来AI领域将对煤矿业的生产带来生产效率,安全的提升,优化资源管理和故障管理维护的预测,促进智能化转型等多方面的影响,安全性将产生非常深远重大的意义。
项目的萌芽
自2011年毕业以来,我首先在IBM Platform Computing深耕虚拟云计算领域,积累了近两年的宝贵经验。随后,我转战淘米网与网易,投身于游戏开发行业,不断拓展自己的技术边界。之后,我决定踏上创业之路,专注于人工智能领域的研发与创新。一次偶然的机遇,我在CSDN上与几位来自矿业大学的博士都对人工智能发展趋势,模型训练聊的投机,志同道合,仿佛“桃园结义”,所以我们又共同致力于推动AI技术在煤矿行业的深度应用与发展,一步步稳健前行。
在这样的矿业背景和AI发展趋势下,我们从2014年3月份开始设计一款基于视频AI识别技术的煤矿安全生产管理系统设计,并迅速推广给西北各个煤矿企业,我主要负责全平台的系统架构设计。
系统架构设计
系统架构设计是软件开发和部署中的核心环节,尤其在煤矿行业这样复杂且高风险的环境中,架构设计需要综合考虑多种因素,以确保系统的性能、可靠性和安全性。以下将从通用架构设计因素和煤矿行业特定环境因素两个方面进行分析。
系统架构设计需要考虑的因素
在设计系统架构时(有对服务器架构设计开发感兴趣的可以订阅专栏 游戏服务器开发专栏 缓存设计专栏),除了要考虑通用的业务需求,性能,可扩展性,高可用,安全性以及成本控制等方面之外,作为煤矿行业,他的环境又有一定的特殊性,这就决定了系统架构设计需要额外考虑以下环境因素:
1. 复杂的井上井下环境
高湿度和粉尘浓度
煤矿井下环境通常湿度大且粉尘浓度高,这对硬件设备的耐用性和系统的稳定性提出了更高要求。需要选择防尘、防潮的设备,并设计冗余系统以应对设备故障。有限的网络条件
井下井上网络通常带宽有限且延迟较高,系统需要优化数据传输协议,减少带宽占用,并支持离线模式以应对网络中断。2. 安全性和实时性
安全监测需求
煤矿行业需要实时监测瓦斯浓度、温度、压力等环境参数,以防止安全事故。系统必须具备高实时性和可靠性,确保数据采集和处理的及时性。应急响应能力
系统需要支持快速响应机制,例如在检测到危险情况时,能够自动触发报警并执行预设的应急方案。3. 系统的可扩展性
模块化设计
煤矿系统通常需要集成多种功能模块(如设备监控、人员定位、生产调度等)。模块化设计可以提高系统的灵活性,便于后续功能扩展。分布式架构
由于煤矿作业区域分散,系统需要采用分布式架构,以支持多区域协同工作,同时避免单点故障。4. 法规与合规性
煤矿行业受严格的安全法规约束,系统设计必须符合相关标准(如数据存储合规性、隐私保护等)
需求简要说明
这里我简要说明下系统设计的需求部分:操作人员可以通过任何一台电脑用浏览器登陆系统页面,部署视频监控,硬件嵌入式盒子,算法和模型的信息,部署告警的区域,配置的算法一旦触发告警策略,会立即通知给管理平台,并通过音柱对告警所在区域进行语音告警,可分析出某一个时间段内所有告警类型的趋势并针对告警趋势进行预判预测。
以下是目前常见的煤矿安全生产过程中需要检测的算法:
告警分类 算法
人员违规及异常行为 人员违规穿越皮带,人员脱岗行为,人员在岗睡觉,规范佩戴安全帽,
规范穿戴反光衣,打电话,吸烟,人员聚集,固定场所巡检行为
主运输系统隐患管理 皮带大块煤检测,皮带异物检测,皮带跑偏检测,皮带空载
皮带煤量识别,烟火检测,高温检测,火焰检测,吸烟检测
破碎站人员报警,人员接近仓口预警
车辆运输异常行为识别 人员下车检测,超时停车检测
软件架构设计
一、应用与服务层
该层位于架构的最顶端,直接与用户和业务需求对接。它包括以下关键组件:
- 视频结构化:在视频结构化过程中,通过RTSP等协议传输的视频流可以被接入到处理系统中。处理系统会对这些视频流进行解码、去噪、稳定处理等预处理操作,然后,利用算法对预处理后的视频流进行特征提取和对象识别等处理,从而提取出视频中的关键信息,以便于后续的应用和分析。
- 业务模块:如“人员违规行为”、“车辆结构化”、“火点烟雾检测”等,这些模块实现了具体的业务逻辑和功能需求。
- 算法模型与特征提取:负责处理和分析数据,提取有用信息,并应用算法模型进行预测和决策。
- 态势预测:通过态势预测,基于当前和历史数据,对未来一段时间内的发展趋势进行预估,从而为决策制定提供科学依据,及时发现并预警潜在的风险和问题,使相关人员能够提前采取措施进行防范,从而降低损失。
二、模型与数据处理层
该层位于应用与服务层之下,专注于算法模型的部署、数据处理和特征提取。它包括:
- BMC引擎与BMCV Engine:专为特定业务场景设计的算法引擎,提供高效的模型推理和数据处理能力。
- OpenCV与FFmpeg:开源的计算机视觉和多媒体处理库,用于图像和视频的处理与分析。
三、基础架构与资源管理层
该层位于模型与数据处理层之下,负责提供基础架构支持和资源管理服务。它包括:
- Docker容器:用于封装和部署算法模型,提高模型的可移植性和可扩展性。通过Docker,可以轻松地在不同环境中部署和运行模型。
- K8S:作为容器编排系统,Kubernetes负责管理和调度容器化应用,提供自动化部署、扩展、管理和运行容器化应用程序的能力。
- 操作系统内核(Kernel):操作系统的核心部分,负责管理硬件资源、提供进程调度、内存管理等功能。
- 设备驱动程序(Device Drivers):与硬件设备通信的接口,使操作系统能够控制和管理这些设备。
- NGINX:高性能的HTTP和反向代理服务器,用于处理网络通信和负载均衡。
- BootLoader:引导加载程序,负责系统启动时加载操作系统内核。
- 数据库(MySQL):作为数据存储的核心,MySQL负责存储和管理系统所需的所有结构化数据,如区域信息,设备信息,算法信息,模型信息,告警信息等。
- Redis:高性能的键值存储数据库,用于缓存数据、热数据存储,灾难恢复和崩溃拉起业务。
各个模块业务松耦合
松耦合是一种软件设计原则,旨在降低系统组件之间的依赖关系,使系统更加可扩展和易维护(感兴趣的可以阅读 :你的代码是否按照高内聚、低耦合的原则来设计的?-CSDN博客)。其实从我们的业务需求上不难划分出各个模块来,我使用了服务化设计的思路:将应用拆分为多个独立的服务,每个服务负责特定的功能。这些服务通过轻量级的通信协议(如REST、gRPC)进行交互,而不依赖于彼此的具体实现。
因此我拆分为三个主要的服务: AI管理平台(Platform),算法后处理服务(AIEC)和算法处理服务(AIInterface),其中AIEC和AIInterface都是运行在嵌入式盒子或者服务器上的,这就要求AIEC和AIInterface适配不同的厂家硬件。
Platform:
平台的功能相对简单,主要是提供人机交互界面,操作人员登陆后对相机,算法,模型等资源进行配置和升级管理:
接收和查看算法告警信息
根据告警历史记录进行告警的态势预测:
AIEC:
AIEC实际上充当的是算法调度器的角色,由于它运行在arm嵌入式盒子或者服务器上,那么他就需要适配多种硬件厂商来满足不同场景下的业务需要。
考虑到适配不同硬件,并且要对算法调度,因此我们抽象化了硬件接口:
type IBaseDevice interface {GetDeviceType() string //获取设备类型Init() error //启动的时候初始化一些配置文件StartAI() error //开启算法接口(必须初始化一些配置检查之类的)StopAI() error //关闭算法接口StatDeep() error //检查ai算法服务状态GetDeepState() (bool, string, error)ReloadAIProcess() error //重启ai算法服务Terminal() error //关闭ai算法服务Stop() error //暂停ai算法服务
}
对于不同的硬件比如华为,算能,英伟达只需要实现以上接口即可。
同样的,由于AIEC需要对算法服务的检测结果要进一步的分析推理,因此我们要对不同的算法进行抽象,实际上就只有一个接口方法DetectAnlysis(result *AIResultReq)而已,通过调用接口方法可以获取到AIInterface的检测结果,并根据不同的算法类型进行各自的推理分析。
比如离岗检测,需要统计某一个时间段内,目标区域是否有人员识别结果,一旦识别到人,则重置识别状态和结果,否则达到时间段的阈值则触发告警。
如何调度的简要说明:
我们通过RESTful接口接收来自Platform的相机和算法更新信息,处理这些请求,并根据相机和算法调度相应的检测算法服务。之后,我们异步获取算法返回的检测结果,进行进一步的分析推理,最终将需要告警的信息推送回Platform。
AIInterface:
同样的,AIInterface也需要根据不同硬件设备(常见的SE算能,英伟达Nvidia,华为Huawei盒子、服务器)来适配做算法处理,根据视频流信息和算法信息进行模型 推理,通过rabbitmq消息队列将算法检测结果通知给AIEC。
由于不同的硬件资源他们的算力不同,因此对于不同的硬件,我们也采用多进程多模型计算,单进程多模型等多种灵活的配置方案来解决算力和计算性能上的问题。
模型训练
我们都知道,模型训练在机器学习和人工智能领域中扮演着至关重要的角色。它不仅是构建有效模型的基础,也是实现高准确性和可靠性的关键过程。
我们大多数模型训练都是通过自己采集数据->人工标注->模型训练这样的方式来进行的,以人以及穿戴装备为例,大多数据集我们都使用了公开的数据集。
我就以安全帽识别为例,我们使用了 Kaggle的数据集:Safety Helmet Detection | KaggleSafety Helmet Detection | Kaggle
该数据集包含 5000 个带有边界框注释的图像,格式为 PASCAL VOC 格式且主要针对以下 3 个类别:helmet, head person,存在数据不平衡:
从上面绘制的直方图中可以清楚看到,数据集中几乎 75% 的出现都属于头盔类,这不是什么好事。
虽然这个数据集从来都不是我们的首选,但这是我们在互联网上能找到的唯一像样的数据集。我们将整个数据集按照 8:1:1 的比例划分为训练集、验证集和测试集。上述集合分别包含 4000、500 和 500 张图像。
我们使用了预先训练的Faster R-CNN 模型,并以 Resnet50 为主 ,并根据该数据集的训练分割进行了微调。让我们看看训练模型的代码片段:
用于训练迭代的函数定义:
from tqdm import tqdmdef train(train_data_loader, model):print('Training...')global train_itrglobal train_loss_list# initialize tqdm progress barprog_bar = tqdm(train_data_loader, total=len(train_data_loader))# criterion = torch.nn.BCELoss(size_average=True)for i, data in enumerate(prog_bar):optimizer.zero_grad() #####images, targets = data #####images = list(image.to(DEVICE) for image in images) #####targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets] #####loss_dict = model(images, targets) #####losses = sum(loss for loss in loss_dict.values()) #####loss_value = losses.item()train_loss_list.append(loss_value)train_loss_hist.send(loss_value)losses.backward() #####optimizer.step() #####train_itr += 1 ###### update the loss value beside the progress bar for each iterationprog_bar.set_description(desc=f"Loss: {loss_value:.4f}")return train_loss_list
验证迭代的函数定义:
# function for running validation iterations
def validate(valid_data_loader, model):print('Validating...')global val_itrglobal val_loss_list# initialize tqdm progress barprog_bar = tqdm(valid_data_loader, total=len(valid_data_loader))for i, data in enumerate(prog_bar):images, targets = dataimages = list(image.to(DEVICE) for image in images)targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets]with torch.no_grad():loss_dict = model(images, targets)losses = sum(loss for loss in loss_dict.values())loss_value = losses.item()val_loss_list.append(loss_value)val_loss_hist.send(loss_value)val_itr += 1# update the loss value beside the progress bar for each iterationprog_bar.set_description(desc=f"Loss: {loss_value:.4f}")return val_loss_list
平均类用于记录训练和验证损失
class Averager:def __init__(self):self.current_total = 0.0self.iterations = 0.0def send(self, value):self.current_total += valueself.iterations += 1@propertydef value(self):if self.iterations == 0:return 0else:return 1.0 * self.current_total / self.iterationsdef reset(self):self.current_total = 0.0self.iterations = 0.0class SaveBestModel:"""Class to save the best model while training. If the current epoch'svalidation loss is less than the previous least less, then save themodel state."""def __init__(self, best_valid_loss=float('inf')):self.best_valid_loss = best_valid_lossdef __call__(self, current_valid_loss, epoch, model, optimizer):if current_valid_loss < self.best_valid_loss:self.best_valid_loss = current_valid_lossprint(f"\nBest validation loss: {self.best_valid_loss:.3f}")print(f"\nSaving best model for epoch: {epoch+1}\n")torch.save({'epoch': epoch+1,'model_state_dict': model.state_dict(),'optimizer_state_dict': optimizer.state_dict(),'scheduler_state_dict': scheduler.state_dict(),}, '/content/drive/My Drive/helmet_dataset/savedmodel/best_model.pth')# function to save the model after each epoch and after training ends
def save_model(epoch, model, optimizer):"""Function to save the trained model till current epoch, or whenver called"""torch.save({'epoch': epoch+1,'model_state_dict': model.state_dict(),'optimizer_state_dict': optimizer.state_dict(),'scheduler_state_dict': scheduler.state_dict(),}, '/content/drive/My Drive/helmet_dataset/savedmodel/last_model.pth')
训练循环
train_loss_hist = Averager()
val_loss_hist = Averager()
train_itr = 1
val_itr = 1# train and validation loss lists to store loss values of all
# iterations till end and plot graphs for all iterations
train_loss_list = []
val_loss_list = []# initialize SaveBestModel class
save_best_model = SaveBestModel()NUM_EPOCHS = 16 # number of epochs to train for
epoch=0# start the training epochs
for epoch in range(epoch, NUM_EPOCHS):print(f"\nEPOCH {epoch+1} of {NUM_EPOCHS}")# reset the training and validation loss histories for the current epochtrain_loss_hist.reset()val_loss_hist.reset()train_loss = train(train_loader, model)val_loss = validate(valid_loader, model)print(scheduler.get_last_lr()) # this prints the lr used in training that specific epochscheduler.step()print(f"Epoch #{epoch+1} train loss: {train_loss_hist.value:.3f}")print(f"Epoch #{epoch+1} validation loss: {val_loss_hist.value:.3f}")# save the best model till now if we have the least loss in the current epochsave_best_model(val_loss_hist.value, epoch, model, optimizer)# save the current epoch modelsave_model(epoch, model, optimizer)
这些代码片段将指导我们完成用于该模型的整个训练过程。我们首先定义了用于迭代训练和验证数据集的函数。然后定义了用于记录损失和保存最佳和最后模型的类。然后我们有了训练循环的最终代码片段。
最终经过训练的模型图如下:
我们测试下训练结果:
设置置信度为0.65,选择一张测试图片,我们来看下检测结果
检测结果信息:
{
"predictions": [
{
"x": 244.5,
"y": 66.5,
"width": 111,
"height": 79,
"confidence": 0.891,
"class": "helmet",
"class_id": 1,
"detection_id": "d6ca5493-b249-44b2-b857-56d98057051d"
},
{
"x": 189,
"y": 366,
"width": 40,
"height": 40,
"confidence": 0.83,
"class": "boots",
"class_id": 0,
"detection_id": "24dc2b28-235d-42ea-9c17-b4583ee75295"
},
{
"x": 198,
"y": 237,
"width": 312,
"height": 388,
"confidence": 0.731,
"class": "person",
"class_id": 5,
"detection_id": "7c5d5d4f-931e-4486-ac98-b56a8984ff80"
}
]
}
硬件适配
我将从硬件性能、开发 SDK 和适用场景三个方面,对 SE 算能盒子、英伟达 Nvidia 盒子(如 Jetson 系列)以及华为盒子(如昇腾系列)进行详细对比。
硬件对比
特性 | SE 算能盒子 | 英伟达 Nvidia 盒子 | 华为盒子 |
---|---|---|---|
处理器架构 | 专用 AI 加速芯片(如 BM1684X TPU) | Tegra 系列 SoC(如 Jetson Xavier NX) | 昇腾 AI 芯片(如昇腾310/910) |
算力 | INT8 算力高达 32 TOPS(BM1684X) | FP16 算力高达 32 TOPS(Jetson AGX Xavier) | FP16 算力高达 256 TOPS(昇腾910) |
功耗 | 低功耗设计,典型功耗 15W-30W | 功耗范围 10W-30W(根据型号不同) | 功耗范围 30W-310W(根据型号不同) |
存储与内存 | DDR4/DDR5 内存,eMMC 存储 | LPDDR4 内存,eMMC 或 NVMe 存储 | HBM 高带宽内存(如 HBM2e) |
接口支持 | 丰富的 I/O 接口(HDMI、USB、以太网等) | 支持 PCIe、USB、HDMI、GPIO 等 | 支持 PCIe、以太网、NVLink 等高性能接口 |
适用场景 | 视频分析、工业检测、边缘 AI 推理 | 自动驾驶、机器人、边缘 AI 推理 | 云端 AI 训练、大规模推理、边缘计算 |
硬件总结
- SE 算能盒子:专注于低功耗、高效的边缘 AI 推理,适合工业和视频分析场景。
- 英伟达 Nvidia 盒子:通用性强,支持从边缘推理到图形处理的多种任务。
- 华为盒子:以高性能为主,适合云端 AI 训练和大规模推理任务。
SDK 对比
特性 | SE 算能盒子 SDK | 英伟达 Nvidia SDK | 华为盒子 SDK |
---|---|---|---|
开发工具 | Sophon SDK,支持模型优化、推理加速 | JetPack SDK,包含 CUDA、cuDNN、TensorRT 等工具 | MindSpore 和 CANN,支持训练与推理 |
支持框架 | TensorFlow、PyTorch、Caffe 等 | TensorFlow、PyTorch、ONNX 等 | TensorFlow、PyTorch、MindSpore 等 |
模型优化 | BMCompiler 和 BMOptimizer | TensorRT 进行模型优化和推理加速 | CANN 提供模型编译与优化 |
编程语言支持 | C++、Python | C++、Python | C++、Python |
文档与社区支持 | 文档较完善,社区支持相对较少 | 文档丰富,社区活跃,支持广泛 | 文档完善,社区支持逐步增强 |
SDK 总结
- SE 算能盒子:Sophon SDK 针对其专用硬件进行了深度优化,适合特定 AI 推理任务。
- 英伟达 Nvidia 盒子:JetPack SDK 提供全面的开发工具,生态系统成熟,适合多样化开发需求。
- 华为盒子:MindSpore 和 CANN 提供从训练到推理的完整支持,适合高性能 AI 应用。
华为适配
从刚才的硬件,SDK对比结果不难看出,不同硬件设备的适配方案是各有不同的,这里我就以以华为服务器为例, 华为的 MindX 组件 是昇腾 AI 生态系统中的重要部分,旨在简化 AI 应用的开发和部署。通过与 pipeline(数据管道)相结合,MindX 提供了一种高效的方式来处理数据流和任务流,从而实现复杂的 AI 应用场景。 开发文档大家可以参考: 简介-mxVision 用户指南-智能视频分析-MindSDK5.0.0开发文档-昇腾社区
MindX 提供了多个功能模块,这些模块可以通过 pipeline 进行灵活组合。例如:
- 数据解码模块:负责将输入数据(如图片或视频流)解码为可处理的格式。
- 推理模块:调用 AI 模型进行推理。
- 后处理模块:对推理结果进行处理,如生成可视化输出或统计分析。
-
配置驱动:pipeline 的配置文件定义了各模块的执行顺序、参数设置以及模块间的数据流动。
1.设置pipeline
例如设置我们的pipeline信息如下:
{"detection": {"appsink0": {"factory": "appsink"},"mxpi_imageresize0": {"factory": "mxpi_imageresize","next": "mxpi_modelinfer0","props": {"interpolation": "1","resizeType": "Resizer_KeepAspectRatio_Fit","paddingType": "Padding_NO"}},"mxpi_modelinfer0": {"factory": "mxpi_modelinfer","next": "appsink0","props": {"dataSource": "mxpi_imageresize0","deviceId": "0","labelPath": "/data0/algorithm/hvhpp/model/hvhpp.names","modelPath": "/data0/algorithm/hvhpp/model/hvhpp.om","postProcessConfigPath": "/data0/algorithm/hvhpp/model/yolov5.cfg","postProcessLibPath": "/data0/algorithm/mxVision-5.0.1/lib/libMpYOLOv5PostProcessor.so"}},"mxpi_parallel2serial0": {"factory": "mxpi_parallel2serial","next": "mxpi_imageresize0","props": {"dataSource": "mxpi_videodecoder1, mxpi_videodecoder22, mxpi_videodecoder14, mxpi_videodecoder3, mxpi_videodecoder10, mxpi_videodecoder15, mxpi_videodecoder4, mxpi_videodecoder24, mxpi_videodecoder5, mxpi_videodecoder21, mxpi_videodecoder0, mxpi_videodecoder9, mxpi_videodecoder2"}},"mxpi_rtspsrc0": {"factory": "mxpi_rtspsrc","next": "mxpi_videodecoder0","props": {"channelId": "0","rtspUrl": "rtsp://admin:a!123456789@192.168.2.81:554/media1/video2/video"}},"mxpi_rtspsrc1": {"factory": "mxpi_rtspsrc","next": "mxpi_videodecoder1","props": {"channelId": "1","rtspUrl": "rtsp://admin:a!123456789@192.168.2.83:554/media1/video2/video"}},"mxpi_rtspsrc10": {"factory": "mxpi_rtspsrc","next": "mxpi_videodecoder10","props": {"channelId": "10","rtspUrl": "rtsp://admin:a!123456789@192.168.2.93:554/media1/video2/video"}},"mxpi_rtspsrc14": {"factory": "mxpi_rtspsrc","next": "mxpi_videodecoder14","props": {"channelId": "14","rtspUrl": "rtsp://admin:a!123456789@192.168.2.90:554/media1/video2/video"}},"mxpi_rtspsrc15": {"factory": "mxpi_rtspsrc","next": "mxpi_videodecoder15","props": {"channelId": "15","rtspUrl": "rtsp://admin:a!123456789@192.168.2.187:554/media1/video2/video"}},"mxpi_rtspsrc2": {"factory": "mxpi_rtspsrc","next": "mxpi_videodecoder2","props": {"channelId": "2","rtspUrl": "rtsp://admin:a!123456789@192.168.2.82:554/media1/video2/video"}},"mxpi_rtspsrc21": {"factory": "mxpi_rtspsrc","next": "mxpi_videodecoder21","props": {"channelId": "21","rtspUrl": "rtsp://admin:a!123456789@192.168.2.225:554/media1/video2/video"}},"mxpi_rtspsrc22": {"factory": "mxpi_rtspsrc","next": "mxpi_videodecoder22","props": {"channelId": "22","rtspUrl": "rtsp://admin:a!123456789@192.168.2.95:554/media1/video2/video"}},"mxpi_rtspsrc24": {"factory": "mxpi_rtspsrc","next": "mxpi_videodecoder24","props": {"channelId": "24","rtspUrl": "rtsp://admin:a!123456789@192.168.1.105:554/media1/video2/video"}},"mxpi_rtspsrc3": {"factory": "mxpi_rtspsrc","next": "mxpi_videodecoder3","props": {"channelId": "3","rtspUrl": "rtsp://admin:a!123456789@192.168.1.102:554/media1/video2/video"}},"mxpi_rtspsrc4": {"factory": "mxpi_rtspsrc","next": "mxpi_videodecoder4","props": {"channelId": "4","rtspUrl": "rtsp://admin:a!123456789@192.168.1.101:554/media1/video2/video"}},"mxpi_rtspsrc5": {"factory": "mxpi_rtspsrc","next": "mxpi_videodecoder5","props": {"channelId": "5","rtspUrl": "rtsp://admin:a!123456789@192.168.1.100:554/media1/video2/video"}},"mxpi_rtspsrc9": {"factory": "mxpi_rtspsrc","next": "mxpi_videodecoder9","props": {"channelId": "9","rtspUrl": "rtsp://admin:a!123456789@192.168.1.100:554/media1/video2/video"}},"mxpi_videodecoder0": {"factory": "mxpi_videodecoder","next": "mxpi_parallel2serial0:10","props": {"deviceId": "0","skipFrame": "30","inputVideoFormat": "H264","outputImageFormat": "YUV420SP_NV12","vdecChannelId": "0"}},"mxpi_videodecoder1": {"factory": "mxpi_videodecoder","next": "mxpi_parallel2serial0:0","props": {"deviceId": "0","skipFrame": "30","inputVideoFormat": "H264","outputImageFormat": "YUV420SP_NV12","vdecChannelId": "1"}},"mxpi_videodecoder10": {"factory": "mxpi_videodecoder","next": "mxpi_parallel2serial0:4","props": {"deviceId": "0","skipFrame": "30","inputVideoFormat": "H264","outputImageFormat": "YUV420SP_NV12","vdecChannelId": "10"}},"mxpi_videodecoder14": {"factory": "mxpi_videodecoder","next": "mxpi_parallel2serial0:2","props": {"deviceId": "0","skipFrame": "30","inputVideoFormat": "H264","outputImageFormat": "YUV420SP_NV12","vdecChannelId": "14"}},"mxpi_videodecoder15": {"factory": "mxpi_videodecoder","next": "mxpi_parallel2serial0:5","props": {"deviceId": "0","skipFrame": "30","inputVideoFormat": "H264","outputImageFormat": "YUV420SP_NV12","vdecChannelId": "15"}},"mxpi_videodecoder2": {"factory": "mxpi_videodecoder","next": "mxpi_parallel2serial0:12","props": {"deviceId": "0","skipFrame": "30","inputVideoFormat": "H264","outputImageFormat": "YUV420SP_NV12","vdecChannelId": "2"}},"mxpi_videodecoder21": {"factory": "mxpi_videodecoder","next": "mxpi_parallel2serial0:9","props": {"deviceId": "0","skipFrame": "30","inputVideoFormat": "H264","outputImageFormat": "YUV420SP_NV12","vdecChannelId": "21"}},"mxpi_videodecoder22": {"factory": "mxpi_videodecoder","next": "mxpi_parallel2serial0:1","props": {"deviceId": "0","skipFrame": "30","inputVideoFormat": "H264","outputImageFormat": "YUV420SP_NV12","vdecChannelId": "22"}},"mxpi_videodecoder24": {"factory": "mxpi_videodecoder","next": "mxpi_parallel2serial0:7","props": {"deviceId": "0","skipFrame": "30","inputVideoFormat": "H264","outputImageFormat": "YUV420SP_NV12","vdecChannelId": "24"}},"mxpi_videodecoder3": {"factory": "mxpi_videodecoder","next": "mxpi_parallel2serial0:3","props": {"deviceId": "0","skipFrame": "30","inputVideoFormat": "H264","outputImageFormat": "YUV420SP_NV12","vdecChannelId": "3"}},"mxpi_videodecoder4": {"factory": "mxpi_videodecoder","next": "mxpi_parallel2serial0:6","props": {"deviceId": "0","skipFrame": "30","inputVideoFormat": "H264","outputImageFormat": "YUV420SP_NV12","vdecChannelId": "4"}},"mxpi_videodecoder5": {"factory": "mxpi_videodecoder","next": "mxpi_parallel2serial0:8","props": {"deviceId": "0","skipFrame": "30","inputVideoFormat": "H264","outputImageFormat": "YUV420SP_NV12","vdecChannelId": "5"}},"mxpi_videodecoder9": {"factory": "mxpi_videodecoder","next": "mxpi_parallel2serial0:11","props": {"deviceId": "0","skipFrame": "30","inputVideoFormat": "H264","outputImageFormat": "YUV420SP_NV12","vdecChannelId": "9"}},"stream_config": {"deviceId": "0"}}
}
2.配置信息:
模型训练om模型是华为昇腾AI处理器支持的离线推理模型格式,因此我们使用om模型和对应的模型文件配置。
3.加载模型启动算法检测
启动算法脚本python编写:
import os
import cv2
import redis
import json
import time
import base64
import struct
import numpy as npfrom StreamManagerApi import StreamManagerApi, MxDataInput, StringVector
import MxpiDataType_pb2 as MxpiDataType
# redis
r = redis.StrictRedis(host='localhost', port=6379)
# The following belongs to the SDK Process
streamManagerApi = StreamManagerApi()
# init stream manager
ret = streamManagerApi.InitManager()if ret != 0:print("Failed to init Stream manager, ret=%s" % str(ret))exit()# create streams by pipeline config file
# load pipline
cur_dir = os.path.dirname(os.path.abspath(__file__))
print(cur_dir)
cur_dir = cur_dir + "/pipeline/video.pipeline"
print(cur_dir)
with open(cur_dir, 'rb') as f:pipelineStr = f.read()
ret = streamManagerApi.CreateMultipleStreams(pipelineStr)
# Print error message
if ret != 0:print("Failed to create Stream, ret=%s" % str(ret))# Stream name
streamName = b'detection'
# Obtain the inference result by specifying streamName and keyVec
# The data that needs to be obtained is searched by the plug-in name
keys = [b"ReservedFrameInfo", b"mxpi_modelinfer0", b"mxpi_parallel2serial0"]
keyVec = StringVector()
for key in keys:keyVec.push_back(key)while True:# Get data through GetResultinfer_result = streamManagerApi.GetResult(streamName, b'appsink0', keyVec)# Determine whether the output is emptyif infer_result.metadataVec.size() == 0:print("infer_result is null")# continue# Frame information structureframeList = MxpiDataType.MxpiFrameInfo()frameList.ParseFromString(infer_result.metadataVec[0].serializedMetadata)# Objectpostprocessor informationobjectList = MxpiDataType.MxpiObjectList()objectList.ParseFromString(infer_result.metadataVec[1].serializedMetadata)# Videodecoder informationvisionList = MxpiDataType.MxpiVisionList()visionList.ParseFromString(infer_result.metadataVec[2].serializedMetadata)vision_data = visionList.visionVec[0].visionData.dataStrvisionInfo = visionList.visionVec[0].visionInfo# # cv2 func YUV to BGR# YUV_BYTES_NU = 3# YUV_BYTES_DE = 2# img_yuv = np.frombuffer(vision_data, np.uint8)# # reshape# img_bgr = img_yuv.reshape(visionInfo.heightAligned * YUV_BYTES_NU // YUV_BYTES_DE, visionInfo.widthAligned)# # Color gamut conversion# img = cv2.cvtColor(img_bgr, getattr(cv2, "COLOR_YUV2BGR_NV12"))Id = frameList.frameIdbboxes = []# fire or not# if len(objectList.objectVec) == 0:# continuejson_str = '{' + '"StreamID":"{}","list":['.format(frameList.channelId)# json_str = '{' + '"StreamID":"1679044571","list":['print(len(objectList.objectVec))list = ""for i in range(len(objectList.objectVec)):# get ObjectListresults = objectList.objectVec[i]bboxes = {'x0': int(results.x0),'x1': int(results.x1),'y0': int(results.y0),'y1': int(results.y1),'confidence': round(results.classVec[0].confidence, 4),'text': results.classVec[0].className}text = "{}{}".format(str(bboxes['confidence']), " ")list = list + '{' + '"x1":{},"y1":{},"x2":{},"y2":{},"data":"NULL","class":"{}","confidence":{},"track_id":-1'.format(bboxes['x0'], bboxes['y0'], bboxes['x1'], bboxes['y1'], bboxes['text'], bboxes['confidence']) + '}'if(i < len(objectList.objectVec) - 1):list = list + ','# Draw rectangle# for item in bboxes['text']:# text += item# cv2.putText(img, text, (bboxes['x0'] + 10, bboxes['y0'] + 10), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)# cv2.rectangle(img, (bboxes['x0'], bboxes['y0']), (bboxes['x1'], bboxes['y1']), (0, 0, 255), 4)# send_json_time = int(time.time())send_json_time = int(time.time() * 1000)if len(objectList.objectVec) == 0:json_str = json_str + ']}'else:json_str = json_str + list + ']}'list_buf = '{' + '"Long_j":{},"width":{},"height":{},"timestamp":{},"json":{},"long_p":{}'.format(len(json_str), visionInfo.width, visionInfo.height, send_json_time, json_str, len(vision_data)) + '}'# 计算list_buf不包含图片的大小,(大小头)小头保存长度len_little = len(list_buf).to_bytes(4, byteorder='little', signed=True)#转成字节list_buf = bytes(list_buf,'UTF-8')# 拼接图片内容print(list_buf)print("\n")list_buf = len_little + list_buf + vision_datar.publish("mychannel",list_buf)# # save picture# Id = frameList.frameId# result_path = "./result/"# if os.path.exists(result_path) != 1:# os.makedirs("./result/")# oringe_imgfile = './result/image' + '-' + str(Id) + '.jpg'# print("Warning! Fire or smoke detected")# print("Result save in ",oringe_imgfile)# cv2.imwrite(oringe_imgfile, img)# Destroy All Streams
streamManagerApi.DestroyAllStreams()
执行算法检测:
服务自启和崩溃拉起
崩溃拉起和服务自启是现代软件系统中不可或缺的机制,通过自动监控和恢复,可以显著提高系统的可用性和稳定性。服务在运行过程中可能会存在各种各样的场景,比如异常断电,断网,硬件故障等,崩溃拉起和服务自启可以确保应用程序高可用性和稳定性。
关于高可用,容灾恢复问题,我曾经在CSDN里的这篇文章里做过方案,大家可以通过阅读这篇文章来寻找答案:
变形记---容灾恢复(一),异常崩溃引发服务器丢档或无法正常运行-CSDN博客
这里我只说明AIEC的服务自启和崩溃拉起的机制,因为Platform和算法AIInterface的相对较为简单,完全是靠操作系统的服务管理配置项或者由AIEC来控制的。
AIEC自启动机制
aiec承接着非常重要的算法资源调度工作,当aiec服务发生崩溃后,服务如果不再正常运行,则会给煤矿生产带来非常重大的安全隐患和损失,我们使用两种方式来确保aiec边缘计算的可用性:
- 通过Platform平台可定时检测AIEC的心跳来监控其运行状态,如果进程一旦崩溃则触发重启操作。
- aiec自身有服务自启机制,通过借助linux的systemd或者init.d的服务器管理配置项来设置启动策略。
关于aiec的自启动机制,我提供了以下自启动脚本:
#!/bin/bash## 创建systemctl服务配置文件
## $1 服务名称,
## $2 pwd
## $3程序位置
## $4 程序文件
## $5名字
## $6 after
## $7用户
function createService() {{echo "[Unit]"echo "Description=$5"#echo "After=$6"echo "Requires=aiec.service mysql.service"echo "Wants=aiec.service mysql.service"echo ""echo "[Service]"echo "User=$7"echo "Restart=always"echo "Group=$7"echo "Type=forking"echo "ExecStart=/bin/bash $2/$3/$4 start"echo "ExecStop=/bin/bash $2/$3/$4 stop"echo "ExecReload=/bin/bash $2/$3/$4 restart"echo ""echo "[Install]"echo "WantedBy=multi-user.target"} >"$1"
}## 创建systemctl服务
function installService() {sudo systemctl daemon-reloadsystemctl enable "$1"systemctl start "$1"
}## 创建服务启动、关闭、重启命令脚本
function createCommandScript() {# 脚本文件名scriptName="$1"# 服务目录serverPath="$(pwd)/$2"# 服务名称serverName="$3"# 服务启动命令serverStartCommand="$4"# systemctl服务名称systemctlName="$5"{echo "#!/bin/bash"echo ""echo "case \$1 in"echo "start)"echo " cd $serverPath && $serverStartCommand"echo " ;;"echo "stop)"echo " if [[ -n \$(pgrep \"$serverName\") ]]; then"echo " pgrep $serverName | sudo xargs kill -9"echo " fi"echo " ;;"echo "restart)"echo " if [[ -n \$(pgrep \"$serverName\") ]]; then"echo " pgrep $serverName | sudo xargs kill -9"echo " fi"echo " cd $serverPath && $serverStartCommand"echo " ;;"echo "reload)"echo " systemctl restart $systemctlName"echo " ;;"echo "esac"} >"$scriptName"chmod 777 "$scriptName"
}## 创建对应服务启动、关闭、重启命令脚本# aiec启动脚本
createCommandScript "./aiec.sh" "" "aiec" "nohup ./aiec >/dev/null &" "aiec.service"## 创建aiec的服务
createService "/etc/systemd/system/aiec.service" "$(pwd)" "" "aiec.sh" "aiec.service" "redis.service" "root"## 设置对应服务自启动
installService "aiec.service"
执行shell脚本后会自动创建一个aiec服务,并启动aiec服务。
AIEC崩溃拉起机制
当一个服务发生崩溃而自启动,如果没有一个安全的崩溃拉起机制,可能会导致多种严重问题:
-
未保存的数据丢失:服务崩溃时,任何未保存的重要的检测结果或临时数据都可能会丢失,导致AIEC自动后依赖的数据找不到,算法分析出现异常。
-
影响上下文分析:如果服务器启动后部分算法的分析需要借助上下文的临时数据来分析这些告警数据,如果崩溃之前的临时数据丢失就很有可能造成数据状态不一致,影响检测告警结果。
这里我给大家提供几个我专栏里的关于崩溃和缓存的文章:
变形记---容灾恢复(一),异常崩溃引发服务器丢档或无法正常运行-CSDN博客
使用缓存策略优化系统性能:综合指南
掌握服务器缓存策略
我们会把每次关键的检测结果缓存到Redis中,当崩溃拉起的时候,就能获取到崩溃前的记录,这样就能根据上下文继续进行算法推理分析了。
后记
我们团队每个人的每一小步都能为人工智能行业和煤矿领域带来积极的进步。即使这些步子微小,缓慢,但我相信,总会有人在默默推动着这一进程向前发展。
每一个小的努力和尝试,都是为未来的更大突破奠定基础。正是这些看似微不足道的进展,汇聚起来才能形成推动整个行业前行的强大动力。无论前路多么艰难,我都愿意坚定地迈出这一步,为实现更高效、更安全的煤矿作业和更智能的AI应用贡献自己的力量。
最后,我相信人工智能不仅能够推动煤矿安全检测的发展,也能促进越来越多行业的高效安全生产,助力其在各个领域扎根生长。