Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能

01 引言

单分区写入在一些需要全局顺序消息的场景中具备重要应用价值。在一些严格保序场景下,需要将分区数设置为 1,并且只用单个生产者来发送数据,从而确保消费者可以按照原始顺序读取所有数据。此时,Kafka 的单分区写入性能将会决定整个系统的吞吐上限。在我们的实践中发现,Kafka 由于其本身线程模型实现上的制约,并没有将单分区写入性能的极限发挥出来。本文今天将具体解读 Kafka 线程模型的不足以及 AutoMQ 如何对其进行改进优化,从而实现更好的单分区写入性能。

02 Apache Kafka 串行处理模型解析

Apache Kafka 的串行处理模型网络框架主要由 5 个类组成:

1. SocketServer:网络框架的核心类,包含 Acceptor 和 Processor 部分

  • Acceptor:监听端口,处理新建连接请求,并将连接分发给 Processor;
  • Processor:网络线程,通过num.network.threads 配置数量。单个 TCP 连接有且只有一个 Processor 负责,Processor#run 方法驱动连接后续的生命周期管理,从网络解析请求和将响应写入到网络;

2. KafkaChannel:单个 TCP 连接的抽象,维护了连接的状态信息,被Processor持有;

3. RequestChannel:Processor 从网络解析完请求后将请求放到到单队列 RequestChannel 中,再由 KafkaRequestHandler 拉走多线程并发处理;

4. KafkaRequestHandler:业务逻辑处理 / IO 线程,通过 num.io.threads配置数量,从 RequestChannel 获取到请求后,调用 KafkaApis 进行业务逻辑处理;

5. KafkaApis:具体的业务逻辑处理类,会根据请求类型分发到不同的处理方法;
网络框架核心类和类之间的交互,对应到 Apache Kafka 的线程模型如下图:

可以看到 Kafka 的线程模型和我们使用 Netty 开发的服务端程序类似:

 kafka-socket-listener 对应到 Boss EventLoopGroup:负责接受客户端连接。当一个新的连接到来时,Boss EventLoopGroup 会接受连接,并将接受的连接注册到 Worker EventLoopGroup;

 kafka-network-thread 对应到 Worker EventLoopGroup:处理连接的所有 I/O 事件,包括读取数据,写入数据,以及处理连接的生命周期事件;

kafka-request-handler:为了防止业务逻辑阻塞网络线程,通常会将业务逻辑剥离到单独的线程池异步执行;

那为什么称 Apache Kafka 是串行处理模型呢?这就和它的 KafkaChannel mute 状态机有关了,状态机如下图所示:

 接收请求:当 Processor 从网络里解析出一个完整的请求,首先会将请求添加到 RequestChannel 中,然后调用 #mute 方法将 KafkaChannel 从 NOT_MUTE 状态变成 MUTE 状态,并且发送 REQUEST_RECEIVED 事件将状态变更为 MUTE_AND_RESPONSE_PENDING 状态。注意:直到这个请求收到对应的响应之前,Processor 都不会再尝试 NOT_MUTE 状态的连接里面读取更多的请求(Processor#processCompletedReceives);

 返回响应:当 KafkaApis 将请求处理完毕,将响应返回给 KafkaChannel,首先发送 RESPONSE_SENT 事件将状态从MUTE_AND_RESPONSE_PENDING 变更为 MUTE 状态,然后再调用 #unmute 方法将状态变更为 NOT_MUTE,这时候 Processor 才会从该连接里面解析更多的请求(Processor#processNewResponses);

 Qutota 限制:Quota 限制导致的流控流程就不在本文提及了,感兴趣的小伙伴可以深入研究一下 Processor 类;

Apache Kafka 通过 KafkaChannel 的状态机可以保障:对于单个连接,这个连接有且只有一个请求在被处理,等上个请求处理完成响应后,才会继续处理下一个请求。这也是为什么称 Apache Kafka 是串行处理模型。

在消息生产请求场景,假设一个 1MB 消息生产请求的网络解析、校验定序和持久化(ISR 同步/ 刷盘)总共需要 5ms,那么一个连接的处理能力上限为 200 请求/每秒,单生产者单分区的吞吐上限也就为 200MB/s。

以下图为例,即使客户端设置 max.in.flight.requests.per.connection = 5,MSG1 ~MSG4 “同时” 到达服务端,MSG4 也要等待前面 3 个请求都处理完成响应后,才能开始处理,最终 MSG4 的发送耗时为 4T。

