Kafka 管理TCP连接

生产者管理TCP连接

Kafka生产者程序概览

Kafka的Java生产者API主要的对象就是KafkaProducer。通常我们开发一个生产者的步骤有4步: 

        第1步:构造生产者对象所需的参数对象。

        第2步:利用第1步的参数对象,创建KafkaProducer对象实例

        第3步:使用KafkaProducer的send方法发送消息。

        第4步:调用KafkaProducer的close方法关闭生产者并释放各种系统资源。

上面这4步写成Java代码的话大概是这个样子:

Properties props = new Properties ();
props.put(“参数1”, “参数1的值”);
props.put(“参数2”, “参数2的值”);
……
try (Producer<String, String> producer = new KafkaProducer<>(props)) {producer.send(new ProducerRecord<String, String>(……), callback);……
}

 何时创建TCP连接

1. TCP连接在创建KafkaProducer实例时建立

创建KafkaProducer实例时,生产者应用会在后台创建并启动一个名为 Sender的线程,该Sender线程开始运行时首先会创建与Broker的TCP连接的。

如果不调用send方法,这个Producer都不知道给哪个主题发消息,它又怎么能知道连接哪个Broker呢?这是通过Producer的核心参数之一bootstrap.servers参数来指定的。

如果为这个参数指定了1000个Broker连接信息,Producer启动时会首先创建与这1000个Broker的TCP连接,所以不建议把集群中所有的Broker信息都配置到bootstrap.servers中,因为Producer一旦连接到集群中的任一台Broker,就能拿到整个集群的Broker信息,故没必要为bootstrap.servers指定所有的Broker。通过日志可以看出:

[2018-12-09 09:35:45,828] DEBUG[ProducerclientId=producer-1] Sendingmetadatarequest (type=MetadataRequest, topics=) to nodelocalhost:9093 (id:-2 rack: null) (org.apache.kafka.clients.NetworkClient:1068)

Producer向某一台Broker发送了MetadataRequest请求,尝试获取集群的元数据信息——这就是前面提到的Producer能够获取集群所有信息的方法。 

2. 在更新元数据后 和 在消息发送时

 (1) 当Producer尝试给一个不存在的主题发送消息时,Broker会告诉Producer说这个主题不存在。此时Producer会发送METADATA请求给Kafka集群,去尝试获取最新的元数据信息。

  (2) Producer通过metadata.max.age.ms参数定期地去更新元数据信息。该参数的默认值是300000,即5分钟,也就是说不管集群那边是否有变化,Producer每5分钟都会强制刷新一次元数据以保证它是最及时的数据。

何时关闭TCP连接

1. 用户主动关闭

这里的主动关闭实际上是广义的主动关闭,甚至包括用户调用kill-9主动“杀掉”Producer应用。当然最推荐的方式还是调用producer.close()方法来关闭。

2. Kafka自动关闭

这与Producer端参数connections.max.idle.ms的值有关。默认情况下该参数值是9分钟,即如果在9分钟内没有任何请求“流过”某个TCP连接,那么Kafka会主动帮你把该TCP连接关 闭。用户可以在Producer端设置connections.max.idle.ms=-1禁掉这种机制。一旦被设置成-1,TCP连接将成为永久长连接。当然这只是软件层面的“长连接”机制,由于Kafka创建的这些Socket连接都开启了 keepalive,因此keepalive探活机制还是会遵守的。

自动关闭中,TCP连接是在Broker端被关闭的,但其实这个TCP连接的发起方是客户端,因此在TCP看来,这属于被动关闭的场景,即passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT连接,因此Producer端或Client端没有机会显式地观测到此连接已被中断。

小结:

Java Producer端管理TCP连接的方式是:

1. KafkaProducer实例创建时启动Sender线程,从而创建与bootstrap.servers中所有Broker的TCP连接。

2. KafkaProducer实例首次更新元数据信息之后,还会再次创建与集群中所有Broker的TCP连接。

3. 如果Producer端发送消息到某台Broker时发现没有与该Broker的TCP连接,那么也会立即创建连接。

4. 如果设置Producer端connections.max.idle.ms参数大于0,则步骤1中创建的TCP连接会被自动关闭;如果设置该参数=-1,那么步骤1中创建的TCP连接将无法被关闭,从而成为“僵尸”连接。

消费者管理TCP连接

何时创建TCP连接

和生产者不同的是,构建KafkaConsumer实例时是不会创建任何TCP连接的。TCP连接是在调用KafkaConsumer.poll方法时被创建的。再细粒度地说,在poll方法内部有3个时机可以创建TCP连接:

1.发起FindCoordinator请求时。

