使用redisMQ-spring-boot-starter实现消息队列和延时队列

简介

redisMQ-spring-boot-starter是一个轻量级的、基于Redis实现的消息队列中间件,它有如下优点:

  • 开箱即用,你几乎不用添加额外的配置
  • 支持消息队列、延时队列,并提供精细化配置参数
  • 提供消息确认机制
  • 支持虚拟空间,不同虚拟空间的数据互相隔离
  • 支持web控制台,实时查看各个队列的消费情况

开始使用

引用依赖

springboot3.0以下版本:

<dependency><groupId>io.github.lengmianshi</groupId><artifactId>redisMQ-spring-boot-starter</artifactId><version>1.0.4</version>
</dependency><!-- 以下配置可以改为你自己的版本 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.1.0.RELEASE</version>
</dependency>
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.2</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
</dependency>

注:spring-boot-starter-data-redis依赖于spring-data-redis,如果发生依赖冲突,要确保spring-data-redis的版本不低于2.1.0.RELEASE,可在你的pom.xml中锁定版本:

<dependencyManagement><dependencies><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId><version>2.1.2.RELEASE</version></dependency></dependencies>
</dependencyManagement>

springboot3.0:

<dependency><groupId>io.github.lengmianshi</groupId><artifactId>redisMQ-spring-boot-starter</artifactId><version>2.0.4</version>
</dependency><!-- 以下配置可以改为你自己的版本 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>3.2.1</version>
</dependency>
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>5.1.0</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
</dependency>

配置redis

一般引入redis的项目都会事先配置,如果你的项目没配置过,则可在application.yml中加上如下配置:

springboot3.0以下版本:

spring:redis:host: test.redis.com  #改成你的password: vC403V2KMc0Kghz #改成你的port: 6379 #改成你的jedis:pool:max-active: 100max-idle: 10min-idle: 10timeout: 2000

springboot3.0:

spring:data:redis:host: test.redis.com #改成你的password: vC403V2KMc0Kghz #改成你的port: 6379 #改成你的jedis:pool:max-active: 100max-idle: 10min-idle: 10timeout: 2000

消息队列

生产者发送消息

@Autowired
private RedisQueueTemplate redisQueueTemplate;/*** 1次只发送一条消息*/
@Test
public void test1() {JSONObject message = new JSONObject();message.put("bondId", "17f62f1dfb5afb12e8d67cd651c1df53");message.put("year", 2022);redisQueueTemplate.sendMessage("test_queue", message);
}/*** 批量发送消息*/
@Test
public void test2() {List messageList = new ArrayList<>();for (int i = 0; i < 5000; i++) {JSONObject mess = new JSONObject();mess.put("index", i);messageList.add(mess);}redisQueueTemplate.sendMessageAll("test_queue", messageList);
}

注:示例中每条MQ消息都用JSONObject包装,这只是我的个人习惯,你也可以使用实体类

消费者消费消息

消费方法的参数只能有1个,并且类型要与生产者发送消费的类型保存一致:

@Component
public class QueueConsumer {//使用默认参数@RedisQueueListener(queue = "test_queue")public void test(JSONObject message){System.out.println(message);}//指定单个实例下使用5个消费线程@RedisQueueListener(queue = "test_queue2", consumers = 5)public void test2(JSONObject message){System.out.println(message);}//单个实例5个线程,手动确认@RedisQueueListener(queue = "test_queue3", consumers = 5, autoAck = false)public void test3(JSONObject message){System.out.println(message);}}

@RedisQueueListener注解支持的所有参数:

package com.leng.project.redisqueue.annotation;import java.lang.annotation.*;@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisQueueListener {/*** 队列名** @return*/String queue() default "";/*** 消费者线程数** @return*/int consumers() default 1;/*** 是否自动确认** @return*/boolean autoAck() default true;/*** 一次从队列中取多少数据** @return*/int prefetch() default 50;/*** 获取消息的频率,单位秒* @return*/long frequency() default 2;
}

其中:

  • consumers:单个实例下启动多少个消费线程,默认为1
  • autoAck:是否自动确认消息,默认为true。自动确认与手动确认的区别:
    • 自动确认:消费线程从队列中取出消息,如果消费失败,则该条消息丟失
    • 手动确认:消费线程从队列中取出消息,并将消息写入待确认队列中;如果消费失败,则一段时间后(15分钟)会重新入队,消费端要做幂等性处理
  • prefetch:一个消费线程一次性从队列中取出多少条消息,因为涉及锁的竞争,不宜过小,默认为50
  • frequency:单个消费线程每隔多少秒获取一次消息,默认为2,最小值为1。有人可能会奇怪,消息不是应该即时消费吗?不是越快越好吗?实际上,有些业务场景对消息的实时性要求很低,几天、几个月、甚至一年才执行一次,这时我们完全可以把frequency调大,以减轻redis的压力

延时队列

延时队列的常用场景如用户下单,xx分钟后没有支付则自动关闭订单;已支持的订单,xxx天后自动确认收货等。

生产者发送消息

@Autowired
private RedisQueueTemplate redisQueueTemplate;/*** 1次只发送1条消息*/
public void test1(){JSONObject message = new JSONObject();message.put("bondId", "17f62f1dfb5afb12e8d67cd651c1df53");message.put("year", 2022);//延时5秒redisQueueTemplate.sendDelayMessage("test_delay_queue", message, 5, TimeUnit.SECONDS);
}/*** 批量发送,每条消息的延时时长一样*/
public void test2(){List messageList = new ArrayList<>();for (int i = 0; i < 5000; i++) {JSONObject mess = new JSONObject();mess.put("index", i);messageList.add(mess);}//延时5秒redisQueueTemplate.sendDelayMessageAll(queue, messageList, 5, TimeUnit.SECONDS);
}/*** 批量发送,每条消息的延时时长各不相同*/
public void test3(){List messageList=new ArrayList<>();for(int i=0; i< 5000; i++){JSONObject mess=new JSONObject();mess.put("index",i);//每条消息可以使用不同的延时时长,这里为了简便,统一写成5了DelayMessageParam param=new DelayMessageParam(mess,5,TimeUnit.SECONDS);messageList.add(param);}redisQueueTemplate.sendDelayMessageAll(queue,messageList);
}

注:示例中每条MQ消息都用JSONObject包装,这只是我的个人习惯,你也可以使用实体类

消费者消费消息

@RedisDelayQueueListener注解的参数与@RedisQueueListener完全相同;消费方法的参数只能有1个,并且类型要与生产者发送消费的类型保存一致:

@Component
public class DelayQueueConsumer {/*** 使用默认参数* @param message*/@RedisDelayQueueListener(queue = "test_delay_queue")public void test(JSONObject message){System.out.println(message);}/*** 单个实例5个消费线程* @param message*/@RedisDelayQueueListener(queue = "test_delay_queue2", consumers = 5)public void test2(JSONObject message){System.out.println(message);}/*** 单个实例5个消费线程,手动确认* @param message*/@RedisDelayQueueListener(queue = "test_delay_queue3", consumers = 5, autoAck = false)public void test3(JSONObject message){System.out.println(message);}}

虚拟空间

参考了RabbitMQ的设计。虚拟空间很有必要,例如,开发环境和测试环境的数据如果没有隔离,在调试时被测试环境的消费端干扰。
配置虚拟空间:

queue:virtual-host: '/dev'  #默认为 /

Web管理平台

浏览器访问:http://ip:port/queue.html,默认的账号密码为admin/admin

配置账号:

queue:console:#是否启用web控制台enable: trueusername: admin #登录用户名password: 123456 #密码

登录成功后的界面,可查看所有虚拟空间的队列及消费情况:
image.png

注:如果你的系统使用了权限控制框架,如shiro、spring-security等,则需要对如下3个资源放行:

  • /queue.html
  • /queue/**
  • /static/**

ps:
项目地址:https://github.com/lengmianshi/redisMQ-spring-boot-starter,欢迎提bug

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/699963.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ 学习之Map容器

C++ Map容器概念 C++的Map容器是一种关联容器,它提供了一种将键和值相关联的方式。它以键值对的形式存储数据,并根据键的顺序自动进行排序。 Map中的键是唯一的,而值可以重复。你可以使用键来访问对应的值,就像使用索引访问数组中的元素一样。 Map容器的特点如下: 按照…

一些可以参考的文档集合16

之前的文章集合: 一些可以参考文章集合1_xuejianxinokok的博客-CSDN博客 一些可以参考文章集合2_xuejianxinokok的博客-CSDN博客 一些可以参考的文档集合3_xuejianxinokok的博客-CSDN博客 一些可以参考的文档集合4_xuejianxinokok的博客-CSDN博客 一些可以参考的文档集合5…

IDEA查询对应功能的快捷键

首先要知道快捷键的key叫什么&#xff0c;然后通过key来找到对应的快捷键 比如下面这个查找删除导入未使用的类 跳转 或者安装对应插件

可视化 RAG 数据 — 用于检索增强生成的 EDA

原文地址&#xff1a;Visualize your RAG Data — EDA for Retrieval-Augmented Generation 2024 年 2 月 8 日 Github&#xff1a;https://github.com/Renumics/rag-demo/blob/main/notebooks/visualize_rag_tutorial.ipynb 为探索Spotlight中的数据&#xff0c;我们使用Pa…

Nodejs 第四十一章(项目架构MVC,IoC,DI)

到现在为止&#xff0c;我们学习了&#xff0c;express框架&#xff0c;编写接口&#xff0c;mysql数据库读写数据&#xff0c;knex,prisma ORM框架&#xff0c;现在是时候把这些组合到一起&#xff0c;并且实现一个类似于Nestjs或者java的SpringBoot的架构真正的去开发我们的n…

怎么在线生成动态gif?这个网站一定要知道

静态图片是指一张固定的、不具有动态效果的图片。它通常是由像素点组成的&#xff0c;可以是照片、插图、图标等。静态图片只能呈现一种特定的场景或图像&#xff0c;不能展示动态变化。动态图片&#xff08;是由一系列静态图片组成的&#xff0c;通过快速连续播放这些画面&…

计算机网络-网络层,运输层,应用层

网络层/网际层 网络层的主要任务包括&#xff1a; 提供逻辑上的端到端通信&#xff1a;网络层负责确定数据的传输路径&#xff0c;使数据能够从源主机传输到目标主机&#xff0c;即实现端到端的通信。数据包的路由和转发&#xff1a;网络层根据目标主机的地址信息&#xff0c…

代码随想录刷题39,40天|62.不同路径

62.不同路径 想要求dp[i][j]&#xff0c;只能有两个方向来推导出来&#xff0c;即dp[i - 1][j] 和 dp[i][j - 1]。 此时在回顾一下 dp[i - 1][j] 表示啥&#xff0c;是从(0, 0)的位置到(i - 1, j)有几条路径&#xff0c;dp[i][j - 1]同理。 那么很自然&#xff0c;dp[i][j] …

【MySQL】数据类型(常见类型)-- 详解

一、数据类型分类 二、数值类型 1、tinyint 类型 在 MySQL 中&#xff0c;整型可以指定是有符号的和无符号的&#xff0c;默认是有符号的。 有符号&#xff1a; 插入数据越界测试&#xff1a; 在 MySQL 表中建立属性列时&#xff0c;我们可以发现列名称在前&#xff0c;类型在…

小明找位置(C语言)【二分查找】

题目描述 小朋友出操&#xff0c;按学号从小到大排成一列&#xff1b;小明来迟了&#xff0c;请你给小明出个主意&#xff0c;让他尽快找到他应该排的位置。 算法复杂度要求不高于 nLog(n)&#xff1b;学号为整数类型&#xff0c;队列规模<10000&#xff1b; 输入描述 1…

深入探索pdfplumber:从PDF中提取信息到实际项目应用【第94篇—pdfplumbe】

深入探索pdfplumber&#xff1a;从PDF中提取信息到实际项目应用 在数据处理和信息提取的过程中&#xff0c;PDF文档是一种常见的格式。然而&#xff0c;要从PDF中提取信息并进行进一步的分析&#xff0c;我们需要使用适当的工具。本文将介绍如何使用Python库中的pdfplumber库来…

Vue全局指令防止重复点击(等待请求)

继《vue之全局请求loading》之后&#xff0c;总觉得全局loading有时候不太…友好&#xff0c;所以总想将loading加到被点击的元素上面&#xff0c;于是乎就想到了点击事件与请求方法相关联&#xff0c;本想重写组件的click方法&#xff0c;但是这样对组件的影响太大&#xff0c…

【分享】一道面试题的思考。。。

是一个同学面试时遇见的&#xff0c;他被鄙视了&#xff0c;大家看看自己是否可以过关。 题目如下&#xff1a; 在32位机器上&#xff0c;用你觉得最高效的方法实现memcpy函数。 void*memcpy(void*dest,void*src,unsignedintsize); 大家好好考虑一下这个题 很不容写好的 因…

华为OD机试真题-虚拟游戏理财-2023年OD统一考试(C卷)---Python3--开源

题目&#xff1a; 考察内容&#xff1a; for if max 代码&#xff1a; """ 题目分析&#xff1a;投资额*回报率投资回报 要在可接受范围内选择最优的投资方式获得最大回报最多投资2个理财产品输入&#xff1a; 产品数int; 总投资额int; 总风险int 产品投资…

C++入门学习(三十七)函数分文件编写【DEV】

创建.h后级名的头文件创建.cpp后缀名的源文件在头文件中写函数的声明在源文件中写函数的定义 一、选择文件、新建、项目 二、 选择Empty Project 三、 新建源文件New File 四、贴代码 test.cpp #include <iostream> #include "add.h" using namespace std;i…

《TCP/IP详解 卷一》第4章 地址解析协议ARP

目录 4.1 引言 4.2 一个例子 4.3 ARP缓存 4.4 ARP帧格式 4.5 ARP例子 4.6 ARP缓存超时 4.7 代理ARP 4.8 免费ARP和地址冲突检测 4.9 ARP命令 4.10 使用ARP设置嵌入式设备IPv4地址 4.11 与ARP相关攻击 4.12 总结 4.1 引言 地址解析&#xff1a; IPv4&#xff1a;AR…

Python代码实例调用淘宝封装API接口批量获取淘宝商品详情及描述SKU数据

在Python中获取淘宝商品详情&#xff0c;通常可以通过以下几种方式实现&#xff1a; 1. 使用Taobao Open Platform API&#xff1a;淘宝提供了API接口【注册封装API接口可以免费测试】&#xff0c;允许开发者通过API请求获取商品信息。你可以先注册成为淘宝开发者&#xff0c;…

基于Springboot实现课程评分系统设计和实现

基于java Springboot实现课程评分系统设计和实现 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 央顺技术团队 Java毕设项目精品实战案例《1000套》 欢迎点赞 收藏 ⭐留言 文末获取源码…

$nextTick有什么作用?

$nextTick有什么作用&#xff1f; 一、NextTick是什么 为什么要有nexttick 二、使用场景 三、实现原理 $nextTick有什么作用&#xff1f; 一、NextTick是什么 官方对其的定义 在下次 DOM 更新循环结束之后执行延迟回调。在修改数据之后立即使用这个方法&#xff0c;获取更…

Linux基础知识——命令行模式下命令的执行

文章目录 Linux基础知识——命令行模式下命令的执行开始执行Linux命令Linux基础命令的操作常用Linux命令行操作按键Linux输出错误信息查看 Linux系统在线帮助--help选项man命令info命令其他有用的文件文档百度搜索 文本编辑器&#xff1a;nanonano启动&#xff01; 正确关机方法…