kafka生产端TCP连接管理

目录

前言:

Kafka生产者程序 

 Kafka生产者客户端如何创建TCP连接

 Kafka生产者客户端如何关闭TCP连接

总结: 

参考资料 


前言:

     在网络层协议中,TCP作用在第四层传输层、Http协议作用在第七层最上层应用层,一个完整的网络传输,信息会优先到达第四层,然后在往上传输到第七层,TCP协议相比于Http协议提供更好的连接稳定性及TCP提供的多路复用请求及可靠的消息交付语义保证,如自动重传丢失的报文等,kafka在设计上使用TCP协议作为所有请求通信的底层协议。

Kafka生产者程序 

kafka的Java生产者API主要的对象就是KafkaProducer。通常我们开发一个生产者的步骤有4步。

第1步:构造生产者对象所需的参数对象。

第2步:利用第1步的参数对象,创建KafkaProducer对象实例。

第3步:使用KafkaProducer的send方法发送消息。

第4步:调用KafkaProducer的close方法关闭生产者并释放各种系统资源。

上面这4步写成Java代码的话大概是这个样子:

Properties props = new Properties ();
props.put(“参数1”, “参数1的值”);
props.put(“参数2”, “参数2的值”);
……
try (Producer<String, String> producer = new KafkaProducer<>(props)) {producer.send(new ProducerRecord<String, String>(……), callback);……
}

   当我们开发一个Producer应用时,生产者会向Kafka集群中指定的主题(Topic)发送消息,这必然涉及与Kafka Broker创建TCP连接。那么,Kafka的Producer客户端是如何管理这些TCP连接的呢?

 Kafka生产者客户端如何创建TCP连接

 要回答上面这个问题,我们首先要弄明白生产者代码是什么时候创建TCP连接的。就上面的那段代码而言,可能创建TCP连接的地方有两处:Producer producer = new KafkaProducer(props)和producer.send(msg, callback)。你觉得连向Broker端的TCP连接会是哪里创建的呢?前者还是后者,抑或是两者都有?请先思考5秒钟,然后我给出我的答案。

 首先,生产者应用在创建KafkaProducer实例时是会建立与Broker的TCP连接的。其实这种表述也不是很准确,应该这样说:在创建KafkaProducer实例时,生产者应用会在后台创建并启动一个名为Sender的线程,该Sender线程开始运行时首先会创建与Broker的连接。我截取了一段测试环境中的日志来说明这一点:

[2018-12-09 09:35:45,620] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: -2 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)[2018-12-09 09:35:45,622] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)[2018-12-09 09:35:45,814] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)[2018-12-09 09:35:45,815] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)[2018-12-09 09:35:45,828] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9093 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1068)

 你也许会问:怎么可能是这样?如果不调用send方法,这个Producer都不知道给哪个主题发消息,它又怎么能知道连接哪个Broker呢?难不成它会连接bootstrap.servers参数指定的所有Broker吗?嗯,是的,Java Producer目前还真是这样设计的。

 我在这里稍微解释一下bootstrap.servers参数。它是Producer的核心参数之一,指定了这个Producer启动时要连接的Broker地址。请注意,这里的“启动时”,代表的是Producer启动时会发起与这些Broker的连接。因此,如果你为这个参数指定了1000个Broker连接信息,那么很遗憾,你的Producer启动时会首先创建与这1000个Broker的TCP连接。

 在实际使用过程中,我并不建议把集群中所有的Broker信息都配置到bootstrap.servers中,通常你指定3~4台就足以了。因为Producer一旦连接到集群中的任一台Broker,就能拿到整个集群的Broker信息,故没必要为bootstrap.servers指定所有的Broker。

 从上面这段日志中,我们可以发现,在KafkaProducer实例被创建后以及消息被发送前,Producer应用就开始创建与两台Broker的TCP连接了。当然了,在我的测试环境中,我为bootstrap.servers配置了localhost:9092、localhost:9093来模拟不同的Broker,但是这并不影响后面的讨论。另外,日志输出中的最后一行也很关键:它表明Producer向某一台Broker发送了METADATA请求,尝试获取集群的元数据信息——这就是前面提到的Producer能够获取集群所有信息的方法。

