RabbitMQ开启消息发送确认和消费手动确认

开启RabbitMQ的生产者发送消息到RabbitMQ服务端的接收确认(ACK)和消费者通过手动确认或者丢弃消费的消息。
通过配置 publisher-confirm-type: correlatedpublisher-returns: true开启生产者确认消息。

server:port: 8014spring:rabbitmq:username: adminpassword: 123456dynamic: true
#    port: 5672
#    host: 192.168.49.9addresses: 192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672publisher-confirm-type: correlatedpublisher-returns: trueapplication:name: shushandatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://ip/shushanusername: rootpassword: hikari:minimum-idle: 10maximum-pool-size: 20idle-timeout: 50000max-lifetime: 540000connection-test-query: select 1connection-timeout: 600000

RabbitConfig :

package com.kexuexiong.shushan.common.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {log.info("confirmCallback  data: " + correlationData);log.info("confirmCallback ack :" + ack);log.info("confirmCallback cause :" + cause);});rabbitTemplate.setReturnsCallback(returned -> log.info("returnsCallback msg : " + returned));return rabbitTemplate;}
}

AckReceiver 手动确认消费者:

package com.kexuexiong.shushan.common.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.Objects;@Slf4j
@Component
public class AckReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();byte[] messageBody = message.getBody();try (ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(messageBody));) {Map<String, String> msg = (Map<String, String>) inputStream.readObject();log.info(message.getMessageProperties().getConsumerQueue()+"-ack Receiver :" + msg);log.info("header msg :"+message.getMessageProperties().getHeaders());if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.BUSINESS_QUEUE)){channel.basicNack(deliveryTag,false,false);}else if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.DEAD_LETTER_QUEUE)){channel.basicAck(deliveryTag, true);}else {channel.basicAck(deliveryTag, true);}} catch (Exception e) {channel.basicReject(deliveryTag, false);log.error(e.getMessage());}}
}

通过配置 simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE)可以监听多个消息队列。

package com.kexuexiong.shushan.common.mq;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MessageListenerConfig {@Autowiredprivate CachingConnectionFactory connectionFactory;@Autowiredprivate AckReceiver ackReceiver;@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);simpleMessageListenerContainer.setConcurrentConsumers(2);simpleMessageListenerContainer.setMaxConcurrentConsumers(2);simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);//,MqConstant.demoDirectQueue, MqConstant.FANOUT_A, MqConstant.BIG_CAR_TOPICsimpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE);simpleMessageListenerContainer.setMessageListener(ackReceiver);return simpleMessageListenerContainer;}}
package com.kexuexiong.shushan.controller.mq;import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.RandomUtil;
import com.kexuexiong.shushan.common.mq.MqConstant;
import com.kexuexiong.shushan.controller.BaseController;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;@Slf4j
@RestController
@RequestMapping("/mq/")
public class MqController extends BaseController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/callback/sendDirectMessage")public String sendDirectMessageCallback(){String msgId = UUID.randomUUID().toString();String msg = "demo msg ,kexuexiong";String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss");Map<String,Object> map = new HashMap();map.put("msgId",msgId);map.put("msg",msg);map.put("createTime",createTime);rabbitTemplate.convertAndSend("noneDirectExchange","demoDirectRouting",map);return "ok";}@GetMapping("/callback/lonelyDirectExchange")public String lonelyDirectExchange(){String msgId = UUID.randomUUID().toString();String msg = "demo msg ,kexuexiong";String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss");Map<String,Object> map = new HashMap();map.put("msgId",msgId);map.put("msg",msg);map.put("createTime",createTime);rabbitTemplate.convertAndSend(MqConstant.lonelyDirectExchange,"demoDirectRouting",map);return "ok";}
}

测试:

发送dirct消息 找不到交换机情况
在这里插入图片描述

2023-10-10T17:04:58.492+08:00 ERROR 27232 --- [.168.49.10:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :false
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)

ack 为false。

发送dirct消息 找不到队列
在这里插入图片描述

