【Java】SpringBoot中实现Redis Stream队列

SpringBoot实现Redis Stream队列

前言

简单实现一下在SpringBoot中操作Redis Stream队列的方式,监听队列中的消息进行消费。

jdk:1.8

springboot-version:2.6.3

redis:5.0.1(5版本以上才有Stream队列)

准备工作

1pom

redis 依赖包(version 2.6.3)

        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

2 yml

spring: redis:database: 0host: 127.0.0.1

3 RedisStreamUtil工具类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.Map;@Component
public class RedisStreamUtil {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 创建消费组** @param key   键名称* @param group 组名称* @return {@link String}*/public String oup(String key, String group) {return redisTemplate.opsForStream().createGroup(key, group);}/*** 获取消费者信息** @param key   键名称* @param group 组名称* @return {@link StreamInfo.XInfoConsumers}*/public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {return redisTemplate.opsForStream().consumers(key, group);}/*** 查询组信息** @param key 键名称* @return*/public StreamInfo.XInfoGroups queryGroups(String key) {return redisTemplate.opsForStream().groups(key);}// 添加Map消息public String addMap(String key, Map<String, Object> value) {return redisTemplate.opsForStream().add(key, value).getValue();}// 读取消息public List<MapRecord<String, Object, Object>> read(String key) {return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));}// 确认消费public Long ack(String key, String group, String... recordIds) {return redisTemplate.opsForStream().acknowledge(key, group, recordIds);}// 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁public Long del(String key, String... recordIds) {return redisTemplate.opsForStream().delete(key, recordIds);}// 判断是否存在keypublic boolean hasKey(String key) {Boolean aBoolean = redisTemplate.hasKey(key);return aBoolean != null && aBoolean;}
}

代码实现

生产者发送消息

生产者发送消息,在Service层创建addMessage方法,往队列中发送消息。

代码中addMap()方法第一个参数为key,第二个参数为value,该key要和后续配置的保持一致,暂时先记住这个key。

@Service
@Slf4j
@RequiredArgsConstructor
public class RedisStreamMqServiceImpl implements RedisStreamMqService {private final RedisStreamUtil redisStreamUtil;/*** 发送一个消息** @return {@code Object}*/@Overridepublic Object addMessage() {RedisUser redisUser = new RedisUser();redisUser.setAge(18);redisUser.setName("hcr");redisUser.setEmail("156ef561@gmail.com");Map<String, Object> message = new HashMap<>();message.put("user", redisUser);String recordId = redisStreamUtil.addMap("mystream", message);return recordId;}
}

controller接口方法

@RestController
@RequestMapping("/redis")
@Slf4j
@RequiredArgsConstructor
public class RedisController {private final RedisStreamMqService redisStreamMqService;@GetMapping("/addMessage")public Object addMessage() {return redisStreamMqService.addMessage();}
}

调用测试,查看redis中是否正常添加数据。

接口返回数据

1702622585248-0

查看redis中的数据
在这里插入图片描述

消费者监听消息进行消费

创建RedisConsumersListener监听器

import cn.hcr.utils.RedisStreamUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@Slf4j
@RequiredArgsConstructor
public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {public final RedisStreamUtil redisStreamUtil;/*** 监听器** @param message*/@Overridepublic void onMessage(MapRecord<String, String, String> message) {// stream的key值String streamKey = message.getStream();//消息IDRecordId recordId = message.getId();//消息内容Map<String, String> msg = message.getValue();log.info("【streamKey】= " + streamKey + ",【recordId】= " + recordId + ",【msg】=" + msg);//处理逻辑//逻辑处理完成后,ack消息,删除消息,group为消费组名称StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamKey);xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));redisStreamUtil.del(streamKey, recordId.getValue());}
}

创建RedisConfig配置类,配置监听

package cn.hcr.config;import cn.hcr.listener.RedisConsumersListener;
import cn.hcr.utils.RedisStreamUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import javax.annotation.Resource;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;@Configuration
@Slf4j
public class RedisConfig {@Resourceprivate RedisStreamUtil redisStreamUtil;/*** redis序列化** @param redisConnectionFactory* @return {@code RedisTemplate<String, Object>}*/@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(redisConnectionFactory);Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(om);StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();template.setKeySerializer(stringRedisSerializer);template.setHashKeySerializer(stringRedisSerializer);template.setValueSerializer(jackson2JsonRedisSerializer);template.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}@Beanpublic Subscription subscription(RedisConnectionFactory factory) {AtomicInteger index = new AtomicInteger(1);int processors = Runtime.getRuntime().availableProcessors();ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,new LinkedBlockingDeque<>(), r -> {Thread thread = new Thread(r);thread.setName("async-stream-consumer-" + index.getAndIncrement());thread.setDaemon(true);return thread;});StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 一次最多获取多少条消息.batchSize(5).executor(executor).pollTimeout(Duration.ofSeconds(1)).errorHandler(throwable -> {log.error("[MQ handler exception]", throwable);throwable.printStackTrace();}).build();//该key和group可根据需求自定义配置String streamName = "mystream";String groupname = "mygroup";initStream(streamName, groupname);var listenerContainer = StreamMessageListenerContainer.create(factory, options);// 手动ask消息Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumersListener(redisStreamUtil));// 自动ask消息/* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/listenerContainer.start();return subscription;}private void initStream(String key, String group) {boolean hasKey = redisStreamUtil.hasKey(key);if (!hasKey) {Map<String, Object> map = new HashMap<>(1);map.put("field", "value");//创建主题String result = redisStreamUtil.addMap(key, map);//创建消费组redisStreamUtil.oup(key, group);//将初始化的值删除掉redisStreamUtil.del(key, result);log.info("stream:{}-group:{} initialize success", key, group);}}
}

