Rabbitmq消息丢失-消费者消息丢失(二)

说明:消费端在处理消息的过程中出现异常,例如:业务逻辑异常,或者消费者被停机,或者网络断开连接等,以上等情况使消息没有得到正确恰当的处理,也会使消息丢失。

分析:分析就是说明中的例如!

解决:ACK确认机制

所谓的ACK就是:首先关闭自动确认【自动ACK】,消费者收到一个消息后,就可以发一个确认【ACK】给MQ,当然什么时候发送确认【ACK】是程序员决定的,也就是说每次在确保处理完这个消息相关的业务后,程序员可以手动发送确认【ACK】,之后把消息从MQ中干掉!这样即使出现了异常也可以有效的消费消息。

工程图:

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><artifactId>spring-boot-starter-parent</artifactId>  <!-- 被继承的父项目的构件标识符 --><groupId>org.springframework.boot</groupId>  <!-- 被继承的父项目的全球唯一标识符 --><version>2.2.2.RELEASE</version>  <!-- 被继承的父项目的版本 --></parent><groupId>MqLossDemoTwo</groupId><artifactId>MqLossDemoTwo</artifactId><version>1.0-SNAPSHOT</version><packaging>war</packaging><name>MqLossDemoTwo Maven Webapp</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><!--spring boot核心--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring boot 测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springmvc web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--开发环境调试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency><!--amqp 支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.1.7.RELEASE</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version></dependency></dependencies><build><finalName>MqLossDemoTwo</finalName><pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin><!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging --><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin><plugin><artifactId>maven-war-plugin</artifactId><version>3.2.2</version></plugin><plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin><plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin></plugins></pluginManagement></build>
</project>

2.application.yml

server:port: 8080
spring:rabbitmq:port: 5672host: 你的 rabbitmq IPusername: adminpassword: adminvirtual-host: /listener:simple:concurrency: 10max-concurrency: 10prefetch: 1auto-startup: truedefault-requeue-rejected: true# 设置消费端手动 ackacknowledge-mode: manual# 是否支持重试retry:enabled: true

3.RabbitMqQueueConfig

