Java17 --- RabbitMQ之常规使用

目录

一、实现消息可靠性投递

1.1、消息生产者端确认机制

1.2、备份交换机 

1.3、消费端确认机制

二、消费端限流设置

三、消息超时设置

3.1、从队列设置全局超时时间 

3.2、设置消息本身超时时间 

四、死信 

4.1、消费端拒绝接收消息

4.1.1、创建死信交换机与队列 

4.1.2、创建常规交换机与队列

 4.2、消息数量超过队列容纳限度

五、延迟队列

5.1、使用死信队列实现

5.2、使用插件实现 

5.3、创建交换机与队列 

六、事务消息 

七、优先级队列


一、实现消息可靠性投递

1.1、消息生产者端确认机制

修改yml

spring:rabbitmq:host: 192.168.200.110port: 5672username: guestpassword: 123456virtual-host: / publisher-confirm-type: correlated #交换机确认publisher-returns: true #队列确认
@Configuration
@Slf4j
public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void initRabbitTemplate(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}//消息发送到交换机成功或失败都会调用@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("correlationData:" + correlationData);log.info("ack:" + ack);log.info("cause:" + cause);}//发送到队列失败调用@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("消息主体:" + new String(returnedMessage.getMessage().getBody()));log.info("应答码:" + returnedMessage.getReplyCode());log.info("描述:" + returnedMessage.getReplyText());log.info("使用交换机:" + returnedMessage.getExchange());log.info("消息使用的路由键:" + returnedMessage.getRoutingKey());}
}
@SpringBootTest
public class MQTest {public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test1(){rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"你好,美羊羊");}
}

1.2、备份交换机 

创建备份交换机

 

创建绑定队列

 

将原交换机与备份交换机绑定

 

1.3、消费端确认机制

修改yml

 

spring:rabbitmq:host: 192.168.200.110port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual #手动确认
    @RabbitListener(queues = {QUEUE_NAME})public void getMessage(String date, Message message, Channel channel) throws IOException {//获取当前deliveryTagIDlong deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info(" "+1 / 0);//成功返回ACK信息channel.basicAck(deliveryTag,false);log.info("接收消息为:" + date);} catch (Exception e) {//获取消息是否重复投递Boolean redelivered = message.getMessageProperties().getRedelivered();//失败返回NACK信息if (redelivered){//long var1,// boolean var3,// boolean var4 控制消息是否重新放回队列channel.basicNack(deliveryTag,false,false);}else {channel.basicNack(deliveryTag,false,true);}throw new RuntimeException(e);}}
}

二、消费端限流设置

只需要修改yml

spring:rabbitmq:host: 192.168.200.110port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual #手动确认prefetch: 1 #设置每次从队列中读取消息数

三、消息超时设置

3.1、从队列设置全局超时时间 

 

3.2、设置消息本身超时时间 

@Testpublic void test4(){//创建消息后置处理器MessagePostProcessor messagePostProcessor = message -> {//设置过期时间,单位毫秒message.getMessageProperties().setExpiration("10000");return message;};rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"a",messagePostProcessor);}

四、死信 

4.1、消费端拒绝接收消息

4.1.1、创建死信交换机与队列 

正常创建绑定即可 

4.1.2、创建常规交换机与队列

创建常规队列注意事项

 

 4.2、消息数量超过队列容纳限度

 @Testpublic void test5(){for (int i = 1; i <=20 ; i++) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT_NORMAL,ROUTING_KEY_NORMAL,"a"+i);}}

五、延迟队列

5.1、使用死信队列实现

5.2、使用插件实现 

 docker inspect rabbitmq

下载的插件放入source后的目录 

进入容器内部

docker exec -it rabbitmq /bin/bash 

启动插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

退出容器

exit

重启容器

docker restart rabbitmq

5.3、创建交换机与队列 

 

队列正常创建,无需参数设置 

测试代码:

 public static final String EXCHANGE_DIRECT_DELAY = "exchange.delay";public static final String ROUTING_KEY_DELAY = "delay";
