kafka生产端是TCP连接管理

目录

前言:

Kafka生产者程序 

 Kafka生产者客户端如何创建TCP连接

 Kafka生产者客户端如何关闭TCP连接

总结: 

参考资料 


前言:

     在网络层协议中,TCP作用在第四层传输层、Http协议作用在第七层最上层应用层,一个完整的网络传输,信息会优先到达第四层,然后在往上传输到第七层,TCP协议相比于Http协议提供更好的连接稳定性及TCP提供的多路复用请求及可靠的消息交付语义保证,如自动重传丢失的报文等,kafka在设计上使用TCP协议作为所有请求通信的底层协议。

Kafka生产者程序 

kafka的Java生产者API主要的对象就是KafkaProducer。通常我们开发一个生产者的步骤有4步。

第1步:构造生产者对象所需的参数对象。

第2步:利用第1步的参数对象,创建KafkaProducer对象实例。

第3步:使用KafkaProducer的send方法发送消息。

第4步:调用KafkaProducer的close方法关闭生产者并释放各种系统资源。

上面这4步写成Java代码的话大概是这个样子:

Properties props = new Properties ();
props.put(“参数1”, “参数1的值”);
props.put(“参数2”, “参数2的值”);
……
try (Producer<String, String> producer = new KafkaProducer<>(props)) {producer.send(new ProducerRecord<String, String>(……), callback);……
}

   当我们开发一个Producer应用时,生产者会向Kafka集群中指定的主题(Topic)发送消息,这必然涉及与Kafka Broker创建TCP连接。那么,Kafka的Producer客户端是如何管理这些TCP连接的呢?

 Kafka生产者客户端如何创建TCP连接

 要回答上面这个问题,我们首先要弄明白生产者代码是什么时候创建TCP连接的。就上面的那段代码而言,可能创建TCP连接的地方有两处:Producer producer = new KafkaProducer(props)和producer.send(msg, callback)。你觉得连向Broker端的TCP连接会是哪里创建的呢?前者还是后者,抑或是两者都有?请先思考5秒钟,然后我给出我的答案。

 首先,生产者应用在创建KafkaProducer实例时是会建立与Broker的TCP连接的。其实这种表述也不是很准确,应该这样说:在创建KafkaProducer实例时,生产者应用会在后台创建并启动一个名为Sender的线程,该Sender线程开始运行时首先会创建与Broker的连接。我截取了一段测试环境中的日志来说明这一点:

[2018-12-09 09:35:45,620] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: -2 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)[2018-12-09 09:35:45,622] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)[2018-12-09 09:35:45,814] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)[2018-12-09 09:35:45,815] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)[2018-12-09 09:35:45,828] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9093 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1068)

 你也许会问:怎么可能是这样?如果不调用send方法,这个Producer都不知道给哪个主题发消息,它又怎么能知道连接哪个Broker呢?难不成它会连接bootstrap.servers参数指定的所有Broker吗?嗯,是的,Java Producer目前还真是这样设计的。

 我在这里稍微解释一下bootstrap.servers参数。它是Producer的核心参数之一,指定了这个Producer启动时要连接的Broker地址。请注意,这里的“启动时”,代表的是Producer启动时会发起与这些Broker的连接。因此,如果你为这个参数指定了1000个Broker连接信息,那么很遗憾,你的Producer启动时会首先创建与这1000个Broker的TCP连接。

 在实际使用过程中,我并不建议把集群中所有的Broker信息都配置到bootstrap.servers中,通常你指定3~4台就足以了。因为Producer一旦连接到集群中的任一台Broker,就能拿到整个集群的Broker信息,故没必要为bootstrap.servers指定所有的Broker。

 从上面这段日志中,我们可以发现,在KafkaProducer实例被创建后以及消息被发送前,Producer应用就开始创建与两台Broker的TCP连接了。当然了,在我的测试环境中,我为bootstrap.servers配置了localhost:9092、localhost:9093来模拟不同的Broker,但是这并不影响后面的讨论。另外,日志输出中的最后一行也很关键:它表明Producer向某一台Broker发送了METADATA请求,尝试获取集群的元数据信息——这就是前面提到的Producer能够获取集群所有信息的方法。

