详解SpringCloud微服务技术栈:一文速通RabbitMQ,入门到实践

👨‍🎓作者简介:一位大四、研0学生,正在努力准备大四暑假的实习
🌌上期文章:详解SpringCloud微服务技术栈:DockerCompose部署微服务集群
📚订阅专栏:微服务技术全家桶
希望文章对你们有所帮助

RabbitMQ的使用还是很广泛的,主要是用在异步通讯的过程中的消息中间件,而在之前我学习Redis的时候,已经分别通过阻塞队列和Redis的某种数据结构实现了异步通信,可以看我的这两篇总结文章:
Redis:原理速成+项目实战——Redis实战9(秒杀优化)
Redis:原理速成+项目实战——Redis实战10(Redis消息队列实现异步秒杀)

同步通讯与异步通讯的原理、优缺点就不在这里讲解了,之前提到过,做异步通讯的最主流的还得是RabbitMQ,所以速成一波。

RabbitMQ入门到实践

  • MQ常见技术介绍
  • RabbitMQ快速入门
    • 介绍和安装(基于Centos7)
    • 消息模型介绍
    • 简单队列模型
  • SpringAMQP
    • 基本介绍
    • 入门案例
      • 消息发送
      • 消息接收
    • Work Queue 工作队列模型
    • 发布订阅模型
      • FanoutExchange
      • DirectExchange
      • TopicExchange
    • 消息转换器

MQ常见技术介绍

MQ(MessageQueue),即存放消息的队列,也就是事件驱动架构中的Broker。

RabbitMQActiveMQRocketMQKafka
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

RabbitMQ看起来最劣势的地方是单机吞吐量,但是其吞吐量也足够满足大多数企业的需求了。国内用的比较多的是Kafka和RabbitMQ,前者适合拥有海量数据,且对信息安全没有那么高要求的情景。

RabbitMQ快速入门

介绍和安装(基于Centos7)

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:
RabbitMQ官网

安装步骤:

1、下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:本地下载镜像包,上传到虚拟机后,使用命令进行加载:
镜像包安装:

链接:https://pan.baidu.com/s/1L-Kzd8PWMYaBwGQPwI1z9g?pwd=mjt5
提取码:mjt5

加载命令:

docker load -i mq.tar

2、安装MQ
执行下面命令来运行MQ容器:

docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management

其中,15672是管理平台的端口,5672是做消息通信的端口。
现在就可以直接访问RabbitMQ后台管理界面并登录:
在这里插入图片描述
在这里插入图片描述
RabbitMQ的结构:
在这里插入图片描述
发送者将消息发送到交换机exchange,然后再发到队列,消费者从队列中获取消息并处理。每个RabbitMQ的用户都有一个自己的VirtualHost,且互相隔离。

RabbitMQ的几个概念:

channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

消息模型介绍

MQ的官方文档给了5种MQ的Demo,对应了几种不同的用法:

1、基本消息队列(BasicQueue)
2、工作消息队列(WorkQueue)
3、发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:
(1)Fanout Exchange:广播
(2)Direct Exchange:路由
(3)Topic Exchange:主题

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接受并缓存消息
consumer:订阅队列,处理队列中的消息

而并没有交换机。

简单队列模型

虽是简单队列模型,但要用的API太多了,写起来复杂,直接导入下面的工程跑一下publisher测试类再跑一下consumer测试类,自行调试并在RabbitMQ的后台界面查看相关信息(admin、connection、channel等)。

链接:https://pan.baidu.com/s/1crv6sUmKM44Crj8u–J4GA?pwd=smjn
提取码:smjn

基本消息队列的消息发送流程:

1、建立connection
2、创建channel
3、利用channel声明队列
4、利用channel向队列发送消息

基本消息队列的消息接收流程:

1、建立connection
2、创建channel
3、利用channel声明低劣
4、定义consumer的消费行为handleDilivery()
5、利用channel将消费者与队列绑定

