目录
- 3.1 生产者消息发送流程
- 3.1.1 发送原理
- 3.2 异步发送 API
- 3.3 同步发送数据
- 3.4 生产者分区
- 3.4.1 kafka分区的好处
- 3.4.2 生产者发送消息的分区策略
- 3.4.3 自定义分区器
- 3.5 生产者如何提高吞吐量
- 3.6 数据可靠性
3.1 生产者消息发送流程
3.1.1 发送原理
3.2 异步发送 API
3.3 同步发送数据
3.4 生产者分区
3.4.1 kafka分区的好处
便于合理使用存储资源
,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果- 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
3.4.2 生产者发送消息的分区策略
3.4.3 自定义分区器
1、需求:
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区
2、定义类实现 Partitioner 接口,重写 partition()方法。
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取数据 atguigu helloString msgValues = value.toString();int partition;if (msgValues.contains("atguigu")){partition = 0;}else {partition = 1;}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
3、使用分区器的方法,在生产者的配置中添加分区器参数
public class CustomProducerCallbackPartitions {public static void main(String[] args) throws InterruptedException {// 0 配置Properties properties = new Properties();// 连接集群 bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");// 指定对应的key和value的序列化类型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 关联自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");// 1 创建kafka生产者对象// "" helloKafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("主题: " + metadata.topic() + " 分区: " + metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
}
3.5 生产者如何提高吞吐量
- 分批次发送消息
- 对生产者消息采用压缩
四个重要参数:
public class CustomProducerParameters {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接kafka集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");// 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 缓冲区大小properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// 批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);// linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");// 1 创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2 发送数据for (int i = 0; i < 50; i++) {kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));}// 3 关闭资源kafkaProducer.close();}
}