消息中间件—简谈Kafka中的NIO网络通信模型

640?wx_fmt=jpeg


前面写的两篇RocketMQ源码研究笔记系列:


1. 消息中间件—RocketMQ的RPC通信(一)


2. 消息中间件—RocketMQ的RPC通信(二)


基本上已经较为详细地将RocketMQ这款分布式消息队列的RPC通信部分的协议格式、消息编解码、通信方式(同步/异步/单向)、消息收发流程和Netty的Reactor多线程分离处理架构讲了一遍。同时,联想业界大名鼎鼎的另一款开源分布式消息队列—Kafka,具备高吞吐量和高并发的特性,其网络通信层是如何做到消息的高效传输的呢?为了解开自己心中的疑虑,就查阅了Kafka的Network通信模块的源码,乘机会写本篇文章。 


本文主要通过对Kafka源码的分析来简述其Reactor的多线程网络通信模型和总体框架结构,同时简要介绍Kafka网络通信层的设计与具体实现。


一、Kafka网络通信模型的整体框架概述


Kafka的网络通信模型是基于NIO的Reactor多线程模型来设计的。这里先引用Kafka源码中注释的一段话:

An NIO socket server. The threading model is 1 Acceptor thread that handles new connections. Acceptor has N Processor threads that each have their own selector and read requests from sockets. M Handler threads that handle requests and produce responses back to the processor threads for writing.

相信大家看了上面的这段引文注释后,大致可以了解到Kafka的网络通信层模型,主要采用了1(1个Acceptor线程)+N(N个Processor线程)+M(M个业务处理线程)。下面的表格简要的列举了下(这里先简单的看下后面还会详细说明):


线程数线程名线程具体说明
1kafka-socket-acceptor_%xAcceptor线程,负责监听Client端发起的请求
Nkafka-network-thread_%dProcessor线程,负责对Socket进行读写
Mkafka-request-handler-_%dWorker线程,处理具体的业务逻辑并生成Response返回


Kafka网络通信层的完整框架图如下图所示:


640?wx_fmt=png


刚开始看到上面的这个框架图可能会有一些不太理解,并不要紧,这里可以先对Kafka的网络通信层框架结构有一个大致了解。本文后面会结合Kafka的部分重要源码来详细阐述上面的过程。这里可以简单总结一下其网络通信模型中的几个重要概念:


(1) Acceptor:1个接收线程,负责监听新的连接请求,同时注册OPACCEPT 事件,将新的连接按照"round robin"方式交给对应的 Processor 线程处理; 


(2) Processor:N个处理器线程,其中每个 Processor 都有自己的 selector,它会向 Acceptor 分配的 SocketChannel 注册相应的 OPREAD 事件,N 的大小由“num.networker.threads”决定; 


(3) KafkaRequestHandler:M个请求处理线程,包含在线程池—KafkaRequestHandlerPool内部,从RequestChannel的全局请求队列—requestQueue中获取请求数据并交给KafkaApis处理,M的大小由“num.io.threads”决定; 


(4) RequestChannel:其为Kafka服务端的请求通道,该数据结构中包含了一个全局的请求队列 requestQueue和多个与Processor处理器相对应的响应队列responseQueue,提供给Processor与请求处理线程KafkaRequestHandler和KafkaApis交换数据的地方;


(5) NetworkClient:其底层是对 Java NIO 进行相应的封装,位于Kafka的网络接口层。Kafka消息生产者对象—KafkaProducer的send方法主要调用NetworkClient完成消息发送; 


(6) SocketServer:其是一个NIO的服务,它同时启动一个Acceptor接收线程和多个Processor处理器线程。提供了一种典型的Reactor多线程模式,将接收客户端请求和处理请求相分离; 


(7) KafkaServer:代表了一个Kafka Broker的实例;其startup方法为实例启动的入口; 


(8) KafkaApis:Kafka的业务逻辑处理Api,负责处理不同类型的请求;比如“发送消息”、“获取消息偏移量—offset”和“处理心跳请求”等;


二、Kafka网络通信层的设计与具体实现


这一节将结合Kafka网络通信层的源码来分析其设计与实现,这里主要详细介绍网络通信层的几个重要元素——SocketServer、Acceptor、Processor、RequestChannel、KafkaRequestHandler 和 KafkaApis。本文分析的源码部分均基于 Kafka 的 0.11.0 版本。


1、SocketServer


