RabbitMQ的Confirm机制

1.消息的可靠性

RabbitMQ提供了Confirm的确认机制。

Confirm机制用于确认消息是否已经发送给了交换机

2.Java的实现 

1.导入依赖

        <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>

2.Confirm机制的生产者

package com.qf.mq2302.hello;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Send {//声明队列名字 public static final String QUEUE_NAME="queueA";public static void main(String[] args) throws Exception {//1.获取连接对象Connection conn = MQUtils.getConnection();//2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上Channel channel = conn.createChannel();//3 开启confirmchannel.confirmSelect();//3.声明了一个队列/*** queue – the name of the queue* durable – true代表创建的队列是持久化的(当mq重启后,该队列依然存在)* exclusive – 该队列是不是排他的 (该对立是否只能由当前创建该队列的连接使用)* autoDelete – 该队列是否可以被mq服务器自动删除* arguments – 队列的其他参数,可以为null*/
//     channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello doubleasdasda!";//生产者如何发送消息,使用下面的方法即可/*** exchange – 交换机的名字 ,如果是空串,说明是把消息发给了默认交换机* routingKey – 路由的key,当发送消息给默认交换机时,routingkey代表队列的名字* other properties - 消息的其他属性,可以为null* body – 消息的内容,注意,要是有 字节数组*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//检查消息是否发送成功了try {/*** 判断是否发送到交换机上,如果发送到了返回true,* 如果因为交换机名字错了,发送不到交换机,则会抛出异常,会自动关闭channel*/if (channel.waitForConfirms()) {//如果返回true,代表交换机成功接收到了消息System.out.println("消息已经成功发送给了交换机");//关闭资源channel.close();}else {System.out.println("消息发送给交换机失败了");//关闭资源channel.close();}} catch (InterruptedException e) {System.out.println("消息发送给交换机失败了");System.out.println("失败的消息为:"+message);}conn.close();}
}

3.confirm 机制的消费者

 

