Kafka生产者相关-CSDN博客
消费者消费数据基本流程
package com.hrui;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消费者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消费者组consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-1");//创建消费者对象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//订阅主题consumer.subscribe(Collections.singletonList("test"));//从Kafka主题中获取数据while (true){ConsumerRecords<String, String> poll = consumer.poll(100);for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}//关闭消费者对象 因上面在无线循环//consumer.close();}
}
消费数据偏移量问题
Kafka中的 偏移量(offset)是用于标识每个消费者在某个分区内消费到的位置。每个分区的消息都有一个唯一的偏移量,消费者会根据这个偏移量来读取消息。
Kafka偏移量的管理
Kafka默认提供两种方式来管理偏移量:
- 自动提交偏移量(默认方式)
- 手动提交偏移量(需要显式配置)
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); //默认设置
consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); //默认设置 每5秒提交一次
也就是说默认情况下 消费者如果后启动 无法读取到生产者已经发送的消息
偏移量的重置
如果需要重新消费数据,可以通过 auto.offset.reset
配置项来控制消费者的偏移量重置行为。这个配置项有几个常用的值:
earliest
:如果没有找到偏移量(比如第一次消费),消费者会从最早的消息开始消费。latest
:如果没有找到偏移量,消费者会从最新的消息开始消费。none
:如果没有找到偏移量,消费者会抛出异常。
例如,设置为从最早的消息开始消费:
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
如果想从指定偏移量获取消息
解决消费者重复消费的问题(不能完全解决)
以上示例,偏移量默认都是5秒一次提交
例如先启动消费者
然后生产者发送了10000条数据 不好演示的话可以在生产者那边每发送一条数据然后
Thread.sleep 1秒
如果消费者在消费到一定程度之后 突然停止 观察再次启动消费者 存在消费者重复消费的情况
原因就是消费者偏移量默认5秒提交一次的原因
那么可以将消费者默认5秒提交偏移量缩短为1秒 但是这样不能完全解决问题
示例
消费者
代码
package com.hrui;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.*;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消费者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消费者组consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-3");//设置从最早的消息读取//consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//创建消费者对象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//订阅主题consumer.subscribe(Collections.singletonList("test"));
// boolean flg = true;
// while (flg) {
// // 拉取数据
// consumer.poll(Duration.ofMillis(100));
// final Set<TopicPartition> assignment = consumer.assignment();
//
// if (assignment != null && !assignment.isEmpty()) {
// // 检查分配的分区
// for (TopicPartition topicPartition : assignment) {
// if ("test".equals(topicPartition.topic())) {
// // 将偏移量设置为2
// consumer.seek(topicPartition, 2);
// // 停止循环
// flg = false;
// }
// }
// }
// }//从Kafka主题中获取数据while (true){//ConsumerRecords<String, String> poll = consumer.poll(100);ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}//关闭消费者对象 因上面在无线循环//consumer.close();}
}
生产者
代码
package com.hrui;import com.hrui.interceptor.KafkaProducerInterceptorTest;
import com.hrui.interceptor.ValueInterceptor;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author hrui* @date 2025/2/26 13:36*/
public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建配置对象Map<String,Object> configMap=new HashMap<>();//如果是集群随意指定一个configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//对Key Value进行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptor.class.getName());//可以配置ACKSconfigMap.put(ProducerConfig.ACKS_CONFIG,"-1");//配置幂等性configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//配置重试次数configMap.put(ProducerConfig.RETRIES_CONFIG,3);//配置超时configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);//配置事务 事务基于幂等性configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-tx-id");//创建生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);//初始化事务kafkaProducer.initTransactions();try {//开启事务kafkaProducer.beginTransaction();for(int i=0;i<10000;i++){//key的作用是通过某种算法,放到topic的某个分区中//可以不设置key 默认是按照轮询的方式ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);//发送数据 send方法还可以接收一个参数,就是回调函数 kafkaProducer.send(record);是异步的Future<RecordMetadata> send = kafkaProducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {// 处理发送失败的情况e.printStackTrace();} else {// 处理发送成功的情况System.out.println("发送成功:" + recordMetadata);}}});//这样变成同步了send.get();Thread.sleep(1000);}//提交事务kafkaProducer.commitTransaction();}catch (Exception e){e.printStackTrace();//中止事务kafkaProducer.abortTransaction();}finally {//关闭生产者对象kafkaProducer.close();}}
}
先启动消费者
然后启动生产者
观察消费者控制台
过会我再次启动消费者
1.缩短自动提交偏移量的时间
因为默认消费者每5秒自动提交提交
可以缩短自动提交偏移量的时间 但这样只能减少重复消费的量 并不能彻底解决重复消费的问题
2.手动提交偏移量
package com.hrui;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.*;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消费者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消费者组consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-3");//默认自动提交 改成false 变成手动提交偏移量consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//设置从最早的消息读取//consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//创建消费者对象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//订阅主题consumer.subscribe(Collections.singletonList("test"));
// boolean flg = true;
// while (flg) {
// // 拉取数据
// consumer.poll(Duration.ofMillis(100));
// final Set<TopicPartition> assignment = consumer.assignment();
//
// if (assignment != null && !assignment.isEmpty()) {
// // 检查分配的分区
// for (TopicPartition topicPartition : assignment) {
// if ("test".equals(topicPartition.topic())) {
// // 将偏移量设置为2
// consumer.seek(topicPartition, 2);
// // 停止循环
// flg = false;
// }
// }
// }
// }//从Kafka主题中获取数据while (true){//ConsumerRecords<String, String> poll = consumer.poll(100);ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}//设置自动提交偏移量之后 这里要手动去保存偏移量//这里有两种方式 同步提交 和异步提交偏移量consumer.commitAsync();//异步consumer.commitSync();//同步}//关闭消费者对象 因上面在无线循环//consumer.close();}
}
说明以上方式都不能彻底解决重复消费问题
重复消费问题还是存在
如果要进行原子绑定 并非做不到,Kafka本身没有提供相关功能
例如把拉取到的数据全部处理完了,才进行事务提交
一旦出现意外,业务数据恢复 但是Kafka本身没有提供相关功能 和与其他支持事务处理的应用结合使用
消费数据-事务隔离级别
生产者事务图
这个报错 写在提交之前即可
消费者组介绍
如果两个应用都是同一个消费者组
生产者A生产消息 消费者B和C在同一个消费者组 那么A的消息如果被B消费过了那么C是消费不到的 B和C默认是竞争关系
如果生产者A生产消息 消费者B和C在不同消费者组 那么消息会被B和C都消费
第一个场景:消费者B和C在同一个消费者组
如果消费者B和消费者C在同一个消费者组内,消息会按照负载均衡的方式分配给它们。这意味着生产者A生产的消息会被消费者B或消费者C中的一个消费,而不是同时被两个消费者消费。所以,如果B已经消费了某条消息,消费者C就无法再消费到这条消息。这种行为是消费者组的基本特性,主要用于确保每条消息只被某个消费者处理一次。
第二个场景:消费者B和C在不同的消费者组
如果消费者B和消费者C在不同的消费者组中,那么生产者A生产的消息会分别被B和C都消费到。因为每个消费者组有自己独立的消费进度(每个组有独立的偏移量),所以每个消费者组都能独立消费该消息。
第三个场景:消费者B和C同时开启从头消费
如果B和C都在同一个消费者组,并且设置了从头消费,那么它们将从消息队列的最开始位置开始消费。这种情况下,B和C是共享消费队列的,它们会根据负载均衡规则交替消费消息,而不是同时消费同一条消息。因此,A的消息仍然不会同时被B和C消费。每条消息仍然只会被消费者组内的某个消费者消费,并且消息的消费是共享的,但并不是同时共享。
总结来说,在同一个消费者组内,消息的消费是竞争式的。即使B和C同时开启从头消费,它们也不会同时消费同一条消息。每条消息只会由其中一个消费者处理。