rabbitmq消息队列,消息发送失败,消息持久化,消费者处理失败相关

转:https://blog.csdn.net/u014373554/article/details/92686063

项目是使用springboot项目开发的,前是代码实现,后面有分析发送消息失败、消息持久化、消费者失败处理方法和发送消息解决方法及手动确认的模式

先引入pom.xml

<!--rabbitmq-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application 配置文件

spring:
rabbitmq:host: IP地址port: 5672username: 用户名password: 密码RabbitConfig配置文件
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;/**Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。Queue:消息的载体,每个消息都会被投到一个或多个队列。Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.Routing Key:路由关键字,exchange根据这个关键字进行消息投递。vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。Producer:消息生产者,就是投递消息的程序.Consumer:消息消费者,就是接受消息的程序.Channel:消息通道,在客户端的每个连接里,可建立多个channel.
*/
@Configuration
@Slf4j
public class RabbitConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;public static final String EXCHANGE_A = "my_mq_exchange_A";public static final String EXCHANGE_B = "my_mq_exchange_B";public static final String EXCHANGE_C = "my_mq_exchange_C";public static final String QUEUE_A="QUEUE_A";public static final String QUEUE_B="QUEUE_B";public static final String QUEUE_C="QUEUE_C";public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";@Beanpublic ConnectionFactory connectionFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true); //设置发送消息失败重试connectionFactory.setChannelCacheSize(100);//解决多线程发送消息return connectionFactory;}@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate(){RabbitTemplate template = new RabbitTemplate(connectionFactory());template.setMandatory(true); //设置发送消息失败重试return template;}//配置使用json转递数据@Beanpublic Jackson2JsonMessageConverter producerJackson2MessageConverter() {return new Jackson2JsonMessageConverter();}/*public SimpleMessageListenerContainer messageListenerContainer(){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());adapter.setDefaultListenerMethod(new Jackson2JsonMessageConverter());return container;}*//*** 针对消费者配置* 1. 设置交换机类型* 2. 将队列绑定到交换机* FanoutExchange: 将消息分发到所有的绑定队列,无 routingkey的概念* HeadersExchange: 通过添加属性key - value匹配* DirectExchange: 按照routingkey分发到指定队列* TopicExchange : 多关键字匹配* @return*/@Beanpublic DirectExchange defaultExchange(){return new DirectExchange(EXCHANGE_A,true,false);}@Beanpublic Queue queueA(){return  new Queue(QUEUE_A,true);// 队列持久化}@Beanpublic Queue queueB(){return  new Queue(QUEUE_B,true);// 队列持久化}/*** 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind( queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);}@Beanpublic Binding bindingB(){return BindingBuilder.bind( queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);}}

生成者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;/*** 生产者*/
@Component
@Slf4j
public class ProducerMessage implements  RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{private RabbitTemplate rabbitTemplate;@Autowiredpublic ProducerMessage(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setConfirmCallback(this::confirm); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容rabbitTemplate.setReturnCallback(this::returnedMessage);rabbitTemplate.setMandatory(true);}public void  sendMsg (Object content){CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A,RabbitConfig.ROUTINGKEY_A,content,correlationId);}/*** 消息发送到队列中,进行消息确认* @param correlationData* @param ack* @param cause*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info(" 消息确认的id: " + correlationData);if(ack){log.info("消息发送成功");//发送成功 删除本地数据库存的消息}else{log.info("消息发送失败:id "+ correlationData +"消息发送失败的原因"+ cause);// 根据本地消息的状态为失败,可以用定时任务去处理数据}}/*** 消息发送失败返回监控* @param message* @param i* @param s* @param s1* @param s2*/@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.info("returnedMessage [消息从交换机到队列失败]  message:"+message);}
}

