Kafka的Producer和Consumer源码学习

先解释下两个概念:

high watermark (HW)

        它表示已经被commited的最后一个message offset(所谓commited, 应该是ISR中所有replica都已写入),HW以下的消息都已被ISR中各个replica同步,从而保持一致。HW以上的消息可能是脏数据:部分replica写成功,但最终失败了。

Kafka Partition:  1> 均衡各个Broker之间的数据和请求压力; 2> 分摊处理不同的消费者进程; 3> 在partition内可以保证局部有序和状态记录;

Producer发送数据时,Broker的内部处理流程:         a. Broker Server接到一个Producer request

        b. 会先从ZK中获取该topic的metadata

        c. 进而找到partition的metadata

        d. 从而确定对应的partition leader

        e. 接下来通过该leader partition来append log

         f. 最后计算是否调整leader的High Watermark (它是ISR中所有replica的logEndOffset的最小值与leader的logEndOffset比较得出的最大值)         当然,Broker Server还要根据Producer request的需要决定是否回复ack给client;

Kafka Producer配置:

Kafka的Producer采用了 linkedBlockingQueue, 所以用户设置的batchNumMessages不能大于queueBufferingMaxMessages.
Producer线程可以设定为定期更新Topic Metadata (topic.metadata.refresh.interval.ms, 若该值为负值,则取消定期更新);但是一旦Producer遇到失败情况(partition missing or leader not available), 她会自动更新Metadata;
message.send.max.retries: Kafka有可能会出现某个partition leader暂时不可访问的情况,这个配置参数描述了Producer在此种情况下最多retry的次数。
compression.codec:           在Producer配置中,该参数指定压缩模式,默认是NoCompressionCodec(对所有Topic都不使用压缩)
compressed.topics:            在compression.codec不是NoCompressionCodec的前提下,则为指定的若干Topic执行写压缩,当compressed.topics为空时则是为所有topic执行压缩。
producer.type:                       sync or async
metadata.broker.list:           Producer通过这个参数指定的Broker来获取Topic Metadata
partitioner.class:                     关于生产者向指定的分区发送数据,通过设置partitioner.class的属性来指定向那个分区发送数据;如果自己指定必须编写相应的程序,默认是kafka.producer.DefaultPartitioner,分区程序是基于散列的键( Utils.abs(key.hashCode) % numPartitions )。
retry.backoff.ms:                  设置Producer在refresh metadata之前要等待的时间 (Producer在每次retry之前都要refresh metadata, 但是可能partition的leader selection等需要一定的执行时间),默认100毫秒;

Kafka Consumer配置:

group.id:                                           指定consumer所属的consumer group
consumer.id:                                    如果不指定会自动生成
socket.timeout.ms:                      网络请求的超时设定
socket.receive.buffer.bytes:        Socket的接收缓存大小
fetch.message.max.bytes:          试图获取的消息大小之和(bytes)
num.consumer.fetchers:             该消费去获取data的总线程数
auto.commit.enable:                       如果是true, 定期向zk中更新Consumer已经获取的last message offset(所获取的最后一个batch的first message offset)
auto.commit.interval.ms:             Consumer向ZK中更新offset的时间间隔
queued.max.message.chunks:  默认为2
rebalance.max.retries:                     在rebalance时retry的最大次数,默认为4
fetch.min.bytes:                              对于一个fetch request, Broker Server应该返回的最小数据大小,达不到该值request会被block, 默认是1字节。
fetch.wait.max.ms:                           Server在回答一个fetch request之前能block的最大时间(可能的block原因是返回数据大小还没达到fetch.min.bytes规定);
rebalance.backoff.ms:                  当rebalance发生时,两个相邻retry操作之间需要间隔的时间。
refresh.leader.backoff.ms:           如果一个Consumer发现一个partition暂时没有leader, 那么Consumer会继续等待的最大时间窗口(这段时间内会refresh partition leader);
auto.offset.reset:                            当发现offset超出合理范围(out of range)时,应该设成的大小(默认是设成offsetRequest中指定的值):
                                                                    smallest: 自动把该consumer的offset设为最小的offset;
                                                                    largest: 自动把该consumer的offset设为最大的offset;
                                                                    anything else: throw exception to the consumer;
