# Kafka_深入探秘者(3):kafka 消费者

Kafka_深入探秘者(3):kafka 消费者

一、kafka 消费者、消费组

1、Kafka 消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个 T1 主题,该主题有4个分区;同时我们有一个消费组 G1,这个消费组只有一个消费者 C1。那么消费者 C1 将会收到这 4 个分区的消息,如下所示:

消费者1.png在这里插入图片描述

2、Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。

对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的:

消费者2.png在这里插入图片描述

二、kafka 消息接收参数设置

1、kafka 消息接收 必要参数设置

  • 1)(生产者 和 消费者的 key , value 保持一致)
  • 2)制定连接 Kafka 集群所需的 broker 地址清单,可以设置一个或者多个的名称,生产者 和 消费者的 bootstrap 保持一致。
  • 3)消费者隶属于的消费组 group.id,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义。
  • 4)指定 Kafkaconsumer 对应的客户端 client.id,默认为空,如果不设置 Kafkaconsumer 会自动生成一个非空字符串。

2、示例代码:


public static Properties initconfig(){Properties props =new Properties();//1)与 KafkaProducer 中设置保持一致(生产乾消费者保持一致)	props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.stringDeserializer");//2)必填参数,该参数和 KafkaProducer 中的相同,制定连接 Kafka 集群所需的 broker 地址清单,可以设置一个或者多个的名称props.put("bootstrap.servers",brokerList);//3)消费者隶属于的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义props.put("group.id",groupId);//4)指定 Kafkaconsumer 对应的客户端 ID,默认为空,如果不设置 Kafkaconsumer 会自动生成一个非空字符串props.put("client.id","consumer.client.id.demo");return props;
}

三、kafka 订阅主题和分区

1、kafka 订阅主题和分区

创建完消费者后我们便可以订阅主题了,只需要通过调用 subscribe() 方法即可,这个方法接收一个主题列表


KafkaConsumer<String, String>consumer = new Kafkaconsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

2、另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接 Kafka 与其他系统时非常有用。比如订阅所有的测试主题:


//订阅所有以 heima 开头的主题
consumer.subscribe(Pattern.compile("heima*"));

3、指定订阅的分区


//指定订阅的分区
consumer.assign(Arrays.asList(new TopicPartition("topic",0)));

4、kafka 反序列化


//与 KafkaProducer 中设置保持一致(生产者消费者保持一致)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

四、kafka 重复消费、消息丢失

1、位移提交

对于 Kafka 中的分区而言,它的每条消息都有唯一的 offset,用来表示消息在分区中的位置。

当我们调用 poll() 时,该方法会返回我们没有消费的消息。当消息从 broker 返回消费者时,broker 并不跟踪这些消息是否被消费者接收到; Kafka 让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交 (commit)。

2、kafka 消息 重复消费

kafka重复消费消息.png

3、kafka 消息丢失

kafka消息丢失.png

五、kafka 同步、异步提交

1、kafka 消息 自动提交

这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将 enable.auto.commit 设置为 true,那么消费者会在 poll 方法调用后每隔 5 秒 (由 auto.commit.interval.ms 指定) 提交一次位移。和很多其他操作一样,自动提交也是由 poll() 方法来驱动的;在调用 poll() 时,消费者判断是否到达提交时间,如果是则提交上一次 poll 返回的最大位移。

需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者 poll 消息后,应用正在处理消息,在 3 秒后 Kafka 进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

2、kafka 消息 同步提交

在 kafka_learn 工程中,创建 CheckOffsetAndcommit.java 类,进行 同步提交 测试。


