Spring Boot 集成 Redisson 实现消息队列

包含组件内容

  • RedisQueue:消息队列监听标识
  • RedisQueueInit:Redis队列监听器
  • RedisQueueListener:Redis消息队列监听实现
  • RedisQueueService:Redis消息队列服务工具

代码实现

RedisQueue

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** Redis消息队列注解*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisQueue {/*** 队列名*/String value();
}

RedisQueueInit

import jakarta.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** 初始化Redis队列监听器** @author 十八* @createTime 2024-09-09 22:49*/
@Slf4j
@Component
public class RedisQueueInit implements ApplicationContextAware {public static final String REDIS_QUEUE_PREFIX = "redis-queue";final AtomicBoolean shutdownRequested = new AtomicBoolean(false);@Resourceprivate RedissonClient redissonClient;private ExecutorService executorService;public static String buildQueueName(String queueName) {return REDIS_QUEUE_PREFIX + ":" + queueName;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisQueueListener> queueListeners = applicationContext.getBeansOfType(RedisQueueListener.class);if (!queueListeners.isEmpty()) {executorService = createThreadPool();for (Map.Entry<String, RedisQueueListener> entry : queueListeners.entrySet()) {RedisQueue redisQueue = entry.getValue().getClass().getAnnotation(RedisQueue.class);if (redisQueue != null) {String queueName = redisQueue.value();executorService.submit(() -> listenQueue(queueName, entry.getValue()));}}}}private ExecutorService createThreadPool() {return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,Runtime.getRuntime().availableProcessors() * 4,60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),new NamedThreadFactory(REDIS_QUEUE_PREFIX),new ThreadPoolExecutor.CallerRunsPolicy());}private void listenQueue(String queueName, RedisQueueListener redisQueueListener) {queueName = buildQueueName(queueName);RBlockingQueue<?> blockingQueue = redissonClient.getBlockingQueue(queueName);log.info("Redis队列监听开启: {}", queueName);while (!shutdownRequested.get() && !redissonClient.isShutdown()) {try {Object message = blockingQueue.take();executorService.submit(() -> redisQueueListener.consume(message));} catch (RedissonShutdownException e) {log.info("Redis连接关闭,停止监听队列: {}", queueName);break;} catch (Exception e) {log.error("监听队列异常: {}", queueName, e);}}}public void shutdown() {if (executorService != null) {executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();}}shutdownRequested.set(true);if (redissonClient != null && !redissonClient.isShuttingDown()) {redissonClient.shutdown();}}private static class NamedThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public NamedThreadFactory(String prefix) {this.namePrefix = prefix;}@Overridepublic Thread newThread(@NotNull Runnable r) {return new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());}}}

RedisQueueListener

/*** Redis消息队列监听实现** @author 十八* @createTime 2024-09-09 22:51*/
public interface RedisQueueListener<T> {/*** 队列消费方法** @param content 消息内容*/void consume(T content);
}

RedisQueueService

import jakarta.annotation.Resource;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;/*** Redis 消息队列服务** @author 十八* @createTime 2024-09-09 22:52*/
@Component
public class RedisQueueService {@Resourceprivate RedissonClient redissonClient;/*** 添加队列** @param queueName 队列名称* @param content   消息* @param <T>       泛型*/public <T> void send(String queueName, T content) {RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));blockingQueue.add(content);}/*** 添加延迟队列** @param queueName 队列名称* @param content   消息类型* @param delay     延迟时间* @param timeUnit  单位* @param <T>       泛型*/public <T> void sendDelay(String queueName, T content, long delay, TimeUnit timeUnit) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, timeUnit);}/*** 发送延迟队列消息(单位毫秒)** @param queueName 队列名称* @param content   消息类型* @param delay     延迟时间* @param <T>       泛型*/public <T> void sendDelay(String queueName, T content, long delay) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, TimeUnit.MILLISECONDS);}
}

测试

创建监听对象

import cn.yiyanc.infrastructure.redis.annotation.RedisQueue;
import cn.yiyanc.infrastructure.redis.queue.RedisQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author 十八* @createTime 2024-09-10 00:09*/
@Slf4j
@Component
@RedisQueue("test")
public class TestListener implements RedisQueueListener<String> {@Overridepublic void invoke(String content) {log.info("队列消息接收 >>> {}", content);}
}

测试用例

