redis stream restTemplate消息监听队列框架搭建

整体思路

        1. pom增加redis依赖;

        2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;

        3. 将消息订阅bean及监听器注册到配置中;

1. pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.6</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

2. 消息监听器实现代码

package cn.thuniwhir.fileserver.redis;import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @Description: TODO**/
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);// 创建一个线程池private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());@Overridepublic void onMessage(MapRecord message) {// 异步处理消息threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));});}
}

3. redis订阅bean及监听器注册

package cn.thuniwhir.fileserver.redis;import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import java.time.Duration;
import java.util.stream.Collectors;/*** @Description: TODO**/
@Configuration
public class RedisMQConfig {@Autowiredprivate RedisMQListener redisMQListener;@Autowiredprivate RedisUtils redisUtils;private static RedisTemplate<Object, Object> redisTemplate;private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {this.redisTemplate = redisTemplate;}@Beanpublic Subscription subscription(RedisConnectionFactory redisConnectionFactory) {if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);if (xInfoGroups.isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);} else {if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}}} else {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build();StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);streamMessageListenerContainer.start();return subscription;}}

4. 测试生产消息 消息监听成功

4.1 生产消息

@RequestMapping("/produceMessage")public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {String key = jsonObject.getString("key");String value = jsonObject.getString("value");MapRecord<Object, String, Object> mapRecord = MapRecord.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Collections.singletonMap(key, value));redisTemplate.opsForStream().add(mapRecord);System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());return formatResult(null);}

4.2 消息监听器监听消息到达 代码见第二节

4.3 测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/615084.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Modern C++ std::mutex底层原理

前言 我时常有这样的疑问&#xff1a; std::mutex怎么就能保证后面的语句100%安全哪&#xff1f;CPU reordering就不会把这些语句重排到mutex前面执行&#xff1f;而且各个CPU都是有L1、L2缓存的&#xff0c;如果mutex后面要访问的的变量在这些缓存中怎么办&#xff1f; 带着…

C++ 类 对象

C 在 C 语言的基础上增加了面向对象编程&#xff0c;C 支持面向对象程序设计。类是 C 的核心特性&#xff0c;通常被称为用户定义的类型。 类用于指定对象的形式&#xff0c;是一种用户自定义的数据类型&#xff0c;它是一种封装了数据和函数的组合。类中的数据称为成员变量&a…

在 WinForms 应用中使用 FtpWebRequest 进行文件操作和数据显示

在 WinForms 应用中使用 FtpWebRequest 进行文件操作和数据显示 引言 在企业级应用或桌面程序中&#xff0c;经常需要从远程服务器获取数据&#xff0c;并在用户界面上展示这些数据。本文将通过一个实际案例&#xff0c;演示如何在 Windows Forms 应用程序中使用 FtpWebReques…

互联网大厂职场各职级P6/P7和核心能力

目录 具体能力要求总结 具体能力要求 专业工匠 p5 被别人带领p6 独立完成项目全流程&#xff0c;指导 2-3 人 乐队指挥 p7 带行政团队 7-10 &#xff0c;项目团队&#xff0c;专项团队&#xff0c;复杂系统设计 1-3 个一般系统构成p8 领域专家 垂直 3 个团队 &#xff0c;横…

Qt/C++音视频开发63-设置视频旋转角度/支持0-90-180-270度旋转/自定义旋转角度

一、前言 设置旋转角度,相对来说是一个比较小众的需求,如果视频本身带了旋转角度,则解码播放的时候本身就会旋转到对应的角度显示,比如手机上拍摄的视频一般是旋转了90度的,如果该视频文件放到电脑上打开,一些早期的播放器可能播放的时候是躺着的,因为早期播放器设计的…

SQL-条件查询与聚合函数的使用

&#x1f389;欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦)o *☆哈喽~我是小小恶斯法克&#x1f379; ✨博客主页&#xff1a;小小恶斯法克的博客 &#x1f388;该系列文章专栏&#xff1a;重拾MySQL &#x1f379;文章作者技术和水平很有限&#xff0c;如果文中出现错误&am…

架构01 - 知识体系详解

架构&#xff0c;又称为知识体系&#xff0c;是指在特定领域或系统中的组织结构和设计原则。它涵盖了该领域或系统的核心概念、基础理论、方法技术以及实践经验等。架构的主要作用是提供一个全面且系统化的视角&#xff0c;帮助人们理解和应用相关知识&#xff0c;并指导系统的…

微信小程序开发WebSocket通讯

官方文档说明&#xff1a;入口 WebSocket连接的链接只支持wss加密方式&#xff0c;且只能用域名的方式 该域名还要在微信公众平台的小程序中登记才能使用&#xff0c;开发->开发管理->服务器域名->修改 该域名要和https使用的一致 以域名地址&#xff1a;dtu.aab…

