手搭手RocketMQ重试机制

环境介绍

技术栈

springboot+mybatis-plus+mysql+rocketmq

软件

版本

mysql

8

IDEA

IntelliJ IDEA 2022.2.1

JDK

17

Spring Boot

3.1.7

dynamic-datasource

3.6.1

mybatis-plus

3.5.3.2

rocketmq

4.9.4

加入依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><!-- 排除logback依赖 --><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><!--Log4j2场景启动器 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3</version><exclusions><exclusion><groupId>com.baomidou</groupId><artifactId>mybatis-plus-generator</artifactId></exclusion></exclusions></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.14</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.6.1</version></dependency><dependency><groupId>p6spy</groupId><artifactId>p6spy</artifactId><version>3.9.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.2</version></dependency></dependencies>

Broker:经纪人(经理人)

Topic主题:消息区分,分类,虚拟结构

Queue:消息队列

Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

Rocket重试机制

RocketMQ的重试机制是指:当消费者消费消息失败时,RocketMQ会在一定时间后重新将消息发送给消费者进行消费,以确保消息的可靠消费。

自动重试:Consumer在消费失败后,会在一定重试策略下定期重试消费失败的消息,直到成功或达到最大重试次数。

消息重发:如果Consumer在最大重试次数内仍然消费失败,Broker会定期扫描被标记为消费失败的消息,并将其重发给其他Consumer。

灵活的重试策略:RocketMQ提供多种重试策略来控制重试时机和频率,主要有:

生产者重试

生产者设置消息失败后重试次数

//同步
producer.setRetryTimesWhenSendFailed(3);
//异步
producer.setRetryTimesWhenSendAsyncFailed(2);

Int 重试的次数

//重试

@Test
void retryProducerTest()throws Exception{//创建生产者DefaultMQProducer producer = new DefaultMQProducer("retryGroup");//连接namesrvproducer.setNamesrvAddr("192.168.68.133:9876");//启动producer.start();producer.setDefaultTopicQueueNums(1);//自身业务key唯一String Key = UUID.randomUUID().toString();System.out.println(Key);//重试//同步producer.setRetryTimesWhenSendFailed(3);//异步//producer.setRetryTimesWhenSendAsyncFailed(2);//创建消息Message message = new Message("retry", null,Key, "重试测试内存内容".getBytes());//发送消息producer.send(message);System.out.println("发送成功");//关闭生产者producer.shutdown();
}

消费者重试

设置自定义最大重试

consumer.setMaxReconsumeTimes(6);

死信消息(超过重试次数,并未处理的消息),放在死信主题中,%DLQ% retry

@Test
void retryConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//订阅主题   *表示该主题的所有消息consumer.subscribe("retry","*");//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {System.out.println(new Date());System.out.println("消息内容"+new String(messageExt.getBody()));}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

死信处理方案

死信处理方案1、单独订阅死信主题2、监听死信主题(业务流程控制)

通过存入单独数据库表,业务发送短信等方式通知人工处理

1、单独订阅死信主题

单独订阅监听主题

@Test
void retryDeadMonitorConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//订阅死信主题   *表示该主题的所有消息consumer.subscribe("%DLQ%retryConsumerTest","*");//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {System.out.println(new Date());System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

2、监听死信主题(业务流程控制)

通过业务流程监听多个主题

//死信处理方案二、监听死信主题

@Test
void retryDeadMonitorConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//设置每次重试次数consumer.setMaxReconsumeTimes(3);// 订阅需要的多个主题列表List<String> topics = Arrays.asList("retry", "TopicA", "TopicB");// 订阅主题列表中的所有主题for (String topic : topics) {consumer.subscribe(topic, "*"); // 这里的tag是用来过滤消息的,"*" 表示接收这个主题下的所有消息}//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {try{//业务代码System.out.println("业务代码");System.out.println("消息内容"+new String(messageExt.getBody()));int i =1/0;//模拟代码出错}catch (Exception e){//获取重试次数int reconsumeTimes = messageExt.getReconsumeTimes();String key = messageExt.getKeys();if (reconsumeTimes > 2){OrderLog orderLog = new OrderLog();orderLog.setType(2);orderLog.setOrderid(key);orderLog.setUsername("重试超过2次,死信消息");orderMapper.insert(orderLog);System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));//发送短信通知人工处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}else {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/749522.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《Python深度学习》阅读笔记