import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 十八* @createTime 2024-09-10 00:11*/
@RestController
@RequestMapping("queue")
public class QueueController {@Resourceprivate RedisQueueService redisQueueService;@PostMapping("send")public void send(String message) {redisQueueService.send("test", message);redisQueueService.sendDelay("test", "delay messaege -> " + message, 1000);}}

测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/53579.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第2章 方法

本书作者起初以为仅靠研究命令行工具和指标就能提高性能。他认为这样不对。他从头到尾读了一遍手册&#xff0c;看懂了缺页故障、上下文切换和其他各种系统指标的定义&#xff0c;但不知道如何处理它们&#xff1a;如何从发现信号到找到解决方案。 他注意到&#xff0c;每当出…

18、公司信贷管理|贷款额度的测算|贷款期限及其定价的设定逻辑!

银行在综合权衡贷款的第一还款来源和第二还款来源、风险和收益的基础上&#xff0c;应明确提出贷与不贷的意见。经调查审查同意的贷款&#xff0c;应提出最终的融资方案。 合理的融资方案既要有利于提升本行的竞争力&#xff0c;又要有利于控制贷款风险。完整的融资方案一般包…

计算机网络 --- 【2】计算机网络的组成、功能

目录 一、计算机网络的组成 1.1 从组成部分看 1.2 从工作方式看 1.3 从逻辑功能看 1.4 总结 二、计算机网络的功能 2.1 数据通信 2.2 资源共享​编辑 2.3 分布式处理 2.4 提高可靠性 2.5 负载均衡 一、计算机网络的组成 1.1 从组成部分看 我们举例分析计算机网络从…

【人工智能学习笔记】4_4 深度学习基础之生成对抗网络

生成对抗网络&#xff08;Generative Adversarial Network, GAN&#xff09; 一种深度学习模型&#xff0c;通过判别模型&#xff08;Discriminative Model&#xff09;和生成模型&#xff08;Generative Model&#xff09;的相互博弈学习&#xff0c;生成接近真实数据的数据分…

19章 泛型

1.修改程序清单19-1中的GenericStack类&#xff0c;使用数组而不是ArrayList来实现它。你应该在给栈添加新元素之前检查数组的大小如果数组满了&#xff0c;就创建一个新数组。该数组是当前数组大小的两倍&#xff0c;然后将当前数组的元素复制到新数组中。 public class Gene…

​ArcGIS Pro和ArcGIS的10大区别

本文来源&#xff1a;水经注GIS公众号 如果你经常使用ArcGIS 进行制图和分析&#xff0c;那么你一定听说过ArcGIS Pro&#xff0c;这款软件是Esri未来主打的一款桌面GIS软件&#xff0c;那么这款软件和ArcGIS相比有什么不同呢&#xff0c;这里为你列举了两款软件的10大区别&am…

QScopedPointer的了解

QT 智能指针 QPointer QScopedPointer QSharedPointer QWeakPointer QSharedDataPointer 隐式共享 显示共享-CSDN博客 本文摘录自上面的文章 其类似于C11中的unique_ptr&#xff0c;用于管理动态分配的对象的独占所有权&#xff0c;即同一时间只能有一个QScopedPointer指向该…

Docker 安装 Nacos 教程

Nacos 是一个易于使用的平台&#xff0c;用于服务发现和配置管理。它支持服务的注册与发现&#xff0c;同时提供动态配置管理功能。本文将介绍如何使用 Docker 快速安装 Nacos&#xff0c;并提供相应的配置文件 standalone-mysql-5.7.yaml 用于设置 Nacos 与 MySQL 的集成。 参…

文本分类场景下微调BERT

How to Fine-Tune BERT for Text Classification 论文《How to Fine-Tune BERT for Text Classification?》是2019年发表的一篇论文。这篇文章做了一些实验来分析了如何在文本分类场景下微调BERT&#xff0c;是后面网上讨论如何微调BERT时经常提到的论文。 结论与思路 先来看…

Vue跨域问题、Vue配置开发环境代理服务、集成Axios发送Ajax请求、集成vue-resource发送Ajax请求

目录 1. Ajax请求服务介绍2. axios的安装3. Vue跨域问题介绍和解决方案4. 使用vue-cli配置开发环境代理服务4.1 简单配置4.2 复杂配置4.3 二次封装 5. 集成vue-resource发送Ajax请求 1. Ajax请求服务介绍 xhr: new XHLHttpRequest().open()/send()。偏向底层JQuery: 对xhr进行…

