003 Springboot操作RabbitMQ

Springboot整合RabbitMQ

文章目录

    • Springboot整合RabbitMQ
      • 1.pom依赖
      • 2.yml配置
      • 3.配置队列、交换机
        • 方式一:直接通过配置类配置bean
        • 方式二:消息监听通过注解配置
      • 4.编写消息监听发送测试
      • 5.其他类型交换机配置
        • 1.FanoutExchange
        • 2.TopicExchange
        • 3.HeadersExchange
      • 6.延迟消息处理(TTL)
        • 方式一:ttl配置
        • 方式二:消息发送设置
      • 7.死信队列

1.pom依赖

 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version></dependency></dependencies>

2.yml配置

#配置使用的配置文件
spring:#配置rabbitmqrabbitmq:host: 127.0.0.1 #主机地址port: 5672  #端口号username: xxx #用户名password: xxx #密码virtual-host: my_vhost  #虚拟主机地址#开启消息送达提示publisher-returns: true# springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果publisher-confirm-type: correlatedlistener:  #消息监听配置type: simplesimple:acknowledge-mode: manual #manual手动确认消息  auto没有异常时 进行自动确认 (异常类型 消息重新入队)prefetch: 1 #限制每次发送一条数据。concurrency: 3 #同一个队列启动几个消费者max-concurrency: 3 #启动消费者最大数量#重试策略相关配置retry:# 开启消费者(程序出现异常)重试机制,默认开启并一直重试enabled: true# 最大重试次数max-attempts: 5# 重试间隔时间(毫秒)initial-interval: 3000server:port: 18082address: 127.0.0.1servlet:context-path: /

3.配置队列、交换机

方式一:直接通过配置类配置bean

推送消息时不存在创建队列和交换机

/*** direct模式声明配置*/
@Configuration
public class RabbitDirectConfig {public static final String EXCHANGE_NAME="direct-exchange";public static final String QUEUE_NAME="direct-queue";public static final String BINDING_KEY="change:direct";/*** 声明直连交换机* name:交换机的名称* durable 队列是否持久化* autoDelete:是否自动删除,(当该交换机上绑定的最后一个队列解除绑定后,该交换机自动删除)* argument:其他一些参数*/@Beanpublic DirectExchange directExchange() {return new DirectExchange(EXCHANGE_NAME,false,false,null);}/*** 声明队列*  queue 队列的名称*  durable 队列是否持久化*  exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。*/@Beanpublic Queue directQueue() {return new Queue(QUEUE_NAME,false,false,false,null);}/*** 交换机队列绑定*/@Beanpublic Binding springExchangeBindSpringQueue() {return BindingBuilder.bind(directQueue()).to(directExchange()).with(BINDING_KEY);}}
方式二:消息监听通过注解配置

启动时创建队列和交换机

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct1-queue",durable = "true"),exchange = @Exchange(value = "direct1-exchange",type = ExchangeTypes.DIRECT,durable = "true"),key = "change1:direct"))

注意:rabbitmq同名的队列只能创建一个,创建多个会报错,推送消息时需确保队列和交换机已存在,

方式一队列和交换机在第一次推送消息时才会自动创建队列和交换机,方式二注解在启动时就会创建

4.编写消息监听发送测试

监听