SocketServer是接收客户端Socket请求连接、处理请求并返回处理结果的核心类,Acceptor及Processor的初始化、处理逻辑都是在这里实现的。在KafkaServer实例启动时会调用其startup的初始化方法,会初始化1个 Acceptor和N个Processor线程(每个EndPoint都会初始化,一般来说一个Server只会设置一个端口),其实现如下:


  1. def startup() {

  2.    this.synchronized {

  3.      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

  4.      val sendBufferSize = config.socketSendBufferBytes

  5.      val recvBufferSize = config.socketReceiveBufferBytes

  6.      val brokerId = config.brokerId

  7.      var processorBeginIndex = 0

  8.      // 一个broker一般只设置一个端口

  9.      config.listeners.foreach { endpoint =>

  10.        val listenerName = endpoint.listenerName

  11.        val securityProtocol = endpoint.securityProtocol

  12.        val processorEndIndex = processorBeginIndex + numProcessorThreads

  13.        //N 个 processor

  14.        for (i <- processorBeginIndex until processorEndIndex)

  15.          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)

  16.        //1个 Acceptor

  17.        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,

  18.          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)

  19.        acceptors.put(endpoint, acceptor)

  20.        KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()

  21.        acceptor.awaitStartup()

  22.        processorBeginIndex = processorEndIndex

  23.      }

  24.    }


2、Acceptor


Acceptor是一个继承自抽象类AbstractServerThread的线程类。Acceptor的主要任务是监听并且接收客户端的请求,同时建立数据传输通道—SocketChannel,然后以轮询的方式交给一个后端的Processor线程处理(具体的方式是添加socketChannel至并发队列并唤醒Processor线程处理)。 


在该线程类中主要可以关注以下两个重要的变量:


(1) nioSelector:通过NSelector.open()方法创建的变量,封装了JAVA NIO Selector的相关操作; 


(2) serverChannel:用于监听端口的服务端Socket套接字对象; 


下面来看下Acceptor主要的run方法的源码:


  1. def run() {

  2.    //首先注册OP_ACCEPT事件

  3.    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)

  4.    startupComplete()

  5.    try {

  6.      var currentProcessor = 0

  7.      //以轮询方式查询并等待关注的事件发生

  8.      while (isRunning) {

  9.        try {

  10.          val ready = nioSelector.select(500)

  11.          if (ready > 0) {

  12.            val keys = nioSelector.selectedKeys()

  13.            val iter = keys.iterator()

  14.            while (iter.hasNext && isRunning) {

  15.              try {

  16.                val key = iter.next

  17.                iter.remove()

  18.                if (key.isAcceptable)

  19.                  //如果事件发生则调用accept方法对OP_ACCEPT事件处理

  20.                  accept(key, processors(currentProcessor))

  21.                else

  22.                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

  23.                //轮询算法

  24.                // round robin to the next processor thread

  25.                currentProcessor = (currentProcessor + 1) % processors.length

  26.              } catch {

  27.                case e: Throwable => error("Error while accepting connection", e)

  28.              }

  29.            }

  30.          }

  31.        }

  32.       //代码省略

  33.  }

  34.  def accept(key: SelectionKey, processor: Processor) {

  35.    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]

  36.    val socketChannel = serverSocketChannel.accept()

  37.    try {

  38.      connectionQuotas.inc(socketChannel.socket().getInetAddress)

  39.      socketChannel.configureBlocking(false)

  40.      socketChannel.socket().setTcpNoDelay(true)

  41.      socketChannel.socket().setKeepAlive(true)

  42.      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)

  43.        socketChannel.socket().setSendBufferSize(sendBufferSize)

  44.      processor.accept(socketChannel)

  45.    } catch {

  46.        //省略部分代码

  47.    }

  48.  }

  49.  def accept(socketChannel: SocketChannel) {

  50.    newConnections.add(socketChannel)

  51.    wakeup()

  52.  }


在上面源码中可以看到,Acceptor线程启动后,首先会向用于监听端口的服务端套接字对象—ServerSocketChannel上注册OPACCEPT 事件。然后以轮询的方式等待所关注的事件发生。如果该事件发生,则调用accept()方法对OPACCEPT事件进行处理。这里,Processor是通过round robin方法选择的,这样可以保证后面多个Processor线程的负载基本均匀。 Acceptor的accept()方法的作用主要如下:


(1) 通过SelectionKey取得与之对应的serverSocketChannel实例,并调用它的accept()方法与客户端建立连接; 


(2) 调用connectionQuotas.inc()方法增加连接统计计数;并同时设置第 (1) 步中创建返回的socketChannel属性(如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等) ;