以下是《Python深度学习》一书中学习过程中记录的一些重要的专属名词和概念&#xff1a; 一、概念 深度学习&#xff08;Deep Learning&#xff09;&#xff1a;指使用多层神经网络进行机器学习的技术。神经网络&#xff08;Neural Network&#xff09;&#xff1a;一种模仿生…

SpringBoot打造企业级进销存储系统 第五讲

package com.java1234.repository;import com.java1234.entity.Menu; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query;import java.util.List;/*** 菜单Repository接口*/ public interface MenuReposit…

第二百零五回

文章目录 概念介绍响应方法滑动事件点击事件 经验总结 我们在上一章回中介绍了如何给ListView添加分隔线,本章回中将介绍ListView响应事件相关的知识.闲话休提&#xff0c;让我们一起Talk Flutter吧。 概念介绍 我们在这里说的ListView响应事件主要分两种类型&#xff0c;一种…

【深度学习模型移植】用torch普通算子组合替代torch.einsum方法

首先不得不佩服大模型的强大之处&#xff0c;在算法移植过程中遇到einsum算子在ONNX中不支持&#xff0c;因此需要使用普通算子替代。参考TensorRT - 使用torch普通算子组合替代torch.einsum爱因斯坦求和约定算子的一般性方法。可以写出简单的替换方法&#xff0c;但是该方法会…

【Flask开发实战】项目介绍-防火墙规则查询系统

一、前言 硬件防火墙为常备主用网络安全设备&#xff0c;主要通过网络访问控制方式实现安全防护。 不同厂家防火墙的网络访问控制功能均采用同样的模式操作&#xff1a;防火墙配置若干条防火墙规则&#xff0c;当IP包到来&#xff0c;防火墙根据包的五元组属性&#xff08;协…

突破编程_前端_JS编程实例(工具栏组件)

1 开发目标 工具栏组件旨在模拟常见的桌面软件工具栏&#xff0c;所以比较适用于 electron 的开发&#xff0c;该组件包含工具栏按钮、工具栏分割条和工具栏容器三个主要角色&#xff0c;并提供一系列接口和功能&#xff0c;以满足用户在不同场景下的需求&#xff1a; 点击工具…

【MatLab】之:Simulink安装

一、内容简介 本文介绍如何在 MatLab 中安装 Simulink 仿真工具包。 二、所需原材料 MatLab R2020b&#xff08;教学使用&#xff09; 三、安装步骤 1. 点击菜单中的“附加功能”&#xff0c;进入附加功能管理器&#xff1a; 2. 在左侧的“按类别筛选”下选择Using Simulin…

Linux网络编程: IP协议详解

一、TCP/IP五层模型 物理层&#xff08;Physical Layer&#xff09;&#xff1a;物理层是最底层&#xff0c;负责传输比特流&#xff08;bitstream&#xff09;以及物理介质的传输方式。它定义了如何在物理媒介上传输原始的比特流&#xff0c;例如通过电缆、光纤或无线传输等。…

购票小程序有哪些功能

​通过小程序购买电子票&#xff0c;然后在使用时&#xff0c;出示电子票二维码&#xff0c;由商家进行验证/核销。通过小程序购票和核销&#xff0c;使得整个流程非常顺利&#xff0c;免去了线下购票的繁琐&#xff0c;而且还容易遗失。下面我们就来具体看一下小程序如何进行购…

Postman进行Websocket接口测试

