RabbitMQ工作模式2 整合springboot 和MQ高级特性

RabbitMQ工作模式

1.路由模式

创建交换机 , 连接队列 (生产者)

public class MyTestExDirect {@Testpublic void bbb() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//连接mqconnectionFactory.setUsername("账号");connectionFactory.setPassword("密码");connectionFactory.setHost("ip地址");connectionFactory.setPort(端口号);connectionFactory.setVirtualHost("/aaa");//建立连接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//创建交换机channel.exchangeDeclare("ex_direct", BuiltinExchangeType.DIRECT,false);//创建队列/*** String queue, 队列的名称* boolean durable, 持久化* boolean exclusive, 是否独占* boolean autoDelete,  受否自动删除* Map<String, Object> arguments  参数*/channel.queueDeclare("mydirect1",false,false,false,null);channel.queueDeclare("mydirect2",false,false,false,null);//绑定交换机和队列   设置routingkeychannel.queueBind("mydirect1","ex_direct","error");channel.queueBind("mydirect2","ex_direct","test");channel.queueBind("mydirect2","ex_direct","test2");//交换机     routingkey     根据routingkey在队列上发布消息channel.basicPublish("ex_direct","error",null,"路由模式测试".getBytes());}
}

启动测试

交换机创建成功

队列创建成功 , 与交换机连接成功

通过routingkey "error" 将消息发送到 mydirect1

创建消费者

public class ConsumerAppDirect
{public static void main( String[] args ) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//连接mqconnectionFactory.setUsername("账号"); connectionFactory.setPassword("密码"); connectionFactory.setHost("ip地址");connectionFactory.setPort(端口号); connectionFactory.setVirtualHost("/aaa");//建立连接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("mq:-----aaa"+s);}};channel.basicConsume("mydirect1",true,consumer);}
}

开启监控

2.Topics 主题模式

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词 

*:匹配不多不少恰好1个词   test.* test.insert

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert 

创建交换机和生产者

public class MyTestExTopics {@Testpublic void ccc() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//连接mqconnectionFactory.setUsername("账号");connectionFactory.setPassword("密码"); connectionFactory.setHost("ip地址");connectionFactory.setPort(端口号);connectionFactory.setVirtualHost("/aaa");//建立连接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//创建交换机channel.exchangeDeclare("ex_topics", BuiltinExchangeType.TOPIC,false);//创建队列/*** String queue, 队列的名称* boolean durable, 持久化* boolean exclusive, 是否独占* boolean autoDelete,  受否自动删除* Map<String, Object> arguments  参数*/channel.queueDeclare("mytopics1",false,false,false,null);channel.queueDeclare("mytopics2",false,false,false,null);//绑定交换机和队列   设置routingkeychannel.queueBind("mytopics1","ex_topics","test.#");channel.queueBind("mytopics2","ex_topics","*.aaa");channel.queueBind("mytopics2","ex_topics","test.*");//交换机     此处的routingkey应该是具体的值     根据routingkey在队列上发布消息channel.basicPublish("ex_topics","test.aaa",null,"TOPIC模式测试".getBytes());}
}

测试

发布消息成功

消费者监听参考路由模式 , 只需要修改队列就行

SpringBoot整合RabbitMQ

1.搭建项目

添加依赖

<!--2. rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

添加配置文件

2.创建工作模式(主题模式)

1)创建交换机和队列

package com.example.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TopicMqConfig {@Value("${mq.exchange.name}")private String EXCHANGENAME;@Value("${mq.queue.name}")private String QUEUENAME1;@Value("${mq.queue.name}")private String QUEUENAME2;//创建交换机@Bean("ex1")public Exchange getExchange(){Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();return exchange;}//创建队列@Bean("queue1")public Queue getQueue1(){Queue queue1 = QueueBuilder.nonDurable(QUEUENAME1).build();return queue1;}@Bean("queue2")public Queue getQueue2(){Queue queue2 = QueueBuilder.nonDurable(QUEUENAME1).build();return queue2;}//绑定交换机和队列@Bean("binding1")public Binding bindingQueueToExchange1(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("*.*").noargs();return binding1;}@Bean("binding2")public Binding bindingQueueToExchange2(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("test.*").noargs();return binding2;}
}

2)创建生产者

测试

3)创建消费者

创建配置文件

创建测试类 监听队列
package com.example.message;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerMessage {@RabbitListener(queues = "test_queue2")public void xxx(Message message){byte[] body = message.getBody();String s = new String(body);System.out.println(s);}
}

测试

MQ高级特性,消息的可靠性传递

1.确认模式

开启确认模式 修改配置

创建测试类

