搭建大型分布式服务(三十七)SpringBoot 整合多个kafka数据源-取消限定符

系列文章目录


文章目录

  • 系列文章目录
  • 前言
      • 一、本文要点
      • 二、开发环境
      • 三、原项目
      • 四、修改项目
      • 五、测试一下
      • 五、小结


前言

本插件稳定运行上百个kafka项目,每天处理上亿级的数据的精简小插件,快速上手。

<dependency><groupId>io.github.vipjoey</groupId><artifactId>multi-kafka-consumer-starter</artifactId><version>最新版本号</version>
</dependency>

例如下面这样简单的配置就完成SpringBoot和kafka的整合,我们只需要关心com.mmc.multi.kafka.starter.OneProcessorcom.mmc.multi.kafka.starter.TwoProcessor 这两个Service的代码开发。

## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=com.mmc.multi.kafka.starter.TwoProcessor // 业务处理类名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

国籍惯例,先上源码:Github源码

一、本文要点

本文将介绍通过封装一个starter,来实现多kafka数据源的配置,通过通过源码,可以学习以下特性。系列文章完整目录

  • SpringBoot 整合多个kafka数据源
  • SpringBoot 批量消费kafka消息
  • SpringBoot 优雅地启动或停止消费kafka
  • SpringBoot kafka本地单元测试(免集群)
  • SpringBoot 利用map注入多份配置
  • SpringBoot BeanPostProcessor 后置处理器使用方式
  • SpringBoot 将自定义类注册到IOC容器
  • SpringBoot 注入bean到自定义类成员变量
  • Springboot 取消限定符

二、开发环境

  • jdk 1.8
  • maven 3.6.2
  • springboot 2.4.3
  • kafka-client 2.6.6
  • idea 2020

三、原项目

1、接前文,我们开发了一个kafka插件,但在使用过程中发现有些不方便的地方,例如我们所有processor需要继承MmcKafkaKafkaAbastrctProcessor<T extends MmcKafkaMsg> ,其中的T为反序列化的实体类类型。


@Slf4j
@Service
public class OneProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {@Resourceprivate DemoService demoService;@Overrideprotected Class<DemoMsg> getEntityClass() {return DemoMsg.class;}@Overrideprotected void dealMessage(List<DemoMsg> datas) {demoService.dealMessage("one", datas.stream().map(x -> (MmcKafkaMsg) x).collect(Collectors.toList()));}}@Slf4j
@Service
public class TwoProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {@Resourceprivate DemoService demoService;public TwoProcessor() {}@Overrideprotected Class<DemoMsg> getEntityClass() {return DemoMsg.class;}@Overrideprotected void dealMessage(List<DemoMsg> datas) {demoService.dealMessage("two", datas.stream().map(x -> (MmcKafkaMsg) x).collect(Collectors.toList()));}}

2、可以看到这里有两个体验不太好的地方。

  • 自定义实体类DemoMsg 必须要继承 MmcKafkaMsg,很多同学会忘记这个步骤;
  • 需要覆盖getEntityClass()父类方法,用于反序列化指定实体类的类型,这里太冗余;

因此、所以我们要升级和优化。

四、修改项目

1、取消限定符,消息实体类不再强制要求实现MmcKafkaMsg接口,改为可选项,作为候选插件化的能力增强(后文介绍);

@Data
class DemoMsg {private String routekey;private String name;private Long timestamp;}

2、修改MmcKafkaKafkaAbastrctProcessor类,取消限定符并增加类型推断方法。

a、如果实现了MmcKafkaMsg接口,就拥有了单次消费内的batch数据去重能力;

