搭建大型分布式服务(四十一)SpringBoot 整合多个kafka数据源-支持亿级消息生产者

系列文章目录


文章目录

  • 系列文章目录
  • 前言
      • 一、本文要点
      • 二、开发环境
      • 三、原项目
      • 四、修改项目
      • 五、测试一下
      • 五、小结


前言

本插件稳定运行上百个kafka项目,每天处理上亿级的数据的精简小插件,快速上手。

<dependency><groupId>io.github.vipjoey</groupId><artifactId>multi-kafka-starter</artifactId><version>最新版本号</version>
</dependency>

例如下面这样简单的配置就完成SpringBoot和kafka的整合,我们只需要关心com.mmc.multi.kafka.starter.OneProcessorcom.mmc.multi.kafka.starter.TwoProcessor 这两个Service的代码开发。

## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=com.mmc.multi.kafka.starter.TwoProcessor // 业务处理类名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer## pb 消息消费者
spring.kafka.pb.enabled=true
spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.pb.topic=mmc-topic-pb
spring.kafka.pb.group-id=group-consumer-pb
spring.kafka.pb.processor=pbProcessor
spring.kafka.pb.consumer.auto-offset-reset=latest
spring.kafka.pb.consumer.max-poll-records=10
spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer## kafka消息生产者
spring.kafka.four.enabled=true
spring.kafka.four.producer.name=fourKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

国籍惯例,先上源码:Github源码

一、本文要点

本文将介绍通过封装一个starter,来实现多kafka数据源的配置,通过通过源码,可以学习以下特性。系列文章完整目录

  • SpringBoot 整合多个kafka数据源
  • SpringBoot 批量消费kafka消息
  • SpringBoot 优雅地启动或停止消费kafka
  • SpringBoot kafka本地单元测试(免集群)
  • SpringBoot 利用map注入多份配置
  • SpringBoot BeanPostProcessor 后置处理器使用方式
  • SpringBoot 将自定义类注册到IOC容器
  • SpringBoot 注入bean到自定义类成员变量
  • Springboot 取消限定符
  • SpringBoot 支持消费protobuf类型的kafka消息
  • SpringBoot Aware设计模式
  • SpringBoot 获取kafka消息中的topic、offset、partition、header等参数
  • SpringBoot 使用任意生产者发送kafka消息
  • SpringBoot 配置任意数量的kafka生产者

二、开发环境

  • jdk 1.8
  • maven 3.6.2
  • springboot 2.4.3
  • kafka-client 2.6.6
  • idea 2020

三、原项目

1、接前文,我们整合生产者到这个组件,那么假如我们生产的消息过亿级别,一个生产者不足以支持这么大的buffer发送量级,我们该如何操作?是否支持配置多个数量的生产者?


## 1.配置
spring.kafka.four.enabled=true
spring.kafka.four.producer.count=1 ## 生产者数量,默认为1个
spring.kafka.four.producer.name=fourKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer## 2.引用
@Resource(name = "fourKafkaSender")
private MmcKafkaSender mmcKafkaMultiSender;## 3.使用
mmcKafkaMultiSender.sendStringMessage(topicOne, "aaa", json);

答案是可以的、但我们要升级和优化一下。

四、修改项目

1、新增MmcKafkaMultiSender类,用于支持任意数量的发送者,利用负载均衡发送消息。

public class MmcKafkaMultiSender implements MmcKafkaSender {private final AtomicLong atomicLong = new AtomicLong(1);private final List<KafkaTemplate<String, Object>> templates;public MmcKafkaMultiSender(List<KafkaTemplate<String, Object>> templates) {this.templates = templates;}@Overridepublic void sendStringMessage(String topic, String partitionKey, String message) {KafkaTemplate<String, Object> template = templates.get((int) (atomicLong.getAndIncrement() % templates.size()));template.send(topic, partitionKey, message);}@Overridepublic void sendProtobufMessage(String topic, String partitionKey, byte[] message) {KafkaTemplate<String, Object> template = templates.get((int) (atomicLong.getAndIncrement() % templates.size()));template.send(topic, partitionKey, message);}
}

