使用redis 的stream 做消息中间件 多线程消费消息

1.redis stream 特点

1.支持消息持久化
2.消费者组模式
3.消息确认机制
4. 消息重试机制
5. 死信队列

2. 消息生产者服务

2.1 如下代码
@Service
@Slf4j
public class StreamMessageProducer {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";/*** 发送消息*/public String sendMessage(String topic, Object message) {try {StringRecord record = StreamRecords.string(Collections.singletonMap("message", JSON.toJSONString(message))).withStreamKey(STREAM_KEY + ":" + topic);RecordId recordId = redisTemplate.opsForStream().add(record);log.info("消息发送成功: topic={}, messageId={}", topic, recordId);return recordId.getValue();} catch (Exception e) {log.error("消息发送失败: topic={}, message={}", topic, message, e);throw new RuntimeException("消息发送失败", e);}}/*** 批量发送消息*/public List<String> sendMessages(String topic, List<Object> messages) {try {List<MapRecord<String, String, String>> records = messages.stream().map(msg -> StreamRecords.string(Collections.singletonMap("message", JSON.toJSONString(msg))).withStreamKey(STREAM_KEY + ":" + topic)).collect(Collectors.toList());List<String> messageIds = new ArrayList<>();for (MapRecord<String, String, String> record : records) {RecordId recordId = redisTemplate.opsForStream().add(record);messageIds.add(recordId.getValue());}log.info("批量消息发送成功: topic={}, count={}", topic, messageIds.size());return messageIds;} catch (Exception e) {log.error("批量消息发送失败: topic={}", topic, e);throw new RuntimeException("批量消息发送失败", e);}}
}

3.消息消费者服务(多线程消费)

3.1 代码如下
@Service
@Slf4j
public class StreamMessageConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";private final Map<String, StreamMessageHandler<?>> handlers = new ConcurrentHashMap<>();/*** 注册消息处理器*/public <T> void registerHandler(String topic, Class<T> messageType, Consumer<T> handler) {handlers.put(topic, new StreamMessageHandler<>(messageType, handler));}/*** 启动消费*/@PostConstructpublic void startConsuming() {for (String topic : handlers.keySet()) {String streamKey = STREAM_KEY + ":" + topic;String consumerGroup = "group:" + topic;String consumerName = "consumer:" + UUID.randomUUID().toString();try {// 创建消费者组(如果不存在)createConsumerGroupIfNotExists(streamKey, consumerGroup);// 启动消费线程Thread consumerThread = new Thread(() -> consumeMessages(streamKey, consumerGroup, consumerName, topic));consumerThread.setName("stream-consumer-" + topic);consumerThread.start();} catch (Exception e) {log.error("启动消费者失败: topic={}", topic, e);}}}private void createConsumerGroupIfNotExists(String streamKey, String groupName) {try {redisTemplate.opsForStream().createGroup(streamKey, groupName);} catch (Exception e) {// 组已存在,忽略异常log.debug("Consumer group already exists: {}", groupName);}}private void consumeMessages(String streamKey, String group, String consumer, String topic) {StreamMessageHandler<?> handler = handlers.get(topic);while (!Thread.currentThread().isInterrupted()) {try {// 读取消息List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().read(Consumer.from(group, consumer),StreamReadOptions.empty().count(10).block(Duration.ofSeconds(1)),StreamOffset.create(streamKey, ReadOffset.lastConsumed()));if (records != null && !records.isEmpty()) {for (MapRecord<String, String, String> record : records) {try {// 处理消息processMessage(record, handler);// 确认消息redisTemplate.opsForStream().acknowledge(streamKey, group, record.getId());} catch (Exception e) {log.error("消息处理失败: messageId={}", record.getId(), e);}}}} catch (Exception e) {log.error("消息消费异常: topic={}", topic, e);try {Thread.sleep(1000);} catch (InterruptedException ie) {Thread.currentThread().interrupt();break;}}}}private <T> void processMessage(MapRecord<String, String, String> record, StreamMessageHandler<T> handler) {try {String messageJson = record.getValue().get("message");T message = JSON.parseObject(messageJson, handler.getMessageType());handler.getHandler().accept(message);} catch (Exception e) {log.error("消息处理失败: {}", record, e);throw e;}}
}@Data
@AllArgsConstructor
class StreamMessageHandler<T> {private Class<T> messageType;private Consumer<T> handler;
}

