Springboot Rabbitmq + 线程池技术控制指定数量task执行

定义DataSyncTaskManager,作为线程池任务控制器

package org.demo.scheduletest.service;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;@Slf4j
public class DataSyncTaskManager {// 线程数private static final Integer threadNum = 5;private static DataSyncTaskManager taskManager = null;private static BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();private ThreadPoolExecutor taskExecutorPool;private DataSyncTaskManager() {taskExecutorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum);}/*** 构建唯一Manager对象单例** @return*/public static synchronized DataSyncTaskManager getManager() {if (null == taskManager) {taskManager = new DataSyncTaskManager();}return taskManager;}/*** 提交需要运行的任务** @param task*/public void submitTask(DataSyncTask task) {taskQueue.add(task);log.info("[DataSyncTaskManager] submitTask size={}", taskQueue.size());}public void runTaskDaemon() {log.info("[DataSyncTaskManager] runTaskDaemon start.");Thread thread = new Thread(() -> {while (true) {try {Runnable task = taskQueue.take();taskExecutorPool.submit(task);// log.info("[DataSyncTaskManager] runTaskDaemon submit task={}", task);Thread.sleep(3000);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("[startTaskRunningDaemon] task run InterruptedException", e);} catch (Exception e) {log.error("[startTaskRunningDaemon] task run Exception", e);}}});thread.setName(this.getClass().getSimpleName());thread.start();}
}

定义DataSyncTask,作为具体任务执行方

package org.demo.scheduletest.service;import com.rabbitmq.client.Channel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;@Data
@Slf4j
public class DataSyncTask implements Runnable {private String name;private Channel channel;private long deliveryTag;public DataSyncTask(String name, Channel channel, long deliveryTag) {this.name = name;this.channel = channel;this.deliveryTag = deliveryTag;}/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {log.info("[DataSyncTask] run task start, name = {}", name);try {Thread.sleep(30000);} catch (InterruptedException e) {throw new RuntimeException(e);}try {channel.basicAck(deliveryTag, true);log.info("[DataSyncTask] run task end, name = {}", name);} catch (IOException e) {throw new RuntimeException(e);}}}

InitTask,服务启动执行Task管理器

package org.demo.scheduletest.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;/*** @author zhe.xiao* @version 1.0* @since 2025/1/9 上午11:38*/
@Slf4j
@Component
public class InitTask implements ApplicationRunner {/*** Callback used to run the bean.** @param args incoming application arguments* @throws Exception on error*/@Overridepublic void run(ApplicationArguments args) throws Exception {DataSyncTaskManager.getManager().runTaskDaemon();}
}

配置Rabbitmq

package org.demo.scheduletest.rabbitmq;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;/*** @author zhe.xiao* @date 2022-07-06 17:27* @description**/
@SpringBootConfiguration
public class MyRabbitTemplateConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host:/}")private String virtualhost;/*** 连接工厂* @return*/@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualhost);//connectionFactory.setPublisherConfirms(true);return connectionFactory;}/**** @return RabbitTemplate*/@Beanpublic RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}
}
package org.demo.scheduletest.rabbitmq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/*** @author zhe.xiao* @date 2022-07-05 14:55* @description**/
@Configuration
public class MyRabbitExecutor {//正常队列public static final String QUEUE_1 = "my:queue:1";public static final String EXCHANGE_1 = "my:exchange:1";public static final String ROUTEING_1 = "data:route:1";//死信队列public static final String QUEUE_DEAD_LETTER = "my:queue:deadLetter";public static final String EXCHANGE_DEAD_LETTER = "my:exchange:deadLetter";public static final String ROUTING_DEAD_LETTER = "data:route:deadLetter";// 提供 Queue@BeanQueue myQueue1(){HashMap<String, Object> args = new HashMap<>();//绑定死信队列信息args.put("x-dead-letter-exchange", EXCHANGE_DEAD_LETTER);args.put("x-dead-letter-routing-key", ROUTING_DEAD_LETTER);//        args.put("x-max-length", 5); //队列最大长度,超过了会进入死信队列
//         args.put("x-message-ttl", 5000); //如果5秒没被消费,则进入死信队列return new Queue(QUEUE_1, true, false, false, args);}// 提供 Exchange@BeanDirectExchange myExchange1(){return new DirectExchange(EXCHANGE_1, true, false);}// 创建一个Binding对象,将Exchange和Queue绑定在一起@BeanBinding myBinding1(){return BindingBuilder.bind(myQueue1()).to(myExchange1()).with(ROUTEING_1);// return BindingBuilder.bind(myQueue1()).to(myExchange1());}// 死信队列配置 QUEUE, EXCHANGE, BINDING@BeanQueue myQueueDeadLetter(){return new Queue(QUEUE_DEAD_LETTER, true, false, false);}@BeanDirectExchange myExchangeDeadLetter(){return new DirectExchange(EXCHANGE_DEAD_LETTER, true, false);}@BeanBinding myBindingDeadLetter(){return BindingBuilder.bind(myQueueDeadLetter()).to(myExchangeDeadLetter()).with(ROUTING_DEAD_LETTER);}
}