纵然KafkaProducer是线程安全的,我也不赞同创建KafkaProducer实例时启动Sender线程的做法。写了《Java并发编程实践》的那位布赖恩·格茨(Brian Goetz)大神,明确指出了这样做的风险:在对象构造器中启动线程会造成this指针的逃逸。理论上,Sender线程完全能够观测到一个尚未构造完成的KafkaProducer实例。当然,在构造对象时创建线程没有任何问题,但最好是不要同时启动它。 

 针对TCP连接何时创建的问题,目前我们的结论是这样的:TCP连接是在创建KafkaProducer实例时建立的那么,我们想问的是,它只会在这个时候被创建吗?

当然不是!TCP连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。为什么说是可能?因为这两个地方并非总是创建TCP连接。当Producer更新了集群的元数据信息之后,如果发现与某些Broker当前没有连接,那么它就会创建一个TCP连接。同样地,当要发送消息时,Producer发现尚不存在与目标Broker的连接,也会创建一个。 

 接下来,我们来看看Producer更新集群元数据信息的两个场景。 

场景一:当Producer尝试给一个不存在的主题发送消息时,Broker会告诉Producer说这个主题不存在。此时Producer会发送METADATA请求给Kafka集群,去尝试获取最新的元数据信息。

场景二:Producer通过metadata.max.age.ms参数定期地去更新元数据信息。该参数的默认值是300000,即5分钟,也就是说不管集群那边是否有变化,Producer每5分钟都会强制刷新一次元数据以保证它是最及时的数据。

讲到这里,我们可以“挑战”一下社区对Producer的这种设计的合理性。目前来看,一个Producer默认会向集群的所有Broker都创建TCP连接,不管是否真的需要传输请求。这显然是没有必要的。再加上Kafka还支持强制将空闲的TCP连接资源关闭,这就更显得多此一举了。

试想一下,在一个有着1000台Broker的集群中,你的Producer可能只会与其中的3~5台Broker长期通信,但是Producer启动后依次创建与这1000台Broker的TCP连接。一段时间之后,大约有995个TCP连接又被强制关闭。这难道不是一种资源浪费吗?很显然,这里是有改善和优化的空间的。

 Kafka生产者客户端如何关闭TCP连接

Producer端关闭TCP连接的方式有两种:一种是用户主动关闭;一种是Kafka自动关闭

我们先说第一种。这里的主动关闭实际上是广义的主动关闭,甚至包括用户调用kill -9主动“杀掉”Producer应用。当然最推荐的方式还是调用producer.close()方法来关闭

第二种是Kafka帮你关闭,这与Producer端参数connections.max.idle.ms的值有关。默认情况下该参数值是9分钟,即如果在9分钟内没有任何请求“流过”某个TCP连接,那么Kafka会主动帮你把该TCP连接关闭。用户可以在Producer端设置connections.max.idle.ms=-1禁掉这种机制。一旦被设置成-1,TCP连接将成为永久长连接。当然这只是软件层面的“长连接”机制,由于Kafka创建的这些Socket连接都开启了keepalive,因此keepalive探活机制还是会遵守的。

值得注意的是,在第二种方式中,TCP连接是在Broker端被关闭的,但其实这个TCP连接的发起方是客户端,因此在TCP看来,这属于被动关闭的场景,即passive close。被动关闭的后果就是会产生大量的CLOSE_WAIT连接,因此Producer端或Client端没有机会显式地观测到此连接已被中断。

总结: 

  1. KafkaProducer实例创建时启动Sender线程,从而创建与bootstrap.servers中所有Broker的TCP连接。
  2. KafkaProducer实例首次更新元数据信息之后,还会再次创建与集群中所有Broker的TCP连接。
  3. 如果Producer端发送消息到某台Broker时发现没有与该Broker的TCP连接,那么也会立即创建连接。
  4. 如果设置Producer端connections.max.idle.ms参数大于0,则步骤1中创建的TCP连接会被自动关闭;如果设置该参数=-1,那么步骤1中创建的TCP连接将无法被关闭,从而成为“僵尸”连接。

参考资料 

极客时间课程《Kafka核心技术与实战》