@Testpublic void test6(){//创建消息后置处理器MessagePostProcessor messagePostProcessor = message -> {//设置过期时间,单位毫秒//必须安装启动延迟插件设置才生效message.getMessageProperties().setHeader("x-delay","10000");return message;};rabbitTemplate.convertAndSend(EXCHANGE_DIRECT_DELAY,ROUTING_KEY_DELAY,"你好,插件" + new SimpleDateFormat("HH:mm:ss").format(new Date()),messagePostProcessor);}
public static final String QUEUE_NAME_DELAY = "queue.delay";
@RabbitListener(queues = {QUEUE_NAME_DELAY})public void getMessageDelay(String date, Message message, Channel channel) throws Exception {//获取当前deliveryTagIDlong deliveryTag = message.getMessageProperties().getDeliveryTag();//成功返回ACK信息channel.basicAck(deliveryTag,false);log.info("接收消息为:" + date);log.info("当前时间为:" + new SimpleDateFormat("HH:mm:ss").format(new Date()));}

六、事务消息 

 在Java配置类进行设置

@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory cachingConnectionFactory){return new RabbitTransactionManager(cachingConnectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){RabbitTemplate rabbitTemplate1 = new RabbitTemplate(cachingConnectionFactory);rabbitTemplate1.setChannelTransacted(true);return rabbitTemplate1;}
 @Test@Transactional@Rollback(value = false)public void test7(){rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"异常前");int var = 3 / 0;rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"异常后");}

七、优先级队列

 创建交换机与队列