/***  kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\CheckOffsetAndcommit.java**  2024-6-22 创建 CheckOffsetAndcommit.java 类 测试同步提交*/
package djh.it.kafka.learn.chapter3;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;public class CheckOffsetAndcommit {//private static final String brokerList = "localhost:9092";private static final String brokerList = "172.18.30.110:9092";private static final String topic = "heima";private static final String groupId = "group.heima";private static AtomicBoolean running = new AtomicBoolean(true);public static Properties initConfig() {Properties properties = new Properties();//1)设置 key 序列化器 -- 优化代码properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//3)设置值序列化器 -- 优化代码properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//4)设置集群地址 -- 优化代码properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 手动提交开启properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return properties;}public static void main( String[] args ) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);TopicPartition tp = new TopicPartition(topic, 0);consumer.assign(Arrays.asList(tp));long lastConsumedOffset = -1;while (true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));if(records.isEmpty()){break;}List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);lastConsumedOffset = partitionRecords.get(partitionRecords.size() -1).offset();consumer.commitSync();  //同步提交消费位移}System.out.println("comsumed offset is " + lastConsumedOffset);OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);System.out.println("commited offset is " + offsetAndMetadata.offset());long positition = consumer.position(tp);System.out.println("the offset of the next record is " + positition);}
}

同步提交.png

3、kafka 消息 异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的 API。

但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。

举个例子,假如我们发起了一个异步提交 commitA,此时的提交位移为 2000,随后又发起了一个异步提交 commitB 且位移为 3000; commitA 提交失败但 commitB 提交成功,此时 commitA 进行重试并成功的话,会将实际上将已经提交的位移从 3000 回滚到 2000,导致消息重复消费。

六、kafka 指定位移消费

1、kafka 指定位移消费

消息的拉取是根据 poll() 方法中的逻辑来处理的,但是这个方法对于普通开发人员来说就是个黑盒处理,无法精确掌握其消费的起始位置。
seek() 方法正好提供了这个功能,让我们得以追踪以前的消费或者回溯消费,

2、在 kafka_learn 工程中,创建 SeekDemo.java 类,进行 指定位移消费 测试。


/***  D:\java-test\idea2019\kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\SeekDemo.java**  2024-6-22 创建 SeekDemo.java 类,进行 指定位移消费 测试。*/
package djh.it.kafka.learn.chapter3;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Set;public class SeekDemo extends ConsumerClientConfig{public static void main(String[] args){Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));//timeout参数设置多少合适?太短会使分区分配失败,太长又有可能造成一些不必要的等待consumer.poll(Duration.ofMillis(2000));//获取消费者所分配到的分区Set<TopicPartition> assignment= consumer.assignment();System.out.println(assignment);for(TopicPartition tp : assignment){//参数partition表示分区,offset表示指定从分区的哪个位置开始消费consumer.seek(tp,10);}//consumer.seek(new TopicPartition(topic,0), 10);while(true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//consume the record.for(ConsumerRecord<String, String> record :records){System.out.println(record.offset()+ ":" + record.value());}}}
}

3、在 kafka_learn 工程中,创建 公共类 KafkaContext.java


/***  D:\java-test\idea2019\kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\KafkaContext.java**  2024-6-22 创建公共类 KafkaContext.java*/
package djh.it.kafka.learn.chapter3;public class KafkaContext {// 172.18.30.110:9092 填写你自己的 虚拟机 IP 地址和端口号public static String brokerList = "172.18.30.110:9092";public static String topic = "heima";public static String groupId = "group.heima";
}

4、在 kafka_learn 工程中,创建 公共类 ConsumerClientConfig.java


