Springboot Rabbitmq + 线程池技术控制指定数量task执行

定义DataSyncTaskManager,作为线程池任务控制器

package org.demo.scheduletest.service;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;@Slf4j
public class DataSyncTaskManager {// 线程数private static final Integer threadNum = 5;private static DataSyncTaskManager taskManager = null;private static BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();private ThreadPoolExecutor taskExecutorPool;private DataSyncTaskManager() {taskExecutorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum);}/*** 构建唯一Manager对象单例** @return*/public static synchronized DataSyncTaskManager getManager() {if (null == taskManager) {taskManager = new DataSyncTaskManager();}return taskManager;}/*** 提交需要运行的任务** @param task*/public void submitTask(DataSyncTask task) {taskQueue.add(task);log.info("[DataSyncTaskManager] submitTask size={}", taskQueue.size());}public void runTaskDaemon() {log.info("[DataSyncTaskManager] runTaskDaemon start.");Thread thread = new Thread(() -> {while (true) {try {Runnable task = taskQueue.take();taskExecutorPool.submit(task);// log.info("[DataSyncTaskManager] runTaskDaemon submit task={}", task);Thread.sleep(3000);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("[startTaskRunningDaemon] task run InterruptedException", e);} catch (Exception e) {log.error("[startTaskRunningDaemon] task run Exception", e);}}});thread.setName(this.getClass().getSimpleName());thread.start();}
}

定义DataSyncTask,作为具体任务执行方

package org.demo.scheduletest.service;import com.rabbitmq.client.Channel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;@Data
@Slf4j
public class DataSyncTask implements Runnable {private String name;private Channel channel;private long deliveryTag;public DataSyncTask(String name, Channel channel, long deliveryTag) {this.name = name;this.channel = channel;this.deliveryTag = deliveryTag;}/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {log.info("[DataSyncTask] run task start, name = {}", name);try {Thread.sleep(30000);} catch (InterruptedException e) {throw new RuntimeException(e);}try {channel.basicAck(deliveryTag, true);log.info("[DataSyncTask] run task end, name = {}", name);} catch (IOException e) {throw new RuntimeException(e);}}}

InitTask,服务启动执行Task管理器

package org.demo.scheduletest.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;/*** @author zhe.xiao* @version 1.0* @since 2025/1/9 上午11:38*/
@Slf4j
@Component
public class InitTask implements ApplicationRunner {/*** Callback used to run the bean.** @param args incoming application arguments* @throws Exception on error*/@Overridepublic void run(ApplicationArguments args) throws Exception {DataSyncTaskManager.getManager().runTaskDaemon();}
}

配置Rabbitmq

package org.demo.scheduletest.rabbitmq;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;/*** @author zhe.xiao* @date 2022-07-06 17:27* @description**/
@SpringBootConfiguration
public class MyRabbitTemplateConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host:/}")private String virtualhost;/*** 连接工厂* @return*/@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualhost);//connectionFactory.setPublisherConfirms(true);return connectionFactory;}/**** @return RabbitTemplate*/@Beanpublic RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}
}
package org.demo.scheduletest.rabbitmq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/*** @author zhe.xiao* @date 2022-07-05 14:55* @description**/
@Configuration
public class MyRabbitExecutor {//正常队列public static final String QUEUE_1 = "my:queue:1";public static final String EXCHANGE_1 = "my:exchange:1";public static final String ROUTEING_1 = "data:route:1";//死信队列public static final String QUEUE_DEAD_LETTER = "my:queue:deadLetter";public static final String EXCHANGE_DEAD_LETTER = "my:exchange:deadLetter";public static final String ROUTING_DEAD_LETTER = "data:route:deadLetter";// 提供 Queue@BeanQueue myQueue1(){HashMap<String, Object> args = new HashMap<>();//绑定死信队列信息args.put("x-dead-letter-exchange", EXCHANGE_DEAD_LETTER);args.put("x-dead-letter-routing-key", ROUTING_DEAD_LETTER);//        args.put("x-max-length", 5); //队列最大长度,超过了会进入死信队列
//         args.put("x-message-ttl", 5000); //如果5秒没被消费,则进入死信队列return new Queue(QUEUE_1, true, false, false, args);}// 提供 Exchange@BeanDirectExchange myExchange1(){return new DirectExchange(EXCHANGE_1, true, false);}// 创建一个Binding对象,将Exchange和Queue绑定在一起@BeanBinding myBinding1(){return BindingBuilder.bind(myQueue1()).to(myExchange1()).with(ROUTEING_1);// return BindingBuilder.bind(myQueue1()).to(myExchange1());}// 死信队列配置 QUEUE, EXCHANGE, BINDING@BeanQueue myQueueDeadLetter(){return new Queue(QUEUE_DEAD_LETTER, true, false, false);}@BeanDirectExchange myExchangeDeadLetter(){return new DirectExchange(EXCHANGE_DEAD_LETTER, true, false);}@BeanBinding myBindingDeadLetter(){return BindingBuilder.bind(myQueueDeadLetter()).to(myExchangeDeadLetter()).with(ROUTING_DEAD_LETTER);}
}

