1、batch.size:批次大小,默认16k
2、linger.ms:等待时间,修改为5-100ms
3、compression.type:压缩snappy
4、 RecordAccumulator:缓冲区大小,修改为64m
测试代码:
package com.bigdata.producter;import org.apache.kafka.clients.producer.*;import java.util.Properties;/*** 测试自定义分区器的使用*/
public class CustomProducer07 {public static void main(String[] args) {// Properties 它是map的一种Properties properties = new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");/*** 此处是提高效率的代码*/// batch.size:批次大小,默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 18000);// linger.ms:等待时间,默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator:缓冲区大小,默认 32M:buffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");// 创建了一个消息生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 调用这个里面的send方法// ctrl + p/*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first","告诉你个秘密");kafkaProducer.send(producerRecord);*/for (int i = 0; i < 5; i++) {// 发送消息的时候,指定key值,但是没有分区号,会根据 hash(key) % 3 = [0,1,2]ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first","c","告诉你个找bigdata的好办法:"+i);// 回调-- 调用之前先商量好,回扣多少。kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// 获取很多信息,exception == null 说明成功,不为null,说明失败if(exception == null){System.out.println("消息发送成功");System.out.println(metadata.partition());// 三个分区,我什么每次都是2分区,粘性分区System.out.println(metadata.offset());// 13 14 15 16 17System.out.println(metadata.topic());}else{System.out.println("消息发送失败,失败原因:"+exception.getMessage());}}});}kafkaProducer.close();}
}
测试:
①在 bigdata01 上开启 Kafka 消费者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic first
②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。