纵然KafkaProducer是线程安全的,我也不赞同创建KafkaProducer实例时启动Sender线程的做法。写了《Java并发编程实践》的那位布赖恩·格茨(Brian Goetz)大神,明确指出了这样做的风险:在对象构造器中启动线程会造成this指针的逃逸。理论上,Sender线程完全能够观测到一个尚未构造完成的KafkaProducer实例。当然,在构造对象时创建线程没有任何问题,但最好是不要同时启动它。 

 针对TCP连接何时创建的问题,目前我们的结论是这样的:TCP连接是在创建KafkaProducer实例时建立的那么,我们想问的是,它只会在这个时候被创建吗?

当然不是!TCP连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。为什么说是可能?因为这两个地方并非总是创建TCP连接。当Producer更新了集群的元数据信息之后,如果发现与某些Broker当前没有连接,那么它就会创建一个TCP连接。同样地,当要发送消息时,Producer发现尚不存在与目标Broker的连接,也会创建一个。 

 接下来,我们来看看Producer更新集群元数据信息的两个场景。 

场景一:当Producer尝试给一个不存在的主题发送消息时,Broker会告诉Producer说这个主题不存在。此时Producer会发送METADATA请求给Kafka集群,去尝试获取最新的元数据信息。

场景二:Producer通过metadata.max.age.ms参数定期地去更新元数据信息。该参数的默认值是300000,即5分钟,也就是说不管集群那边是否有变化,Producer每5分钟都会强制刷新一次元数据以保证它是最及时的数据。

讲到这里,我们可以“挑战”一下社区对Producer的这种设计的合理性。目前来看,一个Producer默认会向集群的所有Broker都创建TCP连接,不管是否真的需要传输请求。这显然是没有必要的。再加上Kafka还支持强制将空闲的TCP连接资源关闭,这就更显得多此一举了。

试想一下,在一个有着1000台Broker的集群中,你的Producer可能只会与其中的3~5台Broker长期通信,但是Producer启动后依次创建与这1000台Broker的TCP连接。一段时间之后,大约有995个TCP连接又被强制关闭。这难道不是一种资源浪费吗?很显然,这里是有改善和优化的空间的。

 Kafka生产者客户端如何关闭TCP连接

Producer端关闭TCP连接的方式有两种:一种是用户主动关闭;一种是Kafka自动关闭

我们先说第一种。这里的主动关闭实际上是广义的主动关闭,甚至包括用户调用kill -9主动“杀掉”Producer应用。当然最推荐的方式还是调用producer.close()方法来关闭

第二种是Kafka帮你关闭,这与Producer端参数connections.max.idle.ms的值有关。默认情况下该参数值是9分钟,即如果在9分钟内没有任何请求“流过”某个TCP连接,那么Kafka会主动帮你把该TCP连接关闭。用户可以在Producer端设置connections.max.idle.ms=-1禁掉这种机制。一旦被设置成-1,TCP连接将成为永久长连接。当然这只是软件层面的“长连接”机制,由于Kafka创建的这些Socket连接都开启了keepalive,因此keepalive探活机制还是会遵守的。

值得注意的是,在第二种方式中,TCP连接是在Broker端被关闭的,但其实这个TCP连接的发起方是客户端,因此在TCP看来,这属于被动关闭的场景,即passive close。被动关闭的后果就是会产生大量的CLOSE_WAIT连接,因此Producer端或Client端没有机会显式地观测到此连接已被中断。

总结: 

  1. KafkaProducer实例创建时启动Sender线程,从而创建与bootstrap.servers中所有Broker的TCP连接。
  2. KafkaProducer实例首次更新元数据信息之后,还会再次创建与集群中所有Broker的TCP连接。
  3. 如果Producer端发送消息到某台Broker时发现没有与该Broker的TCP连接,那么也会立即创建连接。
  4. 如果设置Producer端connections.max.idle.ms参数大于0,则步骤1中创建的TCP连接会被自动关闭;如果设置该参数=-1,那么步骤1中创建的TCP连接将无法被关闭,从而成为“僵尸”连接。

