jmeter线程数并发数区别_如何确定Kafka的分区数、key和consumer线程数、以及不消费问题解决...

在Kafak中国社区的qq群中,这个问题被提及的比例是相当高的,这也是Kafka用户最常碰到的问题之一。本文结合Kafka源码试图对该问题相关的因素进行探讨。希望对大家有所帮助。

怎么确定分区数?

“我应该选择几个分区?”——如果你在Kafka中国社区的群里,这样的问题你会经常碰到的。不过有些遗憾的是,我们似乎并没有很权威的答案能够解答这样的问题。其实这也不奇怪,毕竟这样的问题通常都是没有固定答案的。Kafka官网上标榜自己是"high-throughput distributed messaging system",即一个高吞吐量的分布式消息引擎。那么怎么达到高吞吐量呢?Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。但是,这只是一个方面,毕竟单机优化的能力是有上限的。如何通过水平扩展甚至是线性扩展来进一步提升吞吐量呢? Kafka就是使用了分区(partition),通过将topic的消息打散到多个分区并分布保存在不同的broker上实现了消息处理(不管是producer还是consumer)的高吞吐量。

Kafka的生产者和消费者都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费(具体如何确定consumer线程数目我们后面会详细说明)。所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。

但分区是否越多越好呢?显然也不是,因为每个分区都有自己的开销:

一、客户端/服务器端需要使用的内存就越多

先说说客户端的情况。Kafka 0.8.2之后推出了Java版的全新的producer,这个producer有个参数batch.size,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。

服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本越久越大。

二、文件句柄的开销

每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。

三、降低高可用性

Kafka通过副本(replica)机制来保证高可用。具体做法就是为每个分区保存若干个副本(replica_factor指定副本数)。每个副本保存在不同的broker上。期中的一个副本充当leader 副本,负责处理producer和consumer请求。其他副本充当follower角色,由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。

说了这么多“废话”,很多人肯定已经不耐烦了。那你说到底要怎么确定分区数呢?答案就是:视情况而定。基本上你还是需要通过一系列实验和测试来确定。当然测试的依据应该是吞吐量。虽然LinkedIn这篇文章做了Kafka的基准测试,但它的结果其实对你意义不大,因为不同的硬件、软件、负载情况测试出来的结果必然不一样。我经常碰到的问题类似于,官网说每秒能到10MB,为什么我的producer每秒才1MB? —— 且不说硬件条件,最后发现他使用的消息体有1KB,而官网的基准测试是用100B测出来的,因此根本没有可比性。不过你依然可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / max(Tp, Tc)

Tp表示producer的吞吐量。测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。

另外,Kafka并不能真正地做到线性扩展(其实任何系统都不能),所以你在规划你的分区数的时候最好多规划一下,这样未来扩展时候也更加方便。

消息-分区的分配

默认情况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions,如下图所示:

def partition(key: Any, numPartitions: Int): Int = {Utils.abs(key.hashCode) % numPartitions
}

这就保证了相同key的消息一定会被路由到相同的分区。如果你没有指定key,那么Kafka是如何确定这条消息去往哪个分区的呢?

