序言:
最近项目中用到视频ai分析,由于sdk涉及保密,不便透露,仅对定时任务分析的思路作出分享,仅供参考。
1、定时任务
由于ai服务器的性能上限,只能同时对64个rtsp流分析一种算法,或者对8个rtsp流分析8种算法。因此定时任务,做如下设计。
AiHandlerTask.java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.ewaycloud.jw.ai.service.AiService;
import com.ewaycloud.jw.camera.entity.Camera;
import com.ewaycloud.jw.camera.mapper.CameraMapper;
import com.ewaycloud.jw.camera.service.CameraService;
import com.ewaycloud.jw.cases.dto.CaseDTO;
import com.ewaycloud.jw.cases.service.CaseService;
import com.ewaycloud.jw.channel.service.HikService;
import com.ewaycloud.jw.task.entity.Task;
import com.ewaycloud.jw.task.mapper.TaskMapper;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;
import java.util.List;/*** AI分析 定时任务 处理类** @author gwh* @date 2024-04-14 13:59:17*/
@Component
@EnableScheduling
public class AiHandlerTask {@ResourceAiService aiService;@ResourceTaskService taskService;@ResourceCameraService cameraService;@Resourceprivate TaskMapper taskMapper;@Resourceprivate CameraMapper cameraMapper;@Resourceprivate HikService hkService;@Resourceprivate CaseService caseService;/*** 注解中的Cron表达式: {秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}* 注意:日和周其中的一个必须为"?"* 10/5 20 10 * * ? 每天10点20分第10秒以后,每3秒执行一次,到10点21分就不会执行了** AI算法分析任务: 每5秒执行一次*/
// @Scheduled(cron = "0/5 * * * * ?")public void startTask(){// System.out.println("AI分析定时任务执行 每隔5秒执行一次:" + new Date());//查询要执行的任务List<Task> aiTasks = taskMapper.findAiTasks(null);if (null != aiTasks) {for(Task vo:aiTasks){if (null != vo.getDeptId()) {//查询某谈话室下边的摄像头列表(flag是1 谈话人特写 和2 被谈话人特写 的)List<Camera> cameraList = cameraMapper.findCamersByDeptId(vo.getDeptId());if (null != cameraList && cameraList.size()>0) {for(Camera camera:cameraList) {//根据摄像头编码cameraCode,调用海康接口拉流String cameraCode = camera.getCameraCode();try {//根据cameraCode、开始时间、结束时间 调用海康接口 拉回放流//查询时间(IOS8601格式yyyy-MM-dd'T'HH:mm:ss.SSSzzz,和结束时间相差不超过三天JSONObject data = hkService.playbackURLs( cameraCode, vo.getStartTime(), vo.getEndTime());//谈话人特写AI分析if (null != data && null != data.getString("url")) {String rtspUrl = data.getString("url");//疑似肢体冲突// startAiTask(rtspUrl, 1L, vo.getStartTime(), vo.getEndTime(), vo);//玩手机分析// startAiTask(rtspUrl, 2L, vo.getStartTime(), vo.getEndTime(), vo);//倒地分析// startAiTask(rtspUrl, 3L, vo.getStartTime(), vo.getEndTime(), vo);//人数异常startAiTask(rtspUrl, 5L, vo.getStartTime(), vo.getEndTime(), vo);}} catch (Exception e) {e.printStackTrace();}}}}}}// System.out.println("AI分析定时任务执行 每隔10秒执行一次:: " + new Date());}//执行拉流调用AI分析的方法public void startAiTask(String rtspUrl, Long aiId, String startTime, String endTime, Task vo) {//调用AI分析接口if (null != rtspUrl) {//调用海康AI算法分析String aiResponse = "";if (aiId == 1) {//疑似肢体冲突aiResponse = aiService.indoorPhysicalConfront(rtspUrl, startTime, endTime);vo.setBreakName("疑似肢体冲突");vo.setAiId(1L);} else if (aiId == 2) {//玩手机aiResponse = aiService.playCellphone(rtspUrl, startTime, endTime);vo.setBreakName("玩手机");vo.setAiId(2L);} else if (aiId == 3) {//倒地aiResponse = aiService.failDown(rtspUrl, startTime, endTime);vo.setBreakName("倒地");vo.setAiId(3L);} else if (aiId == 4) {//人员站立aiResponse = aiService.Standup(rtspUrl,startTime, endTime);vo.setBreakName("人员站立");vo.setAiId(4L);} else if (aiId == 5) {//人数异常aiResponse = aiService.PeopleNumChange(rtspUrl, startTime, endTime);vo.setBreakName("人数异常");vo.setAiId(5L);} else if (aiId == 6) {//声强突变aiResponse = aiService.audioAbnormal(rtspUrl, startTime, endTime);vo.setBreakName("声强突变");vo.setAiId(6L);} else if (aiId == 7) {//超时滞留aiResponse = aiService.overtimeTarry(rtspUrl, startTime, endTime);vo.setBreakName("超时滞留");vo.setAiId(7L);} else if (aiId == 8) {//攀高aiResponse = aiService.reachHeight(rtspUrl, startTime, endTime);vo.setBreakName("攀高");vo.setAiId(8L);}JSONObject aiResponseJSONObject = JSON.parseObject(aiResponse);
// System.out.println("AI分析定时任务返回aiResponseJSONObject:" + aiResponseJSONObject);String taskId = "";String taskStatus = "";if (null != aiResponseJSONObject && null != aiResponseJSONObject.getString("taskID") ){taskId = aiResponseJSONObject.getString("taskID");//调用海康查询任务状态接口获取AI分析任务状态String result = aiService.queryTaskVideoStatus(taskId);JSONObject resultJSONObject = JSON.parseObject(result);JSONArray statusJSONArray = resultJSONObject.getJSONArray("status");JSONObject statusJSONObject = (JSONObject) statusJSONArray.get(0);taskStatus = statusJSONObject.getString("taskStatus");//将AI分析结果taskStatus插入task表中,更新任务表,状态:1 未执行, 2等待, 3 正在执行 , 4 已完成vo.setTaskState(Integer.parseInt(taskStatus));vo.setTaskId(taskId); //保存 海康返回的 taskID//如果任务完成,关闭rtsp流if ("4".equals(taskStatus)) {//根据caseId更新案件表的 task_state =1 , ai任务状态(0:未执行 1:已执行)Long caseId = vo.getCaseId();CaseDTO caseDTO = new CaseDTO();caseDTO.setCaseId(caseId);caseDTO.setCaseState(1);caseService.updCaseInfo(caseDTO);//关闭rtsp流try {hkService.clearPlayUrls(rtspUrl);} catch (Exception e) {e.printStackTrace();}}}System.out.println("AI分析定时任务返回 taskId:" + taskId +" breakName: "+ vo.getBreakName() +" taskStatus: "+ taskStatus);//更新任务表, 根据caseId 和taskId查询任务,如果有更新,没有插入Task dto = new Task();dto.setCaseId(vo.getCaseId());dto.setTaskId(vo.getTaskId());List<Task> tasks = taskMapper.findTasks(dto);if(null != tasks && tasks.size()>0){for(Task po : tasks){vo.setId(po.getId());vo.setUpdateTime(new Date());taskService.updateById(po);}}else {vo.setCreateTime(new Date());vo.setUpdateTime(new Date());taskMapper.insert(vo);}}}}
2、算法实现,由于涉密,只贴出接口
AiService.java
import com.baomidou.mybatisplus.extension.service.IService;
import com.ewaycloud.jw.ai.entity.Ai;
import com.ewaycloud.jw.task.entity.Task;import java.util.List;/*** AI对接** @author gwh* @date 2024-03-13 13:49:09*/
public interface AiService extends IService<Ai> {String getAiDeviceInfo();/*** 创建--疑似肢体冲突事件--分析视频分析任务**/String indoorPhysicalConfront(String streamUrl, String startTime, String endTime);/*** 创建--玩手机--分析视频分析任务**/String playCellphone(String streamUrl, String startTime, String endTime);/*** 创建--倒地检测--分析视频分析任务**/String failDown(String streamUrl, String startTime, String endTime);/*** 创建--人员站立--分析视频分析任务**/String Standup(String streamUrl, String startTime, String endTime);/*** 创建--人数异常--分析视频分析任务**/String PeopleNumChange(String streamUrl, String startTime, String endTime);/*** 创建--声强突变--分析视频分析任务**/String audioAbnormal(String streamUrl, String startTime, String endTime);/*** 创建--超时滞留--分析视频分析任务**/String overtimeTarry(String streamUrl, String startTime, String endTime);/*** 创建--攀高--分析视频分析任务**/String reachHeight(String streamUrl, String startTime, String endTime);/*** 查询分析视频分析任务状态**/String queryTaskVideoStatus(String taskId);}
3、启动一个线程,Socket监听10006端口,接收ai服务器返回的结果
ListenThread.java
import com.ewaycloud.jw.ai.entity.AiResolveResult;
import com.ewaycloud.jw.ai.mapper.AiResolveResultMapper;
import com.ewaycloud.jw.task.entity.ContentTypeEnum;
import com.ewaycloud.jw.task.entity.Task;
import com.ewaycloud.jw.task.mapper.TaskMapper;
import com.mysql.cj.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;
import java.util.List;/*** @author gwhui* @date 2024/1/18 17:38* @desc 监听处理线程*/
@Slf4j
@Service
public class ListenThread implements Runnable {private final AlarmDataParser alarmDataParser = new AlarmDataParser();private static TaskMapper taskMapper;@Resourcepublic void setVerificDao(TaskMapper taskMapper) {ListenThread.taskMapper = taskMapper;}private static AiResolveResultMapper aiResolveResultMapper;@Resourcepublic void setVerificDao(AiResolveResultMapper aiResolveResultMapper) {ListenThread.aiResolveResultMapper = aiResolveResultMapper;}@Overridepublic void run() {
// int listenPort = propertiesUtil.getIntegerProperty("custom.isapi.listen.port", 9999);int listenPort =10006;try {ServerSocket serverSocket = new ServerSocket(listenPort);System.out.println("启动监听, 监听端口:" + listenPort);while (!Thread.currentThread().isInterrupted()) {Socket accept = serverSocket.accept();accept.setKeepAlive(true);System.out.println("设备(客户端)信息:" + accept.getInetAddress().getHostAddress());if (accept.isConnected()) {handleData(accept);}accept.close();}serverSocket.close();System.out.println("停止监听完成");} catch (InterruptedException e) {// 线程被中断的处理逻辑System.out.println("停止监听完成: " + e.getMessage());} catch (Exception e) {System.out.println("监听创建异常: " + e.getMessage());}}@Transactional(rollbackFor = Exception.class)public synchronized void handleData(Socket accept) throws Exception {InputStream inputData = accept.getInputStream();OutputStream outputData = accept.getOutputStream();// 输出数据ByteArrayOutputStream byOutputData = new ByteArrayOutputStream();byte[] buffer = new byte[2 * 1024 * 1024];int length = 0;// 持续接收处理数据直到接收完毕String recvAlarmData = "";while ((length = inputData.read(buffer)) > 0) {byOutputData.write(buffer, 0, length);String recvData = byOutputData.toString();recvAlarmData = recvAlarmData + recvData;// 获取boundaryString strBoundary = "boundary=";int beginIndex = recvData.indexOf(strBoundary);beginIndex += strBoundary.length();int lenIndex = recvData.indexOf("\r\n", beginIndex);String strBoundaryMark = recvData.substring(beginIndex, lenIndex);if (recvAlarmData.contains("--" + strBoundaryMark.trim() + "--")) {//表单结束符判断接收结束break;}}
// System.out.println("==============recvAlarmData========>> "+recvAlarmData);if(null != recvAlarmData){String taskId = null;int index = recvAlarmData.indexOf("<taskID>");if(index != -1){taskId = recvAlarmData.substring(index + 8, index + 40);}//获取服务器返回的图片String bkgUrl = null;int indexStartBkgUrl = recvAlarmData.indexOf("<bkgUrl>");int indexEndBkgUrl = recvAlarmData.indexOf("</bkgUrl>");if(indexStartBkgUrl != -1){bkgUrl = recvAlarmData.substring(indexStartBkgUrl+8, indexEndBkgUrl);bkgUrl =bkgUrl.replaceAll("&","&");}System.out.println("===AIrecieveData===>>taskId: "+taskId +" bkgUrl: "+ bkgUrl);//根据taskId查询 任务信息if(!StringUtils.isNullOrEmpty(taskId)){Task task = taskMapper.finTaskByTaskId(taskId);if(null != task){AiResolveResult vo = new AiResolveResult();vo.setCreateTime(new Date());vo.setUpdateTime(new Date());vo.setTaskId(taskId); //保存海康返回的 taskIdvo.setBreakName(task.getBreakName());vo.setAiId(task.getAiId());vo.setDeptId(task.getDeptId());vo.setCameraId(task.getCameraId());vo.setBreakTypeId(task.getAiId());vo.setRiskTime(task.getTalkTime());vo.setTalkAddress(task.getTalkAddress());vo.setTalkAddressName(task.getTalkAddressName());vo.setTalkUnit(task.getTalkUnit());vo.setTalkUnitName(task.getTalkUnitName());vo.setPhoto(bkgUrl); //保存海康返回的图片vo.setCaseId(task.getCaseId());vo.setCaseName(task.getCaseName());vo.setInterviewerName(task.getInterviewerName());//根据taskId查询任务结果表,如果有做更新操作,没有做插入操作List<AiResolveResult> aiResolveResults = aiResolveResultMapper.findAiResults(vo);if(null != aiResolveResults && aiResolveResults.size()>0){for(AiResolveResult aiResolveResult:aiResolveResults){if(null != aiResolveResult){aiResolveResult.setPhoto(vo.getPhoto());aiResolveResultMapper.updateById(aiResolveResult);}}}else {aiResolveResultMapper.insert(vo);}}}}String response = "HTTP/1.1 200 OK" +"\r\n" +"Connection: close" +"\r\n\r\n";outputData.write(response.getBytes());outputData.flush();outputData.close();inputData.close();//解析数据response = parseAlarmInfoByte(byOutputData);System.out.println("==============response========>> "+response);}private String parseAlarmInfoByte(ByteArrayOutputStream byOutputData) throws Exception {// 事件报文字节byte[] byAlarmDataInfo = byOutputData.toByteArray();int iDataLen = byAlarmDataInfo.length;String szBoundaryMark = "boundary=";String szContentTypeMark = "Content-Type: ";int iTypeMarkLen = szContentTypeMark.getBytes("UTF-8").length;String szContentLenMark = "Content-Length: ";int iLenMarkLen = szContentLenMark.getBytes("UTF-8").length;String szContentLenMark2 = "content-length: ";int iLenMarkLen2 = szContentLenMark2.getBytes("UTF-8").length;int iContentLen = 0;String szEndMark = "\r\n";int iMarkLen = szEndMark.getBytes("UTF-8").length;String szEndMark2 = "\r\n\r\n";int iMarkLen2 = szEndMark2.getBytes("UTF-8").length;String szJson = "text/json";String szJpg = "image/jpeg";int iStartBoundary = doDataSearch(byAlarmDataInfo, szBoundaryMark.getBytes("UTF-8"), 0, byAlarmDataInfo.length);iStartBoundary += szBoundaryMark.getBytes("UTF-8").length;int iEndBoundary = doDataSearch(byAlarmDataInfo, szEndMark.getBytes("UTF-8"), iStartBoundary, byAlarmDataInfo.length);byte[] byBoundary = new byte[iEndBoundary - iStartBoundary];System.arraycopy(byAlarmDataInfo, iStartBoundary, byBoundary, 0, iEndBoundary - iStartBoundary);String szBoundaryEndMark = "--" + new String(byBoundary).trim() + "--";int iDateEnd = doDataSearch(byAlarmDataInfo, szBoundaryEndMark.getBytes("UTF-8"), 0, byAlarmDataInfo.length);String szBoundaryMidMark = "--" + new String(byBoundary).trim();int iBoundaryMidLen = szBoundaryMidMark.getBytes("UTF-8").length;int startIndex = iEndBoundary;String szContentType = "";int[] iBoundaryPos = new int[11]; //boundary个数,这里最大解析10个int iBoundaryNum = 0;for (iBoundaryNum = 0; iBoundaryNum < 10; iBoundaryNum++) {startIndex = doDataSearch(byAlarmDataInfo, szBoundaryMidMark.getBytes("UTF-8"), startIndex, iDateEnd);if (startIndex < 0) {break;}startIndex += iBoundaryMidLen;iBoundaryPos[iBoundaryNum] = startIndex;}iBoundaryPos[iBoundaryNum] = iDateEnd;//最后一个是结束符for (int i = 0; i < iBoundaryNum; i++) {// Content-Typeint iStartType = doDataSearch(byAlarmDataInfo, szContentTypeMark.getBytes("UTF-8"), iBoundaryPos[i], iBoundaryPos[i + 1]);if (iStartType > 0) {iStartType += iTypeMarkLen;int iEndType = doDataSearch(byAlarmDataInfo, szEndMark.getBytes("UTF-8"), iStartType, iBoundaryPos[i + 1]);if (iEndType > 0) {byte[] byType = new byte[iEndType - iStartType];System.arraycopy(byAlarmDataInfo, iStartType, byType, 0, iEndType - iStartType);szContentType = new String(byType).trim();}}// Content-Lengthint iStartLength = doDataSearch(byAlarmDataInfo, szContentLenMark.getBytes("UTF-8"), iBoundaryPos[i], iBoundaryPos[i + 1]);if (iStartLength > 0) {iStartLength += iLenMarkLen;int iEndLength = doDataSearch(byAlarmDataInfo, szEndMark.getBytes("UTF-8"), iStartLength, iBoundaryPos[i + 1]);if (iEndLength > 0) {byte[] byLength = new byte[iEndLength - iStartLength];System.arraycopy(byAlarmDataInfo, iStartLength, byLength, 0, iEndLength - iStartLength);iContentLen = Integer.parseInt(new String(byLength).trim());}}// Content-Length(兼容错误大小写)int iStartLength2 = doDataSearch(byAlarmDataInfo, szContentLenMark2.getBytes("UTF-8"), iBoundaryPos[i], iBoundaryPos[i + 1]);if (iStartLength2 > 0) {iStartLength2 += iLenMarkLen2;int iEndLength2 = doDataSearch(byAlarmDataInfo, szEndMark.getBytes("UTF-8"), iStartLength2, iBoundaryPos[i + 1]);if (iEndLength2 > 0) {byte[] byLength2 = new byte[iEndLength2 - iStartLength2];System.arraycopy(byAlarmDataInfo, iStartLength2, byLength2, 0, iEndLength2 - iStartLength2);iContentLen = Integer.parseInt(new String(byLength2).trim());}}// 通过\r\n\r\n判断报文数据起始位置int iStartData = doDataSearch(byAlarmDataInfo, szEndMark2.getBytes("UTF-8"), iBoundaryPos[i], iBoundaryPos[i + 1]);if (iStartData > 0) {iStartData += iMarkLen2;// 有的报文可能没有Content-Lengthif (iContentLen <= 0) {iContentLen = iBoundaryPos[i + 1] - iStartData;}// 截取数据内容byte[] byData = new byte[iContentLen];System.arraycopy(byAlarmDataInfo, iStartData, byData, 0, iContentLen);// 根据类型处理数据int contentType = ContentTypeEnum.getEventType(szContentType);String storeFolder = System.getProperty("user.dir") + "\\output\\listen\\event\\";switch (contentType) {case ContentTypeEnum.APPLICATION_JSON:case ContentTypeEnum.APPLICATION_XML: {String rawContent = new String(byData).trim();alarmDataParser.parseAlarmInfo(contentType, storeFolder, rawContent, null);break;}case ContentTypeEnum.IMAGE_JPEG:case ContentTypeEnum.IMAGE_PNG:case ContentTypeEnum.VIDEO_MPG:case ContentTypeEnum.VIDEO_MPEG4:case ContentTypeEnum.APPLICATION_ZIP: {alarmDataParser.parseAlarmInfo(contentType, storeFolder, null, byData);break;}default: {System.out.println("未匹配到可以解析的content-type, 请自行补全处理!");}}}}// 响应报文String response = "";// 消费交易事件 (实际如果没有消费机设备可以不需要消费机的处理代码)String eventType = "";String eventConfirm = "";if (eventType.equals("ConsumptionEvent") || eventType.equals("TransactionRecordEvent") || eventType.equals("HealthInfoSyncQuery")) {response = "HTTP/1.1 200 OK" +"\r\n" +"Content-Type: application/json; charset=\"UTF-8\"" +"\r\n" +"Content-Length: " + eventConfirm.length() +"\r\n\r\n" + eventConfirm +"\r\n";} else {response = "HTTP/1.1 200 OK" +"\r\n" +"Connection: close" +"\r\n\r\n";}return response;}private int doDataSearch(byte[] bySrcData, byte[] keyData, int startIndex, int endIndex) {if (bySrcData == null || keyData == null || bySrcData.length <= startIndex || bySrcData.length < keyData.length) {return -1;}if (endIndex > bySrcData.length) {endIndex = bySrcData.length;}int iPos, jIndex;for (iPos = startIndex; iPos < endIndex; iPos++) {if (bySrcData.length < keyData.length + iPos) {break;}for (jIndex = 0; jIndex < keyData.length; jIndex++) {if (bySrcData[iPos + jIndex] != keyData[jIndex]) {break;}}if (jIndex == keyData.length) {return iPos;}}return -1;}}
4、数据库设计
瀚高国产数据库