Spring Cloud 之RabbitMQ的学习【详细】

服务通信

分布式系统通信两种方式:

  1. 直接远程调用(同步)
  2. 借助第三方间接通信(异步)

同步通讯的问题

Feign就属于同步通讯。存在的如下问题

  • 耦合度高,每次添加新的模块就要修改原有模块的代码
  • 性能下降,调用者需要等待服务者返回的结果,如果调用链过长,则响应的时间越长
  • 资源浪费,在等待的过程中,不会释放CPU与内存资源,在高并发的场景下占用浪费资源过大
  • 级联失败,当调用链中一个服务宕机,那么调用者也会出现问题。

异步调用方案

异步调用常见的实现方式为事件驱动模式

事件驱动模式优点:

  • 服务解耦,添加模块不需要更改其他服务的代码
  • 性能提升,在用户请求的模块可以直接返回结果,不需要等待其他服务执行完毕后再返回结果
  • 服务没有强依赖关系,一个服务宕机不会影响到其他服务
  • 流量削峰

缺点:

  • 依赖了第三方组件,第三方组件需要保证可靠性、安全性、吞吐能力
  • 架构复杂,业务没有明显流程线,不好追踪管理
  • 一致性问题

MQ

MQ:Message Queue消息队列,是消息在传输的过程中保存消息的容器。多用于分布式系统之间进行通信

Kafka适用于数据量大但对数据安全性不高的场景比如说日志的传输

RabbitMQ与RocketMQ适用于对数据安全要求较高的场景,比如说业务之间的传输信息

满足什么条件才可以使用MQ?

  1. 生产者不需要从消费者处获取任何信息
  2. 容许短暂不一致性
  3. 使用MQ的效果收益大于管理MQ成本

RabbitMQ的下载

在虚拟机上启动dokcer服务后拉去rabbitmq镜像

systemctl start docker
docker pull rabbitmq

RabbitMQ的启动

docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq \
--hostname mql \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:latest

命令解释:

-e RABBITMQ_DEFAULT_USER=admin :指定登录账号为admin

-e RABBITMQ_DEFAULT_PASS=admin :指定登录密码为admin

--name mq :容器名为mq

--hostname mq1 主机名为mq1(做集群时使用,不添加也可以)

-p 15672:15672 端口映射

-p 5672:5672

-d 后台允许

rabbitmq:latest

访问15672端口输入密码登录

可能会遇到的问题

1、关闭防火墙后访问端口仍然无法访问15672端口

解决方法:

开启防火墙
systemctl start firewalld
开放端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
重新加载配置文件
firewall-cmd --reload

2、即使开放了端口15672也无法访问页面

解决方法:

如果是docker拉取的rabbitmq镜像,需要手动进入容器下载rabbitmq的管理插件

进入容器
docker exec -it 容器名 bash
下载rabbitmq的管理插件
rabbitmq-plugins enable rabbitmq_management
修改配置文件
cd  /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
退出镜像
exit
重启rabbitmq
docker restart 容器名

RabbitMQ的结构与概念

RabbitMQ中的几个概念

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtualhost:虚拟主机,是对queue、exchange等资源的逻辑分组

常见消息模型

不使用交换机的

  • 基本消息队列
  • 工作消息队列

使用交换机的

  • Fanout Exchange:广播
  • Direct Exchange:路由
  • Topic Exchange:主题

简单消息队列的实现

只存在三种角色:

  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息

示例代码:

引入依赖

