SpringCloud学习路线(9)——服务异步通讯RabbitMQ

一、初见MQ

(一)什么是MQ?

MQ(MessageQueue),意思是消息队列,也就是事件驱动架构中的Broker。

(二)同步调用

1、概念: 同步调用是指,某一服务需要多个服务共同参与,但多个服务之间有一定的执行顺序,当每一个服务都需要等待前面一个服务完成才能继续执行。

2、存在的问题

  • 耦合度高: 新需求需要改动原代码
  • 性能下降: 调用者需要等待服务提供者相应,如果调用链过长则响应时间等于每次调用的时间之和。
  • 资源浪费: 调用链的每个服务在等待响应过程中,不会释放请求资源,高并发场景下会浪费系统资源。
  • 级联失败: 若服务提供者出现宕机,所有调用者都会因故障而导致整个服务集群故障。

(三)异步调用

1、实现模式: 异步调用常见实现的就是事件驱动模式。

2、事件驱动的优势

  • 服务解耦: 只需要将请求交付给事件管理器进行管理即可完成服务。
  • 性能提升: 与客户交互的服务短时间就能完成,并不需要等待后续服务完成。
  • 服务弱依赖: 其它服务宕机不影响服务集群的使用
  • 流量缓冲: 事件管理器通过任务队列的方式,使得订阅的服务按照自身速度进行执行。

3、事件驱动的缺点

  • 高度依赖Broker的可靠性、安全性、吞吐能力
  • 架构复杂时,业务没有明显的流程线,不便于跟踪管理

(四)MQ常见框架

RabbitMQ(中小企业)ActiveMQRocketMQ(大型企业)Kafka
公司/社区RabbitApacheAlibabaApache
开发语言ErlangJavaJavaJava
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般极高
消息延迟微妙级毫秒级毫秒级毫秒以内
消息可靠一般一般

二、使用MQ

(一)RabbitMQ概述

RqbbitMQ是基于Erlang语言开发的开源消息通讯中间件,官方地址:https://rabbitmq.com/

(二)安装MQ

docker pull rabbitmq:3-management

在这里插入图片描述

(三)运行RabbitMQ

#配置 MQ的用户名和密码,容器名和主机名,端口,镜像名 ,注意:15672端口是MQ的控制台访问端口,5672是对外暴露的消息通信端口
docker run -e RABBITMQ_DEFAULT_USER=xxx -e RABBITMQ_DEFAULT_PASS=xxxx --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

在这里插入图片描述

访问MQ的控制台

在这里插入图片描述
(4)RabbitMQ的整体结构

在这里插入图片描述

(5)RabbitMQ中的几个概念

  • channel: 操作MQ的工具
  • exchange: 路由消息到队列
  • queue: 缓存消息
  • Virtual Host: 虚拟主机,是对queue,exchange等资源进行逻辑分组

(6)常见的MQ模型

  • 基本消息队列(BasicQueue): Publisher —1:1— Queue —1:1— Customer
  • 工作消息队列(WorkQueue): Publisher —1:1— Queue —1:n— Customer
  • 发布/订阅(Publish、Subscribe): 根据交换机类型又有三种模型
    • Fanout Exchange: 广播,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer
    • Direct Exchange: 路由,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer
    • Topic Exchange: 主题,
  • RPC
  • 发布者确认

第一种:基本消息队列的基本使用

包含三种角色:publisherqueueconsumer

  • publisher: 消费发布者,将消息发布到队列queue
  • queue: 消息队列,负责接受并缓存消息
  • consumer: 订阅队列,处理队列中的消息

收发消息的过程: 获取连接 》 建立通信通道 》 创建消息队列 》 收发消息 》 释放资源

1、publisher和consumer引入依赖

  <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId>
</dependency>

2、Publisher创建发送消息通道