if(key == null) {  // 如果没有指定keyval id = sendPartitionPerTopicCache.get(topic)  // 先看看Kafka有没有缓存的现成的分区Idid match {case Some(partitionId) =>  partitionId  // 如果有的话直接使用这个分区Id就好了case None => // 如果没有的话,val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)  //找出所有可用分区的leader所在的brokerif (availablePartitions.isEmpty)throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)val index = Utils.abs(Random.nextInt) % availablePartitions.size  // 从中随机挑一个val partitionId = availablePartitions(index).partitionIdsendPartitionPerTopicCache.put(topic, partitionId) // 更新缓存以备下一次直接使用partitionId}}

可以看出,Kafka几乎就是随机找一个分区发送无key的消息,然后把这个分区号加入到缓存中以备后面直接使用——当然了,Kafka本身也会清空该缓存(默认每10分钟或每次请求topic元数据时)

如何设定consumer线程数

我个人的观点,如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。让我们来看看具体Kafka是如何分配的。

topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。——其实ConsoleConsumer可以使用通配符的功能实现同时消费多个topic数据,但这和本文无关。

再讨论分配策略之前,先说说KafkaStream——它是consumer的关键类,提供了遍历方法用于consumer程序调用实现数据的消费。其底层维护了一个阻塞队列,所以在没有新消息到来时,consumer是处于阻塞状态的,表现出来的状态就是consumer程序一直在等待新消息的到来。——你当然可以配置成带超时的consumer,具体参看参数consumer.timeout.ms的用法。

下面说说 Kafka提供的两种分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。本文只讨论range策略。所谓的range其实就是按照阶段平均分配。举个例子就明白了,假设你有10个分区,P0 ~ P9,consumer线程数是3, C0 ~ C2,那么每个线程都分配哪些分区呢?

C0 消费分区 0, 1, 2, 3

C1 消费分区 4, 5, 6

C2 消费分区 7, 8, 9

具体算法就是:

val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每个consumer至少保证消费的分区数
val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 还剩下多少个分区需要单独分配给开头的线程们
...
for (consumerThreadId <- consumerThreadIdSet) {   // 对于每一个consumer线程val myConsumerPosition = curConsumers.indexOf(consumerThreadId)  //算出该线程在所有线程中的位置,介于[0, n-1]assert(myConsumerPosition >= 0)
// startPart 就是这个线程要消费的起始分区数val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
// nParts 就是这个线程总共要消费多少个分区val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
...
}

针对于这个例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart为10%3=1,说明每个线程至少保证3个分区,还剩下1个分区需要单独分配给开头的若干个线程。这就是为什么C0消费4个分区,后面的2个线程每个消费3个分区,具体过程详见下面的Debug截图信息:
ctx.myTopicThreadIds

8a66c188f0af9baec18cc2d8e3704eee.png

nPartsPerConsumer = 10 / 3 = 3

nConsumersWithExtraPart = 10 % 3 = 1

0aa3a162dfa3df6e83e35d8ae1e2198b.png

第一次:

myConsumerPosition = 1

startPart = 1 * 3 + min(1, 1) = 4 ---也就是从分区4开始读

nParts = 3 + (if (1 + 1 > 1) 0 else 1) = 3 读取3个分区, 即4,5,6

第二次:

myConsumerPosition = 0

startPart = 3 * 0 + min(1, 0) =0 --- 从分区0开始读

nParts = 3 + (if (0 + 1 > 1) 0 else 1) = 4 读取4个分区,即0,1,2,3

第三次:

myConsumerPosition = 2

startPart = 3 * 2 + min(2, 1) = 7 --- 从分区7开始读

nParts = 3 + if (2 + 1 > 1) 0 else 1) = 3 读取3个分区,即7, 8, 9

至此10个分区都已经分配完毕

c5b34dcc968c807d3e782b4998e33f64.png

说到这里,经常有个需求就是我想让某个consumer线程消费指定的分区而不消费其他的分区。坦率来说,目前Kafka并没有提供自定义分配策略。做到这点很难,但仔细想一想,也许我们期望Kafka做的事情太多了,毕竟它只是个消息引擎,在Kafka中加入消息消费的逻辑也许并不是Kafka该做的事情。

不消费问题

第一步:参看消费者的基本情况
查看mwbops系统,【Consumer监控】-->【对应的consumerId】

如果offset数字一直在动,说明一直在消费,说明不存在问题,return;

如果offset数字一直不动,看Owner是不是有值存在

如果Owner是空,说明消费端的程序已经跟Kafka断开连接,应该排查消费端是否正常,return;

如果Owner不为空,就是有上图上面的类似于 bennu_index_benuprdapp02-1444748505181-f558155a-0 的文字,继续看下面内容

第二步:查看消费端的程序代码

一般的消费代码是这样的

看看自己的消费代码里面,存不存在处理消息的时候出异常的情况

如果有,需要try-catch一下,其实不论有没有异常,都用try-catch包一下最好,如下面代码

return;

原因:如果在处理消息的时候有异常出现,又没有进行处理,那么while循环就会跳出,线程会结束,所以不会再去取消息,就是消费停止了。

第三步:查看消费端的配置

消费代码中一般以以下方式创建Consumer

消费端有一个配置,叫 fetch.message.max.bytes,默认是1M,此时如果有消息大于1M,会发生停止消费的情况。

此时,在配置中增加 props.put("fetch.message.max.bytes", "10 * 1024 * 1024"); 即可

return;

原因:目前Kafka集群配置的运行最大的消息大小是10M,如果客户端配置的运行接收的消息是1M,跟Kafka服务端配置的不一致,

则消息大于1M的情况下,消费端就无法消费,导致一直卡在这一条消息,现象就是消费停止。

技术的提升是需要下苦工,需要坚持不懈的努力。就比如下面的分享的这些技术点,是否都学会并掌握了呢?如需要以下图谱以及跟多提升架构技术的资源可加入我的粉丝Qqun:855801563。我花了将近一个月时间搜集整理了一套架构技术提升知识点讲解以及一些面试题解析和答案免费分享给大家。助力各位程序员朋友突破自我提升技能,实现自己的目标。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/340497.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

