聊一聊顺序消息(RocketMQ顺序消息的实现机制)

本文来自:https://www.cnblogs.com/hzmark/p/orderly_message.html

当我们说顺序时,我们在说什么?

日常思维中,顺序大部分情况会和时间关联起来,即时间的先后表示事件的顺序关系。

比如事件A发生在下午3点一刻,而事件B发生在下午4点,那么我们认为事件A发生在事件B之前,他们的顺序关系为先A后B。

上面的例子之所以成立是因为他们有相同的参考系,即他们的时间是对应的同一个物理时钟的时间。如果A发生的时间是北京时间,而B依赖的时间是东京时间,那么先A后B的顺序关系还成立吗?

如果没有一个绝对的时间参考,那么A和B之间还有顺序吗,或者说怎么断定A和B的顺序?

显而易见的,如果A、B两个事件之间如果是有因果关系的,那么A一定发生在B之前(前因后果,有因才有果)。相反,在没有一个绝对的时间的参考的情况下,若A、B之间没有因果关系,那么A、B之间就没有顺序关系。

那么,我们在说顺序时,其实说的是

  • 有绝对时间参考的情况下,事件的发生时间的关系;

  • 和没有时间参考下的,一种由因果关系推断出来的happening before的关系;

在分布式环境中讨论顺序
当把顺序放到分布式环境(多线程、多进程都可以认为是一个分布式的环境)中去讨论时:

  • 同一线程上的事件顺序是确定的,可以认为他们有相同的时间作为参考

  • 不同线程间的顺序只能通过因果关系去推断

在这里插入图片描述
(点表示事件,波浪线箭头表示事件间的消息)

上图中,进程P中的事件顺序为p1->p2->p3->p4(时间推断)。而因为p1给进程Q的q2发了消息,那么p1一定在q2之前(因果推断)。但是无法确定p1和q1之间的顺序关系。

推荐阅读《Time, Clocks, and the Ordering of Events in a Distributed System》,会透彻的分析分布式系统中的顺序问题。

消息中间件中的顺序消息

什么是顺序消息

有了上述的基础之后,我们回到本篇文章的主题中,聊一聊消息中间件中的顺序消息。

顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。

顺序消息包含两种类型:

分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费

全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费

这是阿里云上对顺序消息的定义,把顺序消息拆分成了顺序发布和顺序消费。那么多线程中发送消息算不算顺序发布?

如上一部分介绍的,多线程中若没有因果关系则没有顺序。那么用户在多线程中去发消息就意味着用户不关心那些在不同线程中被发送的消息的顺序。即多线程发送的消息,不同线程间的消息不是顺序发布的,同一线程的消息是顺序发布的。这是需要用户自己去保障的。

而对于顺序消费,则需要保证哪些来自同一个发送线程的消息在消费时是按照相同的顺序被处理的(为什么不说他们应该在一个线程中被消费呢?)。

全局顺序其实是分区顺序的一个特例,即使Topic只有一个分区(以下不在讨论全局顺序,因为全局顺序将面临性能的问题,而且绝大多数场景都不需要全局顺序)。

如何保证顺序

在MQ的模型中,顺序需要由3个阶段去保障:

  1. 消息被发送时保持顺序

  2. 消息被存储时保持和发送的顺序一致

  3. 消息被消费时保持和存储的顺序一致

发送时保持顺序意味着对于有顺序要求的消息,用户应该在同一个线程中采用同步的方式发送。存储保持和发送的顺序一致则要求在同一线程中被发送出来的消息A和B,存储时在空间上A一定在B之前。而消费保持和存储一致则要求消息A、B到达Consumer之后必须按照先A后B的顺序被处理。

如下图所示:
在这里插入图片描述
对于两个订单的消息的原始数据:a1、b1、b2、a2、a3、b3(绝对时间下发生的顺序):

  • 在发送时,a订单的消息需要保持a1、a2、a3的顺序,b订单的消息也相同,但是a、b订单之间的消息没有顺序关系,这意味着a、b订单的消息可以在不同的线程中被发送出去

  • 在存储时,需要分别保证a、b订单的消息的顺序,但是a、b订单之间的消息的顺序可以不保证

    • a1、b1、b2、a2、a3、b3是可以接受的

    • a1、a2、b1、b2、a3、b3也是可以接受的

    • a1、a3、b1、b2、a2、b3是不能接受的

  • 消费时保证顺序的简单方式就是“什么都不做”,不对收到的消息的顺序进行调整,即只要一个分区的消息只由一个线程处理即可;当然,如果a、b在一个分区中,在收到消息后也可以将他们拆分到不同线程中处理,不过要权衡一下收益

开源RocketMQ中顺序的实现