@SpringBootTest
class PublisherApplicationTests {@Testvoid testSendMessage() throws IOException, TimeoutException {
//        1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();
//        2、设置连接参数connectionFactory.setHost("192.168.92.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("root");connectionFactory.setPassword("root");
//        3、建立连接Connection connection = connectionFactory.newConnection();
//        4、建立通信通道ChannelChannel channel = connection.createChannel();
//        5、创建队列String queueName = "simple.queue";channel.queueDeclare(queueName,false,false,false,null);
//        6、发送信息String message = "hello,rabbitmq!";channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));System.out.println("发送消息成功:【"+message+"】");
//        7、关闭通道和连接channel.close();connection.close();}
}

2、Consumer创建订阅通道

class ConsumerApplicationTests {public static void main(String[] args) throws IOException, TimeoutException {//        1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();//        2、设置连接参数connectionFactory.setHost("192.168.92.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("root");connectionFactory.setPassword("root");
//        3、建立连接Connection connection = connectionFactory.newConnection();
//        4、建立通信通道ChannelChannel channel = connection.createChannel();
//        5、创建队列String queueName = "simple.queue";channel.queueDeclare(queueName,false,false,false,null);
//        6、订阅消息channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                7、处理消息String message = new String(body);System.out.println("接收到消息:【"+message+"】");}});System.out.println("等待接收消息....");}
}

第二种:Work Queue 工作队列

与基本队列的区别在于,它能使用多个订阅队列进行高效的处理请求。(因为一个订阅队列的处理速度是有限的)

使用过程与基本队列几乎一致,只是开启了多个订阅队列。

在使用过程中我们会发现,多个订阅队列对任务的分配是平均的,这就是预取机制

我们需要的是快速处理的订阅队列获取更多的请求,慢速处理的订阅队列获取少量的请求,它如何实现呢?

通过修改配置文件,设置一个 preFetch 值。

spring:rabbitmq:host: 192.168.92.131 #IPport: 5672 #端口virtual-host: / #虚拟主机username: root #用户名password: root #密码listener:simple:prefetch: 1 # 每次取 1 个请求,处理完才能取下一个。

第三种:FanoutQueue 广播消息队列

SpringAMQP提供声明交换机、队列、绑定关系的API

主要使用的是Exchange.FanoutExchange类。

实现思路:
1、在consumer服务,声明队列,交换机,并将两者绑定。

@Configuration
public class FanoutConfig{//交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("com.fanout");}//队列@Beanpublic Queue fanoutQueue1(){return new Queue("com.queue1");}//绑定关系@Beanpublic Binding bindingQueue(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}//...以相同方式声明第2个队列,并完成绑定}

2、在consumer服务,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@Component
public class SpringRabbitListener {@RabbitListener(queues = "com.queue1")public void listenFanoutQueue1(String msg) throws InterruptedException {//...处理结果}@RabbitListener(queues = "com.queue2")public void listenFanoutQueue2(String msg) throws InterruptedException {//...处理结果}
}

3、在publisher编写测试方法,向交换机发送信息

@Test
public void sendFanoutExchange() {//1、交换机String exchangeName = "com.fanout";//2、消息String message = "Hello Fanout";//3、发送消息rabbitTemplate.covertAndSend(exchangeName, "", message);
}

第四种:路由信息队列

路由模式的流程: 即设置密钥的绑定关系,只有携带相应的密钥才能进入相应的队列