参考资料 

极客时间课程《Kafka核心技术与实战》

13.Java生产者是如何管理TCP连接的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/3277.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

k8s服务发现之第二弹Service详解

创建 Service Kubernetes Servies 是一个 RESTFul 接口对象&#xff0c;可通过 yaml 文件创建。 例如&#xff0c;假设您有一组 Pod&#xff1a; 每个 Pod 都监听 9376 TCP 端口每个 Pod 都有标签 appMyApp apiVersion: v1 kind: Service metadata:name: my-service spec:s…

vue3笔记-脚手架篇

第一章 基础篇 第二章 脚手架篇 vue2与vue3的一些区别 响应式系统&#xff1a; Vue 2 使用 Object.defineProperty 进行响应式数据的劫持和监听&#xff0c;它对数据监听是一项项的进行监听&#xff0c;因此&#xff0c;当新增属性发生变化时&#xff0c;它无法监测到&…

ASP.NET Website 项目 .NET Framework 4.0 ~ .NET Framework 4.8支持c#哪些版本(Website)

本文讲的是Website网站项目&#xff0c;由于维护老项目Website .net framework4.0&#xff0c;遇到c#6.0语法不支持。便做了点记录 ASP.NET Website 项目 .NET Framework 4.0、 .NET Framework 4.5、 .NET Framework 4.6、 .NET Framework 4.8都支持c#哪些版本&#xff1f; 下面…

MongoDB源码安装

文章目录 MongoDB源码安装&#xff1a;注&#xff1a;下载&#xff1a;解压&#xff1a;创建数据目录&#xff1a;创建软链接&#xff1a;创建变量脚本&#xff1a;执行脚本&#xff1a;启动mongodb:检查&#xff1a;连接mongodb&#xff1a; MongoDB源码安装&#xff1a; 注&…

react+unittest+flask 接口自动化测试平台

目录 1 前言 2 框架 2-1 框架简介 2-2 框架介绍 2-3 框架结构 3 平台 3-1 平台组件图 1 新建用例 2 生成测试任务 3 执行并查看测试报告 3-2 用例管理 3-2-1 用例设计 3-3 任务管理 3-3-1 创建任务 3-3-2 执行任务 3-3-3 测试报告 3-3-4 邮件通知 1 前言 在现…

jar程序部署的外部依赖和按名传参和shellUtil传参json串及返回pid问题

文章目录 指定jar程序运行的外部依赖指定参数名称传参给程序shellUtil命令传参JSON串shellUtil获取回调nohub启动程序后的pid 指定jar程序运行的外部依赖 nohup java -Djava.ext.dirs./lib/ -cp DataSourceAccessPage.jar com.sitech.adapter.JsonAdapter arg0 arg1java -cp 命…

《ReentrantLock与synchronized的区别》

目录 Synchronized同步锁 synchronized的用法&#xff1a; ReentrantLock ReentrantLock和Synchronized的区别 Synchronized同步锁 使用Synchronized关键字将一段代码锁起来&#xff0c;同一时间只允许一个线程访问。只有获取了这把锁的线程才能访问这段代码&#xff0c;并…

数学建模常用模型(九) :偏最小二乘回归分析

数学建模常用模型&#xff08;九&#xff09; &#xff1a;偏最小二乘回归分析 偏最小二乘回归&#xff08;Partial Least Squares Regression&#xff0c;PLS Regression&#xff09;是一种常用的统计建模方法&#xff0c;用于解决多元线性回归中自变量间高度相关的问题。在偏…

Http 接口测试框架

目录 前言&#xff1a; 实际效果 框架的下一步 最新框架图&#xff08;红色部分未完成&#xff09; 部分代码 你需要做的 前言&#xff1a; 在进行HTTP接口测试时&#xff0c;使用一个可靠的测试框架可以提高测试效率和质量。HTTP接口测试框架是一种用于自动化测试HTTP接…

vue 通过多组复选框来过滤数据

1.通过if else 来筛选数据 <template> <div><div><label><input type"checkbox" v-model"checkedNames" value"北京"> 北京</label><label><input type"checkbox" v-model"chec…

