RabbitMQ工作模式2 整合springboot 和MQ高级特性

RabbitMQ工作模式

1.路由模式

创建交换机 , 连接队列 (生产者)

public class MyTestExDirect {@Testpublic void bbb() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//连接mqconnectionFactory.setUsername("账号");connectionFactory.setPassword("密码");connectionFactory.setHost("ip地址");connectionFactory.setPort(端口号);connectionFactory.setVirtualHost("/aaa");//建立连接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//创建交换机channel.exchangeDeclare("ex_direct", BuiltinExchangeType.DIRECT,false);//创建队列/*** String queue, 队列的名称* boolean durable, 持久化* boolean exclusive, 是否独占* boolean autoDelete,  受否自动删除* Map<String, Object> arguments  参数*/channel.queueDeclare("mydirect1",false,false,false,null);channel.queueDeclare("mydirect2",false,false,false,null);//绑定交换机和队列   设置routingkeychannel.queueBind("mydirect1","ex_direct","error");channel.queueBind("mydirect2","ex_direct","test");channel.queueBind("mydirect2","ex_direct","test2");//交换机     routingkey     根据routingkey在队列上发布消息channel.basicPublish("ex_direct","error",null,"路由模式测试".getBytes());}
}

启动测试

交换机创建成功

队列创建成功 , 与交换机连接成功

通过routingkey "error" 将消息发送到 mydirect1

创建消费者

public class ConsumerAppDirect
{public static void main( String[] args ) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//连接mqconnectionFactory.setUsername("账号"); connectionFactory.setPassword("密码"); connectionFactory.setHost("ip地址");connectionFactory.setPort(端口号); connectionFactory.setVirtualHost("/aaa");//建立连接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("mq:-----aaa"+s);}};channel.basicConsume("mydirect1",true,consumer);}
}

开启监控

2.Topics 主题模式

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词 

*:匹配不多不少恰好1个词   test.* test.insert

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert 

创建交换机和生产者

public class MyTestExTopics {@Testpublic void ccc() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//连接mqconnectionFactory.setUsername("账号");connectionFactory.setPassword("密码"); connectionFactory.setHost("ip地址");connectionFactory.setPort(端口号);connectionFactory.setVirtualHost("/aaa");//建立连接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//创建交换机channel.exchangeDeclare("ex_topics", BuiltinExchangeType.TOPIC,false);//创建队列/*** String queue, 队列的名称* boolean durable, 持久化* boolean exclusive, 是否独占* boolean autoDelete,  受否自动删除* Map<String, Object> arguments  参数*/channel.queueDeclare("mytopics1",false,false,false,null);channel.queueDeclare("mytopics2",false,false,false,null);//绑定交换机和队列   设置routingkeychannel.queueBind("mytopics1","ex_topics","test.#");channel.queueBind("mytopics2","ex_topics","*.aaa");channel.queueBind("mytopics2","ex_topics","test.*");//交换机     此处的routingkey应该是具体的值     根据routingkey在队列上发布消息channel.basicPublish("ex_topics","test.aaa",null,"TOPIC模式测试".getBytes());}
}

测试

发布消息成功

消费者监听参考路由模式 , 只需要修改队列就行

SpringBoot整合RabbitMQ

1.搭建项目

添加依赖

<!--2. rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

添加配置文件

2.创建工作模式(主题模式)

1)创建交换机和队列

package com.example.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TopicMqConfig {@Value("${mq.exchange.name}")private String EXCHANGENAME;@Value("${mq.queue.name}")private String QUEUENAME1;@Value("${mq.queue.name}")private String QUEUENAME2;//创建交换机@Bean("ex1")public Exchange getExchange(){Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();return exchange;}//创建队列@Bean("queue1")public Queue getQueue1(){Queue queue1 = QueueBuilder.nonDurable(QUEUENAME1).build();return queue1;}@Bean("queue2")public Queue getQueue2(){Queue queue2 = QueueBuilder.nonDurable(QUEUENAME1).build();return queue2;}//绑定交换机和队列@Bean("binding1")public Binding bindingQueueToExchange1(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("*.*").noargs();return binding1;}@Bean("binding2")public Binding bindingQueueToExchange2(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("test.*").noargs();return binding2;}
}

2)创建生产者

测试

3)创建消费者

创建配置文件

创建测试类 监听队列
package com.example.message;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerMessage {@RabbitListener(queues = "test_queue2")public void xxx(Message message){byte[] body = message.getBody();String s = new String(body);System.out.println(s);}
}

测试

MQ高级特性,消息的可靠性传递

1.确认模式

开启确认模式 修改配置

创建测试类

@SpringBootTest
public class MqTtst {@Value("${mq.exchange.name}")private String EXCHANGENAME;@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMsg(){rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (b){System.out.println("发送消息成功");}else {System.out.println("发送消息失败,原因:"+s);}}});rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");}
}