4.消息重试服务

4.1 代码如下
@Service
@Slf4j
public class StreamMessageRetryService {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";private static final int MAX_RETRY_COUNT = 3;/*** 处理待处理的消息*/@Scheduled(fixedDelay = 60000) // 每分钟执行一次public void processPendingMessages() {for (String topic : getTopics()) {String streamKey = STREAM_KEY + ":" + topic;String groupName = "group:" + topic;try {// 获取待处理的消息PendingMessages pending = redisTemplate.opsForStream().pending(streamKey, groupName, Range.unbounded(), 100);if (pending != null) {for (PendingMessage message : pending.getPendingMessages()) {processRetry(streamKey, groupName, message);}}} catch (Exception e) {log.error("处理待处理消息失败: topic={}", topic, e);}}}private void processRetry(String streamKey, String groupName, PendingMessage message) {try {if (message.getTotalDeliveryCount() > MAX_RETRY_COUNT) {// 超过重试次数,移动到死信队列moveToDeadLetter(streamKey, groupName, message.getIdAsString());} else {// 重新投递消息redisTemplate.opsForStream().claim(streamKey, groupName, "retry-consumer", Duration.ofMinutes(1), message.getIdAsString());}} catch (Exception e) {log.error("处理重试消息失败: messageId={}", message.getIdAsString(), e);}}private void moveToDeadLetter(String streamKey, String groupName, String messageId) {try {// 读取消息内容List<MapRecord<String, String, String>> messages = redisTemplate.opsForStream().range(streamKey, Range.closed(messageId, messageId));if (messages != null && !messages.isEmpty()) {MapRecord<String, String, String> message = messages.get(0);// 存储到死信队列redisTemplate.opsForStream().add(streamKey + ":dead", message.getValue());// 确认原消息redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageId);}} catch (Exception e) {log.error("移动消息到死信队列失败: messageId={}", messageId, e);}}
}

5.使用示例

