14、Kafka 请求是怎么被处理的

Kafka 请求是怎么被处理的

  • 1、处理请求的 2 种常见方案
    • 1.1、顺序处理请求
    • 1.2、每个请求使用单独线程处理
  • 2、Kafka 是如何处理请求的?
  • 3、控制类请求和数据类请求分离

无论是 Kafka 客户端还是 Broker 端,它们之间的交互都是通过 “请求 / 响应” 的方式完成的。比如,客户端会通过网络发送消息生产请求给 Broker,而 Broker 处理完成后,会发送对应的响应给到客户端。

Apache Kafka 自己定义了一组请求协议,用于实现各种各样的交互操作。比如常见的 PRODUCE 请求是用于生产消息的,FETCH 请求是用于消费消息的,METADATA 请求是用于请求 Kafka 集群元数据信息的。所有的请求都是通过 TCP 网络以 Socket 的方式进行通讯的。

下面会详细讨论一下 Kafka Broker 端处理请求的全流程。

1、处理请求的 2 种常见方案

1.1、顺序处理请求

缺陷是,吞吐量太差。由于只能顺序处理每个请求,因此,每个请求都必须等待前一个请求处理完毕才能得到处理。这种方式只适用于请求发送非常不频繁的系统。

1.2、每个请求使用单独线程处理

好处是,它是完全异步的,每个请求的处理都不会阻塞下一个请求。
缺陷是,为每个请求都创建线程的做法开销极大,在某些场景下甚至会压垮整个服务。还是那句话,这个方法只适用于请求发送频率很低的业务场景。

2、Kafka 是如何处理请求的?

kafka 处理请求使用 Reactor 模式。Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景

Reactor 模式的架构如下图所示:
在这里插入图片描述
从这张图中,我们可以发现,多个客户端会发送请求给到 Reactor。Reactor 有个请求分发线程 Dispatcher,也就是图中的 Acceptor,它会将不同的请求下发到多个工作线程中处理。

在这个架构中,Acceptor 线程只是用于请求分发,不涉及具体的逻辑处理,非常得轻量级,因此有很高的吞吐量表现。而这些工作线程可以根据实际业务处理需要任意增减,从而动态调节系统负载能力。

为 Kafka 画一张类似的图的话,那它应该是这个样子的:
在这里插入图片描述
这两张图长得差不多。Kafka 的 Broker 端有个 SocketServer 组件,类似于 Reactor 模式中的 Dispatcher,它也有对应的 Acceptor 线程和一个工作线程池,只不过在 Kafka 中,这个工作线程池有个专属的名字,叫网络线程池。Kafka 提供了 Broker 端参数 num.network.threads,用于调整该网络线程池的线程数。其默认值是 3,表示每台 Broker 启动时会创建 3 个网络线程,专门处理客户端发送的请求。

Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中,因此,在实际使用过程中,这些线程通常都有相同的几率被分配到待处理请求。这种轮询策略编写简单,同时也避免了请求处理的倾斜,有利于实现较为公平的请求处理调度。

好了,你现在了解了客户端发来的请求会被 Broker 端的 Acceptor 线程分发到任意一个网络线程中,由它们来进行处理。那么,当网络线程接收到请求后,它是怎么处理的呢?你可能会认为,它顺序处理不就好了吗?实际上,Kafka 在这个环节又做了一层异步线程池的处理,我们一起来看一看下面这张图。

在这里插入图片描述

当网络线程拿到请求后,它不是自己处理,而是将请求放入到一个共享请求队列中。Broker 端还有个 IO 线程池,负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。

IO 线程池处中的线程才是执行请求逻辑的线程。Broker 端参数 num.io.threads 控制了这个线程池中的线程数。目前该参数默认值是 8,表示每台 Broker 启动后自动创建 8 个 IO 线程处理请求。你可以根据实际硬件条件设置此线程池的个数。

比如,如果你的机器上 CPU 资源非常充裕,你完全可以调大该参数,允许更多的并发请求被同时处理。当 IO 线程处理完请求后,会将生成的响应发送到网络线程池的响应队列中,然后由对应的网络线程负责将 Response 返还给客户端。

细心的你一定发现了请求队列和响应队列的差别:请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。这么设计的原因就在于,Dispatcher 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以这些 Response 也就没必要放在一个公共的地方。

我们再来看看刚刚的那张图,图中有一个叫 Purgatory 的组件,这是 Kafka 中著名的 “炼狱” 组件。它是用来缓存延时请求(Delayed Request)的。所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。

到这里,Kafka 请求流程解析的故事就已经讲完了

到目前为止,这里提及的请求处理流程对于所有请求都是适用的,也就是说,Kafka Broker 对所有请求是一视同仁的。

3、控制类请求和数据类请求分离

