【原创】Kakfa utils源代码分析(一)

Kafka.utils,顾名思义,就是一个工具套件包,里面的类封装了很多常见的功能实现——说到这里,笔者有一个感触:当初为了阅读Kafka源代码而学习了Scala语言,本以为Kafka的实现会用到很多函数编程(Functional Programming, FP),结果目前来看,大部分还是很朴素地以面向对象的方式来实现的,只有很少一部分集合的处理使用诸如map,reduce这样的FP方式。不能不说有点小小的遗憾。——当然也许后面Kafka的核心代码中会看到更多FP的身影。

下图就是kafka.utils包的所有代码:
因为很难像其他包代码之间有逻辑关系,我们就一个一个说吧:
一、Annotations.scala
这个源代码文件中定义了3个注释类:threadsafe、nonthreadsafe和immutable。它们都继承了StaticAnnotation——Scala提供的StaticAnnotation类似于Java中的@Target(ElementType.TYPE),因此主要的作用域是类和接口。具体到这三个元注解(meta-annotation),很容易知道它们的含义:分别标记线程安全、非线程安全和不可变性。Kafka开发中常用到的SimpleConsumer类就是被标记为@threadsafe的。
二. CommandLineUtils.scala
这个文件使用JOpt Simple库负责解析命令行参数,具体使用用法参见官网:http://pholser.github.io/jopt-simple/
Kafka在这个文件中提供了一个object:CommandLineUtils。具体包含的方法有:
1. printUsageAndDie: 打印命令使用方法并终止程序
2. checkRequiredArgs:使用Jopts Simple的API(以下皆同)检查是否缺少必要参数
3. checkInvalidArgs:检查指定的参数是否存在不兼容情况,即哪些参数不能同时使用
4. parseKeyValueArgs:解析key=value格式的参数对,并返回一个Properties对象
三、Crc32.scala
这个类就是CRC32校验码的实现类,来自于Hadoop提供的PureJavaCrc32类——CRC32校验码的纯Java实现版本。这个类很长,里面有很多位操作,由于CRC32计算不在本次研究范围,所以就了解到这吧。
四、DelayedItem.scala
这个类是个泛型类,实现了java.util.Delayed接口。用于标记那些在给定延迟时间之后执行的对象。该类接收一个泛型T,一个延迟时间以及延迟时间的单位。另外,实现这个接口的话必须要实现一个compareTo和getDelay方法。
1. getDelay: 计算距离触发时间还剩下多长时间
2. compareTo: 比较2个Delayed对象的延迟触发时间
五、FileLock.scala
顾名思义,FileLock就是一个文件锁,它的构造函数接收一个文件对象,并总是先尝试创建这个文件(如果不存在的话),然后创建一个FileChannel对象对该文件进行随机读写操作。同时创建一个java.nio.channel.FlieLock文件锁对象用于实现下面的方法:
1. lock: 对文件加锁,如果该文件上已有锁抛出异常
2. tryLock: 尝试对文件加锁,如果成功返回true,否则返回false
3. unlock: 如果持有锁使用FileLock.release方法释放锁
4. destroy: 先释放锁然后调用FileChannel的close方法销毁该channel
六、IteratorTemplate.scala
这个文件视图定义一个迭代器模板,主要为遍历消息集合使用。迭代器模板有一个状态字段,因此在定义迭代器模板抽象类之前首先定义了一个State状态object,以及一组具体的状态object:完成(DONE),READY(准备就绪),NOT_READY(未准备)和FAILED(失败)。
之后就是定义IteratorTemplate抽象类了,它同时实现了trait Iterator和java Iterator接口——可谓迭代器领域的集大成者:)
 
如前所述,该类有个字段表明了迭代器的状态:state,还有一个nextItem字段执行遍历中的下一个对象,当然初始化为null——说起null,想到一个题外话。我很怀疑Kafka的开发人员是深度的Java编程人员亦或是强面向对象开发人员,Scala推荐使用Option来代替null的,可Kafka的代码中null还是随处可见,当然可能也是为了更好更自然地与Java集成。
 