既然串行处理模型不是那么高效,为什么 Apache Kafka 要这么设计?

其中一个核心的原因:通过串行处理模型,Apache Kafka 能够较为简单就可以实现单连接请求处理的顺序性。例如在事务中发送多条消息的时候,消息会携带序列号来标识顺序,Broker 会检验持久化消息前会检查请求的序列号是否是依次递增的,如果不是依次递增的话,则返回 OUT_OF_ORDER_SEQUENCE_NUMBER 错误。如果从网络中解析完后并行处理这些请求,就可能导致消息乱序问题。

03 AutoMQ 流水线处理模型

那么有没有既能保证请求处理的顺序性又能高效的方式呢?

首先来看顺序性,Apache Kafka 的顺序性要求体现在 3 个阶段:

1. 网络解析:Kafka 协议是基于 TCP 协议的,那么网络解析必然是顺序 & 串行的,从网络中读取完上个请求的数据才能读取下一个请求;

2. 校验 & 定序:单连接的请求必须要顺序的进行校验 & 定序,要不然就会出现消息乱序问题;

3. 持久化:消息存储在磁盘的顺序必须和消息发送的顺序保持一致;

顺序性总结出来等价于:网络解析串行处理、校验 & 定序串行处理和保序持久化。聪明的读者会发现,“3 个阶段内串行处理”并不等价于“3 个阶段间串行处理”。

那么高效的秘诀就在于如何将这 3 个阶段间进行并行化加速。

因此 AutoMQ 参照 CPU 的流水线将 Kafka 的处理模型优化成流水线模式,兼顾了顺序性和高效两方面:

1. 顺序性:TCP 连接与线程绑定,对于同一个 TCP 连接有且只有一个网络线程在解析请求,并且有且只有一个 RequestHandler 线程在进行业务逻辑处理;

2. 高效:

  • 不同阶段流水线化,网络线程解析完 MSG1 后就可以立马解析 MSG2,无需等待 MSG1 持久化完成。同理 RequestHandler 对 MSG1 进行完校验 & 定序后,立马就可以开始处理 MSG2;

  • 同时为了进一步提高持久化的效率,AutoMQ 还会将数据攒批进行刷盘持久化;

在相同的场景下,原来 Apache Kafka 完成 4 批消息的处理耗时需要 4T,在 AutoMQ 的流水线处理模型下,处理耗时缩短到 1.x T。

接下来再来从实现层面探索一下 AutoMQ 是如何实现流水线处理模型的。

首先是 KafkaChannel 的 mute 状态机做了简化,状态机只保留了两个状态 MUTE 和 NOT_MUTE。相比原来,收到请求后不再 #mute 对应的连接,不再全链路串行处理请求,这样就可以充分利用网络解析层的能力,“源源不断”的从连接中解析新的请求。同时为了支持 Quota 能力和防止过载场景过多 Inflight 的请求导致内存 OOM,新增了 Flag 来标记当前 MUTE 状态的原因,有且仅有 Flag 被清空时,连接才会变回 NOT_MUTE 可读状态。

优化完网络层处理效率的问题,再来看看 3 阶段并行化后,在业务逻辑层如何做到顺序处理。
AutoMQ 将 RequestChannel 进行了多队列改造:

 队列和 KafkaRequestHandler 一一映射,数量保持一致;

 Processor 解析完请求后,根据 hash(channelId) % N 来决定路由到特定的队列;
通过多队列模式,可以做到对于相同连接的请求都被放入相同一个队列,并且只被特定的 KafkaRequestHandler 进行业务逻辑处理,保障了检验 & 定序阶段内部的顺序处理。

同时为了进一步提高持久化的效率,AutoMQ 还会将数据攒批进行刷盘持久化:

 在处理消息生产请求时,KafkaRequestHandler 在进行校验定序后,无需等待数据持久化,即可继续处理下一个请求,提高了业务逻辑处理线程的利用率;

 AutoMQ 后台存储线程会根据攒批大小和攒批时间触发刷盘,并且持久化成功后再异步返回给网络层响应,提升了持久化的效率;

04 优化效果测试

4.1 测试环境准备

为了确保选择合适的 ECS 和 EBS 规格,保证计算和存储本身不会成为瓶颈,本次测试选择了如下的机型和云盘:

 r6i.8xlarge:32C256G、EBS 吞吐基线 1250 MB/s;

 系统盘 EBS 卷:5000 IOPS、吞吐基线 1000 MB/s;
