Kafka是由LinkedIn开发的一个分布式发布/订阅的消息系统和一个强大的队列,使用Scala编写,它以可扩展和高吞吐率而被广泛使用。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内以master-flower方式实现数据同步,从而防止数据丢失。
1、组件和角色
Producer:消息生产者,发布消息到 kafka 集群的终端或服务。
Consumer:从 kafka 集群中消费消息的终端或服务。
Consumer group:high-level consumer API 中,每个 consumer 都属于一个 consumer group,一个partition只能被同一个 consumer group 中的一个 Consumer 消费,但可以被多个不同consumer group 中的consumer消费。
Broker: 集群中的每一个kafka进程都是一个Broker,通常一台服务器上部署一个broker。
Topic :每条发布到 kafka 集群的消息属于的类别,即kafka是面向topic的,topic是逻辑概念。
Partition:每个topic包含一个或多个partition。kafka分配的单位是partition,partition是物理概念,生产者发送的消息就是保存在partition中的。
Segment:partition物理上由多个segment组成。
offset : 每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续递增的序列号叫做offset,偏移量offset在每个分区中是唯一的。
replica:partition的副本,保障 partition 的高可用。leader和follower统称为Replica。在kafka集群中,为了防止数据丢失,每个partition都会有主分区和从分区,当然,也可以没有从分区。每个partition有且只有一个主分区,可以没有从分区,也可以有一个或者多个从分区。
leader:replica中的一个角色,主分区所在的节点称为leader。在kafka集群中,每个partition都有一个leader,producer和consumer只跟leader交互,leader负责数据的读写。
follower:replica中的一个角色,从分区所在的节点称为follower,从leader中复制(fentch)数据。为了防止leader与follower节点上数据不一致性的问题,kafka没有使用读写分离,而是只在leader节点上读写数据,follower节点只是从leader节点上定期复制数据。如果leader节点异常,随机选择一个follower节点成为leader节点,从而防止数据丢失。
controller:kafka 集群中的一个broker,用来进行 leader 选举以及各种故障转移。
zookeeper:kafka 通过 zookeeper 来存储集群的 meta 信息,meta信息主要包括kafka的broker列表(ip:port)、topic和partition等信息。
AR(Assigned Replica 已分配的副本):表示某个分区的所有副本。
ISR(In-Sync-Replica 在同步中的副本):表示所有与leader副本保持一定程度同步的副本(包括leader副本在内)。
OSR(Out-of-Sync-Replica 不在同步中的副本):表示所有与leader副本同步滞后过多的follower副本(不包括leader副本)。
AR = ISR + OSR
正常情况下,所有的follower副本都应该与leader副本保持同步,即:AR = ISR,OSR集合为空。
(1)controller的选举
Kafka启动时,会在所有的broker(集群节点)中选择一个controller,leader和follower是针对分区而言的,而controller是针对broker而言的。创建topic、添加分区、修改副本数量等管理任务都是由controller完成的,以及Kafka分区leader的选举,也是有controller决定的。
在Kafka集群启动时,每个broker都会将自己注册到zookeeper上,并尝试在zookeeper上抢锁,抢占成功的broker就注册成为Controller(ZK临时节点)。只会有一个broker节点竞争成功,其他的broker会注册该节点的监视器,一旦该临时节点状态发生变化,就可以进行相应的处理。Controller是高可用的,一旦某个broker崩溃,其他的broker会重新注册成为Controller。
(2)leader的选举
所有分区的leader选举都是由controller决定的,controller会将leader的改变直接通过RPC的方式通知需为此做出响应的broker,controller读取到当前分区的ISR,只有一个replica存活时,就选择这个replica作为leader,否则任意选择一个replica作为leader,如果该分区的所有replica都已经宕机,则新的leader为-1。
为什么不通过ZK的方式选举分区的leader?
Kafka集群如果业务很多的情况下,会存在很多的分区,假设某个broker宕机,就会出现很多的分区都需要重新选举leader,如果使用zookeeper选举leader,会给zk带来巨大的压力。因此,Kafka中leader的选举不能使用zk来实现。
2、原理简介
1.一个Topic分为多个Partition来进行数据管理,一个Partition中的数据是有序、不可变的,使用偏移量(offset)唯一标识一条数据,是一个long类型的数据。Partition接收到producer发送过来的数据后,会产生一个递增的offset偏移量数据,同时将数据保存到本地的磁盘文件中(文件内容以追加的方式写入数据),Partition中的数据存活时间超过参数值(log.retention.{ms,minutes,hours},默认7天)的时候进行删除(默认)。Consumer根据offset消费对应Topic的Partition中的数据(也就是每个Consumer消费的每个Topic的Partition都拥有自己的offset偏移量)。注意:Kafka的数据消费是顺序读写的,磁盘的顺序读写速度(600MB/sec)比随机读写速度(100k/sec)快。
2.在Kafka集群中,producer生产数据并发送到对应的Topic。Producer通过push的方式将数据发送到对应Topic的分区,Producer发送到Topic的数据是由key/value键值对组成的,Kafka根据不同的key将数据发送到不同的Partition,默认采用Hash的机制发送数据到对应Topic的不同Partition中,配置参数为{partitioner.class}。也可以配置自定义分配机制,自定义类实现Partitioner接口,重写partition方法的方式。Producer发送数据的方式分为sync(同步)和async(异步)两种,默认为同步方式, 由参数{producer.type}决定;当发送模式为异步发送的时候,Producer提供重试机制,默认失败重试发送3次。
3.如果生产者同步发消息,在收到kafka的ack告知发送成功之前一直处于阻塞状态。如果生产者异步发消息,发送完之后不用等待broker给回复,直接执行后面的业务逻辑。可以提供回调方法,让broker异步的调用callback,告知生产者,消息发送的结果。如果告知的结果异常,再进行相应的处理操作。
4.Kafka有两种模式消费数据:队列和发布订阅;在队列模式下,一条数据只会发送给consumer group中的一个consumer进行消费;在发布订阅模式下,一条数据会发送给多个consumer进行消费。Kafka中通过控制consumer的参数{group.id}来决定kafka是什么数据消费模式,如果所有消费者的该参数值是相同的,那么此时的kafka就是类似于队列模式,数据只会发送到一个consumer,此时类似于负载均衡;否则就是发布订阅模式。Kafka的consumer基于offset对kafka中的数据进行消费。
5.Kafka的数据是按照分区进行排序的(插入的顺序),也就是每个分区中的数据是有序的。在Consumer进行数据消费的时候,也是对分区的数据进行有序消费的, 但是不保证所有数据的有序性(多个分区之间),同一个分区数据先进先出。
6.Consumer Rebalance:当一个consumer group组中的消费者数量和对应Topic的分区数量一致的时候,此时一个Consumer消费一个Partition的数据; 如果不一致,那么可能出现一个Consumer消费多个Partition的数据或者不消费数据的情况,这个机制是根据Consumer和Partition的数量动态变化的。Consumer通过poll的方式主动从Kafka集群中获取数据。
7.Kafka的Replication指的是Partition的复制,一个Partition的所有分区中只有一个分区是leader节点,其它分区是follower节点。Replication对Kafka的吞吐率有一定的影响,但是极大的增强了可靠性。Follower节点会定时的从leader节点上获取增量数据,一个活跃的follower节点必须满足以下两个条件: (1)所有节点必须维护与zookeeper的连接(通过zk的heartbeat实现) ;(2)follower必须能够及时的将leader上的writing复制过来,不能“落后太多”,由参数{replica.lag.time.max.ms}和{replica.lag.max.messages}决定。
8.MessageDeliverySemantics是消息系统中数据传输的可靠性保证的一个定义,主要分为三种类型: At most once(最多一次):消息可能会丢失,但是不可能重复发送。At least once(最少一次):消息不可能丢失,但是可能重复发送。Exactly once(仅仅一次):消息只发送一次,但不存在消息的丢失。Kafka的Producer通过参数{request.required.acks}来确定Producer和Broker之间是哪种消息传递类型。Ack=0,相当于异步发送,意味着producer不等待broker同步完成,消息发送完毕继续发送下一批消息。提供了最低延迟,但持久性最弱,当broker发生故障时很可能发生数据丢失。如果leader死亡,producer继续发送消息,broker接收不到数据就会造成数据丢失。 Ack=1,producer要等待leader成功收到消息并确认,才发送下一条message。提供较低的延迟性以及较好的持久性。但是如果partition下的leader死亡,而follower尚未复制数据,数据就会丢失。 Ack=-1,leader收到所有消息,且follower同步完数据,才发送下一条数据。延迟性最差,持久性最好(即可靠性最好)。 三种参数设置性能递减,可靠性递增。 同时,Ack默认值为1,此时吞吐量与可靠性折中。实际生产中可以根据实际需求进行调整。
3、常用参数介绍
(1)kafka的server.properties配置文件中参数:
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=9092 #当前kafka对外提供服务的端口,默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。改成自己centos的ip地址。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后再发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:2181,192.168.7.101:2181,192.168.7.107:2181/kafka #设置zookeeper的连接端口,在集群配置时,要把所有机器的ip地址都要写上,这里以三个机器为例。如果是单机部署,只需要写一个ip地址就行了。
注意:在zookeeper.connect的最后加上/kafka是因为kafka需要依赖zookeeper,在kafka启动之后默认会在zookeeper服务所在节点的根目录下创建很多与kafka有关的目录,这样就会导致zookeeper服务所在节点的根目录下的文件很多很乱。另外,如果多个kafka共用一个zookeeper,就会导致zookeeper服务的根目录下各个kafka文件更加混乱。所以在zookeeper.connect的最后加上/kafka是为了在kafka启动时将创建的文件都放到zookeeper节点根目录下的/kafka子目录下。多个kafka共用一个zookeeper时可以分别配置自己的子目录以示区分。
启动zookeeper和kafka之后,会自动在zookeeper节点上创建/kafka目录。
(2)生产者producer.properties配置文件中的参数:
1.bootstrap.servers=host1:port1,host2:port2 // 用于生产者与kafka集群建立连接
2. acks:表示Producer需要Leader确认的模式。
(1)acks = 0: 表示Producer请求立即返回,不需要等待Leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。
(2)acks = -1: 表示分区Leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为Producer请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
(3)acks = 1: 表示Leader副本必须应答此Producer请求并写入消息到本地日志,之后Producer请求被认为成功。如果此时Leader副本应答请求之后挂掉了,消息会丢失。这个方案,提供了不错的持久性保证和吞吐。
3. compression.type=none // 压缩类型,目前支持none(不压缩), gzip, snappy, lz4, zstd
4. partitioner.class= kafka.producer.DefaultPartitioner // 分区的策略,默认是取模
5. request.timeout.ms=10000 // 消息发送的最长等待时间
6. linger.ms=0 //这个值是为了延迟发送来收集更多的消息一批发送,Producer是按照batch进行发送的,但是还要看linger.ms的值,默认是0,表示不延迟。为了减少网络IO,提升整体的性能,建议设置5-100ms。
7. batch.size=16384 // Producer按照batch进行发送,通过这个参数来设置批量提交的数据大小,默认是16KB,当积压的消息达到这个值的时候就会统一发送(发往同一分区的消息)。
8. buffer.memory=33554432 //该参数用于指定Producer端用于缓存消息的缓冲区大小,单位为字节,默认值为:33554432即32MB。发送的消息会先进入到本地缓冲区(32MB),生产者会跑一个线程,该线程去缓冲区中取16KB的数据,发送到kafka,如果到10毫秒数据没取满16KB,也会发送一次。异步的时候假如设置了缓存消息数量为200,但是一直没有200条数据,那么不可能一直等下去,就会取16KB大小的数据,直接发,不够16KB也会发。
(3)消费者consumer.properties配置文件中的参数:
1.bootstrap.servers= host1:port1,host2:port2 ... // 用于消费者与kafka集群建立连接
2. group.id=test-consumer-group // 标记消费者所属的消费者组
3. key.deserializer和value.deserializer:指定接收消息的key和value的反序列化类型。一定要写全类名。
4. enable.auto.commit:默认值为true,消费者会自动周期性地向服务器提交偏移量。
5. auto.commit.interval.ms:如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。
6. auto.offset.reset:当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),读取数据偏移量的处理方式:
(1)earliest:自动重置偏移量到最早的偏移量。
(2)latest:默认,自动重置偏移量为最新的偏移量。
(3)none:如果消费组原来的偏移量不存在,则向消费者抛异常。
7. max.poll.records:一次poll拉取数据返回消息的最大条数,默认500条。
8.offsets.topic.num.partitions:__consumer_offsets的分区数,默认是50个分区。
9.heartbeat.interval.ms:Kafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。
10.session.timeout.ms:Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
11.max.poll.interval.ms:消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
12.fetch.min.bytes:默认1个字节。消费者获取服务器端一批消息最小的字节数。
13.fetch.max.wait.ms:默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
14.fetch.max.bytes:默认值: 52428800字节,即50MB。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。