ubuntu 20.04 4090 显卡驱动安装 深度学习环境配置

1. 显卡驱动安装 准备工作&#xff1a; 换源安装输入法&#xff1a;重启的步骤先不管&#xff08;自选&#xff09;sudo apt update && sudo apt upgrade 禁用nouveau驱动&#xff08;这个驱动是ubuntu开源小组逆向破解NVIDIA的开源驱动&#xff0c;与英伟达的原有驱…

Flask_使用flask_marshmallow序列化数据

代码如下&#xff1a; from flask import Flask from flask_marshmallow import Marshmallow from flask_sqlalchemy import SQLAlchemy from marshmallow import fieldsapp Flask(__name__) app.config["SQLALCHEMY_DATABASE_URI"] "mysqlpymysql://root:12…

网络协议栈介绍

一、网络协议栈 网络协议栈是计算机网络中的重要组件&#xff0c;负责处理网络数据包在不同协议层之间的传递和处理。常见的网络协议栈有以下几种&#xff1a; 1. 基于内核的协议栈&#xff1a;操作系统内核实现的网络协议栈&#xff0c;如Linux的TCP/IP协议栈。 实现原理&a…

Apikit 自学日记:私有云才有的测试文件库功能,该如何使用呢

在 APIkit 的私有云版本中&#xff0c;提供了测试文件库功能。不过目前该功能仅供私有云产品&#xff0c;线上SaaS产品不提供测试文件库功能 API自动化测试中可以添加文件参数。在这里统一管理所有测试文件。 在测试文件库界面&#xff0c;点击上传文件&#xff1a; 在私有云产…

ORACLE的循环

ORACLE的循环 LOOP循环 declare num number; begin num:1; loopdbms_output.put_line(yes);IF num > 10 THENEXIT;END IF;num : num 1; end loop; end;WHILE 循环 declare num stu_info.id%type; beginnum:0;while num<10 loopdbms_output.put_line(num);num:num1;en…

opencv -12 图像运算之按 《位或》 运算(图像融合图像修复和去除)

位或运算 或运算的规则是&#xff0c;当参与或运算的两个逻辑值中有一个为真时&#xff0c;结果就为真。其逻辑关系可以类比为如图 所示的并联电路&#xff0c;两个开关中只要有任意一个闭合时&#xff0c;灯就会亮。 3-5 对参与或运算的算子的不同情况进行了说明&#xff0c;…

7月19日,每日信息差

1、奔驰召回3.2万辆进口汽车&#xff0c;ESP存安全隐患.本次召回范围内车辆由于软件问题&#xff0c;车身电子稳定系统&#xff08;ESP&#xff09;内部监控程序可能会在驾驶循环开始时错误地判定故障&#xff0c;导致车辆动态控制系统的功能受限&#xff0c;存在安全隐患 2、…

Linux操作系统升级低版本的OpenSSH到9.3的高版本

OpenSSH 9.3之前的版本存在各种各样的安全漏洞&#xff0c;为此&#xff0c;我们需要将OpenSSH升级到最新的9.3的版本。 执行&#xff1a;ssh -V&#xff0c;我们可以查看当前的openssh版本 为了避免升级过程中出现意外而导致服务器无法正常使用&#xff0c;建议操作前先对服务…

力扣 406. 根据身高重建队列

题目来源&#xff1a;https://leetcode.cn/problems/queue-reconstruction-by-height/description/ C题解1&#xff1a;分别对h和k两个维度进行考虑&#xff0c;我这里是优先考虑k值&#xff0c;k值相同的时候h小的排前面。然后再一一遍历&#xff0c;对于people[i]&#xff0c…

Nacos报错Could not resolve placeholder ‘order.name‘ in value “${order.name}“怎么解决?

出现这个原因有两个&#xff1a; 1.首先在Nacos配置中心&#xff0c;写入yml配置文件的数据和后端服务在取数据的时候名称不一致 如下图&#xff0c;现在我的配置中心为order-service 看看其中的文件内容信息&#xff1a; 再看看后端是怎么取的&#xff1a; 看出上面错误了吗…