消费者端有个组件叫协调者Coordinator),它驻留在Broker端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理。当消费者程序首次启动调用poll方法时,它需要向Kafka集群 发送一个名为FindCoordinator的请求,向集群中当前负载最小的那台Broker发送请求。希望Kafka集群告诉它哪个Broker是管理它的协调者。

2.连接协调者时。

Broker处理完上一步发送的FindCoordinator请求之后,会返还对应的响应结果(Response),显式地告诉消费者哪个Broker是真正的协调者,因此在这一步,消费者知晓了真正的协调者后,会创建连向该 Broker的Socket连接。只有成功连入协调者,协调者才能开启正常的组协调操作,比如加入组、等待组分配方案、心跳请求处理、位移获取、位移提交等。

3.消费数据时。

消费者会为每个要消费的分区创建与该分区领导者副本所在Broker连接的TCP。举个例子,假设消费者要消费5个分区的数据,这5个分区各自的领导者副本分布在4台Broker上,那么该消费者在消费时会创建与这4台Broker的Socket连接。

注意:当第三类TCP连接成功创建后,消费者程序就会废弃第一类TCP连接,第一类TCP连接会在后台被默默地关闭掉。对一个运行了一段时间的消费者程序来说,只会有后面两类TCP连接存在。

通过日志查看:
[2019-05-27 10:00:54,142] DEBUG [ConsumerclientId=consumer-1, groupId=test] Initiating connection to nodelocalhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1
(org.apache.kafka.clients.NetworkClient:944)
…
[2019-05-27 10:00:54,188] DEBUG [ConsumerclientId=consumer-1, groupId=test] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name=‘t4’)],
allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to nodelocalhost:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1097)
…
[2019-05-27 10:00:54,188] TRACE [ConsumerclientId=consumer-1, groupId=test] Sending FIND_COORDINATOR {key=test,key_type=0} with correlation id 0 to node-1
(org.apache.kafka.clients.NetworkClient:496)
[2019-05-27 10:00:54,203] TRACE [ConsumerclientId=consumer-1, groupId=test] Completed receivefrom node-1 for FIND_COORDINATORwith correlation id 0, received
{throttle_time_ms=0,error_code=0,error_message=null, node_id=2,host=localhost,port=9094}(org.apache.kafka.clients.NetworkClient:837)
…
[2019-05-27 10:00:54,204] DEBUG [ConsumerclientId=consumer-1, groupId=test] Initiating connection to nodelocalhost:9094 (id: 2147483645 rack: null) using address localhost/127.0.0.1
(org.apache.kafka.clients.NetworkClient:944)
…
[2019-05-27 10:00:54,237] DEBUG [ConsumerclientId=consumer-1, groupId=test] Initiating connection to nodelocalhost:9094 (id: 2 rack: null) using address localhost/127.0.0.1
(org.apache.kafka.clients.NetworkClient:944)
[2019-05-27 10:00:54,237] DEBUG [ConsumerclientId=consumer-1, groupId=test] Initiating connection to nodelocalhost:9092 (id: 0 rack: null) using address localhost/127.0.0.1
(org.apache.kafka.clients.NetworkClient:944)
[2019-05-27 10:00:54,238] DEBUG [ConsumerclientId=consumer-1, groupId=test] Initiating connection to nodelocalhost:9093 (id: 1 rack: null) using address localhost/127.0.0.1
(org.apache.kafka.clients.NetworkClient:944)

日志的第一行是消费者程序创建的第一个TCP连接,就像我们前面说的,这个Socket用于发送FindCoordinator请求。由于这是消费者程序创建的第一个连接,此时消费者对于要连接 的Kafka集群一无所知,因此它连接的Broker节点的ID是-1,表示消费者尚未获取到Broker数据。

日志的第二行,消费者复用了刚才创建的那个Socket连接,向Kafka集群发送元数据请求以获取整个集群的信息。

日志的第三行表明,消费者程序开始发送FindCoordinator请求给第一步中连接的Broker,即localhost:9092,也就是nodeId等于-1的那个。在十几毫秒之后,消费者程序成功地获悉协调者所在的Broker信息, 也就是第四行的“node_id = 2”

完成这些之后,消费者就已经知道协调者Broker的连接信息了,因此在日志的第五行发起了第二个Socket连接,创建了连向localhost:9094的TCP。只有连接了协调者,消费者进程才能正常地开启消费者组 的各种功能以及后续的消息消费。

在日志的最后三行中,消费者又分别创建了新的TCP连接,主要用于实际的消息获取。要消费的分区的领导者副本在哪台Broker上,消费者就要创建连向哪台Broker的TCP

那么2147483645是怎么来的呢?