在这里插入图片描述
上图是RocketMQ顺序消息原理的介绍,将不同订单的消息路由到不同的分区中。文档只是给出了Producer顺序的处理,Consumer消费时通过一个分区只能有一个线程消费的方式来保证消息顺序,具体实现如下。

Producer端

Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的分区,在RocketMQ中,通过MessageQueueSelector来实现分区的选择。
在这里插入图片描述

  • List mqs:消息要发送的Topic下所有的分区

  • Message msg:消息对象

  • 额外的参数:用户可以传递自己的参数

比如如下实现就可以保证相同的订单的消息被路由到相同的分区:

long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());

Consumer端

RocketMQ消费端有两种类型:MQPullConsumer和MQPushConsumer。

MQPullConsumer由用户控制线程,主动从服务端获取消息,每次获取到的是一个MessageQueue中的消息。PullResult中的List msgFoundList自然和存储顺序一致,用户需要再拿到这批消息后自己保证消费的顺序。

对于PushConsumer,由用户注册MessageListener来消费消息,在客户端中需要保证调用MessageListener时消息的顺序性。RocketMQ中的实现如下:
在这里插入图片描述

  1. PullMessageService单线程的从Broker获取消息

  2. PullMessageService将消息添加到ProcessQueue中(ProcessMessage是一个消息的缓存),之后提交一个消费任务到ConsumeMessageOrderService

  3. ConsumeMessageOrderService多线程执行,每个线程在消费消息时需要拿到MessageQueue的锁

  4. 拿到锁之后从ProcessQueue中获取消息

保证消费顺序的核心思想是:

  • 获取到消息后添加到ProcessQueue中,单线程执行,所以ProcessQueue中的消息是顺序的

  • 提交的消费任务时提交的是“对某个MQ进行一次消费”,这次消费请求是从ProcessQueue中获取消息消费,所以也是顺序的(无论哪个线程获取到锁,都是按照ProcessQueue中消息的顺序进行消费)

顺序和异常的关系

顺序消息需要Producer和Consumer都保证顺序。Producer需要保证消息被路由到正确的分区,消息需要保证每个分区的数据只有一个线程消息,那么就会有一些缺陷:

  • 发送顺序消息无法利用集群的Failover特性,因为不能更换MessageQueue进行重试

  • 因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大

  • 消费的并行读依赖于分区数量

  • 消费失败时无法跳过

不能更换MessageQueue重试就需要MessageQueue有自己的副本,通过Raft、Paxos之类的算法保证有可用的副本,或者通过其他高可用的存储设备来存储MessageQueue。

热点问题好像没有什么好的解决办法,只能通过拆分MessageQueue和优化路由方法来尽量均衡的将消息分配到不同的MessageQueue。

消费并行度理论上不会有太大问题,因为MessageQueue的数量可以调整。

消费失败的无法跳过是不可避免的,因为跳过可能导致后续的数据处理都是错误的。不过可以提供一些策略,由用户根据错误类型来决定是否跳过,并且提供重试队列之类的功能,在跳过之后用户可以在“其他”地方重新消费到这条消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/313454.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何摆脱「技术思维」的惯性?

大家好,我是Z哥。虽然从标题上看,这篇文章是写给“技术人”的,但从广义上来说,只要你是一位以理性见长的人,那么这篇文章要讲的东西可能会与你有关。先问大家一个问题。如果你现在打算做一件事A,它的目的是…

RocketMq重试及消息不丢失机制

1、消息重试机制 由于MQ经常处于复杂的分布式系统中,考虑网络波动、服务宕机、程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。如果没有消息重试,就可能产生消息…

cmake编译opencv3.0

本文参照了 http://www.huqiwen.com/2012/11/27/compile-opencv-243-in-visual-studio-2012/ 安装CMake 从CMake的官方网站下载最新版的CMake。http://www.cmake.org/cmake/resources/software.html,选择Windows (Win32 Installer)平台的进行下载。 安装时请勾选…

【 .NET Core 3.0 】框架之五 || JWT权限验证

前言关于JWT一共三篇 姊妹篇,内容分别从简单到复杂,一定要多看多想:一、Swagger的使用 3.3 JWT权限验证【修改】二、解决JWT权限验证过期问题三、JWT完美实现权限与接口的动态分配这里一共三个文章,目前是第一篇,剩下两…

OpenCV Stitching 工程搭建

转自http://www.tuicool.com/articles/fMbUfaF Opencv中提供Stitcher类,实现了多图像自动拼接,Opencv是开源的,程序实现的源代码都在Opencv安装文件中,以及Opencv提供的函数查询手册和Opencv教程都可以在…

asp.net core 3.0 更新简记