<dependencies><!--rabbitMQ的Java客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency>
</dependencies>
/*** 发送消息方*/
public class Producer_Hello {public static void main(String[] args) throws IOException, TimeoutException {//1、创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//2、设置参数connectionFactory.setHost("192.168.116.131");connectionFactory.setPort(5672);//默认也是5672connectionFactory.setVirtualHost("/");//设置虚拟机 默认值是/connectionFactory.setUsername("admin");//默认值是guestconnectionFactory.setPassword("admin");//默认值是guest//3、创建连接ConnectionConnection connection = connectionFactory.newConnection();//4、创建ChannelChannel channel = connection.createChannel();//5、创建队列Queue/*** String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments* 参数* 1.queue:队列名称* 2.durable:是否持久化* 3.exclusive:*      *是否独占。只能有一个消费者监听这队列当*      *Connection关闭时,是否删除队列全* 4.autoDelete:是否自动删除。当没有Consumer时,自动删除掉* 5.arguments:参数。*///如果没有一个交helloWorld的队列,那么会自动创建一个channel.queueDeclare("hello_World",true,false,false,null);//6、发送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body* 1、exchange交换机(简单模式下不会使用交换机,默认使用"")* 2、routingKey:路由名称* 3、props:配置信息* 4、body:发送消息数据*/String body="Hello";channel.basicPublish("","hello_World",null,body.getBytes());//7、释放资源channel.close();connection.close();}
}

首先看到目前没有连接

打断点启动

当Connection connection = connectionFactory.newConnection()运行结束后。查看控制台连接信息

接下来启动消费者

public class Consumer_Hello {public static void main(String[] args) throws Exception {//1、创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//2、设置参数connectionFactory.setHost("192.168.116.131");connectionFactory.setPort(5672);//默认也是5672connectionFactory.setVirtualHost("/");//设置虚拟机 默认值是/connectionFactory.setUsername("admin");//默认值是guestconnectionFactory.setPassword("admin");//默认值是guest//3、创建连接ConnectionConnection connection = connectionFactory.newConnection();//4、创建ChannelChannel channel = connection.createChannel();//5、创建队列Queue/*** String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments* 参数* 1.queue:队列名称* 2.durable:是否持久化,当mq重启之后,还在* 3.exclusive:*      *是否独占。只能有一个消费者监听这队列当*      *Connection关闭时,是否删除队列全* 4.autoDelete:是否自动删除。当没有Consumer时,自动删除掉* 5.arguments:参数。*///如果没有一个交helloWorld的队列,那么会自动创建一个channel.queueDeclare("hello_World",true,false,false,null);/*** String queue, boolean autoAck, Consumer callback* queue:队列名称* autoAck:是否自动确认* callback:回调对象*/Consumer consumer =new DefaultConsumer(channel){/*回调方法,收到消息后,自动执行*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope.getExchange());System.out.println("properties:"+properties);System.out.println("body:"+new String(body));}};channel.basicConsume("hello_World",true,consumer);//消费者需要监听因此不需要关闭资源}
}

生产者与消费者都需要声明队列是为了避免队列不存在的情况

SpringAMQP的使用

AMQP是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。而SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现

生产者实现

引入依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

在application.yml配置如下信息

spring:rabbitmq:host: 192.168.116.131port: 5672username: adminpassword: adminvirtual-host: /

编写测试类

@SpringBootTest
@RunWith(SpringRunner.class)
public class test {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSend2SimpleQueue() throws Exception {String queueName ="hello";String message = "hello, spring amqp";rabbitTemplate.convertAndSend(queueName,message);}
}

运行测试观察rabbit控制台

消费者实现

引入依赖和配置相关信息与消费者相同,不同的是,编写一个监听器去监听队列

@Component
public class SpringRabbitListener {@RabbitListener(queues = "hello")public void listenSimpleQueueMessage(String msg){System.out.println("接收到消息:"+msg);}
}

启动引导类观察控制台

Work Queue工作队列

提高消息处理速度, 避免消息的堆积问题

案例实现:

生产者1秒内生产50条消息

