SpringBoot-集成Kafka详解

SpringBoot集成Kafka

1、构建项目

1.1、引入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.5.RELEASE</version>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.28</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency>
</dependencies>
1.2、application.yml配置
spring:application:name: application-kafkakafka:bootstrap-servers: localhost:9092 #这个是kafka的地址,对应你server.properties中配置的producer:batch-size: 16384 #批量大小acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)retries: 10 # 消息发送重试次数#transaction-id-prefix: transactionbuffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:linger:ms: 2000 #提交延迟#partitioner: #指定分区器#class: pers.zhang.config.CustomerPartitionHandlerconsumer:group-id: testGroup #默认的消费组IDenable-auto-commit: true #是否自动提交offsetauto-commit-interval: 2000 #提交offset延时# 当kafka中没有初始offset或offset超出范围时将自动重置offset# earliest:重置为分区中最小的offset;# latest:重置为分区中最新的offset(消费分区中新产生的数据);# none:只要有一个分区不存在已提交的offset,就抛出异常;auto-offset-reset: latestmax-poll-records: 500 #单次拉取消息的最大条数key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session:timeout:ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)request:timeout:ms: 18000 # 消费请求的超时时间listener:missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
#      type: batch
1.3、简单生产
@RestController
public class kafkaProducer {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/normal/{message}")public void sendNormalMessage(@PathVariable("message") String message) {kafkaTemplate.send("sb_topic", message);}
}
1.4、简单消费
@Component
public class KafkaConsumer {//监听消费@KafkaListener(topics = {"sb_topic"})public void onNormalMessage(ConsumerRecord<String, Object> record) {System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +record.value());}}
简单消费:sb_topic-0=111
简单消费:sb_topic-0=222
简单消费:sb_topic-0=333

2、生产者

2.1、带回调的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,

/*** 回调的第一种写法* @param message*/
@GetMapping("/kafka/callbackOne/{message}")
public void sendCallbackOneMessage(@PathVariable("message") String message) {kafkaTemplate.send("sb_topic", message).addCallback(new SuccessCallback<SendResult<String, Object>>() {//成功的回调@Overridepublic void onSuccess(SendResult<String, Object> success) {// 消息发送到的topicString topic = success.getRecordMetadata().topic();// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = success.getRecordMetadata().offset();System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset);}}, new FailureCallback() {//失败的回调@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送消息失败1:" + throwable.getMessage());}});
}

 

发送消息成功1:sb_topic-0-3
简单消费:sb_topic-0=one
/*** 回调的第二种写法* @param message*/
@GetMapping("/kafka/callbackTwo/{message}")
public void sendCallbackTwoMessage(@PathVariable("message") String message) {kafkaTemplate.send("sb_topic", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送消息失败2:"+throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());}});
}

发送消息成功2:sb_topic-0-4
简单消费:sb_topic-0=two
2.2、监听器

Kafka提供了ProducerListener 监听器来异步监听生产者消息是否发送成功,我们可以自定义一个kafkaTemplate添加ProducerListener,当消息发送失败我们可以拿到消息进行重试或者把失败消息记录到数据库定时重试。

@Configuration
public class KafkaConfig {@AutowiredProducerFactory producerFactory;@Beanpublic KafkaTemplate<String, Object> kafkaTemplate() {KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>();kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {@Overridepublic void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {System.out.println("发送成功 " + producerRecord.toString());}@Overridepublic void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) {System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);}@Overridepublic void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {System.out.println("发送失败" + producerRecord.toString());System.out.println(exception.getMessage());}@Overridepublic void onError(String topic, Integer partition, String key, Object value, Exception exception) {System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);System.out.println(exception.getMessage());}});return kafkaTemplate;}
}

注意:当我们发送一条消息,既会走 ListenableFutureCallback 回调,也会走ProducerListener回调。

2.3、自定义分区器

我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

1、若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
2、若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;
3、patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;
我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区
 

