RedisTemplate-opsForStream实现消息队列,主要演示 xgroup,xreadgroup,xpending,xack,xinfo的用法

stream  更多详细命令使用,可查看博文
redis基于Stream类型实现消息队列,命令操作,术语概念,个人总结等-CSDN博客

1 springboot整合redis 就不多说了

2 有用到hutool工具类,添加下 pom 依赖

<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.5</version><scope>compile</scope></dependency>

3 写个pojo类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SeckillOrder {private String userId;private String goodsId;private String orderId;}

4 往redis 写入些测试数据,如下

del order_streamkeys *# 往消息队列添加消息
xadd order_stream * userId user_01, goodsId goodsId_01, orderId order_01
xadd order_stream * userId user_02, goodsId goodsId_02, orderId order_02
xadd order_stream * userId user_03, goodsId goodsId_03, orderId order_03
xadd order_stream * userId user_04, goodsId goodsId_04, orderId order_04
xadd order_stream * userId user_05, goodsId goodsId_05, orderId order_05
xadd order_stream * userId user_06, goodsId goodsId_06, orderId order_06
xadd order_stream * userId user_07, goodsId goodsId_07, orderId order_07
xadd order_stream * userId user_08, goodsId goodsId_08, orderId order_08
xadd order_stream * userId user_09, goodsId goodsId_09, orderId order_09
xadd order_stream * userId user_10, goodsId goodsId_10, orderId order_10# 获取流中的数据(- 表示最小值,+表示最大值)
xrange order_stream - +创建g1消费者组,从头开始消费
XGROUP CREATE order_stream g1 0-0创建g2消费者组,从尾部开始消费
XGROUP CREATE order_stream g2 $# 通过 xreadgroup 的一个消费者(consumer1)读取流(order_stream)中的1条消息
xreadgroup group g1 consumer1 count 1 streams order_stream ># 显示g1消费者组 待处理消息的相关信息
xpending order_stream g1# 打印流信息
xinfo stream order_stream
# 打印消费组信息
xinfo groups order_stream

5 核心类,主要演示 xreadgroup,xpending,xack的用法

