消息中间件核心实体(1)

接上一篇《消息中间件核心实体(0)》,这一篇继续介绍消息中间件中的一些实体。

上一篇主要是Message、Topic、TopicMeta和Queue这样最基础的实体,这几篇介绍一些发送和消费的过程中会涉及到的实体和组件。

1. 发送

1.1 增强Message属性

Message一般只包含topic、tag、content这些属性,这些属性也是使用方在发送时会涉及到的内容。但是光有这些属性往往是不够的,比如我们会需要记录产生这条消息的Producer的信息;记录消息的产生时间和产生的IP信息等等。这些信息都是在Client中给消息附加上去的,对发送方来说是透明的,所以不会在Message实体中暴露,而是我们会增加一个实体:EnhancedMessage。

EnhancedMessage继承自Message,并会增加一些如下的属性:

  • bornTime

  • bornAddress

  • producer

  • etc

引申一点,Producer发送消息的大致过程如下:

  1. 增强Message属性,得到EnhancedMessage的实例

  2. 获取可以写入的队列(也可以理解成获取分区)

  3. 向队列写入消息(可以是队列暴露写入接口或者由专门的写入工具写入到队列中)

伪代码:

EnhancedMessage msg = enhance(message);
// 根据消息选择一个可以写入的目标队列
WritableQueue queue = router.select(msg);
// 写入消息(queue实现write方法进行写入)
Result result = queue.write(msg);// write过程
// 将消息序列化成自定义协议的网络包
Packet messagePacket = Serializer.encode(msg);
// 发送网络包
bootstrap.write(messagePacket);

上面的WritableQueue暴露了API去写入,具体实现可以是写入到网络,即远端的一个Partition。而在做单元测试或者本地测试的时候,可以覆盖write的实现,而不用真正写入到网络中,这会使代码更容易测试测试。

上面两幅图是Rocket开源版本中发送相关的一些代码,私以为这段代码非常的不优雅,读起来特别累,特别是requestHeader的各种属性设置。

这段是Rocket开源版本中真正将消息写入到网络的实现,看起来总是非常臃肿,另外不知道是如何mock这些实现以达到在本地做测试的目的的。

1.2 Queue的路由选择

发送过程中会涉及到队列的选择(分区的选择),一条消息最终会根据一定的策略落到一个分区中,这里需要一个组件来完成选择(把这个组件单独抽象出来,这样便于控制写入的目标来进行测试,抽象出来也可以由使用方来实现,这样可以按照使用方自己的场景做特定的路由)。

路由组件非常的简单,一般是Router会根据topic获取到topic的元数据(元数据包含了多有分区的信息),然后根据消息的属性或者用户的参数计算出落到哪个分区,比如可以根据用户的参数对分区总数取模来选择分区,这样可以做到将某一类消息发送到一个分区,比如同一个用户的消息或同一笔订单的不同消息。

这个组件会比较简单,但是在集成的时候需要注意一点,这个组件用户可以自己注入到Producer中来达到控制分区选择策略的目的。

RocketMQ在TopicPublishInfo中实现分区的选择,TopicPublishInfo包含了队列信息(List<MessageQueue> messageQueueList属性),笔者更倾向于抽象出独立的路由组件,以便在特定的场景用户可以自己实现路由,或者在测试时可以做到使用特定路由规则。

2. 消费

消费可以分为多种方式,从获取消息的方式上可以分为Pull和Push两种类型的Consumer;从消费消息的方式上可以分为集群消费和广播消费。这里不展开讨论各种模式的实现(以后单独会讨论Consumer该实现那些内容),会以Push模式&集群消费的Consumer为例,把消费流程中涉及到的一些组件进行介绍。

2.1 分配分区

集群消费中需要保证每个分区有且只有一个Consumer在进行消费。如果某个分区没有Consumer消费,那么使用方拿不到完整的数据;如果某个分区被两个Consumer消费,那么会产生大量的重复消息。所以这里需要实现一个分区分配策略,使在分布式环境中,每个Consumer拿到属于自己的分区,且相互交叉。下面是四个分区两个Consumer默认情况下的分配结果。

实现的策略一般是:

  1. 拿到一个Topic所有的分区,对这个列表进行排序

  2. 拿到当前所有的Consumer,对Consumer列表进行排序

  3. 根据自己所处的Consumer列表的位置和Consumer总数,从分区列表中获取对应的一部分

每个分区和Consumer都有唯一的ID,这样各自按照排序后的结果进行分配,可以达到相互不交叉且不遗漏的目的。(在Consumer总数或分区数发生变化的过程中可能分配结果不正确,这个过程是短暂的,且在消费时还会结合锁去保证分区只有一个Consumer消费,所以不会对实际消费产生影响)。

同样记住一点,这个分配策略是需要暴露出去的,系统可以默认实现集群消费和广播消费的基础策略,用户可以实现自己的分配策略注入到系统中。

