【Kafka】Kafka Producer 分区-05
- 1. 分区的好处
- 2. 分区策略
- 2.1 默认的分区器 DefaultPartitioner
- 3. 自定义分区器
1. 分区的好处
(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
see 2.4.1http://t.csdnimg.cn/m1O9u
2. 分区策略
2.1 默认的分区器 DefaultPartitioner
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky
partition that changes when the batch is full.
*
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {… …
}
策略如下:
- 如果记录中指定了分区,使用指定的分区。
这意味着生产者在发送消息时明确指定了一个分区号,Kafka将直接使用这个分区。 - 如果没有指定分区但存在键,根据键的哈希值选择分区。
如果消息中没有明确指定分区,但是提供了一个键,Kafka会使用键的哈希值来计算应该使用哪个分区。这可以保证具有相同键的消息被发送到相同的分区,从而保证消息的有序性。 - 如果既没有指定分区也没有键,选择一个“sticky partition”(粘性分区),在批次满时更换分区。
如果消息既没有指定分区,也没有键,Kafka将使用“sticky partition”策略。这种策略会在消息批次满时更换分区,以便于提高效率和性能。
这个注释是对DefaultPartitioner类的说明,该类实现了Partitioner接口,用于定义消息分区的策略。DefaultPartitioner类的主要功能就是根据上述规则决定消息发送到哪个分区。以下是DefaultPartitioner类的示例实现结构:
public class DefaultPartitioner implements Partitioner {// 初始化方法@Overridepublic void configure(Map<String, ?> configs) {// 配置代码}// 计算分区的方法@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 分区计算逻辑}// 清理资源的方法@Overridepublic void close() {// 资源清理代码}
}
该实现中包括了三个主要的方法:
- configure(Map<String, ?> configs):用于初始化和配置分区器。
- partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster):用于根据给定的主题、键和值来计算应该使用哪个分区。
- close():用于在分区器关闭时清理资源。
案例一: 将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。
import org.apache.kafka.clients.producer.*;import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102
:9092 ");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = newKafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)kafkaProducer.send(new ProducerRecord<>("first",1, "", "atguigu " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e == null) {System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {e.printStackTrace();}}});}kafkaProducer.close();}
}
案例二: 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。
public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = newKafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0kafkaProducer.send(new ProducerRecord<>("first","a", "atguigu " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e == null) {System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {e.printStackTrace();}}});}kafkaProducer.close();}
}
3. 自定义分区器
如果研发人员可以根据企业需求,自己重新实现分区器
-
需求
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区。 -
实现步骤
定义类实现 Partitioner 接口
重写 partition()方法
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*** 1. 实现接口 Partitioner* 2. 实现 3 个方法:partition,close,configure* 3. 编写 partition 方法,返回分区号*/
public class MyPartitioner implements Partitioner {/*** 返回信息对应的分区** @param topic 主题* @param key 消息的 key* @param keyBytes 消息的 key 序列化后的字节数组* @param value 消息的 value* @param valueBytes 消息的 value 序列化后的字节数组* @param cluster 集群元数据可以查看分区信息* @return*/@Overridepublic int partition(String topic, Object key, byte[]keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue = value.toString();// 创建 partitionint partition;// 判断消息是否包含 atguiguif (msgValue.contains("atguigu")) {partition = 0;} else {partition = 1;}// 返回分区号return partition;}// 关闭资源@Overridepublic void close() {}// 配置方法@Overridepublic void configure(Map<String, ?> configs) {}
}
使用分区器的方法,在生产者的配置中添加分区器参数。
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallbackPartitions {public static void main(String[] args) throws InterruptedException {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 添加自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");KafkaProducer < String, String > kafkaProducer = newKafkaProducer<>(properties);for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e == null) {System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {e.printStackTrace();}}});}kafkaProducer.close();}
}
测试
①在 hadoop102 上开启 Kafka 消费者
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
②在 IDEA 控制台观察回调信息
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0