它是由Integer.MAX_VALUE减去协调者所在Broker的真实ID计算得来的。看第四行的内容,我们可以知道协调者ID是2,因此这个Socket连接的节点ID就是 Integer.MAX_VALUE减去2,即2147483647减去2,也就是2147483645。这种节点ID的标记方式目的就是要让组协调请求和真正的数据获取请求使用不同的Socket连接。 至于后面的0、1、2,那就很好解释了。它们表征了真实的Broker ID,也就是我们在server.properties中配置的broker.id值。

何时关闭TCP连接

1. 手动关闭

手动调用KafkaConsumer.close()方法或者是执行Kill命令。

2. 自动关闭

自动关闭是由消费者端参数connection.max.idle.ms控制的,该参数现在的默认值是9分钟,即如果某个Socket连接上连续9分钟都没有任何请求“过境”的话,那么消费者会强行“杀掉”这个Socket连接。

注意:和生产者有些不同的是,如果在编写消费者程序时,使用了循环的方式来调用poll方法消费消息,那么上面提到的所有请求都会被定期发送到Broker,因此这些Socket连接上总是能保证有请求在发送,从而也就实现了“长连接”的效果

参考:Kafka 核心技术与实战 (geekbang.org)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/864927.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自定义控件动画篇(三)ValueAnimator的使用

ValueAnimator 是 Android 属性动画系统的一部分&#xff0c;允许你创建基于值的动画而不仅仅是基于 UI 控件的动画。它可以用来改变任何类型的数据&#xff0c;如整数、浮点数、颜色值等&#xff0c;然后你可以利用这些变化的值来更新你的 UI 或执行其他逻辑。以下是 ValueAni…

SpringBoot工程中测试Groovy代码

Groovy 代码 def testWithoutParam() {println ("这是一个没有参数的Groovy方法") }testWithoutParam()def testWithParam(String s1,String s2) {println ("这是一个有参数的Groovy方法")println (s1)println (s2) }testWithParam(s1,s2)Java 测试代码 测…

ssh网关-sshpiper

1、环境 操作系统&#xff1a;龙蜥os 7.9 sshpiper&#xff1a;1.3.1 2、下载软件 https://github.com/tg123/sshpiper/releases 3、解压到指定目录 mkdir /opt/sshpiper tar -xvf sshpiperd_with_plugins_linux_x86_64.tar.gz -C /opt/sshpipe4、添加执行权限 chmod x /o…

[开源软件] 支持链接汇总

“Common rules: 1- If the repo is on github, the support/bug link is also on the github with issues”" label; 2- Could ask questions by email list;" 3rd party software support link Note gcc https://gcc.gnu.org openssh https://bugzilla.mindrot.o…

《昇思25天学习打卡营第9天|onereal》

继续学习昨天的 基于MindNLPMusicGen生成自己的个性化音乐 生成音乐 MusicGen支持两种生成模式&#xff1a;贪心&#xff08;greedy&#xff09;和采样&#xff08;sampling&#xff09;。在实际执行过程中&#xff0c;采样模式得到的结果要显著优于贪心模式。因此我们默认启…

K6 性能测试教程:入门介绍,环境搭建和编写第一个 K6 测试脚本

K6 性能测试教程&#xff1a;入门介绍&#xff0c;环境搭建和编写第一个 K6 测试脚本 这篇文章将带您进入 K6 性能测试的世界。博文内容涵盖了 K6 性能测试的入门知识、环境搭建步骤&#xff0c;以及如何编写您的第一个测试脚本。无论您是初学者还是有经验的性能测试专业人员&…

详解C语言分支与循环语句

分支语句 if elseswitch 循环语句 whilefordo while goto语句 文章目录 1.什么是语句2.分支语句&#xff08;选择结构&#xff09;2.1 if语句2.1.1 悬空else2.1.3 练习 2.2 switch语句2.2.1 在switch语句中的break2.2.2 default子句 3.循环语句3.1 while循环3.1.1 while语句中…

【Linux进程通信】使用匿名管道制作一个简单的进程池

进程池是什么呢&#xff1f;我们可以类比内存池的概念来理解进程池。 内存池 内存池是在真正使用内存之前&#xff0c;先申请分配一定数量的、大小相等(一般情况下)的内存块留作备用。当有新的内存需求时&#xff0c;就从内存池中分出一部分内存块&#xff0c;若内存块不够再继…

web权限到系统权限 内网学习第一天 权限提升 使用手工还是cs???msf可以不??

现在开始学习内网的相关的知识了&#xff0c;我们在拿下web权限过后&#xff0c;我们要看自己拿下的是什么权限&#xff0c;可能是普通的用户权限&#xff0c;这个连添加用户都不可以&#xff0c;这个时候我们就要进行权限提升操作了。 权限提升这点与我们后门进行内网渗透是乘…