public class CustomizePartitioner implements Partitioner {@Overridepublic int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {//自定义分区规则,默认全部发送到0号分区return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

在application.properties中配置自定义分区器,配置的值就是分区器类的全路径名

# 自定义分区器
spring.kafka.producer.properties.partitioner.class=pers.zhang.config.CustomizePartitioner
2.4、事务提交

如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务:

@GetMapping("/kafka/transaction/{message}")
public void sendTransactionMessage(@PathVariable("message") String message) {//声明事务:后面报错消息不会发出去kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {@Overridepublic Object doInOperations(KafkaOperations<String, Object> operations) {operations.send("sb_topic", message + " test executeInTransaction");throw new RuntimeException("fail");}});// //不声明事务:后面报错但前面消息已经发送成功了// kafkaTemplate.send("sb_topic", message + " test executeInNoTransaction");// throw new RuntimeException("fail");
}

注意:如果声明了事务,需要在application.yml中指定:

spring:kafka:producer:transaction-id-prefix: tx_ #事务id前缀

3、消费者

3.1、指定topic、partition、offset消费

前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供

spring:kafka:listener:type: batch #设置批量消费consumer:max-poll-records: 50 #每次最多消费多少条消息

属性解释:

  • id:消费者ID
  • groupId:消费组ID
  • topics:监听的topic,可监听多个
  • topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区。
//批量消费
@KafkaListener(id = "consumer2", topics = {"sb_topic"}, groupId = "sb_group")
public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());for (ConsumerRecord<String, Object> record : records) {System.out.println(record.value());}
}
>>> 批量消费一次,recoreds.size()=4
hello
hello
hello
hello
>>> 批量消费一次,recoreds.size()=2
hello
hello
3.2、异常处理

ConsumerAwareListenerErrorHandler 异常处理器,新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器。
 