Broker 配置采用log.flush.interval.messages=1 :在硬件规格相同得情况下,通过强制刷盘模拟 Apache Kafka ISR 多 AZ 副本同步延迟,同时对齐 Apache Kafka 和 AutoMQ 的持久化等级;

测试使用的 Kafka 和 AutoMQ 版本如下:

 AutoMQ:1.1.0 https://github.com/AutoMQ/automq/releases/tag/1.1.0-rc0

 Apache Kafka:3.7.0  https://downloads.apache.org/kafka/3.7.0/kafka\_2.13-3.7.0.tgz

4.2 压测脚本

使用 Kafka 自带的工具脚本模拟测试负载

# 压测目标吞吐 350MB/s
bin/kafka-producer-perf-test.sh --topic perf --num-records=480000 --throughput 6000 --record-size 65536 --producer-props bootstrap.servers=localhost:9092 batch.size=1048576 linger.ms=1# 压测目标吞吐 150 MB/s
bin/kafka-producer-perf-test.sh --topic perf --num-records=480000 --throughput 2400 --record-size 65536 --producer-props bootstrap.servers=localhost:9092 batch.size=1048576 linger.ms=1

4.3 测试结果分析

单生产者单分区极限吞吐性能测试对比如下。从测试的结果列表中我们可以看到:

 AutoMQ 的极限吞吐是 Apache Kafka 的 2 倍,达到了 350MB/s;

 AutoMQ 在极限吞吐下的 P99 延时是 Apache Kafka 的 1 / 15,仅为 11ms;

05 结语

AutoMQ 通过网络处理模型的优化,将 Apache Kafka 的串行处理模型优化成了流水线处理模型,使得单分区的写入性能获得了成倍的性能提升,从而让单分区全局顺序消息可以满足更多场景的性能要求。尽管 AutoMQ 通过流水线处理模型极大得提升了极限吞吐和降低了延迟,但仍旧建议业务尽可能找到合理的数据分区的方式,避免单生产者单分区的场景,并且尽可能避免分区热点。单分区的能力始终是有上限的,一味的堆高单分区的吞吐,不仅集群弹性粒度变大导致弹性的经济性下降,而且单分区高吞吐对下游的消费者的无法横向扩展的单机处理性能也提出了挑战。

关于我们

我们是来自 Apache RocketMQ 和 Linux LVS 项目的核心团队,曾经见证并应对过消息队列基础设施在大型互联网公司和云计算公司的挑战。现在我们基于对象存储优先、存算分离、多云原生等技术理念,重新设计并实现了 Apache Kafka 和 Apache RocketMQ,带来高达 10 倍的成本优势和百倍的弹性效率提升。

🌟 GitHub 地址:https://github.com/AutoMQ/automq

💻 官网:https://www.automq.com

👀 B站:AutoMQ官方账号

🔍 视频号:AutoMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/801126.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Data Shapley Value 笔记

本文为 Data Shapley: Equitable Valuation of Data for Machine Learning 的阅读笔记,涉及论文中的 Data Shapley Value 计算公式、两种实现算法、实验应用部分的梳理。 为理解 Data Shapley Value,本文首先讨论 Shapley Value的相关内容,利…

Python基于Django的微博热搜、微博舆论可视化系统,附源码

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…

K8s学习九(配置与存储_存储)

存储管理 Volumes HostPath 将节点上的文件或目录挂载到 Pod 上,此时该目录会变成持久化存储目录,即使 Pod 被删除后重启,也可以重新加载到该目录,该目录下的文件不会丢失 效果就是容器里的数据和主机里的数据进行共享 配置文…

蓝桥杯gcd汇总

gcd3014 问题描述 小明和小红是一对恋人,他们相爱已经三年了,在今年的七夕节,小明准备给小红一个特殊的礼物。他想要送给小红一些数字,让小红算出有多少对正整数 (a,b) 满足以下条件: clcm(a,b)−dgcd(a,b)x其中 c,…

JS-25-浏览器和浏览器对象

一、浏览器 由于JavaScript的出现就是为了能在浏览器中运行,所以,浏览器自然是JavaScript开发者必须要关注的。 目前主流的浏览器分这么几种: IE 6~11:国内用得最多的IE浏览器,历来对W3C标准支持差。从IE10开始支持E…

无人售货奶柜:开启便捷生活的新篇章

无人售货奶柜:开启便捷生活的新篇章 在这个快节奏的现代生活中,科技的革新不仅为我们带来了前所未有的便利,更在不经意间改变着我们的日常。其中,无人售货技术的出现,尤其是无人售货奶柜,已经成为我们生活…

