redis stream restTemplate消息监听队列框架搭建

整体思路

        1. pom增加redis依赖;

        2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;

        3. 将消息订阅bean及监听器注册到配置中;

1. pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.6</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

2. 消息监听器实现代码

package cn.thuniwhir.fileserver.redis;import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @Description: TODO**/
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);// 创建一个线程池private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());@Overridepublic void onMessage(MapRecord message) {// 异步处理消息threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));});}
}

3. redis订阅bean及监听器注册

package cn.thuniwhir.fileserver.redis;import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import java.time.Duration;
import java.util.stream.Collectors;/*** @Description: TODO**/
@Configuration
public class RedisMQConfig {@Autowiredprivate RedisMQListener redisMQListener;@Autowiredprivate RedisUtils redisUtils;private static RedisTemplate<Object, Object> redisTemplate;private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {this.redisTemplate = redisTemplate;}@Beanpublic Subscription subscription(RedisConnectionFactory redisConnectionFactory) {if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);if (xInfoGroups.isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);} else {if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}}} else {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build();StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);streamMessageListenerContainer.start();return subscription;}}

4. 测试生产消息 消息监听成功

4.1 生产消息

@RequestMapping("/produceMessage")public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {String key = jsonObject.getString("key");String value = jsonObject.getString("value");MapRecord<Object, String, Object> mapRecord = MapRecord.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Collections.singletonMap(key, value));redisTemplate.opsForStream().add(mapRecord);System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());return formatResult(null);}

4.2 消息监听器监听消息到达 代码见第二节

4.3 测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/615084.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Modern C++ std::mutex底层原理

前言 我时常有这样的疑问&#xff1a; std::mutex怎么就能保证后面的语句100%安全哪&#xff1f;CPU reordering就不会把这些语句重排到mutex前面执行&#xff1f;而且各个CPU都是有L1、L2缓存的&#xff0c;如果mutex后面要访问的的变量在这些缓存中怎么办&#xff1f; 带着…

openssl3.2 - 官方demo学习 - certs

文章目录 openssl3.2 - 官方demo学习 - certs概述笔记官方的实验流程mkcerts.sh - 整理ocsprun.sh - 整理ocspquery.sh - 整理从mkcerts.sh整理出来的27个.bata1_create_certificate_directly.cmda2_Intermediate_CA_request_first.cmda3_Sign_request_CA_extensions.cmda4_Ser…

C++_纯虚函数and抽象类