@Slf4j
@Component
public class RabbitMQListener {@RabbitListener(queues = "direct-queue")@RabbitHandlerpublic void bootMsg(Channel channel, Message message){String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" direct 消费者:'" + message1 + "'");//手动确认该消息try {//消息确认,根据消息序号(false只确认当前一个消息收到,true确认所有比当前序号小的消息(成功消费,消息从队列中删除 ))channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (IOException e) {log.error("执行异常",e);// 拒绝消息并重新入队channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); }}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct1-queue",durable = "true"),exchange = @Exchange(value = "direct1-exchange",type = ExchangeTypes.DIRECT,durable = "true"),key = "change1:direct"))@RabbitHandlerpublic void bootMsg1(Channel channel, Message message){String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" direct 消费者:'" + message1 + "'");//手动确认该消息try {//消息确认,根据消息序号(false只确认当前一个消息收到,true确认所有比当前序号小的消息(成功消费,消息从队列中删除 ))channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (IOException e) {log.error("执行异常",e);// 拒绝消息并重新入队channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}}

测试

@Slf4j
@SpringBootTest(classes = RabbitProviderApplication.class)
public class RabbitTest {@Autowiredprivate AmqpTemplate amqpTemplate;@Testpublic void directProvider(){String message = "direct模式消息推送。。。。。";/*** 参数分别为,交换机,路由key,消息体*/amqpTemplate.convertAndSend("direct-exchange","change:direct",message);System.out.println(" 消息发送 :'" +message + "'");}@Testpublic void directProvider1(){String message = "direct模式消息推送1。。。。。";/*** 参数分别为,交换机,路由key,消息体*/amqpTemplate.convertAndSend("direct1-exchange","change1:direct",message);System.out.println(" 消息发送1 :'" +message + "'");}}

在这里插入图片描述

5.其他类型交换机配置

1.FanoutExchange
/*** fanout模式声明配置*/
@Configuration
public class RabbitFanoutConfig {public static final String EXCHANGE_NAME="fanout-exchange";public static final String QUEUE_NAME1="fanout-queue1";public static final String QUEUE_NAME2="fanout-queue2";/*** 声明交换机*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(EXCHANGE_NAME,false,false,null);}/*** 声明队列*/@Beanpublic Queue fanoutQueue1() {return new Queue(QUEUE_NAME1,false,false,false,null);}@Beanpublic Queue fanoutQueue2() {return new Queue(QUEUE_NAME2,false,false,false,null);}/*** 交换机队列绑定*/@Beanpublic Binding springExchangeBindQueue1() {return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}/*** 交换机队列绑定*/@Beanpublic Binding springExchangeBindQueue2() {return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}}

监听

    @RabbitListener(queues = "fanout-queue1")public void fanoutMsg1(Channel channel, Message message) {String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" fanout-queue1 消费者:'" + message1 + "'");}@RabbitListener(queues = "fanout-queue2")public void fanoutMsg2(Channel channel, Message message) {String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" fanout-queue2 消费者:'" + message1 + "'");}

测试

    @Testpublic void fanoutProvider(){String message = "fanout模式消息推送。。。。。";amqpTemplate.convertAndSend("fanout-exchange", "",message);System.out.println(" 消息发送 :'" +message + "'");}

在这里插入图片描述

2.TopicExchange
/*** topic模式声明配置*/
@Configuration
public class RabbitTopicConfig {public static final String EXCHANGE_NAME="topic-exchange";public static final String QUEUE_NAME="topic-queue";public static final String BINDING_KEY="*.orange.#";/*** 声明交换机*/@Beanpublic TopicExchange topicExchange() {return new TopicExchange(EXCHANGE_NAME,false,false,null);}/*** 声明队列*/@Beanpublic Queue topicQueue() {return new Queue(QUEUE_NAME,false,false,false,null);}/*** 交换机队列绑定*/@Beanpublic Binding topicExchangeBindQueue() {return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(BINDING_KEY);}}
    @RabbitListener(queues = "topic-queue")public void topicMsg2(Channel channel, Message message) {String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" topic-queue2 消费者:'" + message1 + "'");}

测试

    @Testpublic void topicProvider(){String message1 = "topic test模式消息推送。。。。。";String message2 = "topic test.aaa模式消息推送。。。。。";amqpTemplate.convertAndSend("topic-exchange", "com.orange.test",message1);amqpTemplate.convertAndSend("topic-exchange", "com.orange.test.aaa",message2);System.out.println(" 消息发送");}

在这里插入图片描述

3.HeadersExchange
/*** headers模式声明配置* 与路由key无关,只需要消息的头参数匹配即可* x-match参数代表是全部匹配还是部分匹配*/
@Configuration
public class RabbitHeadersConfig {public static final String EXCHANGE_NAME="headers-exchange";public static final String QUEUE_NAME="headers-queue";public static final String QUEUE_NAME1="headers-queue1";/*** 声明交换机*/@Beanpublic HeadersExchange headersExchange() {return new HeadersExchange(EXCHANGE_NAME,false,false,null);}/*** 声明队列*/@Beanpublic Queue headersQueue() {return new Queue(QUEUE_NAME,false,false,false,null);}@Beanpublic Queue headersQueue2() {return new Queue(QUEUE_NAME1,false,false,false,null);}/*** 交换机队列绑定(任意匹配)* whereAny 等同于x-match = any*/@Beanpublic Binding headersExchangeBindSpringQueue() {HashMap<String, Object> header = new HashMap<>();header.put("test", "111");header.put("test1", "222");return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAny(header).match();}/*** 交换机队列绑定(全部匹配)* whereAny 等同于x-match = all*/@Beanpublic Binding headersExchangeBindSpringQueue1() {HashMap<String, Object> header = new HashMap<>();header.put("test", "111");header.put("test1", "222");return BindingBuilder.bind(headersQueue2()).to(headersExchange()).whereAll(header).match();}}

