MQ:RabbitMQ

同步和异步通讯

同步通讯:

需要实时响应,时效性强

耦合度高

每次增加功能都要修改两边的代码

性能下降

需要等待服务提供者的响应,如果调用链过长则每次响应时间需要等待所有调用完成

资源浪费

调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源

级联失败

如果一个服务的提供者出现了问题,所有调用方都会出问题,出现雪崩问题

异步通讯:

在发布方和接收方之间存在中间人(Broker)

发布方只需将消息发布到中间人

接受方只需要从中间人订阅消息

实现解耦

吞吐量提升

无需等待订阅者处理完成,响应快速

故障隔离

服务不存在直接调用,不存在级联失败问题

资源占用问题

调用之间不会阻塞,不会产生无效资源占用问题

解耦

每个服务之间灵活插拔,实现解耦

流量削峰

所有发布事件由Broker直接接受,接受者按照自己的速度从Broker处理事件,实现缓冲

MQ

MessageQueue消息队列

即上述过程中的Broker

几种常见的MQ对比
请添加图片描述

MQ的基本结构

请添加图片描述

publisher:发布者

consumer:消费者

exchange:交换机,负责消息路由

queue:队列,存储消息,消息的缓冲区

队列绑定交换机

virtualHost:虚拟主机

channel:表示通道,操作MQ的工具,连接消息发布者和交换机,连接消息接受者和队列

RabbitMQ整体工作流程

发布者发布消息给交换机

交换机将消息路由到与其绑定的队列

消费者监听与其对应的队列获取消息

RabbitMQ消息模型

生产者->(交换机)->队列->消费者
基本消息队列BasicQueue

请添加图片描述

工作消息队列WorkQueue

请添加图片描述

多个消费者并发消费队列,消费者之间是竞争关系

发布订阅(Publish,Subscribe)

根据交换机类型不同分为三种

广播

请添加图片描述

消费者各自拥有

生产者将消息发送到交换机,具体发给哪个队列,生产者无法决定,由交换机决定.

交换机把消息发送给绑定过的所有队列,队列的所有消费者都能拿到消息.实现一条消息被多个消费者消费

生产者->所有消费者

路由

请添加图片描述

需求不同的消息被不同队列消费,就需要用到Direct类型的Exchange.在Direct模型下,需要指定RoutingKey(路由key).在消息发送方向交换机发送消息时,必须指定消息的路由key

交换机在接受到生产者消息后,将消息递交给routingkey完全匹配的队列

主题

请添加图片描述

topic类型相当于可以使用通配符匹配routingkey的路由类型

通配符规则

#:匹配一个或者多个词

*:匹配恰好一个词

SpringAMQP

Spring官方基于RabbitMQ提供的一套消息收发的模版工具:SpringAMQP

提供了三个功能:

自动声明队列,交换机以及绑定关系

基于注解的监听器模式,异步接收消息

封装RabbitTemplate工具用于发送消息

引入依赖
 <!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

实现通信

新建队列->publisher发送->consumer接收

基于rabbitTemplate API进行发送

简单队列通信

发送:

    @GetMapping("/simple")public void publishMessage(){String queueName = "simplequeue";String message = "simplequeuemessage";rabbitTemplate.convertAndSend(queueName,message);System.out.println("simple_success");}

接收:

@Component
public class MqListener {@RabbitListener(queues = "simplequeue")public void simpleListener(String msg){System.out.printf("%s 简单队列收到消息:%s", Convert.toStr(LocalDateTime.now()),msg);}
}

工作队列通信

多个消费者监听一个队列,不同消费者因为自身的能力不同对消息处理的时间也不同

如果不进行额外设置的话,会将队列中的消息平均分配给所有消费者

造成处理能力浪费的情况

所以我们可以通过配置

    listener:simple:prefetch: 1#每个消费者每次只能取一条

来限制每个消费者预取的数量,实现能者多劳的工作场景

发送:

    @GetMapping("/work")public void PublishWorkMessage(){String queueName = this.WORK;for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend(queueName,System.currentTimeMillis());}}