(3) 将socketChannel交给processor.accept()方法进行处理。这里主要是将socketChannel加入Processor处理器的并发队列newConnections队列中,然后唤醒Processor线程从队列中获取socketChannel并处理。其中,newConnections会被Acceptor线程和Processor线程并发访问操作,所以newConnections是ConcurrentLinkedQueue队列(一个基于链接节点的无界线程安全队列)


3、Processor


Processor同Acceptor一样,也是一个线程类,继承了抽象类AbstractServerThread。其主要是从客户端的请求中读取数据和将KafkaRequestHandler处理完响应结果返回给客户端。在该线程类中主要关注以下几个重要的变量: 


(1) newConnections:在上面的Acceptor一节中已经提到过,它是一种ConcurrentLinkedQueue[SocketChannel]类型的队列,用于保存新连接交由Processor处理的socketChannel;


(2) inflightResponses:是一个Map[String, RequestChannel.Response]类型的集合,用于记录尚未发送的响应; 


(3) selector:是一个类型为KSelector变量,用于管理网络连接; 下面先给出Processor处理器线程run方法执行的流程图:

640?wx_fmt=png

从上面的流程图中能够可以看出Processor处理器线程在其主流程中主要完成了这样子几步操作:


(1) 处理newConnections队列中的socketChannel。遍历取出队列中的每个socketChannel并将其在selector上注册OPREAD事件;


(2) 处理RequestChannel中与当前Processor对应响应队列中的Response。在这一步中会根据responseAction的类型(NoOpAction/SendAction/CloseConnectionAction)进行判断,若为“NoOpAction”,表示该连接对应的请求无需响应;若为“SendAction”,表示该Response需要发送给客户端,则会通过“selector.send”注册OPWRITE事件,并且将该Response从responseQueue响应队列中移至inflightResponses集合中;“CloseConnectionAction”,表示该连接是要关闭的; 


(3) 调用selector.poll()方法进行处理。该方法底层即为调用nioSelector.select()方法进行处理。


(4) 处理已接受完成的数据包队列—completedReceives。在processCompletedReceives方法中调用“requestChannel.sendRequest”方法将请求Request添加至requestChannel的全局请求队列—requestQueue中,等待KafkaRequestHandler来处理。同时,调用“selector.mute”方法取消与该请求对应的连接通道上的OPREAD事件;


(5) 处理已发送完的队列—completedSends。当已经完成将response发送给客户端,则将其从inflightResponses移除,同时通过调用“selector.unmute”方法为对应的连接通道重新注册OPREAD事件; 


(6) 处理断开连接的队列。将该response从inflightResponses集合中移除,同时将connectionQuotas统计计数减1;


4、RequestChannel


在Kafka的网络通信层中,RequestChannel为Processor处理器线程与KafkaRequestHandler线程之间的数据交换提供了一个数据缓冲区,是通信过程中Request和Response缓存的地方。因此,其作用就是在通信中起到了一个数据缓冲队列的作用。Processor线程将读取到的请求添加至RequestChannel的全局请求队列—requestQueue中;KafkaRequestHandler线程从请求队列中获取并处理,处理完以后将Response添加至RequestChannel的响应队列—responseQueue中,并通过responseListeners唤醒对应的Processor线程,最后Processor线程从响应队列中取出后发送至客户端。


5、KafkaRequestHandler


KafkaRequestHandler也是一种线程类,在KafkaServer实例启动时候会实例化一个线程池—KafkaRequestHandlerPool对象(包含了若干个KafkaRequestHandler线程),这些线程以守护线程的方式在后台运行。在KafkaRequestHandler的run方法中会循环地从RequestChannel中阻塞式读取request,读取后再交由KafkaApis来具体处理。


6、KafkaApis


KafkaApis是用于处理对通信网络传输过来的业务消息请求的中心转发组件。该组件反映出Kafka Broker Server可以提供哪些服务。


三、总结


仔细阅读Kafka的NIO网络通信层的源码过程中还是可以收获不少关于NIO网络通信模块的关键技术。Apache的任何一款开源中间件都有其设计独到之处,值得借鉴和学习。对于任何一位使用Kafka这款分布式消息队列的同学来说,如果能够在一定实践的基础上,再通过阅读其源码能起到更为深入理解的效果,对于大规模Kafka集群的性能调优和问题定位都大有裨益。 对于刚接触Kafka的同学来说,想要自己掌握其NIO网络通信层模型的关键设计,还需要不断地使用本地环境进行debug调试和阅读源码反复思考。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。后续还会根据自己的实践和研发,陆续发布关于Kafka分布式消息队列的其他相关技术文章,敬请关注。


