1、Topic的分片和副本机制
分片作用:
解决单台节点容量有限的问题,节点多,效率提升,吞吐量提升。通过分片,将一个大的容器分解为多个小的容器,分布在不同的节点上,从而实现分布式存储。
分片的数量没有限制,与节点数量没有关系,分片数量不会超过总节点数量的三倍。
副本作用:
提升数据的可靠性,副本越多数据越可靠,但是数据冗余越高。
副本数量有限制,最多和节点的数量相等,但是一般构建1~3个之间。
2、Kafka如何保证数据不丢失
数据传输的三个阶段:
生产者生产数据到broker
broker存储数据
消费者从broker上消费数据
<1> 生产端如何保证数据不丢失:
生产数据到broker之后的响应机制
当生产者生产数据到Broker后,Broker应该给于确认响应(ack)。
ack 确认机制,主要有三种方案,分别为0 1 -1(ALL)
0:生产者只管将数据生产到Borker ,不等待Broker返回的ack 信息
1:生产者将数据生产到Broker,需要等待Broker端Topic的对应分片上的主副本接收到消息后,即为成功发送消息。
-1: 生产者将数据生产到broker,需要等待broker端Topic的对应分片所有副本都接收到消息,即为成功发送
生产中一般根据消息重要情况以及生成和消费速率来选择相应的级别。一般来说,重要程度越高的,安全级别越高,速率越高,优先保证安全性,在此基础上,保持平衡。
相关问题思考:
1-生产者发送一条数据到Broker,Broker给于一次响应,如果Broker迟迟不予响应,怎么办?
先等待,然后重试,最后报错,先等待一段时间,当超时后,然后触发充重试策略,进行重试操作,当重试后依然没有响应,最后程序报错,停止发送。
2-生产者发送一条数据,Broker就要给予一次响应,那么这样是否会占用更多的带宽,如果占用,如何解决?
肯定会影响,可以引入缓存池,生产者在生产数据的时候,底层先将其放置到一个缓存池,当池子中消息数据达到一批数据大小后,会专门有一个子线程,触发执行,将数据生产到Broker,此时Broker只需要对这一批数据给予一次响应即可,异步发送。
3- 如果采用一批一批的发送,如果Broker又没有给予响应,但是,此时缓存池中数据已经满了,如何解决?
可以选择直接清空缓存池或者不清空,如果数据可以重复读,直接报错清空即可,后续重新读取数据即可,如果数据不可重复读,可以提前设置处理方案,将每一个消息提前先找一个容器进行备份存储,自己维护数据,当发送成功,删除一部分数据,如果出错,重启后,先从这个容器将剩余的数据发送即可,当然如果选择不清空,那么一直等待即可。
相关的一些参数设置:
buffer.memory 设置缓存池大小,默认值33554432(32M)
retries 重试的次数,默认值2147483647,最终的重试策略取决于超时设置
batch.size 表示一批数据大小,默认值16384(16KB)
delivery.timeout.ms 总超时时间,默认值120000(120S)
requesst.timeout.ms 每一次请求后的超时时间(等待时间),默认值 30000(30s )
<2>Broker 如何保证数据不丢失
Broker可以将每个分片的副本数量设置为多个,提供数据的可靠性同时还需要生产端将ACK设置为-1
<3>消费端如何保证数据不丢失
消费者连接kafka集群,kafka收到请求后,首先会根据group_id 查询上一次消费到了哪个消息偏移量,如果没有找到,默认从当前的位置开始消费数据,之前的消息默认不处理,如果找到了,就从记录的消息偏移量位置继续消费数据即可
消费者消费完数据后,会把对应的消息的偏移量信息重新提交给Broker记录
在提交偏移量的时候有两种提交方式:自动提交偏移量,手动提交偏移量
配置自动提交:
consumer = KafkaConsumer("test",bootstrap_servers=['localhost:9092'],group_id='g_2',enable_auto_commit=True,auto_commit_interval_ms=1000)
手动提交:
consumer = KafkaConsumer("test",bootstrap_servers=['localhost:9092'],group_id='g_2',enable_auto_commit=False,auto_commit_interval_ms=1000)consumer.commit() #同步提交
consumer.commit_async() #异步提交
3、Kafka中生产者的数据分发策略
分发策略:生产者生产数据到Broker的某一Topic,这个数据最终落入到那个分片的副本,即是分发策略
1、Hash策略 -- 支持
def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
如果发送数据时指定了topic ,value , key, 即是采用hash 策略
相同key 的hash是一样的,会分发到同一个分区
2、随机分发策略---python客户端支持,Java客户端不支持
发送数据时,如果只传递了Topic和value,即是随机分发
3、轮询策略 -- 2.4版本以上修改为粘性策略,2.4版本以下支持,但是这两种方式Java客户端支持,python不支持
4、指定分区的策略 -- 支持
当发送数据时,如果指定了partition参数,即是采用指定分区策略,分区的编号从0 开始
5、自定义分区策略 -- 支持
from kafka.partitioner import DefaultPartitioner
参考Kafka的默认分发策略方法DefaultPartitioner
def __call__(cls, key, all_partitions, available):"""Get the partition corresponding to key:param key: partitioning key:param all_partitions: list of all partitions sorted by partition ID:param available: list of available partitions in no particular order:return: one of the values from all_partitions or available"""if key is None:if available:return random.choice(available)return random.choice(all_partitions)idx = murmur2(key)idx &= 0x7fffffffidx %= len(all_partitions)return all_partitions[idx]
自定义实现:
class MyPartitioner(object):def __call__(self, key, all_partitions, available):# 实现分发策略return all_partitions[i]kafkaProducer = KafkaProducer(bootstrap_servers=[],ack=-1,partitioner=MyPartitioner()
)
4、kakfa的存储和查询机制
1、存储:
数据存储在磁盘中,位置取决于配置log.dirs参数
在对应目录下,以topic名称+分片号创建目录,目录下有2个主要文件log文件和index文件
log文件:存储消息数据,前面的数字代表消息从哪个偏移量开始存储
index文件:存索引数据,用于加速查询
默认情况下,当log文件达到1G时,会拆分数据,滚动形成一个新的log文件,index文件也会随之产生
问题:为什么不放在同一个文件?
一个文件过大,打开和关闭可能很耗费资源,从一个log中检索数据相对也很慢,影响效率。
Kafka本质上是一个消息队列的中间件,仅仅负责消息的临时存储,当消息超过一定的时间,Kafka就会执行删除数据操作,默认168小时
如果保存到一个文件,删除数据就会挺慢。当一个log永远不会达到1G时,就永远不会被删除。
2、查询
首先确定要读取的offset在那个segment 片段中
查询这个片段的index文件,根据offset确定这个消息在log文件中的什么位置
读取log文件,检索对应位置下的内容即可(底层是基于磁盘顺序查询)
获取最终的消息数据
5、kafka的消费者的负载均衡机制
假设生产者速率400条/分钟, 消费者速率400条/分钟
随着发展,生产者速率达到1200条/分钟,但是消费者还是400条/分钟,会造成什么问题?
会造成大量的数据在Broker积压,影响消息处理的时效性问题。增加消费者,必须保证都在同一个消费者组内
随着发展,生产者速率达到1600条/分钟,但是消费者还是400条/分钟,会造成什么问题?
会造成大量的数据在Broker积压,影响消息处理的时效性问题。此时再添加一个消费者,但是总有一个消费者无法消费数据,这是因为Kafka的消费者的负载均衡机制,只能增加topic的分片数量。
消费者的负载均衡机制:
1、在一个消费者组内,消费者的数量最多和所监听的topic 分片数量是相等的,如果有多余的消费者,那么会出现某些消费者处于限制的状态
2、在一个消费者组内,topic的一个分片的数据,只能被一个消费者所消费,不允许出现一个分片被组内的多个消费者消费的情况,但一个消费者可以消费多个分区的数据
所以点对点和发布订阅的实现
点对点:把监听这个topic的消费者全部放到同一个消费者组内,这样消息必然只能有一个消费者消费
发布订阅:把监听这个topic的消费者放置在不同的消费者组内,这样消息就可以被多个消费者消费了