package com.qf.mq2302.hello;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class Recv {private  final  static  String QUEUE_NAME="hello-queue";public static void main(String[] args) throws Exception {//1.获取连接对象Connection conn = MQUtils.getConnection();//2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上Channel channel = conn.createChannel();/*** 第一个参数队列名称* 第二个参数,耐用性* 第三个参数排外性* 第四个参数是否自动删除* 第五个参数,可以定义什么类型的队列*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中DeliverCallback deliverCallback =new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println(consumerTag);//从Delivery对象中可以获取到生产者,发送的消息的字节数组byte[] body = message.getBody();String msg = new String(body, "utf-8");//在这里写消费者的业务逻辑,例如,发送邮件System.out.println(msg);}};//4.让当前消费者开始消费(QUEUE_NAME)队列中的消息/*** queue – the name of the queue* autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。* deliverCallback – 当有消息发送给该消费者时,消费者如何处理消息的逻辑* cancelCallback – 当消费者被取消掉时,如果要执行代码,写到这里*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});}}

3.整合springboot实现

1.导入依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2.yml配置文件

spring:rabbitmq:host: 8.140.244.227port: 6786username: testpassword: testvirtual-host: /testpublisher-confirm-type: correlated #在springboot 项目下开启生产者的confirm机制

3.RabbitMQ配置文件

package com.qf.bootmq2302.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();//设置连接工厂对象rabbitTemplate.setConnectionFactory(cachingConnectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("correlationData:"+correlationData.getId());System.out.println("correlationData:"+new String(correlationData.getReturnedMessage().getBody()));//通过id可以去redis 里取 value消息//代表消息是否发送给交换机成功,发送失败false ,发送成功 trueSystem.out.println("ack:"+ack);//代表错误的原因System.out.println("cause:"+cause);}});return rabbitTemplate;}}

4.生产者写一个Controller

   @AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/test1")public String test1(String msg,String routkey){System.out.println(msg);String exchangeName = "";//默认交换机String routingkey = routkey;//队列名字//创建一个 CorrelationData 对象CorrelationData correlationData = new CorrelationData();correlationData.setId("001");Message message = new Message(msg.getBytes(), null);correlationData.setReturnedMessage(message);//要把消息的内容和消息的编号 存放到redis中, key=消息编号,value=消息内容//key = bootmq:failmessage:001//生产者发送消息//第四个参数,可以携带自定义的correlationDatarabbitTemplate.convertAndSend(exchangeName,routingkey,msg,correlationData);return "ok";}

5.消费者写一个接收队列消息

   @RabbitListener(queues = "queueA")public void getMsg1(Map<String,Object> data, Channel channel,Message message) throws IOException {System.out.println(data);//手动ack//若开启手动ack,不给手动ack,就按照 prefetch: 1 #等价于basicQos(1)的量,就这么多,不会多给你了,因为你没有确认。确认一条,就给你一条channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}

6.消费者的配置文件

spring:rabbitmq:host: 8.140.244.227port: 6786username: testpassword: testvirtual-host: /test#手动ACKlistener:simple:acknowledge-mode: manual  # 手动ackprefetch: 1 #等价于basicQos(1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/72965.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MVCC究竟是什么?

&#xff11;.MVCC概念 MVCC&#xff0c;全称多版本并发控制 MVCC究竟是什么&#xff1f; 通俗的来说MVCC就是为了在读取数据时不加锁来提高读取效率的一种办法&#xff0c;MVCC解决的是读写时线程安全问题&#xff0c;线程不用去抢占读写锁。MVCC中的读就是快照读&#xff0c…

初识计算机和命令行操作

文章目录 计算机基础知识计算机是什么计算机的组成计算机的使用方式Windows的命令行环境变量&#xff08;Environment Variable&#xff09;PATH环境变量进制文本文件和字符集纯文本和富文本字符集乱码 python专栏推荐&#xff1a;python基础知识&#xff08;0基础入门&#xf…

无涯教程-JavaScript - DELTA函数

描述 DELTA函数测试两个值是否相等。如果number1 number2,则返回1&#xff1b;否则返回1。否则返回0。 您可以使用此功能来过滤一组值。如,通过合计几个DELTA函数,您可以计算相等对的计数。此功能也称为Kronecker Delta功能。 语法 DELTA (number1, [number2])争论 Argum…

window mysql-8.0.34 zip解压包安装

window系统上安装mysql8 解压版 下载压缩包 https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-8.0.34-winx64.zip安装 用解压软件解压刚下载的mysql-8.0.34-winx64.zip 的文件至d:\devs路径下。 创建配置文件my.ini到路径d:\devs\mysql-8.0.34-winx64下 [mysqld] # 设置…

Postman接口测试之Mock快速入门

一、Mock简介 1.Mock定义 Mock是一种比较特殊的测试技巧&#xff0c;可以在没有依赖项的情况下进行接口或单元测试。通常情况下&#xff0c;Mock与其他方法的区别是&#xff0c;用于模拟代码依赖对象&#xff0c;并允许设置对应的期望值。简单一点来讲&#xff0c;就是Mock创建…

CSS 滚动驱动动画 scroll()

CSS 滚动驱动动画 scroll() animation-timeline 通过 scroll() 指定可滚动元素与滚动轴来为容器动画提供一个匿名的 scroll progress timeline. 通过元素在顶部和底部(或左边和右边)的滚动推进 scroll progress timeline. 并且元素滚动的位置会被转换为百分比, 滚动开始被转化为…

Vue3中快速简单使用CKEditor 5富文本编辑器

Vue3简单使用CKEditor 5 前言准备定制基础配置富文本配置目录当前文章demo目录结构 快速使用demo 前言 CKEditor 5就是内嵌在网页中的一个富文本编辑器工具 CKEditor 5开发文档&#xff08;英文&#xff09;&#xff1a;https://ckeditor.com/docs/ckeditor5/latest/index.htm…

DBMS_RESOURCE_MANAGER

参考文档&#xff1a; Database Administrator’s Guide 27 Managing Resources with Oracle Database Resource Manager 27.5.5 Creating a Resource Plan BEGINDBMS_RESOURCE_MANAGER.CREATE_PENDING_AREA();DBMS_RESOURCE_MANAGER.CREATE_PLAN(PLAN > bugdb_plan,…

SpringMVC:从入门到精通,7篇系列篇带你全面掌握--三.使用SpringMVC完成增删改查

&#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于SpringMVC的相关操作吧 目录 &#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 效果演示 一.导入项目的相关依赖 二.…

港联证券:大数据看北上资金胜率:整体跑赢市场,六成持股浮亏

一直以来&#xff0c;北上资金被称为“聪明资金”&#xff0c;其一举一动备受出资者重视。盛名之下&#xff0c;其战绩究竟如何&#xff1f;复盘前史数据发现&#xff0c;北上资金全体业绩跑赢沪深300指数&#xff0c;但现在持仓个股浮亏占比约六成。 8月&#xff0c;北上资金…

npm publish包报404,is not in the npm registry错误

1. 指定发布目标2. 登录npm&#xff0c;使用登录名发布包&#xff0c;包名命名原则“登录名/包名”&#xff0c;或 “包名” 3. 删除某一个版本npm unpublish pvfhv/eslint-config-prettier1.0.1 --force 删除后的版本不能重复使用&#xff0c;正式解释&#xff1a; Unfortun…

containerd的安装和使用

containerd的安装和使用 1、containerd介绍 containerd 是从 docker 项目中剥离出来的一个容器运行时、几乎囊括了容器管理的所有功能&#xff0c;并且 containerd 内置了 CRI 插件&#xff0c;k8s 的 kubelet 组件可以直接调用 containerd&#xff0c;相较于 docker 容器运…

无涯教程-JavaScript - IMLOG2函数

描述 IMLOG2函数以x yi或x yj文本格式返回复数的以2为底的对数。可以从自然对数计算复数的以2为底的对数,如下所示- $$\log_2(x yi)(log_2e)\ln(x yi)$$ 语法 IMLOG2 (inumber)争论 Argument描述Required/OptionalInumberA complex number for which you want the bas…

【web开发】4.JavaScript与jQuery

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一、JavaScript与jQuery二、JavaScript常用的基本功能1.插入位置2.注释3.变量4.数组5.滚动字符 三、jQuery常用的基本功能1.引入jQuery2.寻找标签3.val、text、appe…

static关键字和final关键字

在java的关键字中&#xff0c;static关键字和final关键字是两个必须掌握的关键字。static关键字和final关键字用法多样&#xff0c;且在一定环境下使用&#xff0c;可以提高程序的运行性能&#xff0c;优化程序的结构。下面将依次介绍static关键字和final关键字。注意&#xff…

关于scipy库的入门教程

本教程将介绍如何使用Scipy库进行科学计算和数据分析。Scipy是一个基于NumPy的开源Python库&#xff0c;提供了很多高级的数学函数和科学计算工具。 安装Scipy库 在开始教程之前&#xff0c;首先需要安装Scipy库。可以使用以下命令进行安装&#xff1a; pip install scipy导入…

go的gin框架实现接受多个图片和单个视频并保存到本地服务器的接口

首先是接受多个图片的接口&#xff0c;就是接受多个文件 收到post请求后首先创建一个文件夹&#xff0c;这里利用uuid创建出唯一标识字符串作为文件夹名称&#xff0c;解析表单中的一串文件循环保存到本地服务器 package mainimport ("github.com/gin-gonic/gin"&q…

iveiw 时间验证

最近遇到在使用iview框架时&#xff0c;在商家后端管理系统中&#xff0c;在合同发布时会使用到form表单组件&#xff0c;当然日期也通常出现在搜索 框表单中&#xff0c;但是有时候会出现日期组件校验错误的情况 首先&#xff0c;iview采用的是async-validator的校验规则&…

SwiftUI简单基础知识学习

以下是一个大致的学习计划&#xff0c;将SwiftUI的知识分成12个主题&#xff1a; SwiftUI 简介和基础语法视图和布局状态和数据流按钮和用户输入列表和数据展示导航和页面传递动画和过渡效果手势和交互绘制和绘图多平台适配网络和数据请求实际项目实践和高级主题 每个主题可以…

go开发之个微机器人的二次开发

请求URL&#xff1a; http://域名/addRoomMemberFriend 请求方式&#xff1a; POST 请求头Headers&#xff1a; Content-Type&#xff1a;application/jsonAuthorization&#xff1a;login接口返回 参数&#xff1a; 参数名必选类型说明wId是String登录实例标识chatRoom…