Kafka概述
传统定义:一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。
最新定义:一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。最主要的功能是做数据的缓冲,相较于flume的channel, 能力更强。
应用场景:
- 缓冲/消峰:解决生产消息和消费消息的处理速度不一致的情况。
- 解耦:只需知道如何连接kafka,作用类似于交换机。
- 异步通信:可以将事务给kafka,自己去处理其他事务。
消息队列的两种模式:
- 点对点模式:消费者拉取数据后删除数据。优点是简单速度快,缺点是不方便实现多用户需要获取同一数据的情况。
- 发布/订阅模式:可以有多个topic主题,消费者拉取数据后不删除数据。
Kafka的基础架构
- 为方便扩展,并提高吞吐量,一个topic分为多个partition
- 配合分区的设计,提出消费者组的概念,组内内个消费者并行消费,以线程为单位。
- 为提高可用性,为每个partiton增加若干副本,类似NameNode HA
- 借助zookeeper来实现leader和follower的选举机制,leader是原数据,follower是副本数据。leader主要用于发送和传输,follower主要作为副本保证安全性。
Kafka的安装部署
官网下载地址:http://kafka.apache.org/downloads.html
- 上传安装包
- 解压安装包
- 修改配置文件
- 配置环境变量
- 编写群启群关脚本kf.sh
#! /bin/bashcase $1 in
"start"){for i in hadoop102 hadoop103 hadoop104doecho " --------启动 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done
};;
"stop"){for i in hadoop102 hadoop103 hadoop104doecho " --------停止 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "done
};;
esac
kafka主题相关操作
kafka-topics.sh脚本里面定义了对应相关操作。
- 增加主题,
kafka-topics.sh --bootstrap-server hadoop102:9092 -- create --topic second --replication-facotr 1 --partitions 1
- 删除,是标记删除,预计在1分钟后完全删除。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
- 修改,只能增加分区数量。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
- 查看,
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
生产和消费
- 启动生产者:
kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
- 启动消费者:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
- 如果单独启动生产者,发送数据,之后再启动消费者,默认不发送之前发送的数据。在消费者启动命令后面添加
--from-beginning
关键字可以修改为从头拿取数据。
发送流程
- Kafka Producer生产者
- main线程Producer的生产方法
- interceptors拦截器
- Serializer序列化器
- Partitioner分区器,按照批次随机分区。每个批次默认是16K,默认的等待时间是0ms。
- RecordAccumulator里面创建双端队列,队列个数等于分区个数
- sender进程复制双端队列中的数据发送到Kafka集群,如果成功接收则返回ack应答,否则重新发送,最多重试21亿次。
异步发送和同步发送
sender进程发送请求默认是异步执行,即向kafka集群发送时不管是否收到回复,一直发送,由selector来接收ack和关闭对应的请求进程。在Producer的send方法中有一个Callback对象参数,该对象需要实现一个onCompletetion方法。可以在里面查看到对应方法参数中的元数据的值,里面有主题名称、分区号和偏移量。异步执行时可以发现同一批次分区号是一样的,同步时由于需要等待ack,同一批次的分区号是不同的。
生产者分区
- 分区策略
- 默认分区器
- 如果指定了分区号,到指定分区
- 如果是key-value,使用key进行hash分区
- 粘性分区,如果上一个有分区,跟上一个分区一样,直到数据达到分区容量上限或者等待时间上限进行随机更换分区。
- UniformStickyPartition分区器:如果key值是固定的,可以使用该分区器
- 轮询分区器:需要维护一个列表,效率更低。
- 默认分区器
生产者如何提高吞吐量
- 修改从双端队列拿取数据的等待时间,从0ms修改为5-100ms
- compression.type: 压缩snappy
- 修改批次大小:默认为16K,修改为32KB.
数据可靠性
ack应答级别:
- 0:生产者发送过来的数据,不需要等数据落盘应答,也就是最多一次。
- 1:生产者发送过来的数据,Leader收到后应答
- -1(all): 生产者发送过来的数据,Leader和isr队列里面的所有节点收齐数据并落盘后应答。Leader维护了一个动态的in-sync replica set ISR, 如果有某个节点30s内没有回复,则认为该节点死亡。数据有可能
重复
。
数据的去重
幂等性
指producer不论向Broker发送多少次重复数据,Broker都只会持久化一条,保证精准一次。重复数据的判断标准,根据sqlNumber来判断,重发的数据其seqNumber是一样的。
缺点
:如果生产者中途宕机,然后重新建立会话时,不能保证不同会话时PID是一样,这时候重新发送重复数据时无法保证幂等性。
解决方案
:在Kafka集群中将生产者的信息保存到集群中的某个主题中,如果生产者宕机后重启需要先去读取Kafka集群的状态信息,以保证多会话情况下的幂等性。
数据的有序性
- 因为不能保证多分区之间是有序的,只能指定单分区。
- 开启幂等性,且元数据request个数小于5个,如果发送失败导致顺序异常,Kafka会按照SeqNumber重新排序。
Flume和Kafka
为何Kafka全方面碾压flume,还会有人使用flume?
这是由于flume使用上只需配置一个文件即可使用,无需编写代码。并且可以使用flume将数据灌入到kafka中,既简单又利用到了kafka的性能,flume和kafka结合使用才是日常开发的常用操作。
Kafka Broker总体工作流程
- broker在zk中注册
- controller谁先注册,谁说了算
- 由选举出来的Controller监听brokers节点变化
- Controller决定Leader选举:在isr存活为前提,轮询选举
- Controller将节点信息上传到zk
- 其他controller从zk同步相关信息
Broker节点的服役和退役
- 启动新主机的zookeeper和kafka
- 创建一个要均衡的主题vim topics-to-move.json
{"topics": [{"topic": "first"}],"version": 1
}
- 生成一个负载均衡的计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
- 保存生成的计划到文件中
- 执行负载均衡计划
- 在kafka/datas目录下查看是否正确。
kafka副本
为了提高数据可靠性,副本数量一般设置为两个。
Follower故障处理
LEO(log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1
HW (High Watermark): 所有副本中最小的LEO
高效读取
1. 多分区
2. 稀疏索引
3. 顺序写磁盘
4. 页缓存和零拷贝
页缓存:其实就是把尽可能多的空闲内存当做磁盘缓存来使用。
零拷贝:数据加工处理操作交给生产者和消费者