SpringBoot集成RabbitMq,RabbitMq消费与生产,消费失败重发机制,发送签收确认机制

RabbitMq消费与生产,消费失败重发机制,发送确认机制,消息发送结果回执

  • 1. RabbitMq集成spring boot
    • RabbitMq集成依赖
    • RabbitMq配置
    • RabbitMq生产者,队列,交换通道配置,消费者示例
  • 2. RabbitMq消息确认机制
    • 消息确认机制分自动确认,和手动确认
  • 3. 消息重发机制
    • 消息重发配置
    • 消息重发如何触发
  • 4. 延时消息队列
  • 5. 接收返回结果队列
    • 尚未研究后续用到补充
  • 6. 遇到的报错
    • 启动报错 Channel shutdown: channel error; protocol method:

1. RabbitMq集成spring boot

  • RabbitMq集成依赖

           这里spring-boot依赖版本为2.3.7版本,RabbitMq集成amqp包,版本在spring-boot中有涵盖,不单独指明版本了。

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.7.RELEASE</version>
    </parent><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.3.7.RELEASE</version><type>pom</type></dependency></dependencies>
    </dependencyManagement>	<dependencies><!-- rabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
    </dependencies>
  • RabbitMq配置

    spring:rabbitmq:# 基础项host:  ip地址port: 端口username:  用户名password: 密码# virtualhost需要提前在MQ的Web管理界面里手动创建,或者配置默认host"/"virtual-host: /# 生产者#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#开启消息发送确认机制,默认为false#如果没有本条配置信息,当消费者收到生产者发送的消息后,生产者无法收到确认成功的回调信息publisher-confirms: true#支持消息发送失败返回队列,默认为falsepublisher-returns: true# 消费者listener:type: simplesimple:#自动签收auto  手动 manualacknowledge-mode: auto#个字段一定要设置成 false 不然无法消费的数据不会进入死信队列的default-requeue-rejected: falseprefetch: 1 #限制每次发送一条数据max-concurrency: 1 #启动消费者最大数量concurrency: 1 #同一个队列启动几个消费者retry:enabled: true #是否支持重试max-attempts: 3         # 最大重试次数,默认为3initial-interval: 30s # 重试间隔时间,默认1000(单位毫秒)max-interval: 120s # 重试最大间隔# 时间间隔的乘子,下一次间隔的时间=间隔时间 × 乘子,但最大不超过重试最大间隔multiplier: 1	```
  • RabbitMq生产者,队列,交换通道配置,消费者示例

    Exchange 交换机配置

    @Component
    public class DnfxExchangeConfig {@AutowiredRabbitMqConfig rabbitMqConfig;/*** topic交换机起名* 如果rabbitmq设置的类型是topic 就用topic类型的Exchange** @return*/@BeanTopicExchange dnfxOrderExchange() {return new TopicExchange(rabbitMqConfig.getFxexchange());}
    }
    

    队列queue配置

    @Component
    public class DnfxQueueConfig {@AutowiredRabbitMqConfig rabbitMqConfig;/*** 队列起名** @return*/@Beanpublic Queue dnfxOrderQueue() {Map<String, Object> argsMap = new HashMap<String, Object>();//队列优先级  argsMap.put("x-max-priority", 5);//true 是否持久 return new Queue(rabbitMqConfig.getFxqueue(), true, false, false, argsMap);}
    }
    

    将队列和交换机绑定, 并设置用于匹配键

     @Component
    public class DnfxRoutingConfig {@AutowiredRabbitMqConfig rabbitMqConfig;@AutowiredDnfxQueueConfig queueConfig;@AutowiredDnfxExchangeConfig exchangeConfig;/*** 绑定:将队列和交换机绑定, 并设置用于匹配键 myDirectRouting** @return*/@BeanBinding bindingOrderRouting() {return BindingBuilder.bind(queueConfig.dnfxOrderQueue()).to(exchangeConfig.dnfxOrderExchange()).with(rabbitMqConfig.getFxrouting());}
    }
    

    配置加载

    @Configuration
    @ConfigurationProperties(prefix = "xx.mq")
    @Data
    public class RabbitMqConfig {private String fxqueue;private String fxexchange;private String fxrouting;	
    }	
    

    RabbitTemplate

    @Configuration
    public class DnfxRabbitMqConfig {@AutowiredRabbitMqConfig rabbitMqConfig;@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}
    }
    

    生产者

    @Component
    public class DemoTestProduce {@Autowiredprivate RabbitTemplate rabbitTemplate;@AutowiredRabbitMqConfig rabbitMqConfig;public void sendDemoMsg() {String message = "测试消息发送";rabbitTemplate.convertAndSend(rabbitMqConfig.getFxexchange(), rabbitMqConfig.getFxrouting(), message);}
    }
    

    消费者

    @Component
    public class DnfxAliBbMessageListener {private final static Logger logger = LoggerFactory.getLogger(DnfxAliBbMessageListener.class);@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "${xx.mq.fxqueue}")public void listenSimpleQueueMessage(String msg) throws IOException {logger.info("接收到的1688回执消息:{}", msg);}
    }
    