asp.net core 3.0 更新简记Intro最近把活动室预约项目从 asp.net core 2.2 更新到了 asp.net core 3.0,记录一下,升级踩过的坑以及经验总结,包括但不限于TargetFramework ( netcoreapp2.2 需要更新为 netcoreapp3.0)DependencyHost/Environme…

kafka吞吐量高的原因

kafa 吞吐量高的原因 1、顺序读写 kafka的消息是不断追加到文件中的,这个特性使kafka可以充分利用磁盘的顺序读写性能 顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写 2、零拷贝 在Linux kernel2.2 之…

【 .NET Core 3.0 】框架之三 || swagger的使用

一、为什么使用Swagger上文中已经说到,单纯的项目接口在前后端开发人员使用是特别不舒服的,那所有要推荐一个,既方便又美观的接口文档说明框架,当当当,就是Swagger,随着互联网技术的发展,现在的…

MQ问题集(kafka主从同步与高可用,MQ重复消费、幂等)

1、kafka主从同步与高可用 https://1028826685.iteye.com/blog/2354570 http://developer.51cto.com/art/201808/581538.htm 2、MQ有可能发生重复消费,如何避免,如何做到幂等 2.1 MQ消息发送 1、发送端MQ-client(消息生产者:Producer)将消…

微软编程题:寻找最小的k个值

转载自:http://blog.csdn.net/v_JULY_v/article/details/6370650 寻找最小的k个数 题目描述:5.查找最小的k个元素 题目:输入n个整数,输出其中最小的k个。 例如输入1,2,3,4,5&#xf…

Bumblebee微服务网关之访问日志处理

记录访问日志可以起到非常重要的作用,它不仅记录了API的使用情况,更可以反映API各种相关数据;通过分析日志可以得到API不同时间的负载情况,访问效率和流量分布,更进一步还能分析出用户的操作历史和行为这是非常有价值的…

负载均衡基础

1、什么是负载均衡(Load balancing) 在网站创立初期,我们一般都使用单台机器对台提供集中式服务,但是随着业务量越来越大,无论是性能上还是稳定性上都有了更大的挑战。这时候我们就会想到通过扩容的方式来提供更好的服…

Bumblebee微服务网关之并发限制

对于服务应用来说支持的并发越高越好,但很多时候资源有限,超负载的并发则会给整体应用带来更大的危险性(更何况有些并发来源是恶意的)。作为微服务网关应该具有一定的挡洪作用,这样可以一定程度保障后台逻辑服务的稳定…

[ASP.NET Core 3框架揭秘] 跨平台开发体验: Mac OS

除了微软自家的Windows平台, .NET Core针对Mac OS以及各种Linux Distribution(RHEL、Ubuntu、Debian、Fedora、CentOS和SUSE等)都提供了很好的支持。我们先来体验一下使用Mac来开发.NET Core应用,在这之前我们照例先得在Mac OS上构…

接雨水

题目描述 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图,计算按此排列的柱子,下雨之后能接多少雨水。 上面是由数组 [0,1,0,2,1,0,1,3,2,1,2,1] 表示的高度图,在这种情况下,可以接 6 个单位的雨水(蓝色部分表示…

使用RabbitMQ实现接口补偿

业务背景在我们的日常开发中,经常需要调用第三方接口来进行数据传递,在调用接口的过程中,会因为各种原因导致调用的失败。这时我们希望能有一种机制实现对失败的接口的重复调用,并且能够实现人工干预。实现思路1、当接口调用失败&…

集群环境下,你不得不注意的ASP.NET Core Data Protection 机制

引言最近线上环境遇到一个问题,就是ASP.NET Core Web应用在单个容器使用正常,扩展多个容器无法访问的问题。查看容器日志,发现以下异常:System.Security.Cryptography.CryptographicException: The key {efbb9f35-3a49-4f7f-af19-…

.NET斗鱼直播弹幕客户端(上)

前言现在直播平台由于弹幕的存在,主播与观众可以更轻松地进行互动,非常受年轻群众的欢迎。斗鱼TV就是一款非常流行的直播平台,弹幕更是非常火爆。看到有不少主播接入 弹幕语音播报器、 弹幕点歌等模块,这都需要首先连接斗鱼弹幕。…

程序员后期,架构师发展路线!

作者:zollty,资深程序员和架构师,私底下是个爱折腾的技术极客,架构师社区合伙人!我总结了3个阶段。先说一下各个阶段的感受:1、系统架构阶段:系统架构实际上包括了 业务功能架构 和 技术功能架构。业务上&a…

YUV格式学习

转载自http://blog.csdn.net/searchsun/article/details/2443867 YUV是指亮度参量和色度参量分开表示的像素格式,而这样分开的好处就是不但可以避免相互干扰,还可以降低色度的采样率而不会对图像质量影响太大。YUV是一个比较笼统地说法,针对它…