插入排序java_「Java」各类排序算法

排序大的分类可以分为两种&#xff1a;内排序和外排序。在排序过程中&#xff0c;全部记录存放在内存&#xff0c;则称为内排序&#xff0c;如果排序过程中需要使用外存&#xff0c;则称为外排序。下面讲的排序都是属于内排序。内排序有可以分为以下几类&#xff1a;(1) 插入排…

华为光伏usb适配器_华为系列原装充电器拆解第三弹:比亚迪版华为10W充电器

在对华为18W充电器的比亚迪版和赛尔康版进行拆解之后&#xff0c;充电头网今天继续为大家带来华为10W充电器的比亚迪版和达宏版的拆解。这两种10W规格的华为充电器外观延续了华为原装充电器的风格&#xff0c;而且型号也是一样的。那么&#xff0c;我们先一起来看看比亚迪版华为…

JMetro版本11.5.10和8.5.10发布

在这里&#xff0c;我们再次使用JMetro的另一个版本。 此版本中的新增功能&#xff1a; 工具栏内控件的新样式 新的可编辑组合框样式 对其他样式的一些调整 一些修复 继续阅读以获取详细信息。 可编辑的ComboBox新样式 JMetro早期版本的可编辑ComboBox看起来非常糟糕&am…

1s后跳转 android_优雅保活方案,原来Android还可以这样保活

作者&#xff1a;NanBox保活现状我们知道&#xff0c;Android 系统会存在杀后台进程的情况&#xff0c;并且随着系统版本的更新&#xff0c;杀进程的力度还有越来越大的趋势。系统这种做法本身出发点是好的&#xff0c;因为可以节省内存&#xff0c;降低功耗&#xff0c;也避免…

wordpress致命错误怎么解决_pppoe错误是什么意思 pppoe错误怎么解决

最近有网友反应无线路由器上设置PPPoE拨号上网后&#xff0c;发现PPPoE连接不上&#xff0c;显示pppoe错误是什么意思呢?pppoe错误怎么解决呢?接下来详细为大家介绍&#xff1a;pppoe错误怎么解决无线路由器设置PPPoE拨号后&#xff0c;PPPoE拨号连接不上&#xff0c;不能够上…

java ssm 多租户_(十一)java B2B2C 源码 多级分销springmvc mybatis多租户电子商城系统- SSO单点登录之OAuth2.0登录流程(2)...

上一篇是站在巨人的肩膀上去研究OAuth2.0&#xff0c;也是为了快速帮助大家认识OAuth2.0&#xff0c;闲话少说&#xff0c;我根据框架中OAuth2.0的使用总结&#xff0c;画了一个简单的流程图(根据用户名密码实现OAuth2.0的登录认证)&#xff1a;上面的图很清楚的描述了当前登录…

几何画板200个经典课件_项目制学科联动 | 金芬娥首席工作室:灵动“画板”,研修创新,协同进步...

西湖区成立115个“项目制首席教师工作室”&#xff0c;建立中小学、幼儿园学科联动机制&#xff0c;以专业发展为目标&#xff0c;以教育问题为导向&#xff0c;整合发挥学科教研员、学科带头人和名师工作室领衔人的智力资源&#xff0c;助推教师的专业成长及区域的学科建设。西…

通过这些简单的步骤从头开始学习Java

Java是用于软件开发的最流行的编程语言之一。 无论您的最终目标或技能水平如何&#xff0c;学习和掌握Java都将为您作为开发人员打开大门。 今天&#xff0c;我们将讨论一些原因&#xff0c;我们认为您应该开始学习Java&#xff0c;然后提供有关入门的深入路线图。 为什么要学…

vs 服务容器中已存在服务_敏捷基础设施和公共基础服务

敏捷基础设施和公共基础服务敏捷基础设施和公共基础服务是微服务架构的有力支撑&#xff1b;能够简化业务开发&#xff0c;提升架构能力的基线。Cloud Native的基石是微服务架构、敏捷基础设施和公共基础服务。敏捷基础设施 - 通过容器封装环境&#xff0c;开发人员可以直接将所…

使用php吧excel数据存到数据库,php如何存excel数据到数据库

一、使用PHPExcel Parser Pro软件&#xff0c;但是这个软件为收费软件&#xff1b;二、可将EXCEL表保存为CSV格式&#xff0c;然后通过phpmyadmin或者SQLyog导入&#xff0c;SQLyog导入的方法为&#xff1a;将EXCEL表另存为CSV形式&#xff1b;打开SQLyog&#xff0c;对要导入的…