//异常处理器
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {return new ConsumerAwareListenerErrorHandler() {@Overridepublic Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {System.out.println("消费异常:" + message.getPayload());return null;}};
}// 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
@KafkaListener(topics = {"sb_topic"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {throw new Exception("简单消费-模拟异常");
}// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
@KafkaListener(topics = "sb_topic",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {System.out.println("批量消费一次...");throw new Exception("批量消费-模拟异常");
}
批量消费一次...
消费异常:[ConsumerRecord(topic = sb_topic, partition = 0, leaderEpoch = 0, offset = 19, CreateTime = 1692604586558, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello), ConsumerRecord(topic = sb_topic, partition = 0, leaderEpoch = 0, offset = 20, CreateTime = 1692604587164, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello), ConsumerRecord(topic = sb_topic, partition = 0, leaderEpoch = 0, offset = 21, CreateTime = 1692604587790, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)]
3.3、消息过滤器

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
 

@Autowired
ConsumerFactory consumerFactory;//消息过滤器
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory);//被过滤的消息将被丢弃factory.setAckDiscarded(true);//消息过滤策略factory.setRecordFilterStrategy(new RecordFilterStrategy() {@Overridepublic boolean filter(ConsumerRecord consumerRecord) {if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {return false;}return true;}});return factory;
}//消息过滤监听
@KafkaListener(topics = {"sb_topic"},containerFactory = "filterContainerFactory")
public void onMessage6(ConsumerRecord<?, ?> record) {System.out.println(record.value());
}

上面实现了一个"过滤奇数、接收偶数"的过滤策略,我们向topic发送0-9总共10条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数:

3.4、消息转发

在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。

在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下:
 

//消息转发 从sb_topic转发到sb_topic2
@KafkaListener(topics = {"sb_topic"})
@SendTo("sb_topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {return record.value()+"-forward message";
}@KafkaListener(topics = {"sb_topic2"})
public void onMessage8(ConsumerRecord<?, ?> record) {System.out.println("收到sb_topic转发过来的消息:" + record.value());
}
收到sb_topic转发过来的消息:hello-forward message
收到sb_topic转发过来的消息:hello-forward message
收到sb_topic转发过来的消息:hello-forward message
收到sb_topic转发过来的消息:hello-forward message
3.5、定时启动、停止

默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:

1、禁止监听器自启动;
2、创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;
新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在Spring中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动
 

@EnableScheduling
@Component
public class CronTimer {/*** @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,* 而是会被注册在KafkaListenerEndpointRegistry中,* 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean**/@Autowiredprivate KafkaListenerEndpointRegistry registry;@Autowiredprivate ConsumerFactory consumerFactory;
​// 监听器容器工厂(设置禁止KafkaListener自启动)@Beanpublic ConcurrentKafkaListenerContainerFactory delayContainerFactory() {ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();container.setConsumerFactory(consumerFactory);//禁止KafkaListener自启动container.setAutoStartup(false);return container;}// 监听器@KafkaListener(id="timingConsumer",topics = "sb_topic",containerFactory = "delayContainerFactory")public void onMessage1(ConsumerRecord<?, ?> record){System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());}// 定时启动监听器@Scheduled(cron = "0 42 11 * * ? ")public void startListener() {System.out.println("启动监听器...");// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器if (!registry.getListenerContainer("timingConsumer").isRunning()) {registry.getListenerContainer("timingConsumer").start();}//registry.getListenerContainer("timingConsumer").resume();}
​// 定时停止监听器@Scheduled(cron = "0 45 11 * * ? ")public void shutDownListener() {System.out.println("关闭监听器...");registry.getListenerContainer("timingConsumer").pause();}
}

启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作,

11:42分监听器启动开始工作,消费消息

11:45分监听器停止工作:

3.6、手动确认消息

默认情况下Kafka的消费者是自动确认消息的,通常情况下我们需要在业务处理成功之后手动触发消息的签收,否则可能会出现:消息消费到一半消费者异常,消息并未消费成功但是消息已经自动被确认,也不会再投递给消费者,也就导致消息丢失了。

当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,spring-kafka提供了通过ackMode的值表示不同的手动提交方式;
 

public enum AckMode {// 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交RECORD,// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交BATCH,// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交TIME,// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交COUNT,// TIME | COUNT 有一个条件满足时提交COUNT_TIME,// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交MANUAL,// 手动调用Acknowledgment.acknowledge()后立即提交MANUAL_IMMEDIATE,
}

如果设置AckMode模式为MANUAL或者MANUAL_IMMEDIATE,则需要对监听消息的方法中,引入Acknowledgment对象参数,并调用acknowledge()方法进行手动提交;

第一步:添加kafka配置,把 spring.kafka.listener.ack-mode = manual 设置为手动
 

spring:kafka:listener:ack-mode: manual consumer:enable-auto-commit: false

第二步;消费消息的时候,给方法添加Acknowledgment参数签收消息:

@KafkaListener(topics = {"sb_topic"})
public void onMessage9(ConsumerRecord<String, Object> record, Acknowledgment ack) {System.out.println("收到消息:" + record.value());//确认消息ack.acknowledge();
}

4、配置详解

4.1、生产者yml方式
server:port: 8081
spring:kafka:producer:# Kafka服务器bootstrap-servers: 175.24.228.202:9092# 开启事务,必须在开启了事务的方法中发送,否则报错transaction-id-prefix: kafkaTx-# 发生错误后,消息重发的次数,开启事务必须设置大于0。retries: 3# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。# 开启事务时,必须设置为allacks: all# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 生产者内存缓冲区的大小。buffer-memory: 1024000# 键的序列化方式key-serializer: org.springframework.kafka.support.serializer.JsonSerializer# 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
4.2、生产者Config方式
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.util.HashMap;
import java.util.Map;/*** kafka配置,也可以写在yml,这个文件会覆盖yml*/
@SpringBootConfiguration
public class KafkaProviderConfig {@Value("${spring.kafka.producer.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.transaction-id-prefix}")private String transactionIdPrefix;@Value("${spring.kafka.producer.acks}")private String acks;@Value("${spring.kafka.producer.retries}")private String retries;@Value("${spring.kafka.producer.batch-size}")private String batchSize;@Value("${spring.kafka.producer.buffer-memory}")private String bufferMemory;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>(16);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。//acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。//acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。//开启事务必须设为allprops.put(ProducerConfig.ACKS_CONFIG, acks);//发生错误后,消息重发的次数,开启事务必须大于0props.put(ProducerConfig.RETRIES_CONFIG, retries);//当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送//批次的大小可以通过batch.size 参数设置.默认是16KB//较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。//比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟//实测batchSize这个参数没有用props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);//有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,//即使数据没达到16KB,也将这个批次发送出去props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");//生产者内存缓冲区的大小props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);//反序列化,和生产者的序列化方式对应props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);return props;}@Beanpublic ProducerFactory<Object, Object> producerFactory() {DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());//开启事务,会导致 LINGER_MS_CONFIG 配置失效factory.setTransactionIdPrefix(transactionIdPrefix);return factory;}@Beanpublic KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Beanpublic KafkaTemplate<Object, Object> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}
4.3、消费者yml方式
server:port: 8082
spring:kafka:consumer:# Kafka服务器bootstrap-servers: 175.24.228.202:9092group-id: firstGroup# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D#auto-commit-interval: 2s# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)# none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式#key-deserializer: org.apache.kafka.common.serialization.StringDeserializerkey-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要properties:spring:json:trusted:packages: "*"# 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。# 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况max-poll-records: 3properties:# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancemax:poll:interval:ms: 600000# 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10ssession:timeout:ms: 10000listener:# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数concurrency: 4# 自动提交关闭,需要设置手动消息确认ack-mode: manual_immediate# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误missing-topics-fatal: false# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepoll-timeout: 600000
4.4、消费者Config方式
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;/*** kafka配置,也可以写在yml,这个文件会覆盖yml*/
@SpringBootConfiguration
public class KafkaConsumerConfig {@Value("${spring.kafka.consumer.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Value("${spring.kafka.properties.session.timeout.ms}")private String sessionTimeout;@Value("${spring.kafka.properties.max.poll.interval.ms}")private String maxPollIntervalTime;@Value("${spring.kafka.consumer.max-poll-records}")private String maxPollRecords;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${spring.kafka.listener.concurrency}")private Integer concurrency;@Value("${spring.kafka.listener.missing-topics-fatal}")private boolean missingTopicsFatal;@Value("${spring.kafka.listener.poll-timeout}")private long pollTimeout;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>(16);propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);//自动提交的时间间隔,自动提交开启时生效propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");//该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理://earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录//latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)//none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);//两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepropsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);//这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。//这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,//如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,//然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。//要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数//注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10spropsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类)propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);return propsMap;}@Beanpublic ConsumerFactory<Object, Object> consumerFactory() {//配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要try(JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {deserializer.trustedPackages("*");return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);}}@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//在侦听器容器中运行的线程数,一般设置为 机器数*分区数factory.setConcurrency(concurrency);//消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误factory.setMissingTopicsFatal(missingTopicsFatal);//自动提交关闭,需要设置手动消息确认factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);factory.getContainerProperties().setPollTimeout(pollTimeout);//设置为批量监听,需要用List接收//factory.setBatchListener(true);return factory;}
}

5、注解消费示例

5.1、简单消费
    /*** 指定一个消费者组,一个主题主题。* @param record*/@KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP)public void simpleConsumer(ConsumerRecord<String, String> record) {System.out.println("进入simpleConsumer方法");System.out.printf("分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.2、监听多个主题
    /*** 指定多个主题。** @param record*/@KafkaListener(topics = {IPHONE_TOPIC,IPAD_TOPIC},groupId = APPLE_GROUP)public void topics(ConsumerRecord<String, String> record) {System.out.println("进入topics方法");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.3、监听一个主题,指定分区消费
    /*** 监听一个主题,且指定消费主题的哪些分区。* 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2* @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})},concurrency = "2")public void consumeByPattern(ConsumerRecord<String, String> record) {System.out.println("consumeByPattern");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.4、指定多个分区,指定起始偏移量,多线程消费
    /*** 指定多个分区从哪个偏移量开始消费。* 10个线程,也就是10个消费者*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPAD_TOPIC,partitions = {"0","1"},partitionOffsets = {@PartitionOffset(partition = "2", initialOffset = "10"),@PartitionOffset(partition = "3", initialOffset = "0"),})},concurrency = "10")public void consumeByPartitionOffsets(ConsumerRecord<String, String> record) {System.out.println("consumeByPartitionOffsets");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.5、监听多个主题,指定多个分区,指定起始偏移量
    /*** 指定多个主题。参数详解如上面的方法。* @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = IPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "4")public void topics2(ConsumerRecord<String, String> record) {System.out.println("topics2");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.6、指定多个监听器
    /*** 指定多个消费者组。参数详解如上面的方法。** @param record*/@KafkaListeners({@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = IPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "3"),@KafkaListener(groupId = XM_GROUP,topicPartitions = {@TopicPartition(topic = XMPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = XMPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "3")})public void groupIds(ConsumerRecord<String, String> record) {System.out.println("groupIds");System.out.println("内容:" + record.value());System.out.println("分区:" + record.partition());System.out.println("偏移量:" + record.offset());System.out.println("创建消息的时间戳:" + record.timestamp());}
5.7、手动提交偏移量
    /*** 设置手动提交偏移量** @param record*/@KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP,//3个消费者concurrency = "3")public void setCommitType(ConsumerRecord<String, String> record, Acknowledgment ack) {System.out.println("setCommitType");System.out.println("内容:" + record.value());System.out.println("分区:" + record.partition());System.out.println("偏移量:" + record.offset());System.out.println("创建消息的时间戳:" + record.timestamp());ack.acknowledge();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/154012.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JavaEE】Spring核心与设计思想(控制反转式程序演示、IoC、DI)

一、什么是Spring&#xff1f; 通常所说的 Spring 指的是 Spring Framework&#xff08;Spring 框架&#xff09;&#xff0c;它是⼀个开源框架&#xff0c;有着活跃⽽庞⼤的社区&#xff0c;这就是它之所以能⻓久不衰的原因。Spring ⽀持⼴泛的应⽤场景&#xff0c;它可以让 …

陶瓷行业废污水处理使用MES系统的作用

陶瓷行业属于高能耗、高污染行业&#xff0c;生产过程中消耗大量矿产资源和能源&#xff0c;产生的废气、废水、废渣、粉尘等对环境造成严重污染。在近年来&#xff0c;全社会环保意识增强&#xff0c;政府出台了一系列政策、措施加大节能、减排力度&#xff0c;整治行业污染。…

C进阶---动态内存管理

目录 一、为什么存在动态内存分配 1.1静动态内存分配区别&#xff1a; 1.2静态分配的优缺点 1.3动态分配优缺点 二、动态内存函数的介绍 2.1malloc和free 2.2calloc 2.3realloc 三、常见的动态内存错误 3.1对NULL指针的解引用操作 3.2 对动态开辟空间的越界…

「分享学习」SpringCloudAlibaba高并发仿斗鱼直播平台实战完结

[分享学习]SpringCloudAlibaba高并发仿斗鱼直播平台实战完结 第一段&#xff1a;简介 Spring Cloud Alibaba是基于Spring Cloud和阿里巴巴开源技术的微效劳框架&#xff0c;普遍应用于大范围高并发的互联网应用系统。本文将引见如何运用Spring Cloud Alibaba构建一个高并发的仿…

DolphinDB 基于 Glibc 升级的性能优化实战案例

在高并发查询、查询需要涉及很多个分区的情况下&#xff0c;低版本的 glibc&#xff08;低于2.23&#xff09;会严重影响查询性能。需要升级 glibc 解决该问题优化性能。我们撰写了本文&#xff0c;通过 patchelf 工具修改可执行文件和动态库的 rpath&#xff0c;达到无需升级系…

【机器学习】032_多种神经网络层类型

一、密集层 每一层神经元都是上一层神经元的函数&#xff0c;每层每个神经元都从前一层获得所有激活的输入。 整个神经网络前一层与后一层连接在一起&#xff0c;构造的网络密集。 二、卷积层 假设有一张大小为axb像素的图片&#xff0c;上面标着一些手写数字&#xff0c…

【Vue】响应式与数据劫持

目录 前言 响应式 Vue的响应式是如何实现的 数据劫持 Vue中的data属性都具有响应式 Vue后期添加的属性如何使其具有响应式 数组的响应式处理 如何使用数组下标去修改可以具有响应式呢 前言 什么是响应式&#xff1f;数据劫持是什么&#xff1f;Vue响应式是如何实现的&a…

什么是单片机?聊聊它的历史

前言 1946年2月15日&#xff0c;第一台电子数字计算机 ENIAC问世&#xff0c;这标志着计算机时代的到来。 ENIAC 是电子管计算机&#xff0c;时钟频率虽然仅有 100 kHz&#xff0c;但能在1s 的时间内完成 5000 次加法运算。与现代的计算机相比&#xff0c;ENIAC有许多不足&am…

【Dynamic-datasource】Springboot多数据源整合

引入依赖&#xff1a; <dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.5.0</version> </dependency> 整体pom文件&#xff1a; <?xml versi…

激发创新,助力研究:CogVLM,强大且开源的视觉语言模型亮相

项目设计集合&#xff08;人工智能方向&#xff09;&#xff1a;助力新人快速实战掌握技能、自主完成项目设计升级&#xff0c;提升自身的硬实力&#xff08;不仅限NLP、知识图谱、计算机视觉等领域&#xff09;&#xff1a;汇总有意义的项目设计集合&#xff0c;助力新人快速实…

4.3 Windows驱动开发:监控进程与线程对象操作

在内核中&#xff0c;可以使用ObRegisterCallbacks这个内核回调函数来实现监控进程和线程对象操作。通过注册一个OB_CALLBACK_REGISTRATION回调结构体&#xff0c;可以指定所需的回调函数和回调的监控类型。这个回调结构体包含了回调函数和监控的对象类型&#xff0c;还有一个A…

鸿蒙APP外包开发上线流程

鸿蒙系统的上线流程可能会根据具体的版本和平台要求而略有不同。在进行上线之前&#xff0c;开发人员应该详细了解并遵循鸿蒙生态系统的相关规定和要求。鸿蒙&#xff08;HarmonyOS&#xff09;应用的上线流程通常包括以下步骤&#xff0c;希望对大家有所帮助。北京木奇移动技术…

OpenCV入门6——图像基本变换

文章目录 图像的放大与缩小缩放算法题目放大 图像的翻转图像的旋转仿射变换之图像平移仿射变换之获取变换矩阵仿射变换之变换矩阵之二OpenCV透视变换 图像的放大与缩小 缩放算法 # -*- coding: utf-8 -*- import cv2 import numpy as npimg cv2.imread(E://pic//4.jpg) # (600…

瞬态抑制二极管(TVS管)特性及电路应用?|深圳比创达电子EMC

瞬态抑制二极管简称TVS管(Transient Voltage Suppressor) 一、TVS管特性 瞬态抑制二极管是在稳压二极管的工艺上发展起来的,一种用途广泛的高效能保护器件。当TVS二极管的两极受到反向瞬态高能量冲击时&#xff0c;它能以皮秒量级的速度&#xff0c;将其两极间的高阻抗变为低…

qt和window抓包程序

1.思路 使用原始套接字&#xff0c;将网卡设置为混杂模式&#xff0c;监听该网卡的数据。 2. 了解协议封包和协议层 下图是tcp封包详细过程 数据包传输情况 在TCP/IP协议栈中的每一层为了能够正确解析出上层的数据包&#xff0c;从而使用一些“协议类型”来标记&#xff0c;详…

高压放大器使用方法介绍

高压放大器是一种用于放大高压信号的电子设备&#xff0c;常用于科学研究、工业应用和医疗设备等领域。它可以将低电压信号放大到较高的电压水平&#xff0c;以满足特定应用的需求。 使用高压放大器需要注意以下几个方面&#xff1a; 1.了解设备规格&#xff1a;在使用高压放大…

《视觉SLAM十四讲》-- 建图

11 建图 11.1 概述 &#xff08;1&#xff09;地图的几类用处&#xff1a; 定位&#xff1a;导航&#xff1a;机器人在地图中进行路径规划&#xff1b;避障重建交互&#xff1a;人与地图之间的互动 &#xff08;2&#xff09;几类地图 稀疏地图稠密地图语义地图 11.2 单目…

解决Jira导出csv最大限度是1000的问题

JIRA为了防止过多影响性能&#xff0c; 设置了导出CSV的上线为1000&#xff0c;影响了搜索结果导出以及RestAPI。 可以通过以下配置参数修改此限制&#xff1a; 通过JIRA管理界面的"高级设置 “设置以下参数 系统管理 > 系统 > 一般设置>高级设置找到 jira.sea…

034、test

之——全纪录 目录 之——全纪录 杂谈 正文 1.下载处理数据 2.数据集概览 3.构建自定义dataset 4.初始化网络 5.训练 杂谈 综合方法试一下。 leaves 1.下载处理数据 从官网下载数据集&#xff1a;Classify Leaves | Kaggle 解压后有一个图片集&#xff0c;一个提交示…

Codeforces Round 910 (Div. 2) --- B-E 补题记录

B - Milena and Admirer Problem - B - Codeforces 题目大意&#xff1a; 现在给出一个无序序列&#xff0c;你可以使用任意次操作将这个无序序列修改为不递减序列&#xff0c;操作为你可以使用两个数a和b来替换ai&#xff0c;序列就变为了 ai-1&#xff0c; a&#xff0c;…