    @Testpublic void testWorkQueueSendMessage() throws Exception {String queueName ="hello";String message = "hello, spring amqp__";for (int i = 0; i < 50; i++) {rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}

而消费者代码如下

@Component
public class SpringRabbitListener {@RabbitListener(queues = "hello")public void listenWorkQueueMessage1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:"+msg);Thread.sleep(30);}@RabbitListener(queues = "hello")public void listenWorkQueueMessage2(String msg) throws InterruptedException {System.out.println("====消费者2接收到消息:"+msg);Thread.sleep(50);}
}

运行结果如下

可以看到每个消费者各处理25条,消费者1处理更快处理结束不会去处理更多的消息而是等待消费者2处理结束。

这种情况是因为Rabbit中存在消息预取的行为,当消息处理前会从Channel中提前拿去一部分消息(类似于轮询平均分配)后再去处理,当我们希望处理更快的设备能够读取更多的消息时,我们可以设置消息预取限制。在application.yml文件中添加如下配置

spring:rabbitmq:host: 192.168.116.131port: 5672username: adminpassword: adminvirtual-host: /listener:simple:prefetch: 1 #每次最多获取一条消息,处理完成后才能获取下一条消息

修改完后再次执行观察控制台

可以看到消费能力更强的处理消息更多。

工作队列模式应用于任务过重或任务过多的场景(比如说发送短信)

发布订阅模式

前两种模式只是将消息发送给一个消费者,而发布订阅模式可以将消息发送给多个消费者。实现方式是加入了exchange(交换机)。exchange只负责路由,不负责存储。路由失败则消息丢失。

Fanout交换机(广播模式)

@Configuration
public class FanoutConfig {//声明交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanoutExchange");}//声明队列@Beanpublic Queue queue1(){return new Queue("fanoutQueue1");}@Beanpublic Queue queue2(){return new Queue("fanoutQueue2");}//声明绑定关系@Beanpublic Binding binding1(Queue queue1,FanoutExchange exchange){return BindingBuilder.bind(queue1).to(exchange);}@Beanpublic Binding binding2(Queue queue2,FanoutExchange exchange){return BindingBuilder.bind(queue2).to(exchange);}
}

重启消费者观察Rabbit控制台

编写监听器

@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanoutQueue1")public void listenFanoutQueueMessage1(String msg){System.out.println("从队列queue1中获取到消息:"+msg);}@RabbitListener(queues = "fanoutQueue2")public void listenFanoutQueueMessage2(String msg){System.out.println("从队列queue2中获取到消息:"+msg);}
}

编写生产者测试类

    @Testpublic void testFanoutQueueSendMessage() throws Exception {String exchangeName = "fanoutExchange";String message = "hello, fanout";rabbitTemplate.convertAndSend(exchangeName,"",message);}

启动观察Rabbit控制台

Direct交换机(路由模式)