consumer.timeout.ms:                 如果在该规定时间内没有消息可供消费,则向Consumer抛出timeout exception;
                                                             该参数默认为-1, 即不指定Consumer timeout;
client.id:                                           区分不同consumer的ID,默认是group.id

Kafka Consumer如何与ZK交互:

/brokers/ids/[0]...[N-1]: N是Broker个数,每个[i]是一个Broker,里面存储着每个Broker相关的信息(ip,port,epoch等);
/brokers/topics/[topic_name]/partitions/[0]...[N-1]/state: N是这个topic的partition数目,state里存放了每个partition的leader及ISR等信息;
                                                                                                     备注: [topic_name]这个znode本身也存储了所有partition对应的leader broker;

/consumers/[group_id]/ids/[consumer_id]/[topic0]-[topicN]: [consumer_id]是一个临时znode, 其子node是该consumer监听的topic
/consumers/[group_id]/offsets/[topic_name]/[partition_id]: [partition_id]结点中的值即为offset
/consumers/[group_id]/owners/[topic_name]: ?(会陆续补充)

ConsumerFetcherThread的内部命名: 
                                  "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id)

Kafka消费端如何知道从哪个partition消费,以及如何在多个消费者之间平衡对于多个partition的消费? 目前Broker Server端不会做类似处理,client端可以利用zookeeper来动态分配。(此处还有些不解)

Kafka Consumer在ZK中注册后会监听Broker变化及同组内(Consumer Group)consumer的加入或推出,从而自动实现partitions与consumers的平衡。

关于平衡算法,简单的说就是在实现平衡过程中,尽量保证一个consumer只和尽可能少的Broker维持连接。

转载于:https://www.cnblogs.com/tonychai/p/4528208.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/375007.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

for+next()实现数组的遍历及while list each 的使用

//要求使用for循环语句来完成该数组的遍历//输出每一项的键名和对应值&#xff1a; $a array( a > 34, 5 > 51, 13, 32, bb>15, 2 > 31 ); $len count($a); for($i0;$i<$len;$i)…

读Pyqt4教程,带你入门Pyqt4 _005

对话框窗体或对话框是现代GUI应用不可或缺的一部分。dialog定义为两个或多个人之间的交谈。在计算机程序中dialog是一个窗体&#xff0c;用来和程序“交谈”。对话框用来输入数据、修改数据、改变程序设置等等。对话框是用户和计算机程序沟通的重要手段。 QColorDialog 颜色对话…

Linux内核的启动过程分析

秦鼎涛 《Linux内核分析》MOOC课程http://mooc.study.163.com/course/USTC-1000029000  一、实验目的及要求&#xff1a; 使用gdb跟踪调试内核从start_kernel到init进程启动 详细分析从start_kernel到init进程启动的过程并结合实验截图撰写一篇署名博客&#xff0c;并在博客文…

static修饰符详解

static表示“全局”或者“静态”的意思&#xff0c;用来修饰成员变量和成员方法&#xff0c;也可以形成静态static代码块&#xff0c;但是Java语言中没有全局变量的概念。被static修饰的成员变量和成员方法独立于该类的任何对象。也就是说&#xff0c;它不依赖类特定的实例&…

四则运算2+psp0

程序要求&#xff1a; 1.题目避免重复 2.可定制&#xff08;数量\打印方式&#xff09; 3.可以一下控制参数 ① 是否有乘除法 ② 是否有括号&#xff08;最多支持十个数参与运算&#xff09; ③ 数值范围 ④加减有无负数 ⑤除法有无余数 分析&#xff1a;① 如果是两个数…

kettle作业中的js如何写日志文件

在kettle作业中JavaScript脚本有时候也扮演非常重要的角色&#xff0c;此时我们希望有一些日志记录。下面是job中JavaScript记录日志的方式。 job的js写日志的方法。 得到日志输出实例 org.pentaho.di.core.logging.LogWriter.getInstance();按照日志的级别输出&#xff1a; pu…

浅析Kerberos原理,及其应用和管理

文章作者&#xff1a;luxianghao 文章来源&#xff1a;http://www.cnblogs.com/luxianghao/p/5269739.html 转载请注明&#xff0c;谢谢合作。 免责声明&#xff1a;文章内容仅代表个人观点&#xff0c;如有不当&#xff0c;欢迎指正。 --- 一&#xff0c;引言 Kerberos简单来…