@SpringBootTest
public class MqTtst {@Value("${mq.exchange.name}")private String EXCHANGENAME;@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMsg(){rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (b){System.out.println("发送消息成功");}else {System.out.println("发送消息失败,原因:"+s);}}});rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");}
}

启动测试

2.消息回退

当交换机接收到消息 , 但队列收不到消息时 , 使用回退

修改配置

测试

@Test
void sendMsgReturn(){//  消息回退rabbitTemplate.setMandatory(true);//rabbitTemplate.setReturnsCallback(returnedMessage -> System.out.println("消息回退,回退的消息是:"+new String(returnedMessage.getMessage().getBody())));rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");
}

3.Consumer Ack

三种确认方式

 自动确认:acknowledge="none" 。不管处理成功与否,业务处理异常也不管

(当消费者意担接收到消息之后,消费者就会给broker一个回执,证明已经接收到消息 了,不管消息到底是否成功)

手动确认:acknowledge="manual" 。可以解决业务异常的情况

(收到消息之后不会立马确认收到消息,当业务处理没有问题的时候手动的调用代码的方 式来进行处理,如果业务失败了,就可以进行额外的操作)

根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

1)自动确认

2)手动确认

修改配置 开启手动签收

3)创建测试

@Component
public class ShouDingQianShouMeaasge implements ChannelAwareMessageListener {@Override@RabbitListener(queues = "test_queue2")public void onMessage(Message message, Channel channel) throws Exception {Thread.sleep(2000);byte[] body = message.getBody();String s = new String(body);System.out.println(s);long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println(1/0);channel.basicAck(deliveryTag,true);}catch (Exception e){System.out.println("拒绝签收");channel.basicNack(deliveryTag,true,true);}}
}

启动测试

有异常拒绝签收

无异常签收成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/178498.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

可信区块链运行监测服务平台(TBM)发展研讨会在北京召开

2023年11月23日&#xff0c;由中国信息通信研究院、中国移动通信集团设计院有限公司、区块链服务网络&#xff08;BSN&#xff09;发展联盟共同主办的“可信区块链运行监测服务平台&#xff08;TBM&#xff09;发展研讨会”在北京成功举行。会议围绕区块链的监测与治理&#xf…

vatee万腾的科技征途:Vatee数字化力量的新视野

在科技的浪潮中&#xff0c;Vatee万腾正展开一场引人注目的科技征途&#xff0c;以其独特的数字化力量描绘出一片新的视野。这不仅是一次技术的升级&#xff0c;更是一场对未来的全新探索&#xff0c;为我们带来了前所未有的数字化时代。 Vatee万腾以其卓越的技术实力和前瞻性的…

springboot实现数据脱敏

springboot实现数据脱敏 怎么说呢&#xff0c;写着写着发觉 ”这写的什么玩意“ 。 总的来说就是&#xff0c;这篇文章并不能解决数据脱敏问题&#xff0c;但以下链接可以。 SpringBoot中利用自定义注解优雅地实现隐私数据脱敏 然后回到本文&#xff0c;本来是想基于AOP代理&am…

PHP众筹系统源码+支持报名众筹+商品众筹+无偿众筹+市面上所有的众筹模式 附带完整的搭建教程

大家好啊&#xff0c;罗峰今天来给大家分好用的源码系统了。今天要给大家分享的是一款PHP众筹系统源码。众筹作为一种新型的融资方式&#xff0c;逐渐在市场上占据了重要的地位。从公益众筹到商品众筹&#xff0c;再到股权众筹&#xff0c;各种众筹模式层出不穷。然而&#xff…

ELK日志系统

&#xff08;一&#xff09;ELK 1、elk&#xff1a;是一套完整的日志集中处理方案&#xff0c;由三个开源的软件简称组成 2、E&#xff1a;ElasticSearch&#xff08;ES&#xff09;&#xff0c;是一个开源的&#xff0c;分布式的存储检索引擎&#xff08;索引型的非关系型数…

后端整合Swagger+Knife4j接口文档

后端整合SwaggerKnife4j接口文档 接口文档介绍 什么是接口文档&#xff1a;写接口信息的文档&#xff0c;条接口包括&#xff1a; 请求参数响应参数 错误码 接口地址接口名称请求类型请求格式备注 为什么需要接口文档 who用&#xff1f;后端提供&#xff0c;前后端都需要使用…

ESXi 添加虚拟闪存 无可选设备问题排查

虚拟内存是计算机系统中的一种技术&#xff0c;它可以将计算机硬盘的一部分空间作为临时存储器来使用。当计算机的物理内存&#xff08;RAM&#xff09;不足时&#xff0c;操作系统可以将部分数据从内存移至硬盘的虚拟内存空间中&#xff0c;以释放内存供其他程序使用。虚拟内存…

