springboot整合kafka多数据源

整合kafka多数据源

  • 项目背景
  • 依赖
  • 配置
  • 生产者
  • 消费者
  • 消息体

项目背景

在很多与第三方公司对接的时候,或者处在不同的网络环境下,比如在互联网和政务外网的分布部署服务的时候,我们需要对接多台kafka来达到我们的业务需求,那么当kafka存在多数据源的情况,就与单机的情况有所不同。

依赖

    implementation 'org.springframework.kafka:spring-kafka:2.8.2'

配置

单机的情况
如果是单机的kafka我们直接通过springboot自动配置的就可以使用,例如在yml里面直接引用

spring:kafka:producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerbootstrap-servers: server001.bbd:9092

在使用的时候直接注入,然后就可以使用里面的方法了

    @Resourceprivate KafkaTemplate<String, String> kafkaTemplate;

在这里插入图片描述

多数据源情况下

本篇文章主要讲的是在多数据源下的使用,和单机的有所不同,我也看了网上的一些博客,但是当我去按照网上的配置的时候,总是会报错 kafakTemplate这个bean找不到,所以没办法只有按照springboot自动配置里面的来改
在这里插入图片描述

package com.ddb.zggz.config;import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.io.IOException;@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfiguration {private final KafkaProperties properties;private final KafkaSecondProperties kafkaSecondProperties;public KafkaConfiguration(KafkaProperties properties, KafkaSecondProperties kafkaSecondProperties) {this.properties = properties;this.kafkaSecondProperties = kafkaSecondProperties;}@Bean("kafkaTemplate")@Primarypublic KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,ProducerListener<Object, Object> kafkaProducerListener,ObjectProvider<RecordMessageConverter> messageConverter) {KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}@Bean("kafkaSecondTemplate")public KafkaTemplate<?, ?> kafkaSecondTemplate(@Qualifier("kafkaSecondProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,@Qualifier("kafkaSecondProducerListener") ProducerListener<Object, Object> kafkaProducerListener,ObjectProvider<RecordMessageConverter> messageConverter) {KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}@Bean("kafkaProducerListener")@Primarypublic ProducerListener<Object, Object> kafkaProducerListener() {return new LoggingProducerListener<>();}@Bean("kafkaSecondProducerListener")public ProducerListener<Object, Object> kafkaSecondProducerListener() {return new LoggingProducerListener<>();}@Bean("kafkaConsumerFactory")@Primarypublic ConsumerFactory<Object, Object> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean("kafkaSecondConsumerFactory")public ConsumerFactory<Object, Object> kafkaSecondConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(this.kafkaSecondProperties.buildConsumerProperties());customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean("zwKafkaContainerFactory")KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> zwKafkaContainerFactory(@Qualifier(value = "kafkaSecondConsumerFactory") ConsumerFactory<Object, Object> kafkaSecondConsumerFactory) {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(kafkaSecondConsumerFactory);factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Bean("kafkaProducerFactory")@Primarypublic ProducerFactory<Object, Object> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(this.properties.buildProducerProperties());String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix != null) {factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean("kafkaSecondProducerFactory")public ProducerFactory<Object, Object> kafkaSecondProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(this.kafkaSecondProperties.buildProducerProperties());String transactionIdPrefix = this.kafkaSecondProperties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix != null) {factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Bean@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();KafkaProperties.Jaas jaasProperties = this.properties.getJaas();if (jaasProperties.getControlFlag() != null) {jaas.setControlFlag(jaasProperties.getControlFlag());}if (jaasProperties.getLoginModule() != null) {jaas.setLoginModule(jaasProperties.getLoginModule());}jaas.setOptions(jaasProperties.getOptions());return jaas;}@Bean("kafkaAdmin")@Primarypublic KafkaAdmin kafkaAdmin() {KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());return kafkaAdmin;}}

生产者


package com.ddb.zggz.event;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;import com.ddb.zggz.config.ApplicationConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import javax.annotation.Resource;@Component
@Slf4j
public class KafkaPushEvent {@Resourceprivate KafkaTemplate<String, String> kafkaSecondTemplate;@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@Autowiredprivate ApplicationConfiguration configuration;public void pushEvent(PushParam param) {ListenableFuture<SendResult<String, String>> sendResultListenableFuture = null;if ("zw".equals(configuration.getEnvironment())){sendResultListenableFuture = kafkaSecondTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));}if ("net".equals(configuration.getEnvironment())){sendResultListenableFuture = kafkaTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));}if (sendResultListenableFuture == null){throw new IllegalArgumentException("kakfa发送消息失败");}sendResultListenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {log.error("kafka发送的message报错,发送数据:{}", param);}@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("kafka发送的message成功,发送数据:{}", param);}});}}

消费者