import cn.hutool.core.bean.BeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.example.service_a.domain.SeckillOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;@Component
@Slf4j
public class Redis_Stream_Test {@Autowiredprivate StringRedisTemplate redisTemplate;// 定义流的键名public static final String stream_key = "order_stream";// 定义消费者组名public static final String group_str = "g1";// 定义消费者名称public static final String consumer_str = "c1";/*** 消费流中的消息* 该方法会不断读取指定流中的消息并处理,直到没有消息为止。* 用到命令 xreadgroup* 每个消费者组执行这个方法后,假如队列有5条消息,第1次执行,就返回5条消息,再次执行此方法,就返回为空* 如果还想读这5条消息,就换个消费者组名称就可以了*/@EnableScheduling //添加到主启动类上@Scheduled(cron="0/5 * *  * * ? ")   //每5秒执行一次public void read_message() {try {if (!redisTemplate.hasKey(stream_key)) {return;}// 检查是否已创建消费者组,若未创建则创建Set<StreamInfo.XInfoGroup> collect = redisTemplate.opsForStream().groups(stream_key).stream().filter(infoGroup -> infoGroup.groupName().equals(group_str)).collect(Collectors.toSet());if (collect.isEmpty()) {// 创建消费者组,****从头开始读****// XGROUP CREATE order_stream g1 0-0String group = redisTemplate.opsForStream().createGroup(stream_key, ReadOffset.from("0-0"), group_str);log.info("消费者组添加成功,group = {}", group);}// 读取消息,可以不阻塞读,也可以阻塞方式来读 如 xreadgroup group g1 c1 count 5 block 60000 order_stream >List<MapRecord<String, Object, Object>> read = redisTemplate.opsForStream().read(Consumer.from(group_str, consumer_str),StreamReadOptions.empty().count(5),StreamOffset.create(stream_key, ReadOffset.lastConsumed()));// 若无消息则返回if (read.isEmpty()) {return;}// 记录读取到的消息数量log.info("读取到消息,size = {}", read.size());// 处理每条消息read.forEach(mapRecord -> {// 记录消息ID和内容RecordId recordId = mapRecord.getId();Map<Object, Object> value = mapRecord.getValue();log.info("消息id = {}", mapRecord.getId());log.info("消息内容 = {}", mapRecord.getValue());// 将消息内容转换为实体对象,用hutoolSeckillOrder seckillOrder = BeanUtil.fillBeanWithMap(value, new SeckillOrder(), false);log.info("消息内容转成实体查看 = {}", seckillOrder);// 进行业务逻辑处理等});} catch (Exception e) {// 记录读取消息失败的日志log.error("读取消息失败,e = {}", e);}}/*** 确认消息处理完成* 该方法会读取并确认指定消费者组下指定消费者处理完成的消息。* 用到命令 xpending , xack*/public void ack_message() {try {// 获取待处理的消息PendingMessages pending = redisTemplate.opsForStream().pending(stream_key,Consumer.from(group_str, consumer_str));log.info("待处理消息 = {}", pending);// 遍历并确认每条待处理消息pending.forEach(pendingMessage -> {// 记录待处理消息的元数据log.info("待处理消息元数据 = {}", pendingMessage);RecordId recordId = pendingMessage.getId();log.info("待处理消息id = {}", recordId);// 提交消息确认redisTemplate.opsForStream().acknowledge(stream_key, group_str, recordId);log.info("消息提交ack成功,id = {}", recordId);});} catch (Exception e) {throw new RuntimeException(e);}}/*** 消费者组 ,消费者,队列的信息*/public void xInfo() {StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(stream_key);log.info("获取组的信息:{}", groups);System.out.println();StreamInfo.XInfoConsumers consumers = redisTemplate.opsForStream().consumers(stream_key, group_str);log.info("获取消费者的信息:{}", consumers);System.out.println();StreamInfo.XInfoStream infoStream = redisTemplate.opsForStream().info(stream_key);log.info("获取流的信息:{}", infoStream);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/3785.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++中的queue(容器适配器)

目录 一、成员函数 一、构造函数 二、入栈 push 三、出栈 pop 四、判空 empty 五、队列大小 size 六、取队头元素 front 七、取队尾元素 back 八、入栈 emplace 九、交换函数 swap 二、非成员函数重载 一、关系运算符重载 二、交换函数 swap C中的queue不再是容…

在 Windows 下搭建自己的深度学习开发环境

通常程序员们会选择 Ubuntu 这类 linux 环境开发深度学习算法&#xff0c;但在 Windows 做类似的工作有时也会带来很多方便。本文梳理一下在 Windows 部署深度学习开发环境的一般步骤&#xff0c;供大家参考。 一、检查GPU显卡 1、查看显卡规格 在Windows下&#xff0c;可以…

JUC之线程、线程池

一、start与run方法 start方法开启一个新线程&#xff0c;异步执行。 run方法同步执行&#xff0c;不会产生新的线程。 start方法只能执行一次&#xff0c;run方法可以执行多次。 二、一些方法 sleep() 线程睡眠 两种方式调用&#xff1a; Thread.sleep(1000);TimeUnit.…

SAP MRP中的滚动提前期简介(MRP自动删除已固定计划订单)

通常被标记了固定标识的计划订单在运行MRP的时候系统是不会自动删除这部分计划订单的,但是SAP提供了“滚动提前期”这一功能,允许我们通过设定规则,在MRP运算的同时,自动删除在规定期间范围内的被固定的计划订单,从而避免这种干扰MRP运算的现象发生。 下面我们模拟这个场…

Oracle基础1

数据准备 create table my_user (id number primary key,name1 varchar2(30),name2 varchar2(30) ); alter table my_useradd(name3 varchar2(30)); -- 修改字段 alter table my_usermodify(name1 varchar2(10),name2 varchar2(20)); -- 修改字段名 alter table my_user re…

SignalR中的重连机制和心跳监测机制详解

一. 重连机制 声明&#xff1a;   本节仅介绍重连机制和心跳监测机制&#xff0c;基于Core 3.1框架&#xff0c;至于SignalR其它的一些基本使用&#xff0c;包括引入、Hub、配置等常规操作&#xff0c;在本节中不介绍&#xff0c;后续写Core下的SignalR 说明   默认是没有重…

钥匙和房间

题目链接 钥匙和房间 题目描述 注意点 所有 rooms[i] 的值 互不相同如果能进入所有房间返回true&#xff0c;否则返回falserooms[i] 是进入 i 号房间可以获得的钥匙集合 解答思路 可以通过深度优先遍历找到所有可以访问的房间&#xff0c;需要注意的是同一个房间不能重复访…

(数据分析) 一季度四川外贸进出口增长7.8% 回稳向好态势显著

据海关统计&#xff0c;一季度&#xff0c;四川实现货物贸易进出口总值2415.4亿元&#xff0c;规模位列全国第8&#xff0c;同比&#xff08;下同&#xff09;增长7.8%&#xff0c;增速高出全国2.8个百分点。主要呈现以下特点&#xff1a; 一、进口显著回升&#xff0c;进、出口…

2024.4.26

//异或 Complex Complex::operator^(const Complex c1){Complex temp;temp.relthis->rel^c1.rel;temp.virthis->vir^c1.vir;return temp; }//按位取反 Complex Complex::operator~(){this->rel~this->rel;this->vir~this->vir;return* this; } //左移 Comple…

vue+element之解决upload组件上传文件失败后仍显示在列表上、自动上传、过滤、findIndex、splice、filter

MENU 前言错误案例(没有用)正确方法结束语 前言 el-upload上传失败后&#xff0c;文件仍显示在列表上。 这个pdf文件上传失败&#xff0c;仍显示在列表&#xff0c;给人错觉是上传成功&#xff0c;所以要把它去掉。 在element中&#xff0c;file-list和v-model:file-list是用于…

Bun 入门到精通(一)

Bun 是什么&#xff1f; Bun 是用于 JavaScript 和 TypeScript 应用程序的多合一工具包。它作为一个名为 bun 的可执行文件提供。 其核心是 Bun 运行时&#xff0c;这是一个快速的 JavaScript 运行时&#xff0c;旨在替代 Node.js。它是用 Zig 编写的&#xff0c;并由 JavaSc…

Matlab|交直流系统潮流计算(含5种控制模式)

目录 1 主要内容 程序参考流程图 2 部分代码 3 程序结果 4 下载链接 1 主要内容 该程序参考文献《交直流系统潮流计算及相互关联特性分析》&#xff0c;采用5种交直流潮流控制方式&#xff1a;1.定电流定电压 2.定电流定熄弧角 3.定功率定电压 4.定功率定熄弧角 5.定触发角…

Kafka 生产者应用解析

目录 1、生产者消息发送流程 1.1、发送原理 2、异步发送 API 2.1、普通异步发送 2.2、带回调函数的异步发送 3、同步发送 API 4、生产者分区 4.1、分区的优势 4.2、生产者发送消息的分区策略 示例1&#xff1a;将数据发往指定 partition 示例2&#xff1a;有 key 的…

ollama集成open-webui本地部署大模型应用

文章目录 概述安装ollama运行指定模型命令帮助docker 安装 webuiWebUI与ollama在同一台机器WebUI与ollama不在同一台机器(推荐)更新open-webui离线安装open-webui使用验证访问导入模型文件参考资料概述 ollama是一款在本地启动并运行大型语言模型的工具,主要功能是在 Docke…

原型模式(上机考试抽题)

定义 原型模式主要解决的问题就是创建复对象&#xff0c;⽽这部分 对象 内容本身⽐较复杂&#xff0c;⽣成过程可能从库或者RPC接⼝中获取数据的耗时较⻓&#xff0c;因此采⽤克隆的⽅式节省时间。 上机考试抽题 从⼀部分可以上机考试的内容开始&#xff0c;在保证⼤家的公平…

经典的免费wordpress模板

这款经典的免费WordPress模板以鲜艳的红色为主调&#xff0c;充满了活力与热情。设计简洁而现代&#xff0c;适合各种类型的项目网站。模板采用响应式设计&#xff0c;确保在不同设备和屏幕尺寸上都能呈现出完美的视觉效果。 红色象征着激情、活力和自信&#xff0c;这款模板…

面试题:判断一个完全平方数

面试题&#xff1a;判断一个完全平方数 方法一&#xff1a;平方根法 bool isPerfectSquare(int n) {int sqrt_n int(sqrt(1.0 * n));return sqrt_n * sqrt_n n; }方法二&#xff1a;连续奇数和法 // 一个完全平方数可以表示为前n个连续奇数的和&#xff0c;如1 3 5 // 不…

对策略模式的理解

目录 一、场景1、题目描述 【[来源](https://kamacoder.com/problempage.php?pid1082)】2、输入描述3、输出描述4、输入示例5、输出示例 二、不使用策略模式三、使用策略模式1、不优雅的实现2、策略模式 简单工厂模式2.1 代码2.2 优点2.3 另一种实现方式 四、个人思考 一、场…

2024年Q1企业邮箱安全性研究报告:钓鱼邮件同比增长59.9%

4月23日&#xff0c;Coremail邮件安全联合北京中睿天下信息技术有限公司发布《2024年第一季度企业邮箱安全性研究报告》。对当前企业邮箱的应用状况和安全风险进行了分析。 1、垃圾邮件持续增长 根据Coremail邮件安全人工智能实验室最新数据显示&#xff0c;2024年第一季度&am…

4 -26

4-26 1 英语单词100个一篇六级翻译 2 div 4 补题目 3 概率论期中卷子一张&#xff0c;复习复习。 4 备课ing 晚上出去炫饭&#xff0c;串串香&#xff0c;无敌了。 中间一些模拟题是真的恶心&#xff0c;思维题是真的想不到&#xff0c;感觉自己就是一个废物呢。 1.是将一个数…