发送测试

  @Testpublic void headerProvider(){String param = "headers 模式消息推送。。。。。";MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType("text/plain");messageProperties.setContentEncoding("utf-8");messageProperties.setHeader("test","111");Message message = new Message(param.getBytes(), messageProperties);amqpTemplate.convertAndSend("headers-exchange", null,message);System.out.println(" 消息发送");}

在这里插入图片描述

队列queue任意匹配有数据,queue1全部匹配无数据

headers-queue

在这里插入图片描述

headers-queue1

在这里插入图片描述

消息监听

    @RabbitListener(queues = "headers-queue")public void headersMsg2(Channel channel, Message message) {String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" headers-queue 消费者:'" + message1 + "'");}@RabbitListener(queues = "headers-queue1")public void headers1Msg2(Channel channel, Message message) {String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" headers-queue1 消费者:'" + message1 + "'");}

在这里插入图片描述

6.延迟消息处理(TTL)

  • 第一种是使用普通队列和死信队列来模拟实现延迟的效果。将消息放入一个没有被监听的队列上,设置TTL(一条消息的最大存活时间)为延迟的时间,时间到了没有被消费,直接成为死信,进入死信队列。后监听私信队列来消息消费

  • 第二种是使用rabbitmq官方提供的delayed插件来真正实现延迟队列。

方式一:ttl配置

超时自动删除

/*** rabbitmq的ttl延迟过期时间配置*/
@Configuration
public class RabbitMQTTLConfig {/*** 声明交换机* @return*/@Beanpublic DirectExchange ttlDirectExchange(){return new DirectExchange("ttl-direct-exchange");}/*** 声明队列* @return*/@Beanpublic Queue ttlQueue(){//设置参数Map<String,Object> args = new HashMap<>();//设置ttl过期时间,需设置int值args.put("x-message-ttl",5000);return new Queue("ttl-direct-queue",true,false,false,args);}/*** 绑定队列* @return*/@Beanpublic Binding ttlBingQueue(){return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("direct:ttl:key");}}

测试

