kafka 配置类
用途:定义使用的基本 kafka 配置,以及定义Bean
下面文件是读取本地 spring 的标准配置文件的类,用于一般属性获取等操作
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;@Component
@ConfigurationProperties(prefix = "my.kafka")
@Data
public class MyTaskKafkaProperties {/**r* kafka地址*/private String serverUrl;/*** groupId*/private String groupId;/*** topic*/private String topic;private boolean enableAutoCommit;private String autoOffsetReset;@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(6);factory.getContainerProperties().setPollTimeout(6000);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);return props;}
}
@Data 为其他用于控制get set 方法的,与 此处配置不是强关联,可以没有
实际 kafka 监听消费
import com.dtdream.dthink.dtalent.dmall.openplat.service.opendata.OpenDataService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;@Slf4j
@ConditionalOnProperty(name = "my.kafka.enable", havingValue = "true")
@Component
public class MyTaskConsumer {@Autowiredprivate XxxxxService xxxxxService;@KafkaListener(topics = "${my.kafka.topic}", groupId = "${my.kafka.groupId}",containerFactory = "kafkaTwoContainerFactory")public void dxpTaskEnd(ConsumerRecord<String, String> record, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {consume(record, ack, topic, msg -> xxxxxService.xxxxxxx(msg));}private void consume(ConsumerRecord<String, String> record, Acknowledgment ack, String topic,java.util.function.Consumer<String> consumer) {Optional<String> optional = Optional.ofNullable(record.value());if (!optional.isPresent()) {log.warn("kafka收到消息 但为空,record:{}", record);return;}String msg = optional.get();log.info("kafka收到消息 开始消费 topic:{},msg:{}", topic, msg);try {consumer.accept(msg);// 上面方法执行成功后手动提交ack.acknowledge();log.info("kafka收到消息消费成功 topic:{},msg:{}", topic, msg);} catch (Exception e) {log.error("kafka消费消息失败 topic:{},msg:{}", topic, msg, e);}}
}
@ConditionalOnProperty spring boot 用于判断当前类是否加载的条件
XxxxxService: 为我们的业务服务层,用于消费消息