整合kafka多数据源
- 项目背景
- 依赖
- 配置
- 生产者
- 消费者
- 消息体
项目背景
在很多与第三方公司对接的时候,或者处在不同的网络环境下,比如在互联网和政务外网的分布部署服务的时候,我们需要对接多台kafka来达到我们的业务需求,那么当kafka存在多数据源的情况,就与单机的情况有所不同。
依赖
implementation 'org.springframework.kafka:spring-kafka:2.8.2'
配置
单机的情况
如果是单机的kafka我们直接通过springboot自动配置的就可以使用,例如在yml里面直接引用
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    bootstrap-servers: server001.bbd:9092
在使用的时候直接注入,然后就可以使用里面的方法了
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
多数据源情况下
本篇文章主要讲的是在多数据源下的使用,和单机的情况有所不同。我也看了网上的一些博客,但是按照网上的方式配置之后,总是会报错:找不到 kafkaTemplate 这个 bean。所以没有办法,只能参照 springboot 自动配置(KafkaAutoConfiguration)里面的写法来改:
package com.ddb.zggz.config;import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.io.IOException;@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfiguration {

    /** Settings of the primary cluster (Spring Boot's own {@code KafkaProperties}). */
    private final KafkaProperties properties;

    /** Settings of the second cluster; the type is declared elsewhere in the project. */
    private final KafkaSecondProperties kafkaSecondProperties;

    /**
     * Declares the Kafka beans for two clusters side by side. Every bean of the
     * primary cluster is marked {@code @Primary}, so unqualified injection points
     * resolve to it; beans of the second cluster must be requested by name.
     *
     * @param properties            settings of the primary cluster
     * @param kafkaSecondProperties settings of the second cluster
     */
    public KafkaConfiguration(KafkaProperties properties, KafkaSecondProperties kafkaSecondProperties) {
        this.properties = properties;
        this.kafkaSecondProperties = kafkaSecondProperties;
    }

    /**
     * Template that publishes to the primary cluster.
     *
     * @param kafkaProducerFactory  producer factory of the primary cluster
     * @param kafkaProducerListener listener notified about send results
     * @param messageConverter      optional converter, applied only when exactly one is defined
     * @return the primary {@link KafkaTemplate}
     */
    @Bean("kafkaTemplate")
    @Primary
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener,
            ObjectProvider<RecordMessageConverter> messageConverter) {
        return buildTemplate(kafkaProducerFactory, kafkaProducerListener, messageConverter,
                this.properties.getTemplate().getDefaultTopic());
    }

    /**
     * Template that publishes to the second cluster.
     *
     * @param kafkaProducerFactory  producer factory of the second cluster
     * @param kafkaProducerListener listener notified about send results
     * @param messageConverter      optional converter, applied only when exactly one is defined
     * @return the {@link KafkaTemplate} of the second cluster
     */
    @Bean("kafkaSecondTemplate")
    public KafkaTemplate<?, ?> kafkaSecondTemplate(
            @Qualifier("kafkaSecondProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,
            @Qualifier("kafkaSecondProducerListener") ProducerListener<Object, Object> kafkaProducerListener,
            ObjectProvider<RecordMessageConverter> messageConverter) {
        // Fix: the default topic used to be read from the primary cluster's properties
        // (copy-paste slip). It must come from the second cluster's own settings.
        // NOTE(review): assumes KafkaSecondProperties exposes getTemplate() like
        // KafkaProperties does (it already mirrors getProducer()/build*Properties()) - confirm.
        return buildTemplate(kafkaProducerFactory, kafkaProducerListener, messageConverter,
                this.kafkaSecondProperties.getTemplate().getDefaultTopic());
    }

    /** Producer listener of the primary cluster. */
    @Bean("kafkaProducerListener")
    @Primary
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<>();
    }

    /** Producer listener of the second cluster. */
    @Bean("kafkaSecondProducerListener")
    public ProducerListener<Object, Object> kafkaSecondProducerListener() {
        return new LoggingProducerListener<>();
    }

    /**
     * Consumer factory of the primary cluster.
     *
     * @param customizers customizers applied to the factory in order
     * @return the primary {@link ConsumerFactory}
     */
    @Bean("kafkaConsumerFactory")
    @Primary
    public ConsumerFactory<Object, Object> kafkaConsumerFactory(
            ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
        DefaultKafkaConsumerFactory<Object, Object> factory =
                new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    /**
     * Consumer factory of the second cluster. The same customizers as for the
     * primary factory are applied.
     *
     * @param customizers customizers applied to the factory in order
     * @return the {@link ConsumerFactory} of the second cluster
     */
    @Bean("kafkaSecondConsumerFactory")
    public ConsumerFactory<Object, Object> kafkaSecondConsumerFactory(
            ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
        DefaultKafkaConsumerFactory<Object, Object> factory =
                new DefaultKafkaConsumerFactory<>(this.kafkaSecondProperties.buildConsumerProperties());
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    /**
     * Listener container factory bound to the second cluster. Reference it from
     * {@code @KafkaListener(containerFactory = "zwKafkaContainerFactory")}.
     *
     * @param kafkaSecondConsumerFactory consumer factory of the second cluster
     * @return a concurrent container factory with concurrency 3 and a 3000 ms poll timeout
     */
    @Bean("zwKafkaContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> zwKafkaContainerFactory(
            @Qualifier(value = "kafkaSecondConsumerFactory") ConsumerFactory<Object, Object> kafkaSecondConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaSecondConsumerFactory);
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    /**
     * Producer factory of the primary cluster.
     *
     * @param customizers customizers applied to the factory in order
     * @return the primary {@link ProducerFactory}
     */
    @Bean("kafkaProducerFactory")
    @Primary
    public ProducerFactory<Object, Object> kafkaProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        return finishProducerFactory(
                new DefaultKafkaProducerFactory<>(this.properties.buildProducerProperties()),
                this.properties.getProducer().getTransactionIdPrefix(),
                customizers);
    }

    /**
     * Producer factory of the second cluster.
     *
     * @param customizers customizers applied to the factory in order
     * @return the {@link ProducerFactory} of the second cluster
     */
    @Bean("kafkaSecondProducerFactory")
    public ProducerFactory<Object, Object> kafkaSecondProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        return finishProducerFactory(
                new DefaultKafkaProducerFactory<>(this.kafkaSecondProperties.buildProducerProperties()),
                this.kafkaSecondProperties.getProducer().getTransactionIdPrefix(),
                customizers);
    }

    /**
     * Transaction manager, registered only when
     * {@code spring.kafka.producer.transaction-id-prefix} is set. The unqualified
     * {@link ProducerFactory} parameter resolves to the {@code @Primary} factory.
     *
     * @param producerFactory producer factory the transactions are bound to
     * @return the transaction manager
     */
    @Bean
    @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    /**
     * JAAS login module initializer, registered only when {@code spring.kafka.jaas.enabled}
     * is set. Only the primary cluster's JAAS settings are read here.
     *
     * @return the configured initializer
     * @throws IOException declared by the {@link KafkaJaasLoginModuleInitializer} constructor
     */
    @Bean
    @ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
    public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
        KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
        KafkaProperties.Jaas jaasProperties = this.properties.getJaas();
        if (jaasProperties.getControlFlag() != null) {
            jaas.setControlFlag(jaasProperties.getControlFlag());
        }
        if (jaasProperties.getLoginModule() != null) {
            jaas.setLoginModule(jaasProperties.getLoginModule());
        }
        jaas.setOptions(jaasProperties.getOptions());
        return jaas;
    }

    /**
     * Admin client configuration for the primary cluster.
     *
     * @return the {@link KafkaAdmin} built from the primary cluster's admin properties
     */
    @Bean("kafkaAdmin")
    @Primary
    public KafkaAdmin kafkaAdmin() {
        KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
        kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
        return kafkaAdmin;
    }

    /** Creates a template on the given factory and applies converter, listener and default topic. */
    private static KafkaTemplate<Object, Object> buildTemplate(ProducerFactory<Object, Object> producerFactory,
            ProducerListener<Object, Object> producerListener,
            ObjectProvider<RecordMessageConverter> messageConverter,
            String defaultTopic) {
        KafkaTemplate<Object, Object> template = new KafkaTemplate<>(producerFactory);
        messageConverter.ifUnique(template::setMessageConverter);
        template.setProducerListener(producerListener);
        template.setDefaultTopic(defaultTopic);
        return template;
    }

    /** Applies the optional transaction id prefix and all customizers to a freshly created factory. */
    private static ProducerFactory<Object, Object> finishProducerFactory(
            DefaultKafkaProducerFactory<Object, Object> factory,
            String transactionIdPrefix,
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }
}
生产者
package com.ddb.zggz.event;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;import com.ddb.zggz.config.ApplicationConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import javax.annotation.Resource;@Component
@Slf4j
public class KafkaPushEvent {@Resourceprivate KafkaTemplate<String, String> kafkaSecondTemplate;@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@Autowiredprivate ApplicationConfiguration configuration;public void pushEvent(PushParam param) {ListenableFuture<SendResult<String, String>> sendResultListenableFuture = null;if ("zw".equals(configuration.getEnvironment())){sendResultListenableFuture = kafkaSecondTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));}if ("net".equals(configuration.getEnvironment())){sendResultListenableFuture = kafkaTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));}if (sendResultListenableFuture == null){throw new IllegalArgumentException("kakfa发送消息失败");}sendResultListenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {log.error("kafka发送的message报错,发送数据:{}", param);}@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("kafka发送的message成功,发送数据:{}", param);}});}}
消费者
package com.ddb.zggz.event;import com.alibaba.fastjson.JSONObject;import com.ddb.zggz.config.ApplicationConfiguration;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.service.GzApprovalService;
import com.ddb.zggz.service.GzServiceService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;import java.util.ArrayList;
import java.util.List;
import java.util.Objects;@Component
@Slf4j
public class SendMessageListener {@Autowiredprivate GzApprovalService gzApprovalService;@Autowiredprivate GzServiceService gzServiceService;@KafkaListener(topics = "${application.config.push-topic}", groupId = "zggz",containerFactory = "zwKafkaContainerFactory")@RetryableTopic(include = {Exception.class},backoff = @Backoff(delay = 3000, multiplier = 1.5, maxDelay = 15000))public void listen(ConsumerRecord<?, ?> consumerRecord) {String value = (String) consumerRecord.value();PushParam pushParam = JSONObject.parseObject(value, PushParam.class);//版本提审if ("version-approval".equals(pushParam.getEvent())) {ApprovalDTO approvalDTO = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), ApprovalDTO.class);gzApprovalService.approval(approvalDTO);}//服务下架if (pushParam.getEvent().equals("server-OffShelf-gzt")) {OffShelfParam offShelfParam = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), OffShelfParam.class);gzServiceService.offShelfV1(offShelfParam.getReason(), null, offShelfParam.getUserName(), "ZGGZ", offShelfParam.getH5Id(), offShelfParam.getAppId(), offShelfParam.getVersion());}}@DltHandlerpublic void processMessage(String message) {}
}
消息体
package com.ddb.zggz.event;import com.alibaba.fastjson.annotation.JSONField;
import com.ddb.zggz.model.GzH5VersionManage;
import com.ddb.zggz.model.GzService;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.param.PublishParam;
import com.ddb.zggz.param.ReviewAndRollback;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;import java.io.Serializable;
import java.time.LocalDateTime;/*** @author bbd*/
@Data
public class PushParam implements Serializable {/*** 发送的消息数据*/private Object data;@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@JsonSerialize(using = LocalDateTimeSerializer.class)@JsonDeserialize(using = LocalDateTimeDeserializer.class)@JSONField(format = "yyyy-MM-dd HH:mm:ss")private LocalDateTime createTime = LocalDateTime.now();/*** 事件名称,用于消费者处理相关业务*/private String event;/*** 保存版本参数*/public static PushParam toKafkaVersion(GzH5VersionManage gzH5VersionManage) {PushParam pushParam = new PushParam();pushParam.setData(gzH5VersionManage);pushParam.setEvent("save-version");return pushParam;}/*** 保存服务参数*/public static PushParam toKafkaServer(GzService gzService) {PushParam pushParam = new PushParam();pushParam.setData(gzService);pushParam.setEvent("save-server");return pushParam;}