MQ~消息队列能力、AMQP协议、现有选择(Kafka、RabbitMQ、RocketMQ 、Pulsar)

消息队列

消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。由于队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。

常⽤的消息队列主要这 五 种,分别为 Kafka、RabbitMQ、RocketMQ 、 ActiveMQ、Pulsar。

消息队列模式

点对点通讯

  • 点对点模式:多个⽣产者可以向同⼀个消息队列发送消息,⼀个具体的消息只能由⼀个消费者消费。
    在这里插入图片描述
    使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送 100 条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)

广播

  • 发布/订阅模式:单个消息可以被多个订阅者并发的获取和处理。
    在这里插入图片描述
    发布订阅模型(Pub/Sub) 使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者。

消息队列能力

1. 异步处理

将写请求数据存储到消息队列之后就立即返回结果。随后,消息队列系统再对消息进行消费。因为写请求写入消息队列之后就立即返回了,大大降低写请求的延迟,下游的执行结果也不会影响核心流程的进行,做到降低延迟。

但如果请求数据在后续的业务校验、写数据库等操作中可能失败。因此,使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。

2. 降低系统耦合性,弱依赖化

生产者(客户端)发送消息到消息队列中去,接受者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可而不需要和其他系统有耦合,这显然也提高了系统的扩展性。

消息队列使用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。
在这里插入图片描述
从上图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。

对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。

3. 削峰/限流

先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。

例如在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。

4. 分布式事务处理

RocketMQ、 Kafka、Pulsar都提供了事务相关的功能。事务允许事件流应用将消费,处理,生产消息整个过程定义为一个原子操作。

在分布式事务的实现中,有很多种方案,其中比较常用的就是基于MQ来实现,在MQ的具体实现中,又有很多种具体的方案,从大的方面来说可以分为两种:

  • 可靠消息最终一致性
  • 最大努力通知

可靠消息最终一致性

可靠消息最终一致性,顾名思义就是依赖可靠的消息,来实现一种最终一致性的模型。
他的大致流程就是:

  1. 事务的发起方执行本地事务
  2. 事务的发起方向事务的参与方发送MQ消息
  3. 事务的参与方接收到MQ消息后执行自己的本地事务

这里面事务的发起方和参与方都需要各自执行本地事务,他们之间,通过可靠消息来保障最终一致。
那么,怎么样的消息算可靠呢,直接依赖kafka、rocketMQ发送一个消息就可靠了么? 显然是不行的,因为我们知道,在出现网络分区、网络延迟等情况时,是没办法保证消息一定可以发发出去的,也没办法保证消息发出去就一定能被成功消费。

那么想要做到让这个消息可靠,一般由两种做法:

  1. 本地消息表
  2. 事务消息

通过这两种方案,都可以保证事务的发起方在执行完本地事务之后,消息一定可以发出去,并且一定能被消费成功。

本地消息表的方案是基于本地事务+重试,来保证MQ消息一定可以发出去。
事务消息的方案是基于MQ的事务消息机制,把一条消息拆成两个half消息,通过2阶段的方式+回调反查来保证消息一定能发出去。2者都是依赖MQ自身的重试机制+事务参与者反查+对账来保证正消息一定可以消费。

最大努力通知

除了可靠消息最终一致性这种以外,还有一种方式就是也使用消息,但是这个消息并不要求一定可靠。这就是最大努力通知。

这个方案一般就是只依赖重试机制,来做最大努力的通知事务参与者。但是需要注意的是,在最大努力通知的过程中,可能会出现消息重复发送的情况,也可能会出现消息丢失的情况。

5. 顺序保证

在很多应用场景中,处理数据的顺序至关重要。消息队列保证数据按照特定的顺序被处理,适用于那些对数据顺序有严格要求的场景。大部分消息队列,例如 RocketMQ、RabbitMQ、Pulsar、Kafka,都支持顺序消息。

6. 延时/定时处理

消息发送后不会立即被消费,而是指定一个时间,到时间后再消费。大部分消息队列,例如 RocketMQ、RabbitMQ、Pulsar、Kafka,都支持定时/延时消息。