2.2 消息缓存

消费端一个重要的组件是消息缓存。为了提升性能,在消费端消息的获取和消息的消费是异步的。Consumer内部有线程专门从服务端获取消息写入到消息缓存中,另外有线程从缓存中获取消息调用用户的回调接口来执行业务操作。

消息缓存除了提供基础的put和take来实现存入消息和取出消息,还需要自身容量,水位控制等配置。

本身Buffer不是很复杂的部分,但是需要考虑一些流控策略,比如Buffer使用率到多少时降低从服务端获取数据的频率。

RocketMQ中实现消息缓存由ProcessQueue实现,笔者倾向于独立出Buffer模块,另外Buffer需要提供锁,以实现顺序消费。

2.3 消费进度

还有一个重要的实体是消费进度,系统需要记录“每个”Consumer的消费进度,且这个数据需要被持久化。

消费进度需要记录某个Group对某个Topic的某个分区的消费位点。进度是按照Topic维度去组织的(持久化在服务端),结构如下:

topicgroup0cursor0、cursor1、cursor2...group1...实现的对象应该是:
class Cursors {String topic;Cursor cursor;class Cursor {String group;// 用数组来存储一个group消费的一个topic的所有分区的进度// 分区数一般情况下不会变更(变更场景很少),用数据就可以long[] cursors;}
}

Consumer可以在每一次获取消息时将消费进度提交到服务端,在服务端来更新Cursors内部的数据。

3. 结语

最近两篇内容将一些基础实体和组件简单的介绍了一下,下一篇讨论一下消息应该由Server Push给Consumer还是Consumer主动来Pull消息。

往期文章:

消息中间件核心实体(0)

消息的写入和读取流程

NameServer模块划分

Client模块划分

Broker模块划分

消息中间件架构讨论

业务方对消息中间件的需求

消息中间件中的一些概念

什么是分布式消息中间件?

欢迎关注公众号来交流MQ相关问题。

 

转载于:https://www.cnblogs.com/hzmark/p/mq_entity_1.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/467746.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sklearn(2

算法库顶层设计 SKLEARN监督学习模块 近邻算法 neighbors支持向量机SVM岭回归 kernal_ridge判别分析discriminant_analysis广义线性模型linear_model集成方法ensemble决策树tree朴素贝叶斯naive_bayes交叉分解cross_decompostition高斯过程gaussian_process多层神经网络neural…

该死的装修

周末了&#xff0c;没有写技术文~因为最近的新房子需要装修&#xff0c;谈了几个装修公司&#xff0c;不知道为什么&#xff0c;我对装修这个事情非常抵触&#xff0c;繁琐的事情太多了&#xff0c;还没有装修&#xff0c;我就觉得有很多事情要斗争。因为家里有小孩&#xff0c…

SKLEARN模型选择

数据集划分方法 K折交叉验证法 将全部训练集S分成k个不相交的子集&#xff0c;假设S中的训练样例子数为m&#xff0c;那么每一个子集有m/k个训练样例&#xff0c;相应子集称作{s1,s2……sk}每次从分好的子集里面&#xff0c;拿一个作为测试集&#xff0c;其他k-1作为训练集在k…

被该死的Openssl编译嘲讽了一个下午

昨晚比较尴尬&#xff0c;下班的时候已经很晚了&#xff0c;到半路突然想起来没有带钥匙回去&#xff0c;赶紧折返回公司拿钥匙&#xff0c;在已经很晚的路上我们又多花费了20分钟&#xff0c;到家已经挺晚了&#xff0c;具体多少点我都不好意思说&#xff0c;我要是说出来&…

JavaScript中本地对象、内置对象和宿主对象

http://www.jianshu.com/p/a52e6e183427 http://blog.csdn.net/weiyastory/article/details/52837466 http://www.cnblogs.com/qigang/p/3520974.html 在ECMAScript中&#xff0c;所有对象并非同等创建的。 一般来说&#xff0c;可以创建并使用的对象有3种&#xff1a;本地对象…

类的真正形态

类的真正形态 类的关键字 struct在C语言中已经有了自己的定义&#xff0c;必须继续兼容 在C中提供了新的关键字&#xff0c;class用于类定义 class和struct用法是完全相同 struct和class区别 用struct定义时&#xff0c;所有成员默认为public 用class定义时&#xff0c;所…

分类器

sklearn分类器性能指标 分类器性能评估指标 只限于二元单元标签分类问题 可用于二元多标签分类问题的评估指标 如何将二元分类指标扩展到多类或多标签问题中去 可用于多类分类问题的评估指标(紫色的可用于多标签问题&#xff09; accuracy_score准确率 函数计算分类…

Cucumber+Rest Assured快速搭建api自动化测试平台

转载&#xff1a;http://www.jianshu.com/p/6249f9a9e9c4 什么是Cucumber&#xff1f;什么是BDD&#xff1f;这里不细讲&#xff0c;不懂的直接查看官方&#xff1a;https://cucumber.io/  什么是Rest Assured&#xff1f;传送门&#xff1a;https://github.com/rest-assured…

让这个该死的服务跑起来了~

#前言被该死的Openssl编译嘲讽了一个下午之前的文章说了我的那个编译的问题&#xff0c;这里说下&#xff0c;知识点有点零散&#xff0c;最后的解决也是一个同事提示&#xff0c;感觉也有点奇葩的赶脚。到目前为止&#xff0c;我现在感受到了写文章的好处&#xff0c;昨晚的问…

深度学习概述

深度学习 传统学习与深度学习 深度学习应用特点 深度学习框架比较 Tensorflow 神经元 卷积核 分类 回归 生成

Python--递归

面向函数编程 def func():print(从前有座山&#xff0c;山里有座庙&#xff0c;庙里有个老和尚讲故事&#xff0c;讲的什么呀&#xff1f;)func() 解耦&#xff1a;尽量把不相关的功能拆开&#xff0c;用的时候再调用函数&#xff0c;增强代码重用性&#xff0c;减少代码变更的…

传统神经网络

文章目录神经网络的起源&#xff1a;线性回归一个线性回归问题线性回归优化方法&#xff1a;梯度下降梯度计算梯度计算总结线性回归&#xff1a;输出线性回归&#xff1a;局限从线性到非线性非线性激励常用的非线性激励函数tanhRELULecky RELU神经元—神经网络神经网络构建神经…

深度学习之卷积神经网络

文章目录深度学习之卷积神经网络链式反向梯度链式法则的计算神经网络中链式法则实例二、卷积神经网络-卷积层&#xff08;一&#xff09;什么是卷积层&#xff08;二&#xff09;有什么组成受什么影响&#xff0c;有何特点卷积网络正向传播反向传播卷积和神经网络功能层深度学习…

卷积神经网络高级篇

【 文章目录Alextnet参数计算VGG alexnet增强版VGG参数计算VGG作用GoogleNet 多分辨率融合全卷积神经网络RESNET结构特性有效性结构化图片特殊处理识别效果全局部卷积网络缺陷U-net图片生成网络VGG u-netAlextnet 参数计算 VGG alexnet增强版 VGG参数计算 VGG作用 GoogleNet 多…

LinuxGPIO操作和MTK平台GPIO

GPIO口配置是一个历史性的问题&#xff0c;不管我们使用什么MCU&#xff0c;单片机也好&#xff0c;ARM也好&#xff0c;都离不开驱动GPIO口。Linux下有一个宏&#xff0c;GPIO_GPIO_SYSFS&#xff0c;打开这个宏后&#xff0c;编译烧录到设备端&#xff0c;去看看sys/class/gp…

Linux cpu亲和力

最近在对项目进行性能优化&#xff0c;由于在多核平台上&#xff0c;所以了解了些进程、线程绑定cpu核的问题&#xff0c;在这里将所学记录一下。不管是线程还是进程&#xff0c;都是通过设置亲和性(affinity)来达到目的。对于[进程]的情况&#xff0c;一般是使用sched_setaffi…

卷积神经网络(目标分类)

文章目录目标分类基本框架数据准备数据扩充数据规范模型设计任务分类局部更改训练细节目标分类基本框架 数据准备 现有数据集的子集&#xff0c;网络采集&#xff0c;现有数据人工标注 数据扩充 原始数据切割&#xff0c;噪声颜色等像素变化&#xff0c;旋转平移 数据规范…

安卓9.0添加服务修改SELinux

#文章目录#前言#SELinux来源#SELinux基本框架#SELinux 在不同版本的表现#使用audit2allow工具生成SELinux 权限#完整代码#前言先推荐下之前的SELinux文章&#xff0c;但是那个是7.1的&#xff0c;在9.0上已经在差别很大的了。Android7.1 在init.rc 添加shell服务题外话~在企业里…

卷积神经网络-目标探测

文章目录目标探测介绍任务思路DPMRCNN1&#xff09;候选区域选择2&#xff09;CNN特征提取3&#xff09;分类与边界回归R-CNN总结优点缺陷FAST-RCNNFASTER-RCNNYOLO目标探测介绍 任务 分类获取坐标 目标探测 图片分割 思路 回归问题&#xff1a;利用神经网络进行目标识别&am…

相机视场角和焦距_镜头小讲堂(一)镜头的焦距

在刚购买完相机的时候&#xff0c;我们久会考虑需要什么样的镜头来配合机身来使用。而市场上的镜头种类是在太多了&#xff0c;所以我们就要学习了解下镜头都有哪些种类&#xff0c;选择哪种镜头比较有优势。这也是学习摄影必备的基础知识。01 镜头的焦距焦距是镜头的重要指标&…