Postman进行Websocket接口测试 前言下载地址使用1、new一个一个WebSocket Request2、填写内容和需要请求头携带的参数3、表示成功 网页请求101表示握手成功 前言 有些较低版本postman不支持websocket接口测试&#xff0c;如果根据此文未找到创建websocket接口测试的目录&#…

面向对象(C# )

面向对象&#xff08;C# &#xff09; 文章目录 面向对象&#xff08;C# &#xff09;ref 和 out传值调用和引用调用ref 和 out 的使用ref 和 out 的区别 结构体垃圾回收GC封装成员属性索引器静态成员静态类静态构造函数拓展方法运算符重载内部类和分布类 继承里氏替换继承中的…

Qt 鼠标滚轮示例

1.声明 void wheelEvent(QWheelEvent *event) override;2.实现&#xff08;方便复制、测试起见用静态变量&#xff09; #include <mutex> void MainWindow::wheelEvent(QWheelEvent *event) {static QLabel *label new QLabel("Zoom Level: 100%", this);st…

elementUi中表格超出一行省略,鼠标放入显示完整提示

一、想要的效果 二、代码&#xff0c;加入show-overflow-tooltip即可 <el-table-column min-width"220" prop"content" show-overflow-tooltip> </el-table-column>

PCB设计中的MARKER

今天在给板子布局的时候发现了一个这样的东西&#xff0c;名叫MARKER&#xff0c;查了一下这个东西分享一下&#xff1a; 目录 MARKER是什么样的&#xff1f; MARKER的用途&#xff1a; MARKER是必须的吗&#xff1f; MARKER是什么样的&#xff1f; 他在PCB中是这样的&…

web 课程

文章目录 格式图片超链接书签链接表格例子横跨束跨 格式 <br /> <br/> #换行图片 <img> 标签是用于在网页中嵌入图像的 HTML 标签&#xff0c;它有一些属性可以用来控制图像的加载、显示和交互。以下是对 <img> 标签常用属性的详细介绍&#xff1a;…

MySQL基础架构

文章目录 MySQL基础架构一、连接器 - 建立连接&#xff0c;权限认证二、查缓存 - 提高效率三、分析器 - 做什么四、优化器 - 怎么做五、执行器 - 执行语句六、存储引擎1、存储引擎的概述2、存储引擎的对比3、存储引擎的命令4、存储引擎的选择 MySQL基础架构 大体来说&#xff…

旅游管理系统 |基于springboot框架+ Mysql+Java+Tomcat的旅游管理系统设计与实现(可运行源码+数据库+设计文档)

推荐阅读100套最新项目 最新ssmjava项目文档视频演示可运行源码分享 最新jspjava项目文档视频演示可运行源码分享 最新Spring Boot项目文档视频演示可运行源码分享 目录 前台功能效果图 管理员功能登录前台功能效果图 系统功能设计 数据库E-R图设计 lunwen参考 摘要 研究…

Transformer面试题总结101道

在本文中&#xff0c;我们将回答一系列关于Transformer的问题&#xff0c;涵盖了从基础概念到高级应用的多个方面。无论您是准备面试、学习深度学习&#xff0c;还是对自然语言处理技术感兴趣&#xff0c;都希望本文能为您提供有益的启示和知识。 注&#xff0c;本文的面试题借…

idea中database的一些用法

1、查看表结构 方法1&#xff0c;右键&#xff0c;选这个 方法2 双击表后&#xff0c;看到数据&#xff0c;点DDL 方法3 写SQL时&#xff0c;把鼠标放在表名上&#xff0c;可以快速查看表结构 2、表生成对应的实体类 表中右键&#xff0c;选择这2个&#xff0c;选择生成的路…

FPGA和ASIC

前言 大家好&#xff0c;我是jiantaoyab&#xff0c;这是我所总结作为学习的笔记第16篇,在本篇文章给大家介绍FPGA和ASIC。 一个四核i7的CPU的晶体管中有20亿的晶体管&#xff0c;需要链接起20亿的晶体管可不是一件容易的事情&#xff0c;所以设计一个CPU需要用年来算&#x…