Rabbitmq消费者通过task控制器提交执行任务

package org.demo.scheduletest.rabbitmq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.demo.scheduletest.service.DataSyncTask;
import org.demo.scheduletest.service.DataSyncTaskManager;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** 消费QUEUE** @author zhe.xiao* @date 2022-07-05 14:57* @description**/
@Slf4j
@Component
public class MyReceiver {@RabbitListener(queues = MyRabbitExecutor.QUEUE_1)public void handler1(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {log.info("handler1 process: {}", data);DataSyncTask dataSyncTask = new DataSyncTask(data, channel, deliveryTag);DataSyncTaskManager.getManager().submitTask(dataSyncTask);} catch (Exception e) {log.error(e.getMessage());}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/66686.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

51单片机 和 STM32 在硬件操作上的差异

51单片机 和 STM32 在硬件操作上的差异 1. 时钟系统的差异 STM32 的时钟系统 STM32 的时钟系统非常复杂&#xff0c;支持多种时钟源&#xff08;如内部晶振、外部晶振、PLL 等&#xff09;&#xff0c;并且每个外设&#xff08;如 GPIO、定时器、串口等&#xff09;都有独立的…

T-SQL语言的编程范式

T-SQL编程范式探析 引言 随着信息技术的迅猛发展&#xff0c;数据库在各个行业的应用日益广泛。在众多数据库管理系统中&#xff0c;SQL Server以其高性能和易用性受到广泛欢迎。T-SQL&#xff08;Transact-SQL&#xff09;是SQL Server的扩展版本&#xff0c;是一种用于查询…

每日学习30分轻松掌握CursorAI:初识Cursor AI

初识Cursor AI 一、什么是Cursor AI&#xff1f; Cursor AI是一款革命性的AI驱动型代码编辑器&#xff0c;它将传统的代码编辑功能与先进的人工智能技术相结合。它不仅是一个编辑器&#xff0c;更是一个智能编程助手&#xff0c;能够帮助开发者提高编码效率&#xff0c;解决编…

小米路由器IPv6 功能使用指南

本文不限于多层路由使用IPv6 的情况&#xff0c;提供解决IPv6 无法获取的更硬核的方法&#xff0c;需要有ssh 工具。&#xff08;无安卓设备&#xff0c;测试环境win、mac、ios&#xff09; 首先明确一点&#xff0c;就是如果想让你的设备得到GUA 地址&#xff0c;即访问 6.i…

云商城--业务+架构学习和环境准备

云商城业务架构学习和环境准备 B2B&#xff1a;Business to Business&#xff0c;交易双方的身份都是商家&#xff0c;也就是商家将商品卖给商家&#xff0c;类似采购、批发类购物&#xff0c;国内代表性网站阿里巴巴批发网 C2C&#xff1a;Customer to Customer&#xff0c;…

vk-unicloud如何简单实现邮箱发送验证码?

以下代码是云函数发送验证码api&#xff0c;直接复制改个人参数&#xff1a; 其中"user"和"pass"使用自己的账号数据&#xff0c;如何拿到看以下步骤&#xff1a; 网易邮箱6.0版&#xff1a;登录--点击设置--点击POP3/SMTP/IMAP--点击开启服务&#xff1…

机器视觉系统中的重要配件--棱镜

在一套机器视觉系统中&#xff0c;人们一直比较注中工业相机、工业镜头及光源等重要的视觉器件&#xff0c;而小配件通常被忽视&#xff0c;虽然只是配角&#xff0c;但是却起着重要作用。以下以茉丽特镜头为例。 在构建视觉系统当中&#xff0c;遇到某个方向空间不足时&#x…

射频到底是什么

背景: 由于工作中wifi&#xff0c; gps 等等&#xff0c;经常使用到射频这个概念&#xff0c;一直很模糊&#xff0c;于是特此了解并记录一下。 概念理解&#xff1a; 射频可以理解为发射一个信号&#xff0c;该信号本质上是交流电所产生的电磁波&#xff0c; 一般通过这种方…

Flink-CDC 全面解析

Flink-CDC 全面解析 一、CDC 概述 &#xff08;一&#xff09;什么是 CDC CDC 即 Change Data Capture&#xff08;变更数据获取&#xff09;&#xff0c;其核心要义在于严密监测并精准捕获数据库内发生的各种变动情况&#xff0c;像数据的插入、更新以及删除操作&#xff0…

PHP语言的字符串处理

PHP语言的字符串处理 引言 字符串是编程中最基本的数据类型之一&#xff0c;通常用于存储和操作文本数据。在PHP语言中&#xff0c;对字符串的处理非常灵活且强大。无论是简单的字符操作&#xff0c;还是复杂的模式匹配&#xff0c;PHP都提供了丰富的函数和工具来满足不同的需…

PHP的扩展Imagick的安装

windows下的安装 下载&#xff1a;Imagick扩展 PECL :: Package :: imagick 3.7.0 for Windows​​​​​​​ 下载&#xff1a;ghostscript&#xff08;PDF提取图片时用到&#xff0c;不处理PDF可以不安装&#xff09; Ghostscript : Downloads 安装扩展 Imagick解压后&…

THREE.js的VideoTexture以及CanvasTexture在部分浏览器以及小程序webview中纯黑不起作用的解决办法

黑色是因为video没有自动播放导致的。 而且video必须设置muted&#xff08;静音&#xff09;属性&#xff0c;否则视频都无法播放&#xff1b; 如果不设置muted,也可以用设置x5-video-player-type"h5" 替代&#xff08;意为兼容qq浏览器&#xff0c;解决在小程序中黑…

【redis】ubuntu18安装redis7

在Ubuntu 18下安装Redis7可以通过以下两种方法实现&#xff1a;手动编译安装和使用APT进行安装。 Ubuntu 18系统的环境和版本&#xff1a; $ cat /proc/version Linux version 4.15.0-213-generic (builddlcy02-amd64-079) (gcc version 7.5.0 (Ubuntu 7.5.0-3ubuntu1~18.04)…

Java实现迭代器模式

一、简介 1、定义 迭代器模式(Iterator Pattern)是一种面向集合对象而生的行为设计模式。对于集合对象而言&#xff0c;会涉及对集合的添加和删除操作&#xff0c;也要支持遍历集合元素的操作。可以把遍历操作放在集合对象中&#xff0c;但这样做&#xff0c;集合对象就承担太…

uniapp中h5使用地图

export function loadTMap(key) {return new Promise(function(resolve, reject) {window.init function() {// resolve(qq) //注意这里resolve(TMap) //注意这里}var script document.createElement("script");script.type "text/javascript";// scrip…

获取地图文档中的图层列表

大多数情况下,获取地图文档中的图层列表是地理处理脚本中的首要工作之一.获取图层列表后,脚本可以循环遍历每个图层并执行某些类型的处理.制图模块中的ListLayers()函数提供获取图层列表的功能.本节将学习如何获得地图文档中的图层列表. 操作方法: 1.在arcgis中打开地图文件 …

软件系统安全逆向分析-混淆对抗

1. 概述 在一般的软件中&#xff0c;我们逆向分析时候通常都不能直接看到软件的明文源代码&#xff0c;或多或少存在着混淆对抗的操作。下面&#xff0c;我会实践操作一个例子从无从下手到攻破目标。 花指令对抗虚函数表RC4 2. 实战-donntyousee 题目载体为具有漏洞的小型软…

#渗透测试#网络安全# 一文了解什么是跨域CROS!!!

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…

【权限管理】Apache Shiro学习教程

Apache Shiro 是一个功能强大且灵活的安全框架&#xff0c;主要用于身份认证&#xff08;Authentication&#xff09;、授权&#xff08;Authorization&#xff09;、会话管理&#xff08;Session Management&#xff09;和加密&#xff08;Cryptography&#xff09;。它旨在为…

Spring事件发布与监听

Spring事件机制详解&#xff1a;事件发布与监听 在Spring框架中&#xff0c;事件机制基于发布-订阅模式&#xff0c;允许组件之间进行解耦。发布者发布事件&#xff0c;监听者订阅并响应这些事件。Spring事件机制的核心在于ApplicationEvent和ApplicationListener&#xff0c;…