MQ,RabbitMQ,SpringAMQP的原理与实操

MQ

同步通信

image-20240202103233412

image-20240202105021949

image-20240202105123170

异步通信

image-20240202111930461

事件驱动优势:

  • 服务解耦

  • 性能提升,吞吐量提高

    image-20240202141023030

  • 服务没有强依赖,不担心级联失败问题

    image-20240202141137606

  • 流量消峰

    image-20240202141435355

​ 小结: 大多情况对时效性要求较高,所有大多数时间用同步。而如果不需要对方的结果,且吞吐量,并发量较高则需要使用异步通信

image-20240202141921703

MQ常见框架

MQ(MessageQueue),消息队列,字面来看就是存放消息的队列,也就是事件驱动架构中的Broker

消息:就是事件,比如支付成功了这个事件,在MQ中就是一个消息

image-20240202144211395

RabbitMQ,RocketMQ 适合处理业务(若需要优化定制则选Rocket,因为用Java写的)

Kafka 适合处理日志(海量数据且对数据安全性要求不高的场景),ActiveMQ用的较少

RabbitMQ

RabbitMQ概述与安装

RabbitMQ是基于Erlang语言(面向并发的语言,天生为分布式系统而设计的)开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/

参考课前资料(链接:https://pan.baidu.com/s/1JuVKKFpUXg8TFxa_FoV3Gg
提取码:1468) 来安装RabbitMQ

image-20240202144811905

之后在浏览器输入:http://192.168.83.130:15672/ 进入RabbitMQ管理页面,按docker run中设置的账号密码进行登录

结果如下

image-20240204101726227

mq整体架构

image-20240204103735587

小结

image-20240204103835121

常见消息模型

image-20240204105108372

HelloWorld 案例

image-20240204105310538

动手实践

案例: 完成官方Demo中的hello world案例(链接:https://pan.baidu.com/s/1JuVKKFpUXg8TFxa_FoV3Gg
提取码:1468)

image-20240204105416259

打开项目,将ip调成自己的rabbitmq使用虚拟机(或电脑)的ip,再运行一次PublisherTest中的 testSendMessage() 方法

发送一条消息。再运行ConsumerTest 中main方法来接收消息。

image-20240204112803673

小结

image-20240204135103572

SpringAMQP

AMOP(Advanced Message Queuing Protocol)高级消息队列协议,大大简化消息发送和接收的代码量,且与语言无关

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

image-20240204145954201

image-20240204140927498

AMQP依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>    <groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在配置文件中添加mq连接信息

spring:rabbitmq:host: 192.168.83.130 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机 username: itcast # 用户名password: 123321 # 密码

Basic Queue 简单队列模型

案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能

流程如下:

1.在父工程中引入spring-amqp的依赖,以及在publisher服务中编写配置

2.在publisher服务中利用RabbitTemplate的convertAndSend方法,发送消息到simple.queue这个队列

image-20240204145734357

SpringAMQP发送消息步骤:引入依赖和设置配置---->利用RabbitTemplate的convertAndSend方法

3.在consumer中编写代码,接收消息

image-20240204151638720

SpringAMQP接收消息步骤:引入依赖和设置配置—》定义类,添加Component注解,类中声明方法添加@RabbitListener注解

Work Queue 工作队列模型

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积

比如队列 一秒来50条消息 一个消费者一秒处理40条消息,那么需要两个消费者才能使得队列中消息被处理不丢失

image-20240204153355750

案例:实现一个队列绑定多个消费者

image-20240204153947098

问题:rabbitMQ消息预取,会将50条消息平均分给消费者1和消费者2,但消费者2处理速度慢,因此在1s内处理不完publisher发过来的50条消息

解决方案:让能者多劳,设置preFetch,控制预取消息的上限

image-20240204160513742

小结image-20240204161439493

发布、订阅模型-Fanout

image-20240204161952605

注意:exchange负责消息路由,而不是存储(queue负责存储),路由失败则消息丢失

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue(广播)

案例:利用SpringAMQP演示FanoutExchange的使用

image-20240204163804072

step1 在consumer服务中声明Exchange、Queue、Binding(绑定关系)

image-20240204163828992

image-20240204164305716

step2 在consumer服务声明两个消费者

在consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1和fanout.queue2:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2") 
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

step3 在publisher服务发送消息到FanoutExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testFanoutExchange() {// 队列名称  String exchangeName = "itcast.fanout"; // 消息String message = "hello, everyone!";// 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息		rabbitTemplate.convertAndSend(exchangeName, "", message);
}

小结

image-20240205092228233

发布、订阅模型-Direct

image-20240205092356181

案例:利用SpringAMQP演示DirectExchange的使用

image-20240205092544599

步骤一 在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2,

2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1........接收到路由消息:【" + msg + "】" + LocalTime.now());
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2........接收到消路由息:【" + msg + "】" + LocalTime.now());
}