7. 海量数据流处理(日志、监控)

针对分布式系统产生的海量数据流,如业务日志、监控数据、用户行为等,消息队列可以实时或批量收集这些数据,并将其导入到大数据处理引擎中,实现高效的数据流管理和处理。

消息队列一定好吗?

消息队列能力多,但也会给系统带来如下三点问题:

  • 系统可用性降低: 系统可用性在某种程度上降低,为什么这样说呢?在加入 MQ 之前,你不用考虑消息丢失或者说 MQ 挂掉等等的情况,但是,引入 MQ 之后你就需要去考虑了!
  • 系统复杂性提高: 加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
  • 一致性问题: 我上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了!

消息协议

任何的数据传输都有协议,协议代表了其具有的最基础的能力,了解协议,有助于深入体会能力和后续拓展。

AMQP

AMQP,统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。

基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。

  • 支持跨语言
  • 支持跨平台
  • 提供了五种消息模型:①direct exchange;②fanout exchange;③topic change;④headers exchange;⑤system exchange。

本质来讲,后四种和pub/sub 模型没有太大差别,仅是在路由机制上做了更详细的划分。

Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列。

现有选型

1. Kafka

Kafka 早期被用来用于处理海量日志流式处理平台,后面才慢慢发展成了一款功能全面的高性能消息队列。
流式处理平台具有三个关键功能:

  1. 消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
  2. 容错的持久方式存储记录消息流:Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。
  3. 流式处理平台: 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。

Kafka 是一个分布式系统,由通过高性能 TCP 网络协议进行通信的服务器和客户端组成,可以部署在在本地和云环境中的裸机硬件、虚拟机和容器上。

在 Kafka 2.8 之前,Kafka 最被大家诟病的就是其重度依赖于 Zookeeper 做元数据管理和集群的高可用。在 Kafka 2.8 之后,引入了基于 Raft 协议的 KRaft 模式,不再依赖 Zookeeper,大大简化了 Kafka 的架构,让你可以以一种轻量级的方式来使用 Kafka。

2. RocketMQ

RocketMQ 是阿里开源的一款云原生:消息、事件流的实时数据处理平台,借鉴了 Kafka,已经成为 Apache 顶级项目。

RocketMQ 的核心特性(摘自 RocketMQ 官网):

  1. 云原生:生与云,长与云,无限弹性扩缩,K8s 友好高吞吐:万亿级吞吐保证,同时满足微服务与大数据场景。
  2. 流处理:提供轻量、高扩展、高性能和丰富功能的流计算引擎。
  3. 金融级:金融级的稳定性,广泛用于交易核心链路。
  4. 架构极简:零外部依赖,Shared-nothing 架构。
  5. 生态友好:无缝对接微服务、实时计算、数据湖等周边生态。

根据官网介绍:Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。

3. RabbitMQ

RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。

RabbitMQ 发展到今天,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是分不开的。

RabbitMQ 的具体特点可以概括为以下几点:

  1. 可靠性: RabbitMQ 使用一些机制来保证消息的可靠性,如持久化、传输确认及发布确认等。
  2. 灵活的路由: 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
  3. 扩展性: 多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
  4. 高可用性: 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
  5. 支持多种协议: RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间件协议。
  6. 插件机制: RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。

4. Pulsar

Pulsar 是新一代云原生分布式消息流平台,最初由 Yahoo 开发 ,已经成为 Apache 顶级项目。

Pulsar 集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。

Pulsar 的关键特性如下(摘自官网):

  1. 是新一代云原生分布式消息流平台。
  2. Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
  3. 极低的发布延迟和端到端延迟。可无缝扩展到超过一百万个 topic。
  4. 简单的客户端 API,支持 Java、Go、Python 和 C++。
  5. 主题的多种订阅模式(独占、共享和故障转移)。
  6. 通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
  7. 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
  8. 基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。
  9. 分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储。