5.1代码如下
@Service
@Slf4j
public class MessageService {@Autowiredprivate StreamMessageProducer producer;@Autowiredprivate StreamMessageConsumer consumer;@PostConstructpublic void init() {// 注册订单消息处理器consumer.registerHandler("order", OrderMessage.class, this::processOrderMessage);// 注册支付消息处理器consumer.registerHandler("payment", PaymentMessage.class, this::processPaymentMessage);}/*** 发送订单消息*/public String sendOrderMessage(OrderMessage message) {return producer.sendMessage("order", message);}/*** 处理订单消息*/private void processOrderMessage(OrderMessage message) {try {log.info("处理订单消息: {}", message);// 处理订单逻辑} catch (Exception e) {log.error("订单消息处理失败", e);throw e;}}/*** 处理支付消息*/private void processPaymentMessage(PaymentMessage message) {try {log.info("处理支付消息: {}", message);// 处理支付逻辑} catch (Exception e) {log.error("支付消息处理失败", e);throw e;}}
}@Data
@AllArgsConstructor
class OrderMessage {private String orderId;private String status;private BigDecimal amount;
}@Data
@AllArgsConstructor
class PaymentMessage {private String paymentId;private String orderId;private BigDecimal amount;private String status;
}

6.监控和管理服务

6.1 代码如下
@Service
@Slf4j
public class StreamMonitorService {@Autowiredprivate StringRedisTemplate redisTemplate;/*** 获取Stream信息*/public StreamInfo getStreamInfo(String topic) {try {String streamKey = STREAM_KEY + ":" + topic;StreamInfo.StreamInfoBuilder builder = StreamInfo.builder();// 获取消息总数Long length = redisTemplate.opsForStream().size(streamKey);builder.messageCount(length != null ? length : 0);// 获取消费者组信息StreamInfo.GroupInfo groupInfo = getGroupInfo(streamKey);builder.groupInfo(groupInfo);// 获取最新消息IDbuilder.lastMessageId(getLastMessageId(streamKey));return builder.build();} catch (Exception e) {log.error("获取Stream信息失败: topic={}", topic, e);throw new RuntimeException("获取Stream信息失败", e);}}/*** 清理过期消息*/@Scheduled(cron = "0 0 * * * *") // 每小时执行public void cleanupOldMessages() {try {for (String topic : getTopics()) {String streamKey = STREAM_KEY + ":" + topic;// 保留最近24小时的消息long maxLength = 1000000; // 最大消息数redisTemplate.opsForStream().trim(streamKey, maxLength, true);}} catch (Exception e) {log.error("清理过期消息失败", e);}}
}@Data
@Builder
class StreamInfo {private long messageCount;private String lastMessageId;private GroupInfo groupInfo;@Data@Builderstatic class GroupInfo {private String name;private int consumerCount;private long pendingMessageCount;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/62385.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python100道练习题

Python100道练习题 BIlibili 1、两数之和 num1 20 num2 22result num1 num2print(result)2、一百以内的偶数 list1 []for i in range(1,100):if i % 2 0:list1.append(i) print(list1)3、一百以内的奇数 # 方法一 list1 [] for i in range(1,100):if i % 2 ! 0:lis…

Java转C之并发和多线程

提纲&#xff1a; 概念介绍与对比概述 简述Java与C在并发和多线程方面的核心区别解释C11标准、POSIX、C11 <threads.h>、Pthread等名词 Java多线程与并发回顾 线程、Runnable、ExecutorService概念说明同步关键字与工具类含义 C并发基础 没有Java式的内置线程类&#xf…

Ubuntu系统本地化搭建Maxakb+Ollama

安装docker 最详细的ubuntu 安装 docker教程-腾讯云开发者社区-腾讯云 安装Ollama Ollama官网 执行命令&#xff1a; curl -fsSL https://ollama.com/install.sh | sh安装完成后下载模型 执行命令&#xff1a; ollama run llama3.3:70b安装MaxKb 执行命令&#xff1a; d…

基于JAVA的旅游网站系统设计

摘要 随着信息技术和网络技术的迅速发展&#xff0c;人们的生活质量和观念也在发生着改变&#xff0c;各地争相发展旅游业&#xff0c;传统的 旅游社已经无法满足人们的需求&#xff0c;旅游网站将突破传统在时间和地域的限制&#xff0c;成为方便、快捷、安全、可靠的旅游 方…

【Flux.jl】 卷积神经网络

Flux.jl 是包含卷积神经网络的, 但是官方API文件中没有给出一个完整的程序框架, 只是对所需神经元给了局部解释, 此外对 model-zoo 模型动物园中的案例没有及时跟着 Flux.jl 的版本更新, 也无法运行出来结果。 因此本文搭建了一个完整可训练的卷积神经网络。 Conv 卷积算子…

H5游戏出海如何获得更多增长机会?

海外H5小游戏的崛起给了国内众多中小厂商出海发展的机会&#xff0c;开发者如何在海外市场获得更多的增长机会&#xff1f;#APP出海# H5游戏如何在海外获得核心用户&#xff1f; HTML5游戏的开发与运营者们首先可以利用量多质高的HTML5游戏&#xff0c;维持海外用户粘性&…

Next.js系统性教学:深入理解和应用组件组合模式

更多有关Next.js教程&#xff0c;请查阅&#xff1a; 【目录】Next.js 独立开发系列教程-CSDN博客 目录 更多有关Next.js教程&#xff0c;请查阅&#xff1a; 1. 什么是组件组合模式&#xff1f; 1.1 组件组合模式概述 1.2 组件组合模式的优势 2. Next.js 中的组件组合模式…

国际荐酒师Peter助力第六届地博会,推动地理标志产品国际化发展

国际荐酒师Peter Lisicky助力第六届知交会暨地博会&#xff0c;推动地理标志产品国际化发展 第六届粤港澳大湾区知识产权交易博览会暨国际地理标志产品交易博览会于2024年12月9日至11日在中新广州知识城盛大举行&#xff0c;吸引了全球众多行业专家、企业代表及相关机构齐聚一…

Mybatis 延迟加载的实现原理详细解析

Mybatis 延迟加载的实现原理详细解析 &#xff08;1&#xff09;代理对象机制的深入探讨 代理对象的生成&#xff1a;Mybatis 使用代理对象来实现延迟加载是基于 Java 的代理机制。当开启延迟加载并且配置正确后&#xff0c;对于需要延迟加载的关联对象&#xff0c;Mybatis 会…

2024 亚马逊云科技re:Invent:Werner Vogels架构哲学,大道至简 六大经验助力架构优化

在2024亚马逊云科技re:Invent全球大会第四天的主题演讲中&#xff0c;亚马逊副总裁兼CTO Dr.Werner Vogels分享了 The Way of Simplexity&#xff0c;繁简之道&#xff0c;浓缩了Werner在亚马逊20年构建架构的经验。 Werner表示&#xff0c;复杂性总是会“悄无声息”地渗透进来…

Java Web 开发学习中:过滤器与 Ajax 异步请求

一、过滤器 Filter&#xff1a; 过滤器的概念与用途 在一个庞大的 Web 应用中&#xff0c;有许多资源需要受到保护或进行特定的预处理。过滤器就像是一位智能的守卫&#xff0c;站在资源的入口处&#xff0c;根据预先设定的规则&#xff0c;决定哪些请求可以顺利访问资源&…

ThinkPHP框架审计--基础

基础入门 搭建好thinkphp 查看版本方法&#xff0c;全局搜version 根据开发手册可以大致了解该框架的路由 例如访问url http://127.0.0.1:8094/index.php/index/index/index 对应代码位置 例如在代码下面添加新方法 那么访问这个方法的url就是 http://127.0.0.1:8094/index.…

浅谈Python库之‌Requests

一、‌Requests的介绍 Requests 是一个简单易用的 HTTP 库&#xff0c;用于发送各种 HTTP 请求。它由 Kenneth Reitz 创建&#xff0c;并广泛用于 Python 社区中。 二、‌Requests的特点 1、人性化的 API&#xff1a;简洁的接口使得编写请求代码变得简单直观。 2、跨平台&…

如何在vue中使用ECharts

一. 打开ECharts官网,点击快速入门 下面是ECharts官网的链接 https://echarts.apache.org/ 二.在vue中使用 1.首先先引入Echarts js文件 如下图&#xff0c;下面的第一张图片是官网的实现&#xff0c;第二章图片是我根据官网的实现 2.给ECharts 创建一个DOM容器 3. 使用ec…

网络原理之 IP 协议

目录 1. IP 协议报文格式 2. 网段划分 3. 地址管理 1) 动态分配 2) NAT 机制 (网络地址转换) 3) IPv6 4. 路由选择 1. IP 协议报文格式 IP 协议是网络层的重点协议。 网络层要做的事情&#xff0c;主要就是两方面&#xff1a; 1) 地址管理 制定一系列的规则&#xff…

HyperMesh CFD功能详解:后处理功能Part 2

Clips Clips 按钮包含两个工具。Box Clip用于空间上的裁剪&#xff0c;Scalar Clip可以根据物理量的范围裁剪。 示例&#xff1a;Box Clips 裁剪 示例&#xff1a;Scalar Clips 裁剪 通过裁剪&#xff0c;仅显示density范围是10~20的等值面 示例&#xff1a;显示效果控制 部分透…

Java项目实战II基于微信小程序的跑腿系统(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、核心代码 五、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 在快节奏的现代生活中&…

【机器学习与数据挖掘实战案例01】基于支持向量回归的市财政收入分析

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈机器学习与数据挖掘实战 ⌋ ⌋ ⌋ 机器学习是人工智能的一个分支&#xff0c;专注于让计算机系统通过数据学习和改进。它利用统计和计算方法&#xff0c;使模型能够从数据中自动提取特征并做出预测或决策。数据挖掘则是从大型数…

windows下nacos启动报错:java.lang.unsatisfiedLinkError: C:\USers\乱码AppData\xxx.dll

问题 看了许多别的帖子&#xff0c;大家都是因为缺少dll包&#xff0c;下载安装 Microsoft Visual C 2015 Redistributable 就可以。但我试过了不行。思来想去&#xff0c;之前正常的时候用的JDK版本是17&#xff0c;后面别的项目用1.8给切换回来了。然后尝试配置环境变量将JD…

JavaEE 【知识改变命运】03 多线程(3)

文章目录 多线程带来的风险-线程安全线程不安全的举例分析产出线程安全的原因&#xff1a;1.线程是抢占式的2. 多线程修改同一个变量&#xff08;程序的要求&#xff09;3. 原子性4. 内存可见性5. 指令重排序 总结线程安全问题产生的原因解决线程安全问题1. synchronized关键字…