一种基于springboot、redis的分布式任务引擎的实现(一)

 总体思路是,主节点接收到任务请求,将根据任务情况拆分成多个任务块,将任务块标识的主键放入redis。发送redis消息,等待其他节点运行完毕,结束处理。接收到信息的节点注册本节点信息到redis、开启多线程、获取任务块、执行任务、结束处理。

1、主节点接收任务请求

    @Overridepublic void executeTaskInfo(PrepareDTO prepareDTO) {//异常标记String taskInfo = prepareDTO.getTaskId();//任务分组状态String taskStatus = "";try {log.info("数据准备任务并设定任务执行状态,{}", prepareDTO);this.dataPrepareBo.doStartGroupJobInfo(prepareDTO);//给redis集合中放计算对象log.info("开始放入计算任务:{}", prepareDTO);boolean getTaskFlag = this.dataPrepareBo.pushCalculationObject(prepareDTO);if (!getTaskFlag) {taskStatus = String.format("没有获取数据或计划已取消,%s", taskInfo);log.error(taskStatus);throw new Exception(taskStatus);}//发消息执行缓存中任务log.info("发消息执行任务:{}", prepareDTO);sendMessage(prepareDTO);//等待任务执行完毕log.info("等待任务执行结果");taskStatus = this.getGroupUpLoadTaskFinsh(prepareDTO);} catch (Exception e) {//捕获日志e.printStackTrace();taskStatus = "获取任务状态异常" + e;log.info(taskStatus);dataPrepareBo.putExceptionMsg2Cache(taskInfo, "数据准备分发计算任务线程异常:" + taskStatus);} finally {//做任务结束处理this.doGroupTaskFinshpPocess(prepareDTO, taskStatus);}}

2,发送消息

    @Overridepublic void sendMessage(String topic, String msg) {this.redisTemplate.convertAndSend(topic, msg);}

3,节点接收任务,并执行

    public void doUpLoadTask(String msg) throws Exception {log.info("开始执行明细任务{}" + msg);String taskId = this.getTaskId(msg);try {Object cancelFlag = this.redisTemplate.opsForValue().get(String.format(EngineConstant.JOB_CANCEL_FLAG, taskId));if(cancelFlag != null && "1".equals(cancelFlag.toString())){log.info("本次任务已取消");return;}//上传本机执行信息到redisthis.cacheBo.initGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());//从缓存获取任务,获取任务后启线程执行任务。如果没获取到任务,则本节点任务执行完毕//循环获取任务this.groupTaskProcessBO.doGroupTaskProcess(taskId, null);//处理结束this.cacheBo.finishGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());} catch (Exception e) {//记录日志taskUpldExeLogCDTO.setRunStas("-1");String exceptionInfo = this.taskLogUtils.getExceptionInfo(e) ;taskUpldExeLogCDTO.setAbnInfo(exceptionInfo);throw e;} finally {//记录日志taskUpldExeLogCDTO.setEndtime(DateUtil.getCurrentDate());if("-1".equals(taskUpldExeLogCDTO.getRunStas())){//异常结束this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"执行上传任务异常");} else {//正常结束taskUpldExeLogCDTO.setRunStas("1");this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"执行上传任务正常");}}}

4,开启线程执行任务

    @Overridepublic CalculationDTO doGroupTaskProcess(String taskId, TaskUpldExeLogCDTO taskUpldExeLogCDTO) throws Exception {List<Future> futureList = new ArrayList<>();//开始执行明细任务处理ThreadPoolTaskExecutor taskTransferExecutor = ToolUtil.getExecutor("engine-file-tasks-pool-", Math.min(parallelProcessNum,10), 8);ExecutorListHolder.putThreadPool(String.format(GroupConstant.PREPARE_ENGINE_POOL,taskId), taskTransferExecutor.getThreadPoolExecutor());for(int i = 0 ; i < parallelProcessNum ; i++) {DoGroupUpLoadTaskThread doGroupUpLoadTaskThread = new DoGroupUpLoadTaskThread(taskId, redisTemplate, calculationBo, null, null);Future<?> future = taskTransferExecutor.submit(doGroupUpLoadTaskThread);futureList.add(future);}if (!CollectionUtil.isEmpty(futureList)) {futureList.forEach(f -> {try {f.get(GroupTaskProcessBOImpl.maxTime, TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();}});}log.info("本节点执行分组任务执行完毕{}", taskId + ":" + GroupConstant.IDENTITY);return null;}

5,线程执行明细

    @Overridepublic ResponseDTO call() throws Exception {//执行任务while(true) {FilterTableUniqueDTO filterTableUniqueDTO = (FilterTableUniqueDTO)this.redisTemplate.opsForList().leftPop(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));log.debug("取出任务:" + filterTableUniqueDTO);if(null == filterTableUniqueDTO) {break ;}long lastNum = this.redisTemplate.opsForList().size(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));log.info("生成文件剩余任务数量:" + lastNum);
//           处理任务calculationBo.GenerateFile(filterTableUniqueDTO, taskUpldDetlLogCDTO);}return null;}

以上是主要入口总体思路涉及代码,详细实现整理起来涉及内容比较繁多,将在第二部分分享。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/43766.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenCV基础知识(5)— 几何变换

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。OpenCV中的几何变换是指改变图像的几何结构&#xff0c;例如大小、角度和形状等&#xff0c;让图像呈现出缩放、翻转、旋转和透视效果。这些几何变换操作都涉及复杂、精密的计算。OpenCV将这些计算过程都封装成了非常灵活的…

开源了一套基于springboot+vue+uniapp的商城,包含分类、sku、商户管理、分销、会员、适合企业或个人二次开发

RuoYi-Mall-JAVA商城-电商系统简介 开源了一套基于若依框架&#xff0c;SringBoot2MybatisPlusSpringSecurityjwtredisVueUniapp的前后端分离的商城系统&#xff0c; 包含分类、sku、商户管理、分销、会员、适合企业或个人二次开发。 前端采用Vue、Element UI&#xff08;ant…

Mac terminal 每次打开都要重新配置文件

1. 问题描述 每次打开 Terminal&#xff0c;base_profile文件中配置的内容就不生效&#xff0c;需要重新执行source ~/.bash_profile才可以使用。 2. 原因分析 zsh加载的是~/.zshrc文件&#xff0c;而.zshrc 文件中并没有定义任务环境变量。 3. 解决办法 在~/.zshrc文件末尾添…

CF1017B The Bits 题解

想死人的思维题哈哈。 题目传送门 题目意思&#xff1a; 给你两个二进制串&#xff0c;你可以将第一个二进制串的任意两个位置的数字调换&#xff0c;问有多少种方案可以让这两个二进制串按位或的结果改变&#xff1f; 思路&#xff1a; 要从按位或的性质上开始思考。 按位…

Debian10: 安装nut服务器(UPS)

UPS说明&#xff1a; UPS的作用就不必讲了&#xff0c;我选择是SANTAKTGBOX-850&#xff0c;规格为 850VA/510W&#xff0c;可以满足所需&#xff0c;关键是Debian10自带了驱动可以支持&#xff0c;免去安装驱动&#xff0c;将UPS通过USB线连接服务器即可&#xff0c;如下图所示…

Vue初识别--环境搭建--前置配置过程

问题一&#xff1a; 在浏览器上的扩展程序上添加了vue-devtools后不生效&#xff1a; 解决方式&#xff1a;打开刚加入的扩展工具Vue.js devtools的允许访问文件地址设置 问题二&#xff1a;Vue新建一个项目 创建一个空文件夹hrsone&#xff0c;然后在VSCode中打开这个空文件夹…

RequestRespons

文章目录 Request&Respons1 Request和Response的概述2 Request对象2.1 Request继承体系2.2 Request获取请求数据2.2.1 获取请求行数据2.2.2 获取请求头数据2.2.3 获取请求体数据2.2.4 获取请求参数的通用方式 2.3 IDEA快速创建Servlet2.4 请求参数中文乱码问题2.4.1 POST请…

基于Python的微博大数据舆情分析,舆论情感分析可视化系统,可作为Python毕业设计

运行效果图 基于Python的微博大数据舆情分析&#xff0c;舆论情感分析可视化系统 系统介绍 微博舆情分析系统&#xff0c;项目后端分爬虫模块、数据分析模块、数据存储模块、业务逻辑模块组成。 先后进行了数据获取和筛选存储&#xff0c;对存储后的数据库数据进行提取分析处…

iptables安全与防火墙

防火墙 防火墙主要作用是隔离功能&#xff0c;它是部署在网络边缘或主机边缘&#xff1b;另外在生产中防火墙的主要作用是&#xff1a;决定哪些数据可以被外网访问以及哪些数据可以进入内网访问&#xff1b;顾名思义防火墙处于TCP协议中的网络层。 防火墙分类&#xff1a; 软…

postgresql 分类排名

postgresql 分类排名 排名窗口函数示例CUME_DIST 和 NTILE 排名窗口函数 排名窗口函数用于对数据进行分组排名。常见的排名窗口函数包括&#xff1a; • ROW_NUMBER&#xff0c;为分区中的每行数据分配一个序列号&#xff0c;序列号从 1 开始分配。 • RANK&#xff0c;计算每…

数学建模之“灰色预测”模型

灰色系统分析法在建模中的应用 1、CUMCM2003A SARS的传播问题 2、CUMCM2005A长江水质的评价和预测CUMCM2006A出版社的资源配置 3、CUMCM2006B艾滋病疗法的评价及疗效的预测问题 4、CUMCM2007A 中国人口增长预测 灰色系统的应用范畴大致分为以下几方面: (1&#xff09;灰色关…

“深度学习”学习日记:Tensorflow实现VGG每一个卷积层的可视化

2023.8.19 深度学习的卷积对于初学者是非常抽象&#xff0c;当时在入门学习的时候直接劝退一大班人&#xff0c;还好我坚持了下来。可视化时用到的图片&#xff08;我们学校的一角&#xff01;&#xff01;&#xff01;&#xff09;以下展示了一个卷积和一次Relu的变化 作者使…

IronPDF for .NET Crack

IronPDF for .NET Crack ronPDF现在将等待HTML元素加载后再进行渲染。 IronPDF现在将等待字体加载后再进行渲染。 添加了在绘制文本时指定旋转的功能。 添加了在保存为PDFA时指定自定义颜色配置文件的功能。 IronPDF for.NET允许开发人员在C#、F#和VB.NET for.NET Core和.NET F…

​LeetCode解法汇总43. 字符串相乘

目录链接&#xff1a; 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目&#xff1a; https://github.com/September26/java-algorithms 原题链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 描述&#xff1a; 给定两个以…

深入探讨API接口测试:从基础到高级策略

引言&#xff1a;API测试的重要性 在当前的技术趋势中&#xff0c;API&#xff08;应用程序接口&#xff09;已经成为连接各种系统和服务的基石。API不仅仅是大型企业的领域&#xff0c;中小型公司和初创公司也越来越依赖API来拓展他们的业务功能和跨系统通信。正因如此&#…

【深入浅出C#】章节 8: 网络编程和远程通信:网络编程和远程通信

计算机网络是指连接多台计算机设备&#xff0c;通过通信链路共享资源和信息的系统。它构建了一个相互连接的世界&#xff0c;使得人们可以在不同地点进行数据交换和资源共享。网络编程是指在计算机网络中&#xff0c;使用编程语言进行通信和数据传输的技术。现代应用中&#xf…

虚拟机问题

虚拟机无法识别USB设备 经排查为VMware USB Arbitration Service 没有启动,但是VMware USB Arbitration Service依赖于VMware Workstation Server启动 VMware USB Arbitration Service(VMUSBArbService)是由 VMware 虚拟化软件提供的一个服务,用于协调和管理主机系统上的…

Python数据分析实战-*和**实现可变多参数的传入或变量的拆解(附源码和实现效果)

实现功能 *和**实现多参数的传入或变量的拆解 实现代码 # 1、实现多参数的传入 def one(a,*b):"""a是一个普通传入参数&#xff0c;*b是一个非关键字星号参数"""print(b) one(1,2,3,4,5,6) 其中&#xff0c;第一个的输入可以理解为&#xff1a…

kafka配置远程连接

要想实现在本地连接服务器的kafka&#xff0c;则必须在远程kafka配置远程连接 默认的 kafka 配置是无法远程访问的&#xff0c;解决该问题有几个方案。 方案1 advertised.listenersPLAINTEXT://IP:9092 注意必须是 ip&#xff0c;不能是 hostname 方案2 advertised.listene…

【虫洞攻击检测】使用多层神经网络的移动自组织网络中的虫洞攻击检测研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…