2、修改MmcMultiProducerAutoConfiguration配置类,配置多个数量的Sender;

 @Beanpublic MmcKafkaOutputContainer mmcKafkaOutputContainer() {// 初始化一个存储所有生产者的哈希映射Map<String, MmcKafkaSender> outputs = new HashMap<>();// 获取所有的Kafka配置信息Map<String, MmcMultiKafkaProperties.MmcKafkaProperties> kafkas = mmcMultiKafkaProperties.getKafka();// 逐个遍历,并生成producerfor (Map.Entry<String, MmcMultiKafkaProperties.MmcKafkaProperties> entry : kafkas.entrySet()) {// 唯一生产者名称String name = entry.getKey();// 生产者配置MmcMultiKafkaProperties.MmcKafkaProperties properties = entry.getValue();// 是否开启if (properties.isEnabled()&& properties.getProducer().isEnabled()&& CommonUtil.isNotEmpty(properties.getProducer().getBootstrapServers())) {// bean名称String beanName = Optional.ofNullable(properties.getProducer().getName()).orElse(name + "KafkaSender");// 数量List<KafkaTemplate<String, Object>> templates = new ArrayList<>(properties.getProducer().getCount());for (int i = 0; i < properties.getProducer().getCount(); i++) {log.info("[pando] init producer {} - {} ", name, i);KafkaTemplate<String, Object> template = mmcKafkaTemplate(properties);templates.add(template);}// 创建实例MmcKafkaSender sender = new MmcKafkaMultiSender(templates);outputs.put(beanName, sender);// 注册到IOCbeanDefinitionRegistry.registerSingleton(beanName, sender);}}return new MmcKafkaOutputContainer(outputs);}

五、测试一下

1、引入kafka测试需要的jar。参考文章:kafka单元测试

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.18.0</version><scope>test</scope></dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java-util</artifactId><version>3.18.0</version><scope>test</scope></dependency>

2、消费者配置保持不变,增加生产者配置。

## json消息消费者
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=oneProcessor
spring.kafka.one.duplicate=false
spring.kafka.one.snakeCase=false
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.container.threshold=2
spring.kafka.one.container.rate=1000
spring.kafka.one.container.parallelism=8## json消息生产者
spring.kafka.four.enabled=true
spring.kafka.four.producer.name=fourKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

3、编写测试类。

@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {MmcMultiProducerAutoConfiguration.class, MmcMultiConsumerAutoConfiguration.class,DemoService.class, OneProcessor.class})
@TestPropertySource(value = "classpath:application-string.properties")
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"},topics = {"${spring.kafka.one.topic}"})
class KafkaStringMessageTest {@Value("${spring.kafka.one.topic}")private String topicOne;@Value("${spring.kafka.two.topic}")private String topicTwo;@Resource(name = "fourKafkaSender")private MmcKafkaSender mmcKafkaSender;@Testvoid testDealMessage() throws Exception {Thread.sleep(2 * 1000);// 模拟生产数据produceMessage();Thread.sleep(10 * 1000);}void produceMessage() {for (int i = 0; i < 10; i++) {DemoMsg msg = new DemoMsg();msg.setRoutekey("routekey" + i);msg.setName("name" + i);msg.setTimestamp(System.currentTimeMillis());String json = JsonUtil.toJsonStr(msg);mmcKafkaSender.sendStringMessage(topicOne, "aaa", json);}}
}

5、运行一下,测试通过,可以看到能正常发送消息和消费。
在这里插入图片描述

五、小结

将本项目代码构建成starter,就可以大大提升我们开发效率,我们只需要关心业务代码的开发,github项目源码:轻触这里。如果对你有用可以打个星星哦。下一篇,升级本starter,在kafka单分区下实现十万级消费处理速度。

《搭建大型分布式服务(三十六)SpringBoot 零代码方式整合多个kafka数据源》
《搭建大型分布式服务(三十七)SpringBoot 整合多个kafka数据源-取消限定符》
《搭建大型分布式服务(三十八)SpringBoot 整合多个kafka数据源-支持protobuf》
《搭建大型分布式服务(三十九)SpringBoot 整合多个kafka数据源-支持Aware模式》
搭建大型分布式服务(四十)SpringBoot 整合多个kafka数据源-支持生产者
搭建大型分布式服务(四十一)SpringBoot 整合多个kafka数据源-支持亿级消息生产者

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/36097.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【ARM】MCU和SOC的区别

【更多软件使用问题请点击亿道电子官方网站】 1、 文档目标 了解SOC芯片和MCU芯片的区别 2、 问题场景 用于了解SOC芯片和MCU芯片的区别&#xff0c;内部结构上的区别。 3、软硬件环境 1&#xff09;、软件版本&#xff1a;无 2&#xff09;、电脑环境&#xff1a;无 3&am…

Java——枚举

1. 概念 枚举是在JDK1.5之后引入的&#xff0c;主要用途是&#xff1a;将一组常量组织起来&#xff0c;在这之前表示一组常量通常使用定义常量的方式&#xff1a; public static final int RED 1; public static final int GREEN 2; public static final int BLACK 3;但是…

第 12 课:基于隐语的VisionTransformer框架

基于之前MPC的基础知识&#xff0c;本讲主要内容是MPCViT基于SecretFlow的VisionTransformer框架&#xff0c;主要从神经网络架构&#xff0c;隐私推理框架和实验结果三方面介绍。 一、MPCViT&#xff1a;安全且高效的MPC友好型 Vision Transformer架构 MPCViT隐私推理总体框架…

QT中子工程的创建,以及如何在含有库的子工程项目中引用主项目中的qt资源

1、背景 在qt中创建多项目类型,如下: CustomDll表示其中的一个动态库子项目; CustomLib表示其中的一个静态库子项目; MyWidget表示主项目窗口(main函数所在项目); 2、qrc资源的共享 如何在CustomDll和CustomLib等子项目中也同样使用 MyWidget项目中的qrc资源呢??? 直…

【项目实训】后端逻辑完善

经测试&#xff0c;我们决定前端可以同时选择多个类型的岗位进行查询&#xff0c;以显示相应的公司岗位信息 于是&#xff0c;修改后端函数的逻辑&#xff1a; 后端 首先&#xff0c;因为要对checkList中的job_name进行模糊匹配查询&#xff0c;于是使用以下代码&#xff1a…

【科学计算与可视化】3. Matplotlib 绘图基础

安装 pip install matplotlib 官方文档 https://matplotlib.org/stable/api/pyplot_summary.html 主要介绍一些图片绘制的简要使用&#xff0c;更加详细和进阶需要可参考 以上官方文档。 1 绘制基础 方法名说明title()设置图表的名称xlabel()设置 x 轴名称ylabel()设置 y 轴…

负载组指南说明-负载柜

什么是负载组&#xff1f; 负载组是一种设备&#xff0c;旨在准确模拟电源在实际应用中看到的负载。这种负载组可以用电阻、电感或电容元件构建。它是一种电阻装置&#xff0c;以热量的形式消散一定量的能量&#xff0c;可以通过自然对流、强制空气或水冷系统去除。 为什么要使…

江协科技51单片机学习- p11 Proteus安装模拟51单片机

前言&#xff1a; 本文是根据哔哩哔哩网站上“江协科技51单片机”视频的学习笔记&#xff0c;在这里会记录下江协科技51单片机开发板的配套视频教程所作的实验和学习笔记内容。本文大量引用了江协科技51单片机教学视频和链接中的内容。 引用&#xff1a; Proteus快速入门&…

可溶性聚四氟乙烯离子交换柱PFA层析柱微柱一体成型

PFA微柱&#xff0c;也叫PFA层析柱、PFA离子交换柱等&#xff0c;主要用于地质同位素超净化、痕量、超痕量、微量元素分析实验室。 规格参考&#xff1a;1.5ml、15ml、30ml等。 其主要特性有&#xff1a; 1、PFA层析柱&#xff08;微柱&#xff09;专为离子交换设计&#xff…

SAP ERP公有云(全称 SAP S/4HANA Cloud Public Edition),赋能企业成为智能可持续的企业

在数字化浪潮中&#xff0c;每一家企业都需要应对快速的市场变化&#xff0c;不断追求降本增效&#xff0c;为创新提供资源&#xff0c;发展新的业务模式&#xff0c;安全无忧地完成关键任务系统的转型。 10年前&#xff0c;SAP进入云领域&#xff0c;用云ERP和覆盖全线业务的云…

双通道源表KEITHELY2636B详情参数吉时利2636B

Keithley的2636B是一款2600B系列双通道系统源表(SMU)仪器(0.1fA, 10A脉冲)。它是业界领先的电流/电压源和测量解决方案。这种双通道模型结合了精密电源&#xff0c;真电流源&#xff0c;6 1/2位DMM&#xff0c;任意波形发生器&#xff0c;脉冲发生器和电子负载的能力&#xff0…

锐起RDV5高性能云桌面

锐起是上海锐起信息技术有限公司旗下品牌。该公司创立于 2001 年&#xff0c;是桌面虚拟化产品和解决方案提供商&#xff0c;专注于桌面管理系统和私有云存储系统的系列软件产品研发&#xff0c;致力于简化 IT 管理、增强系统安全&#xff0c;提供简单、易用、稳定、安全的产品…

我在高职教STM32——GPIO入门之按键输入(1)

大家好&#xff0c;我是老耿&#xff0c;高职青椒一枚&#xff0c;一直从事单片机、嵌入式、物联网等课程的教学。对于高职的学生层次&#xff0c;同行应该都懂的&#xff0c;老师在课堂上教学几乎是没什么成就感的。正因如此&#xff0c;才有了借助 CSDN 平台寻求认同感和成就…

【观察】戴尔科技+AMD:释放技术创新“乘数效应”,助力制造业打造“新质生产力”...

在今年的政府工作报告中&#xff0c;“人工智能”首次被写入报告&#xff0c;同时“大力推进现代化产业体系建设&#xff0c;加快发展新质生产力”也被列为2024年的首项政府工作任务&#xff0c;其重要性不言而喻。 尤其是最近几年&#xff0c;以人工智能、大模型、大数据、云计…

java设计模式(六)代理模式(Proxy Pattern)

1、模式介绍&#xff1a; 代理模式&#xff08;Proxy Pattern&#xff09;是一种结构型设计模式&#xff0c;它允许你在不改变客户端代码的情况下&#xff0c;向某个对象提供一个代理&#xff0c;以控制对该对象的访问。代理对象通常会在实际对象的方法调用前后添加一些附加逻…

《UDS协议从入门到精通》系列——图解0x35:请求上传

《UDS协议从入门到精通》系列——图解0x35&#xff1a;请求上传 一、简介二、数据包格式2.1 服务请求格式2.2 服务响应格式2.2.1 肯定响应2.2.2 否定响应 三、通信示例 Tip&#x1f4cc;&#xff1a;本文描述中但凡涉及到其他UDS服务的&#xff0c;将陆续提供链接跳转方式以便快…

AMSR-E/Aqua 第 3 级全球地表土壤水分月平均值 V005 (AMSRE_AVRMO)

AMSR-E/Aqua level 3 global monthly Surface Soil Moisture Averages V005 (AMSRE_AVRMO) at GES DISC AMSR-E/Aqua level 3 global monthly Surface Soil Moisture Standard Deviation V005 (AMSRE_STDMO) at GES DISC 简介 GES DISC 的 AMSR-E/Aqua 第 3 级全球地表土壤水…

操作系统入门 -- 内存管理

操作系统入门 – 内存管理 1.内存种类 1.1 虚拟内存&#xff08;VIRT&#xff09; 进程需要的虚拟内存大小&#xff0c;包括进程使用的库、代码、数据以及malloc、new分配的堆空间和栈空间等。若进程申请了10MB内存但实际使用了1MB&#xff0c;则物理空间会增长10MB。 1.2 …

如何以智能方式安装 Python

Python易于使用&#xff0c;对初学者友好&#xff0c;功能强大&#xff0c;几乎可以为任何应用程序创建强大的软件。 但与任何其他软件一样&#xff0c;Python 的设置和管理可能很复杂。 在本文中&#xff0c;我们将介绍如何正确设置 Python。 您将学习如何选择合适的版本、…

学习笔记——动态路由——RIP(附加度量值配置)

六、附加度量值配置 RIP协议cost开销值&#xff1a;默认值为0&#xff0c;路由信息每传递一次&#xff0c;值增加1&#xff0c;最大15,(路由器不能超过15台)16代表不可达。 入接口附加度量值 rip metricin 5 //可以修改开销改变路径。只能增加&#xff0c;不能减小 …