如何保证消息不丢失?——使用rabbitmq的死信队列!

如何保证消息不丢失?——使用rabbitmq的死信队列!

1、什么是死信

在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。
死信就是消息在特定场景下的一种表现形式,这些场景包括:

    1. 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时 或者拒绝basicReject
    1. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack
    1. 消息的Expiration 过期时长或队列TTL过期时间。
    1. 消息队列达到最大容量

上述场景经常产生死信,即消息在这些场景中时,被称为死信。

2、什么是死信队列

死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。
死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的业务消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。 人工干预
在这里插入图片描述

3、那么,我们到底如何来使用死信队列呢?

死信队列基本使用,只需要在声明业务队列的时候,绑定指定的死信交换机和RoutingKey即可。

生产者

/** Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.**/
package com.fpl.provider;import com.fpl.model.OrderingOk;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** <p>Project: spring-rabbitmq - DeadProvider</p>* <p>Powered by fpl1116 On 2024-04-09 11:35:12</p>* <p>描述:<p>** @author penglei* @version 1.0* @since 1.8*/
@Service
public class DeadProvider {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(OrderingOk orderingOk) {rabbitTemplate.convertAndSend("Direct_E01", "RK01", orderingOk,new MessagePostProcessor(){@Overridepublic Message postProcessMessage(Message message) throws AmqpException {int id  = orderingOk.getId();int expiration = 0;if(id == 1){expiration = 50*1000;}else if(id == 2){expiration = 40*1000;}else if(id ==3){expiration = 30*1000;}else if(id ==4){expiration = 20*1000;}else if(id ==5){expiration = 10*1000;}//为每个消息设置过期时长,但是有可能造成最前面的一个消息未过期一直阻塞后面的消息不能被消费message.getMessageProperties().setExpiration(String.valueOf(expiration));return message;}});}
}

消费者

/** Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.**/
package com.fpl.consumers;import com.fpl.model.OrderingOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;/*** <p>Project: spring-rabbitmq - DeadConsumer</p>* <p>Powered by fpl1116 On 2024-04-09 11:32:59</p>* <p>描述:<p>** @author penglei* @version 1.0* @since 1.8*/
//@Configuration
@Slf4j
public class DeadConsumer {//死信交换机@Beanpublic DirectExchange deadExchange(){return  ExchangeBuilder.directExchange("Dead_E01").build();}//死信队列@Beanpublic Queue deadQueue1(){return   QueueBuilder.durable("Dead_Q01").build();}//死信交换机与死信队列的绑定@Beanpublic Binding deadBinding1(Queue deadQueue1,DirectExchange deadExchange){return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");}//业务队列@Beanpublic Queue queue1(){return   QueueBuilder.durable("Direct_Q01").deadLetterExchange("Dead_E01").deadLetterRoutingKey("RK_DEAD")//.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息//.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列.build();}//业务交换机@Beanpublic DirectExchange exchange(){return  ExchangeBuilder.directExchange("Direct_E01").build();}//业务交换机与队列的绑定@Beanpublic Binding binding1(Queue queue1,DirectExchange exchange){return BindingBuilder.bind(queue1).to(exchange).with("RK01");}//    @RabbitListener(queues = "Direct_Q01")
//    public void receiveMessage(OrderingOk msg,Message message, Channel channel) throws IOException {
//
//        long deliveryTag = message.getMessageProperties().getDeliveryTag();
//
//        System.out.println("消费者1 收到消息:"+ msg +" tag:"+deliveryTag);
//
//        channel.basicReject(deliveryTag, false);
//        try {
//            // 处理消息...
//            int  i= 5/0;
//            // 如果处理成功,手动发送ack确认 ,Yes
//            channel.basicAck(deliveryTag, false);
//        } catch (Exception e) {
//            // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject)  NO
//            channel.basicNack(deliveryTag, false, false); // 并重新入队
//
//        }
}//}

测试

@Testvoid test4() throws IOException {for (int i = 1; i <=5;i++){OrderingOk orderingOk = OrderingOk.builder().id(i).name("fpl " + i).build();deadProvider.send(orderingOk);System.out.println("发送成功:"+i);}System.in.read();}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/814049.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA中sql语句智能提示设置

选中一句sql语句&#xff0c;点击鼠标右键 指定数据库

【MATLAB源码-第47期】基于matlab的GMSK调制解调仿真,输出误码率曲线,采用相干解调。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 GMSK&#xff08;高斯最小移相键控&#xff09;是数字调制技术的一种。下面是关于GMSK调制解调、应用场景以及其优缺点的详细描述&#xff1a; 1. 调制解调&#xff1a; - 调制&#xff1a;GMSK是一种连续相位调制技术&a…

世界需要和平--中介者模式

1.1 世界需要和平 "你想呀&#xff0c;国与国之间的关系&#xff0c;就类似于不同的对象与对象之间的关系&#xff0c;这就要求对象之间需要知道其他所有对象&#xff0c;尽管将一个系统分割成许多对象通常可以增加其可复用性&#xff0c;但是对象间相互连接的激增又会降低…

迈威通信MaxGate800系列工业边缘计算网关,算力硬核中枢的巅峰之作

随着人们对工业物联网领域的深入了解与实践&#xff0c;越来越多的企业开始寻求将计算业务从云端迁移至网络边缘处理与执行。然而&#xff0c;在实际应用中&#xff0c;开发者面临着诸多挑战。为了解决这些问题&#xff0c;迈威通信MaxGate800系列工业智能网关应运而生。 MaxG…

2024年公共管理、健康与大数据国际学术会议(ICPAHBD 2024)

2024 International Conference on Public Administration, Health and Big Data (ICPAHBD 2024) ●会议简介 2024年公共管理、健康与大数据国际学术会议&#xff08;ICPAHBD 2024&#xff09;即将在宁波盛大召开。本次会议旨在汇聚全球公共管理、健康与大数据领域的专家学者…

【LAMMPS学习】八、基础知识(2.2)类型标签

8. 基础知识 此部分描述了如何使用 LAMMPS 为用户和开发人员执行各种任务。术语表页面还列出了 MD 术语&#xff0c;以及相应 LAMMPS 手册页的链接。 LAMMPS 源代码分发的 examples 目录中包含的示例输入脚本以及示例脚本页面上突出显示的示例输入脚本还展示了如何设置和运行各…

基于ssm的校园短期闲置资源置换平台(java项目+文档+源码)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的校园短期闲置资源置换平台。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 校园短期闲置资源置换…

大话设计模式——9.单例模式(Singleton Pattern)

简介 确保一个类只有一个实例&#xff0c;并提供全局访问点来获取该实例&#xff0c;是最简单的设计模式。 UML图&#xff1a; 单例模式共有两种创建方式&#xff1a; 饿汉式&#xff08;线程安全&#xff09; 提前创建实例&#xff0c;好处在于该实例全局唯一&#xff0c;不…

C程序的编译

经过预处理后的源文件,退去一切包装,注释被删除,各种预处理命令也基本上被处理掉,剩下的就是原汁原味的C代码了。接下来的第二步,就开始进入编译阶段。编译阶段主要分两步:第一步,编译器调用一系列解析工具,去分析这些C代码,将C源文件编译为汇编文件;第二步,通过汇编…

anaconda创建了虚拟python环境,且安装了pytorch,但是pycharm中import torch运行时报错

报错如下&#xff1a; C:\Users\tashi\.conda\envs\test1\python.exe D:\project\python\transformer\main.py C:\Users\tashi\.conda\envs\test1\lib\site-packages\numpy\__init__.py:127: UserWarning: mkl-service package failed to import, therefore Intel(R) MKL init…

AI预测体彩排3第2弹【2024年4月13日预测--第1套算法开始计算第2次测试】

各位小伙伴&#xff0c;今天实在抱歉&#xff0c;周末回了趟老家&#xff0c;回来比较晚了&#xff0c;数据今天上午跑完后就回老家了&#xff0c;晚上8点多才回来&#xff0c;赶紧把预测结果发出来吧&#xff0c;虽然有点晚了&#xff0c;但是咱们前面说过了&#xff0c;目前的…

将Visio绘图导出PDF文件,使其自适应大小,并去掉导入Latex的边框显示

问题描述 将Visio绘图导成pdf文件&#xff0c;首先在Visio绘图如下&#xff1a; 如果直接导出或者另存为pdf文件&#xff0c;则会发现pdf文件是整个页面大小&#xff0c;而不是图片大小。而且在导入latex等排版工具现实时&#xff0c;会显示边框。 问题解决 1.调整Visio中的页…

vox2vec论文速读

vox2vec: A Framework for Self-supervised Contrastive Learning of Voxel-Level Representations in Medical Images 摘要 本文介绍了 vox2vec——一种体素级表示的自监督学习 &#xff08;SSL&#xff09; 对比方法 vox2vec 表示由特征金字塔网络 &#xff08;FPN&#xf…

Cascader 级联选择器 - 选择器最后一级数据为空

原因&#xff1a;将扁平数据转化为树形数据时&#xff0c;给每个项都添加了 children export const transList2Tree (list, rootPid) > {const result []list.forEach(item > {if (item.pid rootPid) {const children transList2Tree(list, item.id)item.children …

Unity绘制地图

首先在项目/Assets文件夹下创建一个Tiles文件夹 在层级下点击鼠标右键选择2D对象选择瓦片地图创建Tilemap。 选择地图素材 如果素材需要裁剪&#xff0c;在检查器Sprite模式选择多个&#xff0c;点击Sprite Editor,选择切 &#xff0c;选择类型Grid By Cell Count&#xff0c;…

BoostCompass(建立正排索引和倒排索引模块)

阅读导航 一、模块概述二、编写正排索引和倒排索引模块✅安装 jsoncpp✅Jieba分词库的安装1. 代码基本框架2. 正排索引的建立3. 倒排索引的建立 三、整体代码⭕index.hpp 一、模块概述 这个模块我们定义了一个名为Index的C类&#xff0c;用于构建和维护一个文档索引系统。该系…

【计算机毕业设计】停车场管理系统——后附源码

&#x1f389;**欢迎来到琛哥的技术世界&#xff01;**&#x1f389; &#x1f4d8; 博主小档案&#xff1a; 琛哥&#xff0c;一名来自世界500强的资深程序猿&#xff0c;毕业于国内知名985高校。 &#x1f527; 技术专长&#xff1a; 琛哥在深度学习任务中展现出卓越的能力&a…

Harmony鸿蒙南向驱动开发-SDIO接口使用

功能简介 SDIO是安全数字输入输出接口&#xff08;Secure Digital Input and Output&#xff09;的缩写&#xff0c;是从SD内存卡接口的基础上演化出来的一种外设接口。SDIO接口兼容以前的SD卡&#xff0c;并且可以连接支持SDIO接口的其他设备。 SDIO接口定义了操作SDIO的通用…

21 标准错误

标准输出重定向关闭无数据 下面的代码&#xff1a; #include <stdio.h> #include <string.h> #include <stdlib.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h>int main() {close(1);i…

Xilinx Zynq UltraScale+ MPSoC无人机控制器

官方术语是无人驾驶飞行器&#xff08;UAV&#xff09;&#xff0c;这显然有点拗口&#xff0c;所以我们更喜欢说无人机。在过去的几十年里&#xff0c;无人机技术有了巨大的进步。我们为一个客户开发了一个无人机的飞行和视频控制器。 客户挑战 客户需要一种混合FPGA/CPU硬件&…