package com.dev.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 类名称:** @author 李庆伟* @date 2024年03月04日 14:12*/
@Configuration
public class RabbitMqQueueConfig {//绑定键public final static String QUEUE_ONE = "loss_queue";public final static String EXCHANGE_ONE = "loss_exchange";@Beanpublic Queue directQueue() {return new Queue(RabbitMqQueueConfig.QUEUE_ONE);}//Direct交换机 起名:directExchange@BeanDirectExchange directExchange() {return new DirectExchange(RabbitMqQueueConfig.EXCHANGE_ONE,true,false);}//绑定  将队列和交换机绑定, 并设置用于匹配键:directRoutingKey@BeanBinding bindingDirect() {return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRoutingKey");}}

4.RabbitController

package com.dev.controller;import com.alibaba.fastjson.JSONObject;
import com.dev.config.RabbitMqQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** 类名称:消息丢失问题** @author lqw* @date 2024年02月27日 14:47*/
@Slf4j
@RestController
@RequestMapping("loss")
public class RabbitController {@AutowiredRabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法/*** 消息丢失* @return*/@GetMapping("/sendMessage")public String sendMessage() {String id = UUID.randomUUID().toString().replace("-","");Map<String,Object> addMap = new HashMap<>();//添加用户信息addMap.put("id",id);addMap.put("name","张龙");Message msg = MessageBuilder.withBody(JSONObject.toJSONString(addMap).getBytes()).setMessageId(id).build();rabbitTemplate.convertAndSend(RabbitMqQueueConfig.EXCHANGE_ONE, "directRoutingKey", msg);return "ok";}}

5.RabbitMqListener

package com.dev.listener;import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** 类名称:** @author 李庆伟* @date 2024年03月04日 16:54*/@Component
public class RabbitMqListener {@RabbitListener(queues = "loss_queue")@RabbitHandlerpublic void process(Message msg, Channel channel) {System.out.println("Rabbitmq Direct : " + msg);//设置的唯一id,可以用来处理重复消费String id = msg.getMessageProperties().getMessageId();//消息队列自身设置的唯一标识long tag = msg.getMessageProperties().getDeliveryTag();//int a = 1/0;try {//监听到要添加的用户信息String dataStr = new String(msg.getBody());Map<String,Object> addMap = JSON.parseObject(dataStr,Map.class);//先去redis中查询是否已经添加过了该用户//如果未添加[重复消费]if(true){ //如果未添加//添加用户业务//....add(addMap);//告诉队列该消息已经消费channel.basicAck(tag,false);} else {//如果已添加//告诉队列该消息已经消费channel.basicAck(tag,false);}} catch (Exception e) {try {//tag:消息序号//multiple:消息的标识,是否确认多条,false只确认当前一个消息收到,//                               true确认所有consumer获得的消息(成功消费,消息从队列中删除)//requeue:是否要退回到队列 true 将消息再次放到mq队列中,false是不把消息放到队列中channel.basicNack(tag, true, false);//channel.basicNack(tag, true, true); //如果能走到此处,这样会把消息在放到队列中,会在次被监听到,陷入死循环//channel.basicNack(tag, false, true); //如果能走到此处,这样会把消息在放到队列中,会在次被监听到,陷入死循环//channel.basicNack(tag, false, false); //如果能走到此处,如果是扇形交换机,其他消费者也会再次消费此信息//可以把添加用户业务数据保存起来//...} catch (Exception ex) {ex.printStackTrace();}}}}

6.App

package com.dev;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** 类名称:** @author 李庆伟* @date 2024年03月04日 14:11*/
@SpringBootApplication
public class App {public static void main(String[] args) {SpringApplication.run(App.class);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/719790.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Composer基础使用 SDK包初始化

Composer 的工作原理 我们在使用 Composer 之前我们得了解一下它的实现原理&#xff0c;它主要由三个部分组成&#xff1a;命令行工具、包仓库、代码库&#xff1a; Packagist 它是官方仓库&#xff0c;也就是我们平常说的 Composer 源&#xff0c;它的作用是存储这些包的信息…

参数引入和全局变量引入实现-目标和

LCR 102. 目标和 - 力扣&#xff08;LeetCode&#xff09; 分析题意&#xff0c;画出决策树&#xff0c;其他的思路都跟前面讲过的类似&#xff1a; 全局变量引入实现&#xff1a; 全局变量的引入&#xff0c;需要手动处理回溯&#xff1b; class Solution {int ret; //…

从0到1全流程使用 segment-anything

从0到1全流程使用 segment-anything 一、安装 anaconda 一、下载 anaconda 二、以管理员身份运行安装 1、勾选 Just Me 2、统一安装路径(后续 python 等包也安装至此目录) 3、勾选 add to path 然后安装即可。 三、修改 Anaconda 默认路径及默认缓存路径 Anaconda 默认下…

#QT(DEMO)

1.IDE&#xff1a;QTCreator 2.实验&#xff1a;打印"hello wolrd" 3.记录 &#xff08;1&#xff09;创建一个新工程&#xff1a; 新建好一个工程存放文件夹&#xff08;路径不能有中文&#xff09;,然后按下图配置 &#xff08;2&#xff09;点击widgets.ui拖入以…

真香定律!我用这种模式重构了第三方登录

分享是最有效的学习方式。 博客&#xff1a;https://blog.ktdaddy.com/ 老猫的设计模式专栏已经偷偷发车了。不甘愿做crud boy&#xff1f;看了好几遍的设计模式还记不住&#xff1f;那就不要刻意记了&#xff0c;跟上老猫的步伐&#xff0c;在一个个有趣的职场故事中领悟设计模…

2023人机交互期末复习

考试题型及分值分布 1、选择题&#xff08;10题、20分&#xff09; 2、填空题&#xff08;10题、20分&#xff09; 3、判断题&#xff08;可选、5题、10分&#xff09; 4、解答题&#xff08;5~6题、30分&#xff09; 5、分析计算题&#xff08;1~2题、20分&#xff09; 注意&…

PHP+MySQL实现后台管理系统增删改查之够用就好

说明 最近要给博客弄个后台&#xff0c;不想搞得很复杂&#xff0c;有基本的增删改查就够了&#xff0c;到网上找了一圈发现这个不错&#xff0c;很实用&#xff0c;希望可以帮到大家&#xff0c;需要的朋友评论区留下邮箱&#xff0c;我安排发送。 演示效果 项目介绍 本项目…

带使能控制的锂电池充放电解决方案

一、产品概述 TP4594R 是一款集成线性充电管理、同步升压转换、电池电量指示和多种保护功能的单芯片电源管理 SOC&#xff0c;为锂电池的充放电提供完整的单芯片电源解决方案。 TP4594R 内部集成了线性充电管理模块、同步升压放电管理模块、电量检测与 LED 指示模块、保护模块…

关于python函数参数传递

参数传递 在 python 中&#xff0c;类型属于对象&#xff0c;对象有不同类型的区分&#xff0c;变量是没有类型的&#xff1a; 在下面的代码示例重&#xff0c;[1,2,3] 是 List 类型&#xff0c;“qayrup” 是 String 类型&#xff0c;而变量 a 是没有类型&#xff0c;它仅仅…

#WEB前端

1.实验&#xff1a;vscode安装&#xff0c;及HTML常用文本标签 2.IDE&#xff1a;VSCODE 3.记录&#xff1a; &#xff08;1&#xff09;网页直接搜索安装vscode &#xff08;2&#xff09;打开vscode&#xff0c;在下图分别安装以下插件&#xff1a; Html Css Support …

C++11线程同步之互斥锁

C11线程同步之互斥锁 std::mutex成员函数线程同步 std::lock_guardstd::recursive_mutexstd::timed_mutex 进行多线程编程&#xff0c;如果多个线程需要对同一块内存进行操作&#xff0c;比如&#xff1a;同时读、同时写、同时读写对于后两种情况来说&#xff0c;如果不做任何的…

《互联网的世界》第四讲-拥塞控制与编码

需要澄清的一个误区是&#xff0c;拥塞绝不是发送的数据量太大导致&#xff0c;而是数据在极短的时间段内到达了同一个地方以至于超过了网络处理容量导致&#xff0c;拥塞的成因一定要考虑时间因素。换句话说&#xff0c;拥塞由大突发导致。 只要 pacing&#xff0c;再多的数据…

动态规划(算法竞赛、蓝桥杯)--树形DP树形背包

1、B站视频链接&#xff1a;E18 树形DP 树形背包_哔哩哔哩_bilibili #include <bits/stdc.h> using namespace std; const int N110; int n,V,p,root; int v[N],w[N]; int h[N],to[N],ne[N],tot; //邻接表 int f[N][N];void add(int a,int b){to[tot]b;ne[tot]h[a];h[a…

数仓项目6.0(一)

尚硅谷大数据项目【电商数仓6.0】企业数据仓库项目_bilibili 数据流转过程 用户➡️业务服务器➡️数据库存储➡️数仓统计分析➡️数据可视化 数据仓库处理流程&#xff1a;数据源➡️加工数据➡️统计筛选数据➡️分析数据 数据库不是为了数据仓库服务的&#xff0c;需要…

B084-SpringCloud-Zuul Config

目录 zuul系统架构和zuul的作用zuul网关实现配置映射路径过滤器 Config概述云端管理本地配置 zuul zuul是分布式和集群后前端统一访问入口 系统架构和zuul的作用 zuul把自己注册进eureka&#xff0c;然后可通过前端传来的服务名发现和访问对应的服务集群 为了预防zuul单点故…

【OJ】求和与计算日期

文章目录 1. 前言2. JZ64 求123...n2.1 题目分析2.2 代码 3. HJ73 计算日期到天数转换3.1 题目分析3.2 代码 4. KY222 打印日期4.1 题目分析4.2 代码 1. 前言 下面两个题目均来自牛客&#xff0c;使用的编程语言是c&#xff0c;分享个人的一些思路和代码。 2. JZ64 求123…n …

毕业生信息招聘平台|基于springboot+ Mysql+Java的毕业生信息招聘平台设计与实现(源码+数据库+文档+PPT)

目录 论文参考 摘 要 数据库设计 系统详细设计 文末获取源码联系 论文参考 摘 要 随着社会的发展&#xff0c;社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 毕业生信息招聘平台&#xff0c;主要的模块包括查看管理员&a…

基于Springboot的无人智慧超市管理系统(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的无人智慧超市管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系…

【每日刷题】数组-LC56、LC238、随想录1、LC560

1. LC56 合并区间 题目链接 Arrays.sort先让intervals里的子数组按照子数组的第一个数字值从小到大排列。开一个新数组&#xff0c;newInterval&#xff0c;存放合并好的子数组让intervals的当前子数组i的第一个数字与newInterval的当前子数组index的最后一个数字比较大小&am…

Intel/国产化无人叉车机器视觉专用控制器

无人叉车和机器视觉是两个独立的技术领域&#xff0c;但它们可以结合使用以实现更高效的物流自动化。无人叉车是一种自动化运输工具&#xff0c;可以在没有人为干预的情况下完成货物的搬运和运输。机器视觉是一种人工智能技术&#xff0c;可以让计算机识别和理解图像或视频中的…