启动测试

2.消息回退

当交换机接收到消息 , 但队列收不到消息时 , 使用回退

修改配置

测试

@Test
void sendMsgReturn(){//  消息回退rabbitTemplate.setMandatory(true);//rabbitTemplate.setReturnsCallback(returnedMessage -> System.out.println("消息回退,回退的消息是:"+new String(returnedMessage.getMessage().getBody())));rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");
}

3.Consumer Ack

三种确认方式

 自动确认:acknowledge="none" 。不管处理成功与否,业务处理异常也不管

(当消费者意担接收到消息之后,消费者就会给broker一个回执,证明已经接收到消息 了,不管消息到底是否成功)

手动确认:acknowledge="manual" 。可以解决业务异常的情况

(收到消息之后不会立马确认收到消息,当业务处理没有问题的时候手动的调用代码的方 式来进行处理,如果业务失败了,就可以进行额外的操作)

根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

1)自动确认

2)手动确认

修改配置 开启手动签收

3)创建测试

@Component
public class ShouDingQianShouMeaasge implements ChannelAwareMessageListener {@Override@RabbitListener(queues = "test_queue2")public void onMessage(Message message, Channel channel) throws Exception {Thread.sleep(2000);byte[] body = message.getBody();String s = new String(body);System.out.println(s);long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println(1/0);channel.basicAck(deliveryTag,true);}catch (Exception e){System.out.println("拒绝签收");channel.basicNack(deliveryTag,true,true);}}
}

启动测试

有异常拒绝签收

无异常签收成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/178498.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

可信区块链运行监测服务平台(TBM)发展研讨会在北京召开

2023年11月23日&#xff0c;由中国信息通信研究院、中国移动通信集团设计院有限公司、区块链服务网络&#xff08;BSN&#xff09;发展联盟共同主办的“可信区块链运行监测服务平台&#xff08;TBM&#xff09;发展研讨会”在北京成功举行。会议围绕区块链的监测与治理&#xf…

小程序如何实现下拉刷新?

一、全局下拉刷新 在app.json的window节点中&#xff0c;将enablePullDownRefresh设置为true&#xff1b; onPullDownRefresh: function () {console.log(下拉刷新);// 在这里编写数据更新的逻辑wx.stopPullDownRefresh(); // 数据更新完成后&#xff0c;调用该方法停止刷新}二…

vatee万腾的科技征途:Vatee数字化力量的新视野

在科技的浪潮中&#xff0c;Vatee万腾正展开一场引人注目的科技征途&#xff0c;以其独特的数字化力量描绘出一片新的视野。这不仅是一次技术的升级&#xff0c;更是一场对未来的全新探索&#xff0c;为我们带来了前所未有的数字化时代。 Vatee万腾以其卓越的技术实力和前瞻性的…

springboot实现数据脱敏

springboot实现数据脱敏 怎么说呢&#xff0c;写着写着发觉 ”这写的什么玩意“ 。 总的来说就是&#xff0c;这篇文章并不能解决数据脱敏问题&#xff0c;但以下链接可以。 SpringBoot中利用自定义注解优雅地实现隐私数据脱敏 然后回到本文&#xff0c;本来是想基于AOP代理&am…

PHP众筹系统源码+支持报名众筹+商品众筹+无偿众筹+市面上所有的众筹模式 附带完整的搭建教程

大家好啊&#xff0c;罗峰今天来给大家分好用的源码系统了。今天要给大家分享的是一款PHP众筹系统源码。众筹作为一种新型的融资方式&#xff0c;逐渐在市场上占据了重要的地位。从公益众筹到商品众筹&#xff0c;再到股权众筹&#xff0c;各种众筹模式层出不穷。然而&#xff…

ELK日志系统

&#xff08;一&#xff09;ELK 1、elk&#xff1a;是一套完整的日志集中处理方案&#xff0c;由三个开源的软件简称组成 2、E&#xff1a;ElasticSearch&#xff08;ES&#xff09;&#xff0c;是一个开源的&#xff0c;分布式的存储检索引擎&#xff08;索引型的非关系型数…

WebSocket--1.协议解析

目录 一.概念 二.建立流程 三.四大事件 五.js中建立ws链接 六.springboot中进行ws连接 1.首先&#xff0c;添加WebSocket的依赖到你的Spring Boot项目中。 2.接下来&#xff0c;创建一个WebSocket处理器 3.最后&#xff0c;创建一个配置类&#xff0c;注册该WebSocket处理…

后端整合Swagger+Knife4j接口文档

