RabbitMQ开启消息发送确认和消费手动确认

开启RabbitMQ的生产者发送消息到RabbitMQ服务端的接收确认(ACK)和消费者通过手动确认或者丢弃消费的消息。
通过配置 publisher-confirm-type: correlatedpublisher-returns: true开启生产者确认消息。

server:port: 8014spring:rabbitmq:username: adminpassword: 123456dynamic: true
#    port: 5672
#    host: 192.168.49.9addresses: 192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672publisher-confirm-type: correlatedpublisher-returns: trueapplication:name: shushandatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://ip/shushanusername: rootpassword: hikari:minimum-idle: 10maximum-pool-size: 20idle-timeout: 50000max-lifetime: 540000connection-test-query: select 1connection-timeout: 600000

RabbitConfig :

package com.kexuexiong.shushan.common.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {log.info("confirmCallback  data: " + correlationData);log.info("confirmCallback ack :" + ack);log.info("confirmCallback cause :" + cause);});rabbitTemplate.setReturnsCallback(returned -> log.info("returnsCallback msg : " + returned));return rabbitTemplate;}
}

AckReceiver 手动确认消费者:

package com.kexuexiong.shushan.common.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.Objects;@Slf4j
@Component
public class AckReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();byte[] messageBody = message.getBody();try (ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(messageBody));) {Map<String, String> msg = (Map<String, String>) inputStream.readObject();log.info(message.getMessageProperties().getConsumerQueue()+"-ack Receiver :" + msg);log.info("header msg :"+message.getMessageProperties().getHeaders());if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.BUSINESS_QUEUE)){channel.basicNack(deliveryTag,false,false);}else if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.DEAD_LETTER_QUEUE)){channel.basicAck(deliveryTag, true);}else {channel.basicAck(deliveryTag, true);}} catch (Exception e) {channel.basicReject(deliveryTag, false);log.error(e.getMessage());}}
}

通过配置 simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE)可以监听多个消息队列。

package com.kexuexiong.shushan.common.mq;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MessageListenerConfig {@Autowiredprivate CachingConnectionFactory connectionFactory;@Autowiredprivate AckReceiver ackReceiver;@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);simpleMessageListenerContainer.setConcurrentConsumers(2);simpleMessageListenerContainer.setMaxConcurrentConsumers(2);simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);//,MqConstant.demoDirectQueue, MqConstant.FANOUT_A, MqConstant.BIG_CAR_TOPICsimpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE);simpleMessageListenerContainer.setMessageListener(ackReceiver);return simpleMessageListenerContainer;}}
package com.kexuexiong.shushan.controller.mq;import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.RandomUtil;
import com.kexuexiong.shushan.common.mq.MqConstant;
import com.kexuexiong.shushan.controller.BaseController;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;@Slf4j
@RestController
@RequestMapping("/mq/")
public class MqController extends BaseController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/callback/sendDirectMessage")public String sendDirectMessageCallback(){String msgId = UUID.randomUUID().toString();String msg = "demo msg ,kexuexiong";String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss");Map<String,Object> map = new HashMap();map.put("msgId",msgId);map.put("msg",msg);map.put("createTime",createTime);rabbitTemplate.convertAndSend("noneDirectExchange","demoDirectRouting",map);return "ok";}@GetMapping("/callback/lonelyDirectExchange")public String lonelyDirectExchange(){String msgId = UUID.randomUUID().toString();String msg = "demo msg ,kexuexiong";String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss");Map<String,Object> map = new HashMap();map.put("msgId",msgId);map.put("msg",msg);map.put("createTime",createTime);rabbitTemplate.convertAndSend(MqConstant.lonelyDirectExchange,"demoDirectRouting",map);return "ok";}
}

测试:

发送dirct消息 找不到交换机情况
在这里插入图片描述

2023-10-10T17:04:58.492+08:00 ERROR 27232 --- [.168.49.10:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :false
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)

ack 为false。

发送dirct消息 找不到队列
在这里插入图片描述

