服务器的异步通信——RabbitMQ

目录

一、同步通信 VS 异步通信

二、MQ——消息队列

RabbitMQ 

RabbitMQ安装 

RabbitMQ的整体架构

常见消息模型 

 基本消息队列(BasicQueue)

工作消息队列(WorkQueue)

 发布、订阅(Publish、Subscribe)

 Fanout Exchange

Direct Exchange 

Topic Exchange 

SpringAMQP-消息转换器 


一、同步通信 VS 异步通信

同步通信:双方在同一个时钟信号的控制下,进行数据的接收和发送,来一个时钟,发送端发送,接收端接收,他们彼此之间的工作状态是一致的,例如直播、打电话。

优点:

  • 时效性强,能够立即得到结果

缺点:

  • 耦合性较高:每次加入新的需求,都需要修改原有代码
  • 性能下降:调用者需要等待服务提供者响应,若调用链过长则响应时间等于每次调用时间之和
  • 资源利用率低:调用链中的每个服务在等待响应的过程中,不能释放请求占用的资源,高并发的情况下会造成资源的极度浪费
  • 级联失败:如果服务提供者出现问题,所有的调用方也会跟着出问题

适用场景:业务要求时效性高

异步通信:异步通信在发送字符时,所发送的字符之间的时间间隔可以是任意的。例如微信聊天。

在异步调用过程常见的实现就是事件驱动模式,系统中发生的事件会触发相应的事件处理器或监听器
,从而实现特定的业务逻辑或功能。

例如在如下的支付场景中,当有请求发送给支付服务时,支付服务就会通知Broker,接着后续的订阅事件就会接收到请求,开始同时处理业务,但是支付服务不用等到后续订阅事件完成后再返回,而是将请求通知给Broker之后支付服务就会返回结果。

优点:

  • 服务解耦
  • 性能提升,吞吐量提高
  • 服务之间没有强依赖,不用担心级联失败问题(故障隔离)
  • 流量削峰

缺点:

  • 依赖于Broker的可靠性、安全性和吞吐能力
  • 结构复杂后,业务没有了明显的流水线,难以追踪管理

适用场景:对于并发和吞吐量的要求高,时效性的要求低

二、MQ——消息队列

MQ(消息队列):存放消息的队列,也是事件驱动架构的Broker。

常见的消息队列实现对比:

RabbitMQ 

RabbitMQ是基于Erlang语言开发的消息通信中间件,RabbitMQ的性能以及可用性较好,国内应用较为广泛,所以对RabbitMQ进行重点学习。

RabbitMQ的官网地址:https://www.rabbitmq.com

RabbitMQ安装 

可以根据自己的需求在RabbitMQ的官网进行查看:下载和安装 RabbitMQ — 兔子MQ

RabbitMQ的整体架构

​ 首先,Publisher会把消息发送给exchange(交换机),exchange负责路由再把消息投递到queue(队列),queue负责暂存消息,Consumer会从队列中获取消息并处理消息。

RabbitMQ中的几个概念:

channel :操作 MQ 的工具
exchange :路由消息到队列中
queue :缓存消息
virtual host :虚拟主机,是对 queue exchange 等资源的逻辑分组

常见消息模型 

RabbitMQ的官方文档中给出了5个MQ的Demo实例,可以分为如下:

  • 基本消息队列(BasicQueue)
  • 工作消息队列(WorkQueue)
  • 发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:

                Fanout Exchange:广播

                Direct Exchange:路由

                Topic Exchange:主题

 基本消息队列(BasicQueue)

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息

 
在RabbitMQ中需要了解的端口:

在使用端口时,需要在云服务器上开放所用的端口 

基本消息队列的消息发送流程:

  1. 建立Connection
  2. 创建Channel
  3. 利用Channel声明队列
  4. 利用Channel向队列中发送消息

代码实现:

public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("x.x.x.x");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("xx");factory.setPassword("xx");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}

运行结果:

基本消息队列的消息接收流程: 

  1. 建立Connection
  2. 创建Channel
  3. 利用Channel声明队列
  4. 定义Consumer的消费行为handleDelivery()
  5. 利用Channel将消费者与队列进行绑定

代码实现:

public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("x.x.x.x");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("xx");factory.setPassword("xx");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

运行结果:

上述实现方式相对比较复杂,就引入了SpringAMQP来实现。

AMQP:是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。 

SpringAMQP:SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

SpringAMQP的官方地址

那么利用SpringAMQP来实现基本消息队列的流程如下:

  1. 在父工程中引入spring-amqp的依赖
  2. 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
  3. 在consumer服务中编写消费逻辑,绑定simple.queue这个队列

具体实现:

1、在父工程中引入spring-amqp的依赖:

        <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、在publisher中编写测试方法,向simple.queue发送消息:

在publisher服务的配置文件中添加mq的连接信息:

spring:rabbitmq:host:  # rabbitMQ的ip地址port: 5672 # 端口username: # 用户名password: # 密码virtual-host: # 虚拟主机

在publisher服务中新建一个测试类,编写测试方法:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName, message);}
}

在RabbitMQ中的simple队列中查询信息:

3、在consumer服务中编写消费逻辑,监听simple.queue

在consumer服务的配置文件中添加mq连接信息:

spring:rabbitmq:host:  # rabbitMQ的ip地址port: 5672 # 端口username: # 用户名password: # 密码virtual-host: # 虚拟主机

在consumer服务中新建一个类,编写具体的消费逻辑:

@Component
public class SpringRabbitListener { @RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) throws InterruptedException {System.out.println("消费者接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);}
}

运行启动类:

工作消息队列(WorkQueue)

下面场景中如果queue中有50条请求消息,但是consumer1只能处理40条,剩余的10条就可以由consumer进行处理,所以说工作消息队列可以提高消息的处理速度,避免队列消息堆积

模拟Workqueue,实现一个队列绑定多个消费者,基本实现思路如下:

  1. 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue中
  2. 在consumer服务中定义两个消息监听者,都监听simple.queue队列
  3. 消费者1每秒处理50条消息,消费者2每秒处理10条消息

代码实现:

在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue中

    public void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__";for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}
在consumer服务中定义两个消息监听者,都监听simple.queue队列,设置消费者1每秒处理50条消息,消费者2每秒处理10条消息
    @RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);}

修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限,确保消费者2取消息时只能取一条,提高效率(“能者多劳”):

spring:rabbitmq:listener:simple:prefetch: 1

运行结果:

 发布、订阅(Publish、Subscribe)

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。

常见exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

exchange负责消息路由,而不是存储,路由失败则消息丢失 

 Fanout Exchange

 Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue中,如下:

基本实现思路如下:

  1. 在consumer中,利用代码声明队列、交换机,将二者进行绑定
  2. 在consumer中,编写两个消费方法,分别监听fanout.queue1和fanout.queue2
  3. 在publisher中编写测试方法,向fanout发送消息

代码实现:

在consumer中,利用代码声明队列、交换机,将二者进行绑定

@Configuration
public class FanoutConfig {// itcast.fanout@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}// fanout.queue1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}// 绑定队列1到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// fanout.queue2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}// 绑定队列2到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

在consumer中,编写两个消费方法,分别监听fanout.queue1和fanout.queue2

    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");}

在publisher中编写测试方法,向fanout发送消息