/***  kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\ConsumerClientConfig.java**  2024-6-22 创建公共类 ConsumerClientConfig.java*/
package djh.it.kafka.learn.chapter3;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class ConsumerClientConfig extends KafkaContext{public static Properties initConfig(){Properties props = new Properties();//1)设置 key 序列化器 -- 优化代码//properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//2)设置值序列化器 -- 优化代码//properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//3)设置集群地址 -- 优化代码//properties.put("bootstrap.servers", brokerList);props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);//4)消费组//properties.put("group.id", groupId);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//kafka 消费者找不到消费的位移时,从什么位置开始消费,默认:latest :末尾开始消费 earliest : 从头开始//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");//是否启用自动位移提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return props;}
}

5、在 kafka_learn 工程中,运行 SeekDemo.java 类,进行 指定位移消费 测试

指定位移消费测试.png

七、kafka 再均衡

1、再均衡是指分区的所属从一个消费者转移到另外一个消费者的行为,它为消费组具备了高可用性和伸缩性提供了保障,使得我们既方便又安全地删除消费组内的消费者或者往消费组内添加消费者。不过再均衡发生期间,消费者是无法拉取消息的。

2、在 kafka_learn 工程中,创建 再均衡监听器 类 CommitSyncInRebalance.java


/***  D:\java-test\idea2019\kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\CommitSyncInRebalance.java**  2024-6-22 创建 再均衡监听器 类 CommitSyncInRebalance.java*/
package djh.it.kafka.learn.chapter3;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;public class CommitSyncInRebalance extends ConsumerClientConfig {public static final AtomicBoolean isRunning = new AtomicBoolean(true);public static void main( String[] args ) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);Map<TopicPartition, OffsetAndMetadata> currentoffsets = new HashMap<>();consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener(){@Overridepublic void onPartitionsRevoked( Collection<TopicPartition> partitions){//尽量避免重复消费consumer.commitSync(currentoffsets);}@Overridepublic void onPartitionsAssigned( Collection<TopicPartition> partitions){//do nothing.}});try{while (isRunning.get()) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.offset() + ":" + record.value());//异步提交消费位移,在发生再均衡动作之前可以通过再均衡临听器的 onPartitionsRevoked 回调执行 commitsvnc 方法同步提交位移。currentoffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));}//异步提交consumer.commitAsync(currentoffsets, null);}} finally {consumer.close();}}
}

八、kafka 消费者拦截器

1、消费者拦截器

消费者也有相应的拦截器概念,消费者拦截器主要是在消费到消息或者在提交消费位移时进行的一些定制化的操作。

2、消费者拦截器 使用场景:

对消费消息设置一个有效期的属性,如果某条消息在既定的时间窗口内无法到达,那就视为无效,不需要再被处理。

3、在 kafka_learn 工程中,创建 消费者拦截器 类 ConsumerInterceptorTTL.java


/***  D:\java-test\idea2019\kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\ConsumerInterceptorTTL.java**  2024-6-22 创建 消费者拦截器 类 ConsumerInterceptorTTL.java*/
package djh.it.kafka.learn.chapter3;import org.apache.commons.collections.map.HashedMap;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {private static final long EXPIRE_INTERVAL = 10 * 1000;@Overridepublic ConsumerRecords<String, String> onConsume( ConsumerRecords<String, String> records ) {System.out.println("before" + records);long now = System.currentTimeMillis();Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashedMap();for(TopicPartition tp : records.partitions()){List<ConsumerRecord<String, String>> tpRecords = records.records(tp);List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();for(ConsumerRecord<String, String> record : tpRecords){//设置一个发送时间戳,超过一分钟的消息,超时,不能收到此消息if(now - record.timestamp() < EXPIRE_INTERVAL){newTpRecords.add(record);}}if(!newTpRecords.isEmpty()){newRecords.put(tp, newTpRecords);}}return new ConsumerRecords<>(newRecords);}@Overridepublic void onCommit( Map<TopicPartition, OffsetAndMetadata> offsets ) {offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));}@Overridepublic void close() {}@Overridepublic void configure( Map<String, ?> configs ) {}
}

4、在 kafka_learn 工程中,创建 消费者 KafkaConsumerAnalysis.java 类,自定义分区器、自定义拦截器 分析,进行消费消息测试


/***  kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\KafkaConsumerAnalysis.java**  2024-6-22 创建 消费者 KafkaConsumerAnalysis.java 类,自定义分区器、自定义拦截器 分析,进行消费消息测试*/
package djh.it.kafka.learn.chapter3;//注意导包,一定要导成 kafka 的序列化包
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;public class KafkaConsumerAnalysis {//private static final String brokerList = "localhost:9092";private static final String brokerList = "172.18.30.110:9092";private static final String topic = "heima";private static final String groupId = "group.heima";private static final AtomicBoolean isRunning = new AtomicBoolean(true);public static Properties initConfig(){Properties props = new Properties();//1)设置 key 序列化器 -- 优化代码//properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//2)设置值序列化器 -- 优化代码//properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//3)设置集群地址 -- 优化代码//properties.put("bootstrap.servers", brokerList);props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);//4)消费组//properties.put("group.id", groupId);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//指定 KafkaConsumer 对应的客户端ID,默认为空,如果不设置KafkaConsumer会自动生成一个非空字符串props.put("client.id", "consumer.client.id.demo");// 指定消费者拦截器props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class.getName());//kafka 消费者找不到消费的位移时,从什么位置开始消费,默认:latest :末尾开始消费 earliest : 从头开始//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");//        //是否启用自动位移提交
//        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return props;}public static void main( String[] args ) throws InterruptedException{Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));// 正则订阅主题//consumer.subscribe(Pattern.compile("heima"));// 指定订阅的分区//consumer.assign(Arrays.asList(new TopicPartition("heima", 0)));try{while (isRunning.get()){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String, String> record : records){System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset() );System.out.println("key = " + record.key() + ", value = " + record.value());// do something to process record.}}}catch (Exception e){e.printStackTrace();//log.error("occur exception ", e);} finally {consumer.close();}}
}

5、在 kafka_learn 工程中,创建 生产者 ProducerFastStart.java 类中,添加超时发送和不超时发送消息,进行测试。


/***  kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ProducerFastStart.java**  2024-6-21 创建 生产者 ProducerFastStart.java 类*/
package djh.it.kafka.learn.chapter1;import org.apache.kafka.clients.producer.*;
//注意导包,一定要导成 kafka 的序列化包
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.Future;public class ProducerFastStart {//private static final String brokerList = "localhost:9092";private static final String brokerList = "172.18.30.110:9092";private static final String topic = "heima";public static void main( String[] args ) {Properties properties = new Properties();//1)设置 key 序列化器 -- 优化代码//properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//2)设置重试次数 -- 优化代码properties.put(ProducerConfig.RETRIES_CONFIG, 10);//3)设置值序列化器 -- 优化代码//properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//4)设置集群地址 -- 优化代码//properties.put("bootstrap.servers", brokerList);properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo-000", "hello,kafka");//设置一个发送时间戳倒退500毫秒的消息,不超时,能消费到此消息ProducerRecord<String,String> record2 = new ProducerRecord<>(topic, 0, System.currentTimeMillis() - 10 * 500,"kafka-demo-001", "hello,kafka-> 5秒不超时");//设置一个发送时间戳倒退一分钟的消息,超时,不能收到此消息ProducerRecord<String,String> record3 = new ProducerRecord<>(topic, 0, System.currentTimeMillis() - 10 * 1000,"kafka-demo-001", "hello,kafka->10秒超时");try{producer.send(record);producer.send(record2);  //发送时间戳倒退500毫秒的消息,不超时,能消费到此消息producer.send(record3);  //发送时间戳倒退一分钟的消息,超时,不能收到此消息//            //发送类型--同步发送
//            Future<RecordMetadata> send = producer.send(record);
//            RecordMetadata recordMetadata = send.get();
//            System.out.println("topic: " + recordMetadata.topic());
//            System.out.println("partition: " + recordMetadata.partition());
//            System.out.println("offset: " + recordMetadata.offset());//            //发送类型--异步发送
//            producer.send(record, new Callback() {
//                public void onCompletion(RecordMetadata metadata, Exception exception) {
//                    if (exception == null) {
//                        System.out.println("topic: " + metadata.topic());
//                        System.out.println("partition: " + metadata.partition());
//                        System.out.println("offset: " + metadata.offset());
//                    }
//                }
//            });}catch (Exception e){e.printStackTrace();}producer.close();}
}

超时消息未接收到(超过1分钟).png

九、kafka 消费者 总结

1、kafka 消费者参数补充:

  • 1)fetch.min.bytes

这个参数允许消费者指定从 broker 读取消息时最小的数据量。当消费者从 broker 读取消息时,如果数据量小于这个阈值,broker 会等待直到有足够的数据,然后才返回给消费者。对于写入量不高的主题来说,这个参数可以减少 broker 和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻 broker 压力。

  • 2)fetch.max.wait.ms

上面的 fetch.min.bvtes 参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为 500ms。

  • 3)max.partition.fetch.bytes

这个参数指定了每个分区返回的最多字节数,默认为1M。也就是说,Kafkaconsumer.poll(0) 返回记录列表时,每个分区的记录字节数最多为 1M。如果一个主题有 20 个分区,同时有5个消费者,那么每个消费者需要 4M 的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。

  • 4)max.poll.records

这个参数控制一个 poll(0) 调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量。

2、kafka 消费者总结

  • kafka 消费者和消费组的概念,
  • 使用 KafkaConsumer,
  • kafka 消费者参数的配置,
  • kafka 订阅、
  • kafka 反序列化、
  • kafka 位移提交、
  • kafka 再均衡、
  • kafka 拦截器等。

上一节关联链接请点击
# Kafka_深入探秘者(2):kafka 生产者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/33105.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Web应用防火墙(WAF)(上:基础概念篇)

运维专题 Web应用防火墙&#xff08;WAF&#xff09;&#xff08;上&#xff1a;基础概念篇&#xff09; - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. She…

pgAdmin后台命令执行漏洞(CVE-2023-5002)

​ 我们可以看到针对于漏洞 CVE-2022-4223&#xff0c;官方做了一定的修复措施。 web\pgadmin\misc_init_.py#validate_binary_path ​ 首先是添加了 login_required​ 进行权限校验。在 Flask 框架中&#xff0c;login_required​ 装饰器通常与 Flask-Login 扩展一起使用。…

LED恒流调光电路

LED等在工作的时候发热较大&#xff0c;所以通常选用铝基板作为底板&#xff1b;常用白色油墨。 LED必须在恒流源电路下工作&#xff0c;下图为最简单的恒流源&#xff1a;B极电压3.3V不变左下侧蓝色的为稳压二极管&#xff0c;由于BE极可以看做二极管&#xff0c;压降为0.7V&…

OpenCV颜色检测

OpenCV颜色检测 前言策略分析根据颜色检测目标对象相关链接 前言 绿幕技术是一种经典的视频编辑技术&#xff0c;可以用于将人物置于不同的背景中。例如在电影制作中&#xff0c;技术的关键在于演员不能身着特定颜色的衣服(比如绿色)&#xff0c;站在只有绿色的背景前。然后&a…

异地组网如何OEM?

在现代信息社会中&#xff0c;企业越来越需要跨地域进行数据传输与共享。面临的挑战却是如何在不暴露在公网的情况下&#xff0c;实现异地组网并保障数据的安全性。本文将介绍一种名为“异地组网OEM”的解决方案&#xff0c;该方案能够通过私有通道传输数据并对数据进行安全加密…

我的大学生活-人面不知何处去(大三篇)

我的大学生活&#xff08;大三篇&#xff09; 前言推荐大三&#xff08;人面不知何处去&#xff09;2022年8月2022年9月2022年10月2022年11月2022年12月 寒假2023年1月 大三&#xff08;人面不知何处去&#xff09;2023年2月2023年3月2023年4月2023年5月2023年6月 暑假2023年7月…

【LeedCode】二分查找算法(一)

二分查找算法的时间复杂度是O(logN) &#xff0c;更优于传统的遍历数组算法值得我们学习。 注意二分查找一般使用的前提是&#xff1a;待操作的数组的元素有某种规律也就是要有二阶性&#xff0c;二段性就是在数组中选取一点根据该数组元素某种规律可以把数组分为两部分&#x…

国企:2024年6月中国移动相关招聘信息 三

中国移动卓望公司-卓望信息 卓望公司成立于2000年6月,是中国移动的控股子公司,积极拓展互联网、IT、ICT领域,提供平台及应用开发、运营运维等服务。  成立二十余年来,卓望公司逐渐形成包括业务合作管理、内容渠道运营、网络集中运维、企业服务、安全服务、行业DICT服务等…

LDR6500U,让设备爱上“被骗”的充电速度!

在数字设备日新月异的今天&#xff0c;兼容性和充电效率已成为用户关注的核心焦点。尤其是随着电子设备市场的全球化发展&#xff0c;标准化的需求日益凸显。近期&#xff0c;欧洲联盟&#xff08;简称“欧盟”&#xff09;就电子设备充电接口问题做出了重要决策&#xff0c;要…

高校新生如何选择最优手机流量卡?

一年一度的高考已经结束了&#xff0c;愿广大学子金榜题名&#xff0c;家长们都给孩子准备好了手机&#xff0c;那么手机流量卡应该如何选择呢&#xff1f; 高校新生在选择手机流量卡时&#xff0c;需要综合考量流量套餐、费用、网络覆盖、售后服务等多方面因素&#xff0c;以下…

Java开发-实际工作经验和技巧-0001-PostgreSQL数据库存储磁盘满了重启以及应急措施

Java开发-实际工作经验和技巧-0001-PostgreSQL数据库存储磁盘满了重启以及应急措施 更多内容欢迎关注我&#xff08;持续更新中&#xff0c;欢迎Star✨&#xff09; Github&#xff1a;CodeZeng1998/Java-Developer-Work-Note 技术公众号&#xff1a;CodeZeng1998&#xff0…

用于射频功率应用的氮化铝电阻元件

EAK推出了新的厚膜氮化铝 &#xff08;AlN&#xff09; 电阻器和端接系列&#xff0c;以补充公司现有的产品。传统上&#xff0c;射频功率电阻元件采用氧化铍&#xff08;BeO&#xff09;陶瓷材料作为陶瓷基板;然而&#xff0c;由于国际上要求从产品中去除BeO的压力&#xff0c…

[HBM] HBM 国产进程, 国产HBM首次研发成功 (202406)

依公知及经验整理&#xff0c;原创保护&#xff0c;禁止转载。 专栏 《深入理解DDR》 AI 的火热浪潮带火了高带宽内存的需求&#xff0c;HBM已是存储市场耀眼的明星。目前市场上还没有国产HBM, 什么时候可以看到国产希望呢&#xff1f; 或许现在可以看到曙光了。 1. 设计端 1…

免费内网穿透工具 ,快解析内网穿透解决方案

在IPv4公网IP严重不足的环境下&#xff0c;内网穿透技术越来越多的被人们所使用&#xff0c;使用内网穿透技术的好处有很多。 1&#xff1a;无需公网ip 物以稀为贵&#xff0c;由于可用的公网IP地址越来越少&#xff0c;价格也是水涨船高&#xff0c;一个固定公网IP一年的成本…

全面讲解数字化采购:整体技术架构与最佳实践

在全球化和数字化浪潮的推动下&#xff0c;企业的采购流程正经历深刻变革。数字化采购通过引入先进的信息技术&#xff0c;优化供应链管理&#xff0c;提高采购效率&#xff0c;降低成本。本文将详细介绍数字化采购的整体技术架构&#xff0c;并分享最佳实践经验&#xff0c;帮…

Jenkins nginx自动化构建前端vue项目

在现代的Web开发中&#xff0c;Vue.js已经成为一种非常流行的JavaScript框架。为了更高效地管理和部署Vue.js项目&#xff0c;使用自动化构建工具是至关重要的。Jenkins作为一款强大的持续集成和持续部署&#xff08;CI/CD&#xff09;工具&#xff0c;为我们提供了一种便捷的方…

海洋生物识别系统+图像识别+Python+人工智能课设+深度学习+卷积神经网络算法+TensorFlow

一、介绍 海洋生物识别系统。以Python作为主要编程语言&#xff0c;通过TensorFlow搭建ResNet50卷积神经网络算法&#xff0c;通过对22种常见的海洋生物&#xff08;‘蛤蜊’, ‘珊瑚’, ‘螃蟹’, ‘海豚’, ‘鳗鱼’, ‘水母’, ‘龙虾’, ‘海蛞蝓’, ‘章鱼’, ‘水獭’, …

Apple - Media Playback Programming Guide

本文翻译整理自&#xff1a;Media Playback Programming Guide&#xff08;Updated: 2018-01-16 https://developer.apple.com/library/archive/documentation/AudioVideo/Conceptual/MediaPlaybackGuide/Contents/Resources/en.lproj/Introduction/Introduction.html#//apple_…

鸿蒙开发系统基础能力:【@ohos.faultLogger (故障日志获取)】

故障日志获取 说明&#xff1a; 本模块首批接口从API version 8开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 导入模块 import faultLogger from ohos.faultLoggerFaultType 故障类型枚举。 系统能力&#xff1a; 以下各项对应的系统能力…