Sklearn的datasets模块与自带数据集介绍

datasets 模块 用 dir() 函数查看 datasets 模块的所有属性和函数 import sklearn.datasets as datasets# 列出 sklearn.datasets 模块中的所有属性和函数 print(dir(datasets)) datasets 模块下的数据集有三种类型&#xff1a; &#xff08;1&#xff09;load系列的经典数…

Sqoop 数据迁移

Sqoop 数据迁移 一、Sqoop 概述二、Sqoop 优势三、Sqoop 的架构与工作机制四、Sqoop Import 流程五、Sqoop Export 流程六、Sqoop 安装部署6.1 下载解压6.2 修改 Sqoop 配置文件6.3 配置 Sqoop 环境变量6.4 添加 MySQL 驱动包6.5 测试运行 Sqoop6.5.1 查看Sqoop命令语法6.5.2 测…

Vue 中 计算属性与侦听属性的使用与介绍

Vue 中 计算属性与侦听属性的使用与介绍 计算属性 - computed 计算属性是一种特殊的属性&#xff0c;它依赖于其他属性&#xff0c;并返回一个新的值。当依赖的属性发生变化时&#xff0c;计算属性会重新求值。 计算属性的语法如下&#xff1a; computed: {// 计算属性名: …

【数学建模】2024数学建模国赛经验分享

文章目录 一、关于我二、我的数模历程三、经验总结&#xff1a; 一、关于我 我的CSDN主页&#xff1a;https://gxdxyl.blog.csdn.net/ 2020年7月&#xff08;大二结束的暑假&#xff09;开始在CSDN写作&#xff1a; 阿里云博客专家&#xff1a; 接触的领域挺多的&#xff…

摩尔投票算法--169. 多数元素

169. 多数元素 普通方法-借助map计数 class Solution { public:int majorityElement(vector<int>& nums) {map<int,int> mp;for(int num :nums){mp[num];}for(auto &a :mp){if(a.second>nums.size()/2){return a.first;}}return 0;} }; 进阶&#xff…

【Linux】常用指令(中)(附带基础指令的详细讲解、Linux的一些附加知识)

文章目录 前言1. Linux基础常用指令1.1 通配符 "*"1.2 man指令&#xff08;重要&#xff09;1.2.1 man指令的语法 1.3 何为"指令"&#xff1f;(附带知识)1.4 echo指令1.5 cat指令1.6 Linux下一切皆文件&#xff01;1.6.1 ">" 输出重定向1.6.2…

【基础知识复习 - 随机练习题】

问题 1&#xff1a;在软件生命周期模型中&#xff0c;哪一个模型强调了开发过程的迭代性和反馈&#xff1f; A. 瀑布模型 B. V模型 C. 敏捷模型 D. 原型模型 答案&#xff1a;C. 敏捷模型 解析&#xff1a;敏捷模型强调迭代开发和反馈&#xff0c;允许在每个迭代周期中进行调…

浅谈C#之线程锁

一、基本介绍 锁是一种同步机制&#xff0c;用于控制多个线程对共享资源的访问。当一个线程获得了锁时&#xff0c;其他线程将被阻塞&#xff0c;直到该线程释放了锁。 在并发编程中&#xff0c;多个线程同时访问共享资源可能导致数据竞争和不确定的行为。锁可以确保在任意时刻…

springboot提升-多数据源配置

文章目录 1. 添加依赖2. 配置数据源示例配置&#xff1a; 3. 创建数据源 Bean4. 创建动态数据源5. 配置 MyBatis SqlSessionFactory6. 在业务代码中使用注意事项 在 Spring Boot 中配置 MyBatis 以支持多数据源涉及几个关键步骤&#xff0c;包括配置数据源、集成 MyBatis 以及动…

Qt篇——Qt使用C++获取Windows电脑上所有外接设备的名称、物理端口位置等信息

我之前有发过一篇文章《Qt篇——获取Windows系统上插入的串口设备的物理序号》&#xff0c;文章中主要获取的是插入的USB串口设备的物理序号&#xff1b;而本篇文章则进行拓展&#xff0c;可以获取所有外接设备的相关信息&#xff08;比如USB摄像头、USB蓝牙、USB网卡、其它一些…