作者:胡宗棠,中移(苏州)软件技术有限公司,云计算软件高级研发工程师,从事公有云产品平台研发、架构设计;目前专注于大型分布式系统的高并发、高可用设计。曾就职于蚂蚁金服支付宝,甲骨文中国研发中心,个人公众号:匠心独运的博客。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/525636.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何系统学习领域驱动设计?

一、领域驱动设计为何又焕发青春&#xff1f;领域驱动设计&#xff08;Domain Driven Design&#xff0c;DDD&#xff09;确实已不再青春&#xff0c;从 Eric Evans 出版了划时代的著作《领域驱动设计》至今&#xff0c;已有将近十五年的时间&#xff0c;在软件设计领域中&…

如何运用DDD解决团队协作与沟通问题?

领域驱动设计的核心是“领域”&#xff0c;因此要运用领域驱动设计&#xff0c;从一开始就要让团队走到正确的点上。当我们组建好了团队之后&#xff0c;应该从哪里开始&#xff1f;不是UI原型设计&#xff0c;不是架构设计&#xff0c;不是设计数据库&#xff0c;这些事情重要…

微服务等于Spring Cloud?一文告诉你微服务到底是什么

作者&#xff1a;TIM XU 原文&#xff1a;https://xiaoxubeii.github.io/articles/microservices-architecture-introduction/1微服务初探什么是微服务&#xff1f;首先微服务并没有一个官方的定义&#xff0c;想要直接描述微服务比较困难&#xff0c;我们可以通过对比传统WEB应…

我们爬了上千个数据分析师信息, 你真的懂数据分析师嘛?

01 项目简介有人说&#xff0c;这个时代&#xff0c;只要站在了风口&#xff0c;猪都能飞起来&#xff0c;尤其互联网行业&#xff0c;千变万化&#xff0c;日异月殊&#xff0c;一不小心就错过了风口&#xff0c;如果没记错的话&#xff0c;前几年火的是App开发&#xff0c;后…

高可用Redis服务架构分析与搭建

作者&#xff1a;HorstXu 原文&#xff1a;https://www.cnblogs.com/xuning/p/8464625.html基于内存的Redis应该是目前各种Web开发业务中最为常用的Key-Value数据库了&#xff0c;我们经常在业务中用其存储用户登陆态&#xff08;Session存储&#xff09;&#xff0c;加速一些热…

java怎么弄redis,java怎么使用redis

开始在 Java 中使用 Redis 前&#xff0c; 我们需要确保已经安装了 redis 服务及 Java redis 驱动&#xff0c;且你的机器上能正常使用 Java。Java的安装配置可以参考我们的 Java开发环境配置 接下来让我们安装 Java redis 驱动&#xff1a;首先你需要下载驱动包 下载 jedis.ja…

我花了14个小时找了一下长春长生们究竟卖到了哪里去

前言本文首发于个人的公众号和v2ex&#xff0c;事先也没想到会有这么多人关注。在这边重新编辑一下&#xff0c;去掉了原先前言中对此次疫苗事件背景的描述及部分不严谨的措辞。全文的观点从技术讨论出发&#xff0c;尽量客观中立&#xff0c;观点及行为为员工自发&#xff0c;…

我们分析了50万条拼多多商品数据,告诉你到底是消费升级还是降级?

作者&#xff1a;放开那个猕猴桃来源&#xff1a;人工智能与大数据生活转自&#xff1a;知乎&#xff0c;恋习Python一、缘起我在杭州有位朋友&#xff0c;提到有家做社交的电商很火&#xff0c;叫拼多多&#xff0c;我没有在意&#xff0c;直到有一天&#xff0c;我居然在电视…

java并发集合面试题,那些经常被问的JAVA面试题(1)—— 集合部分

【本文转自极客原创 作者&#xff1a;张锋 原文链接&#xff1a;】Java集合框架是什么&#xff1f;说出一些集合框架的优点&#xff1f;每种编程语言中都有集合&#xff0c;最初的Java版本包含几种集合类&#xff1a;Vector、Stack、HashTable和Array。随着集合的广泛使用&…

拯救阿波罗14号!那些伟大太空计划背后的计算机工程师们