    @Testpublic void ttlSendMessageTest(){String exchange = "ttl-direct-exchange";String routingKey = "direct:ttl:key";String msg = UUID.randomUUID().toString();//发送并设置amqpTemplate.convertAndSend(exchange,routingKey,msg);System.out.println("消息发送成功====="+msg);}

在这里插入图片描述

方式二:消息发送设置

注释掉x-message-ttl参数,使用普通队列,发送消息时设置过期时间

    @Testpublic void ttlSendMessageTest(){String exchange = "ttl-direct-exchange";String routingKey = "direct:ttl:key";String msg = UUID.randomUUID().toString();//设置过期时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");message.getMessageProperties().setContentEncoding("UTF-8");return message;}};//发送并设置amqpTemplate.convertAndSend(exchange,routingKey,msg,messagePostProcessor);System.out.println("消息发送成功====="+msg);}

在这里插入图片描述

注意:如果项目中即使用了ttl配置过期时间,有设置了消息过期时间,则执行时以最小的时间为准,ttl过期队列的消息过期会写到死信,而设置方式的普通队列则不会自动写到死信队列

7.死信队列

死信的情况:消息被拒绝,消息过期,队列达到最大长度

死信队列声明

@Configuration
public class RabbitMQDLXConfig {/*** 声明死信交换机* @return*/@Beanpublic DirectExchange dlxDirectExchange(){return new DirectExchange("dlx-direct-exchange");}/*** 声明死信队列* @return*/@Beanpublic Queue dlxQueue(){ ;return new Queue("dlx-direct-queue",true);}/*** 绑定队列* @return*/@Beanpublic Binding dlxBingQueue(){return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with("direct:dlx:key");}}

过期推送到死信设置

   /*** 声明ttl队列* @return*/@Beanpublic Queue ttlQueue(){//设置参数Map<String,Object> args = new HashMap<>();//设置ttl过期时间,需设置int值args.put("x-message-ttl",5000);args.put("x-max-length",5);//最大长度//消息过期死信队列入队配置args.put("x-dead-letter-exchange","dlx-direct-exchange");//设置死信交换机args.put("x-dead-letter-routing-key","direct:dlx:key");//死信路由key,fanout模式不需要设置路由keyreturn new Queue("ttl-direct-queue",true,false,false,args);}

注意:队列参数修改后,不会重新创建覆盖而是会报错,需要手动删除重新创建,生产环境中则可以通过重新创建一个队列,进行转移

测试

在这里插入图片描述

消息过期进死信队列
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/56398.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

4D-fy: Text-to-4D Generation Using Hybrid Score Distillation Sampling技术路线

这篇文章分为四部分&#xff0c;首先从2021年的CLIP说起。 这篇论文的主要工作是提出了一种名为 CLIP&#xff08;Contrastive Language-Image Pre-training&#xff09; 的模型&#xff0c;它通过自然语言监督学习视觉模型&#xff0c;以实现视觉任务的零样本&#xff08;zer…

20 Shell Script输入与输出

标出输入、标准输出、错误输出 一、程序的基本三个IO流 一&#xff09;文件描述符 ​ 任何程序在Linux系统中都有3个基本的文件描述符 ​ 比如: ​ cd/proc/$$/fd ​ 进入当前shell程序对于内核在文件系统的映射目录中: [rootlocalhost ~]# cd /proc/$$/fd [rootlocalhos…

Web集群服务-代理和负载均衡

1. 概述 1. 用户----->代理--->Web节点,后面只有一个节点,一般使用的是nginx代理功能即可 2. 后面如果是集群需要使用nginx负载均衡功能 2. 代理分类 代理分类方向应用正向代理用户(服务器)-->代理--->外部(某网站)服务器通过代理实现共享上网/访问公网反向代理用…

Linux:进程控制(三)——进程程序替换

目录 一、概念 二、使用 1.单进程程序替换 2.多进程程序替换 3.exec接口 4.execle 一、概念 背景 当前进程在运行的时候&#xff0c;所执行的代码来自于自己的源文件。使用fork创建子进程后&#xff0c;子进程执行的程序中代码内容和父进程是相同的&#xff0c;如果子进…

Python基础语法条件

注释 注释的作用 通过用自己熟悉的语言&#xff0c;在程序中对某些代码进行标注说明&#xff0c;这就是注释的作用&#xff0c;能够大大增强程序的可读性。 注释的分类及语法 注释分为两类&#xff1a;单行注释 和 多行注释。 单行注释 只能注释一行内容&#xff0c;语法如下…

跟着小土堆学习pytorch(一)——Dataset

文章目录 一、前言二、dataset三、代码展示 一、前言 pytorch也是鸽了很久了&#xff0c;确定了下&#xff0c;还是用小土堆的教程。 kaggle获取数据集 二、dataset dateset&#xff1a;数据集——提供一种方式去获取数据及其标签 如何获取数据及其标签以及总共多少个数据…

PostgreSQL学习笔记六:模式SCHEMA

模式&#xff08;Schema&#xff09; PostgreSQL中的模式&#xff08;Schema&#xff09;是一个命名的数据库对象集合&#xff0c;包括表、视图、索引、数据类型、函数、存储过程和操作符等。模式的主要作用是组织和命名空间数据库对象&#xff0c;使得同一个数据库中可以包含…

基于gewechat制作第一个微信聊天机器人

Gewe 个微框架 GeWe&#xff08;个微框架&#xff09;是一个创新性的软件开发框架&#xff0c;为个人微信号以及企业信息安全提供了强大的功能和保障。GeWe的设计旨在简化开发过程&#xff0c;使开发者能够高效、灵活地构建和定制通信协议&#xff0c;以满足不同应用场景的需求…

JavaScript object(2)

这样的话&#xff0c;就变成只读了。

外包干了5天,技术明显退步

我是一名本科生&#xff0c;自2019年起&#xff0c;我便在南京某软件公司担任功能测试的工作。这份工作虽然稳定&#xff0c;但日复一日的重复性工作让我逐渐陷入了舒适区&#xff0c;失去了前进的动力。两年的时光匆匆流逝&#xff0c;我却在原地踏步&#xff0c;技术没有丝毫…

Qualitor checkAcesso.php 任意文件上传漏洞复现(CVE-2024-44849)

0x01 漏洞概述 Qualitor 8.24及之前版本存在任意文件上传漏洞,未经身份验证远程攻击者可利用该漏洞代码执行,写入WebShell,进一步控制服务器权限。 0x02 复现环境 FOFA:app="Qualitor-Web" 0x03 漏洞复现 PoC POST /html/ad/adfilestorage/request/checkAcess…

【IC验证】随机约束

1.约束 &#xff08;1&#xff09;注意 一般随机约束只能在类中使用&#xff1b; &#xff08;2&#xff09;实现步骤 在定义变量时&#xff0c;用rand/randc关键字进行修饰&#xff1b; 定义约束&#xff1b; 创建并实例化类后&#xff0c;调用随机约束方法&#xff1b; &am…

光平面标定代码

本篇文章主要给出光平面标定代码&#xff0c;鉴于自身水平所限&#xff0c;如有错误&#xff0c;欢迎批评指正。&#xff08;欢迎进Q群交流&#xff1a;874653199&#xff09; 数据分为棋盘格数据和激光条数据&#xff0c;激光条数据为在第22个位姿至第26个位姿下打在棋盘格标定…

初识Linux之指令(二)

一&#xff1a;head指令 head 与 tail 就像它的名字一样的浅显易懂&#xff0c;它是用来显示开头或结尾某个数量的文字区块&#xff0c;head 用来显示档案的 开头至标准输出中&#xff0c;而 tail 想当然尔就是看档案的结尾。 语法&#xff1a;head 【参数】 【文件】 功能&…

開發 meshtastic 聊天機器人(2)

利用 Web 串接主機附近周邊藍芽(含 meshtastic client) pip install bleak (這個比較簡單) ----另外一個為 pybluez2 (pybluez) 2.程式 import streamlit as st import asyncio from bleak import BleakScannerasync def fetch_data():devices await BleakScanner.discover(…

京存非编存储走进中央民族大学

中央民族大学是一所具有鲜明民族特色的综合性全国重点大学&#xff0c;是国家民委、教育部、北京市共建高校。学校前身为1941年成立的延安民族学院。新中国成立后&#xff0c;经中央政府批准&#xff0c;1951年在北京成立中央民族学院&#xff0c;1993年11月更名为中央民族大学…

【量化交易】聚宽安装

安装JQData 更换源&#xff1a; 如果使用的是pip默认的PyPI源&#xff0c;可以尝试更换为一个更快的国内镜像源。例如阿里云、豆瓣等提供的PyPI镜像。 更改方法可以通过设置环境变量或者在pip命令中直接指定&#xff1a; PS C:\Users\bilirjs\Documents> pip config set …

No.13 笔记 | 网络安全防护指南:从法律法规到技术防御

一、法律法规 《中华人民共和国网络安全法》要点 遵守法律&#xff1a;所有个人和组织在使用网络时&#xff0c;必须遵守宪法和法律&#xff0c;不得利用网络从事危害国家安全等活动。 个人信息保护&#xff1a;禁止非法获取、出售或提供个人信息。若违反但未构成犯罪&#x…

Karmada核心概念

以下内容为翻译&#xff0c;原文地址 Karmada 是什么&#xff1f; | karmada 一、Karmada核心概念 一&#xff09;什么是Karmada 1、Karmada&#xff1a;开放&#xff0c;多云&#xff0c;多集群Kubernetes业务流程 Karmada (Kubernetes Armada)是一个Kubernetes管理系统&…

【NTN 卫星通信】卫星通信的专利

1 概述 好久没有看书了&#xff0c;最近买了本讲低轨卫星专利的书&#xff0c;也可以说是一个分析报告。推荐给喜欢的朋友。 2 书籍截图 图1 封面 图2 波音低轨卫星专利演进 图3 低轨卫星关键技术专利发展阶段 图4 第一页 3 参考文献 产业专利分析报告–低轨卫星通信技术