后端整合SwaggerKnife4j接口文档 接口文档介绍 什么是接口文档&#xff1a;写接口信息的文档&#xff0c;条接口包括&#xff1a; 请求参数响应参数 错误码 接口地址接口名称请求类型请求格式备注 为什么需要接口文档 who用&#xff1f;后端提供&#xff0c;前后端都需要使用…

python:类中静态方法,类方法和实例方法的使用与区别

python 类里面常用的方法有3个&#xff1a;静态方法(staticmethod)&#xff0c;类方法(classmethod)和实例方法(self) 1. 函数和方法 1.1 函数&#xff1a; 函数定义是由def()关键字定义 def fun():a "hello"return a# 函数调用 res fun() print(res)1.2 方法-…

ESXi 添加虚拟闪存 无可选设备问题排查

虚拟内存是计算机系统中的一种技术&#xff0c;它可以将计算机硬盘的一部分空间作为临时存储器来使用。当计算机的物理内存&#xff08;RAM&#xff09;不足时&#xff0c;操作系统可以将部分数据从内存移至硬盘的虚拟内存空间中&#xff0c;以释放内存供其他程序使用。虚拟内存…

uniapp基础-教程之HBuilderX配置篇-01

uniapp教程之HBuilderX配置篇-01 为什么要做这个教程的梳理&#xff0c;主要用于自己学习和总结&#xff0c;利于增加自己的积累和记忆。首先下载HBuilderX&#xff0c;并保证你的软件在C盘进行运行&#xff0c;最好使用英文或者拼音&#xff0c;这个操作是为了保证软件的稳定…

羊大师提示,别让坏习惯影响生活

羊大师提示&#xff0c;别让坏习惯影响生活 拖延是人们常常会遇到的一种坏习惯&#xff0c;它不仅浪费时间&#xff0c;还会对生活、工作和学习造成负面影响。为了改变这种坏习惯&#xff0c;我们需要采取一系列的措施&#xff0c;从根本上改变自己的生活方式。下面小编羊大师…

qt 5.15.2读取csv文件功能

qt 5.15.2读取csv文件功能 工程文件.pro 内容&#xff1a; QT core#添加网络模块 QT networkCONFIG c17 cmdline# You can make your code fail to compile if it uses deprecated APIs. # In order to do so, uncomment the following line. #DEFINES QT_DISABLE_DEPREC…

bat脚本执行py文件

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

RK356x监控温度及CPU频率的服务(Ubuntu20.04)

1 脚本 touch /userdata/show_temps.sh添加内容 #! /bin/bash //压力测试 stress --cpu 4 & stress --cpu 4 &while true; dotypes($(cat /sys/class/thermal/thermal_zone*/type))temps($(cat /sys/class/thermal/thermal_zone*/temp))freqs($(cat /sys/bus/cpu/dev…

你知道如何使用队列实现栈吗?(C语言)

这时一道非常经典的题型&#xff0c;因为栈和队列的性质是相反的&#xff0c;队列的数据是先入先出&#xff0c;栈的数据是后入先出&#xff0c;那么怎样使用两个队列实现栈呢&#xff1f; 225. 用队列实现栈https://leetcode.cn/problems/implement-stack-using-queues/ 这是…

安卓开发学习---kotlin版---笔记(一)

Hello word 前言&#xff1a;上次学习安卓&#xff0c;学了Java开发&#xff0c;简单的搭了几个安卓界面。这次要学习Kotlin语言&#xff0c;然后开发安卓&#xff0c;趁着还年轻&#xff0c;学点新东西&#xff0c;坚持~ 未来的你会感谢现在努力的你~ 主要学习资料&#xff1a…

面试题库之JAVA基础篇(一)

java的特性 面向对象&#xff0c;能够使程序的耦合度更低&#xff0c;内聚性更高。平台无关性&#xff0c;因为java程序运行在jvm虚上。支持多线程。安全可靠。有丰富的类库。 面向对象 万物皆对象&#xff0c;将解决问题的各个元素抽象成对象&#xff0c;对象中包含解决单个…

leetcode算法之字符串

目录 1.最长公共前缀2.最长回文子串3.二进制求和4.字符串相乘 1.最长公共前缀 最长公共前缀 class Solution { public:string longestCommonPrefix(vector<string>& strs) {//法一&#xff1a;两两比较string ret strs[0];for(int i1;i<strs.size();i){ret f…

Linux gzip命令用法详解:如何压缩和解压文件(附实例教程和注意事项)

Linux gzip命令介绍 Linux gzip命令用于压缩文件。它可以减小文件的大小以节省磁盘空间&#xff0c;并且可以通过gzip命令将多个文件合并成一个压缩文件。 Linux gzip命令适用的Linux版本 Linux gzip命令可以在多数Linux发行版&#xff08;如Debian、Ubuntu、Alpine、Arch L…