&#xfeff;&#xfeff;1971年1月31日&#xff0c;阿波罗14号发射。外太空旅行近一周后的凌晨&#xff0c;回程中的阿波罗14号突然面临一个严峻的问题&#xff1a;它可能没办法正常降落了。问题出在其计算机工程师Don Eyles编写的一个应急程序上&#xff0c;他必须在很短的时…

golang web php,golang 适合做web开发吗

使用go语言来做web开发&#xff0c;是非常方便的。如果不使用框架&#xff0c;仅仅使用net/http包&#xff0c;也能快速开发一个web应用。但是&#xff0c;官方包不支持RESTful风格的API&#xff0c;所以我们依然还是需要选择一个框架来帮助我们进行开发。 (推荐学习&#xff1…

我又花了28个小时分析了一下各省二类疫苗采购公示数据

1前 言距离《我花了14个小时找了一下长春长生们究竟卖到了哪里去》发出来已经过去了4天&#xff0c;过去的几天里&#xff0c;每天晚上我都在搜集和整理数据&#xff0c;终于把之前没做完的工作做的差不多了。现在做一个大致的总结&#xff0c;分析相对粗略&#xff0c;大家见…

[重磅] 如何更好地实现服务调用和消息推送

第四届阿里中间件性能挑战赛是由阿里巴巴集团发起&#xff0c;阿里巴巴中间(Aliware)、阿里云天池联合举办&#xff0c;是集团少有的工程性品牌赛事。大赛的初衷是为热爱技术的年轻人提供一个挑战世界级技术问题的舞台&#xff0c;希望选手在追求性能极致的同时&#xff0c;能深…

开发怼产品,天经地义?大惊小怪?

最近&#xff0c;又有一件轰动程序员界的事情发生了&#xff0c;想必大家伙都已经奔走相告了。来回顾下事情的经过&#xff0c;1张图就能说明白了骚不&#xff1f;反正有句话叫「从技术层面出发&#xff0c;总归有办法实现的」&#xff0c;还有这么一句话叫「从技术角度出发&am…

php实现数据排序算法,PHP实现排序堆排序算法

这篇文章主要为大家详细介绍了PHP实现排序堆排序(Heap Sort)算法&#xff0c;具有一定的参考价值&#xff0c;感兴趣的小伙伴们可以参考一下算法引进&#xff1a;在这里我直接引用《大话数据结构》里面的开头&#xff1a;在前面讲到 简单选择排序 &#xff0c;它在待排序的 n 个…

谷歌Edge TPU:将机器学习引入边缘,撬动边缘计算/IOT大“地球”

近期&#xff0c;谷歌在Cloud Next会议上推出其最新产品&#xff0c;Edge TPU芯片和Cloud IOT Edge软件&#xff0c;并将于10月推出Edge TPU开发套件。作为Cloud TPU的补充&#xff0c;目前Edge TPU仅用于推理&#xff0c;专为在边缘运行TensorFlow Lite ML模型而设计。Edge TP…

详解云计算、大数据和人工智能的区别与联系

今天跟大家讲讲云计算、大数据和人工智能。为什么讲这三个东西呢&#xff1f;因为这三个东西现在非常火&#xff0c;并且它们之间好像互相有关系&#xff1a;一般谈云计算的时候会提到大数据、谈人工智能的时候会提大数据、谈人工智能的时候会提云计算……感觉三者之间相辅相成…

关于腾讯云丢数据事件的一些看法

事件回顾&#xff1a;创业公司“前沿数控”8月5日发文称&#xff0c;公司存放在腾讯云上的精准注册用户以及内容数据全部丢失&#xff0c;并且不能恢复&#xff0c;造成公司平台全部停运的状态。前沿数控表示&#xff0c;公司丢失的数据近千万元级&#xff0c;对此索赔1000余万…

Navicat for mysql备份与恢复

文章目录 一、Navicat for mysql备份1.打开navicat&#xff0c;找到备份2.点击新建备份&#xff0c;直接点备份3.备份完成 二、恢复数据1.删除表2.点击备份&#xff0c;选中备份文件&#xff0c;点击还原备份3.还原完成 三、其他命令四、视频演示总结 一、Navicat for mysql备份…

一文详解微服务架构的数据设计

微服务是一个软件架构模式&#xff0c;对微服务的讨论大多集中在容器或其他技术是否能很好的实施微服务这些方面。本文将从以下几个角度来和大家分享在微服务架构下进行数据设计需要关注的地方&#xff0c;旨在帮助大家在构建微服务架构时&#xff0c;提供一个数据方面的视角:什…