    @Testpublic void testSendFanoutExchange() {// 交换机名称String exchangeName = "itcast.fanout";// 消息String message = "hello, every one!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "", message);}

运行结果:

Direct Exchange 

Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此被称为路由模式

  • l每一个Queue都与Exchange设置一个BindingKey
  • l发布者发送消息时,指定消息的RoutingKey
  • lExchange将消息路由到BindingKey与消息RoutingKey一致的队列

基本实现思路如下:

  1. 利用@RabbitListener声明ExchangeQueueRoutingKey
  2. consumer服务中,编写两个消费者方法,分别监听direct.queue1direct.queue2
  3. publisher中编写测试方法,向itcast. direct发送消息

代码实现:

在consumer服务中,编写两个消费者方法,分别监听direct.queue1direct.queue2,并利用@RabbitListener声明ExchangeQueueRoutingKey

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");}

在publisher服务发送消息到DirectExchange

    @Testpublic void testSendDirectExchange() {// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "hello, red!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);}

运行结果:

Topic Exchange 

Topic Exchange与Direct Exchange类似,区别在于Topic Exchange的routingKey必须是多个单词的列表,并且以.分割

QueueExchange指定BindingKey时可以使用通配符:

#:代指0个或多个单词

*:代指一个单词

基本实现思路如下:

  1. 利用@RabbitListener声明ExchangeQueueRoutingKey
  2. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1topic.queue2
  3. 在publisher中编写测试方法,向itcast. topic发送消息

代码实现:

利用@RabbitListener声明ExchangeQueueRoutingKey,在consumer服务中,编写两个消费者方法,分别监听topic.queue1topic.queue2

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");}

在publisher中编写测试方法,向itcast. topic发送消息

    @Testpublic void testSendTopicExchange() {// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "今天天气不错,我的心情好极了!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);}

运行结果:

SpringAMQP-消息转换器 

SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDKObjectOutputStream完成序列化。

如果要修改只需要定义一个MessageConverter 类型的Bean即可。

推荐用JSON方式序列化,实现步骤如下:

在父工程中引入依赖

        <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>

在publisher和consumer服务中声明MessageConverter:

    @Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/642463.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS文本外观属性内容(知识点1)

知识引入 使用HTML可以对文本外观进行简单的控制&#xff0c;但是效果并不理想&#xff0c;为此CSS提供了一系列的文本外观样式属性&#xff0c;具体如下。 color:文本颜色 color属性用于定义文本的颜色&#xff0c;其取值方式有以下三种。 &#xff08;1&#xff09;预定义…

爬取的数据可以入表吗?怎样入表?

合规是数据入表的前提。当前爬虫数据是非常敏感的&#xff0c;因为爬虫极容易造成两大不合规的问题&#xff1a;一是没有经过个人同意获取数据&#xff0c;二是爬取的数据里可能含有个人敏感信息也是一个问题。现在法律对于这部分非常严苛&#xff0c;如果企业里有50条未获得授…

前端基础(三十八):iframe通信、浏览器跨窗口通信

iframe通信 - MessageChannel <!-- index.html --> <h3>MessageChannel</h3> <input id"input" type"text" oninput"handleInput(this.value)" /> <hr /> <iframe src"./demo.html"></iframe&…

HCIA——25FTP 的工作原理、功能、TFTP、控制连接、数据连接的选择、解答

学习目标&#xff1a; 计算机网络 1.掌握计算机网络的基本概念、基本原理和基本方法。 2.掌握计算机网络的体系结构和典型网络协议&#xff0c;了解典型网络设备的组成和特点&#xff0c;理解典型网络设备的工作原理。 3.能够运用计算机网络的基本概念、基本原理和基本方法进行…

第2章-OSI参考模型与TCP/IP模型

目录 1. 引入 2. OSI参考模型 2.1. 物理层 2.2. 数据链路层 2.3. 网络层 2.4. 传输层 2.5. 会话层 2.6. 表示层 2.7. 应用层 3. 数据的封装与解封装 4. TCP/IP模型 4.1. 背景引入 4.2. TCP/IP模型&#xff08;4层&#xff09; 4.3. 拓展 1. 引入 1&#xff09;产…

char const char* 类型的实参与LPCWSTR 类型的形参类型不兼容

点击项目->项目属性 在高级中点击字符集->选择使用多字节字符集 ———————————————————————— 如果还是显示报错&#xff0c;关闭项目&#xff0c;重新进一下项目&#xff0c; 我的当时就是找了好久&#xff0c;都是以上方法&#xff0c;然后重新…

业务连续性演练在软件中的重要性

随着现代社会对信息技术的依赖程度不断增加&#xff0c;软件系统的业务连续性变得至关重要。业务连续性演练成为保障软件系统在各种不可预测情况下能够持续运行的关键措施。本文将探讨业务连续性演练在软件中的重要性以及它为组织提供的价值。 1. 灾难恢复能力的验证 业务连续性…

5G+物联网:连接万物,重塑智慧社区,开启未来生活新纪元,助力智慧社区的革新与发展

一、5G与物联网&#xff1a;技术概述与基础 随着科技的飞速发展&#xff0c;第五代移动通信技术&#xff08;5G&#xff09;和物联网&#xff08;IoT&#xff09;已经成为当今社会的热门话题。这两项技术作为现代信息社会的核心基础设施&#xff0c;正深刻地改变着人们的生活和…

Open3D 与 Point Cloud 处理

点云基础3D数据结构点云采集方法点云处理框架点云操作 Open3D基础操作 点云基础 3D数据结构 点云&#xff08;Point Cloud&#xff09;&#xff1a; 点云是由一组离散的点构成的三维数据集合&#xff0c;每个点都包含了坐标信息 (x, y, z) 、颜色 (RGB)、类别 (cls)、强度值等…

Windows如何开启telnet

打开控制面板-----点击程序 启用windows功能 勾线telnet

pytest+allure 生成中文报告

背景 已安装pytestallure&#xff0c;生成的报告是英文 allure生成中文报告 参考&#xff1a;allure report 报告中文化及其它优化 方法1&#xff1a;直接在报告中切换中文 方法2&#xff1a;依赖系统中文语言 创建一个setting.js 文件在index.html 同级目录 // 尝试从 l…

两千字讲明白java中instanceof关键字的使用!

写在开头 在过往的内容中&#xff0c;我们讲了不少的Java关键字&#xff0c;比如final、static、this、super等等&#xff0c;Java中的关键字非常之多&#xff0c;下图是整理的关键字集合 而我们今天要学习的就是其中的instanceof关键字&#xff01; instanceof的定义 inst…

图像处理算法:白平衡、除法器、乘法器~笔记

参考&#xff1a; 基于FPGA的自动白平衡算法的实现 白平衡初探 (qq.com) FPGA自动白平衡实现步骤详解-CSDN博客 xilinx 除法ip核&#xff08;divider&#xff09; 不同模式结果和资源对比&#xff08;VHDL&ISE&#xff09;_ise除法器ip核-CSDN博客 数…

【BBuf的CUDA笔记】十三,OpenAI Triton 入门笔记一

0x0. 前言 2023年很多mlsys工作都是基于Triton来完成或者提供了Triton实现版本&#xff0c;比如现在令人熟知的FlashAttention&#xff0c;大模型推理框架lightllm&#xff0c;diffusion第三方加速库stable-fast等灯&#xff0c;以及很多mlsys的paper也开始使用Triton来实现比…

sqlmap使用教程(3)-探测注入漏洞

1、探测GET参数 以下为探测DVWA靶场low级别的sql注入&#xff0c;以下提交方式为GET&#xff0c;问号&#xff08;?&#xff09;将分隔URL和传输的数据&#xff0c;而参数之间以&相连。--auth-credadmin:password --auth-typebasic &#xff08;DVWA靶场需要登录&#xf…

C语言 小明喝饮料

题目&#xff1a;喝汽水&#xff0c;1瓶汽水1元&#xff0c;2个空瓶可以换汽水&#xff0c;给n元&#xff0c;可以喝多少汽水//理论问题&#xff0c;请勿模仿-^- #include <stdio.h> int main() {int n,ret,i;scanf("%d", &n);ret n;while (n>1){ret …

基于SpringBoot的教务管理系统设计与实现(源码+调试)

项目描述 临近学期结束&#xff0c;还是毕业设计&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。今天给大家介绍一篇基于SpringBoot的教务管…

QuestDB时序数据库快速入门

简介 QuestDB是一个开源的高性能时序数据库&#xff0c;专门用于处理时间序列相关的数据存储与查询&#xff1b; QuestDB使用列式存储模型。数据存储在表中&#xff0c;每列存储在其自己的文件和其自己的本机格式中。新数据被附加到每列的底部&#xff0c;以便能够按照与摄取…

别再局限于Android和iOS了尝试鸿蒙APP系统开发吧!

最近&#xff0c;多家互联网公司也发布了鸿蒙OS的App开发工程师的岗位&#xff0c;开启了抢人大战&#xff0c;有的企业开出了近百万的年薪招聘鸿蒙OS工程师&#xff0c;而华为甚至为鸿蒙OS资深架构师开出了100万元—160万元的年薪。 「纯血」鸿蒙开启&#xff0c;欲与 Andori…

WEBDYNPRO FPM 框架

框架搭建 1、FPM_OVP_COMPONENT 1 METHOD change_toolbar_btn .2 * enabled "ABAP_TRUE可用 ABAP_FALSE不可用3 * visibility "01不可见 02可见4 DATA: ls_btn TYPE if_fpm_ovp>ty_s_toolbar_button.5 CHECK wd_this->mo_cnr IS BOUND.6 7 TRY .8 …