2. RabbitMq消息确认机制

  • 消息确认机制分自动确认,和手动确认

    消息确认签收配置

      # 消费者
    listener:type: simplesimple:#自动签收 auto  手动 manualacknowledge-mode: auto
    

           消息确认签收机制不过多赘述,网上有大把说明,这里简单描述一下,以及记录一下个人使用心得。
           acknowledge-mode: auto 配置为自动签收时候,消息送达至消费者手上后,Mq自动签收,并移除消息出消息队列。
           acknowledge-mode: false 配置为手动签收时候,消息送达至消费者后,消费者需要手动触发签收动作,如果消费者没有发送ACK消息,RabbitMQ服务器就会认为该消息还没有被消费,会将该消息重新发送给其他消费者。例如下图,手动签收模式,没有主动向MQ发送签收讯息,那么当前消费的这条消息会被标记为Unacked
    在这里插入图片描述
    关于签收 确认机制可以参考 https://blog.csdn.net/qq_42331185/article/details/131696949 ,这里贴部分这个博主的结论

    	@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "${xx.mq.fxqueue}")public void listenSimpleQueueMessage(Message message, Channel channel) throws IOException {String msgBody = new String(message.getBody());logger.info("接收到的1688回执消息:{}", msgBody);long deliveryTag = message.getMessageProperties().getDeliveryTag();channel.basicReject(deliveryTag, false);
    

    deliveryTag:消息传递标签,格式为序列号,必须使用这个标签,不然信道会关闭,详情下面会说到
    multiple:为true则表示序号deliverTag之前的消息均被确认或拒绝(basicNack),false表示当前消息。为true的时候就可以做到批量确认
    requeue:为true表示,失败的消息将会重新排队,不会丢弃或者死信,为false则表示丢弃

     1、消息成功签收  basicAck(deliveryTag,multiple)channel.basicAck(message.getEnvelope().getDeliveryTag(), false);2、失败确认 basicNack(deliveryTag,multiple,requeue)channel.basicNack(message.getEnvelope().getDeliveryTag(),false, true);3、失败确认:basicReject(deliveryTag,requeue)channel.basicReject(message.getEnvelope().getDeliveryTag(), true);
    

    注:关于以上手动确认multiple属性为true时,批量确认这个元素个人未进行验证,失败确认requeue为true时,当前消息会重新丟至MQ队列中,等待下次消费(已验证)
           关于消息确认机制,自动确认可能导致消息丢失,如果单条消息发送至消费者后,消费者处理报错,最多触发消息重发机制,重发达到重发上限后,便会抛弃此消息,造成消息丢失。
           手动确认签收,千万不要在cath中或者final中进行失败重发签收,即basicNack basicReject 失败签收时requeue 为true,否则当前消息若真为异常消息,此消息会一直消费,失败签收,重新排队,进行循环,导致消息积压或者资源浪费

3. 消息重发机制

  • 消息重发配置

           注意: 如果遗漏 max-interval multiplier两个属性,消息重发机制仍会生效,但是重发间隔时间为默认10秒重发, initial-interval 重发间隔时间将不会生效。此处已验证,尚未确认是bug或者本身就是联动配置

     # 消费者
    listener:type: simplesimple:retry:enabled: true #是否支持重试max-attempts: 3         # 最大重试次数,默认为3initial-interval: 30s # 重试间隔时间,默认1000(单位毫秒)max-interval: 120s # 重试最大间隔# 时间间隔的乘子,下一次间隔的时间=间隔时间 × 乘子,但最大不超过重试最大间隔multiplier: 1
    
  • 消息重发如何触发

            消息重发机制,与消息确认签收机制是两种不同的机制,这个概念不要弄混了,消息确认签收机制亦可以将消息重新放入队列进行二次消费
           消息重发机制,在消费者进行消费时,如果rabbitmq开启了消息重发机制,当消费者处理消息时候抛出了异常,即触发消息重发机制,注意,处理消息逻辑不要用try-catch捕捉异常,异常被捕捉后,会抛出异常信息,但不会影响代码正常执行,amqp aop会视为正常消费,不会触发重发机制。

    	@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "${zcwl.mq.fxqueue}")public void listenSimpleQueueMessage(Message message, Channel channel) throws IOException {String msgBody = new String(message.getBody());logger.info("接收到的1688回执消息:{}", msgBody);long deliveryTag = message.getMessageProperties().getDeliveryTag();//此处会抛出异常int a = 1 / 0;//确认签收机制为手动签收,一定要进行签收,否则触发重发机制后,此条消息仍会被标记为unackedchannel.basicReject(deliveryTag, false);
    

4. 延时消息队列

       延时消息队列需要配合RabbitMq延时消息队列插件使用,安装延时消息队列插件此处不赘述,网上搜一大把
       延时消息队列创建队列以及绑定key时没什么特殊的,在创建exchange交换机时,需要注意选项,如下图所示即可。
       x-delayed-type = redirect 如果不能创建,报错时,那么=topic也是可以的
在这里插入图片描述
注册exchange交换机时候,注意给入 x-delayed-type 参数,队列注册,以及队列交换机绑定与普通队列一样即可

	@BeanCustomExchange dnfxOrderDelayExchange() {Map<String, Object> args = new HashMap<String, Object>();args.put("x-delayed-type", "topic");return new CustomExchange(rabbitMqConfig.getFxOrderDelayExchange(), "x-delayed-message", true, false, args);}

测试发送延时消息方法,队列监听与普通消息一样即可

 public void sendDelayMsg() {System.out.println(LocalDateTime.now() + ":发送延时消息");String message = "这里是测试延时发送消息";this.rabbitTemplate.convertAndSend(rabbitMqConfig.getFxOrderDelayExchange(), rabbitMqConfig.getFxOrderDelayRouting(), message, message1 -> {//delay的单位是毫秒message1.getMessageProperties().setDelay(1000 * 60);return message1;});}

5. 接收返回结果队列

尚未研究后续用到补充

6. 遇到的报错

  • 启动报错 Channel shutdown: channel error; protocol method:

    报错详情:

    Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'fx-bb-msg-exchange' in vhost '/': received 'direct' but current is 'topic', class-id=40, method-id=10)
    

           此错误为注册交换机时候抛出的错误,错误信息为注册交换机的属性,与RabbitMq已经创建好的交换机属性不一致,程序试图修改属性报错。
           错误示范:
           当前exchange交换机创建时候,创建的类型Type为topic类型,在注册exchange交换机时,返回的却是DirectExchange,那么系统便会尝试修改属性,从而引发报错
    在这里插入图片描述
    在这里插入图片描述
           修复方式:
           创建时,返回TopicExchange即可,与 创建的交换机类型保持一致
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/625165.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【例7.5】 取余运算(mod) 快速幂

1326&#xff1a;【例7.5】 取余运算&#xff08;mod&#xff09; 时间限制: 1000 ms 内存限制: 65536 KB 【题目描述】 输入b&#xff0c;p&#xff0c;k的值&#xff0c;求bpmodk 的值。其中b&#xff0c;p&#xff0c;kk为长整型数。 【输入】 输入b&#xff0c;p&#xf…

Scott用户数据表的分析

Oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 如果想要知道某个用户所有的数据表: select * from tab; 此时结果中一共返回了四张数据表&#xff0c;分别为部门表&#xff08;dept&#xff09; &#xff0c;员工表&#xff08;emp&a…

【LV12 DAY20 RTC实验】

编程实现通过LED状态显示当前电压范围&#xff0c;并打印产生低压警报时的时间 注&#xff1a; 电压在1501mv~1800mv时&#xff0c;LED2、LED3、LED4、LED5点亮 电压在1001mv~1500mv时&#xff0c;LED2、LED3、LED4点亮 电压在501mv~1000mv时&#xff0c;LED2、LED3点亮 电压在…

HTML--CSS--浮动布局及定位布局

正常文档布局 块元素独占一行 行内元素在有多个的时候&#xff0c;就是从左到右排在一行 块元素包括&#xff1a;div,p,hr 行内元素&#xff1a;span,i,img 浮动布局 float 属性&#xff1a; left 向左 right 向右 作用我目前看起来就是浮动元素的宽度是由内容决定的&#x…

HDFS和MapReduce综合实训

文章目录 第1关&#xff1a;WordCount词频统计第2关&#xff1a;HDFS文件读写第3关&#xff1a;倒排索引第4关&#xff1a; 网页排序——PageRank算法 第1关&#xff1a;WordCount词频统计 测试说明 以下是测试样例&#xff1a; 测试输入样例数据集&#xff1a;文本文档test1…

Java实战之每日海报

前言 使用java生成每日海报。 项目起因是巧合下遇到了一篇很棒的文档&#xff0c;说的是用程序来实现每日生成一个海报。如果之后加上自动发布的功能&#xff0c;简直就是太棒了啊&#xff01; 样例图如下&#xff1a; 每日海报 思路 访问某词站的API获取网络图片&#…

Java持久层框架之争:选择最佳方案来提升你的开发效率!

1、前言 在现代软件开发领域&#xff0c;选择适合的持久层框架是至关重要的一步。持久层框架可以帮助我们管理数据访问、数据库连接、事务处理等复杂的数据库操作&#xff0c;从而提升开发效率和代码质量。 然而&#xff0c;在众多的Java持久层框架中&#xff0c;选择最佳方案并…

算法通关村番外篇-LeetCode编程从0到1系列五

大家好我是苏麟 , 今天带来算法通关村番外篇-LeetCode编程从0到1系列五 . 数学 1523. 在区间范围内统计奇数数目 描述 : 给你两个非负整数 low 和 high 。请你返回 low 和 high 之间&#xff08;包括二者&#xff09;奇数的数目。 题目 : LeetCode 1523. 在区间范围内统计奇…

Spring Data JPA 踩过的坑实录

前言 游戏中台一直在使用spring 全家桶&#xff0c; 本文会左右使用Spring Data JPA的坑点记录总结 主要给大家总结介绍了关于使用Spring JPA注意事项及踩过的坑。 案例1&#xff1a; 为什么只调用了 org.springframework.data.repository.CrudRepository#findById(ID id) 却…

孤儿进程与僵尸进程以及僵尸进程的解决

孤儿进程&#xff1a; 定义&#xff1a; 父进程运行结束&#xff0c;但子进程还在运行&#xff08;未运行结束&#xff09;&#xff0c;这样的子进程就称为孤儿进程&#xff08; Orphan Process &#xff09;。 过程&#xff1a; 每当出现一个孤儿进程的时候&#xff0c;内核就…

rtklib读取原始数据是一次读取了一个文件的全部数据

一般来说&#xff0c;rtklib读取观测值文件&#xff08;o文件&#xff09;和导航文件&#xff08;n文件&#xff09;进行解算。 读取文件的时候&#xff0c;并非一次读取一个历元&#xff0c;而是将一个文件所有历元的数据都读取完毕以后&#xff0c;再进行解算。 这看起来是…

《C++大学教程》4.34阶乘

题目&#xff1a; 对一个非负整数n来说&#xff0c;它的阶乘可以写成 n! (读作“n的阶乘”)&#xff0c;其计算公式定义如下&#xff1a; n! n x (n-1) x (n-2)x......x1&#xff08;对于大于1的 n &#xff09; 和 n! 1 ( 对于等于0或者等于1的n ) 例如&#xff0c;5&…

重学Java 6 流程控制语句

我与我&#xff0c;至死不渝 ——24.1.15 模块重点&#xff1a; ①会使用Scanner和Random ②会使用switch以及知道case的穿透性 ③会使用if ④会使用for循环&#xff0c;while循环&#xff0c;嵌套循环 一、键盘录入_Scanner 1.概述&#xff1a;是Java定义好的一个类 2.作用&am…

网络安全等级保护测评规划与设计

笔者单位网络结构日益复杂&#xff0c;应用不断增多&#xff0c;使信息系统面临更多的风险。同时&#xff0c;网络攻防技术发展迅速&#xff0c;攻击的技术门槛随着自动化攻击工具的应用也在不断降低&#xff0c;勒索病毒等未知威胁也开始泛滥。基于此&#xff0c;笔者单位拟进…

一篇文章带你搞懂多线程面试相关的一些问题

目录 1.Callable接口 1.1使用Callable接口来创建线程 1.1相关面试题&#xff1a; 介绍下 Callable 是什么 2.JUC常见的类&#xff08;java.util,concurrent) 2.1ReentrantLock ReentrantLock和sychronized的区别 3.信号量 4.CountDownLatch 5.线程安全的集合类 5.1多线…

yolov7_Obb环境安装

下载obb代码之后&#xff0c;除了安装python和pytorch环境&#xff0c;由于还需要编译nms部分的c代码&#xff0c;因此还需要安装Visual Studio. 这里推荐安装Visual Studio2019版本。 然后在系统环境中配置环境变量 C:\Program Files (x86)\Microsoft Visual Studio\2019\Co…

案例127:基于微信小程序的预约挂号系统

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

文件按名称分类,批量归类到指定文件夹

我们的生活中充满了各种各样的文件&#xff1a;工作报告、家庭照片、旅行纪念品等&#xff0c;然而文件管理却是一个让人头疼的问题。你是否也曾在寻找某些文件名的重要文件&#xff0c;却因为文件混乱无章的堆放而感到烦躁不安&#xff1f;现在&#xff0c;有了我们【文件批量…

HTML--JavaScript--引入方式

啊哈~~~基础三剑看到第三剑&#xff0c;JavaScript HTML用于控制网页结构 CSS用于控制网页的外观 JavaScript用于控制网页的行为 JavaScript引入方式 引入的三种方式&#xff1a; 外部JavaScript 内部JavaScript 元素事件JavaScript 引入外部JavaScript 一般情况下网页最好…

积极参与建设“一带一路”,川宁生物与微构工场达成战略合作

2024年1月12日&#xff0c;北京微构工场生物技术有限公司&#xff08;以下简称“微构工场”&#xff09;与伊犁川宁生物技术股份有限公司&#xff08;“川宁生物”&#xff09;宣布签订战略合作协议&#xff0c;双方将共同出资设立合资公司&#xff0c;加速生物制造产业化落地&…