  • 每一个 QueueExchange 设置一个 BindingKey
  • 发布者发送消息时,需要指定消息的 RoutingKey
  • Exchange根据消息路由到 BindingKeyRoutingKey 一致的队列

实现思路:
1、利用 @RabbitListener 声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "com.exchange", type = ExcahngeTypes.DIRECT), key = {"red","blue"}))
public void listenRoutingQueue1(String msg) throws InterruptedException {//...处理结果
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "com.exchange", type = ExcahngeTypes.DIRECT), key = {"red","green"}))
public void listenRoutingQueue2(String msg) throws InterruptedException {//...处理结果

2、发送消息实现

//指定队列处理
@Test
public void sendRoutingExchange1(){//交换机,消息String exchangeName = "com.exchange";String message = "Hello,RoutingMQ";//发送消息rabbitTemplate.covertAndSend(exchangeName, "blue", message);
}//多队列处理
@Test
public void sendRoutingExchange2(){//交换机,消息String exchangeName = "com.exchange";String message = "Hello,RoutingMQ";//发送消息rabbitTemplate.covertAndSend(exchangeName, "red", message);
}

第五种:主题信息队列(通配key)

TopicExchange 与 DirectExchange 的区别: routingkey必须是多个单词的列表,并且以,分割。并且Queue与Exchange指定的BindingKey时可使用通配符:

  • **#:**代指 0 / n 个单词
  • *: 代指一个单词

实现思路:
1、通过 @RabbitListener 声明Exchange、Queue、RoutingKey

@RabbitListener(bingdings = @QueueBinding(exchange = @Exchange(name = "com.exchange", type = ExchangeTypes.TOPIC), queue = @Queue(name = "com.queue1"), key = {"china.#"}))
public void listenTopicQueue1(String msg) {//处理代码....
}@RabbitListener(bingdings = @QueueBinding(exchange = @Exchange(name = "com.exchange", type = ExchangeTypes.TOPIC), queue = @Queue(name = "com.queue2"), key = {"#.news"}))
public void listenTopicQueue2(String msg) {//处理代码....
}

2、在publisher服务中,向交换机发送消息

@Test
public void sendTopicMessage(){//交换机,消息String exchangeName = "com.exchange";String message = "Hello,Topic";rabbitTemplate.convertAndSend(exchangeName,"china.call",message);
}

四、SpringAMQP

(一)概念

  • AMQP: Advanced Message Queuing Protocol 传递消息队列协议,是用于在应用程序或之间传递业务消息的开放标准。该协议与语言及平台无关,更符合为服务中独立性的要求。
  • Spring AMQP: Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。其中 spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

(二)实现基础消息队列

1、引入spring-amqp依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、publisher服务中利用RabbitTemplate发送消息到任务队列

  • 配置mq连接信息
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672virtual-host: /username: rootpassword: root
  • 编写发送方法
	@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void sendMessage(){String queueName = "simple.queue";String message = "Hello World";rabbitTemplate.convertAndSend(queueName,message);}

3、在consumer服务中编写消费逻辑,绑定simple.queue队列

  • 配置mq连接信息
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672virtual-host: /username: rootpassword: root
  • 编写发送方法1
	@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void getMessage(){String queueName = "simple.queue";// receive 表示接收方法,接收到的信息会封装到Message,可以看receive的返回值Message message = rabbitTemplate.receive(queueName);// Message.getBody 是 byte[]System.out.println(new String(message.getBody()));}
  • 编写发送方法2
    • 创建一个监听类
// 注册成 Bean 对象
@Component
public class SpringRabbitListener {// 监听器注释,queues = 订阅队列,并将返回值注入参数列表中	@RabbitListener(queues = "simple.queue")public void ListenSimpleQueueMessage(String msg){System.out.println("Spring 消费者接收到消息:【" + msg + "】");}
}

(三)消息转换器

为了让我们能够自由识别consumer发送的消息,则需要使用的是消息转换器

消息转换器如何使用?

Spring对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理,默认实现的是SimpleMessageConverter,基于ObjectObjectOutputStream完成序列化。

我们只需要定义一个 MessageConverter 类型的Bean即可,推荐使用JSON序列化

1、publisher引入依赖


<!-- 接收消息需要使用jackson的转换依赖 -->
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency><!-- 发送消息需要使用jackson的核心依赖 -->
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

2、publisher启动类,声明MessageConverter

@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}

3、consumer启动类,声明MessageConverter

@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}

4、监听队列消息

