springboot集成kafka消费数据
文章目录 springboot集成kafka消费数据 1.引入pom依赖 2.添加配置文件 2.1.添加KafkaConsumerConfig.java 2.2.添加KafkaIotCustomProperties.java 2.3.添加application.yml配置 3.消费者代码
1.引入pom依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.11</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.2</version>
</dependency>
2.添加配置文件
2.1.添加KafkaConsumerConfig.java
@Configuration
@EnableConfigurationProperties ( KafkaIotCustomProperties . class )
@Slf4j
public class KafkaConsumerConfig { @Autowired KafkaIotCustomProperties kafkaIotCustomProperties; @Bean KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String , String > > kafkaListenerContainerFactory ( ) { ConcurrentKafkaListenerContainerFactory < String , String > factory = new ConcurrentKafkaListenerContainerFactory < > ( ) ; factory. setConsumerFactory ( consumerFactory ( ) ) ; factory. setConcurrency ( 3 ) ; factory. setBatchListener ( true ) ; ContainerProperties containerProperties = factory. getContainerProperties ( ) ; containerProperties. setAckMode ( ContainerProperties. AckMode . MANUAL_IMMEDIATE ) ; return factory; } private ConsumerFactory < String , String > consumerFactory ( ) { Map < String , Object > consumerConfigs = consumerConfigs ( ) ; log. info ( "消费者的配置信息:{}" , JSONObject . toJSONString ( consumerConfigs) ) ; return new DefaultKafkaConsumerFactory < > ( consumerConfigs) ; } @Bean public Map < String , Object > consumerConfigs ( ) { Map < String , Object > propsMap = new HashMap < > ( ) ; propsMap. put ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG , kafkaIotCustomProperties. getBootstrapServers ( ) ) ; propsMap. put ( ConsumerConfig . ENABLE_AUTO_COMMIT_CONFIG , kafkaIotCustomProperties. isEnableAutoCommit ( ) ) ; propsMap. put ( ConsumerConfig . AUTO_COMMIT_INTERVAL_MS_CONFIG , kafkaIotCustomProperties. getAutoCommitInterval ( ) ) ; propsMap. put ( ConsumerConfig . SESSION_TIMEOUT_MS_CONFIG , kafkaIotCustomProperties. getSessionTimeOut ( ) ) ; propsMap. put ( ConsumerConfig . KEY_DESERIALIZER_CLASS_CONFIG , kafkaIotCustomProperties. getKeyDeserializer ( ) ) ; propsMap. put ( ConsumerConfig . VALUE_DESERIALIZER_CLASS_CONFIG , kafkaIotCustomProperties. getValueDeserializer ( ) ) ; propsMap. put ( ConsumerConfig . HEARTBEAT_INTERVAL_MS_CONFIG , kafkaIotCustomProperties. getHeartbeatInterval ( ) ) ; propsMap. put ( ConsumerConfig . GROUP_ID_CONFIG , kafkaIotCustomProperties. getGroupId ( ) ) ; propsMap. put ( ConsumerConfig . 
AUTO_OFFSET_RESET_CONFIG , kafkaIotCustomProperties. getAutoOffsetReset ( ) ) ; propsMap. put ( ConsumerConfig . MAX_POLL_RECORDS_CONFIG , kafkaIotCustomProperties. getMaxPollRecords ( ) ) ; propsMap. put ( ConsumerConfig . MAX_POLL_INTERVAL_MS_CONFIG , kafkaIotCustomProperties. getMaxPollInterval ( ) ) ; return propsMap; } }
2.2.添加KafkaIotCustomProperties.java
/**
 * Kafka consumer settings bound from configuration keys under the
 * {@code fxyh.realdata.kafka} prefix.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Component
@ConfigurationProperties(prefix = "fxyh.realdata.kafka")
@Data
public class KafkaIotCustomProperties {

    /** Topics the listener subscribes to. */
    private List<String> topics;

    /** Consumer group id. */
    private String groupId;

    /** Session timeout value. */
    private String sessionTimeOut;

    /** Kafka bootstrap server address(es). */
    private String bootstrapServers;

    /** Offset reset policy. */
    private String autoOffsetReset;

    /** Whether the consumer auto-commits offsets. */
    private boolean enableAutoCommit;

    /** Auto-commit interval value. */
    private String autoCommitInterval;

    /** Minimum fetch size value. */
    private String fetchMinSize;

    /** Maximum fetch wait value. */
    private String fetchMaxWait;

    /** Maximum number of records per poll. */
    private String maxPollRecords;

    /** Maximum interval between polls. */
    private String maxPollInterval;

    /** Heartbeat interval value. */
    private String heartbeatInterval;

    /** Fully qualified class name of the key deserializer. */
    private String keyDeserializer;

    /** Fully qualified class name of the value deserializer. */
    private String valueDeserializer;
}
2.3.添加application.yml配置
fxyh:
  realdata:
    kafka:
      bootstrapServers: 192.168.80.251:9092
      topics: ["test1", "test2"]
      groupId: shengtingrealdatagroup
      sessionTimeOut: 30000
      enableAutoCommit: false
      autoCommitInterval: 1000
      fetchMinSize: 1
      fetchMaxWait: 500
      maxPollRecords: 50
      maxPollInterval: 300000
      heartbeatInterval: 10000
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
3.消费者代码
/**
 * Batch Kafka listener that logs every record received from the configured topics
 * and then manually acknowledges the batch.
 */
@Slf4j
@Component
public class DeviceDataConsumer {

    @Autowired
    private KafkaIotCustomProperties kafkaIotCustomProperties;

    /**
     * Consumes one polled batch of records.
     *
     * <p>Topics, group id and offset-reset policy are resolved through SpEL from the
     * {@code kafkaIotCustomProperties} bean.
     *
     * @param records the batch of records delivered by the listener container
     * @param ack     handle used to acknowledge the batch once it has been processed
     */
    @KafkaListener(
            topics = {"#{@kafkaIotCustomProperties.topics}"},
            groupId = "#{@kafkaIotCustomProperties.groupId}",
            containerFactory = "kafkaListenerContainerFactory",
            // Each entry must be a key=value pair; a bare value such as "latest" would be
            // parsed as a property named "latest" with an empty value.
            properties = {"auto.offset.reset=#{@kafkaIotCustomProperties.autoOffsetReset}"})
    public void topicTest(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            log.info("topic_test 消费了: Topic:{},groupId:{},Message:{}",
                    record.topic(), kafkaIotCustomProperties.getGroupId(), record.value());
        }
        // Acknowledge once, after the whole batch has been handled. Acknowledging inside
        // the loop would commit the batch before the remaining records were processed.
        ack.acknowledge();
    }
}