在 Kafka 内部,除了客户端发送的 PRODUCE 请求和 FETCH 请求之外,还有很多执行其他操作的请求类型,比如负责更新 Leader 副本、Follower 副本以及 ISR 集合的 LeaderAndIsr 请求,负责勒令副本下线的 StopReplica 请求等。与 PRODUCE 和 FETCH 请求相比,这些请求有个明显的不同:它们不是数据类的请求,而是控制类的请求。也就是说,它们并不是操作消息数据的,而是用来执行特定的 Kafka 内部动作的。

我来举个例子说明一下。假设我们有个主题只有 1 个分区,该分区配置了两个副本,其中 Leader 副本保存在 Broker 0 上,Follower 副本保存在 Broker 1 上。假设 Broker 0 这台机器积压了很多的 PRODUCE 请求,此时你如果使用 Kafka 命令强制将该主题分区的 Leader、Follower 角色互换,那么 Kafka 内部的控制器组件(Controller)会发送 LeaderAndIsr 请求给 Broker 0,显式地告诉它,当前它不再是 Leader,而是 Follower 了,而 Broker 1 上的 Follower 副本因为被选为新的 Leader,因此停止向 Broker 0 拉取消息。

这时,一个尴尬的场面就出现了:如果刚才积压的 PRODUCE 请求都设置了 acks=all,那么这些在 LeaderAndIsr 发送之前的请求就都无法正常完成了。就像前面说的,它们会被暂存在 Purgatory 中不断重试,直到最终请求超时返回给客户端。

设想一下,如果 Kafka 能够优先处理 LeaderAndIsr 请求,Broker 0 就会立刻抛出 NOT_LEADER_FOR_PARTITION 异常,快速地标识这些积压 PRODUCE 请求已失败,这样客户端不用等到 Purgatory 中的请求超时就能立刻感知,从而降低了请求的处理时间。即使 acks 不是 all,积压的 PRODUCE 请求能够成功写入 Leader 副本的日志,但处理 LeaderAndIsr 之后,Broker 0 上的 Leader 变为了 Follower 副本,也要执行显式的日志截断(Log Truncation,即原 Leader 副本成为 Follower 后,会将之前写入但未提交的消息全部删除),依然做了很多无用功。

再举一个例子,同样是在积压大量数据类请求的 Broker 上,当你删除主题的时候,Kafka 控制器(我会在专栏后面的内容中专门介绍它)向该 Broker 发送 StopReplica 请求。如果该请求不能及时处理,主题删除操作会一直 hang 住,从而增加了删除主题的延时。

那么,社区是如何解决的呢?社区实现了两类请求的分离。也就是说,Kafka Broker 启动后,会在后台分别创建两套网络线程池和 IO 线程池的组合,它们分别处理数据类请求和控制类请求。至于所用的 Socket 端口,自然是使用不同的端口了,你需要提供不同的 listeners 配置,显式地指定哪套端口用于处理哪类请求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/232652.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

42 经典回溯算法题-全排列

问题描述:给你的那个一个不好喊重复数字的数组nums,返回其所有可能的全排列,你可以按照任何顺序返回答案; 回溯算法求解:定义一个used数组用来表征数组nums是否被选择,每一个回溯函数都要经过nums.length个…

Home Assistant HAOS版如何安装HACS

环境: Home Assistant 11.2 SSH & Web Terminal 17.0 问题描述: Home Assistant HAOS版如何安装HACS 解决方案: 1.打开WEB 里面的终端输入下面命令 wget -O - https://hacs.vip/get | bash -如果上面的命令执行后卡住不动&#xff…

深度学习模型(目标检测)轻量化压缩算法的挑战与解决方法

深度学习模型,尤其是用于目标检测的模型,是高度复杂的,通常包括数以百万计的参数和复杂的层次结构。虽然模型压缩和轻量化算法允许这些模型在资源受限的设备上部署和运行,但这仍然是一个活跃和具有挑战性的研究领域,包…

R语言生物群落(生态)数据统计分析与绘图丨R语言基础、tidyverse数据清洗、多元统计分析、随机森林模型、回归及混合效应模型、结构方程模型、统计结果作图

R 语言的开源、自由、免费等特点使其广泛应用于生物群落数据统计分析。生物群落数据多样而复杂,涉及众多统计分析方法。本教程以生物群落数据分析中的最常用的统计方法回归和混合效应模型、多元统计分析技术及结构方程等数量分析方法为主线,通过多个来自…

js 字符串之间转换

一. 字符串转换 (1)对象转字符串stringify var str JSON.stringify(weather);(2)字符串转对象 var obj JSON.parse(str);(3)数字转字符串toString() var num 2023; var str ; str num.toString();…

记一次jar冲突的问题

问题 业务中需要在spark中链接redis作为服务缓存,spark程序中引入redis的jar包后上传spark集群运行是报java.lang.NoSuchMethodError: com.xxx.common.pool.ConnectionPool.startAsync()Lcom/google/common/util/concurrent/Service;根据报错信息发现是jar包冲突造…