总结

  • RabbitMQ 在吞吐量方面虽然稍逊于 Kafka、RocketMQ 和 Pulsar,但是由于它基于 Erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 Erlang 开发,所以国内很少有公司有实力做 Erlang 源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),要求极高的低延迟,那这几种消息队列中,RabbitMQ 你可以选择使用。

  • RocketMQ 和 Pulsar 支持强一致性,对消息一致性要求、稳定性要求比较高的场景可以使用。RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。

  • Kafka 的特点其实很明显,追求⾼吞吐量,提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 Kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。如果是大数据领域的实时计算、日志采集、监控打点等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/32033.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用 DISPATCHERS 进行 Blueprint 之间的通信

文章目录 初始准备DISPATCHERS 的创建和绑定实现效果 初始准备 首先 UE5 默认是不提供 静态网格体编辑器也就是 Modeling Mode 的,这里需要从插件中添加 Modeling Tools Editor Mode 进入 Modeling Mode 模式,创建一个正方体 然后利用 PolyGroup Edit 和…

网络编程1

网络编程中&#xff0c;客户端调用close或者shutdown后&#xff0c;操作系统会给服务器发送一个FIN #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h>int main() {int sockfd;struct s…

Vue79-路由组件独有的2个新的生命周期钩子

一、需求 news.vue路由组件被缓存了&#xff08;因为想要保留里面的输入框的数据&#xff01;&#xff09;&#xff0c;导致&#xff0c;路由页面切走&#xff0c;组件也不会被销毁&#xff0c;所以&#xff0c;beforeDestroy()函数就不会被执行&#xff0c;所以&#xff0c;定…

npm、yarn、pnpm 最新国内镜像源设置和常见问题解决

1. npm 设置国内镜像源 1.1 镜像源概述 镜像源是软件包管理工具用来下载和安装软件包的服务器地址。由于网络原因&#xff0c;直接使用官方源可能会导致速度慢或连接失败的问题。国内镜像源可以提供更快的访问速度和更稳定的连接。 1.2 镜像源的选择 国内有许多可用的npm镜…

Java中如何使用设计模式来解决编程问题?

Java中使用设计模式来解决编程问题&#xff0c;可以显著提高代码的可复用性、可维护性和可读性。设计模式是一套被广泛应用于软件工程的解决方案&#xff0c;描述了在特定上下文中面对具体问题时的可复用解决方案。以下是几种常用的设计模式及其应用场景&#xff1a; 单例模式…

数据结构课设——文章编辑系统

需求分析(菜单 输入文章 统计字符 童子字符串出现次数 删除某子串 查看文章 保存文章 加载文章)、概要设计(算法功能设计 软件环境)、详细设计(主要数据类型 变量 函数 算法流程图)、调试分析(非法合法测试数据、遇到问题解决方案解决结果)、总结、文献、附录(代码) …

React+TS前台项目实战(十二)-- 全局常用组件Toast封装,以及rxjs和useReducer的使用

文章目录 前言Toast组件1. 功能分析2. 代码详细注释&#xff08;1&#xff09;建立一个reducer.ts文件&#xff0c;用于管理状态数据&#xff08;2&#xff09;自定义一个清除定时器的hook&#xff08;3&#xff09;使用rxjs封装全局变量管理hook&#xff08;4&#xff09;在to…

文字模拟:经营酒店隐私政策

隐私政策 文字模拟&#xff1a;经营酒店隐私政策 一、引言 本隐私政策适用于我们提供的文字模拟&#xff1a;经营酒店小游戏&#xff08;以下简称“游戏”&#xff09;。我们非常重视用户的隐私和个人信息的保护&#xff0c;因此制定了本隐私政策&#xff0c;以解释我们如何…

在scrapy中使用Selector提取数据

经院吉吉&#xff1a; 首先说明一下&#xff0c;在scrapy中使用选择器是基于Selector这个对象滴&#xff0c;selector对象在scrapy中通过XPATH或是CSS来提取数据的&#xff0c;我们可以自己创建selector对象&#xff0c;但在实际开发中我们不需要这样做&#xff0c;因为respons…

御道源码(ruoyi-vue-pro)个人使用小结