C#编程命名笔记

1.变量名的命名规则->要求用“匈牙利法则” 变量类型特征位数命名规则例子bool 用b开头bUpdatesbyte有符号8位用sby开头sbyTypebyte无符号8位用by开头byTypeshort有符号16位用n开头nStepCountushort无符号16位用un开头unCountint有符号32位用i开头iCountuint&#xff08;WO…

MySQL:设计数据库与操作

设计数据库 1. 数据建模1.1 概念模型1.2 逻辑模型1.3 实体模型主键外键外键约束 2. 标准化2.1 第一范式2.2 链接表2.3 第二范式2.4 第三范式 3. 数据库模型修改3.1 模型的正向工程3.2 同步数据库模型3.3 模型的逆向工程3.4 实际应用建议 4. 数据库实体模型4.1 创建和删除数据库…

Linux 磁盘空间清理

1.检查磁盘使用情况 #显示每个挂载点的磁盘使用量&#xff0c;以及可用空间和使用率 df -h #显示当前目录的全部文件和目录&#xff08;包括隐藏的&#xff09;,以MB显示 ll -h 2. du查看最大的目录或文件 #逐级检查某个目录下各个子目录的大小。从根目录开始&#xff0c;逐级…

Linux_应用篇(25) SPI 应用编程基础

SPI基础知识 SPI&#xff08;Serial Peripheral Interface&#xff0c;串行外设接口&#xff09;是一种同步串行通信协议&#xff0c;广泛应用于微控制器和各种外围设备之间的数据传输。它由摩托罗拉公司在20世纪80年代开发&#xff0c;具有高速、全双工通信的特点&#xff0c…

基于x86+FPGA+AI轴承缺陷视觉检测系统,摇枕弹簧智能检测系统

一、承缺陷视觉检测系统 应用场景 轴类零件自动检测设备&#xff0c;集光、机、软件、硬件&#xff0c;智能图像处理等先进技术于一体&#xff0c;利用轮廓特征匹配&#xff0c;目标与定位&#xff0c;区域选取&#xff0c;边缘提取&#xff0c;模糊运算等算法实现人工智能高…

PCL 点云聚类(基于体素连通性)

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 这里的思路很简单,我们通过将点云转换为体素,基于体素的连通性实现对点云的聚类(有点类似于欧式聚类),不过这种方式进行的聚类有些粗糙,但聚类速度相对会快很多,具体的实现效果可以详细阅读代码。 二、实现代…

[AIGC] Java HashMap原理解析:深入探索键值对存储和检索的内部机制

HashMap是Java中最常用的数据结构之一&#xff0c;它提供了高效的键值对存储和检索能力。本文将深入探索Java HashMap的内部机制&#xff0c;详细介绍其原理和工作流程。 文章目录 一、HashMap的数据结构二、哈希冲突处理三、哈希算法四、键值对的存储和检索五、扩容和负载因子…

提高候选人的招聘感受:成功的策略

大约78%的候选人表示&#xff0c;他们的整体应聘体验表明企业对员工的关注。然而&#xff0c;超过一半的候选人透露&#xff0c;他们在招聘过程中有过负面的候选人经历&#xff0c;80%的候选人在经历了令人失望的招聘过程后会公开与他人分享他们的不良经历。 但也有一线希望&am…

Perl的上下文之谜:深入理解上下文概念

&#x1f577;️ Perl的上下文之谜&#xff1a;深入理解上下文概念 Perl&#xff0c;这门被誉为“只需一条命令就能完成任务”的编程语言&#xff0c;以其强大的文本处理能力而闻名。在Perl中&#xff0c;上下文是一个核心概念&#xff0c;它决定了变量的解释方式以及操作符的…

在nginx中设置相对路径跳转的方式

在nginx中的location中&#xff0c;设置301或302的跳转的方式一般是这样的 # 302跳转 location ~ ^/old/$ {return 302 /new/; }# 301跳转 location ~ ^/old/$ {return 301 /new/; }这里/new/虽然写的是相对路径&#xff0c;但是nginx依然会补齐url的前缀&#xff0c;这样在…

某某市信息科技学业水平测试软件打开加载失败逆向分析(笔记)

引言&#xff1a;笔者在工作过程中&#xff0c;用户上报某某市信息科技学业水平测试软件在云电脑上打开初始化的情况下出现了加载和绑定机器失败的问题。一般情况下&#xff0c;在实体机上用户进行登录后&#xff0c;用户的账号信息跟主机的机器码进行绑定然后保存到配置文件&a…