步骤二 在publisher服务发送消息到DirectExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testDirectExchange() {//交换机名字String exchangeName = "itcast.direct";//消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";//发送消息,参数依次为:交换机名称,RoutingKey,消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

从blue->yellow->red 运行三次,得到结果如下

image-20240205104021565

小结

image-20240205104321850

发布、订阅模型-Topic

image-20240205104559605

案例 利用SpringAMQP演示TopicExchange的使用

image-20240205104825731

步骤一:在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2,

2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者1........接收到路由消息:【" + msg + "】" + LocalTime.now());
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者2........接收到消路由息:【" + msg + "】" + LocalTime.now());
}

步骤二:在publisher服务发送消息到TopicExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testTopicExchange() {//交换机名字String exchangeName = "itcast.topic";//消息String message = "喜报!孙悟空大战哥斯拉,胜!";//发送消息,参数依次为:交换机名称,RoutingKey,消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

小结

image-20240205105655795

消息转化器

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:

​ 在publisher服务引入依赖

<dependency>   <groupId>com.fasterxml.jackson.core</groupId>   <artifactId>jackson-databind</artifactId>
</dependency>

​ 在publisher服务声明MessageConverter。(原本应该放到配置类中,但启动类也是配置类,所以可以放启动类中)

@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter(); 
}

image-20240205111950238

案例 测试发送Object类型消息

image-20240205111336946

结果如下(没有更改JDK序列化方式)

image-20240205111231469

使用json序列化器之后

image-20240205111303797

consumer接收消息过程

step1:加jackson依赖,依赖上面已经放父工程中,就不用做了

step2: 将pulisher中相同的MessageConverter放入consumer 启动类中(发送方与接收方必须相同)

@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter(); 
}

step3: 定义一个消费者,监听object.queue队列并消费消息

 @RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String,Object> msg){System.out.println("消费者........接收到对象消息:【" + msg + "】" + LocalTime.now());
}

image-20240205135854654

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/668732.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

性能实测:分布式存储 ZBS 与集中式存储 HDS 在 Oracle 数据库场景表现如何

作者&#xff1a;深耕行业的 SmartX 金融团队 金鑫 在金融客户的基础架构环境中&#xff0c;HDS 是一种被广泛使用的存储解决方案。作为集中式存储的代表之一&#xff0c;HDS 拥有高性能、高可用性和可扩展性的企业级存储特点&#xff0c;适用于实时数据处理、虚拟化和灾难备份…

Python 潮流周刊#38:Django + Next.js 构建全栈项目

△△请给“Python猫”加星标 &#xff0c;以免错过文章推送 你好&#xff0c;我是猫哥。这里每周分享优质的 Python、AI 及通用技术内容&#xff0c;大部分为英文。本周刊开源&#xff0c;欢迎投稿[1]。另有电报频道[2]作为副刊&#xff0c;补充发布更加丰富的资讯&#xff0c;…

开源软件全景解析:驱动技术创新与行业革新的力量

目录 什么是开源 开源的核心 开源软件的特点 为什么程序员应该拥抱开源 1.学习机会&#xff1a; 2.社区支持&#xff1a; 3.提高职业竞争力&#xff1a; 4.加速开发过程&#xff1a; 5.贡献和回馈&#xff1a; 开源软件的影响力 开源软件多元分析&#xff1a; 开源…

蓝桥杯刷题day06——平均

1、题目描述 有一个长度为n 的数组&#xff08;n 是 10 的倍数&#xff09;&#xff0c;每个数ai都是区间 [0,9] 中的整数。 小明发现数组里每种数出现的次数不太平均&#xff0c;而更改第i 个数的代价为bi&#xff0c; 他想更改若干个数的值使得这10 种数出现的次数相等&…

YOLOv8改进 | 检测头篇 | 重参数化检测头RepHead解决困难样本检测(全网独家首发)

一、本文介绍 本文给大家带来的改进机制是RepHead,该检测头为我独家全网首发,该检测头由重参数化模块组成,加大对于特征学习的能力,却可以不增加GFLOPs(仅仅略微提升)从而不影响模型的推理速度和性能,保持较高的FPS能力,牺牲了少量GFLOPs的情况下确提高了模型的特征提…

(2)(2.13) Rockblock Satellite Modem

文章目录 前言 1 支持的MAVLink命令信息 2 设置 3 使用方法 4 数据成本 5 参数 前言 &#xff01;Note 该功能仅适用于 ArduPilot 4.4 或更高版本&#xff0c;并且要求飞行控制器支持 LUA 脚本(LUA Scripts)。 RockBLOCK 卫星调制解调器可实现与 ArduPilot 飞行器的全球…

SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式 基础(持续更新~)

具体操作&#xff1a; day2: 作用&#xff1a; 出现跨域问题 配相对应进行配置即可解决&#xff1a; IDEA连接的&#xff0c;在url最后加参数?useSSLfalse注意链接密码是123&#xff08;docker中mysql密码&#xff09; 注意&#xff0c;虚拟机中设置的密码和ip要和主机上…

专业排版设计软件:QuarkXPress 2024 for mac中文激活版

QuarkXPress 2024 for Mac是一款功能强大、易于使用、高质量输出的专业排版软件。无论您是出版业的专家还是初学者&#xff0c;都可以通过QuarkXPress 2024轻松创建出令人惊叹的出版物。 软件下载&#xff1a;QuarkXPress 2024 for mac中文激活版下载 QuarkXPress 2023 for Mac…