御道源码&#xff08;ruoyi-vue-pro&#xff09;个人使用小结 一、Git地址 1、平台项目简介及地址 2、开发指南&#xff0c;如图所示&#xff0c;部分功能需要收费&#xff0c;可自行了解 二、项目文件夹结构示例&#xff1a; 三、技术介绍 1.基于 Spring Boot MyBatis P…

Java字符串连接符拼接操作

在Java的算术运算符中的加法符号“ ”&#xff0c;可以用来进行算术运算&#xff0c;也可以用来当作连接符进行字符串的拼接。 当“ ”操作中出现字符串时&#xff0c;这个“ ”是字符串连接符&#xff0c;而不是运算符了。 会将前后的数据进行拼接在一起&#xff0c;并产生…

dll丢失应该怎么解决,总结5种解决DLL丢失问题的方法

在数字时代&#xff0c;我们与计算机的每一天都密不可分。然而&#xff0c;就像所有技术产品一样&#xff0c;我们的计算设备也时不时地会出现一些问题&#xff0c;让人头疼不已。就在上周&#xff0c;我遭遇了一个令人崩溃的技术挑战——DLL文件丢失。这个看似微不足道的小问题…

转--基于OpenEuler的Docker容器安装使用

/usr/sbin/sshd执行以下命令查看ssh服务是否已经开始监听22端口&#xff1a; netstat -tuln | grep :22看到以下输出证明ssh服务已启动&#xff1a; [rootmaster /]# netstat -tuln | grep :22 tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN …

【MySQL】 -- 事务

如果对表中的数据进行CRUD操作时&#xff0c;不加控制&#xff0c;会带来一些问题。 比如下面这种场景&#xff1a; 有一个tickets表&#xff0c;这个数据库被两个客户端机器A和B用时连接对此表进行操作。客户端A检查tickets表中还有一张票的时候&#xff0c;将票出售了&#x…

OpenCloudOS系统上安装Java环境

在腾讯云OpenCloudOS系统上安装Java环境&#xff0c;可以使用yum包管理器进行安装。以下是安装Java环境的步骤和示例代码&#xff1a; 首先打开终端。 执行以下命令以更新yum包索引&#xff1a; sudo yum update 安装OpenJDK Java环境&#xff0c;可以选择安装Java 8或者更…

【Linux基础IO】深入理解缓冲区

缓冲区在文件操作的过程中是比较重要的&#xff0c;理解缓冲区向文件刷新内容的原理可以更好的帮助我们更深层的理解操作系统内核对文件的操作。 FILE 因为IO相关函数与系统调用接口对应&#xff0c;并且库函数封装系统调用&#xff0c;所以本质上&#xff0c;访问文件都是通过…

ES数值类型慢查询优化

现象 某个查询ES接口慢调用告警&#xff0c;如图&#xff0c;接口P999的耗时都在2500ms: 基本耗时都在查询ES阶段&#xff1a; 场景与ES设定 慢调用接口为输入多个条件分页查询&#xff0c;慢调用接口调用的ES索引为 express_order_info&#xff0c;该索引通过DTS(数据同步…

STM32人工智能检测-筛选机器人

前言 本文描述了一种使用STM32进行机器人筛选的办法。筛选对象是我的粉s&#xff0c;删选办法是瞪眼法。 问题现象 每次当我的STM32 向外界发出一篇新的的报文&#xff0c;总能在1H之内得到focus&#xff0c;格式如下 [title][body][tail]于是我对各个focus 我报文的对象进…

Redis数据过期、淘汰策略

数据过期策略&#xff1a; 惰性删除&#xff1a; 设置该key过期时间后&#xff0c;我们不去管它&#xff0c;当需要该key时&#xff0c;我们在检查其是否过期&#xff0c;如果过期&#xff0c;我们就删掉它&#xff0c;反之返回该key。 这种方式对cpu友好&#xff08;只在用…

浏览器/H5复制链接功能

方法1&#xff1a;execCommand copyLink(){//复制链接--execCommandlet input document.createElement(input); //创建一个input标签input.value this.shareForm.url; //复制的内容&#xff0c;没有先获取标签document.body.appendChild(input);//将input添加的document中in…