接收:

    @RabbitListener(queues = "workqueue")public void workListener1(String msg){System.out.printf("%s 工作队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}@RabbitListener(queues = "workqueue")public void workListener2(String msg){System.out.printf("%s 工作队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}

交换机

上述两种工作方式都是不包含交换机,消息直接发送到队列的通信方式

我们可以通过引入交换机来实现消息的路由,决定具体要发送到哪个队列

消息通信流程 发布者->交换机->交换机决定的队列->消费者

交换机类型

包含以下四种:

FanOut:广播,将消息传递给所有绑定交换机的队列
Direct:基于RoutingKey发送消息给对应的队列
Topic:通配符订阅,基于通配符RoutingKey发送消息给对应的队列
Headers:头匹配,基于MQ的消息头匹配

FanOut

创建多个队列->创建交换机进行绑定->发布者发布->消费者接收

发布:

    @GetMapping("/fanout")public void publishFanoutMessage(){String exchangeName = this.FANOUT;for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(exchangeName,"",System.currentTimeMillis() + "from:fanout");}}

接收:

    @RabbitListener(queues = "cfjg_queue1")public void queue1Listener(String msg){System.out.printf("%s 队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}@RabbitListener(queues = "cfjg_queue2")public void queue2Listener(String msg){System.out.printf("%s 队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}

Direct

创建多个队列->创建交换机进行路由key绑定->发布者发布->消费者接收

发布:

    @GetMapping("/direct")public void publishDirectMessage(){String exchangeName = this.DIRECT;for (int i = 0; i < 10; i++) {if(i%2 == 0){rabbitTemplate.convertAndSend(exchangeName,this.Queue1,"directTo:queue1--" + i);}else if(i%2 !=  0){rabbitTemplate.convertAndSend(exchangeName,this.Queue2,"directTo:queue2--" + i);}if(i%5 == 0){rabbitTemplate.convertAndSend(exchangeName,this.ALL,"directTo:all--" + i);}}}

接收:

    @RabbitListener(queues = "cfjg_queue1")public void queue1Listener(String msg){System.out.printf("%s 队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}@RabbitListener(queues = "cfjg_queue2")public void queue2Listener(String msg){System.out.printf("%s 队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}

Topic

创建多个队列->创建交换机进行通配符路由key绑定->发布者发布->消费者接收

路由key由多个由.分割的单词组成

绑定时使用#或*通配符进行绑定

#:代表任意多个单词

*.代表任意一个单词

eg:cfjg.test

可以由cfjg.#或者*.test进行匹配

发布:

    @GetMapping("/topic")public void publishTopicMessage(){for (int i = 0; i < 100; i++) {String tmp = i % 2 == 0 ? "two" : "one";String routingKey = "cfjg." + tmp;System.out.println(routingKey);rabbitTemplate.convertAndSend(this.TOPIC,routingKey,"topicTo:queue--" + i);}}

接收:

    @RabbitListener(queues = "cfjg_queue1")public void queue1Listener(String msg){System.out.printf("%s 队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}@RabbitListener(queues = "cfjg_queue2")public void queue2Listener(String msg){System.out.printf("%s 队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}

声明队列和交换机

SpringAMQP提供了一个Queue类用来创建队列

public class Queue extends AbstractDeclarable implements Cloneable {}

提供了一个Exchange接口用来表示不同类型的交换机

请添加图片描述

SpringAMQP提供了ExchangeBuilder和BindingBuilder来简化创建和绑定队列和交换机的过程

我们可以在消费者中编写一个配置类来对队列和交换机进行声明

@Configuration
public class MqConfig {//声明交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("cfjg.fanout");}//声明队列@Beanpublic Queue fanoutQueue1(){return new Queue("test_queue");}//声明绑定@Beanpublic Binding bingingQueue1(){return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}
}
direct和topic模式需要每个key都进行一次绑定(同控制台操作)

基于注解声明

Spring提供了基于注解方式进行声明的途径

通过注解可以声明

绑定
队列
交换机
路由key
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "cfjg_queue2"),exchange = @Exchange(name = "cfjg.fanout" , type = ExchangeTypes.FANOUT),key = {"red","blue"}))public void fanoutQueue(String msg){System.out.println(msg);}

消息转换器

在MQ的消息传输中,会先将对象序列化为字节,接收消息时将字节反序列化为Java对象