SpringAMQP

SpringAMQP可以大大的简化消息发送与接收的代码。在这里进行SpringAMQP的介绍,并且利用它来实现RabbitMQ中的五种消息队列模型。

基本介绍

AMQP即为Advanced Message Queuing Protocol(先进消息队列协议),是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
SpringAMQP即为AMQP的一种实现,基于AMQP协议的定义的一套API规范,提供了模板来发送和接收消息。包含2部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

入门案例

利用SpringAMQP实现Rabbit中的入门案例——HelloWorld中的基础消息队列功能。
先操作下面的流程:

1、在父工程中引入spring-amqp的依赖(发送和接收都要用到)

	<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
3、在consumer服务中编写消费逻辑,绑定simple.queue这个队列

消息发送

1、在publisher服务中编写application.yml,添加mq连接信息:

spring:rabbitmq:host: 192.168.177.130 # RabbitMQ的ip地址port: 5672 # RabbitMQ的端口username: itcastpassword: 123321virtual-host: / 

2、在publisher服务中新建一个测试类,编写测试方法,利用RabbitTemplate发送消息到simple.queue这个队列(从之前的测试中创建出来的,没有的话去自行运行创建出来即可):

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue(){String queueName = "simple.queue";String message = "hello, spring amqp";rabbitTemplate.convertAndSend(queueName, message);}
}

在这里插入图片描述
在这里插入图片描述

消息接收

在consumer中编写消费逻辑,监听simple.queue:
1、在consumer服务中国编写application.yml,添加mq连接信息:

spring:rabbitmq:host: 192.168.177.130 # RabbitMQ的ip地址port: 5672 # RabbitMQ的端口username: itcastpassword: 123321virtual-host: / 

2、在consumer服务中新建一个类,编写消费逻辑:

@Component // 注册成一个bean
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue") //监听的队列名public void listenSimpleQueue(String msg){System.out.println("消费者接受到simple.queue的消息:【" + msg + "】");}
}

这个消费行为应该要自动的让其进行,所以将其交给Spring托管,只需要执行Spring的启动类函数Application即可自动实现监听。
在这里插入图片描述
在这里插入图片描述

Work Queue 工作队列模型

在这里插入图片描述
如果消息很多,显然consumer1与consumer2要一起执行,这是一种合作关系。
如果publisher发送消息的频率很高,而一个consumer无法处理完,这时候就需要其它consumer帮助,即为WorkQueue(工作队列)模型。
接下来要模拟WorkQueue,实现一个队列绑定多个消费者。

实现的基本思路如下:
1、在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

	@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__";for (int i = 0; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}

2、在consumer服务中定义两个消息监听者,都监听simple.queue队列

	@RabbitListener(queues = "simple.queue") //监听的队列名public void listenWorkQueue(String msg) throws InterruptedException {System.out.println("消费者1接受到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20); //每秒消费50条}@RabbitListener(queues = "simple.queue") //监听的队列名public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2接受到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200); //每秒消费5条}

这样理论上消费者是可以自己处理完消息的。然而启动进行测试可以发现,消费者1和消费者2都执行了25条,且消费者1执行完了,消费者2还在那低效率执行,直到结束,导致1s发送的50条消息用了5s才执行完毕:
在这里插入图片描述
造成这个问题的原因是因为RabbitMQ的消息预取机制,它给两个消费者都平均分配了消息,但是consumer2相对没那样的能力处理那么多消息,所以应该要让consumer2少拿一点任务。

解决方式是在application.yml中设置preFetch,可以控制预取消息的上限:
在这里插入图片描述

发布订阅模型

之前的模型,只要消息被其中之一的消费者消费了,这个消息就会消失。而发布订阅模型与之前案例的区别就是允许将同一消息发送给多个消费者,实现的方式是加入了交换机exchange。结构如下:
在这里插入图片描述
当消息被交换机安排到了多个队列去,自然就可以实现该消息被多个消费者给消费。
常见exchange类型包括:
(1)Fanout:广播
(2)Direct:路由
(3)Topic:话题

