Rabbitmq消息丢失-生产者消息丢失(一)

说明:消息生产者在将数据发送到Mq的时候,可能由于网络等原因造成数据投递失败。

消息丢失大致分三种:这里说的是生产者消息丢失

分析原因:

1.有没有一种可能,我刚发送消息,消息还没有到交换机就断网了,是不是消息就没有发送成功,这个时候如果不对这种情况处理,消息是不是就丢失了

2.又有没有一种可能,我又发送了一条消息,交换机拿到消息后正要发送给某个队列,就是你,你把那个队列给删掉了,这个时候消息找不到队列,消息就也丢失了

解决方法:

1.事务:Rabbitmq提供了事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

缺点:RabbitMQ 事务机制是同步的,提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,太耗性能了

2.confirm机制:相比于事务的同步,confirm机制是异步的,你发送完这个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块解决数据丢失,建议使用 confirm 机制。

话不多说,干代码

工程图:

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><artifactId>spring-boot-starter-parent</artifactId>  <!-- 被继承的父项目的构件标识符 --><groupId>org.springframework.boot</groupId>  <!-- 被继承的父项目的全球唯一标识符 --><version>2.2.2.RELEASE</version>  <!-- 被继承的父项目的版本 --></parent><groupId>MqLossDemo</groupId><artifactId>MqLossDemo</artifactId><version>1.0-SNAPSHOT</version><packaging>war</packaging><name>MqLossDemo Maven Webapp</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><!--spring boot核心--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring boot 测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springmvc web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--开发环境调试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency><!--amqp 支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.1.7.RELEASE</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version></dependency></dependencies><build><finalName>MqLossDemo</finalName><pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin><!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging --><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin><plugin><artifactId>maven-war-plugin</artifactId><version>3.2.2</version></plugin><plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin><plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin></plugins></pluginManagement></build>
</project>

2.application.yml

server:port: 8080
spring:rabbitmq:port: 5672host: 你的 rabbitmq IPusername: adminpassword: adminvirtual-host: /# 发送者开启 confirm 确认机制publisher-confirm-type: correlated# 发送者开启 return 确认机制publisher-returns: truetemplate:#在配置文件中配置 mandatory: true 页无用,需要在RabbitTemplate中手动设置mandatory: true

3.RabbitMqConfig