Unity3d Cinemachine篇(完)— TargetGroup

文章目录 前言使用TargetGroup追随多个模型1. 创建二个游戏物体2. 创建TargetGroup相机3. 设置相机4. 完成 前言 上一期我们简单的使用了ClearShot相机&#xff0c;这次我们来使用一下TargetGroup 使用TargetGroup追随多个模型 1. 创建二个游戏物体 2. 创建TargetGroup相机 3…

vue 下载二进制文件

文章目录 概要技术细节 概要 vue 下载后端返回的二进制文件流 技术细节 import axios from "axios"; const baseUrl process.env.VUE_APP_BASE_API; //downLoadPdf("/pdf/download?pdfName" res .pdf, res); export function downLoadPdf(str, fil…

react-virtualized实现行元素不等高的虚拟列表滚动

前言&#xff1a; 当一个页面中需要接受接口返回的全部数据进行页面渲染时间&#xff0c;如果数据量比较庞大&#xff0c;前端在渲染dom的过程中需要花费时间&#xff0c;造成页面经常出现卡顿现象。 需求&#xff1a;通过虚拟加载&#xff0c;优化页面渲染速度 优点&#xff1…

Codeforces Round 651 (Div. 2)C. Number Game 博弈 奇偶数 偶数的表示

Submission #244500083 - Codeforces 题目&#xff1a; 思路&#xff1a; 此题要从奇偶性上入手。&#xff08;注意除的是奇因数&#xff0c;即一个奇数。我想成质数了&#xff09; 1.当A选手开局是1时&#xff0c;A败。 2.当A选手开局是2和奇数时&#xff0c;A必胜。&…

vue2 el-table新增行内删除行内(两种写法)里面第一个是树组件,第二个是数字组件,第一个数组件只能勾选最后一个节点

第一种 <template><div class"time_table"><div style"margin-bottom: 10px"><el-button click"addRowFn">新增</el-button></div><el-form ref"costForm" :model"formData">&l…

备战蓝桥杯---搜索(剪枝)

何为剪枝&#xff0c;就是减少搜索树的大小。 它有什么作用呢&#xff1f; 1.改变搜索顺序。 2.最优化剪枝。 3.可行性剪枝。 首先&#xff0c;单纯的广搜是无法实现的&#xff0c;因为它存在来回跳的情况来拖时间。 于是我们可以用DFS&#xff0c;那我们如何剪枝呢&#…

Http请求Cookie失效问题

Http请求Cookie失效问题记录 一、问题现象 在开发功能的过程中&#xff0c;业务依赖cookie进行取之&#xff0c;项目进行交互时会对前端http请求携带的cookies进行解析操作&#xff0c;但在自测调试对过程中出现账户的授权失效的报错问题。 二、问题排查 用arthas进行代码方…

3d网上虚拟现实展厅让汽车零部件厂商脱颖而出

在这个信息爆炸的时代&#xff0c;如何让自己的产品在众多竞争者中脱颖而出?让我们为您揭示一个秘密武器——汽车线上3D云展示软件。 想象一下&#xff0c;一辆外观炫酷、性能卓越的红色汽车&#xff0c;通过这款3D云展示软件&#xff0c;呈现在潜在客户的眼前。那流线型的车身…

Failed at the chromedriver@2.27.2 install script.

目录 【错误描述】Failed at the chromedriver2.27.2 install script. npm install报的错误 【解决方法】 删除node_modules文件夹npm install chromedriver --chromedriver_cdnurlhttp://cdn.npm.taobao.org/dist/chromedrivernpm install 【未解决】 下载该zip包运行这个&…

【npm】安装全局包,使用时提示:不是内部或外部命令,也不是可运行的程序或批处理文件

问题 如图&#xff0c;明明安装Vue是全局包&#xff0c;但是使用时却提示&#xff1a; 解决办法 使用以下命令任意一种命令查看全局包的配置路径 npm root -g 然后将此路径&#xff08;不包括node_modules&#xff09;添加到环境变量中去&#xff0c;这里注意&#xff0c;原…

前端框架学习 Vue(3)vue生命周期,钩子函数,工程化开发脚手架CLI,组件化开发,组件分类

Vue 生命周期 和生命周期的四个阶段 Vue生命周期:一个Vue实例从创建 到 销毁 的整个过程 生命周期四个阶段 :(1)创建 (2)挂载 (3)更新 (4)销毁 Vue生命周期函数(钩子函数) Vue生命周期过程中,会自动运行一些函数,被称为[生命周期钩子] ->让开发者可以在[特定阶段] 运行自…

【云原生kubernetes系列】---亲和与反亲和

1、亲和和反亲和 node的亲和性和反亲和性pod的亲和性和反亲和性 1.1node的亲和和反亲和 1.1.1ndoeSelector&#xff08;node标签亲和&#xff09; #查看node的标签 rootk8s-master1:~# kubectl get nodes --show-labels #给node节点添加标签 rootk8s-master1:~# kubectl la…