消费者

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;/*** 消费者*/@Slf4j
@Componentpublic class ComsumerMessage {@RabbitListener(queues = RabbitConfig.QUEUE_A)public void handleMessage(Message message,Channel channel) throws  IOException{try {String json = new String(message.getBody());JSONObject jsonObject = JSONObject.fromObject(json);log.info("消息了【】handleMessage" +  json);int i = 1/0;//业务处理。/*** 防止重复消费,可以根据传过来的唯一ID先判断缓存数据中是否有数据* 1、有数据则不消费,直接应答处理* 2、缓存没有数据,则进行消费处理数据,处理完后手动应答* 3、如果消息 处理异常则,可以存入数据库中,手动处理(可以增加短信和邮件提醒功能)*///手动应答channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e){log.error("消费消息失败了【】error:"+ message.getBody());log.error("OrderConsumer  handleMessage {} , error:",message,e);// 处理消息失败,将消息重新放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);}}}

发送消息:调用生成的方法

import com.zz.blog.BlogApplicationTests;
import com.zz.blog.mq.ProducerMessage;
import net.sf.json.JSONObject;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.UUID;
public class Message extends BlogApplicationTests {@Autowiredprivate ProducerMessage producerMessage;@Testpublic void sendMessage(){JSONObject jsonObject = new JSONObject();jsonObject.put("id", UUID.randomUUID().toString());jsonObject.put("name","TEST");jsonObject.put("desc","订单已生成");//防止发送消息失败,将发送消息存入本地。producerMessage.sendMsg(jsonObject.toString());}
}

rabbitTemplate的发送消息流程是这样的:
1 发送数据并返回(不确认rabbitmq服务器已成功接收)
2 异步的接收从rabbitmq返回的ack确认信息
3 收到ack后调用confirmCallback函数
注意:在confirmCallback中是没有原message的,所以无法在这个函数中调用重发,confirmCallback只有一个通知的作用

在这种情况下,如果在2,3步中任何时候切断连接,我们都无法确认数据是否真的已经成功发送出去,从而造成数据丢失的问题。

最完美的解决方案只有1种:
使用rabbitmq的事务机制。
但是在这种情况下,rabbitmq的效率极低,每秒钟处理的message在几百条左右。实在不可取。

基于上面的分析,我们使用一种新的方式来做到数据的不丢失。
在rabbitTemplate异步确认的基础上
1 在本地缓存已发送的message
2 通过confirmCallback或者被确认的ack,将被确认的message从本地删除
3 定时扫描本地的message,如果大于一定时间未被确认,则重发

当然了,这种解决方式也有一定的问题
想象这种场景,rabbitmq接收到了消息,在发送ack确认时,网络断了,造成客户端没有收到ack,重发消息。(相比于丢失消息,重发消息要好解决的多,我们可以在consumer端做到幂等)。

消息存入本地:在message 发消息的写数据库中。

消息应答成功,则删除本地消息,失败更改消息状态,可以使用定时任务去处理。

消息持久化:

消费者: 

/*** 防止重复消费,可以根据传过来的唯一ID先判断缓存数据库中是否有数据* 1、有数据则不消费,直接应答处理* 2、缓存没有数据,则进行消费处理数据,处理完后手动应答* 3、如果消息 处理异常则,可以存入数据库中,手动处理(可以增加短信和邮件提醒功能)*/

转载于:https://www.cnblogs.com/duende99/p/11597619.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/461349.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于nginx实现缓存功能及uptream模块详细使用方法

基于nginx实现缓存功能及uptream模块详细使用方法一般情况下&#xff0c;前端使用nginx做代理或7层负载并向后实现varish/squid做cache server的效果要好的多nginx与squid做缓存比较nginx比squid有着巨大的优势表现在&#xff1a;nginx是异步假如后端的web服务器&#xff0c;当…

linux中的信号1——进程如何处理信号?

以下内容源于朱有鹏嵌入式课程的学习与整理&#xff0c;如有侵权请告知删除。 一、信号概述 1、信号是内容受限的一种异步通信机制 &#xff08;1&#xff09;信号的目的&#xff1a;用来进程间通信&#xff08;IPC&#xff09;、进程和内核间的通信。 &#xff08;2&#xff…

单片机oled显示浮点数函数_问中文编程在单片机上实现一个电子时钟,总共有几步?...

首先介绍开发一下语言&#xff1a;mcuScript&#xff0c;他是一个支持同时支持中文编程和英文编程的弱数据类型的脚本语言。相关介绍可参考前面的两篇文章:彭贞&#xff1a;mcuScript一个洋名字的中文(汉语)编程语言&#xff0c;初体验​zhuanlan.zhihu.com彭贞&#xff1a;mcu…

RabbitMQ的消息确认机制

转&#xff1a;https://www.toutiao.com/a6583957771840913934/?tt_frommobile_qq&utm_campaignclient_share&timestamp1532999387&appnews_article&utm_sourcemobile_qq&iid39062783162&utm_mediumtoutiao_android 一&#xff1a;确认种类 RabbitMQ的…

javascript 文件的同步加载与异步加载

原文:[转载]javascript 文件的同步加载与异步加载HTML 4.01 的script属性 charset: 可选。指定src引入代码的字符集&#xff0c;大多数浏览器忽略该值。 defer: boolean, 可选。延迟脚本执行&#xff0c;相当于将script标签放入页面body标签的底部&#xff0c;js脚本会在docume…

linux中的信号2——alarm、pause函数

以下内容源于朱有鹏嵌入式课程的学习与整理&#xff0c;如有侵权请告知删除。 1、alarm函数 函数原型 函数说明 &#xff08;1&#xff09;内核以API形式提供的闹钟&#xff1b; &#xff08;2&#xff09;可以为当前进程定义闹钟&#xff0c;时间到了会发出SIGALRM信号。 &…

easyexcel 填充模板 格式变了_Qamp;A | 如何制作规范的电子合同模板?

表单中使用电子合同时&#xff0c;不仅需要进行功能设置&#xff0c;还需要制作一份PDF格式的模板&#xff0c;上传到“合同模板”中&#xff0c;才能生成有效的电子合同&#xff0c;接下来&#xff0c;我们一起了解模板的制作方法。制作方法 1. 准备PDF模板首先准备一份PDF格式…

edoc2中标电子文件管理标准ERMS制定项目

2019独角兽企业重金招聘Python工程师标准>>> 据悉&#xff0c;上海鸿翼数字计算机网络有限公司&#xff08;edoc2&#xff09;参加了电子文件管理标准ERMS制定项目的投标&#xff0c;经过精心准备和专业的答辩&#xff0c;凭借多年在电子文件管理领域长期的探索和研…

文件IO——如何实现非阻塞式IO?

以下内容源于朱有鹏嵌入式课程的学习与整理&#xff0c;如有侵权请告知删除。 一、阻塞式IO 1、阻塞式的概念 我们知道&#xff0c;有些函数在调用时&#xff08;比如网络编程中的recv函数&#xff09;&#xff0c;如果某些条件不满足&#xff0c;则会进入等待状态&#xff0c…

散点画三维曲面图_UG 复杂曲面合金零件的数控加工

随着柔性制造、机床数控技术的飞速发展&#xff0c;具有复杂、 精密、小批量、多品种的曲面零件越来越多&#xff0c;如何利用数控 加工技术高质量、高效率加工该类零部件是很有研究价值 的。 本研究利用 UG 软件对复杂曲面合金零件进行三维实 体建模、设计加工工艺过程&#x…

扫盲:php session缓存至memcached中的方法

memcached是一套分布式的快取系统&#xff0c;当初是Danga Interactive为了LiveJournal所发展的&#xff0c;但被许多软件&#xff08;如MediaWiki&#xff09;所使用。这是一套开放源代码软件&#xff0c;以BSD license授权协议发布。[1]memcached仅支持一些非常简单的命令 比…

使用juniversalchardet做字符编码识别

为什么80%的码农都做不了架构师&#xff1f;>>> 在抓取网站的页面的时候最烦人的一件事情之一就是识别原站点的编码&#xff0c;通常来说只有GBK&#xff08;GB2312&#xff09;和UTF8两种&#xff0c;不过依旧需要读取大量Http头信息来识别&#xff0c;有些网站则…

获取系统信息1——linux系统中的时间

以下内容源于朱有鹏嵌入式课程的学习与整理&#xff0c;如有侵权请告知删除。 一、关于时间的概念 1、GMT时间 GMT是格林尼治时间&#xff0c;即格林尼治地区的当地时间。用格林尼治的当地时间作为全球国际时间&#xff0c;用以描述全球性的事件的时间&#xff0c;方便大家记忆…

判断一个字符串是否为回文-链队(新建,进队,出队),链栈(新建,进栈,出栈)...

回文&#xff1a;字符对称排列的字符串&#xff0c;例如ABCBA 思路&#xff1a;根据队&#xff1a;先进先出和栈: 先进后出的原则&#xff0c;进行比较出队和出栈的字符是否相等。如果相等&#xff0c;则为回文。 创建控制台应用程序。 1 #region 字符节点类 2 …

句法依存分析_复旦大学邱锡鹏教授:词法、句法分析研究进展综述

本文为第十六届自然语言处理青年学者研讨会 YSSNLP2019 报告《词法、句法分析研究进展综述》的简要文字整理&#xff0c;本报告主要回顾词法、句法领域的最新研究进展。 关于报告人&#xff1a;邱锡鹏&#xff0c;复旦大学计算机科学技术学院副教授&#xff0c;博士生导师。于复…

获取系统信息3——proc文件系统介绍和使用

以下内容源于朱有鹏嵌入式课程的学习与整理&#xff0c;如有侵权请告知删除。 一、proc文件系统介绍 1、操作系统级别的调试一般很困难 简单的程序可以单步调试&#xff1b;复杂一点的程序可以printf、cout等打印信息调试&#xff08;即输出信息到控制台&#xff09;&#xff0…

阻止函数源码在控制台输出

这是一个很贱的技能&#xff0c;我在谷歌控制台源码里看到的。相信大家都知道&#xff0c;在控制台里只输入函数名&#xff0c;不输入 () 然后按回车&#xff0c;就可以输出源码。 都不会陌生吧&#xff0c;这也有助于我们调试&#xff0c;是个很棒的技巧。不过系统内置的就会输…

值不值得入手_iPhone11现在还值不值得入手?真实用户说出心里话

iPhone11作为苹果走量的一款机型&#xff0c;自发布以来就备受争议&#xff0c;有的朋友说真香&#xff0c;A13iOS只卖4000多&#xff0c;还有的朋友吐槽大黑边、828P的屏幕、信号不好还有充电太慢&#xff0c;特别是现在同价位能买到的安卓旗舰&#xff0c;要5G有5G、要高刷新…

设备驱动,字符设备驱动、(总线)设备驱动模型、sysfs文件系统、平台设备驱动

以下内容转载于微信公众号&#xff1a;嵌入式企鹅圈。如有侵权&#xff0c;请告知删除。 学习Linux设备驱动开发的过程中自然会遇到字符设备驱动、平台设备驱动、设备驱动模型和sysfs等相关概念和技术。 对于初学者来说会非常困惑&#xff0c;甚至对Linux有一定基础的工程师而言…

对于局部变量_对于SQL常用查询优化方法的整理

查询进行优化&#xff0c;应尽量避免全表扫描&#xff0c;首先应考虑在where 及order by 涉及的列上建立索引:尝试下面的技巧以避免优化器错选了表扫描&#xff1a;使用ANALYZE TABLE tbl_name为扫描的表更新关键字分布。对扫描的表使用FORCE INDEX告知MySQL&#xff0c;相对于…