1. Kafka定义
传统定义:分布式的、基于发布/订阅模式的消息队列,主要用于大数据实时处理领域。发布/订阅模式中,发布者不会直接将消息发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接受感兴趣的消息。
官网最新定义:开源的分布式事件流平台(Event Streaming Platform),用于高性能数据管道、流分析、数据集成。
2. 消息队列的应用场景及模式
传统的消息队列的主要应用场景包括:缓冲/消峰、解耦、异步通信。
缓冲/消峰:有助于控制和优化数据流的速度,解决生产消息和消费消息的速度不一致的问题。
解耦:允许独立修改和扩展两边的处理过程,只需确保他们遵守相同的接口约束。
此时消息队列类似于一个超时,数据源是商品生产厂商,目的地是消费者,消费者无需跟各大厂商来往,而是去超市购物。
异步通信:允许用户把一个消息放入队列,不立即处理,再在需要的时候去处理。(比如发送验证码)
消息队列的两种模式:
1)点对点:消费者主动拉取数据,消息收到后清除消息
2)发布/订阅模式:有多个Topic主题;消费者消费完数据后,不删除数据,数据仍可以被其他消费者消费;每个消费者相互独立
3. Kafka基础架构
1)一个topic可以有多个分区,broker为服务器,即一份数据分为多个分区放在多个服务器
2)数据分为多块,消费者也有多个,组成一个消费者组,组内每个成员并行消费不同的分区
3)分区也有副本,不过分HDFS的副本有区别,HDFS的副本是相等的,而Kafka里的副本只有Leader的才能起作用,Follower的副本不能消费(除非Leader挂了,Follower成为Leader)
4)ZK里保存了Kafka的服务器id信息,以及每个topic的各个分区的Leader是哪个服务器,以及isr队列
4. Kafka命令行操作快速入门
针对Kafka基础架构的三大部分,分别有不同的脚本命令来操作。
生产者:kafka-console-producer.sh;
集群:kafka-topics.sh;
消费者:kafka-console-consumer.sh
1)kafka-topics.sh的命令参数如下
创建topic,1个分区,3个副本,并查看:
bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --create --partitions 1 --replication-factor 3bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --listbin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --describe
结果如下:
修改分区数(分区数只能改大,不能改小)为3
bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --partitions 3bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --describe
结果如下:
另外副本数也不能通过命令行修改
2) kafka-console-producer.sh
向指定分区发送数据
bin/kafka-console-producer.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first
3) kafka-console-consumer.sh
消费者消费指定分区的数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first
再在生产端发送数据,消费者端可以收到数据,但不能收到历史数据(即生产者在消费者起来之前发送的数据),要想消费历史数据,加上参数:--from-beginning
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --from-beginning
5. 生产者异步发送与同步发送
添加依赖:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version>
</dependency>
创建Kafka生产者对象:
Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRP_SERVERS_CONFIG, "hadoop102:9092");// 指定key和value的序列化类型
// StringSerializer.class.getName()相当于StringSerializer的全路径名称
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
发送数据:
// new ProducerRecord第一个参数是topic,第二个参数是值(key为null)
kafkaProducer.send(new ProducerRecord<>("first", "value"));
ProducerRecord的多个构造函数:
关闭资源:
kafkaProducer.close();
发送数据也可以带回调函数,返回主题、分区等信息:
kafkaProducer.send(new ProducerRecord<>("first", "value"), new CallBack() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("topic: " + metadata.topic() + ", partition: " + metadata.partition());}}
});
同步发送只需在send方法之后加上get方法:
// new ProducerRecord第一个参数是topic,第二个参数是值(key为null)
kafkaProducer.send(new ProducerRecord<>("first", "value")).get();
6. 生产者分区策略
生产者的默认分区器:DefaultPatitioner,即如果指定分区,就发送到指定分区;如果没指定分区,指定了key,则将key的哈希值对分区数取模得到分区;如果也没指定key,选择粘性分区(sticky partition),即随机选取一个分区,本批次数据满了或者linger.ms时间到了,再次选择另一个分区。
自定义分区,主要是实现Partitioner接口,重写其中的partition方法:
@Overrride
public int partition(String topic, Object key, byte[] keybytes, Object value, byte[] valuebytes, Cluster cluster) {String valueStr = value.toString();if (valueStr.contains("xxx")) {return 0;} else if (valueStr.contains("zzz")) {return 1;} else {return 2;}
}
配置关联自定义分区器:
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
7. 如何提高吞吐量
batch.size:每批次数据大小,数据量达到这个值,就开始发送,默认为16K
linger.ms:等待时间,如果到了这个时间,无论数据量多大,立即发送,默认为0
如果linger.ms设置为0,意味着一旦有数据来就立马发送,这样效率并不高,所以适当提高linger.ms有利于提高吞吐量,但是不能太大,这样会造成较大的数据延迟。
也可以发送数据的过程中采用数据压缩(snappy)的方式,来提高实际发送的数据量。
还可以修改缓冲区大小RecordAccumulator
设置缓冲区大小:
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32M
设置批次大小:
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16K
设置linger.ms:
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
设置压缩格式:
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
8. 数据可靠性
生产者发送给Kakfa集群,会收到如下几种应答:
1)ack = 0:不需要等待数据落盘,可靠性最差,存在丢数风险,一般不会用这种模式
2)ack = 1:需要Leader收到数据并进行落盘,也有丢数风险,比如Leader刚应答完就挂了,还没来得及同步数据给Follower
3)ack = -1/all,需要Leader和isr队列中所有节点收到数据并进行落盘,可靠性最好,但是数据可能会重复。
所谓ISR队列,就是和Leader保持同步的Leader+Follower的集合,例如:leader:0; isr: 0,1。如果某个Follower长时间未与Leader通信,该Follower就会被提出isr队列,这样就不会出现Leader长期等待某个故障Follower节点的问题。
ack = -1,如果分区副本数为1,或者isr队列里只有一个节点,则与ack=1效果一样,仍有丢数风险。
数据完全可靠 = (ACK = -1)+ (分区副本数 >= 2) + (ISR队列里节点数 >= 2)
代码配置:
properties.put(ProducerConfig.ACK_CONFIG, -1);
//重试次数,默认为int最大值
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
为解决ack = -1时的数据重复性问题,kafka引入了幂等性和事务的概念。所谓幂等性,就是Producer无论向broker发送多少次重复数据,broker都只会持久化一条。
精确依次(Exactly Once) = 幂等性 + 数据完全可靠
重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,broker只会持久化一条。PID是每次Kafka重启时会分配一个新的,Partition表示分区号,SeqNumber是单调递增的。所以能保证单分区单次会话数据不重复。
开启幂等性,只需将enable.idempodence设为true即可(默认就是true)。
Kafka事务原理:
使用事务发送数据:
properties.put(ProducerConfig.TRANSACTION_ID_CONFIG, "01");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);kafkaProducer.initTransactions();
kafkaProducer.beginTransactions();
try {kafkaProducer.send(new ProducerRecord<>("first", "value"));kafkaProducer.commitTransactions();
} catch (Exception e) {kafkaProducer.abortTransactions();
} finnaly {kafkaProducer.close();
}
9. 数据乱序
Kafka生产者发送数据给broker,每个broker默认缓存5个请求,如果其中一个请求发送失败,不影响后面请求发送,加入失败的请求后来又重试成功了,那么broker收到的数据会是乱序的。只需将max.in.flight.requests.per.connection设置小于等于5,broker就会自动排序。