一、Kafka术语
Kafka属于分布式的消息引擎系统,它的主要功能是提供一套完备的消息发布与订阅解决方案。可以为每个业务、每个应用甚至是每类数据都创建专属的主题。
Kafka的服务器端由被称为Broker的服务进程构成,即一个Kafka集群由多个Broker组成,Broker负责接收和处理客户端发送过来的请求, 以及对消息进行持久化。常见的做法是将不同的Broker分散运行在不同的机器上,这样如果集群中某一台机器宕机,即使在它上面运行的所有Broker进程都挂掉了,其他机器上的Broker也依然能够对外提供服务。这其实就是Kafka提供高可用的手段之一。
实现高可用的另一个手段就是备份机制(Replication)。备份的思想很简单,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在Kafka中被称为副本 (Replica)。Kafka定义了两 类副本:领导者副本(Leader Replica)和追随者副本(Follower Replica)。
领导者副本(Leader Replica):提供与客户端程序进行交互的作用。
追随者副本(Follower Replica):不能与外界进行交互,只是被动地追随领导者副本。
副本的工作机制:生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。追随者副本向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步。
虽然有了副本机制可以保证数据的持久化或消息不丢失,但没有解决伸缩性的问题。伸缩性即所谓的Scalability,是分布式系统中非常重要且必须要谨慎对待的问题。什么是伸缩性呢?我们拿副本来说,虽然现在有了领导者副本和追随者副本,但倘若领导者副本积累了太多的数据以至于单台Broker机器都无法容纳了,此时应该怎么办呢?一个很自然的想法就是,能否把数据分割成多份保存在不同的Broker上? 这种机制就是所谓的分区(Partitioning)。
Kafka中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区0中,要么在分区1中。Kafka的分区编号是从0开始的,如果Topic有100个分区,那么它们的分区号就是从0到99。
副本如何与这里的分区联系在一起呢;实际上,副本是在分区这个层级定义的。每个分区下可以配置若干个副本,其中只能有1个领导者副本和N-1个追随者副本。生产者向分区写入消息,每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从0开始,假设一个生产者向一个空分区写入了10条消息,那么这10条消息的位移依次是0、1、2、…、9。
Kafka的三层消息架构:
1. 第一层是主题层,每个主题可以配置M个分区,而每个分区又可以配置N个副本。
2. 第二层是分区层,每个分区的N个副本中只能有一个充当领导者角色,对外提供服务;其他N-1个副本是追随者副本,只是提供数据冗余之用。
3. 第三层是消息层,分区中包含若干条消息,每条消息的位移从0开始,依次递增。
最后,客户端程序只能与分区的领导者副本进行交互。
Kafka Broker是如何持久化数据的。
总的来说,Kafka使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Appendonly)消息的物理文件。因为只能追加写入,故避免了缓慢的随机I/O操作,改为性能较好的顺序I/O写操作,这也是实现Kafka高吞吐量特性的一个重要手段。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此Kafka必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment) 机制。在Kafka底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。
总结:
消息:Record。服务代理节点,Kafka服务实例。 n个组成一个Kafka集群,通常一台机器部署一个Kafka实例,一个实例挂了其他实例仍可以使用,体现了高可用。
主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
分区:Partition。一个topic 可以拥有若干个partition(从 0 开始标识partition ),分布在不同的broker 上, 实现发布与订阅时负载均衡。producer 通过自定义的规则将消息发送到对应topic 下某个partition,以offset标识一条消息在一个partition的唯一性。一个partition拥有多个replica,提高容灾能力。partition在机器磁盘上以log 体现,采用顺序追加日志的方式添加新消息、实现高吞吐量
消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
副本:Replica。Kafka中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。leader副本负责读写请求,follower 副本负责同步leader副本消息,通过副本选举实现故障转移。
生产者:Producer。向主题发布新消息的应用程序。
消费者:Consumer。从主题订阅新消息的应用程序。
消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance是Kafka消费者端实现高可用的重要手段。
二、集群参数配置
(1)Broker端参数
Broker端参数也被称为静态参数(Static Configs)。静态参数,是指你必须在Kafka的配置文件server.properties中进行设置的参数,不管你是新增、修改还是删除。同时,你必须重启Broker进程才能令它们生效。
1. 针对存储信息的重要参数
Broker是需要配置存储信息的,即Broker使用哪些磁盘。那么针对存储信息的重要参数有以下这么几个:
log.dirs:这是非常重要的参数,指定了Broker需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。
log.dir:注意这是dir,结尾没有s,说明它只能表示单个路径,它是补充上一个参数用的。
只需要设置log.dirs,即第一个参数就好了,不要设置log.dir。而且更重要的是,在线上生产环境中一定要为log.dirs配置多个路径,具体格式是一个CSV格式, 也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:
1. 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
2. 能够实现故障转移:即Failover。这是Kafka 1.1版本新引入的强大功能。要知道在以前,只要Kafka Broker使用的任何一块磁盘挂掉了,整个Broker进程都会关闭。但是自1.1开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且Broker还能正常工作。
2. 与ZooKeeper相关的设置
ZooKeeper是一个分布式协调框架,负责协调管理并保存Kafka集群的所有元数据信息,比如集群都有哪些Broker在运行、创建了哪些Topic,每个Topic都有多少分区以及这些分区的Leader副本都在哪些机器上等信息。
Kafka与ZooKeeper相关的最重要的参数当属zookeeper.connect。这也是一个CSV格式的参数,如指定它的值为zk1:2181,zk2:2181,zk3:2181。2181是ZooKeeper的默认端口。
让多个Kafka集群使用同一套ZooKeeper集群,那么这个参数应该怎么设置呢?
如果你有两套Kafka集群,假设分别叫它们kafka1和kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2。
3. 与Broker连接相关的
即客户端程序或其他Broker如何与该Broker进行通信的设置。有以下三个参数:
listeners:学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的Kafka服务。格式为<协议名称,主机名,端口号>,比如CONTROLLER: //localhost:9092。一旦自定义了协议名称,还要指定listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议,比如指定listener.security.protocol.map=CONTROLLER:PLAINTEXT表示 CONTROLLER这个自定义协议底层使用明文不加密传输数据。
advertised.listeners:和listeners相比多了个advertised。Advertised的含义表示宣称的、公布的,就是说这组监听器是Broker用于对外发布的。
host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。
最好全部使用主机名,即Broker端和Client端应用配置中全部填写主机名。
4. 第四组参数是关于Topic管理
auto.create.topics.enable:是否允许自动创建Topic。
unclean.leader.election.enable:是否允许UncleanLeader选举。
auto.leader.rebalance.enable:是否允许定期进行Leader选举。
auto.create.topics.enable最好设置成false,即不允许自动创建Topic。
unclean.leader.election.enable是关闭UncleanLeader选举的。何谓Unclean?还记得Kafka有多个副本这件事吗?每个分区都有多个副本来提供高可用。在这些副本中只能有一个副本对外提供服务,即所谓的Leader副本。这些副本只有保存数据比较多的那些副本才有资格竞选。如果设置成false,坚决不能让那些落后太多的副本竞选Leader。这样做的后果是这个分区就不可用了,因为没有Leader了。反之如果是true,那么Kafka允许你从那些“跑得慢”的副本中选一个出来当Leader。这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全。建议把它设置成false。
auto.leader.rebalance.enable设置它的值为true表示允许Kafka定期地对一些Topic分区进行Leader重选举,需要满足一定的条件才会发生。严格来说它与上一个参数中Leader选举的最大不同在于,它不是选Leader,而是换Leader!比如Leader A一直表现得很好,但若auto.leader.rebalance.enable=true,那么有可能一段时间后Leader A就要被强行卸任换成Leader B。所以建议设置成false。
4. 数据留存方面
log.retention.{hours|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说ms设置最高、minutes次之、hours最低。
log.retention.bytes:这是指定Broker为消息保存的总磁盘容量大小。
message.max.bytes:控制Broker能够接收的最大消息大小。
log.retention.{hours|minutes|ms}三兄弟,举例:log.retention.hours=168表示默认保存7天的数据,自动删除7天前的数据。很多公司把Kafka当做存储来使用,那么这个值就要相应地调大。
log.retention.bytes,这个值默认是-1,表明你想在这台Broker上保存多少数据都可以,至少在容量方面Broker绝对为你开绿灯,不会做任何阻拦。这个参数真正发挥作用的场景其实是在云上构建多租户的Kafka集群:设想你要做一个云上的Kafka服务,每个租户只能使用100GB的磁盘空间,为了避免有个“恶意”租户使用过多的磁盘空间,设置这个参数就显得至关重要了。
(2)Topic级别参数
如果同时设置了Topic级别参数和全局Broker参数,Topic级别参数会覆盖全局Broker参数的值,而每个Topic都能设置自己的参数值,这就是所谓的Topic级别参数。
Topic级别参数的设置有两种方式可以设置:
* 创建Topic时进行设置
* 修改Topic时设置
1. 保存消息(创建时)
retention.ms:规定了该Topic消息被保存的时长。默认是7天,即该Topic只保存最近7天的消息。一旦设置了这个值,它会覆盖掉Broker端的全局参数值。
retention.bytes:规定了要为该Topic预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的Kafka集群中会有用武之地。当前默认值是-1,表示可以无限使用磁盘空间。
例1:
设想你的部门需要将交易数据发送到Kafka进行处理,需要保存最近半年的交易数据,同时这些 数据很大,通常都有几MB,但一般不会超过5MB。现在让我们用以下命令来创建Topic:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
Kafka开放了kafka-topics命令供我们来创建Topic即可。对于上面这样一条命令,请注意结尾处的--config设置,我们就是在config后面指定了想要设置的Topic级别参数。
例2(推荐使用):
自带的命令kafka-configs来修改Topic级别参数。假设我们现在要发送最大值是10MB的消息,该如何修改呢?命令如下:
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
(3)JVM参数
Kafka服务器端代码是用Scala语言编写的,但终归还是编译成Class文件在JVM上运行,因此JVM参数设置对于Kafka集群的重要性不言而喻。
个人通用的建议:将JVM堆大小设置成6GB
为kafka设置下面这两个环境变量:
KAFKA_HEAP_OPTS:指定堆大小。
KAFKA_JVM_PERFORMANCE_OPTS:指定GC参数。
例:在启动Kafka Broker之前,先设置上这两个环境变量:
$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties
(4)操作系统参数
Kafka并不需要设置太多的OS参数,但有些因素最好还是关注一下,比如下面这几个:
文件描述符限制
文件系统类型
Swappiness
提交时间
文件描述符限制(ulimit -n): 其实设置这个参数一点 都不重要,但不设置的话后果很严重,比如你会经常看到“Too many open files”的错误。例如:ulimit -n 1000000。
文件系统类型:指的是如ext3、ext4或XFS这样的日志型文件系统。根据官网的测试报告,XFS的性能要强于ext4,可以自行设置。
swap的调优:设置其为0,将swap完全禁掉以防止Kafka进程使用swap空间;尽量不要设置成0比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成0,当物理内存耗尽时,操作系统会触发OOMkiller这个组件,它会随机挑选一个进程然后kill掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用swap空间时,你至少能够观测到Broker性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,建议将swappniess配置成一个接近0但不为0的值,比如1。
提交时间:或者说是Flush落盘时间。向Kafka发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据LRU算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是5秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于Kafka在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。