前言:大家好,我是小威,24届毕业生,曾经在某央企公司实习,目前入职某税务公司。本篇文章将记录和分享RabbitMQ相关的知识点。
本篇文章记录的基础知识,适合在学Java的小白,也适合复习中,面试中的大佬🤩🤩。
如果文章有什么需要改进的地方还请大佬不吝赐教👏👏。
小威在此先感谢各位大佬啦~~🤞🤞
🏠个人主页:小威要向诸佬学习呀
🧑个人简介:大家好,我是小威,一个想要与大家共同进步的男人😉😉
目前状况🎉:24届毕业生,曾经在某央企公司实习,目前在某税务公司实习👏👏💕欢迎大家:这里是CSDN,我总结知识的地方,欢迎来到我的博客,我亲爱的大佬😘
以下正文开始
文章目录
- 🎉初始RabbitMQ
- 🎇RabbitMQ的优点
- 🎈应用解耦
- 🎈流量削峰
- 🎈异步处理
- 🎊MQ的种类
- 🧨RabbitMQ组件架构
- 🎍SpringAMQP
🎉初始RabbitMQ
我们接触过队列的时候还是在介绍AQS的时候,在AQS的内部,维护了一个基于FIFO(First Input First Output)的等待队列。对于今天记录的rabbitmq知识,也是一个先进先出的队列,只不过这个队列是存放消息的队列。
RabbitMQ(message queue)消息队列,是RabbitMQ公司基于Erlang开发语言来创建的,其支持多种协议,如AMQP协议,XMPP协议,SMTP协议,STOMP协议等等。
我们在项目中进行同步调用的时候,会遇到很多问题,比如,当服务的生产者出现某些问题,调用这些服务提供者的消费者也会出现问题导致项目不可用;而且同步调用会使得项目的耦合度比较高,当每次加入新的需求时,都需要对源代码进行改造;同步调用的性能也不是那么的好,因为同步调用的上一个步骤完成后,才会进行下一步,如果调用的时间比较长,则需要等待很长的时间。
而异步通信恰好可以解决这些问题。其中,RabbitMQ消息队列,就是异步通信中的一种。
🎇RabbitMQ的优点
上面我们了解到了同步调用的缺点,异步通信的优点,接着我们了解一下RabbitMQ的优点。
🎈应用解耦
以我们平时做的电商项目为例,在电商项目中,我们通常会有支付服务,订单服务,库存系统,物流服务等。比如用户在下完单后,如果耦合地调用订单服务,库存服务,其中如果有一个系统发生故障,都会导致用户下单操作失败。但是如果基于消息队列的方式异步调用时,如果物流服务发生故障,我们只需要花时间来处理物流服务即可,不会影响到用户的下单支付服务。当物流服务的bug处理完成后即可恢复正常。
🎈流量削峰
以我们的生活为例,通常在双十一,双十二时购物优惠力度会很大,在此期间用户订单会很多,如果不对订单流量进行控制,服务器很容易崩掉。因此我们可以将消息队列用来做缓冲,限制订单的个数,或者将某段时间内的订单分散成一段时间处理,一些用户在下完单后的一小段时间内收到下单成功的提醒,这样可以有效地控制极大流量的情况。
🎈异步处理
同步调用需要一步一步地去执行系统,总执行的时间是各子系统模块花销的时间之和,而异步处理可以使得子系统从对应的消息队列中消费自己对应的消息,在自己本地执行相对应的操作。因此异步处理可以极大提升系统的响应速度和吞吐量。
🎊MQ的种类
消息队列的种类有很多,比如为我们所熟知的RabbitMQ,RocketMQ,ActiveMQ,KafKa,接下来简单介绍一下吧。
RabbitMQ公司基于Erlang开发语言来创建的,其支持多种协议,如AMQP协议,XMPP协议,SMTP协议,STOMP协议等等。其可用性比非常高,单机吞吐量一般,消息延迟微秒级别,消息可靠性比较高。
ActiveMQ是有Apache研发的开源中间件,开发语言是用Java来开发的,支持多种编程语言,支持多种协议,如OpenWire协议,STOMP协议,RESY协议,AMQP协议,XMPP协议等,其可用性一般,单机吞吐量略差,消息延迟为毫秒级,消息可靠性一般。
RocketMQ是阿里巴巴公司研发的消息队列的一种,也是由Java语言开发的,RocketMQ的协议支持自定义,高吞吐量,可用性非常高,消息可靠性也比较高,时效性能延迟在毫秒级别。
KafKa是由Apache社区使用Scala和Java语言开发的消息队列,其支持自定义协议,可用性非常高,单机吞吐量非常高,多用于分布式架构,是由阿里开源的,之后交付给了Apache社区,消息可靠性一般。
RabbitMQ在各个方面都表现的挺不错,因此我们可以选择RabbitMQ来学习的。
🧨RabbitMQ组件架构
Publisher :消息的生产者。
Consumer :消息的消费者。
Broker:主要用于接收和分发消息,RabbitMQ Server 就是 Message Broker。
Virtual host:顾名思义,虚拟主机,类似于nacos中的 namespace(命名空间) 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。vhost在连接的时候需要指定,默认的vhost是/。
Connection:生产者和消费者与 broker 之间建立的 TCP 连接。
Channel:中文意思为管道,用于数据双向流通。不管是发布消息、订阅队列还是接收消息,这些动作都是通过管道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,因此引入了管道的概念,以复用一条TCP连接。
Exchange:exchange为交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Queue:消息队列,用来保存消息直到发送给消费者。它是存放消息的容器。一个消息可投入一个或多个队列消息,最终被送到这里等待 consumer 取走。
Binding:绑定关系,主要用于交换机和队列之间的关联,binding 中可以包含 routing key,通过路由键(Routing Key)将交换机和消息队列关联起来。
🎍SpringAMQP
我们先来看一下官方给出的SpringAMQP介绍:
Spring AMQP项目将Spring的核心概念应用于基于AMQP的消息传递解决方案的开发。它提供了一个“模板”作为发送和接收消息的高级抽象。它还为带有“监听器容器”的消息驱动的 POJO 提供支持。这些库有助于管理 AMQP 资源,同时促进依赖关系注入和声明性配置的使用。在所有这些情况下,您都会看到与 Spring 框架中的 JMS 支持的相似之处。
特征
用于异步处理入站消息的监听器容器
发送和接收消息的模板
RabbitAdmin,用于自动声明队列,交换和绑定
我们利用SpringAMQP来实现HelloWorld中的基础消息队列功能:
我们首先创建两个模块,publisher生产者模块和consumer消费者,通过在生产者中发送消息,消费者对消息进行处理。
由于publisher模块和consumer模块都需要SpringAMQP依赖,所以在父过程的pom文件中引入依赖:
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
接着在publisher模块的yaml中配置RabbitMQ的相关地址信息:
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.220.135port: 5672username: xiaoweipassword: 123456virtual-host: /
然后在publisher模块的测试类中编写代码:
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringAmqpTest {@Autowired //自动装配,和我们使用的RedisTemplate相似private RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName="simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}
}
直到此刻,生产者就可以发送消息了,启动项目,登录RabbitMQ官网可看到队列中有一条消息:
之后配置consumer模块的依赖和application.yaml,这个是和生产者一样的:
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.220.135port: 5672username: xiaoweipassword: 123456virtual-host: /
接着编写来接收生产者消息的类:
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component //加上注解,交给spring容器来管理
public class SpringRabbitListener {@RabbitListener(queues="simple.queue") //simple.queue是创建的队列名称public void listenSimpleQueue(String msg){ //由于生产者那里是string类型,因此这里一样类型System.out.println("消费者已经接收到simple.queue的消息:[" + msg + "]");}
}
启动consumer模块项目,运行,查看控制台可以看到消费者这里可以得到生产者那里发送的消息:
我们再次打开官网的界面,刷新后会发现,那条消息已经被消费了:
由于篇幅原因,本篇文章就先分享到这里了,后续会继续分享其他的知识,感谢大佬认真读完支持咯~
文章到这里就结束了,如果有什么疑问的地方请指出,诸佬们一起讨论🍻
希望能和诸佬们一起努力,今后进入到心仪的公司
再次感谢各位小伙伴儿们的支持🤞