树状结构查询 - 华为OD统一考试

OD统一考试 分值&#xff1a; 200分 题解&#xff1a; Java / Python / C 题目描述 通常使用多行的节点、父节点表示一棵树&#xff0c;比如&#xff1a; 西安 陕西 陕西 中国 江西 中国 中国 亚洲 泰国 亚洲 输入一个节点之后&#xff0c;请打印出来树中他的所有下层节点。 …

靶机实战(10):OSCP备考之VulnHub Tre 1

靶机官网&#xff1a;Tre: 1[1] 实战思路&#xff1a; 一、主机发现二、端口发现&#xff08;服务、组件、版本&#xff09;三、漏洞发现&#xff08;获取权限&#xff09; 8082端口/HTTP服务 组件漏洞URL漏洞&#xff08;目录、文件&#xff09;80端口/HTTP服务 组件漏洞URL漏…

鸿蒙Harmony--状态管理器-@Observed装饰器和@ObjectLink装饰器详解

经历的越多&#xff0c;越喜欢简单的生活&#xff0c;干净的东西&#xff0c;清楚的感觉&#xff0c;有结果的事&#xff0c;和说到做到的人。把圈子变小&#xff0c;把语放缓&#xff0c;把心放宽&#xff0c;用心做好手边的事儿&#xff0c;该有的总会有的! 目录 一&#xff…

kafka下载安装部署

Apache kafka 是一个分布式的基于push-subscribe的消息系统&#xff0c;它具备快速、可扩展、可持久化的特点。它现在是Apache旗下的一个开源系统&#xff0c;作为hadoop生态系统的一部分&#xff0c;被各种商业公司广泛应用。它的最大的特性就是可以实时的处理大量数据以满足各…

面试算法115:重建序列

题目 长度为n的数组org是数字1&#xff5e;n的一个排列&#xff0c;seqs是若干序列&#xff0c;请判断数组org是否为可以由seqs重建的唯一序列。重建的序列是指seqs所有序列的最短公共超序列&#xff0c;即seqs中的任意序列都是该序列的子序列。 例如&#xff0c;如果数组org为…

初识Hadoop-概述与关键技术

一.大数据概述 1.什么是大数据 高速发展的信息时代&#xff0c;新一轮科技革命和变革正在加速推进&#xff0c;技术创新日益成为重塑经济发展模式和促进经济增长的重要驱动力量&#xff0c;而“大数据”无疑是核心推动力。 那么&#xff0c;什么是“大数据”呢&#xff1…

初识C#语言

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、C#语言以及特点C# 强大的编程功能 二、C# 与 .NET三、.NET 与 .NET Framwork.Net 框架&#xff08;.Net Framework&#xff09;.NET 与 .NET Framework 总结…

MySQL基础命令

1.SQL语句基础 1.1 SQL简介 SQL&#xff1a;结构化查询语言(Structured Query Language)&#xff0c;在关系型数据库上执行数据操作、数据检索以及 数据维护的标准语言。使用SQL语句&#xff0c;程序员和数据库管理员可以完成如下的任务 改变数据库的结构 更改系统的安全设置 …

人工智能推动供应链革命的成功

人工智能推动供应链革命的成功 目录 人工智能推动供应链革命的成功一、供应链管理不断变化的面貌二、拥挤的解决方案景观三、踏上人工智能驱动的转型1. 价值创造识别、战略和路线图2. 目标解决方案设计和供应商选择3. 实施与系统集成4. 变革管理、能力建设和全面价值获取 新技术…

flutter 文件下载及存储路径

flutter 文件下载及存储路径 前言一、下载进度条二、文件路径二、文件上传总结 前言 日常开发中&#xff0c;经常会遇到下载文件的功能&#xff0c;往往我们在需要保存文件的路径上去调试&#xff0c;比如Android中的路径&#xff0c;有些会报错在SD卡中&#xff0c;但是有些手…

1.5计算机网络的分类

1.5计算机网络的分类 1.5.1按照网络的作用范围进行分类 1、广域网WAN 广域网WAN&#xff08;WideAreaNetwork&#xff09;&#xff1a;广域网的作用范围通常为几十到几千公里&#xff0c;因而有时也称为远程网(longhaulnetwork)。广域网是互联网的核心部分&#xff0c;其任务…

几款优秀科学开源计算软件介绍

有一些比较优秀的软件&#xff0c;它们在科学计算、数据处理和分析方面具有广泛的应用和功能。以下是一些比较知名的软件&#xff1a; SciPy&#xff1a;SciPy是一个非常流行的科学计算库&#xff0c;提供了大量的数学函数和算法&#xff0c;用于解决各种科学问题。它支持多种操…