2023-10-10T17:05:55.851+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
2023-10-10T17:05:55.852+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :true
2023-10-10T17:05:55.852+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :null
2023-10-10T17:05:55.865+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : returnsCallback msg : ReturnedMessage [message=(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=lonelyDirectExchange, routingKey=demoDirectRouting]

ACK为true,replyText=NO_ROUTE。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/101104.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

项目创建 Vue3 + Ts + vite + pinia

vite官网 项目初始化 准备安装工作(按步骤创建) npm init vuelatest创建完成后再次安装对应插件 然后百度配置main.ts里面引入 npm i pinia --save //安装pinia npm i vue-router --save //安装router npm i axios --save //安装axios //安装sass或less npm add -D scss npm…

uniapp-vue3微信小程序实现全局分享

uniapp-vue3微信小程序实现全局分享 文章目录 uniapp-vue3微信小程序实现全局分享微信小程序官方文档的分享说明onShareAppMessage(Object object)onShareTimeline() uniapp 官方文档的分享说明onShareAppMessage(OBJECT) 实现全局分享代码结构如下share.js文件内容main.js注意…

聊聊身边的嵌入式:用了七八年的电动牙刷,突然罢工了!!!

家里用了七八年的电动牙刷&#xff0c;前两天突然罢工。先尝试一下野蛮的修复方法(摔摔打打)&#xff0c;这种独家绝技屡试不爽&#xff0c;曾经修好过收音机&#xff0c;电视机&#xff0c;电子手表… 等等。不过这次&#xff0c;没有成功&#xff01;这周末终于有点儿时间&am…

数据库Mysql三大引擎(InnoDB、MyISAM、 Memory)与逻辑架构

MySQL数据库及其分支版本主要的存储引擎有InnoDB、MyISAM、 Memory等。简单地理解&#xff0c;存储引擎就是指表的类型以及表在计算机上的存储方式。存储引擎的概念是MySQL的特色&#xff0c;使用的是一个可插拔存储引擎架构&#xff0c;能够在运行的时候动态加载或者卸载这些存…

奥威BI系统:做数据可视化大屏,又快又简单

数据可视化大屏的制作难吗&#xff1f;会很花时间精力吗&#xff1f;这就要看用的是什么软件了。如果用的是BI系统&#xff0c;特别是奥威BI系统这类BI商业智能软件&#xff0c;那就是又快又简单。 奥威BI系统介绍&#xff1a; 奥威BI系统是一款高效的数据可视化大屏工具&…

Xilisoft Video Converter Ultimate for Mac:让音视频转换变得更简单

无论是在工作还是娱乐中&#xff0c;我们都会遇到音视频格式不兼容的问题。这时候&#xff0c;一个好用的音视频格式转换工具就显得尤为重要。Xilisoft Video Converter Ultimate for Mac&#xff08;曦力音视频转换&#xff09;就是这样一款让您的音视频转换变得更简单的工具。…

萝卜刀玩具上架亚马逊CPC认证测试标准

含铅或含铅涂料儿童产品的要求 分阶段限制儿童产品所有部件的铅含量&#xff0c;要求在3年内将产品任何可接触部件的铅含量限制从不超过重量的600ppm&#xff08;0.06%&#xff09;降至不超过重量的100ppm&#xff08;0.01%&#xff09;。 铅含量限值&#xff08;总铅含量占重…

Pushgateway的场景使用

1,Pushgateway简介 Pushgateway为Prometheus整体监控方案的功能组件之一,并做为一个独立的工具存在。它主要用于Prometheus无法直接拿到监控指标的场景,如监控源位于防火墙之后,Prometheus无法穿透防火墙;目标服务没有可抓取监控数据的端点等多种情况。在类似场景中,可通…

element树形控件单选

需求功能&#xff1a; 1&#xff0c;element树形控件单选 2&#xff0c;双击节点编辑 <div style"height: calc(100% - 48px)"><el-scrollbar class"scrollbar-wrapper"><el-tree :data"treesObj" show-checkbox default-expan…

stable diffusion艰难炼丹之路

文章目录 概要autoDL系统盘爆满autoDL python3.8切换python3.10dreambooth训练大模型完成后报错 概要 主要是通过autoDL服务器部署stable diffusion&#xff0c;通过dreambooth训练大模型。 问题&#xff1a; autoDL系统盘爆满autoDL python3.8切换python3.10dreambooth训练大…

什么是云计算?云计算简介

其实“云计算”作为一个名词而言&#xff0c;那是相当成功滴。很多人都有听过。但提及云计算”具体是什么?很多人&#xff0c;知其然&#xff0c;却不知其所以然! 利用软件将这些成千上万不可靠的硬件组织成一个稳定可靠的IT系统&#xff0c;以此支撑其公司的IT基础服务。这家…

基于SpringBoot的靓车汽车销售网站

目录 前言 一、技术栈 二、系统功能介绍 用户信息管理 车辆展示管理 车辆品牌管理 用户交流管理 购物车 用户交流 我的订单管理 三、核心代码 1、登录模块 2、文件上传模块 3、代码封装 前言 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的…

vite vue3 pwa 更新提醒

效果 vite-plugin-pwa插件启用pwa后默认会在后台自动更新应用&#xff0c;并在关闭所有已开启的页面并重新打开后激活 通过此方法可以以消息方式提醒用户手动刷新激活更新应用 方法 已经使用vite-plugin-pwa插件启用pwa 修改vite.config.ts export default defineConfig(…

论文阅读——Large Selective Kernel Network for Remote Sensing Object Detection

目录 基本信息标题目前存在的问题改进网络结构另一个写的好的参考 基本信息 期刊CVPR年份2023论文地址https://arxiv.org/pdf/2303.09030.pdf代码地址https://github.com/zcablii/LSKNet 标题 遥感目标检测的大选择核网络 目前存在的问题 相对较少的工作考虑到强大的先验知…

python自动解析301、302重定向链接

嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 &#x1f447; &#x1f447; &#x1f447; 更多精彩机密、教程&#xff0c;尽在下方&#xff0c;赶紧点击了解吧~ python源码、视频教程、插件安装教程、资料我都准备好了&#xff0c;直接在文末名片自取就可 使用模块requ…

8 个 Promise 高级用法

在 js 项目中&#xff0c;promise 的使用应该是必不可少的&#xff0c;但我发现在同事和面试者中&#xff0c;很多中级或以上的前端都还停留在promiseInst.then()、promiseInst.catch()、Promise.all等常规用法&#xff0c;连async/await也只是知其然&#xff0c;而不知其所以然…

Rockchip平台 远程OTA服务搭建

Rockchip平台 远程OTA服务搭建 1. 概述 远程OTA升级服务是一种通过互联网远程更新Rockchip设备的固件和软件的方法。这种服务对于确保设备安全性、修复错误和添加新功能非常重要。 本文档将引导您完成在Rockchip平台上搭建远程OTA升级服务的过程。 在阅读本文的前&#xff…

Vue3新的状态管理库-Pinia(保姆级别教程)

目录 1.什么是Pinia2.为什么使用Pinia3.创建项目4.检查Pinia的安装版本5.main.js引入Pinia6.定义Store-组合式API写法(推荐)7.getters的实现8.action的异步实现9.storeToRefs 1.什么是Pinia Pinia是Vue的专属的最新状态管理库, 是Vuex状态管理工具的替代品 vue.js官网 https:…

L14D6内核模块编译方法

一、内核模块基础代码解析 一个内核模块代码错误仍然会导致的内核崩溃。 GPL协议&#xff1a;开源规定&#xff0c;使用内核一些函数需要 1、单内核的缺点 单内核扩展性差的缺点减小内核镜像文件体积&#xff0c;一定程度上节省内存资源提高开发效率不能彻底解决稳定性低的缺…

【新品发布】四核A53超高性价比!RK3562系列核心板及开发板震撼上市

RK3562系列产品采用 Rockchip 新一代 64 位处理器 RK3562&#xff08;Quad-core ARM Cortex-A53&#xff0c;主频最高 2.0GHz&#xff09;&#xff0c;最大支持 8GB 内存&#xff1b;内置独立的 NPU&#xff0c;可用于轻量级人工智能应用&#xff0c;RK3562 拥有 PCIE2.1 / USB…