一、场景复现
双kafka消费者配置
(1)第一个kafka
public class OneKafkaConfig {@Beanpublic KafkaListenerContainerFactory oneKafkaFactory(@Resource ConsumerFactory oneConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(oneConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Beanpublic ConsumerFactory oneConsumerFactory(@Resource KafkaProperties oneKafkaProperties){return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());}@ConfigurationProperties(prefix = "sys.kafka.one")@Beanpublic KafkaProperties oneKafkaProperties(){return new KafkaProperties();}
(2)第二个kafka
public class TwoKafkaConfig {@Beanpublic KafkaListenerContainerFactory twoKafkaFactory(@Resource ConsumerFactory twoConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(twoConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Primary@Beanpublic ConsumerFactory twoConsumerFactory(@Resource KafkaProperties twoKafkaProperties){return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());}@Primary@ConfigurationProperties(prefix = "sys.kafka.two")@Beanpublic KafkaProperties twoKafkaProperties(){return new KafkaProperties();}
}
启动项目,发现两个kafka的消费者都是指向同一个kafka,@Resource没有根据名字注入所以依赖的bean。
二、分析原因
从debug模式下中发现:twoKafkaProperties、twoConsumerFactory、twoKafkaFactory依次创建。然后先是TwoConsumerFactory先注入创建oneKafkaFactory,而twoKafkaProperties bean先注入创建oneConsumerFactory,最后oneKafkaProperties才开始创建。
原因:@Resource注入之前先根据名字查询bean,找不到才根据类型进行bean注入。因为twoConsumerFactory为@Primary优先级高先创建,oneKafkaFactory创建的时候oneConsumerFactory 还没有创建,所以根据类型注入了twoConsumerFactory。
建议:多个类型相同的bean创建不要使用@Resource,应该使用@Autowired + @Qualifier或者重新继承bean的类型。
三、改造方式
(1)@Autowiredv+ @Qualifier
public class OneKafkaConfig {@Beanpublic KafkaListenerContainerFactory oneKafkaFactory(@Autowired @Qualifier("oneConsumerFactory") ConsumerFactory oneConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(oneConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Beanpublic ConsumerFactory oneConsumerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties){return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());}@ConfigurationProperties(prefix = "sys.kafka.one")@Beanpublic KafkaProperties oneKafkaProperties(){return new KafkaProperties();}}
public class TwoKafkaConfig {@Beanpublic KafkaListenerContainerFactory twoKafkaFactory(@Autowired @Qualifier("twoConsumerFactory") ConsumerFactory twoConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(twoConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Primary@Beanpublic ConsumerFactory twoConsumerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties){return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());}@Primary@ConfigurationProperties(prefix = "sys.kafka.two")@Beanpublic KafkaProperties twoKafkaProperties(){return new KafkaProperties();}}
(2)重新继承bean的类型
public class OneKafkaConfig {@Beanpublic KafkaListenerContainerFactory oneKafkaFactory(@Autowired OneConsumerFactory oneConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(oneConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Beanpublic OneConsumerFactory oneConsumerFactory(@Autowired OneKafkaProperties oneKafkaProperties){return new OneConsumerFactory(oneKafkaProperties.buildConsumerProperties());}@ConfigurationProperties(prefix = "sys.kafka.one")@Beanpublic OneKafkaProperties oneKafkaProperties(){return new OneKafkaProperties();}
public class TwoKafkaConfig {@Beanpublic KafkaListenerContainerFactory twoKafkaFactory(@Autowired TwoConsumerFactory twoConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(twoConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Primary@Beanpublic TwoConsumerFactory twoConsumerFactory(@Autowired TwoKafkaProperties twoKafkaProperties){return new TwoConsumerFactory(twoKafkaProperties.buildConsumerProperties());}@Primary@ConfigurationProperties(prefix = "sys.kafka.two")@Beanpublic TwoKafkaProperties twoKafkaProperties(){return new TwoKafkaProperties();}
}