【Java】工业园区高精准UWB定位系统源码

UWB (ULTRA WIDE BAND, UWB) 技术是一种无线载波通讯技术,它不采用正弦载波,而是利用纳秒级的非正弦波窄脉冲传输数据,因此其所占的频谱范围很宽。UWB定位系统依托在移动通信,雷达,微波电路,云计算与大数据…

SQL Server 查询处理过程

查询处理--由 SQL Server 中的关系引擎执行,它获取编写的 T-SQL 语句并将其转换为可以向存储引擎发出请求并检索所需结果的过程。 SQL Server 需要四个步骤来处理查询:分析、代化、优化和执行。 前三个步骤都由关系引擎执行;第三步输出的是…

open3d bug:pcd转txt前后位姿发生改变

1、open3d bug:pcd转txt前后位姿发生改变 open3d会对原有结果进行一个微小位姿变换 import open3d as o3d import numpy as np# 读取PCD点云文件 pcd o3d.io.read_point_cloud(/newdisk/darren_pty/zoom_centered_s2.pcd)# 获取点云坐标 points pcd.points# 指定…

带你手把手解读rejail沙盒源码(0.9.72版本) (八) fnettrace-dns

fnettrace_dns.h 文章目录 fnettrace_dns.hmain.cprint_dnscustom_bpfprint_daterun_tracemain 补充DNSDNS协议报文格式**问题记录****资源记录****报文实例****请求报文****响应报文** DNS解析过程DNS 出现DNS 介绍DNS 域名DNS 域名组成DNS 域名空间DNS 域名解析DNS 解析器DNS…

Redis高级技巧:性能提升50%不是梦

Redis作为一种高性能的键值存储系统,在众多企业和开发者的技术栈中占有一席之地。然而,很多人在使用Redis时,往往只停留在基本操作层面,没有挖掘其更深层次的潜力。 本文探讨如何通过一系列高级技巧和实用的策略,将Re…

word的docx模板导出,poi

word中docx模板导出 后台模板在后台存放位置前台写法 后台 controller使用void也行 InputStream isTemplate this.getClass().getClassLoader().getResourceAsStream("templates/meet/testModel.docx"); String filename LocalDate.now().format(DateTimeFormatt…

SpringCloudGateway 3.1.4版本 Netty内存泄漏问题解决

一、 产生的异常 当时是服务器访问不到服务了,上去一看,无法申请资源OutOfDirectMemoryError了,内存级别的东西让人一阵头大,赶紧在线下模拟, 1. 减少分配的堆外内存,打开Netty的监测工具等有助于复现的…

#HarmonyOS:@ohos.promptAction (弹窗)---onProgressChange

组件实例 ohos.promptAction (弹窗) 导入模块 import promptAction from ohos.promptAction示例 try {promptAction.showToast({ message: Message Info,duration: 2000, bottom: 64 // 设置弹窗边框距离屏幕底部的位置}); } catch (error) {console.error(sho…

建筑模板怎么选?

在建筑领域,选择合适的模板材料对于确保工程质量、提高施工效率和控制成本至关重要。目前,常见的建筑模板主要有钢模板、塑料模板和木模板三种类型,每种都有其独特的优势和局限性。本文将对这些模板类型进行分析,并特别推荐广西生…

linux | sed 命令使用 | xargs命令 使用

##################################################### sed命令来自英文词组stream editor的缩写,其功能是利用语法/脚本对文本文件进行批量的编辑操作。sed命令最初由贝尔实验室开发,后被众多Linux系统集成,能够通过正则表达式对文件进行批…

C++11 【初识】

C11简介 1.在2003年C标准委员会曾经提交了一份技术勘误表(简称TC1),使得C03这个名字已经取代了C98称为C11之前的最新C标准名称。 2.不过由于C03(TC1)主要是对C98标准中的漏洞进行修复,语言的核心部分则没有改动,因此人们习惯性的把两个标准合…

化合物1219962-49-8;AT791抑制剂

化合物1219962-49-8是一种小分子化合物,分子式为C15H25N3O4,相对分子质量为305.37。该化合物为白色至灰白色粉末,不溶于水,易溶于有机溶剂,如甲醇、乙醇等。因此,该化合物在制备时,可以通过有机…

【RocketMQ-Install】RocketMQ 的安装及基础命令的使用

【RocketMQ-Install】Windows 环境下 安装本地 RocketMQ 及基础命令的使用 1)下载 RocketMQ 安装包1.1.官网下载(推荐)1.2.Git 下载1.3.安装环境要求说明 2)Windows 安装3)Linux 安装4)控制台安装5&#xf…

HTML5+CSS3小实例:纯CSS实现网站置灰

实例:纯CSS实现网站置灰 技术栈:HTML+CSS 效果: 源码: 【HTML】 <!DOCTYPE html> <html><head><meta http-equiv="content-type" content="text/html; charset=utf-8"><meta name="viewport" content="…