使用redisMQ-spring-boot-starter实现消息队列和延时队列

简介

redisMQ-spring-boot-starter是一个轻量级的、基于Redis实现的消息队列中间件,它有如下优点:

  • 开箱即用,你几乎不用添加额外的配置
  • 支持消息队列、延时队列,并提供精细化配置参数
  • 提供消息确认机制
  • 支持虚拟空间,不同虚拟空间的数据互相隔离
  • 支持web控制台,实时查看各个队列的消费情况

开始使用

引用依赖

springboot3.0以下版本:

<dependency><groupId>io.github.lengmianshi</groupId><artifactId>redisMQ-spring-boot-starter</artifactId><version>1.0.4</version>
</dependency><!-- 以下配置可以改为你自己的版本 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.1.0.RELEASE</version>
</dependency>
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.2</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
</dependency>

注:spring-boot-starter-data-redis依赖于spring-data-redis,如果发生依赖冲突,要确保spring-data-redis的版本不低于2.1.0.RELEASE,可在你的pom.xml中锁定版本:

<dependencyManagement><dependencies><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId><version>2.1.2.RELEASE</version></dependency></dependencies>
</dependencyManagement>

springboot3.0:

<dependency><groupId>io.github.lengmianshi</groupId><artifactId>redisMQ-spring-boot-starter</artifactId><version>2.0.4</version>
</dependency><!-- 以下配置可以改为你自己的版本 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>3.2.1</version>
</dependency>
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>5.1.0</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
</dependency>

配置redis

一般引入redis的项目都会事先配置,如果你的项目没配置过,则可在application.yml中加上如下配置:

springboot3.0以下版本:

spring:redis:host: test.redis.com  #改成你的password: vC403V2KMc0Kghz #改成你的port: 6379 #改成你的jedis:pool:max-active: 100max-idle: 10min-idle: 10timeout: 2000

springboot3.0:

spring:data:redis:host: test.redis.com #改成你的password: vC403V2KMc0Kghz #改成你的port: 6379 #改成你的jedis:pool:max-active: 100max-idle: 10min-idle: 10timeout: 2000

消息队列

生产者发送消息

@Autowired
private RedisQueueTemplate redisQueueTemplate;/*** 1次只发送一条消息*/
@Test
public void test1() {JSONObject message = new JSONObject();message.put("bondId", "17f62f1dfb5afb12e8d67cd651c1df53");message.put("year", 2022);redisQueueTemplate.sendMessage("test_queue", message);
}/*** 批量发送消息*/
@Test
public void test2() {List messageList = new ArrayList<>();for (int i = 0; i < 5000; i++) {JSONObject mess = new JSONObject();mess.put("index", i);messageList.add(mess);}redisQueueTemplate.sendMessageAll("test_queue", messageList);
}

注:示例中每条MQ消息都用JSONObject包装,这只是我的个人习惯,你也可以使用实体类

消费者消费消息

消费方法的参数只能有1个,并且类型要与生产者发送消费的类型保存一致:

@Component
public class QueueConsumer {//使用默认参数@RedisQueueListener(queue = "test_queue")public void test(JSONObject message){System.out.println(message);}//指定单个实例下使用5个消费线程@RedisQueueListener(queue = "test_queue2", consumers = 5)public void test2(JSONObject message){System.out.println(message);}//单个实例5个线程,手动确认@RedisQueueListener(queue = "test_queue3", consumers = 5, autoAck = false)public void test3(JSONObject message){System.out.println(message);}}

@RedisQueueListener注解支持的所有参数:

package com.leng.project.redisqueue.annotation;import java.lang.annotation.*;@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisQueueListener {/*** 队列名** @return*/String queue() default "";/*** 消费者线程数** @return*/int consumers() default 1;/*** 是否自动确认** @return*/boolean autoAck() default true;/*** 一次从队列中取多少数据** @return*/int prefetch() default 50;/*** 获取消息的频率,单位秒* @return*/long frequency() default 2;
}

其中:

  • consumers:单个实例下启动多少个消费线程,默认为1
  • autoAck:是否自动确认消息,默认为true。自动确认与手动确认的区别:
    • 自动确认:消费线程从队列中取出消息,如果消费失败,则该条消息丟失
    • 手动确认:消费线程从队列中取出消息,并将消息写入待确认队列中;如果消费失败,则一段时间后(15分钟)会重新入队,消费端要做幂等性处理
  • prefetch:一个消费线程一次性从队列中取出多少条消息,因为涉及锁的竞争,不宜过小,默认为50
  • frequency:单个消费线程每隔多少秒获取一次消息,默认为2,最小值为1。有人可能会奇怪,消息不是应该即时消费吗?不是越快越好吗?实际上,有些业务场景对消息的实时性要求很低,几天、几个月、甚至一年才执行一次,这时我们完全可以把frequency调大,以减轻redis的压力

延时队列

延时队列的常用场景如用户下单,xx分钟后没有支付则自动关闭订单;已支持的订单,xxx天后自动确认收货等。

生产者发送消息

@Autowired
private RedisQueueTemplate redisQueueTemplate;/*** 1次只发送1条消息*/
public void test1(){JSONObject message = new JSONObject();message.put("bondId", "17f62f1dfb5afb12e8d67cd651c1df53");message.put("year", 2022);//延时5秒redisQueueTemplate.sendDelayMessage("test_delay_queue", message, 5, TimeUnit.SECONDS);
}/*** 批量发送,每条消息的延时时长一样*/
public void test2(){List messageList = new ArrayList<>();for (int i = 0; i < 5000; i++) {JSONObject mess = new JSONObject();mess.put("index", i);messageList.add(mess);}//延时5秒redisQueueTemplate.sendDelayMessageAll(queue, messageList, 5, TimeUnit.SECONDS);
}/*** 批量发送,每条消息的延时时长各不相同*/
public void test3(){List messageList=new ArrayList<>();for(int i=0; i< 5000; i++){JSONObject mess=new JSONObject();mess.put("index",i);//每条消息可以使用不同的延时时长,这里为了简便,统一写成5了DelayMessageParam param=new DelayMessageParam(mess,5,TimeUnit.SECONDS);messageList.add(param);}redisQueueTemplate.sendDelayMessageAll(queue,messageList);
}

注:示例中每条MQ消息都用JSONObject包装,这只是我的个人习惯,你也可以使用实体类

消费者消费消息

@RedisDelayQueueListener注解的参数与@RedisQueueListener完全相同;消费方法的参数只能有1个,并且类型要与生产者发送消费的类型保存一致:

@Component
public class DelayQueueConsumer {/*** 使用默认参数* @param message*/@RedisDelayQueueListener(queue = "test_delay_queue")public void test(JSONObject message){System.out.println(message);}/*** 单个实例5个消费线程* @param message*/@RedisDelayQueueListener(queue = "test_delay_queue2", consumers = 5)public void test2(JSONObject message){System.out.println(message);}/*** 单个实例5个消费线程,手动确认* @param message*/@RedisDelayQueueListener(queue = "test_delay_queue3", consumers = 5, autoAck = false)public void test3(JSONObject message){System.out.println(message);}}

虚拟空间

参考了RabbitMQ的设计。虚拟空间很有必要,例如,开发环境和测试环境的数据如果没有隔离,在调试时被测试环境的消费端干扰。
配置虚拟空间:

queue:virtual-host: '/dev'  #默认为 /

Web管理平台

浏览器访问:http://ip:port/queue.html,默认的账号密码为admin/admin

配置账号:

queue:console:#是否启用web控制台enable: trueusername: admin #登录用户名password: 123456 #密码

登录成功后的界面,可查看所有虚拟空间的队列及消费情况:
image.png

注:如果你的系统使用了权限控制框架,如shiro、spring-security等,则需要对如下3个资源放行:

  • /queue.html
  • /queue/**
  • /static/**

ps:
项目地址:https://github.com/lengmianshi/redisMQ-spring-boot-starter,欢迎提bug

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/699963.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ 学习之Map容器

C++ Map容器概念 C++的Map容器是一种关联容器,它提供了一种将键和值相关联的方式。它以键值对的形式存储数据,并根据键的顺序自动进行排序。 Map中的键是唯一的,而值可以重复。你可以使用键来访问对应的值,就像使用索引访问数组中的元素一样。 Map容器的特点如下: 按照…

一些可以参考的文档集合16

之前的文章集合: 一些可以参考文章集合1_xuejianxinokok的博客-CSDN博客 一些可以参考文章集合2_xuejianxinokok的博客-CSDN博客 一些可以参考的文档集合3_xuejianxinokok的博客-CSDN博客 一些可以参考的文档集合4_xuejianxinokok的博客-CSDN博客 一些可以参考的文档集合5…

IDEA查询对应功能的快捷键

首先要知道快捷键的key叫什么&#xff0c;然后通过key来找到对应的快捷键 比如下面这个查找删除导入未使用的类 跳转 或者安装对应插件

可视化 RAG 数据 — 用于检索增强生成的 EDA

原文地址&#xff1a;Visualize your RAG Data — EDA for Retrieval-Augmented Generation 2024 年 2 月 8 日 Github&#xff1a;https://github.com/Renumics/rag-demo/blob/main/notebooks/visualize_rag_tutorial.ipynb 为探索Spotlight中的数据&#xff0c;我们使用Pa…

怎么在线生成动态gif?这个网站一定要知道

静态图片是指一张固定的、不具有动态效果的图片。它通常是由像素点组成的&#xff0c;可以是照片、插图、图标等。静态图片只能呈现一种特定的场景或图像&#xff0c;不能展示动态变化。动态图片&#xff08;是由一系列静态图片组成的&#xff0c;通过快速连续播放这些画面&…

计算机网络-网络层,运输层,应用层

网络层/网际层 网络层的主要任务包括&#xff1a; 提供逻辑上的端到端通信&#xff1a;网络层负责确定数据的传输路径&#xff0c;使数据能够从源主机传输到目标主机&#xff0c;即实现端到端的通信。数据包的路由和转发&#xff1a;网络层根据目标主机的地址信息&#xff0c…

代码随想录刷题39,40天|62.不同路径

62.不同路径 想要求dp[i][j]&#xff0c;只能有两个方向来推导出来&#xff0c;即dp[i - 1][j] 和 dp[i][j - 1]。 此时在回顾一下 dp[i - 1][j] 表示啥&#xff0c;是从(0, 0)的位置到(i - 1, j)有几条路径&#xff0c;dp[i][j - 1]同理。 那么很自然&#xff0c;dp[i][j] …

【MySQL】数据类型(常见类型)-- 详解

一、数据类型分类 二、数值类型 1、tinyint 类型 在 MySQL 中&#xff0c;整型可以指定是有符号的和无符号的&#xff0c;默认是有符号的。 有符号&#xff1a; 插入数据越界测试&#xff1a; 在 MySQL 表中建立属性列时&#xff0c;我们可以发现列名称在前&#xff0c;类型在…

深入探索pdfplumber:从PDF中提取信息到实际项目应用【第94篇—pdfplumbe】

深入探索pdfplumber&#xff1a;从PDF中提取信息到实际项目应用 在数据处理和信息提取的过程中&#xff0c;PDF文档是一种常见的格式。然而&#xff0c;要从PDF中提取信息并进行进一步的分析&#xff0c;我们需要使用适当的工具。本文将介绍如何使用Python库中的pdfplumber库来…

华为OD机试真题-虚拟游戏理财-2023年OD统一考试(C卷)---Python3--开源

题目&#xff1a; 考察内容&#xff1a; for if max 代码&#xff1a; """ 题目分析&#xff1a;投资额*回报率投资回报 要在可接受范围内选择最优的投资方式获得最大回报最多投资2个理财产品输入&#xff1a; 产品数int; 总投资额int; 总风险int 产品投资…

C++入门学习(三十七)函数分文件编写【DEV】

创建.h后级名的头文件创建.cpp后缀名的源文件在头文件中写函数的声明在源文件中写函数的定义 一、选择文件、新建、项目 二、 选择Empty Project 三、 新建源文件New File 四、贴代码 test.cpp #include <iostream> #include "add.h" using namespace std;i…

《TCP/IP详解 卷一》第4章 地址解析协议ARP

目录 4.1 引言 4.2 一个例子 4.3 ARP缓存 4.4 ARP帧格式 4.5 ARP例子 4.6 ARP缓存超时 4.7 代理ARP 4.8 免费ARP和地址冲突检测 4.9 ARP命令 4.10 使用ARP设置嵌入式设备IPv4地址 4.11 与ARP相关攻击 4.12 总结 4.1 引言 地址解析&#xff1a; IPv4&#xff1a;AR…

基于Springboot实现课程评分系统设计和实现

基于java Springboot实现课程评分系统设计和实现 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 央顺技术团队 Java毕设项目精品实战案例《1000套》 欢迎点赞 收藏 ⭐留言 文末获取源码…

Linux基础知识——命令行模式下命令的执行

文章目录 Linux基础知识——命令行模式下命令的执行开始执行Linux命令Linux基础命令的操作常用Linux命令行操作按键Linux输出错误信息查看 Linux系统在线帮助--help选项man命令info命令其他有用的文件文档百度搜索 文本编辑器&#xff1a;nanonano启动&#xff01; 正确关机方法…

__proto__和protype的区别

概述&#xff1a; prototype 函数静态属性&#xff0c;非实例属性,所有实例都可以继承它 __proto__ 实例属性&#xff0c;指向实例的原型对象&#xff0c;原型对象包括构造函数和protype属性 替代 现代浏览器中可以使用Object.getPrototypeOf()来替代__proto__来获取原型对象 …

Vue3自定义组件v-model双向绑定

无能吐槽一下&#xff0c;虽然用了很多遍v-model&#xff0c;但是还是不得要领&#xff0c;每次看官网都感觉说的不是很清晰&#xff0c;在写的时候还是要查看文档&#xff0c;可能就是不理解原理&#xff0c;这次特意好好写一篇文章&#xff0c;让自己好好理解一下。 自定义一…

单线程传奇Redis,为何引入多线程?

大家都知道 Redis 的速度非常的快&#xff0c;这其中一个关键原因就是它采用了单线程模型&#xff0c;这也是它的一大独特之处。那么问题来了&#xff0c;既然单线程模型已经如此出色&#xff0c;为什么后续版本还要搞上多线程呢&#xff1f; 本文主要分析一下多线程在Redis中的…

推出新款H7-TOOL 2024版,同时发布新版固件V2.25(2024-02-24)

H7-TOOL 2024版介绍 1、开模定制外壳&#xff0c;取消了侧面的IO接口&#xff0c;汇集到一个主端口&#xff08;2 * 17P排针&#xff09;。 2、显示屏升级为2.8寸&#xff08;分辨率320*240)。 3、两个按键升级为4个按键&#xff1a;上键、下键&#xff0c;OK确认键和C取消键。…

【大数据】Flink 内存管理(二):JobManager 内存分配(含实际计算案例)

Flink 内存管理&#xff08;二&#xff09;&#xff1a;JobManager 内存分配 1.分配 Total Process Size2.分配 Total Flink Size3.单独分配 Heap Size4.分配 Total Process Size 和 Heap Size5.分配 Total Flink Size 和 Heap Size JobManager 是 Flink 集群的控制元素。它由三…

【经验分享】分类算法与聚类算法有什么区别?白话讲解

经常有人会提到这个问题&#xff0c;从我个人的观点和经验来说2者最明显的特征是&#xff1a;分类是有具体分类的数量&#xff0c;而聚类是没有固定的分类数量。 你可以想象一下&#xff0c;分类算法就像是给你一堆水果&#xff0c;然后告诉你苹果、香蕉、橙子分别应该放在哪里…