2014! 的末尾有多少个0

2014&#xff01; 的末尾有多少个0<?xml version"1.0" encoding"UTF-8"?> 假设 末尾有 k 个0&#xff0c;所以 2014&#xff01; x * 10^k ; 10 ^ k &#xff08;2 * 5 &#xff09;^ k 2^k * 5^k, 明显所有数字中因数含有2的数字多于含有5的数…

[转载]一句话插配置文件

http://www.t00ls.net/viewthread.php?tid13901 一句话插入配置文件system.asp没有过滤双引号&#xff0c;插入一句就行。常规插法如下&#xff1a;"%><%eval request("d")%><%但金刀客这篇文件&#xff08;http://www.cqzh.cn/post/328.html&…

android插件化-获取apkplug框架已安装插件-03

上一篇文章成功的将apkplug框架嵌入了应用中而且启动 链接http://www.apkplug.com/blog/?post10 这一篇文章实现怎样获取全部已安装插件 一 获取框架的SystemBundle的上下文BundleContext apkplug框架启动会自己主动创建一个SystemBundle, 它是框架的第一个插件不可停止和卸…

Java实现栈。

定义一个接口MyStack接口&#xff1a; package Stack; public interface MyStack<T> { boolean isEmpty(); int length(); boolean push(T date); T pop();} 数组实现&#xff1a; package Stack; public class ArrayStack<T> implements MyStack<T>{ privat…

转载]SA权限九种上传方法

刚看了一种方法&#xff0c;如果是注入点&#xff0c;利用管中窥豹以二进制的方式上传&#xff0c;上传的时候最好改下名&#xff0c;比如do.exe&#xff0c;上传到目标服务器可以改成do.cmd&#xff0c;等传上去之后用copy 命令改回来。 当然用啊d也可以上传&#xff0c;还有…

asp.net 导出Excel

asp.net 导出Excel 分享一个asp.net 导出假Excel代码。优点&#xff0c;不用借助于任何插件比如&#xff08;NPOI&#xff09;,复制代码&#xff0c;修改grid.DataSource直接导出。 先看导出后的效果图 1 System.Web.UI.WebControls.DataGrid grid new DataGrid();2 …

bzoj 2300 动态维护上凸壳(不支持删除)

新技能GET。 用set保存点&#xff0c;然后只需要找前趋和后继就可以动态维护了。 1 /**************************************************************2 Problem: 23003 User: idy0024 Language: C5 Result: Accepted6 Time:556 ms7 Memory:4824 kb8 …

带有Guice的富域模型

贫血域模型是一个非常常见的反模式。 在ORM和DI框架的世界中&#xff0c;我们自然会发现自己拥有一个由ORM管理的“域”&#xff0c;该域包含所有数据且无行为。 通过我们的DI框架有帮助地注入了辅助类&#xff0c;这些辅助类都是行为且没有数据。 在本文中&#xff0c;我将介绍…

php匿名函数小示例

<?php //$fun function($params){ // echo $params; //}; // //$fun(aa);//例一 //在普通函数中定义一个匿名函数 //function printStr(){ // $fun function($something){ // echo $something; // }; // $fun(something); // //} //printStr();//例子…

购书心得

作者&#xff1a;泉哥主页&#xff1a;http://riusksk.blogbus.com富家不用买良田&#xff0c;书中自有千钟粟&#xff1b;安居不用架高堂&#xff0c;书中自有黄金屋&#xff1b;出门莫恨无人随&#xff0c;书中车马多如簇&#xff1b;娶妻莫恨无良媒&#xff0c;书中自有颜如…

MariaDB 条件语句WHERE

MariaDB 条件语句WHEREWHERE Clause Operators Operator Description Equality<> Nonequality! Nonequality< Less than< Less than or equal to > Greater than > Greater than or equal to BETWEEN Between two specified values BETWEEN AND (jlive)[c…

Spring 3.1缓存抽象教程

即将发布的Spring 3.1版本中引入的新功能之一是缓存抽象之一 。 Spring Framework提供了对将缓存透明添加到现有Spring应用程序中的支持。 与事务支持类似&#xff0c;缓存抽象允许一致使用各种缓存解决方案&#xff0c;而对代码的影响最小。 从本质上讲&#xff0c;抽象将缓存…