一、配置文件
pom.xml
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
application.yml
spring:application:name: double-kafka-consumerprofiles:active: devjackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8server:port: 8008sys:kafka:one:bootstrap-servers: 192.168.1.2:5021consumer:group-id: one-groupauto-offset-reset: latestenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 5topic: one-kafka-testtwo:bootstrap-servers: 192.168.1.3:5021consumer:group-id: two-groupauto-offset-reset: latestenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 5topic: two-kafka-test
二、配置Configuration Bean
(1)第一个kafka配置
public class OneKafkaConfig {@Beanpublic KafkaListenerContainerFactory oneKafkaFactory(@Autowired @Qualifier("oneConsumerFactory") ConsumerFactory oneConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(oneConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//手动提交return factory;}@Beanpublic ConsumerFactory oneConsumerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties){return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());}@ConfigurationProperties(prefix = "sys.kafka.one")@Beanpublic KafkaProperties oneKafkaProperties(){return new KafkaProperties();}}
(2)第二个kafka配置(主)
public class TwoKafkaConfig {@Beanpublic KafkaListenerContainerFactory twoKafkaFactory(@Autowired @Qualifier("twoConsumerFactory") ConsumerFactory twoConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(twoConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Primary//必须指定一个默认的消费者工厂@Beanpublic ConsumerFactory twoConsumerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties){return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());}@Primary//必须指定一个默认的kafka配置@ConfigurationProperties(prefix = "sys.kafka.two")@Beanpublic KafkaProperties twoKafkaProperties(){return new KafkaProperties();}}
(3)kakfka配置导入
@Configuration
@Import({OneKafkaConfig.class, TwoKafkaConfig.class})
public class KafkaConfig {@Beanpublic KafkaConsumer kafkaConsumer(){KafkaConsumer consumer = new KafkaConsumer();return consumer;}}
(4) 消费者监听
public class KafkaConsumer {@KafkaListener(containerFactory = "oneKafkaFactory", topics = "${sys.kafka.one.topic}")public void oneKafkaHandle(List<ConsumerRecord<String,String>> consumerRecords, Acknowledgment ack){//do somthing }@KafkaListener(containerFactory = "twoKafkaFactory", topics = "${sys.kafka.two.topic}")public void twoKafkaHandle(List<ConsumerRecord<String,String>> consumerRecords, Acknowledgment ack){//do somthing}}