这个抽象类提供很多方法,但似乎只有一个抽象方法:makeNext,其他全是具体方法:
1. next:如果迭代器已遍历完并无法找到下一项或下一项为空,直接抛出异常;否则将状态置为NOT_READY并返回下一项
2. peek:只是探查一下迭代器是否遍历完,如果是抛出异常,否则直接返回下一项,并不做非空判断,也不做状态设置
3. hasNext: 如果状态为FAILED直接抛出异常,如果是DONE返回false,如果是READY返回true,否则调用maybeComputeNext方法
4. makeNext: 返回下一项,这是你需要唯一需要实现的抽象方法。同时你还需要在该方法中对状态字段进行更新
5. maybeComputeNext:调用makeNext获取到下一项,如果状态是DONE返回false,否则返回true并将状态置为READY
6. allDone: 将状态置为DONE并返回null
7. resetStatus:顾名思义,就是重置状态字段为NOT_READY
七、JSON.scala
JSON的一个封装类,用于JSON到String的相互转换,该类不是线程安全的。Scala提供的JSON是将数字型的字符串转化为Double,不过该类创建一个简单函数用于将数字型字符串转为换Integer,并指定其为JSON.globalNumberParser。该类只有2个方法:
1. parseFull: 调用scala JSON的parseFull方法将一个json字符串转化为一个对象,如果出错则抛出异常
2. encode: 讲一个对象编码成json字符串。这个对象只能是null,Boolean,String,Number,Map[String, T],Array[T]或Iterable[T]中的一种,否则会报错

转载于:https://www.cnblogs.com/huxi2b/p/4378439.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/431917.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

redhad yum 安装mysql_redhat7通过yum安装mysql5.7.17教程

rhel/centos系列linux操作系统自身没有mysql的源,需要自行下载安装。本文介绍如何安装mysql5.7.x数据库。第一步:下载源[rootclient ~]# wget http://repo.mysql.com/mysql57-community-release-el7-8.noarch.rpm注意:选择mysql57-community-…

codechef Polo the Penguin and the Tree

一般xor 的题目都是用trie解决。 那这道题是在树上的trie; 首先:从root1,遍历树得到1到所有节点的xor 值。 然后对于每个点我们把其插入二进制树中。 对于每一个点查找其二进值异或值最大的数 依次遍历下来。 注意:边的数量开两倍以上,RE很多…

mysql memcached 使用场景_memcache 应用场景

一..memcache应用场景1.应用场景一: 缓解数据库压力,提高交互速度。它的一个总原则是将经常需要从数据库读取的数据缓存在memcached中。这些数据也分为几类:(1)、经常被读取并且实时性要求不强可以等到自动过期的数据。例如网站首页最新文章列…

link2001错误无法解析外部符号metaObject

http://blog.sina.com.cn/s/blog_791f544a0100r01b.html1>MainWindowBottomWidget.obj : error LNK2001: 无法解析的外部符号 "public: virtual struct QMetaObject const * __thiscall MainWindowBottomWidget::metaObject(void)const " (?metaObjectMainWindow…

mysql主从和dump_MySQL主从同步--原理及实现(一)

1、什么是mysql主从同步?当master(主)库的数据发生变化的时候,变化会实时的同步到slave(从)库。2、主从同步有什么好处?水平扩展数据库的负载能力。容错,高可用。Failover(失败切换)/High Availability数据备份。3、主从同步的原理…

【转】Mybatis/Ibatis,数据库操作的返回值

该问题,我百度了下,根本没发现什么有价值的文章;还是看源代码(详见最后附录)中的注释,最有效了!insert,返回值是:新插入行的主键(primary key)&am…

解密多媒体封装解封装框架

上一篇文章我们搭好了环境并编译出所需的ffmpeg库,本篇我们讨论如何利用ffmpeg提供的API函数进行多媒体文件的解封装(demux)过程。在讲解之前,我们需要了解一些基本的多媒体文件知识,大虾请飘过。 容器格式&#xff1a…

python入门及日常应用_python的日常应用-入门篇02

大部分人在编写自己第一个程序的时候会做什么?当然是让你的程序对我们的世界大喊一声“Hello world!”了。今天我们来学习的便是Python中的输出语句。如何让你的程序“说话”?我们想要让程序帮我们做事之前首先要教会程序怎么“说话”,这样我…

bzoj 3611

和BZOJ消耗站一样&#xff0c;先将那个询问的简图构建出来&#xff0c;然后就是简单的树形DP。 &#xff08;倍增数组开小了&#xff0c;然后就狂WA&#xff0c;自己生成的极限数据深度又没有那么高&#xff0c;链又奇迹般正确&#xff09; 1 #include <cstdio>2 #includ…