  1. 每一个Queue都与Exchange设置一个BindingKey
  2. 发布者发送消息时,指定消息的RoutingKey
  3. Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

案例实现

如果和Fanout模式一样去声明绑定关系的话,会比较麻烦,编写代码较多,我们可以采用注解的方式去声明绑定关系。

@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue("directQueue1"),exchange = @Exchange(value = "directExchange",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenDirectQueueMessage1(String msg){System.out.println("从队列queue1中获取到消息:"+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue("directQueue2"),exchange = @Exchange(value = "directExchange",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueueMessage2(String msg){System.out.println("从队列queue1中获取到消息:"+msg);}
}

运行消费者后观察Rabbit控制台

编写生产者测试类代码

    @Testpublic void testDirectQueueSendMessage() throws Exception {String exchangeName = "directExchange";String message = "hello, direct";rabbitTemplate.convertAndSend(exchangeName, "blue", message);rabbitTemplate.convertAndSend(exchangeName, "red", message + " red");rabbitTemplate.convertAndSend(exchangeName, "yellow", message + " yellow");}

运行观察控制台

TopicExchange(话题模式)

案例实现

@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue("topicQueue1"),exchange = @Exchange(value = "topicExchange",type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueueMessage1(String msg){System.out.println("从中国话题队列中获取到消息:"+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue("topicQueue2"),exchange = @Exchange(value = "topicExchange",type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueueMessage2(String msg){System.out.println("从新闻话题队列中获取到消息:"+msg);}
}

运行观察Rabbit控制台

编写生产者测试类代码

    @Testpublic void testTopicQueueSendMessage() throws Exception {String exchangeName = "topicExchange";String message = "hello, topic";rabbitTemplate.convertAndSend(exchangeName, "china.news", message+" 中国新闻");rabbitTemplate.convertAndSend(exchangeName, "china.#", message + "晴朗");rabbitTemplate.convertAndSend(exchangeName, "#.news", message + "战争");}

运行观察Rabbit控制台

发送三条消息但共有5条消息

消息转换器

在简单消息队列的实现中,我们发送消息发送的是字节数组。但是接收的消息反而是String类型的字符。那是因为。Spring中对消息的处理是由org.springframework.amqp.support.converter.MessageConverter处理默认使用SimpleMessageConverter来实现序列化(基于JDK的ObjectOutputStream实现)

进行一个测试,创建一个object.queue队列,发送一个Map类型的数据

    @Testpublic void testSendObject() throws Exception {Map<String, Object> map = new HashMap<>();map.put("name","zmbwcx");String queueName = "object.queue";rabbitTemplate.convertAndSend(queueName,map);}

观察Rabbit控制台

消息内容被JDK序列化为上图内容,这种序列化方式不安全且占用内存更大。增加了传输成本。

我们可以修改为JSON的序列化方式,具体操作如下

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}

重新发送一条消息,观察Rabbit控制台

生产者与消费者应该使用同一个消息转换器,因此,消费者也应进行相同操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/125248.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[论文阅读]Voxel R-CNN——迈向高性能基于体素的3D目标检测

Voxel R-CNN Voxel R-CNN: Towards High Performance Voxel-based 3D Object Detection 迈向高性能基于体素的3D目标检测 论文网址&#xff1a;Voxel R-CNN 论文代码&#xff1a;Voxel R-CNN 简读论文 该论文提出了 Voxel R-CNN&#xff0c;这是一种基于体素的高性能 3D 对象…

Go 语言gin框架的web

节省时间与精力&#xff0c;更高效地打造稳定可靠的Web项目&#xff1a;基于Go语言和Gin框架的完善Web项目骨架。无需从零开始&#xff0c;直接利用这个骨架&#xff0c;快速搭建一个功能齐全、性能优异的Web应用。充分发挥Go语言和Gin框架的优势&#xff0c;轻松处理高并发、大…

Qt实现卡牌对对碰游戏

效果 闲来无事&#xff0c;实现一个对对碰游戏&#xff0c;卡牌样式是火影动漫。 先上效果&#xff1a; 卡牌对对碰_火影主题 玩法 启动游戏&#xff0c;进入第一关卡&#xff0c;所有卡牌都为未翻开状态&#xff0c;即背面朝上&#xff1b;点击卡牌&#xff0c;则将卡牌翻开…

jenkins工具系列 —— 删除Jenkins JOB后清理workspace

文章目录 问题现象分析解决思路脚本实现问题现象分析 Jenkins使用过程中,占用空间最大的两个位置: 1 、workspace: 工作空间,可以随便删除,删除后再次构建时间可能会比较长,因为要重新获取一些资源。 2 、job: 存放的是项目的配置、构建结果、日志等。不建议手动删除,…

双亲委派模式

双亲委派模型 双亲委派的工作过程 一个类加载器收到类加载的请求时&#xff0c;它不会马上加载该类&#xff0c;而是把这个请求委托给父加载器去完成&#xff0c;每一个层次的类加载器都是如此&#xff0c;因此所有的类加载请求都必须先通过启动类加载器尝试加载&#xff0c;只…

【快报】正在把教学视频搬运到B站和油管

hello 大家好&#xff0c;我是老戴。 熟悉我的同学知道&#xff0c;我从14年开始录制GIS相关的教学视频&#xff0c;之前是放到优酷上给大家下载&#xff0c;后期发现很多人把视频弄下来淘宝上卖&#xff0c;然后我就把视频整体放到了我自己的网站上。 随着视频录制的数量越来…

HBuilderX实现安卓真机调试

1. 简介 HBuilderX 简称 HX&#xff0c;HBuilder&#xff0c;H 是 HTML 的缩写&#xff0c;Builder 是建设者。是为前端开发者服务的通用 IDE&#xff0c;或者称为编辑器。与 vscode、sublime、webstorm 类似。 它可以开发普通 web 项目&#xff0c;也可以开发 DCloud 出品的 u…

nodejs+vue+python+php基于微信小程序的在线学习平台设计与实现-计算机毕业设计

困扰管理层的许多问题当中,在线学习也是不敢忽视的一块。但是管理好在线学习又面临很多麻烦需要解决,例如&#xff1a;如何在工作琐碎,记录繁多的情况下将在线学习的当前情况反应给课程问题管理员决策,等等。 流,开发一个在线学习平台小程序一方面的可能会更合乎时宜,另一方面来…

Java IDEA设置环境变量 以及代码获取

IDEA 设置环境变量 1.进入如图设置&#xff0c;一般的 java 程序和 spring Boot &#xff0c;还是tomcat 都可以从这里进入 2.可以在如下地方手动添加 3. tomcat 类 4.spring boot 类 代码获取指定值 假定我设置如下 代码则如下获取&#xff08;类均为JDK 自带类&…

代理模式代理模式

目录 1、使用场景 2、静态代理 3、动态代理 JDK动态代理 CGlib 动态代理实现 1、使用场景 使用代理模式主要有两个目的&#xff1a;一是保护目标对象&#xff0c;二是增强目标对象。 2、静态代理 NO.1 抽象接口&#xff1a;定义视频播放器接口Player public interface P…

http1,https,http2,http3总结

1.HTTP 当我们浏览网页时&#xff0c;地址栏中使用最多的多是https://开头的url&#xff0c;它与我们所学的http协议有什么区别&#xff1f; http协议又叫超文本传输协议&#xff0c;它是应用层中使用最多的协议&#xff0c; http与我们常说的socket有什么区别吗&#xff1f; …

怎么在电脑桌面上添加待办事项?

在电脑桌面上选择一款待办事项工具&#xff0c;可以高效率地督促各项任务的按时完成&#xff0c;大大地提高工作的效率&#xff0c;支持在电脑上安装待办事项的工具类型是比较多的&#xff0c;为更好的辅助日常办公&#xff0c;建议大家可以选择高效率辅助办公的电脑便签工具&a…

网络协议--TCP的未来和性能

24.1 引言 TCP已经在从1200 b/s的拨号SLIP链路到以太数据链路上运行了许多年。在80年代和90年代初期&#xff0c;以太网是运行TCP/IP最主要的数据链路方式。虽然TCP在比以太网速率高的环境&#xff08;如T2电话线、FDDI及千兆比网络&#xff09;中也能够正确运行&#xff0c;但…

高并发和存储之间的关系是什么?

文章目录 &#x1f50a;博主介绍&#x1f916;博主的简介&#x1f4e5;博主的目标 &#x1f964;本文内容&#x1f34a; 一、高并发对存储的压力&#x1f34a; 二、存储的性能和可扩展性 &#x1f4e2;总结 &#x1f50a;博主介绍 &#x1f4d5;我是廖志伟&#xff0c;一名Java…

[Unity+智谱AI开放平台]调用ChatGLM Tuobo模型驱动AI小姐姐数字人

1.简述 本篇文章主要介绍一下&#xff0c;在Unity端&#xff0c;集成智谱AI开放平台提供的chatglm模型api&#xff0c;实现AI聊天互动相关的功能。从智谱AI官方站点上看到&#xff0c;提供有chatglm turbo的公共模型服务&#xff0c;能够实现32K超长上下文&#xff0c;应用到我…

Spring Security 6.1.x 系列(3)—— 基于过滤器的基础原理(二)

四、SecurityFilterChain 在Serlvet中&#xff0c;一组Security Filter组成SecurityFilterChain&#xff0c;SecurityFilterChain的概念就比较好理解&#xff0c;是Spring Security 提供的过滤器链&#xff0c;用于管理本身所有的过滤器&#xff0c;在上面的流程图中已有说明。…

【嵌入式】Linux C编程——C要注意的东西

1、语法分析中的“贪心法”&#xff1a; 编译器将程序分解成符号的方法是&#xff0c;从左到右一个字符一个字符地读入&#xff0c;如果该字符可能组成一个符号&#xff0c;那么再读入下一个字符&#xff0c;判断已经读入的两个字符组成的字符串是否可能是一个符号的组成部分&…

Lvs+Nginx+NDS

什么是&#xff1f;为什么&#xff1f;需要负载均衡 一个网站在创建初期&#xff0c;一般来说都是只有一台服务器对用户提供服务 ​ 从图里可以看出&#xff0c;用户经过互联网直接连接了后端服务器&#xff0c;如果这台服务器什么时候突然 GG 了&#xff0c;用户将无法访问这…

Python构造代理IP池提高访问量

目录 前言 一、代理IP是什么 二、代理IP池是什么 三、如何构建代理 IP 池 1. 从网上获取代理 IP 地址 2. 对 IP 地址进行筛选 3. 使用筛选出来的 IP 地址进行数据的爬取 四、总结 前言 爬虫程序是批量获取互联网上的信息的重要工具&#xff0c;在访问目标网站时需要频…

QT实现用本地资源管理器来打开文件夹

QString path"文件夹路径";QDesktopServices::openUrl(QUrl("file:"path, QUrl::TolerantMode)); 在windows中QT编程&#xff0c;使用资源管理器来打开指定本地文件夹的方法&#xff1a; 第一种&#xff1a;使用Qprocess命令&#xff08;相当于在cmd命令管…