13.Java生产者是如何管理TCP连接的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/2007.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

写给小白的ChatGPT和AI原理

前言 随着ChatGPT等生成式AI的大火&#xff0c;很多开发者都对AI感兴趣。笔者是一名应用层的开发工程师&#xff0c;想必很多类似的开发者都对AI这块不太了解&#xff0c;故而从自己的理解&#xff0c;写一篇給小白的AI入门文章&#xff0c;希望可以帮助到大家。 这是GPT对本…

HMLT学习笔记

1. HTML说明 1.1 文档声明 用于告诉浏览器文档版本。1.引入样式&#xff0c;2.自身样式&#xff0c;3.使用框架&#xff08;html与xhtml同样&#xff09; <!-- 引入CSS的文档 --><!-- HTML文档 --><!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN&qu…

EfficientNet论文笔记

EfficientNet论文笔记 通过NAS平衡了channel&#xff0c;depth&#xff0c;resolution&#xff0c;发现在相同的FLOPs下&#xff0c;同时增加 depth和 resolution的效果最好。 数据集效果小于resolution怎么办&#xff1f; EfficientNet—b0框架 表格中每个MBConv后会跟一个…

科技资讯|Apple Vision Pro新专利,关于相对惯性测量系统的校正

美国专利商标局正式授予苹果一项 Apple Vision Pro 相关专利&#xff0c;该专利涵盖了具有视觉校正功能的相对惯性测量系统。这样的系统用于弥补头显内的眼前庭不匹配&#xff0c;当 VR 头显中发生的事情与现实世界环境中发生的运动不匹配时&#xff0c;可能会导致恶心。 苹果…

11. 利用Tomcat服务器配置HTTPS双向认定

文章目录 Tomcat配置HTTPS1.为服务器生成证书2.为客户端生成证书3.让服务器信任客户端证书4.将该文件导入到服务器的证书库&#xff0c;添加为一个信任证书使用命令如下&#xff1a;5.查看证书库6.让客户端信任服务器证书7.配置tomcat8.验证 Tomcat配置HTTPS 1.启动cmd控制台&…

DirectX12(D3D12)基础教程(二十二) ——HDR IBL 等距柱面环境光源加载和解算及 GS 一次性渲染到 CubeMap

前序文章目录 DirectX12&#xff08;D3D12&#xff09;基础教程&#xff08;一&#xff09;——基础教程 DirectX12&#xff08;D3D12&#xff09;基础教程&#xff08;二&#xff09;——理解根签名、初识显存管理和加载纹理、理解资源屏障 DirectX12&#xff08;D3D12&…

【Linux】进程信号之信号的产生

进程信号 一 一、信号入门1、信号的一些特性2、信号的处理方式信号捕捉初识 3、Linux下的信号 二、信号的产生1、通过终端按键产生信号2、调用系统函数向进程发信号a、kill函数b、raise函数c、abort函数 3. 由软件条件产生信号4、硬件异常产生信号 结语 一、信号入门 什么是信号…

怎么解决亚马逊跟卖?为何卖家总是举报不成功?

以前大家都是从跟卖的时代走向现在的品牌化运营之路&#xff0c;但是现在跟卖已经从大家都模仿的对象变成了大部分卖家厌恶的对象&#xff0c;那么怎么解决这个跟卖问题呢&#xff1f;目前最直接的方法就是进入亚马逊后台进行举报&#xff0c;但是大概率是失败的。 一、举报违…

MySQL操作库

MySQL操作库 一.创建数据库1. 创建数据库的方式2. 创建数据库时的编码问题3. 指定编码创建数据库4. 验证校验规则对数据库的影响 二.数据库与文件系统的关系三.操纵数据库1. 查看数据库2. 删除数据库3. 修改数据库 四.数据库的备份和恢复1.数据库的备份2.数据库的恢复 五.查看连…

C++基础算法排序篇

&#x1f4df;作者主页&#xff1a;慢热的陕西人 &#x1f334;专栏链接&#xff1a;C算法 &#x1f4e3;欢迎各位大佬&#x1f44d;点赞&#x1f525;关注&#x1f693;收藏&#xff0c;&#x1f349;留言 主要讲解C算法中所涉及到的两个排序算法&#xff0c;快排和归并。 文章…