项目管理中的估算活动资源

在项目管理中,资源估算是一项至关重要的任务。正确地估算活动资源可以确保项目的顺利进行,避免资源浪费和不必要的延误。以下是对项目管理中常见的活动资源类型的详细分析。 一、人力资源 人力资源是项目管理中最基本的资源之一。它包括项目团队成员的技能、知识和经验。在…

Java Web

1.GET方式请求 (1).普通URL get请求 1 2 3 http://localhost:8080/ajaxGet?id1&username用户名&userTrueName真实姓名 //get也可以传json,通过参数传json字符串,然后后端进行解析(不过一般都不这么做) http://localhost:8080/ajaxGet?user{"id":"1&…

【C语言】函数相关选择题

前言 关于函数相关的选择题。 题目一: C语言规定,在一个源程序中,main函数的位置( ) A .必须在最开始 B .必须在库函数的后面 C .可以任意 D .必须在最后 题解:选择C。 main函数为C语言中整个工程的程序入…

ngnix的反向代理是什么?有什么作用?

1、Nginx的反向代理是什么? Nginx的反向代理是一种网络架构模式,其中Nginx服务器作为前端服务器,接收客户端的请求,然后将这些请求转发给后端服务器(例如Java应用程序服务器)。在这个过程中,客…

北京--面试1(设计模式、反射、队列、线程、锁、Linux命令、JVM调优参数)

1、写三个设计模式(代码) //单例懒汉模式:单例模式确保一个类只有一个实例,并提供一个全局访问点。在Java中,单例模式被广泛用于控制资源访问,配置管理器等场景。实现单例模式的方式包括懒汉式、饿汉式、双…

Leetcode56_合并区间

1.leetcode原题链接:. - 力扣(LeetCode) 2.题目描述 以数组 intervals 表示若干个区间的集合,其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间,并返回 一个不重叠的区间数组,该数…

Stable Diffusion|Ai赋能电商 Inpaint Anything

1. 背景介绍 随着人工智能技术的不断发展,其在电商领域的应用也越来越广泛。其中,图像修复技术在电商领域有着重要的应用价值。例如,在商品图片处理中,去除图片中的水印、瑕疵等,可以提高商品图片的质量和美观度。 2…

牛顿:Archetype AI 的开创性模型,实时解读真实世界的新宠儿

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…

kafka命令行高级命令

#--time -1 查看topic各个partition 未过期最早offset --time -2 最后offest,根据二者差值计算存量数据 sh /usr/local/kafka9092/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server xxxxx:9092 --topic __consumer_offsets --time -1 #查…

不同系统锁库存的实现方式

目录 1. 系统内部实现锁库存 2. 使用中间件实现锁库存 3. 服务化实现锁库存 4. 分布式事务实现锁库存 1. 系统内部实现锁库存 描述:系统采用内部机制,如数据库事务、行锁或乐观锁等技术,来在必要的时候锁定库存。特点:实现细…

英语学习笔记-音节划分和字母发音对照表

国际音标 音节划分 英语音节以元音为主体构成的发音单位,一般说来元音发音响亮,可以构成音节,辅音发音不响亮,不能单独构成音节 ((m] (n] [I] 例外)。 从单词拼写形式上看,有几个元字组就有几个音节 音节划分规则 长…

[通俗易懂]《动手学强化学习》学习笔记1-第1章 初探强化学习

文章目录 前言第1章 初探强化学习1.1 简介序贯决策(sequential decision making)任务:强化学习与有监督学习或无监督学习的**区别**:改变未来 1.2 什么是强化学习环境交互与有监督学习的区别1:改变环境 (说…

GPU环境安装与虚拟环境安装(适用于Windows下的李沐GPU)

之前我是用的都是VMware的虚拟机且安装的是cpu的pytorch版本,因为想要使用GPU,最终实现了在Windows上使用GPU,并且相关原理也在参考文章或视频内,可以通过原理自行挑选自己所需的配置并安装。 文章目录 1.GPU安装1.1 名词解释1.2 卸载旧版本的CUDA1.3 版本选择步骤(Nivida显卡…

ubuntu安装

一、安装虚拟机 https://www.vmware.com/products/workstation-pro/workstation-pro-evaluation.html 下载后运行安装向导,一直Next即可 许可证: https://zhuanlan.zhihu.com/p/685829787#:~:textpro,17%E5%AF%86%E9%92%A5%EF%BC%9AMC60H-DWHD5-H80U9-6…