纯虚函数 and 抽象类 介绍纯虚函数抽象类纯抽象类(俗称&#xff1a;接口类) 介绍 本文主要介绍 纯虚函数 和 抽象类 纯虚函数 直接看源码吧&#xff0c;纯虚函数样式为&#xff1a;virtual 类型 函数名(参数表) 0; 源码 #include<iostream> #include<string> usi…

C++ 类 对象

C 在 C 语言的基础上增加了面向对象编程&#xff0c;C 支持面向对象程序设计。类是 C 的核心特性&#xff0c;通常被称为用户定义的类型。 类用于指定对象的形式&#xff0c;是一种用户自定义的数据类型&#xff0c;它是一种封装了数据和函数的组合。类中的数据称为成员变量&a…

MyBatis Plus wrapper A and (B or C or D)

Rt&#xff0c;怎么写这个wrapper呢&#xff1f; 例如我们有一个整数列表&#xff0c;数据库中存的是整数列表的字符串形式&#xff1a; list数据库中的存储1,2,3[1,2,3] 我们想查包含某几个数字的所有行。例如如果有1&#xff0c;那么结果中要有[1,2,3]、[1]。 // A Lambd…

在 WinForms 应用中使用 FtpWebRequest 进行文件操作和数据显示

在 WinForms 应用中使用 FtpWebRequest 进行文件操作和数据显示 引言 在企业级应用或桌面程序中&#xff0c;经常需要从远程服务器获取数据&#xff0c;并在用户界面上展示这些数据。本文将通过一个实际案例&#xff0c;演示如何在 Windows Forms 应用程序中使用 FtpWebReques…

互联网大厂职场各职级P6/P7和核心能力

目录 具体能力要求总结 具体能力要求 专业工匠 p5 被别人带领p6 独立完成项目全流程&#xff0c;指导 2-3 人 乐队指挥 p7 带行政团队 7-10 &#xff0c;项目团队&#xff0c;专项团队&#xff0c;复杂系统设计 1-3 个一般系统构成p8 领域专家 垂直 3 个团队 &#xff0c;横…

Qt/C++音视频开发63-设置视频旋转角度/支持0-90-180-270度旋转/自定义旋转角度

一、前言 设置旋转角度,相对来说是一个比较小众的需求,如果视频本身带了旋转角度,则解码播放的时候本身就会旋转到对应的角度显示,比如手机上拍摄的视频一般是旋转了90度的,如果该视频文件放到电脑上打开,一些早期的播放器可能播放的时候是躺着的,因为早期播放器设计的…

SQL-条件查询与聚合函数的使用

&#x1f389;欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦)o *☆哈喽~我是小小恶斯法克&#x1f379; ✨博客主页&#xff1a;小小恶斯法克的博客 &#x1f388;该系列文章专栏&#xff1a;重拾MySQL &#x1f379;文章作者技术和水平很有限&#xff0c;如果文中出现错误&am…

xbox无法登录、没有反应解决方法分享

如果你遇到了Xbox无法登录或没有反应的问题&#xff0c;可以尝试以下几种解决方法&#xff1a; 重启Xbox&#xff1a; 关闭Xbox。等待一分钟。重新启动Xbox。 检查Xbox Live服务状态&#xff1a; 访问Xbox Live服务状态网页&#xff08;官方网站&#xff09;检查是否有任何服务…

架构01 - 知识体系详解

架构&#xff0c;又称为知识体系&#xff0c;是指在特定领域或系统中的组织结构和设计原则。它涵盖了该领域或系统的核心概念、基础理论、方法技术以及实践经验等。架构的主要作用是提供一个全面且系统化的视角&#xff0c;帮助人们理解和应用相关知识&#xff0c;并指导系统的…

微信小程序开发WebSocket通讯

官方文档说明&#xff1a;入口 WebSocket连接的链接只支持wss加密方式&#xff0c;且只能用域名的方式 该域名还要在微信公众平台的小程序中登记才能使用&#xff0c;开发->开发管理->服务器域名->修改 该域名要和https使用的一致 以域名地址&#xff1a;dtu.aab…

Python基础(二十七、继承复写、注解)

文章目录 一、继承1.复写2.调用父类同名成员3.代码示例 二、注解1.变量注解2.函数注解3.Union联合注解语法如下&#xff1a;示例&#xff1a;注意事项&#xff1a; 一、继承 1.复写 子类继承父类的成员属性和成员方法后&#xff0c;如果对其“不满意”&#xff0c;那么可以进…

【Microsoft Copilot】手机端发布 ——GPT-4, DALL-E3 免费用

Microsoft Copilot 关于Microsoft CopilotMicrosoft Copilot 的特点1. 可以在手机端使用&#xff1a;2. 可以免费使用GPT-4。3. 可以无限制地使用GPT-4。4. 可以使用DALL-E3生成图片。5. 搜索功能6. 图像识别 Microsoft Copilot的缺点和注意事项1. 非常容易报错2. 不支持长篇聊…

树状结构查询 - 华为OD统一考试

OD统一考试 分值&#xff1a; 200分 题解&#xff1a; Java / Python / C 题目描述 通常使用多行的节点、父节点表示一棵树&#xff0c;比如&#xff1a; 西安 陕西 陕西 中国 江西 中国 中国 亚洲 泰国 亚洲 输入一个节点之后&#xff0c;请打印出来树中他的所有下层节点。 …

【Python_PySide6学习笔记(二十八)】基于PySide6实现移动窗体到屏幕中间位置

基于PySide6实现移动窗体到屏幕中间位置 基于PySide6实现移动窗体到屏幕中间位置前言一、背景介绍二、实现思路三、具体代码四、函数调用五、实现效果基于PySide6实现移动窗体到屏幕中间位置 前言 在 【Python_PySide2学习笔记(十)】移动窗体到屏幕中间位置 中介绍过移动窗…

靶机实战(10):OSCP备考之VulnHub Tre 1

靶机官网&#xff1a;Tre: 1[1] 实战思路&#xff1a; 一、主机发现二、端口发现&#xff08;服务、组件、版本&#xff09;三、漏洞发现&#xff08;获取权限&#xff09; 8082端口/HTTP服务 组件漏洞URL漏洞&#xff08;目录、文件&#xff09;80端口/HTTP服务 组件漏洞URL漏…

鸿蒙Harmony--状态管理器-@Observed装饰器和@ObjectLink装饰器详解

经历的越多&#xff0c;越喜欢简单的生活&#xff0c;干净的东西&#xff0c;清楚的感觉&#xff0c;有结果的事&#xff0c;和说到做到的人。把圈子变小&#xff0c;把语放缓&#xff0c;把心放宽&#xff0c;用心做好手边的事儿&#xff0c;该有的总会有的! 目录 一&#xff…

kafka下载安装部署

Apache kafka 是一个分布式的基于push-subscribe的消息系统&#xff0c;它具备快速、可扩展、可持久化的特点。它现在是Apache旗下的一个开源系统&#xff0c;作为hadoop生态系统的一部分&#xff0c;被各种商业公司广泛应用。它的最大的特性就是可以实时的处理大量数据以满足各…

面试算法115:重建序列

题目 长度为n的数组org是数字1&#xff5e;n的一个排列&#xff0c;seqs是若干序列&#xff0c;请判断数组org是否为可以由seqs重建的唯一序列。重建的序列是指seqs所有序列的最短公共超序列&#xff0c;即seqs中的任意序列都是该序列的子序列。 例如&#xff0c;如果数组org为…