package com.dev.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 类名称:** @author 李庆伟* @date 2024年03月04日 14:12*/
@Configuration
public class RabbitMqConfig {@Beanpublic ConfirmCallbackService confirmCallbackService() {return new ConfirmCallbackService();}@Beanpublic ReturnCallbackService returnCallbackService() {return new ReturnCallbackService();}@Beanpublic RabbitTemplate rabbitTemplate(@Autowired CachingConnectionFactory factory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);//生产者发送消息到Mq交换机回执,手动ack回执回调处理//可以理解为:消息推送到server,但是在server里找不到交换机//如果想看效果【先清除交换机和队列】:在工程运行前注释掉RabbitMqQueueConfig类中的directExchange和bindingDirect方法rabbitTemplate.setConfirmCallback(confirmCallbackService());//生产者发送消息到Mq,交换机发送到队列回执,一定要设置手动设置Mandatory(true),配置文件中不生效//可以理解为:消息推送到server,但是在server里找不到队列//如果想看效果【先清除交换机和队列】:如果之前看过setConfirmCallback效果,先去掉RabbitMqQueueConfig类中注释//           在工程运行前注释掉RabbitMqQueueConfig类中的directQueue和bindingDirect方法rabbitTemplate.setReturnCallback(returnCallbackService());rabbitTemplate.setMandatory(true);return rabbitTemplate;}//生产者发送消息到Mq交换机回执 //可以理解为:消息推送到server,但是在server里找不到交换机class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {//log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);System.out.println(correlationData);System.out.println(ack);System.out.println(cause);System.out.println("--------");} else {System.out.println("消息发送异常!");//可以进行重发等操作//这里可以处理失败的业务}}}//生产者发送的消息到Mq队列回执 //可以理解为:消息推送到server,但是在server里找不到队列class ReturnCallbackService implements RabbitTemplate.ReturnCallback {//public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {System.out.println(message.getMessageProperties().getMessageId());System.out.println(new String(message.getBody()));System.out.println(i);System.out.println(s);System.out.println(s1);System.out.println(s2);//可以将消息存储到一个新的位置,这里可以处理失败的业务}}}

4.RabbitMqQueueConfig

package com.dev.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 类名称:** @author 李庆伟* @date 2024年03月04日 14:12*/
@Configuration
public class RabbitMqQueueConfig {//绑定键public final static String QUEUE_ONE = "loss_queue";public final static String EXCHANGE_ONE = "loss_exchange";@Beanpublic Queue directQueue() {return new Queue(RabbitMqQueueConfig.QUEUE_ONE);}//Direct交换机 起名:directExchange@BeanDirectExchange directExchange() {return new DirectExchange(RabbitMqQueueConfig.EXCHANGE_ONE,true,false);}//绑定  将队列和交换机绑定, 并设置用于匹配键:directRoutingKey@BeanBinding bindingDirect() {return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRoutingKey");}}

5.RabbitContoller

package com.dev.controller;import com.alibaba.fastjson.JSONObject;
import com.dev.config.RabbitMqQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** 类名称:消息丢失问题** @author lqw* @date 2024年02月27日 14:47*/
@Slf4j
@RestController
@RequestMapping("loss")
public class RabbitController {@AutowiredRabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法/*** 消息丢失* @return*/@GetMapping("/sendMessage")public String sendMessage() {String id = UUID.randomUUID().toString().replace("-","");Map<String,Object> map = new HashMap<>();map.put("id",id);map.put("name","张龙");Message msg = MessageBuilder.withBody(JSONObject.toJSONString(map).getBytes()).setMessageId(id).build();rabbitTemplate.convertAndSend(RabbitMqQueueConfig.EXCHANGE_ONE, "directRoutingKey", msg);return "ok";}}

6.App

package com.dev;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** 类名称:** @author 李庆伟* @date 2024年03月04日 14:11*/
@SpringBootApplication
public class App {public static void main(String[] args) {SpringApplication.run(App.class);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/720669.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL中有事务无法回滚的语句?

目录 0.从修改表结构语句开始 1.DDL(Data Definition Language) 数据定义语言 2.DCL(Data Control Language) 数据控制语言 3.在该事务还没提交时开启新事务 4.锁操作 5.行政声明语句 6.主从复制的从机操作 7.如何避免出现隐式提交导致的错误 0.从修改表结构语句开始 试…

tomcat nginx 动静分离

实验目的:当访问静态资源的时候&#xff0c;nginx自己处理 当访问动态资源的时候&#xff0c;转给tomcat处理 第一步 关闭防火墙 关闭防护 代理服务器操作&#xff1a; 用yum安装nginx tomcat &#xff08;centos 3&#xff09;下载 跟tomcat&#xff08;centos 4&#xff0…

Ansible-Playbook

目录 1、概念介绍 roles 角色 playbook 核心元素 ansible-playbook 命令 playbook 简单案例 2、Ansible 变量 自定义变量 facts 变量 Palybook 部署 LAMP ansible 端安装 LAMP playbook 系统环境脚本 构建 httpd 任务 构建 mariadb 任务 构建 php 任务 编写整个任务…

2024年【陕西省安全员C证】考试资料及陕西省安全员C证找解析

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 陕西省安全员C证考试资料根据新陕西省安全员C证考试大纲要求&#xff0c;安全生产模拟考试一点通将陕西省安全员C证模拟考试试题进行汇编&#xff0c;组成一套陕西省安全员C证全真模拟考试试题&#xff0c;学员可通过…

Netty权威指南——基础篇4 网络通信基础

1 TCP粘包/拆包 TCP是个“流”协议&#xff0c;所谓流&#xff0c;就是没有界限的一串数字。可以想象河里流水&#xff0c;是连成一片的&#xff0c;其间没有分界线。TCP底层并不了解上层业务数据的具体含义&#xff0c;它会根据TCP缓冲区的实际情况进行包的划分&#xff0c;一…

Vue.js的单向数据流:让你的应用更清晰、更可控

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

全局渐变滚动条样式

效果如下&#xff1a; APP.vue<style> /* 整个滚动条 */ ::-webkit-scrollbar {width: 5px;height: 10px; } /* 滚动条上的滚动滑块 */ ::-webkit-scrollbar-thumb {background-color: #49b1f5;/* 关键代码 */background-image: -webkit-linear-gradient(45deg,rgba(255,…

使用Go的encoding/asn1库处理复杂数据:技巧与最佳实践

使用Go的encoding/asn1库处理复杂数据&#xff1a;技巧与最佳实践 引言ASN.1 基础ASN.1与Go语言的关系ASN.1数据类型 encoding/asn1库概览主要功能和特性关键API应用场景 基本使用方法序列化&#xff08;编码&#xff09;反序列化&#xff08;解码&#xff09;处理复杂数据结构…

npm、cnpm、pnpm使用详细

简介&#xff1a; npm&#xff1a;npm&#xff08;Node Package Manager&#xff09;是Node.js的包管理工具&#xff0c;用于安装、更新、卸载Node.js的模块和包。它提供了一个命令行界面&#xff0c;使得开发者可以轻松地管理项目依赖。npm 是 nodejs 中的一部分&#xff0c;…

Pytorch学习 day01(Jupyter安装、常用函数、三种编辑器的对比)

Jupyter 安装过程中遇到的问题&#xff1a; Anaconda的base环境会自动安装Jupyter&#xff0c;但是如果我们要在其他环境中安装Jupyter&#xff0c;就需要注意&#xff0c;该环境的python版本不能高于3.11&#xff0c;且用以下代码安装&#xff1a; conda install nb_conda_…

什么是跨站脚本攻击(XSS)

厦门微思网络​​​​​​https://www.xmws.cn 华为认证\华为HCIA-Datacom\华为HCIP-Datacom\华为HCIE-Datacom Linux\RHCE\RHCE 9.0\RHCA\ Oracle OCP\CKA\K8S\ CISP\CISSP\PMP\ ​ 跨站脚本攻击&#xff08;Cross-site Scripting&#xff0c;通常称为XSS&#xff09;&#xf…

SpringCloud-RabbitMQ消息模型

本文深入介绍了RabbitMQ消息模型&#xff0c;涵盖了基本消息队列、工作消息队列、广播、路由和主题等五种常见消息模型。每种模型都具有独特的特点和适用场景&#xff0c;为开发者提供了灵活而强大的消息传递工具。通过这些模型&#xff0c;RabbitMQ实现了解耦、异步通信以及高…

深度学习系列61:在CPU上运行大模型

1. 快速版 1.1 llamafile https://github.com/Mozilla-Ocho/llamafile 直接下载就可以用&#xff0c;链接为&#xff1a;https://huggingface.co/jartine/llava-v1.5-7B-GGUF/resolve/main/llava-v1.5-7b-q4.llamafile?downloadtrue 启动&#xff1a;./llava-v1.5-7b-q4.lla…

提升效率的电脑定时工具,AutoOff软件推荐

今天最软库给大家带来一款非常实用的电脑定时关机软件在我们日常办公的时候有的时候需要上传一些资料由于我们下班了&#xff0c;我们想让他上传完成之后我们才离开这时候呢&#xff0c;就可以用到这款定时工具了。 我们可以设置中设置在几小时或者几分钟之后让电脑进行关机我们…

基于springboot+vue的新闻资讯系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

【HarmonyOS】DevEco Studio3.1x安装教程

目录 下载安装开发工具配置 下载 开发工具点击跳转→https://hmxt.org/deveco-studio 点击‘立即下载’ 安装 将安装包解压 双击安装&#xff0c;点击“Next” 选择安装目录&#xff0c;点击Next 勾选创建桌面快捷方式和环境的添加&#xff0c;点击Next 点击Instal…

Python实现ROC工具判断信号:股票技术分析的工具系列(7)

Python实现ROC工具判断信号&#xff1a;股票技术分析的工具系列&#xff08;7&#xff09; 介绍算法公式 代码rolling函数介绍完整代码data代码ROC.py 介绍 ROC&#xff08;变动率指标&#xff09;是一种技术分析指标&#xff0c;用于衡量价格变动的速度和幅度&#xff0c;计算…

LCR 127. 跳跃训练

解题思路&#xff1a; 动态规划&#xff0c;类似于斐波那契数列&#xff0c;但需要根据题意调整初始值dp[0]和dp[1]&#xff0c;递推公式由最后一跳跳一次或两次得到。 class Solution {public int trainWays(int num) {if(num<1) return 1;int[] dpnew int[num1];//当平台…

抖音视频评论批量采集软件|视频下载工具

《轻松搞定&#xff01;视频评论批量采集软件&#xff0c;助您高效工作》 在短视频这个充满活力和创意的平台上&#xff0c;了解用户评论是了解市场和观众心声的重要途径之一。为了帮助您快速获取大量视频评论数据&#xff0c;我们推出了一款操作便捷、功能强大的软件&#xff…

JVM运行时数据区——堆

文章目录 1、堆的核心概述1.1、JVM实例与堆内存的对应关系1.2、堆与栈的关系1.3、JVM堆空间划分 2、设置堆内存大小与内存溢出2.1、设置堆内存大小2.2、内存溢出案例 3、新生代与老年代4、图解对象分配过程5、Minor GC、Major GC、Full GC5.1、GC的分类5.2、分代式GC策略的触发…