Rabbitmq消费者通过task控制器提交执行任务

package org.demo.scheduletest.rabbitmq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.demo.scheduletest.service.DataSyncTask;
import org.demo.scheduletest.service.DataSyncTaskManager;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** 消费QUEUE** @author zhe.xiao* @date 2022-07-05 14:57* @description**/
@Slf4j
@Component
public class MyReceiver {@RabbitListener(queues = MyRabbitExecutor.QUEUE_1)public void handler1(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {log.info("handler1 process: {}", data);DataSyncTask dataSyncTask = new DataSyncTask(data, channel, deliveryTag);DataSyncTaskManager.getManager().submitTask(dataSyncTask);} catch (Exception e) {log.error(e.getMessage());}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/66686.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

每日学习30分轻松掌握CursorAI:初识Cursor AI

初识Cursor AI 一、什么是Cursor AI&#xff1f; Cursor AI是一款革命性的AI驱动型代码编辑器&#xff0c;它将传统的代码编辑功能与先进的人工智能技术相结合。它不仅是一个编辑器&#xff0c;更是一个智能编程助手&#xff0c;能够帮助开发者提高编码效率&#xff0c;解决编…

小米路由器IPv6 功能使用指南

本文不限于多层路由使用IPv6 的情况&#xff0c;提供解决IPv6 无法获取的更硬核的方法&#xff0c;需要有ssh 工具。&#xff08;无安卓设备&#xff0c;测试环境win、mac、ios&#xff09; 首先明确一点&#xff0c;就是如果想让你的设备得到GUA 地址&#xff0c;即访问 6.i…

云商城--业务+架构学习和环境准备

云商城业务架构学习和环境准备 B2B&#xff1a;Business to Business&#xff0c;交易双方的身份都是商家&#xff0c;也就是商家将商品卖给商家&#xff0c;类似采购、批发类购物&#xff0c;国内代表性网站阿里巴巴批发网 C2C&#xff1a;Customer to Customer&#xff0c;…

机器视觉系统中的重要配件--棱镜

在一套机器视觉系统中&#xff0c;人们一直比较注中工业相机、工业镜头及光源等重要的视觉器件&#xff0c;而小配件通常被忽视&#xff0c;虽然只是配角&#xff0c;但是却起着重要作用。以下以茉丽特镜头为例。 在构建视觉系统当中&#xff0c;遇到某个方向空间不足时&#x…

软件系统安全逆向分析-混淆对抗

1. 概述 在一般的软件中&#xff0c;我们逆向分析时候通常都不能直接看到软件的明文源代码&#xff0c;或多或少存在着混淆对抗的操作。下面&#xff0c;我会实践操作一个例子从无从下手到攻破目标。 花指令对抗虚函数表RC4 2. 实战-donntyousee 题目载体为具有漏洞的小型软…

#渗透测试#网络安全# 一文了解什么是跨域CROS!!!

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…

ClickHouse vs StarRocks 选型对比

一、面向列存的 DBMS 新的选择 Hadoop 从诞生已经十三年了&#xff0c;Hadoop 的供应商争先恐后的为 Hadoop 贡献各种开源插件&#xff0c;发明各种的解决方案技术栈&#xff0c;一方面确实帮助很多用户解决了问题&#xff0c;但另一方面因为繁杂的技术栈与高昂的维护成本&…

Win11家庭版转专业版

Win11家庭版转专业版&#xff08;亲测有效&#xff09; 第一步 【断网】输入这个密钥&#xff1a; R8NJ8-9X7PV-C7RCR-F3J9X-KQBP6 第二步 点击下一步会自动重启 第三步 【联网】输入这个密钥&#xff1a; F3NWX-VFMFC-MHYYF-BCJ3K-QV66Y 注意 两次输入密钥的地方一致 …

IP 地址与蜜罐技术

基于IP的地址的蜜罐技术是一种主动防御策略&#xff0c;它能够通过在网络上布置的一些看似正常没问题的IP地址来吸引恶意者的注意&#xff0c;将恶意者引导到预先布置好的伪装的目标之中。 如何实现蜜罐技术 当恶意攻击者在网络中四处扫描&#xff0c;寻找可入侵的目标时&…

【Word_笔记】Word的修订模式内容改为颜色标记

需求如下&#xff1a;请把修改后的部分直接在原文标出来&#xff0c;不要采用修订模式 步骤1&#xff1a;打开需要转换的word后&#xff0c;同时按住alt和F11 进入&#xff08;Microsoft Visual Basic for Appliations&#xff09; 步骤2&#xff1a;插入 ---- 模块 步骤3&…

[0405].第05节:搭建Redis主从架构

Redis学习大纲 一、3主3从的集群配置&#xff1a; 1.1.集群规划 1.分片集群需要的节点数量较多&#xff0c;这里我们搭建一个最小的分片集群&#xff0c;包含3个master节点&#xff0c;每个master包含一个slave节点&#xff0c;结构如下&#xff1a; 2.每组是一主一从&#x…

科研绘图系列:R语言绘制分组箱线图(boxplot)

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍加载R包数据下载导入数据数据预处理画图输出系统信息介绍 科研绘图系列:R语言绘制分组箱线图(boxplot) 加载R包 library(ggpubr) library(ggplot2) library(tidyverse) # dev…

Hadoop - MapReduce编程

文章目录 前言一、创建mapreduce-demo项目1. 在idea上创建maven项目2. 导入hadoop相关依赖 二、MapReduce编程1. 相关介绍1.1 驱动类&#xff08;Driver Class&#xff09;1.1.1 驱动类的定义1.1.2 驱动类的功能1.1.3 驱动类的作用 1.2 Mapper1.2.1 Mapper 的定义1.2.2 Mapper …

原码的乘法运算>>>只有0,1

MQ : 乘数 X : 被乘数 ACC : 乘积高位 [当前位是1,加上被乘数; 当前位是 0,加上0] 例如: MQ的最低位是1,所以要加上被乘数(01101) >>>> 得出 01101 >>>>> ACC MQ 需要整体逻辑右移 (原本01101 01011 >>> 001101 0101) 现在的次低位是…

mapbox基础,style样式汇总,持续更新

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;mapbox 从入门到精通 文章目录 一、&#x1f340;前言二、&#x1f340;根属性2.1 so…

人工智能知识分享第九天-机器学习_集成学习

集成学习 概念 集成学习是机器学习中的一种思想&#xff0c;它通过多个模型的组合形成一个精度更高的模型&#xff0c;参与组合的模型称为弱学习器&#xff08;基学习器&#xff09;。训练时&#xff0c;使用训练集依次训练出这些弱学习器&#xff0c;对未知的样本进行预测时…

页面滚动下拉时,元素变为fixed浮动,上拉到顶部时恢复原状,js代码以视频示例

页面滚动下拉时,元素变为fixed浮动js代码 以视频示例 <style>video{width:100%;height:auto}.div2,#float1{position:fixed;_position:absolute;top:45px;right:0; z-index:250;}button{float:right;display:block;margin:5px} </style><section id"abou…

排序算法——堆排序

什么是堆 堆就是一种特殊的二叉树&#xff0c;他有以下特点&#xff1a; 堆中某个节点的值总是不大于或不小于其父节点的值&#xff1b; 堆总是一棵完全二叉树。 堆又可以分为大根堆和小根堆 大根堆&#xff1a;根节点最大&#xff0c;每个节点都小于或等于父节点 小跟堆&am…

K-means算法在无监督学习中的应用

K-means算法在无监督学习中的应用 K-means算法是一种典型的无监督学习算法&#xff0c;广泛用于聚类分析。在无监督学习中&#xff0c;模型并不依赖于标签数据&#xff0c;而是根据输入数据的特征进行分组。K-means的目标是将数据集分成K个簇&#xff0c;使得同一簇内的数据点…

Linux 35.6 + JetPack v5.1.4之 pytorch升级

Linux 35.6 JetPack v5.1.4之 pytorch升级 1. 源由2. 升级步骤1&#xff1a;获取二进制版本步骤2&#xff1a;安装二进制版本步骤3&#xff1a;获取torchvision步骤4&#xff1a;安装torchvision步骤5&#xff1a;检查安装版本 3. 使用4. 补充4.1 torchvision版本问题4.2 支持…