public void onMessage(List<ConsumerRecord<String, String>> records) {if (null == records || CollectionUtils.isEmpty(records)) {log.warn("{} records is null or records.value is empty.", name);return;}Assert.hasText(name, "You must pass the field `name` to the Constructor or invoke the setName() after the class was created.");Assert.notNull(properties, "You must pass the field `properties` to the Constructor or invoke the setProperties() after the class was created.");try {Stream<T> dataStream = records.stream().map(ConsumerRecord::value).flatMap(this::doParse).filter(Objects::nonNull).filter(this::isRightRecord);// 支持配置强制去重或实现了接口能力去重if (properties.isDuplicate() || isSubtypeOfInterface(MmcKafkaMsg.class)) {// 检查是否实现了去重接口if (!isSubtypeOfInterface(MmcKafkaMsg.class)) {throw new RuntimeException("The interface "+ MmcKafkaMsg.class.getName() + " is not implemented if you set the config `spring.kafka.xxx.duplicate=true` .");}dataStream = dataStream.collect(Collectors.groupingBy(this::buildRoutekey)).entrySet().stream().map(this::findLasted).filter(Objects::nonNull);}List<T> datas = dataStream.collect(Collectors.toList());if (CommonUtil.isNotEmpty(datas)) {this.dealMessage(datas);}} catch (Exception e) {log.error(name + "-dealMessage error ", e);}}

b、新增类型推断方法,目的是去掉子类必须实现getEntityClass()的约束;

    protected boolean isSubtypeOfInterface(Class<?> interfaceClass) {if (null == type) {Type superClass = getClass().getGenericSuperclass();if (superClass instanceof ParameterizedType) {ParameterizedType parameterizedType = (ParameterizedType) superClass;Type[] typeArguments = parameterizedType.getActualTypeArguments();if (typeArguments.length > 0 && typeArguments[0] instanceof Class) {//noinspection uncheckedtype = (Class<T>) typeArguments[0];}}}return (null != type) && interfaceClass.isAssignableFrom(type);}protected  Class<T> getEntityClass() {if (null == type) {synchronized(this) {Type superClass = getClass().getGenericSuperclass();if (superClass instanceof ParameterizedType) {ParameterizedType parameterizedType = (ParameterizedType) superClass;Type[] typeArguments = parameterizedType.getActualTypeArguments();if (typeArguments.length > 0 && typeArguments[0] instanceof Class) {//noinspection uncheckedtype = (Class<T>) typeArguments[0];}}}}return type;}

c、修改去重方法,也就是取批次内最新一条消息,不再使用限定符;

    protected T findLasted(Map.Entry<String, List<T>> entry) {try {Optional<T> d = entry.getValue().stream().max(Comparator.comparing(x -> ((PandoKafkaMsg) x).getRoutekey()));if (d.isPresent()) {return d.get();}} catch (Exception e) {String content = JsonUtil.toJsonStr(entry.getValue());log.error("处理消息出错:{}", e.getMessage() + ": " + content, e);}return null;}protected String buildRoutekey(T t) {return ((MmcKafkaMsg) t).getRoutekey();}

3、修改MmcKafkaBeanPostProcessor,取消限定符。

public class MmcKafkaBeanPostProcessor implements BeanPostProcessor {@Getterprivate final Map<String, MmcKafkaKafkaAbastrctProcessor<?>> suitableClass = new ConcurrentHashMap<>();@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof MmcKafkaKafkaAbastrctProcessor) {MmcKafkaKafkaAbastrctProcessor<?> target = (MmcKafkaKafkaAbastrctProcessor<?>) bean;suitableClass.putIfAbsent(beanName, target);suitableClass.putIfAbsent(bean.getClass().getName(), target);}return bean;}
}

4、修改MmcKafkaProcessorFactory,取消限定符。

五、测试一下

1、引入kafka测试需要的jar。参考文章:kafka单元测试

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency>

2、定义一个消息实体和业务处理类。

@Data
class DemoMsg  {private String routekey;private String name;private Long timestamp;}
@Slf4j
@Service
public class OneProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {@Resourceprivate DemoService demoService;@Overrideprotected void dealMessage(List<DemoMsg> datas) {datas.forEach(x -> {log.info("dealMessage one: {}", x);});}}

3、配置kafka地址和指定业务处理类。

spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor  // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

4、编写测试类。

@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {MmcMultiConsumerAutoConfiguration.class, DemoService.class, OneProcessor.class})
@TestPropertySource(value = "classpath:application.properties")
@DirtiesContext
@EmbeddedKafka(topics = {"${spring.kafka.one.topic}"})
class AppTest {@Resourceprivate EmbeddedKafkaBroker embeddedKafkaBroker;@Value("${spring.kafka.one.topic}")private String topicOne;@Value("${spring.kafka.two.topic}")private String topicTwo;@Testvoid testDealMessage() throws Exception {// 模拟生产数据produceMessage();Thread.sleep(10 * 1000);}void produceMessage() {Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();for (int i = 0; i < 10; i++) {DemoMsg msg = new DemoMsg();msg.setRoutekey("routekey" + i);msg.setName("name" + i);msg.setTimestamp(System.currentTimeMillis());String json = JsonUtil.toJsonStr(msg);producer.send(new ProducerRecord<>(topicOne, "my-aggregate-id", json));producer.send(new ProducerRecord<>(topicTwo, "my-aggregate-id", json));producer.flush();}}
}

5、运行一下,测试通过。
在这里插入图片描述

五、小结

将本项目代码构建成starter,就可以大大提升我们开发效率,我们只需要关心业务代码的开发,github项目源码:轻触这里。如果对你有用可以打个星星哦。下一篇,升级本starter,在kafka单分区下实现十万级消费处理速度。

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/5399.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Matlab生成txt文件导入到Vivado仿真

Matlab处理数据并将其写入txt文件 %% Txt Generate pre_RS_datadec2bin(simDataIn,8); %将数据转化为8bit的二进制 fidfopen("F:\FPGA\Xilinx_vivado\project\dvbstestbench\dbvs\matlab\pre_RS_data.txt","wt"); for i1:n*nMessages %数据…

Ubuntu C++ man手册安装及使用

Ubuntu下C++ man手册安装 C++在线文档: http://www.cplusplus.com/reference/ 第一种办法:使用cppman $ sudo apt install cppman 使用方法 第二种办法: 打开网页:GCC mirror sites- GNU Project 点击下图中的突显行链接: Russia, Novosibirsk:

使用UmcFramework和unimrcpclient.xml连接多个SIP设置的配置指南及C代码示例

使用UmcFramework和unimrcpclient.xml连接多个SIP设置的配置指南及C代码示例 引言1. UniMRCP和UmcFramework简介2. 准备工作3. unimrcpclient.xml配置文件3.1 定义SIP设置3.2 定义MRCP会话配置文件 4. C代码示例5. 测试和验证6. 故障排查7. 结论8. 参考文献 引言 在多媒体通信…

小剧场短剧影视小程序源码_后端PHP

项目运行截图 源码贡献 https://githubs.xyz/boot?app42 部署说明 linux/win任选 PHP版本&#xff1a;7.3/7.2&#xff08;测试时我用的7.2要安装sg扩展 &#xff09; 批量替换域名http://video.owoii.com更换为你的 批量替换域名http://120.79.77.163:1更换为你的 这两个…

微服务之SpringCloud AlibabaSeata处理分布式事务

一、概述 1.1背景 一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用&#xff0c;就会产生分布式事务问题 but 关系型数据库提供的能力是基于单机事务的&#xff0c;一旦遇到分布式事务场景&#xff0c;就需要通过更多其他技术手段来解决问题。 全局事务&#xff1a;…

【论文阅读】ChipNeMo中的数据集处理

前面总体学习了《ChipNeMo: Domain-Adapted LLMs for Chip Design》&#xff0c;然后又继续仔细看了论文中的领域适配分词和领域数据微调的预训练检索模型&#xff0c;对于数据集的处理&#xff0c;也需要仔细看一下。 提炼重点&#xff1a;1&#xff09;对于数据集&#xff0…

第1篇:创建Platform Designer系统

Q&#xff1a;本期我们开始使用Platform Designer工具创建带IP核的FPGA自定义硬件系统。 A&#xff1a;Platform Designer是集成在Quartus软件里的系统设计工具&#xff0c;名称随着Quartus的不断更新曾命名为SOPC Builder和Qsys。 使用Platform Designer可以添加Quartus已有自…

安卓数据库SQLite

目录 一、SQLite数据库二、SQLiteOpenHelper和SQLiteDatabase2.1 SQLiteOpenHelper2.2 SQLiteDatabase 三、常见数据库使用介绍3.1 创建数据库3.2 插入数据3.3 修改数据&#xff08;升级数据库&#xff09;3.4 删除数据3.5 查询数据3.6 关闭数据库3.7 删除数据库 一、SQLite数据…

基于uniapp vue3.0 uView 做一个点单页面(包括加入购物车动画和左右联动)

1、实现效果&#xff1a; 下拉有自定义组件&#xff08;商品卡片、进步器、侧边栏等&#xff09;源码 2、左右联动功能 使用scroll-view来做右边的菜单页&#xff0c;title的id动态绑定充当锚点 <scroll-view :scroll-into-view"toView" scroll-with-animation…

OSPF基本配置

原理概述 OSPF 是一种应用非常广泛的基于链路状态的动态路由协议&#xff0c;它具有区域&#xff08; Area )化的层次结构&#xff0c;扩展性好&#xff0c;收敛速度快&#xff0c;适合部署在各种规模的网络上。 在 OSPF 中&#xff0c;每台路由器都必须有一个 Router-I…

仓储机器人确实蛮卷的~

导语 大家好&#xff0c;我是智能仓储物流技术研习社的社长&#xff0c;老K。专注分享智能仓储物流技术、智能制造等内容。 新书《智能物流系统构成与技术实践》 视频来源于Agilox。 仓储机器人&#xff0c;无疑是现代物流业的一大亮点。它们小巧灵活&#xff0c;却能承担起繁重…

线上线下交友陪玩,支持小程序/app/h5三端打包,源码搭建!

社交APP定制开发的好处&#xff1a; 社交APP定制开发能够根据用户需求进行个性化定制&#xff0c;满足用户对于社交功能的特殊需求。不同用户对社交的理解和需求各不相同&#xff0c;定制开发可以根据用户的要求&#xff0c;提供更加个性化和专属的社交功能&#xff0c;为用户…

Zapier 与生成式 AI 的自动化(一)

原文&#xff1a;zh.annas-archive.org/md5/057fe0c351c5365f1188d1f44806abda 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 前言 当组织处理手动和重复性任务时&#xff0c;生产力会遇到重大问题。Zapier 处于无代码运动的前沿&#xff0c;提供了一种先进的工具&a…

Flutter运行项目一直:running gradle task

大体原因就是访问国外的资源由于网络等原因导致访问失败&#xff0c;解决方法就是换成国内的源 修改项目的android/build.gradle 文件&#xff0c;将里面的 google() mavenCentral()替换为 maven {allowInsecureProtocol trueurl https://maven.aliyun.com/repository/googl…

SpringCloud学习笔记(二)Ribbon负载均衡、Nacos注册中心、Nacos与Eureka的区别

文章目录 4 Ribbon负载均衡4.1 负载均衡原理4.2 源码解读4.3 负载均衡策略4.3.1 内置的负载均衡策略4.3.2 自定义负载均衡策略4.3.2.1 方式一&#xff1a;定义IRule4.3.2.2 方式二&#xff1a;配置文件 4.4 饥饿加载 5 Nacos注册中心5.1 认识和安装Nacos5.2 服务注册到Nacos5.3…

STM32 电源控制PWR

一、PWR电源控制 1.1 PWR&#xff08;Power Control&#xff09; PWR负责管理STM32内部的电源供电部分&#xff0c;可以实现可编程电压监测器和低功耗模式的功能 可编程电压监测器&#xff08;PVD&#xff09;可以监控VDD电源电压&#xff0c;当VDD下降到PVD阀值以下或上升到…

Postgresql 从小白到高手 十一 :数据迁移ETL方案

文章目录 Postgresql 数据迁移ETL方案1、Pg 同类型数据库2 、Pg 和 不同数据库 Postgresql 数据迁移ETL方案 1、Pg 同类型数据库 备份 : pg_dump -U username -d dbname -f backup.sql插入数据&#xff1a; psql -U username -d dbname -f backup.sqlpg_restore -U username…

基于PCIE4C的数据传输(三)——使用遗留中断与MSI中断

本文继续基于PCIE4C IP核实现主机&#xff08;RHEL 8.9&#xff09;与FPGA&#xff08;Xilinx UltrascaleHBM VCU128开发板&#xff09;间DMA数据传输时的中断控制。本文分为三个部分&#xff1a;FPGA设计、驱动程序设计、上板测试。 FPGA设计 基于PCIE4C的数据传输&#xff0…

聚醚醚酮(Polyether Ether Ketone)PEEK在粘接使用时可以使用UV胶水吗?要注意哪些事项?

一般情况下&#xff0c;聚醚醚酮&#xff08;Polyether Ether Ketone&#xff0c;PEEK&#xff09;是一种难以黏附的高性能工程塑料&#xff0c;而UV胶水通常不是与PEEK进行粘接的首选方法。PEEK表面的化学性质和高温性能使得它对常规胶水的附着性较低。然而&#xff0c;有一些…

深度学习之基于Matlab NN的伦敦房价预测

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景 房价预测是房地产领域的一个重要问题&#xff0c;对于投资者、开发商以及政策制定者等都具有重要的指…