sle linux lftp禁止匿名登陆_软件测试常用linux命令整理

作为一个名软件测试工程师&#xff0c;掌握Linux的基本操作是必须的。下面罗列下linux的常用命令&#xff0c;方便大家今后操作linux时查找&#xff0c;然后通过[帮助命令]进行具体的使用。1、帮助命令man -- man 命令 查看命令的使用帮助说明。2、显示目录和文件的命令ls --…

用php模拟斗地主发牌,php模拟实现斗地主发牌

本文实例为大家分享了php实现斗地主发牌的具体代码&#xff0c;供大家参考&#xff0c;具体内容如下闲来无聊&#xff0c;就写了这个方法&#xff0c;也算是熟悉下php的数组操作&#xff0c;还请各位大神多指教。$arr 数组&#xff0c;好像有点问题&#xff0c;应该 2>"…

如何在AWS EC2实例上部署Spring Boot应用程序

你好朋友&#xff0c; 在本教程中&#xff0c;我们将看到如何在AWS EC2实例上部署Spring Boot应用程序。 这是我们将要执行的步骤。 1.使用Spring Boot Initialiser创建一个Spring Boot项目。 2.创建一个休息端点&#xff0c;部署后我们可以访问 3.启动EC2实例 4.将我们的…

成为Java流大师–第3部分:终端操作

比尔盖茨曾经说过&#xff1a;“我选择一个懒惰的人去做一件困难的事情&#xff0c;因为一个懒惰的人会找到一个简单的方法来做。” 关于流&#xff0c;没有什么比这更真实了。 在本文中&#xff0c;您将学习Stream如何通过在调用终端操作之前不对源元素执行任何计算来避免不必…

matlab多径信道模型,基于matlab的无线多径信道建模与仿真分析

基于matlab的无线多径信道建模与仿真分析 基于MATLAB的无线多径信道建模与仿真分析 摘 要:对于无线通信, 衰落是影响系统性能的重要因素, 而不同形式的衰落对于信号产生的影响 也不相同。本文在阐述移动多径信道特性的基础上, 建立了不同信道模型下多径时延效应的计算 机仿真模…

您的JVM是否泄漏文件描述符-像我的一样?

前言&#xff1a;此处描述的两个问题是在一年前发现并修复的。 本文仅用作历史证明&#xff0c;也是有关解决Java中文件描述符泄漏的初学者指南。 在Ultra ESB中&#xff0c;我们使用内存RAM磁盘文件缓存来进行快速且无垃圾的有效负载处理。 一段时间以前&#xff0c;我们在共…

螺旋桨设计软件_欧洲斥巨资研发的A400M螺旋桨运输机,为啥就没人买啊?| 图说...

A400M是欧洲自行设计、研制和生产的新一代军用运输机&#xff0c;也是欧盟国家进行合作的最大的武器联合研制项目。A400M最大的特点&#xff0c;就是其标志性的8叶弯刀螺旋桨。A400M也是20世纪后服役的为数不多的几个使用涡轮旋桨发动机的军用运输机之一。A400M曾在系列电影《碟…

JAR文件句柄:烦恼后清理!

在Ultra ESB中&#xff0c;我们使用特殊的热交换类加载器 &#xff0c;该加载器使我们可以按需重新加载Java类。 这使我们能够从字面上热交换我们的部署单元 -加载&#xff0c;卸载&#xff0c;使用更新的类重新加载&#xff0c;以及正常地逐步退出-无需重启JVM。 Windows&…

大气校正后的ndvi_Sentinel2 L1C下载、大气校正、重采样

点击蓝字关注我哦1.基本信息(成像仪/重访周期/波段数/分辨率)哨兵2号是高分辨率多光谱成像卫星&#xff0c;携带一枚多光谱成像仪(MSI)&#xff0c;用于陆地监测&#xff0c;可提供植被、土壤和水覆盖、内陆水路及海岸区域等图像&#xff0c;分为2A和2B两颗卫星,哨兵&#xff0…

php strtotime month bug,处理PHP strtotime的BUG

PHP strtotime的BUG处理最近使用了strtotime结合-1 month, 1 month, next month获取上个月或者下个月的日期&#xff0c;不过刚看到一篇文章&#xff0c;才知道原来使用strtotime直接获取日期还是有点小BUGBUG如日期&#xff1a;$today 2020-12-31;echo date("Y-m-d"…