@Testpublic void test8(){//创建消息后置处理器MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setPriority(3);return message;};rabbitTemplate.convertAndSend("exchange.priority","priority","第3级",messagePostProcessor);}

 

 

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/27339.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LangChain入门学习笔记(一)——Hello World

什么是LangChain LangChain是一个开源&#xff08;github repo&#xff09;的大语言模型应用开发框架&#xff0c;提供了一整套的工具、方法和接口去帮助程序员构建基于大语言模型的端到端应用。LangChain是长链&#xff08;long chain&#xff09;的意思&#xff0c;它的一个…

Linux驱动开发笔记(八)输入子系统

文章目录 前言一、输入子系统1. 子系统的引入2. 组成部分3. 事件处理流程4. 相关数据结构 二、程序编写1. 相关API函数1.1 input_allocate_device ( )1.2 input_free_device ( )1.3 input_register_device ( )1.4 input_unregister_device ( )1.5 input_event ( )1.6 input_rep…

Prometheus写入influxDB:中间件remote_storage_adapter

Prometheus写入influxDB&#xff1a;中间件remote_storage_adapter prometheus默认采用的是本地磁盘做数据存储&#xff0c;本地存储的优势就是运维简单但是缺点就是无法海量的metrics持久化和数据存在丢失的风险,数据写入可能造成wal文件损坏导致采集数据无法再写入的问题。 …

【嵌入式DIY实例】-Nokia 5110显示DS18B20传感器数据

Nokia 5110显示DS18B20传感器数据 文章目录 Nokia 5110显示DS18B20传感器数据1、硬件准备2、代码实现本文将介绍如何使用 ESP8266 NodeMCU 板和 DS18B20 数字温度传感器实现简单的温度测量站。 NodeMCU 微控制器 (ESP8266EX) 从 DS18B20 传感器读取温度值,并将其打印在诺基亚 …

LeetCode 2786.访问数组中的位置使分数最大:奇偶分开记录(逻辑还算清晰的题解)

【LetMeFly】2786.访问数组中的位置使分数最大&#xff1a;奇偶分开记录&#xff08;逻辑还算清晰的题解&#xff09; 力扣题目链接&#xff1a;https://leetcode.cn/problems/visit-array-positions-to-maximize-score/ 给你一个下标从 0 开始的整数数组 nums 和一个正整数 …

如何使⽤C语⾔填充封闭图形?

一、问题 如要对封闭图形&#xff08;如两个区域的交集&#xff09;进⾏填充&#xff0c;那么怎么实现呢&#xff1f; 二、解答 填充就是⽤指定的颜⾊和图案填满⼀个封闭图形。 TC 提供了⼀个可对任意封闭图形填充的函数&#xff0c;即 floodfill( ) 。其调⽤格式如下&#xf…

Nginx部署Vue项目css文件能加载但是不生效

目录 问题描述问题解决 问题描述 Nginx部署打包后的Vue项目css文件能加载但是不生效&#xff0c; 问题解决 查看响应标头&#xff0c;发现不对劲&#xff0c; Content-Type: text/plain正确的应该是 Content-Type: text/css根本原因是nginx没有告诉浏览器正确的文件类型 所…

Kubernetes面试整理-Kubernetes 如何工作?

1. 部署应用: ● 开发者或管理员定义一组期望的状态(通常通过 YAML 文件),描述了应用包括的 pods、容器镜像、网络设置和存储要求。 ● 这些定义文件会提交给 API 服务器,存储在 etcd 中。 2. 调度: ● 当创建 pod 请求提交给 Kubernetes 时,调度器会选择一个节点来部署…

Postman接口测试/接口自动化实战教程

一、API 自动化测试 Postman 最基本的功能用来重放请求&#xff0c;并且配合良好的 response 格式化工具。 高级点的用法可以使用 Postman 生成各个语言的脚本&#xff0c;还可以抓包&#xff0c;认证&#xff0c;传输文件。 仅仅做到这些还不能够满足一个系统的开发&#x…

springboot学习小结

背景 业务上需要开发&#xff0c;组里一位前辈给我指路 spring基础 什么是spring spring提供一个容器称为spring应用上下文&#xff0c;容器里可以创建和管理组件&#xff0c;组件会在容器里装配好&#xff0c;组件也可以叫bean。 装配不由组件创建他依赖的组件&#xff0…

Python学习打卡:day05

day5 笔记来源于&#xff1a;黑马程序员python教程&#xff0c;8天python从入门到精通&#xff0c;学python看这套就够了 目录 day538、函数的初体验39、函数的基础定义语法函数的定义注意事项 40、函数的基础定义案例练习41、函数的传入参数42、函数的传入参数案例练习——升…

GEE数据融合——Landsat (collection 2,level 2 )4、5、7、8、9长时间序列影像数据融合和视频导出分析

本次我们使用Landsat (collection 2,level 2 )4、5、7、8、9数据的地标反射率数据进行融合,来实现指定区域的影像导出分析。 简介 长时间序列影像数据融合是指将Landsat影像数据集合2级2(Level 2)中的4、5、7、8和9这五个卫星的数据进行融合。具体来说,这包括将同一地…

毕业年薪30W起!25届最近5年浙江大学自动化考研院校分析

浙江大学 目录 一、学校学院专业简介 二、考试科目指定教材 三、近5年考研分数情况 四、近5年招生录取情况 五、最新一年分数段图表 六、历年真题PDF 七、初试大纲复试大纲 八、学费&奖学金&就业方向 一、学校学院专业简介 二、考试科目指定教材 1、考试科目…

揭秘招生简章的制作方法

一年一度的招生季即将来临&#xff0c;各大院校纷纷摩拳擦掌&#xff0c;准备迎接新的学子。对于学校而言&#xff0c;招生简章是其对外宣传的重要窗口&#xff0c;它直接关系到学校的招生效果和声誉。那么&#xff0c;如何制作一份既吸引人又实用的招生简章呢&#xff1f;下面…

web前端翻页:技术探秘与未来趋势

web前端翻页&#xff1a;技术探秘与未来趋势 Web前端翻页&#xff0c;作为网页交互体验的重要组成部分&#xff0c;始终吸引着开发者的关注。其设计原理、实现技巧以及未来趋势&#xff0c;都是我们在探索前端技术时不可忽视的方面。本文将从四个方面、五个方面、六个方面和七…

GIS之arcgis系列09:arcpy实现克里金差值

矢量点数据经过克里金差值后可以转换成栅格数据&#xff0c;那么就需要了解一下什么是克里金差值。 什么是克里金法? IDW(反距离加权法)和样条函数法插值工具被称为确定性插值方法&#xff0c;因为这些方法直接基于周围的测量值或确定生成表面的平滑度的指定数学公式。第二类…

【leetcode--字母异位词分组】

class Solution:def groupAnagrams(self, strs: List[str]) -> List[List[str]]:np collections.defaultdict(list)for st in strs:name "".join(sorted(st))np[name].append(st)return list(np.values()) collections.defaultdict(list)创建字典类型&#xff…

Git代码冲突原理与三路合并算法

Git代码冲突原理 Git合并文件是以行为单位进行一行一行合并的&#xff0c;但是有些时候并不是两行内容不一样Git就会报冲突&#xff0c;这是因为Git会帮助我们进行分析得出哪个结果是我们所期望的最终结果。而这个分析依据就是三路合并算法。当然&#xff0c;三路合并算法并不…

Flowable-决策表设计器

✨✨✨ 最好用的Flowable决策表设计器 ✨✨✨ 最好用的Flowable流程设计器 本文中内容和案例出自贺波老师的书《深入Activiti流程引擎&#xff1a;核心原理与高阶实战》&#xff0c;书中的介绍更全面、详细&#xff0c;推荐给大家。 深入Activiti流程引擎

C++ 31 之 静态成员变量

#include <iostream> #include <string.h> using namespace std;// 特点: // 1.在编译阶段就分配了内存空间 // 2.类内声明&#xff0c;在类外进行初始化 // 3.所有对象共享一份静态成员数据 class Students01{ public:static int s_a; // 静态成员变量int s_b; };…