如何安装Kafka,可以参考docker搭载Kafka集群,一个文件搞定,超简单,亲试可行-CSDN博客
1、在pom.xml中加入依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency>
2、配置application.yml文件
在application.yml中加入
springkafka:#Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095producer:# 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认acks: 1# 发送失败时的重试次数,0表示不重试retries: 0# 批量发送时的批次大小(字节)batch-size: 30720000 # 30MB# 生产者的内存缓冲区大小(字节)buffer-memory: 33554432 # 32MB# Key的序列化器类key-serializer: org.apache.kafka.common.serialization.StringSerializer# Value的序列化器类value-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# 消费者所属的组IDgroup-id: test-kafka# 禁用自动提交offset,改为手动提交enable-auto-commit: false# 偏移量重置策略:# earliest:从最早的记录开始消费# latest:从最新的记录开始消费auto-offset-reset: earliest# Key的反序列化器类key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# Value的反序列化器类value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 每次poll()调用返回的最大消息条数max-poll-records: 2session:# 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)timeout:ms: 300000 # 5分钟listener:# 如果指定的主题不存在,是否让应用启动失败,false表示不会报错missing-topics-fatal: false# 消费模式:single=单条消息,batch=批量消费type: single# 消费确认模式:# manual_immediate:手动确认消息,立即提交offsetack-mode: manual_immediate
3、主要示例代码
创建一个目录和四个java文件,可以做测试
3.1、KafkaConfig.java
Kafka监听器配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;@EnableKafka
@Configuration
public class KafkaConfig {// 单条消费监听器工厂,手动提交offset@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}// 批量消费监听器工厂,手动提交offset@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setBatchListener(true); // 启用批量消费factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}
}
3.2、KafkaProducer.java
生产者
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootApplication
public class KafkaProducer {public static void main(String[] args) {SpringApplication.run(KafkaProducer.class, args);}@BeanCommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {return args -> {String topic = "test-topic";for (int i = 1; i <= 10; i++) {String message = "Message " + i;kafkaTemplate.send(topic, message);System.out.println("Sent: " + message);Thread.sleep(500); // 模拟消息发送间隔}};}
}
3.3、SingleConsumer.java
单条消息消费者
autoStartup参数:是是否自动启动;=”true“:自动启动,即生产者启动,该消费者将会开始消费;=”false":不自动启动,不开该模式的消费。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class SingleConsumer {@KafkaListener(topics = "test-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {System.out.println("SingleConsumer - Received: " + record.value());// 手动提交offsetacknowledgment.acknowledge();}
}
3.4、BatchConsumer.java
批量消息消费者
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;import java.util.List;@Service
public class BatchConsumer {@KafkaListener(topics = "test-topic", groupId = "test-group", containerFactory = "batchFactory", autoStartup = "false")public void batchListen(List<String> messages, Acknowledgment acknowledgment) {System.out.println("BatchConsumer - Received batch: " + messages);// 手动提交offsetacknowledgment.acknowledge();}
}
4、测试
4.1、单条信息消费模式
在SingleConsumer.java中设置autoStartup = "true",启动KafkaProducer.java
消费成功
4.2、批量信息消费模式
在BatchConsumer.java中设置autoStartup = "true",启动KafkaProducer.java
配置文件中设置了max-poll-records: 2,所有一次只消费两条
消费成功
如果在BatchConsumer.java和SingleConsumer.java中设置autoStartup = "true",Kafka会随机选择消费者组里的一个消费者进行消费,所有可以会导致其中一个消费者没有消费信息