FanoutExchange

FanoutExchange会将接收到的消息路由到每一个跟其绑定的queue,绑定可以由SpringAMQP提供的API去声明队列和交换机并且实现绑定。
在这里插入图片描述
演示流程:
1、在consumer服务中,利用代码声明队列、交换机,并将两者绑定

@Configuration
public class FanoutConfig {//声明交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}//声明队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//将队列1绑定到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Queue fanoutQueue2() {return new Queue("fanout.queue2");}@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

执行主程序,Spring将会自动读取配置并执行:
在这里插入图片描述
在这里插入图片描述

2、在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2:

	@RabbitListener(queues = "fanout.queue1") //监听的队列名public void listenFanoutQueue1(String msg){System.out.println("消费者接受到fanout.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2") //监听的队列名public void listenFanoutQueue2(String msg){System.out.println("消费者接受到fanout.queue2的消息:【" + msg + "】");}

编写完毕后重启ConsumerApplication。

3、在publisher中编写测试方法,向itcast.fanout发送消息

	@Testpublic void testSendFanoutExchange(){//交换机名称String exchangeName = "itcast.fanout";//消息String message = "hello, every one!";//发送消息rabbitTemplate.convertAndSend(exchangeName, "", message);}

在这里插入图片描述

DirectExchange

DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes):

1、每一个Queue都和Exchange设置一个BindingKey
2、发布者发送消息时,指定消息的RoutingKey
3、Exchange将消息路由到BindingKey与消息RouingKey一致的队列

需要注意的是,队列之间是可以绑定相同的BindingKey的,一个队列可以绑定多个BindingKey,所以如果publisher发送消息会被多个queue读取,也就是广播,因此DirectExchange是可以模拟FanoutExchange的。

实现流程如下:
1、利用@RabbitListener声明Exchange、Queue、RoutingKey(不再用bean声明,要声明一堆的东西,太复杂了)
2、在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
上面2步一起在consumer的监听类中完成:

	@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenDirectQueue1(String msg){System.out.println("消费者1接受到direct.queue1的消息" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者1接受到direct.queue2的消息" + msg + "】");}

在这里插入图片描述

3、在publisher中编写测试方法,向itcast. direct发送消息

	@Testpublic void testSendDirectExchange(){//交换机名称String exchangeName = "itcast.direct";//消息String message = "hello, blue!";//发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);}

在这里插入图片描述

TopicExchange

TopicExchange与DirectExchange类似,区别在于其routingKey必须是多个单词的列表,分别以.分割。

Queue与Exchange指定BindingKey时可以使用通配符:

#:0个或多个单词
*:一个单词

实现流程:
1、利用@RabbitListener声明Exchange、Queue、RoutingKey
2、在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
上面两个步骤的代码:

	@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1(String msg){System.out.println("消费者1接受到topic.queue1的消息" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2(String msg){System.out.println("消费者1接受到topic.queue2的消息" + msg + "】");}

3、在publisher中编写测试方法,向itcast.topic发送消息

	@Testpublic void testSendTopicExchange(){//交换机名称String exchangeName = "itcast.topic";//消息String message = "南京航空航天大学第五轮学科评估有7个A类评分!";//发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);}

在这里插入图片描述

消息转换器

Ctrl+P查看参数,可以发现发出的消息的类型都是Object类型的。
测试发送Object类型消息。
说明SpringAMQP允许我们发任何对象,比如List,Map,但是RabbitMQ是以字节传输的,这说明了SpringAMQP会帮我们序列化为字节后发送。
Spring的对消息对象的处理是基于JDK的ObjectOutputStream完成序列化,这种序列化的性能差,而且安全有问题,数据长度也会很长。所以最好换一个序列化方式。
推荐用JSON方式序列化,步骤如下:
1、在publisher服务引入依赖:

	<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>

2、在publisher服务声明MessageConverter:

	@Beanpublic MessageConverter messageConverter(){return (MessageConverter) new Jackson2JsonMessageConverter();}

消息的接收也需要和上面一样:引入相同的依赖,然后在consumer服务中定义MessageConverter。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/640541.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ELK分离式日志(2)

目录 一.FilebeatELK 部署 开台服务器&#xff08;192.168.233.50&#xff09;下载fliebeat&#xff1a; 安装nginx后查看下日志文件&#xff1a; 设置 filebeat 的主配置文件: 关闭logstash&#xff0c;检测文件&#xff1a; 在50节点上启动filebeat&#xff1a; 访问页…

SpikingJelly笔记之IFLIF神经元

文章目录 前言一、脉冲神经元二、IF神经元1、神经元模型2、神经元仿真 三、LIF神经元1、神经元模型2、神经元仿真 总结 前言 记录整合发放(integrate-and-fire, IF)神经元与漏电整合发放(leaky integrate-and-fire, LIF)神经元模型&#xff0c;以及在SpikingJelly中的实现方法…

x-cmd pkg | yq - 命令行 YAML处理工具

目录 简介首次用户支持格式转换友好的显示和操作语法与 jq 类似竞品和相关作品进一步阅读 简介 yq (YAML Query) 是一个轻量级的 YAML、JSON、XML 处理器&#xff0c;主要用于查询和提取 YAML 数据。 本 yq 的包来自 mikefarah/yq 项目&#xff0c;语法类似于 jq 。相比 kisly…

java数据结构与算法刷题-----LeetCode645. 错误的集合(位运算解法需要重点掌握)

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 法一&#xff1a;桶排序思想法二&#xff1a;位运算 法一&#x…

Python文件操作和异常处理:高效处理数据的利器

文章目录 一、引言1.1 文件操作和异常处理对于编程的重要性1.2 Python作为实现文件操作和异常处理的强大工具 二、为什么学习文件操作和异常处理2.1 处理各种文件格式&#xff1a;从文本到图像到音频等2.2 确保代码的鲁棒性&#xff1a;有效处理异常情况 三、文件读取和写入3.1…

什么是关键字?C语言的关键字有哪些?(C语言32个关键字详解)

目录 一、问题 二、解答 1、数据类型关键字&#xff08;12个&#xff09; (1) 声明和定义的区别 (2) 数据类型关键字 • char&#xff1a;声明字符型变量 1、声明字符变量 2、字符数组 3、ASCII码表示 4、指针与字符数组 5、多字节字符集&#xff08;如UTF-8&#xff…

【C++】初识类和对象

引言 在C语言中&#xff0c;我们用结构体来描述一个复杂的对象&#xff0c;这个对象可能包括许多的成员&#xff0c;如用结构体描述一个学生的成绩&#xff0c;或者描述一个日期等。 struct Date {int _year;int _month;int _day; }; 如上是一个描述日期的结构体定义&#x…

超融合基础架构理解

1 超融合基础架构 1.1 定义 超融合基础架构&#xff08;Hyper-converged infrastructure&#xff0c;缩写为HCI&#xff09;&#xff0c;是一种集成了存储设备及虚拟运算的信息基础架构框架。在这样的架构环境中&#xff0c;同一厂商的服务器与存储等硬件单元&#xff0c;搭配…

【网站项目】基于SSM的263货物进销管理系统

&#x1f64a;作者简介&#xff1a;多年一线开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

探索Docker-Compose:从基础到高级命令全解析

探索Docker-Compose&#xff1a;从基础到高级命令全解析 引言Docker-Compose基础1. Docker-Compose简介2. 安装Docker-Compose3. 编写第一个Compose文件4. 基本命令 Docker-Compose文件结构解析1. 理解docker-compose.yml2. 基本组件3. 文件示例4. 配置项解析 常用Docker-Compo…

洛谷P1319 压缩技术(C语言)

这样一道入门题目&#xff0c;本来可以用for循环直接操作&#xff0c;但作者异想天开(xian de dan teng)地把所有数据登记在一个数组里面&#xff0c;然后再统一按格式输出。也就是定义一个数组Map&#xff0c;大小为n成n&#xff0c;然后按照输入数据&#xff0c;把Map中每一个…

【50.2K⭐】Tabby:一款强大、灵活且跨平台的免费终端应用程序

【50.2K⭐】Tabby&#xff1a;一款强大、灵活且跨平台的免费终端应用程序 在快节奏的现代生活中&#xff0c;我们总是在寻找提高工作效率的方法。如果你是一位开发人员&#xff0c;或者是一个对技术充满好奇心的电脑爱好者&#xff0c;我们经常需要在 Windows 上进行远程操作与…

加密机授权报错如何排查?进入加密机后台的方式介绍

我们在此前的文章中介绍过不少TSINGSEE青犀视频安防监控视频平台关于加密机授权操作及相关疑问解答&#xff0c;感兴趣的用户可以翻阅往期的文章进行了解。由于新用户咨询该方面的问题较多&#xff0c;今天我们再来介绍一下用户在使用过程中遇到的问题。 1、如何进入加密机后台…

Vue-33、Vue中为什么使用render函数

1、main.js //该文件是整个项目的入口文件 //引入Vue import Vue from vue //引入APP组件&#xff0c;他是所有组件的父组件 import App from ./App.vue //关闭Vue是生产提示 Vue.config.productionTip false; //创建Vue实例对象---vm new Vue({render: h > h(App), }).$m…

笔试面试题——二叉树进阶(二)

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、二叉搜索树与双向链表1、题目讲解2、思路讲解递归展开图3、代码实现 二、从前序遍历和中序…

安装向量数据库milvus可视化工具attu

使用docker安装的命令和简单就一个命令&#xff1a; docker run -p 8000:3000 -e MILVUS_URL{milvus server IP}:19530 zilliz/attu:v2.3.5sunyuhuasunyuhua-HKF-WXX:~/dockercom/milvus$ docker run -p 8000:3000 -e MILVUS_URL127.0.0.1:19530 zilliz/attu:latest yarn run…

Xcode查看APP文件目录

一、连接真机到MAC电脑上 二、打开Devices 点击window -> Devices and Simulatores 三、选中设备、选择app 四、选择下载内容 五、查看文件内容 得到的文件 右键显示包内容&#xff0c;获得APP内数据 六、分发证书无法下载 使用分发的证书无法下载文件内容&#xf…

k8s的包管理工具helm

Helm是什么? 之前的这篇文章介绍了一开始接触k8s的时候接触到的几个命令工具 kubectl&kubelet&rancher&helm&kubeadm这几个命令行工具是什么关系&#xff1f;-CSDN博客 Helm 是一个用于管理和部署 Kubernetes 应用程序的包管理工具。它允许用户定义、安装和…

阿里云优惠券领取入口、使用方法和限制条件,2024最新

阿里云优惠代金券领取入口&#xff0c;阿里云服务器优惠代金券、域名代金券&#xff0c;在领券中心可以领取当前最新可用的满减代金券&#xff0c;阿里云百科aliyunbaike.com分享阿里云服务器代金券、领券中心、域名代金券领取、代金券查询及使用方法&#xff1a; 阿里云优惠券…

如何在Mac上安装PHP环境

前置环境&#xff1a;HomeBrew # Homebrew 是 Mac 上最好的包管理器之一&#xff0c;可以用于安装各种开源软件。从 Terminal&#xff08;终端&#xff09;执行以下命令安装 Homebrew&#xff1a; /usr/bin/ruby -e $(curl -fsSL https://raw.githubusercontent.com/Homebrew/i…