RabbitMQ: return机制

1. Return机制

Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

而且exchange是不能持久化消息的,queue是可以持久化消息。

采用Return机制来监听消息是否从exchange送到了指定的queue中

 2.Java的实现方式

1.导入依赖

        <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>

2.生产者的实现方式

 采用Return机制来监听消息是否从exchange送到了指定的queue中

package com.qf.mq2302.hello;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;import java.io.IOException;public class SendRetrun {public static final String QUEUE_NAME="hello-queue";public static void main(String[] args) throws Exception {//1.获取连接对象Connection conn = MQUtils.getConnection();//2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上Channel channel = conn.createChannel();//3.声明了一个队列/*** queue – the name of the queue* durable – true代表创建的队列是持久化的(当mq重启后,该队列依然存在)* exclusive – 该队列是不是排他的 (该对立是否只能由当前创建该队列的连接使用)* autoDelete – 该队列是否可以被mq服务器自动删除* arguments – 队列的其他参数,可以为null*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//开启 return 机制//编写回调方法channel.addReturnListener(new ReturnListener() {//如果消息没有成功发送到队列,这个方法会被调用@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("====================ReturnListener==================");System.out.println("replyCode:"+replyCode);System.out.println("replyText:"+replyText);System.out.println("exchange:"+exchange);System.out.println("routingKey:"+routingKey);System.out.println("properties:"+properties);System.out.println("body:"+new String(body,"utf-8"));System.out.println("====================ReturnListener==================");}});String message = "Hello doubleasdasda!";//生产者如何发送消息,使用下面的方法即可/*** exchange – 交换机的名字 ,如果是空串,说明是把消息发给了默认交换机* routingKey – 路由的key,当发送消息给默认交换机时,routingkey代表队列的名字* other properties - 消息的其他属性,可以为null* body – 消息的内容,注意,要是有 字节数组*///注意:如果要使用生产者的return机制,需要在发送消息时,指定mandatory(强制性)为truechannel.basicPublish("", "sadnaas", true,null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(1000);//   关闭资源channel.close();conn.close();}
}

这个必须要加上才能让rutern返回机制生效 

 

 3.消费者的实现方式

package com.qf.mq2302.hello;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class Recv {private  final  static  String QUEUE_NAME="hello-queue";public static void main(String[] args) throws Exception {//1.获取连接对象Connection conn = MQUtils.getConnection();//2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上Channel channel = conn.createChannel();/*** 第一个参数队列名称* 第二个参数,耐用性* 第三个参数排外性* 第四个参数是否自动删除* 第五个参数,可以定义什么类型的队列*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中DeliverCallback deliverCallback =new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println(consumerTag);//从Delivery对象中可以获取到生产者,发送的消息的字节数组byte[] body = message.getBody();String msg = new String(body, "utf-8");//在这里写消费者的业务逻辑,例如,发送邮件System.out.println(msg);}};//4.让当前消费者开始消费(QUEUE_NAME)队列中的消息/*** queue – the name of the queue* autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。* deliverCallback – 当有消息发送给该消费者时,消费者如何处理消息的逻辑* cancelCallback – 当消费者被取消掉时,如果要执行代码,写到这里*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});}}

3.整合springboot实现

1.导入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2.yml配置文件

spring:rabbitmq:host: 8.140.244.227port: 6786username: testpassword: testvirtual-host: /testpublisher-returns: true #开启return机制

3.RabbitMQ配置文件

package com.qf.bootmq2302.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();//设置连接工厂对象rabbitTemplate.setConnectionFactory(cachingConnectionFactory);// 开启return机制rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("message:"+new String(message.getBody()));System.out.println("replyCode:"+replyCode);System.out.println("replyText:"+replyText);System.out.println("exchange:"+exchange);System.out.println("routingKey:"+routingKey);}});return rabbitTemplate;}}

4.生产者的controller

    @AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/test1")public String test1(String msg,String routkey){System.out.println(msg);String exchangeName = "";//默认交换机String routingkey = routkey;//队列名字//生产者发送消息rabbitTemplate.convertAndSend(exchangeName,routingkey,msg);return "ok";}

5.消费者写一个队列

   @RabbitListener(queues = "queueA")public void getMsg1(Map<String,Object> data, Channel channel,Message message) throws IOException {System.out.println(data);//手动ack//若开启手动ack,不给手动ack,就按照 prefetch: 1 #等价于basicQos(1)的量,就这么多,不会多给你了,因为你没有确认。确认一条,就给你一条channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}

6.消费者的配置文件

spring:rabbitmq:host: 8.140.244.227port: 6786username: testpassword: testvirtual-host: /test#手动ACKlistener:simple:acknowledge-mode: manual  # 手动ackprefetch: 1 #等价于basicQos(1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/72785.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SQL函数

函数 字符串函数数值函数日期函数流程函数 字符串函数 常用函数&#xff1a; 函数功能CONCAT(s1, s2, …, sn)字符串拼接&#xff0c;将s1, s2, …, sn拼接成一个字符串LOWER(str)将字符串全部转为小写UPPER(str)将字符串全部转为大写LPAD(str, n, pad)左填充&#xff0c;用…

git ------ IDEA中建立本地/远程仓库及上传

目录 建立本地仓库 1. idea中选择创建本地仓库 选择目标文件 创建远程仓库 1.码云上进行库创建 将本地仓库数据提交到远程仓库 提交代码 推送到远程 建立本地仓库 1. idea中选择创建本地仓库 或 vsm中找下列2 即可 选择目标文件 成功后会出现以下标识 更新 提交 推…

盖子的c++小课堂——第二十二讲:2维dp

前言 大家好&#xff0c;我又来更新了&#xff0c;今天终于有时间了aaaaaaaa 破500粉了&#xff0c;我太高兴了哈哈哈哈哈哈&#xff08;别看IP地址&#xff0c;我去北京旅游回来了&#xff0c;他没改回来&#xff09;&#xff0c;然后我马上就成为创作者一年了&#xff0c;希…

Gitea--私有git服务器搭建详细教程

一.官方文档 https://docs.gitea.com/zh-cn/说明 gitea 是一个自己托管的Git服务程序。他和GitHub, Gitlab等比较类似。他是从 Gogs 发展而来&#xff0c;gitea的创作团队重新fork了代码&#xff0c;并命名为giteagitea 功能特性多&#xff0c;能够满足我们所有的的代码管理需…

Java基于 SpringBoot 的车辆充电桩系统

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝30W,Csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 文章目录 1、效果演示效果图技术栈 2、 前言介绍&#xff08;完整源码请私聊&#xff09;3、主要技术3.4.1 …

C语言文本为什么不包括库函数和预处理命令

C语言的文本不包括库函数和预处理命令 是因为库函数和预处理命令并不是C语言本身的一部分&#xff0c; 它们是由C语言标准库和预处理器提供的功能。 C语言标准库是一组预定义的函数和常量&#xff0c; 用于提供常见的功能&#xff0c;如输入输出、字符串处理、数学计算等。 …

国密SSL证书是什么?cfca的证书靠谱吗

2019年&#xff0c;《中华人民共和国密码法》第三章表明&#xff1a;“国家鼓励在外商投资过程中基于自愿原则和商业规则开展商用密码技术合作。” 2020年&#xff0c;《中华人民共和国密码法》正式施行&#xff0c; 标志着国家在提升密码工作的科学化、规范化、法治化水平&am…

TLA+学习记录1——hello world

0x01 TLA是个好工具 编程人员一个好习惯是凡事都想偷懒&#xff0c;当然是指要科学地偷懒&#xff0c;而不是真的偷懒。一直想找到一种能检验写出的代码&#xff0c;做出的设计是否真的完全正确&#xff0c;而不是靠经验检视、代码Review、反复测试去检验。因为上述方法不管怎…

End-to-End Object Detection with Transformers(论文解析)

End-to-End Object Detection with Transformers 摘要介绍相关工作2.1 集合预测2.2 transformer和并行解码2.3 目标检测 3 DETR模型3.1 目标检测集设置预测损失3.2 DETR架构 摘要 我们提出了一种将目标检测视为直接集合预测问题的新方法。我们的方法简化了检测流程&#xff0c…

借助AI分析哥斯拉木马原理与Tomcat回显链路挖掘

前言 本次分析使用了ChatGPT进行辅助分析&#xff0c;大大提升了工作效率&#xff0c;很快就分析出木马的工作流程和构造出利用方式。 分析 首先对该木马进行格式化,以增强代码的可读性。得到如下代码 <jsp:root xmlns:jsp"http://java.sun.com/JSP/Page" vers…

基于Python开发的五子棋小游戏(源码+可执行程序exe文件+程序配置说明书+程序使用说明书)

一、项目简介 本项目是一套基于Python开发的五子棋小游戏&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Python学习者。 包含&#xff1a;项目源码、项目文档、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&a…

斯坦福小镇升级版——AI-Town搭建指南

导语&#xff1a; 8月份斯坦福AI小镇开源之后&#xff0c;引起了 AIGC 领域的强烈反响&#xff0c;但8月份还有另一个同样非常有意义的 AI-Agent 的项目开源&#xff0c;a16z主导的 AI-Town 本篇文章主要讲解如何搭建该项目&#xff0c;如有英文基础或者对这套技术栈熟悉&#…

TCP之三次握手四次挥手

在前面的文章中我们了解到http是基于TCP/IP协议的&#xff0c;这篇文章我们来了解一下TCP/IP。 一、TCP与UDP 1、UDP 基于非连接。类似于写信&#xff0c;不能保证对方能不能接收到&#xff0c;接收到的内容是否完整&#xff0c;顺序是否正确。 优缺点&#xff1a;性能损耗小…

云数据库知识学习——概述

一、云计算是云数据库兴起的基础 云计算是分布式计算、并行计算、效用计算、网络存储、虚拟化、负载均衡等计算机和网络技术发展融合的产物。云计算是由一系列可以动态升级和被虚拟化的资源组成的&#xff0c;用户无需掌握云计算的技术&#xff0c;只要通过网络就可以访问这些资…

Maven中导入jQuery,前端页面中引用jQuery

第一步pom文件中&#xff0c;配置maven坐标。 第二步&#xff0c;在前端页面中引用jQuery 注&#xff1a;该前端页面需要在web根目录即webapp目录下。可认为在maven中导入jQuery后&#xff0c;jquery.min.js文件放在目录webapp/webjars/jquery/3.3.1下。

HarmonyOS开发:探索动态共享包的依赖与使用

前言 所谓共享包&#xff0c;和Android中的Library本质是一样的&#xff0c;目的是为了实现代码和资源的共享&#xff0c;在HarmonyOS中&#xff0c;给开发者提供了两种共享包&#xff0c;HAR&#xff08;Harmony Archive&#xff09;静态共享包&#xff0c;和HSP&#xff08;H…

Spring整合tomcat的WebSocket详细逻辑(图解)

主要解决存在的疑问 为什么存在2种spring整合websocket的方式&#xff0c;一种是使用ServerEndpoint注解的方式&#xff0c;一种是使用EnableWebSocket注解的方式&#xff0c;这2种有什么区别和联系&#xff1f;可以共存吗&#xff1f;它们实现的原理是什么&#xff1f;它们的各…

redis实战篇之导入黑马点评项目

1. 搭建黑马点评项目 链接&#xff1a;https://pan.baidu.com/s/1Q0AAlb4jM-5Fc0H_RYUX-A?pwd6666 提取码&#xff1a;6666 1.1 首先&#xff0c;导入SQL文件 其中的表有&#xff1a; tb_user&#xff1a;用户表 tb_user_info&#xff1a;用户详情表 tb_shop&#xff1a;商户…

windows11安装docker时,修改默认安装到C盘

1、修改默认安装到C盘 2、如果之前安装过docker&#xff0c;请删除如下目录&#xff1a;C:\Program Files\Docker 3、在D盘新建目录&#xff1a;D:\Program Files\Docker 4、winr&#xff0c;以管理员权限运行cmd 5、在cmd中执行如下命令&#xff0c;建立软联接&#xff1a; m…

【办公类-19-03】办公中的思考——Python批量制作word单元格照片和文字(小照片系列)

背景需求&#xff1a; 工会老师求助&#xff1a;如何在word里面插入4*8的框&#xff0c;我怎么也拉不到4*8大小&#xff08;她用的是我WORD 文本框&#xff09; 我一听&#xff0c;这又是要手动反复黏贴“文本框”“照片”“文字”的节奏哦 我问&#xff1a;你要做几个人&…