package com.ddb.zggz.event;import com.alibaba.fastjson.JSONObject;import com.ddb.zggz.config.ApplicationConfiguration;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.service.GzApprovalService;
import com.ddb.zggz.service.GzServiceService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;import java.util.ArrayList;
import java.util.List;
import java.util.Objects;@Component
@Slf4j
public class SendMessageListener {@Autowiredprivate GzApprovalService gzApprovalService;@Autowiredprivate GzServiceService gzServiceService;@KafkaListener(topics = "${application.config.push-topic}", groupId = "zggz",containerFactory = "zwKafkaContainerFactory")@RetryableTopic(include = {Exception.class},backoff = @Backoff(delay = 3000, multiplier = 1.5, maxDelay = 15000))public void listen(ConsumerRecord<?, ?> consumerRecord) {String value = (String) consumerRecord.value();PushParam pushParam = JSONObject.parseObject(value, PushParam.class);//版本提审if ("version-approval".equals(pushParam.getEvent())) {ApprovalDTO approvalDTO = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), ApprovalDTO.class);gzApprovalService.approval(approvalDTO);}//服务下架if (pushParam.getEvent().equals("server-OffShelf-gzt")) {OffShelfParam offShelfParam = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), OffShelfParam.class);gzServiceService.offShelfV1(offShelfParam.getReason(), null, offShelfParam.getUserName(), "ZGGZ", offShelfParam.getH5Id(), offShelfParam.getAppId(), offShelfParam.getVersion());}}@DltHandlerpublic void processMessage(String message) {}
}

消息体

package com.ddb.zggz.event;import com.alibaba.fastjson.annotation.JSONField;
import com.ddb.zggz.model.GzH5VersionManage;
import com.ddb.zggz.model.GzService;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.param.PublishParam;
import com.ddb.zggz.param.ReviewAndRollback;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;import java.io.Serializable;
import java.time.LocalDateTime;/*** @author bbd*/
@Data
public class PushParam implements Serializable {/*** 发送的消息数据*/private Object data;@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@JsonSerialize(using = LocalDateTimeSerializer.class)@JsonDeserialize(using = LocalDateTimeDeserializer.class)@JSONField(format = "yyyy-MM-dd HH:mm:ss")private LocalDateTime createTime = LocalDateTime.now();/*** 事件名称,用于消费者处理相关业务*/private String event;/*** 保存版本参数*/public static PushParam toKafkaVersion(GzH5VersionManage gzH5VersionManage) {PushParam pushParam = new PushParam();pushParam.setData(gzH5VersionManage);pushParam.setEvent("save-version");return pushParam;}/*** 保存服务参数*/public static PushParam toKafkaServer(GzService gzService) {PushParam pushParam = new PushParam();pushParam.setData(gzService);pushParam.setEvent("save-server");return pushParam;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/38162.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Vue-Router】路由过渡动效

在 Vue Router 中&#xff0c;你可以通过过渡动效&#xff08;Transition Effects&#xff09;为路由切换添加平滑的过渡效果&#xff0c;从而提升用户体验。过渡动效可以使用 Vue 的 <transition> 组件和 CSS 过渡来实现。 基本使用&#xff1a; 对导航使用动画&#…

HTML-文本标签

历史上&#xff0c;网页的主要功能是文本展示。所以&#xff0c;HTML 提供了大量的文本处理标签。 <div> <div>是一个通用标签&#xff0c;表示一个区块&#xff08;division&#xff09;。它没有语义&#xff0c;如果网页需要一个块级元素容器&#xff0c;又没有…

leetcode 494. 目标和

2023.8.14 一杯茶&#xff0c;一包烟&#xff0c;一道dp做一天... ps&#xff1a;nums[i]均大于等于0。本题先转化为0-1背包问题&#xff1a;将数组元素分成两堆&#xff1a;一堆为正号&#xff0c;另一堆为负号。设正号堆的和为x&#xff0c;则负号堆的和为sum-x。&#xff08…

CentOS系统环境搭建(十)——CentOS7定时任务

centos系统环境搭建专栏&#x1f517;点击跳转 使用CentOS系统环境搭建&#xff08;九&#xff09;——centos系统下使用docker部署项目的项目做定时任务。 CentOS7定时任务 查看现有的定时任务 crontab -l编辑定时任务 crontab -e示例 每天凌晨两点运行脚本 清理内存 0 2 *…

【Linux的开胃小菜】常用的RPM软件包与YUM仓库包管理器使用

一、系统初始化进程 systemd与System V init的区别以及作用&#xff1a; System V init运行级别systemd目标名称systemd目标作用0poweroff.target关机1rescue.target单用户模式2multi-user.target多用户的文本界面3multi-user.target多用户的文本界面4multi-user.target多用户…

【SpringBoot】88、SpringBoot中使用Undertow替代Tomcat容器

SpringBoot 中我们既可以使用 Tomcat 作为 Http 服务,也可以用 Undertow 来代替。Undertow 在高并发业务场景中,性能优于 Tomcat。所以,如果我们的系统是高并发请求,不妨使用一下 Undertow,你会发现你的系统性能会得到很大的提升。 1、Tomcat 介绍 Tomcat是一个开源的Ja…

【数据结构】“单链表”的练习题(二)

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …

Django框架 靓号管理(增删改查)

Django框架 靓号管理&#xff08;增删改查&#xff09; 新建一个项目 backend 使用pycharm创建app startapp app项目目录 C:\code\backend ├── app | ├── admin.py | ├── apps.py | ├── migrations | ├── models.py | ├── tests.py | ├── views.…

关于微信临时文件wxfile://tmp文件如何处理,微信小程序最新获取头像和昵称

分享-2023年资深前端进阶&#xff1a;前端登顶之巅-最全面的前端知识点梳理总结&#xff0c;前端之巅 *分享一个使用比较久的&#x1fa9c; 技术栈&#xff1a;taro框架 vue3版本 解决在微信小程序获取微信头像时控制台报错&#xff1a;找不着wxfile://tmp 文件路径,失败&…

java spring cloud 企业电子招标采购系统源码:营造全面规范安全的电子招投标环境,促进招投标市场健康可持续发展 tbms

​ 项目说明 随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大&#xff0c;公司对内部招采管理的提升提出了更高的要求。在企业里建立一个公平、公开、公正的采购环境&#xff0c;最大限度控制采购成本至关重要。符合国家电子招投标法律法规及相关规范&#xff0c;以…

支持M1 Syncovery for mac 文件备份同步工具

Syncovery for Mac 是一款功能强大、易于使用的文件备份和同步软件&#xff0c;适用于需要备份和同步数据的个人用户和企业用户。Syncovery 提供了一个直观的用户界面&#xff0c;使用户可以轻松设置备份和同步任务。用户可以选择备份的文件类型、备份目录、备份频率等&#xf…

解读2023年上半年财报:营收净利双增长,珀莱雅离高端还有多远?

夏季炎热&#xff0c;防晒类产品的销量暴涨。根据千牛数据&#xff0c;防晒衣今年5月全网搜索人数同比增长15%&#xff0c;加购人数同比增长29.8%&#xff0c;访问人数同比增加42%。消费者狂热的防晒需求&#xff0c;孕育着巨大的商机&#xff0c;许多企业开始瞄准这一机会。而…

在Windows和MacOS环境下实现批量doc转docx,xls转xlsx

一、引言 Python中批量进行办公文档转化是常见的操作&#xff0c;在windows状态下我们可以利用changeOffice这个模块很快进行批量操作。 二、在Windows环境下的解决文案 Windows环境下&#xff0c;如何把doc转化为docx&#xff0c;xls转化为xlsx&#xff1f; 首先&#xff…

mysql三大日志—— 二进制日志binlog

binlog用于记录数据库执行的写入性操作&#xff0c;由服务层进行记录&#xff0c;通过追加的方式以二进制的形式保存在磁盘中。 binlog主要用于主从复制和数据恢复。 主从复制&#xff1a;在主机端开启binlog&#xff0c;然后将binlog发送到各个从机&#xff0c;从机存放binl…

sykwalking8.2和mysql5.7快速部署

1.SkyWalking 是什么&#xff1f; 分布式系统的应用程序性能监视工具&#xff0c;专为微服务、云原生架构和基于容器&#xff08;Docker、K8s、Mesos&#xff09;架构而设计。 提供分布式追踪、服务网格遥测分析、度量聚合和可视化一体化解决方案。 2.SkyWalking 有哪些功能…

Spring Task入门案例

Spring Task 是Spring框架提供的任务调度工具&#xff0c;可以按照约定的时间自动执行某个代码逻辑。 定位&#xff1a;定时任务框架 作用&#xff1a;定时自动执行某段Java代码 强调&#xff1a;只要是需要定时处理的场景都可以使用Spring Task 1. cron表达式 cron表达式…

Java多线程线程间的通信—wait及notify方法

线程间的相互作用 线程间的相互作用:线程之间需要一些协调通信,来共同完成一件任务。 Object类中相关的方法有两个notify方法和三个wait方法: Object (Java Platform SE 7 ) 因为wait和notify方法定义在Object类中,因此会被所有的类所继承。 这些方法都是final的,即它们…

树形dp模板

285. 没有上司的舞会 - AcWing题库 #include<iostream> #include<cstdio> #include<cstdlib> #include<string> #include<cstring> #include<cmath> #include<ctime> #include<algorithm> #include<utility> #include&…

Visual Studio 与QT ui文件

对.ui文件鼠标右键&#xff0c;然后单击 Open with…在弹出的窗口中&#xff0c;选中左侧的 Qt Designer&#xff0c;然后单击右侧的 Add 按钮&#xff0c;随后会弹出一个窗口&#xff0c;在 Program: 输入框中输入 Qt Designer 的路径&#xff0c;最后单击 OK找到 Qt Designer…

内网ip与外网ip

一、关于IP地址 我们平时直接接触最多的是内网IP。而且还可以自己手动修改ip地址。而外网ip&#xff0c;我们很少直接接触&#xff0c;都是间接接触、因为外网ip一般都是运营商管理&#xff0c;而且是全球唯一的&#xff0c;一般我们自己是无法修改的。 内网IP和外网IP是指在…