vscode添加源文件_VSCode自制的IDE编译多个源文件

文/EdwardVSCode的预定义变量我们上一篇文章中讲述了如何将MinGW工具嵌入到VSCode文本编辑器中&#xff0c;在这个配置的过程中&#xff0c;我们只需要通过修改VSCode生成的“luanch.json”和“task.json”两个JSON文件中的特定字段&#xff0c;就可以实现开发环境的搭建。那么…

c# 第四课 interfaces

An interface is a contract(协定) that guarantees to a client how a class or struct will behave.When a class implements an interface(实现一个接口), it tells any potential(可能的) client “I guarantee I’ll support all the methods, properties, events, and in…

mysql+自动还原备份_Mysql 自动备份与恢复

自动备份MySql 5.0有三个方案&#xff1a;备份方案一&#xff1a; 通过 mysqldump命令,直接生成一个完整的 .sql 文件Step 1: 创建一个批处理(说明&#xff1a;root 是mysql默认用户名, aaaaaa 是mysql密码, bugtracker 是数据库名)------------mySql_backup.bat--------------…

SqlServer按时间自动生成生成单据编号

SET _tmpDateTime GETDATE() EXEC dbo.Dtw_Common_GenerateProofCode ProofType SO,WhsCodeWhsCode, ProofDate _tmpDateTime, RtnCode _tmpProofCode OUTPUT --生成的最终的CODE USE [SZVB]GO/****** Object: StoredProcedure [dbo].[Dtw_Common_GenerateProofCode]…

hive创建分区表 指定分隔符_HIVE 对于分区表的操作

CREATE EXTERNALTABLE IF NOT EXISTS data_zh(ROWKEY STRING,STATION INT,YEAR INT,MONTH INT,DAY INT,HOUR INT,MINUTE INT,)PARTITIONED BY (AGE INT)指定分区(此列并没真正存储列&#xff0c;也就是不存于你的数据中。但是如果你的数据从Oracle按年份导出&#xff0c;按照年…

Web Service 学习

1. Web services 平台的元素&#xff1a; SOAP (简易对象访问协议) UDDI (通用描述、发现及整合) WSDL (Web services 描述语言)1.1 什么是 SOAP&#xff1f; 基本的 Web services 平台是 XML HTTP。 SOAP 指简易对象访问协议 SOAP 是一种通信协议 SOAP 用于应用程序之间的通信…

java高级mysql面试题_Java高级面试题

一.基础知识&#xff1a;1)集合类&#xff1a;List和Set比较&#xff0c;各自的子类比较(ArrayList&#xff0c;Vector&#xff0c;LinkedList&#xff1b;HashSet&#xff0c;TreeSet)&#xff1b;2)HashMap的底层实现&#xff0c;之后会问ConcurrentHashMap的底层实现&#x…

转:Oracle 中union的用法

UNION 指令的目的是将两个 SQL 语句的结果合并起来,可以查看你要的查询结果. 例如: SELECT Date FROM Store_Information UNION SELECT Date FROM Internet_Sales 注意:union用法中,两个select语句的字段类型匹配,而且字段个数要相同,如上面的例子,在实际的软件开发过程,会遇到…

mysql skip_counter_mysql的三个故障解决小结

mysql使用过程中经常会遇到的三个故障&#xff0c;在此小结一下。1、MySQl服务无法启动我们在使用mysql的过程中&#xff0c;常会遇到MySQl服务无法启动&#xff0c;具体报错信息&#xff1a;Starting MySQL ERROR.The server quit without updating PID file (/[FAILED]l/mysq…

Httpd 使用ip可以访问,localhost和127.0.0.1不能访问

解决方法&#xff1a;打开/etc/httpd/conf目录下的httpd.conf文件&#xff0c; 加入 Listen 127.0.0.1:81 加入后&#xff1a; Listen xxx.xxx.xxx.xxx:81 Listen 127.0.0.1:81 其中xxx.xxx.xxx.xxx是我的ip 这样通过ip、localhost、127.0.0.1都可以访问了 转载于:https://www.…

如何将每一条记录放入到对应的范围中

编程序的时候遇到一个问题&#xff1a; 画热图 &#xff1a;计算热力值--->画网格&#xff0c;将在一定范围内定位出的mac累积计数--->编写出了定位程序&#xff0c;但是如何将每个具体的坐标值放入对应的范围&#xff08;网格&#xff09;--->因为具体坐标和网格选取…