uniapp基础-教程之HBuilderX配置篇-01

uniapp教程之HBuilderX配置篇-01 为什么要做这个教程的梳理&#xff0c;主要用于自己学习和总结&#xff0c;利于增加自己的积累和记忆。首先下载HBuilderX&#xff0c;并保证你的软件在C盘进行运行&#xff0c;最好使用英文或者拼音&#xff0c;这个操作是为了保证软件的稳定…

羊大师提示,别让坏习惯影响生活

羊大师提示&#xff0c;别让坏习惯影响生活 拖延是人们常常会遇到的一种坏习惯&#xff0c;它不仅浪费时间&#xff0c;还会对生活、工作和学习造成负面影响。为了改变这种坏习惯&#xff0c;我们需要采取一系列的措施&#xff0c;从根本上改变自己的生活方式。下面小编羊大师…

bat脚本执行py文件

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

你知道如何使用队列实现栈吗?(C语言)

这时一道非常经典的题型&#xff0c;因为栈和队列的性质是相反的&#xff0c;队列的数据是先入先出&#xff0c;栈的数据是后入先出&#xff0c;那么怎样使用两个队列实现栈呢&#xff1f; 225. 用队列实现栈https://leetcode.cn/problems/implement-stack-using-queues/ 这是…

安卓开发学习---kotlin版---笔记(一)

Hello word 前言&#xff1a;上次学习安卓&#xff0c;学了Java开发&#xff0c;简单的搭了几个安卓界面。这次要学习Kotlin语言&#xff0c;然后开发安卓&#xff0c;趁着还年轻&#xff0c;学点新东西&#xff0c;坚持~ 未来的你会感谢现在努力的你~ 主要学习资料&#xff1a…

leetcode算法之字符串

目录 1.最长公共前缀2.最长回文子串3.二进制求和4.字符串相乘 1.最长公共前缀 最长公共前缀 class Solution { public:string longestCommonPrefix(vector<string>& strs) {//法一&#xff1a;两两比较string ret strs[0];for(int i1;i<strs.size();i){ret f…

NocoBase企业级低代码开发平台有什么优势?

企业级低代码开发平台&#xff0c;作为一种新兴的技术解决方案&#xff0c;正逐渐在企业中受到越来越多的关注和青睐。它以其高效、灵活的特性&#xff0c;为企业的创新提供了更快速、更可持续的支持和推动。 低代码开发平台是一种以图形化界面为基础&#xff0c;结合拖拽式编…

Qt右键菜单+动作+qss案例

Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);//设置界面颜色样式this->setStyleSheet("background-color:rgb(54,54,54)");//创建文件菜单QMenu *fileMenuItems new QMenu;//菜单添加iconfileMenuItems->se…

如何根据接口文档,轻松快速的模拟接口服务?

什么是WireMock? WireMock 是一个Http 模拟服务,其核心也是一个web服务,WireMock主要是为特定请求提供固定的返回值。 WireMock可以作为单独进程启动,模拟一个WEB服务器,提供一些API访问,并返回特定的返回值。也可以作为第三方库在项目中使用。 如何使用 standalone方…

2161根据数字划分数组

给你一个下标从 0 开始的整数数组 nums 和一个整数 pivot 。请你将 nums 重新排列&#xff0c;使得以下条件均成立&#xff1a; 所有小于 pivot 的元素都出现在所有大于 pivot 的元素 之前 。所有等于 pivot 的元素都出现在小于和大于 pivot 的元素 中间 。小于 pivot 的元素之…

allure修改logo 自定义

无论pytest还是httprunner都适用allure生成报告。那我们就有必要对allure报告进行一些定制。我们先修改logo&#xff1a; 1、给allure.yml插件custom-logo-plugin 找到allure安装的位置&#xff0c;在config文件夹下有一个allure.yml的配置文件。打开它&#xff0c;在最后添加…

Python接口自动化测试 ---Allure报告使用详解

这一节主要是记录allure的内容以及用法&#xff0c;怎么让他生成一个完整的想要的报告。 allure生成的报告和其他五花八门的报告对比了一下&#xff0c;它的可读性是最好、最直观的。这不仅仅是我想要的效果&#xff0c;也是很多小伙伴想要的结果&#xff0c;毕竟这是给领导看…

扩散模型DDPM学习笔记

扩散模型DDPM 文章目录 扩散模型DDPM如何运作基本概念训练过程推理过程&#xff1a; 目标损失函数推导评估标准 论文地址&#xff1a; Denoising Diffusion Probabilistic Models (DDPM) 如何运作 ​ 从guassian distribution进行采样得到一个噪声的图片&#xff0c;图片大小…