redisTemplate:该bean用于配置redis序列化

subscription:配置监听

initStream:初始化消费组

监听测试

使用addMessage()方法投送一条消息后,查看控制台输出信息。

【streamKey】= mystream,
【recordId】= 1702623008044-0,
【msg】=
{user=["cn.hcr.pojo.RedisUser",{"name":"hcr","age":18,"email":"156ef561@gmail.com"}]
}

总结

以上就是在SpringBoot中简单实现Redis Stream队列的Demo,如有需要源码或者哪里不清楚的请评论或者发送私信。
Template:该bean用于配置redis序列化

subscription:配置监听

initStream:初始化消费组

监听测试

使用addMessage()方法投送一条消息后,查看控制台输出信息。

【streamKey】= mystream,
【recordId】= 1702623008044-0,
【msg】=
{user=["cn.hcr.pojo.RedisUser",{"name":"hcr","age":18,"email":"156ef561@gmail.com"}]
}

总结

以上就是在SpringBoot中简单实现Redis Stream队列的Demo,如有需要源码或者哪里不清楚的请评论或者发送私信。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/228126.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++实现简单的猜数字小游戏

猜数字 小游戏介绍&#xff1a;猜数字游戏是令游戏机随机产生一个100以内的正整数&#xff0c;用户输入一个数对其进行猜测&#xff0c;需要你编写程序自动对其与随机产生的被猜数进行比较&#xff0c;并提示大了&#xff0c;还是小了&#xff0c;相等表示猜到了。如果猜到&…

网络(九)三层路由、DHCP以及VRRP协议介绍

目录 一、三层路由 1. 定义 2. 交换原理 3. 操作演示 3.1 图示 3.2 LSW1新建vlan10、20、30&#xff0c;分别对应123接口均为access类型&#xff0c;接口4为trunkl类型&#xff0c;允许所有vlan通过 3.3 LSW2新建vlan10、20、30&#xff0c;配置接口1为trunk类型&…

报数游戏C语言

分析:掌握数字移动的规律&#xff0c;以及判断&#xff0c;我们可以用一个二维数组来记录每一个人说的数字&#xff0c;就像第一张图片一样&#xff0c;西安向右边移动&#xff0c;再向左下移动&#xff0c;再向左边移动&#xff0c;在向右边移动&#xff0c;在可以用一个数组来…

微服务保护--线程隔离(舱壁模式)

一、线程隔离的实现方式 线程隔离有两种方式实现&#xff1a; 线程池隔离 信号量隔离&#xff08;Sentinel默认采用&#xff09; 如图&#xff1a; 线程池隔离&#xff1a;给每个服务调用业务分配一个线程池&#xff0c;利用线程池本身实现隔离效果 信号量隔离&#xff1a…

数据分析(一)(附带实例和源码)

一、主要目的&#xff1a; 主要利用Python包&#xff0c;如Numpy、Pandas和Scipy等常用分析工具并结合常用的统计量来进行数据的描述&#xff0c;把数据的特征和内在结构展现出来。熟悉在Python开发环境中支持数据分析的可用模块以及其中的方法&#xff0c;基于一定的样例数据…

前端视角看 Docker : 基础命令全面指南

引言 Docker是一种开源的容器化平台&#xff0c;它允许开发者将应用程序和其依赖打包在一个轻量级的、可移植的容器中。这使得应用程序在不同的环境中部署变得简单且高效。本文将介绍Docker的一些基础命令和概念&#xff0c;帮助初学者快速上手。 1. Docker简介 Docker使用…

pl_vio线特征·part II

pl_vio线特征part II 0.引言4.线段残差对位姿的导数4.1.直线的观测模型和误差4.2.误差雅克比推导4.3.误差雅可比求导简洁版(不含imu坐标系转换)4.4.相关代码 0.引言 pl_vio线特征part I 现在CSDN有字数限制了&#xff0c;被迫拆分为两篇文章。 4.线段残差对位姿的导数 这一小…

ASP.NET MVC权限管理系实战之一验证码功能实现