【JavaEE】HTTP请求的构造

目录 1、通过form表单构造HTTP请求 2、通过JS的ajax构造HTTP请求 3、Postman的安装和简单使用 常见的构造HTTP请求的方式有一下几种&#xff1a; 直接通过浏览器的地址栏&#xff0c;输入一个URL&#xff0c;就可以构造一个GET请求HTML中的一些特殊标签&#xff0c;也会触发…

IT技术岗的面试技巧分享

我们在找工作时,需要结合自己的现状,针对意向企业做好充分准备。作为程序员,你有哪些面试IT技术岗的技巧?你可以从一下几个方向谈谈你的想法和观点。 方向一:分享你面试IT公司的小技巧 1、事先和邀约人了解公司的基本情况,比如公司的行业,规模,研发人员占比等 2、事先和…

超声医疗高压功率放大器ATA-4315技术参数

超声波检查或超声诊断&#xff0c;是一种非侵入性的医学检查方法&#xff0c;它利用了声波的高频振动来观察和评估人体内部的器官和组织。它基于不同密度和组织结构中传播的原理。通过将ultrasound(超声波)传递到身体的特定区域&#xff0c;并记录反射回来的声波&#xff0c;我…

牛顿修正法在二阶近似方法中的应用

使用optimtool的牛顿修正法来应用学习 pip install optimtool --upgrade pip install optimtool>2.4.2optimtool包所依据的理论支撑中&#xff0c;还没有为二阶微分方法作邻近算子的近似与修正&#xff0c;所以二阶近似方法是研究无不可微项的可微函数的算子。 牛顿修正法…

微信小程序本地存储(wx.setStorage)和(wx.setStorageSync)

在微信小程序中&#xff0c;可以使用本地存储来保存一些数据比如用户状态&#xff0c;姓名&#xff0c;性别等&#xff1b; 本地存储主要包括两种方式&#xff1a;缓存和本地数据存储。 1.缓存 缓存是一种快速访问内存的临时存储机制&#xff0c;可以有效地提高应用程序的响应…

Element-UI 实现动态增加多个不同类型的输入框并校验(双重v-for表单验证)

文章目录 前言定义表单格式表单渲染和验证扩展 前言 在做复杂的动态表单&#xff0c;实现业务动态变动&#xff0c;比如有一条需要动态添加的el-form-item中包含了多个输入框&#xff0c;并实现表单验证&#xff0c;但在element-ui组件库中给出的表单校验中没有这样的格式&…

软件测试工程师最常用的web测试-浏览器兼容性测试

如今&#xff0c;市面上的浏览器种类越来越多&#xff08;尤其是在平板和移动设备上&#xff09;&#xff0c;这就意味着你所测试的站点需要在这些你声称支持浏览器上都能很好的工作。 同时&#xff0c;主流浏览器&#xff08;IE&#xff0c;Firefox&#xff0c;Chrome&#x…

腾讯、飞书等在线表格自动化编辑--python

编辑在线表格 一 目的二 实现效果三 实现过程简介1、本地操作表格之后进入导入在线文档2、直接操作在线文档 四 实现步骤讲解1、实现方法的选择2、导入类库3、设置浏览器代理直接操作已打开浏览器4、在线文档登录5、在线文档表格数据操作6、行数不够自动添加行数 五 代码实现小…

关闭浏览器访问http时自动转https

HSTS HSTS全称&#xff1a;HTTP Strict Transport Security&#xff0c;意译&#xff1a;HTTP严格传输安全&#xff0c;是一个Web安全策略机制。 解决的问题 网站从Http转跳到Https时&#xff0c;可能出现的安全问题。 浏览器怎么关闭HSTS Chrome 1.地址栏中输入chrome://net…

双电源并用问题与解决方案

双电源并用问题 曾经有客户在电源模块应用过程中出现过这样的应用场景&#xff0c;如下图1所示。客户使用两路电源给后端电路进行供电&#xff0c;要求在不断电的情况下切换输入电源&#xff0c;此过程中发现后端电路会出现损坏。对各个节点波形进行分析后发现&#xff0c;在给…