但是默认的JDK序列化存在以下问题

数据体积过大,

存在安全漏洞,

可读性差

可以使用JSON进行序列化和反序列化

引入JackSon依赖来进行JSON序列化

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

通过在配置类中注册转换器Bean来实现消息发送时的自动序列化

	@Beanpublic MessageConverter messageConverter(){return new jackson2JsonMessageConverter();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/41561.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

排序——交换类排序、插入类排序、选择类排序、归并类排序

排序 排序算法分为交换类排序、插入类排序、选择类排序、归并类排序。 交换类排序 冒泡排序 冒泡排序的基本思想是&#xff1a;从后往前&#xff08;或从前往后&#xff09;两两比较相邻元素的值。若A[ j - 1 ] > A[ j ]&#xff0c;则交换它们&#xff0c;直到序列比较…

commonjs、module 模块同时启动

怎样同时在一个项目中同时启动node服务和我们前端项目&#xff08;commonjs、module 模块同时启动&#xff09; 今天在使用node实现完增删改查的接口之后&#xff0c;将自己node代码嵌入到我们react项目中 启动完前端项目之后&#xff0c;当我使用node service.js的时候&#x…

Unity 简单载具路线 Waypoint 导航

前言 在游戏开发和导航系统中&#xff0c;"waypoint" 是指路径中的一个特定位置或点。它通常用于定义一个物体或角色在场景中移动的目标位置或路径的一部分。通过一系列的 waypoints&#xff0c;可以指定复杂的移动路径和行为。以下是一些 waypoint 的具体用途&…

用Python轻松转换PDF为CSV

数据的可访问性和可操作性是数据管理的核心要素。PDF格式因其跨平台兼容性和版面固定性&#xff0c;在文档分享和打印方面表现出色&#xff0c;尤其适用于报表、调查结果等数据的存储。然而&#xff0c;PDF的非结构化特性限制了其在数据分析领域的应用。相比之下&#xff0c;CS…

【国产开源可视化引擎Meta2d.js】图元

图元 又称画笔Pen。图形表达的基本元素&#xff0c;组成图像的基本单元。 构成 每一个图元由ID、名字、类型、属性&#xff08;数据&#xff09;组成。 ID 名为“id”的特殊属性&#xff0c;图元实例&#xff08;画布上的图元对象&#xff09;的唯一标识。拖拽到画布或创建…

【线性代数的本质】矩阵与线性变换

线性变化要满足两点性质&#xff1a; 直线&#xff08;连续的点&#xff09;在变换后还是直线。原点不变。 假设有坐标轴&#xff08;基底&#xff09; i ^ \widehat{i} i 和 j ^ \widehat{j} j ​&#xff1a; i ^ [ 1 0 ] , j ^ [ 0 1 ] \widehat{i}\begin{bmatrix} 1 \…

《昇思25天学习打卡营第6天|网络构建》

文章目录 前言&#xff1a;今日所学&#xff1a;1. 定义模型类2. 模型层3. 模型参数 前言&#xff1a; 在第六节中我们学习了网络构建&#xff0c;了解了神经网络模型是由神经网络层和Tensor操作构成&#xff0c;我们使用的mindspore.nn中提供了常见的升级网络层的实现&#x…

在线图片转文字的软件,分享3种强大的软件!

在信息爆炸的时代&#xff0c;图片作为信息的重要载体之一&#xff0c;其内容往往蕴含着巨大的价值。然而&#xff0c;面对海量的图片信息&#xff0c;如何高效、准确地将其转化为文字&#xff0c;成为了许多人的迫切需求。今天&#xff0c;就为大家盘点几款功能强大的在线图片…

【python基础】—如何理解安装程序时要配置Widows和DOS操作系统中的path环境变量?

文章目录 前言一、环境变量是什么&#xff1f;二、为什么需要设置环境变量&#xff1f;三、配置anaconda的环境变量 前言 在安装一些程序的时候&#xff0c; 我们总是需要将安装路径配置到正在使用电脑的环境变量里。为什么要进行这一步呢&#xff1f;本文主要解释Widows和DOS…

特殊用途二极管+二极管故障检测+三极管(BJT)的工作原理+定时器的使用(小灯定时闪烁实现)

2024-7-5&#xff0c;星期五&#xff0c;17:27&#xff0c;天气&#xff1a;晴&#xff0c;心情&#xff1a;晴。今天没有什么特殊的事情发生&#xff0c;继续学习啦&#xff0c;加油加油&#xff01;&#xff01;&#xff01; 今日完成模电自选教材第二章内容的学习&#xff…

1-4 NLP发展历史与我的工作感悟

1-4 NLP发展历史与我的工作感悟 主目录点这里 第一个重要节点&#xff1a;word2vec词嵌入 能够将无限的词句表示为有限的词向量空间&#xff0c;而且运算比较快&#xff0c;使得文本与文本间的运算有了可能。 第二个重要节点&#xff1a;Transformer和bert 为预训练语言模型发…

【ABB】原点设定

【ABB】原点设定 操作流程演示 操作流程 操作轴回原点编辑电机校准偏移更新转速计数器 1.首先得了解机器手的轴&#xff0c;这里以6轴作参考。 注意先回456轴&#xff0c;后回123轴。 2.然后需要了解机器人关节运动模式&#xff0c;即选择如下两个模式。 3.注意机器人各轴移动…

QT的编译过程(底层逻辑)

qmake -project 用于从源代码生成项目文件&#xff0c;qmake 用于从项目文件生成 Makefile&#xff0c;而 make 用于根据 Makefile 构建项目。 详细解释&#xff1a; qmake -project 这个命令用于从源代码目录生成一个初始的 Qt 项目文件&#xff08;.pro 文件&#xff09;。它…

吃顿饭的时间,用AI开发一个应用官网

最早接触开发时做的第一个项目就是企业官网&#xff0c;到后来自己开始走上独立开发者的道路时&#xff0c;哪怕是开发面向消费者的移动端产品&#xff0c;在产品上架时也需要提供应用官网。 感觉&#xff0c;编程这件事情和官网开发&#xff0c;紧密相连。 过往为了追求开发效…

个人微信 微信营销系统

个人微信 微信营销系统 CRM系统

Windows 玩转大模型第一天:大模型本地部署,调用大模型API可直接工程化应用(全部代码和详细部署流程)

Ollama 是一个开源框架&#xff0c;专为在本地机器上便捷部署和运行大型语言模型&#xff08;LLM&#xff09;而设计。 以下是其主要特点和功能概述&#xff1a; 1. 简化部署&#xff1a;Ollama 目标在于简化在 Docker 容器中部署大型语言模型的过程&#xff0c;使得非专业用…

ELK日志系统和Filebeat采集器的学习总结

ELK是ElasticSerach、Logstash、Kina Logstash负责采集数据&#xff0c;Logstash有三个插件&#xff0c;input、filter、output&#xff0c;filter插件作用是对采集的数据进行处理&#xff0c;过滤的&#xff0c;因此filter插件可以选&#xff0c;可以不用配置。 ElasticSear…

vulnhub靶场之DC-1

1 信息收集 1.1 主机发现 arp-scan -l 主机ip地址为&#xff1a;192.168.1.4 1.2 端口服务扫描 nmap -sS -sV -A -T5 -p- 192.168.1.4 开发22&#xff0c;80&#xff0c;111端口 1.3 目录扫描 dirsearch -u 192.168.1.4 2 渗透测试 2.1 先访问一下80端口 发现是一个…

K8S 部署 EFK

安装说明 系统版本为 Centos7.9 内核版本为 6.3.5-1.el7 K8S版本为 v1.26.14 ES官网 开始安装 本次安装使用官方ECK方式部署 EFK&#xff0c;部署的是当前的最新版本。 在 Kubernetes 集群中部署 ECK 安装自定义资源 如果能打开这个网址的话直接用这个命令安装,打不开的话…

半导体制造企业 文件共享存储应用

用户背景&#xff1a;半导体设备&#xff08;上海&#xff09;股份有限公司是一家以中国为基地、面向全球的微观加工高端设备公司&#xff0c;为集成电路和泛半导体行业提供具竞争力的高端设备和高质量的服务。 挑战&#xff1a;芯片的行业在国内迅猛发展&#xff0c;用户在上海…