1&#xff0c;权限的管理系统&#xff1a;开发项目必备的一个的功能&#xff1b;该项目使用 ASP.NET MVC5 SqlServer EF6 IOC容器 BoostStrap 2&#xff0c;登录界面验证码功能实现&#xff0c;整体效果如下&#xff1b; 3&#xff0c;接下来就是代码部分实现&#xff0c;前端…

白盒测试、接口测试、自动化测试

一、什么是白盒测试 白盒测试是一种测试策略&#xff0c;这种策略允许我们检查程序的内部结构&#xff0c;对程序的逻辑结构进行检查&#xff0c;从中获取测试数据。白盒测试的对象基本是源程序&#xff0c;所以它又称为结构测试或逻辑驱动测试&#xff0c;白盒测试方法一般分…

在Windows上通过VS2019自带的Cmake来编译OpenCV-4.5.3源码

文章目录 用VS打开OpenCV源码cmake的配置及生成操作生成及安装 用VS打开OpenCV源码 方式一&#xff1a;文件–》打开–》Cmake 找到源码根目录下CMakeLists.txt文件 导入即可。 方式二&#xff1a;在开始使用这里 选择 打开本地文件夹 找到源码的根目录&#xff0c;导入即可…

[NAND Flash 4.1] Flash(闪存)存储器底层原理 | 闪存存储器重要参数

依公知及经验整理&#xff0c;原创保护&#xff0c;禁止转载。 专栏 《深入理解NAND Flash》 <<<< 返回总目录 <<<< ​全文 5000 字。 从底层物理原理上了解 Nand Flash。 1. 存储器诞生&#xff1a; 现代计算机使用存储器来存储数据&#xff0c;其…

插入排序----希尔排序

希尔排序 希尔排序法又称缩小增量法。希尔排序法的基本思想是&#xff1a;先选定一个整数&#xff0c;把待排序文件中所有记录分成个gap组&#xff0c;所有距离为的记录分在同一组内&#xff0c;并对每一组内的记录进行排序。然后&#xff0c;取&#xff0c;重复上述分组和排序…

QT:Unable to create a debugging engine.

debug跑不了&#xff1a; 报错&#xff1a;Unable to create a debugging engine. 参考&#xff1a; https://blog.csdn.net/u010906468/article/details/104716198 先检查是否安装了DEBUG插件 工具-》》选项 查看插件&#xff0c;如果没有的话&#xff0c;需要重新安装qt时…

加密的艺术:对称加密的奇妙之处(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

一些关于fMRI脑数据的预处理工具

一些关于fMRI脑数据的预处理工具 前言概述SPM12工具箱FSL工具箱FreeSurfer工具箱BrainNet Viewer工具箱circularGraph工具箱Nipype集成框架fMRIPrep集成框架参考文献 前言 March 25, 2022 这里是关于fMRI脑数据的预处理工具的相关调研 主要是关于数据的预处理&#xff0c;数据…

Windows环境提示“‘mysql‘ 不是内部或外部命令,也不是可运行的程序或批处文理件” 简易记录

在Windows环境下使用DOS命令窗登入MYSQL&#xff0c;提示“mysql 不是内部或外部命令&#xff0c;也不是可运行的程序或批处理文件。” 这意味着系统无法找到 mysql.exe可执行文件&#xff0c;这是因为 MySQL 没有正确安装或未添加到系统PATH环境变量中所致。 处理方法&#x…

分组背包问题笔记

分组背包是选不同的组&#xff0c;每个组中只能选一个物品。分组背包就是01背包的变种&#xff0c;多重背包就是特殊的分组背包。 //分组背包 #include<iostream> using namespace std; const int N 110; int f[N], v[N], w[N], n, m;int main() {ios::sync_with_stdio(…

ShardingSphereJDBC简单入门

ShardingSphere 介绍ShardingSphere-JDBCSharding-Sphere-ProxyShardingSphere-Sidecar混合架构运行模式DistSQL可拔插架构ShardingSphere的发展路线 主从复制ShardingSphere-JDBC功能SQL解析SQL支持程度SQL稳定支持SQL实验性支持 MySQL不支持SQL清单分页 数据分片垂直分片水平…

Python 爬虫开发完整环境部署,爬虫核心框架安装

Python 爬虫开发完整环境部署 前言&#xff1a; ​ 关于本篇笔记&#xff0c;参考书籍为 《Python 爬虫开发实战3 》 笔记做出来的一方原因是为了自己对 Python 爬虫加深认知&#xff0c;一方面也想为大家解决在爬虫技术区的一些问题&#xff0c;本篇文章所使用的环境为&#x…

关于Ubuntu22.04恢复误删文件的记录

挂载在Ubuntu22.04下的固态盘有文件被误删了&#xff0c;该固态盘是ntfs格式的。 在网上找了很多教程&#xff0c;最后决定用TestDisk工具进行恢复。 现记录如下&#xff1a; Ubuntu安装testdisk sudo apt-get install testdisk运行testdisk sudo testdisk得到 我选择的是…