2023-10-10T17:05:55.851+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
2023-10-10T17:05:55.852+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :true
2023-10-10T17:05:55.852+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :null
2023-10-10T17:05:55.865+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : returnsCallback msg : ReturnedMessage [message=(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=lonelyDirectExchange, routingKey=demoDirectRouting]

ACK为true,replyText=NO_ROUTE。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/101104.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

项目创建 Vue3 + Ts + vite + pinia

vite官网 项目初始化 准备安装工作(按步骤创建) npm init vuelatest创建完成后再次安装对应插件 然后百度配置main.ts里面引入 npm i pinia --save //安装pinia npm i vue-router --save //安装router npm i axios --save //安装axios //安装sass或less npm add -D scss npm…

Hadoop----Hive的使用

1.数据库的安装&#xff0c;通过网上教程&#xff0c;使用yum进行安装即可&#xff0c;一定删除干净&#xff0c;下载与Hive版本对应的MySQL。 2.Hive的安装&#xff0c;在官网下载.tar.gz包解压至对应目录&#xff08;/export/server&#xff09;&#xff0c;可以根据网上教程…

Python 函数参数传递

Python 函数参数传递 我们都知道&#xff0c;在C中&#xff0c;函数参数的传递包括&#xff0c;值传递、引用传递、地址传递&#xff0c;这三种参数传递方式&#xff0c;并且可以在定义函数时显式的指明传递方式&#xff0c;而Python并非如此。 Python 的函数参数传递是基于对…

uniapp-vue3微信小程序实现全局分享

uniapp-vue3微信小程序实现全局分享 文章目录 uniapp-vue3微信小程序实现全局分享微信小程序官方文档的分享说明onShareAppMessage(Object object)onShareTimeline() uniapp 官方文档的分享说明onShareAppMessage(OBJECT) 实现全局分享代码结构如下share.js文件内容main.js注意…

聊聊身边的嵌入式:用了七八年的电动牙刷,突然罢工了!!!

家里用了七八年的电动牙刷&#xff0c;前两天突然罢工。先尝试一下野蛮的修复方法(摔摔打打)&#xff0c;这种独家绝技屡试不爽&#xff0c;曾经修好过收音机&#xff0c;电视机&#xff0c;电子手表… 等等。不过这次&#xff0c;没有成功&#xff01;这周末终于有点儿时间&am…

数据库Mysql三大引擎(InnoDB、MyISAM、 Memory)与逻辑架构

MySQL数据库及其分支版本主要的存储引擎有InnoDB、MyISAM、 Memory等。简单地理解&#xff0c;存储引擎就是指表的类型以及表在计算机上的存储方式。存储引擎的概念是MySQL的特色&#xff0c;使用的是一个可插拔存储引擎架构&#xff0c;能够在运行的时候动态加载或者卸载这些存…

PostgreSQL limit 语法

PostgreSQL limit 语法 LIMIT 是 PostgreSQL 中用于限制查询结果数量的关键字。其语法如下&#xff1a; SELECT column1, column2, ... FROM table_name LIMIT number_of_rows;其中&#xff0c;SELECT 语句用于指定要查询的列和数据表&#xff0c;LIMIT 用于指定查询结果的行…

奥威BI系统:做数据可视化大屏,又快又简单

数据可视化大屏的制作难吗&#xff1f;会很花时间精力吗&#xff1f;这就要看用的是什么软件了。如果用的是BI系统&#xff0c;特别是奥威BI系统这类BI商业智能软件&#xff0c;那就是又快又简单。 奥威BI系统介绍&#xff1a; 奥威BI系统是一款高效的数据可视化大屏工具&…

Xilisoft Video Converter Ultimate for Mac:让音视频转换变得更简单

无论是在工作还是娱乐中&#xff0c;我们都会遇到音视频格式不兼容的问题。这时候&#xff0c;一个好用的音视频格式转换工具就显得尤为重要。Xilisoft Video Converter Ultimate for Mac&#xff08;曦力音视频转换&#xff09;就是这样一款让您的音视频转换变得更简单的工具。…

linux的管道命令|

有时候&#xff0c;我们可能需要”递归地“执行多条命令&#xff0c;将命令一的结果作为命令二的输入&#xff0c;这时候就需要使用管道命令| 管道命令|可以将多条命令连接起来&#xff0c;每一个命令单独执行&#xff0c;并作为下一个命令的输入 格式&#xff1a; cmd1|cmd2…

萝卜刀玩具上架亚马逊CPC认证测试标准

含铅或含铅涂料儿童产品的要求 分阶段限制儿童产品所有部件的铅含量&#xff0c;要求在3年内将产品任何可接触部件的铅含量限制从不超过重量的600ppm&#xff08;0.06%&#xff09;降至不超过重量的100ppm&#xff08;0.01%&#xff09;。 铅含量限值&#xff08;总铅含量占重…

Pushgateway的场景使用

1,Pushgateway简介 Pushgateway为Prometheus整体监控方案的功能组件之一,并做为一个独立的工具存在。它主要用于Prometheus无法直接拿到监控指标的场景,如监控源位于防火墙之后,Prometheus无法穿透防火墙;目标服务没有可抓取监控数据的端点等多种情况。在类似场景中,可通…

element树形控件单选

需求功能&#xff1a; 1&#xff0c;element树形控件单选 2&#xff0c;双击节点编辑 <div style"height: calc(100% - 48px)"><el-scrollbar class"scrollbar-wrapper"><el-tree :data"treesObj" show-checkbox default-expan…

Ali MaxCompute SDK

ALI MC 文件读写 public abstract BufferedInputStream readResourceFileAsStream(String var1) throws IOException;LocalExecutionContext.java Overridepublic BufferedInputStream readResourceFileAsStream(String resourceName) throws IOException {try {return wareHou…

stable diffusion艰难炼丹之路

文章目录 概要autoDL系统盘爆满autoDL python3.8切换python3.10dreambooth训练大模型完成后报错 概要 主要是通过autoDL服务器部署stable diffusion&#xff0c;通过dreambooth训练大模型。 问题&#xff1a; autoDL系统盘爆满autoDL python3.8切换python3.10dreambooth训练大…

2023NOIP A层联测6 数点

题目大意 给你一个排列 p p p&#xff0c;对于每一个 i i i&#xff0c;我们在平面上&#xff0c;放置一个点 ( i , p i ) (i,p_i) (i,pi​)。对于坐标上下限都在 1 ∼ n 1\sim n 1∼n内的全体 ( n ( n 1 ) 2 ) 2 (\frac{n(n1)}{2})^2 (2n(n1)​)2矩形&#xff0c;求每个矩形…

什么是云计算?云计算简介

其实“云计算”作为一个名词而言&#xff0c;那是相当成功滴。很多人都有听过。但提及云计算”具体是什么?很多人&#xff0c;知其然&#xff0c;却不知其所以然! 利用软件将这些成千上万不可靠的硬件组织成一个稳定可靠的IT系统&#xff0c;以此支撑其公司的IT基础服务。这家…

基于SpringBoot的靓车汽车销售网站

目录 前言 一、技术栈 二、系统功能介绍 用户信息管理 车辆展示管理 车辆品牌管理 用户交流管理 购物车 用户交流 我的订单管理 三、核心代码 1、登录模块 2、文件上传模块 3、代码封装 前言 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的…

在PHP中,可以使用不同的加密算法(如MD5、SHA1、SHA256)结合RSA算法进行公钥加密和私钥解密。

下面是使用这三种算法进行加密和解密的示例代码&#xff1a; // 生成RSA密钥对 $keyPair openssl_pkey_new(array(private_key_bits > 2048,private_key_type > OPENSSL_KEYTYPE_RSA, ));// 获取私钥和公钥 openssl_pkey_export($keyPair, $privateKey); $publicKey o…

java拆分pdf

pom文件 <dependency><groupId>com.lowagie</groupId><artifactId>itext</artifactId><version>2.1.7</version></dependency>主文件 import java.io.FileOutputStream;import com.lowagie.text.Document; import com.lowagie…