@RabbitListener(queues = "object.queue")
public void listenObjectMessage(Object msg) {//处理数据....
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/7880.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Dubbo服务提供者失效踢出原理解析

Dubbo服务提供者失效踢出原理解析 在分布式系统中&#xff0c;服务提供者的失效是一个常见而且重要的问题。Dubbo作为一款优秀的分布式服务框架&#xff0c;提供了失效踢出机制来及时剔除不可用的服务提供者&#xff0c;确保系统的稳定性和可用性。本文将深入探讨Dubbo服务提供…

ProtoBuf入门概念

目录&#xff1a; 序列化概念ProtoBuf是什么ProtoBuf的使⽤特点安装ProtoBuf如何学习ProtoBuf 1.序列化概念 序列化和反序列化 序列化&#xff1a;把对象转换为字节序列的过程称为对象的序列化。反序列化&#xff1a;把字节序列恢复为对象的过程称为对象的反序列化。 什么…

【搜索引擎Solr】Apache Solr 神经搜索

Sease[1] 与 Alessandro Benedetti&#xff08;Apache Lucene/Solr PMC 成员和提交者&#xff09;和 Elia Porciani&#xff08;Sease 研发软件工程师&#xff09;共同为开源社区贡献了 Apache Solr 中神经搜索的第一个里程碑。 它依赖于 Apache Lucene 实现 [2] 进行 K-最近邻…

找不同的字符

题目&#xff1a; 给定两个字符串 s 和 t &#xff0c;它们只包含小写字母。 字符串 t 由字符串 s 随机重排&#xff0c;然后在随机位置添加一个字母。 请找出在 t 中被添加的字母。 解题思路 把字符串转换成字典&#xff0c;key为字符&#xff0c;value为字符出现的次数&…

【Python笔记】Python + xlrd + pymysql读取excel文件数据并且将数据插入到MySQL数据库里面

这篇文章&#xff0c;主要介绍Python xlrd pymysql读取excel文件数据并且将数据插入到MySQL数据库里面。 目录 一、Python读取excel 1.1、安装xlrd库 1.2、打开excel工作簿 1.3、获取sheet工作表 1.4、操作row数据行 1.5、操作column数据列 1.6、操作单元格 二、读取…

Bean 作用域和生命周期

1.通过⼀个案例来看 Bean 作⽤域的问题 假设现在有⼀个公共的 Bean&#xff0c;提供给 A ⽤户和 B ⽤户使⽤&#xff0c;然⽽在使⽤的途中 A ⽤户却“悄悄”地修 改了公共 Bean 的数据&#xff0c;导致 B ⽤户在使⽤时发⽣了预期之外的逻辑错误。 1.1 被修改的 Bean 案例 公…

【N32L40X】学习笔记04-gpio中断库

gpio中断 该函数库的目的就是在统一的地方配置&#xff0c;将配置的不同项放置在一个结构体内部使用一个枚举来定义一个的别名 NVIC 寄存器 NVIC 相关的寄存器定义了可以在 core_cm4.h 文件中找到。我们直接通过程序的定义来分 析 NVIC 相关的寄存器&#xff0c;其定义如下…

RocketMQ(1.NameServer源码)

NameServer功能简述 主要功能如下 服务注册与发现&#xff1a;Nameserver扮演了RocketMQ集群中服务注册中心的角色。当RocketMQ中的Broker、Producer和Consumer启动时&#xff0c;它们会向Nameserver注册自己的网络地址和角色信息。Nameserver维护着集群中所有活跃实例的信息…

openfeign调用文件服务的文件上传接口报错:Current request is not a multipart request

解决办法&#xff1a; Api 接口 Api(tags "文件上接口") RestController public class FileController {Autowiredprivate FileFeignService fileFeignService;ApiOperation("上传文件")PostMapping(value "/uploadFile")public ResData<…

python常用数据类型区别

1.set集合和dict字典的区别 set没有对应的value值&#xff0c;两者都是可变类型&#xff0c;即不可哈希;两者的内部元素是不可变类型&#xff0c;即可哈希&#xff0c;都无索引&#xff0c;不可进行切片和根据索引进行的操作。 2.set集合和list列表的区别 相同点 都是可变类…

zookeeper学习(一) Standalone模式(单机模式)安装

安装准备 centos7环境jdk1.8环境zookeeper安装包 安装jdk 上传jdk安装包解压安装包到目录中 tar -zxvf jdk-8u361-linux-x64.tar.gz如果需要指定目录可以在后面加上 -C&#xff0c;如 tar -zxvf jdk-8u361-linux-x64.tar.gz -C 目录配置jdk环境变量 vim /etc/profile打开…

Dijkstra 算法——求解最短路径问题

迪杰斯特拉算法&#xff08;Dijkstra’s algorithm&#xff09;是一种用于解决单源最短路径问题的贪心算法。它可以找到从一个起始顶点到其他所有顶点的最短路径&#xff0c;并且适用于边的权重非负的图。 算法步骤如下&#xff1a; 创建一个数组 dist&#xff0c;用于保存起…

react当我们有两个完全不相关的组件想要通信时,就可以利用这种模式,其中一个组件负责订阅某个消息,而另一个元素则负责发送这个消息。使用Context配合

在nextjs项目中&#xff0c;发现两个组件没啥关系&#xff0c;例如一个是一直存在的头部组件&#xff0c;另一个是页面中的组件&#xff0c;当我点击头部组件中的特定按钮时&#xff0c;把数据传递到页面组件中&#xff0c;页面组件接受到canshu数据后在做其他操作&#xff0c;…

入门前端监控

背景 前端监控是指通过一系列手段对Web页面或应用程序进行实时监控和数据采集&#xff0c;以了解页面或应用程序的性能状况、用户行为等等&#xff0c;并及时发现和解决潜在的问题。一个完整的前端监控平台可以包括&#xff1a;数据收集与上报、数据整理与存储、数据展示这里仅…

redis---持久化和数据类型的基本操作

目录 1.redis持久化 2.redis数据类型 1.redis持久化 【1】RDB 启用rdb&#xff0c;查看是否有对应文件生成 1.进入配置文件&#xff0c;修改配置 [rootclient ~]# vim /etc/redis.conf save 60 5 # 自动出发机制&#xff08;60秒内进行5次操…

Java连锁门诊医院HIS信息管理系统源码

Java连锁门诊医院HIS信息管理系统源码&#xff1a;SaaS运维平台多医院多机构多门诊入驻强大的电子病历完整开发文档 一、系统概述 ❉采用主流成熟技术&#xff0c;软件结构简洁、代码规范易阅读&#xff0c;SaaS应用&#xff0c;全浏览器访问前后端分离&#xff0c;多服务协同…

通过两种实现方式理解CANoe TC8 demo是如何判断接收的以太网报文里的字段的

假设有一个测试用例,需求是:编写一个测试用例,发送一条icmpv4 echo request报文给DUT,identifier字段设置为10。判断DUT能够回复icmpv4 echo reply报文,且identifier字段值为10。 实现:在canoe的simulation setup界面插入一个test节点,ip地址为:192.168.0.1,mac地址为…

具身智能,是机器人的“冷饭热炒”吗?

大模型正如火如荼&#xff0c;下一个AI风口就来了。 如果你关注2023世界人工智能大会等行业峰会&#xff0c;以及英伟达、微软、谷歌、特斯拉和国內科技大厂的最新发布会&#xff0c;除了“大模型”&#xff0c;应该会听到另一个高频词——具身智能。 所谓具身智能Embodied AI …

IRIS搭建docker

之前把web实现了docker&#xff0c;开发或测试环境可能需要开发自己搭数据库&#xff0c;为了方便使用&#xff0c;把数据库也做一个docker。 由于原生的CentOS我还有改yum仓库&#xff0c;所以这次从之前lis搞的改好yum的镜像开始&#xff08;从改好yum的lisnew的镜像创建lis…

【Linux】Ubuntu基本使用与配置, 以及常见问题汇总(一)

前言 大学期间&#xff0c;感觉很多时候学习课外知识都是被推着往前走&#xff0c;很多内容并没有深入去学习&#xff0c;知识的记录受限于所学比较片面&#xff0c;如今渐渐意识到似乎并没有建立起相关知识的体系架构&#xff0c;缺乏一个系统学习并整理的过程。本文将以Ubunt…