搭建大型分布式服务(四十)SpringBoot 整合多个kafka数据源-支持生产者

系列文章目录


文章目录

  • 系列文章目录
  • 前言
      • 一、本文要点
      • 二、开发环境
      • 三、原项目
      • 四、修改项目
      • 五、测试一下
      • 五、小结


前言

本插件稳定运行上百个kafka项目,每天处理上亿级的数据的精简小插件,快速上手。

<dependency><groupId>io.github.vipjoey</groupId><artifactId>multi-kafka-starter</artifactId><version>最新版本号</version>
</dependency>

例如下面这样简单的配置就完成SpringBoot和kafka的整合,我们只需要关心com.mmc.multi.kafka.starter.OneProcessorcom.mmc.multi.kafka.starter.TwoProcessor 这两个Service的代码开发。

## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=com.mmc.multi.kafka.starter.TwoProcessor // 业务处理类名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer## pb 消息消费者
spring.kafka.pb.enabled=true
spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.pb.topic=mmc-topic-pb
spring.kafka.pb.group-id=group-consumer-pb
spring.kafka.pb.processor=pbProcessor
spring.kafka.pb.consumer.auto-offset-reset=latest
spring.kafka.pb.consumer.max-poll-records=10
spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer## kafka消息生产者
spring.kafka.four.enabled=true
spring.kafka.four.producer.name=fourKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

国籍惯例,先上源码:Github源码

一、本文要点

本文将介绍通过封装一个starter,来实现多kafka数据源的配置,通过通过源码,可以学习以下特性。系列文章完整目录

  • SpringBoot 整合多个kafka数据源
  • SpringBoot 批量消费kafka消息
  • SpringBoot 优雅地启动或停止消费kafka
  • SpringBoot kafka本地单元测试(免集群)
  • SpringBoot 利用map注入多份配置
  • SpringBoot BeanPostProcessor 后置处理器使用方式
  • SpringBoot 将自定义类注册到IOC容器
  • SpringBoot 注入bean到自定义类成员变量
  • Springboot 取消限定符
  • SpringBoot 支持消费protobuf类型的kafka消息
  • SpringBoot Aware设计模式
  • SpringBoot 获取kafka消息中的topic、offset、partition、header等参数
  • SpringBoot 使用任意生产者发送kafka消息
  • SpringBoot 配置任意数量的kafka生产者

二、开发环境

  • jdk 1.8
  • maven 3.6.2
  • springboot 2.4.3
  • kafka-client 2.6.6
  • idea 2020

三、原项目

1、接前文,我们基本完成了kafka consumer常用的特性开发,有小伙伴问,我们该如何配置多个数据源生产者,想consumer一样简单,发送kafka消息呢?


## 1.配置
spring.kafka.four.enabled=true
spring.kafka.four.producer.name=fourKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer## 2.引用
@Resource(name = "fourKafkaSender")
private MmcKafkaMultiSender mmcKafkaMultiSender;## 3.使用
mmcKafkaMultiSender.sendStringMessage(topicOne, "aaa", json);

答案是可以的、但我们要升级和优化一下。

四、修改项目

1、修改内部类MmcKafkaProperties类,增加生产者相关的配置。

    @EqualsAndHashCode(callSuper = true)@Datapublic static class Producer extends KafkaProperties.Producer {/*** 是否启用.*/private boolean enabled = true;/*** 生产者名称,如果有设置则会覆盖默认的xxxKakfkaSender名称.*/private String name;}/*** 生产者.*/private final Producer producer = new Producer();/*** Create an initial map of producer properties from the state of this instance.* <p>* This allows you to add additional properties, if necessary, and override the* default kafkaProducerFactory bean.** @return the producer properties initialized with the customizations defined on this*         instance*/Map<String, Object> buildProducerProperties() {return new HashMap<>(this.producer.buildProperties());}

2、新增MmcKafkaSender接口,作为发送Kafka消息的唯一约束。

public interface MmcKafkaSender {/*** 发送kafka消息.** @param topic        topic名称* @param partitionKey 消息分区键* @param message      具体消息*/void sendStringMessage(String topic, String partitionKey, String message);/*** 发送kafka消息.** @param topic        topic名称* @param partitionKey 消息分区键* @param message      具体消息*/void sendProtobufMessage(String topic, String partitionKey, byte[] message);
}

3、新增MmcKafkaOutputContainer容器类,用于存储所有生产者,方便统一管理;

@Getter
@Slf4j
public class MmcKafkaOutputContainer {/*** 存放所有生产者.*/private final Map<String, MmcKafkaSender> outputs;/*** 构造函数.*/public MmcKafkaOutputContainer(Map<String, MmcKafkaSender> outputs) {this.outputs = outputs;}}

4、新增MmcKafkaSingleSender实现类,用于真实发送Kafka消息;

public class MmcKafkaSingleSender implements MmcKafkaSender {private final KafkaTemplate<String, Object> template;public MmcKafkaSingleSender(KafkaTemplate<String, Object> template) {this.template = template;}@Overridepublic void sendStringMessage(String topic, String partitionKey, String message) {template.send(topic, partitionKey, message);}@Overridepublic void sendProtobufMessage(String topic, String partitionKey, byte[] message) {template.send(topic, partitionKey, message);}}

5、修改MmcMultiProducerAutoConfiguration配置类,遍历所有配置,组装并生成MmcKafkaSingleSender,并注册到IOC容器;


@Slf4j
@Configuration
@EnableConfigurationProperties(MmcMultiKafkaProperties.class)
@ConditionalOnProperty(prefix = "spring.kafka", value = "enabled", matchIfMissing = true)
public class MmcMultiProducerAutoConfiguration implements BeanFactoryAware {private DefaultListableBeanFactory beanDefinitionRegistry;@Resourceprivate MmcMultiKafkaProperties mmcMultiKafkaProperties;@Beanpublic MmcKafkaOutputContainer mmcKafkaOutputContainer() {// 初始化一个存储所有生产者的哈希映射Map<String, MmcKafkaSender> outputs = new HashMap<>();// 获取所有的Kafka配置信息Map<String, MmcMultiKafkaProperties.MmcKafkaProperties> kafkas = mmcMultiKafkaProperties.getKafka();// 逐个遍历,并生成producerfor (Map.Entry<String, MmcMultiKafkaProperties.MmcKafkaProperties> entry : kafkas.entrySet()) {// 唯一生产者名称String name = entry.getKey();// 生产者配置MmcMultiKafkaProperties.MmcKafkaProperties properties = entry.getValue();// 是否开启if (properties.isEnabled()&& properties.getProducer().isEnabled()&& CommonUtil.isNotEmpty(properties.getProducer().getBootstrapServers())) {// bean名称String beanName = Optional.ofNullable(properties.getProducer().getName()).orElse(name + "KafkaSender");KafkaTemplate<String, Object> template = mmcdKafkaTemplate(properties);// 创建实例MmcKafkaSender sender = new MmcKafkaSingleSender(template);outputs.put(beanName, sender);// 注册到IOCbeanDefinitionRegistry.registerSingleton(beanName, sender);}}return new MmcKafkaOutputContainer(outputs);}private KafkaTemplate<String, Object> mmcdKafkaTemplate(MmcMultiKafkaProperties.MmcKafkaProperties producer) {return new KafkaTemplate<>(baseKafkaProducerFactory(producer));}private ProducerFactory<String, Object> baseKafkaProducerFactory(MmcMultiKafkaProperties.MmcKafkaProperties producer) {return new DefaultKafkaProducerFactory<>(producer.buildProducerProperties());}@Overridepublic void setBeanFactory(BeanFactory beanFactory) throws BeansException {this.beanDefinitionRegistry = (DefaultListableBeanFactory) beanFactory;}
}

五、测试一下

1、引入kafka测试需要的jar。参考文章:kafka单元测试

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.18.0</version><scope>test</scope></dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java-util</artifactId><version>3.18.0</version><scope>test</scope></dependency>

2、消费者配置保持不变,增加生产者配置。

## json消息消费者
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=oneProcessor
spring.kafka.one.duplicate=false
spring.kafka.one.snakeCase=false
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.container.threshold=2
spring.kafka.one.container.rate=1000
spring.kafka.one.container.parallelism=8## json消息生产者
spring.kafka.four.enabled=true
spring.kafka.four.producer.name=fourKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

3、编写测试类。

@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {MmcMultiProducerAutoConfiguration.class, MmcMultiConsumerAutoConfiguration.class,DemoService.class, OneProcessor.class})
@TestPropertySource(value = "classpath:application-string.properties")
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"},topics = {"${spring.kafka.one.topic}"})
class KafkaStringMessageTest {@Value("${spring.kafka.one.topic}")private String topicOne;@Value("${spring.kafka.two.topic}")private String topicTwo;@Resource(name = "fourKafkaSender")private MmcKafkaSingleSender mmcKafkaSingleSender;@Testvoid testDealMessage() throws Exception {Thread.sleep(2 * 1000);// 模拟生产数据produceMessage();Thread.sleep(10 * 1000);}void produceMessage() {for (int i = 0; i < 10; i++) {DemoMsg msg = new DemoMsg();msg.setRoutekey("routekey" + i);msg.setName("name" + i);msg.setTimestamp(System.currentTimeMillis());String json = JsonUtil.toJsonStr(msg);mmcKafkaSingleSender.sendStringMessage(topicOne, "aaa", json);}}
}

5、运行一下,测试通过,可以看到能正常发送消息和消费。
在这里插入图片描述

五、小结

将本项目代码构建成starter,就可以大大提升我们开发效率,我们只需要关心业务代码的开发,github项目源码:轻触这里。如果对你有用可以打个星星哦。下一篇,升级本starter,在kafka单分区下实现十万级消费处理速度。

《搭建大型分布式服务(三十六)SpringBoot 零代码方式整合多个kafka数据源》
《搭建大型分布式服务(三十七)SpringBoot 整合多个kafka数据源-取消限定符》
《搭建大型分布式服务(三十八)SpringBoot 整合多个kafka数据源-支持protobuf》
《搭建大型分布式服务(三十九)SpringBoot 整合多个kafka数据源-支持Aware模式》
搭建大型分布式服务(四十)SpringBoot 整合多个kafka数据源-支持生产者
搭建大型分布式服务(四十一)SpringBoot 整合多个kafka数据源-支持亿级消息生产者

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/861631.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ZYNQ学习教程?ZYNQ-FPGA实战教程!

学习 ZYNQ 比FPGA、MCU、ARM 等传统工具开发要求更高&#xff0c;想学好 ZYNQ 也不是一蹴而就的事情。 学习 ZYNQ 要具备的技能&#xff1a; 1、 软件开发人员  计算机组成原理、 C、C语言、 计算机操作系统、tcl 脚本、良好的英语基础 2、 逻辑开发人员 计算机组成原理…

关于服务器的一些知识

1. 云服务器 和 轻量应用服务器 腾讯云中的"云服务器"&#xff08;Cloud Virtual Machine, CVM&#xff09;和"轻量应用服务器"&#xff08;Lite Cloud Server&#xff09;都是提供云端计算资源的服务&#xff0c;但它们在定位、特性和使用场景上存在一些差…

LongRAG:利用长上下文大语言模型提升检索生成效果

一、前言 前面我们已经介绍了多种检索增强生成 (RAG) 技术&#xff0c;基本上在保证数据质量的前提下&#xff0c;检索增强生成&#xff08;RAG&#xff09;技术能够有效提高检索效率和质量&#xff0c;相对于大模型微调技术&#xff0c;其最大的短板还是在于有限的上下文窗口…

幽默证明题!高考成绩公布后,妈妈连夜写了一封信:孩子,这就是我不让你玩手机的原因——早读(逆天打工人爬取热门微信文章解读)

毛毛雨&#xff0c;五分钟结束&#xff0c;怎么证明今天早上有下雨呢&#xff1f; 引言Python 代码第一篇 洞见 高考成绩公布后&#xff0c;妈妈连夜写了一封信&#xff1a;孩子&#xff0c;这就是我不让你玩手机的原因第二篇 视频新闻结尾 引言 今天睡眠质量不错 发现一个问题…

10分钟微调专属于自己的大模型_10分钟微调大模型

1.环境安装 # 设置pip全局镜像 (加速下载) pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ # 安装ms-swift pip install ms-swift[llm] -U# 环境对齐 (通常不需要运行. 如果你运行错误, 可以跑下面的代码, 仓库使用最新环境测试) pip install -r r…

vb6多线程异步,VB.NET 全用API实现:CreateThread创建多线程,等待线程完成任务

在VB.NET中&#xff0c;你可以使用API函数来创建多线程并等待线程完成任务。以下是一个示例代码&#xff0c;展示如何使用API函数来实现这个功能&#xff1a; Imports System.Runtime.InteropServices Imports System.ThreadingPublic Class Form1Private Delegate Sub ThreadC…

大模型+多模态合规分析平台,筑牢金融服务安全屏障

随着金融市场的快速发展&#xff0c;金融产品和服务日趋多样化&#xff0c;消费者面临的风险也逐渐增加。 为保护消费者权益&#xff0c;促进金融市场长期健康稳定发展&#xff0c;国家监管机构不断加强金融监管&#xff0c;出台了一系列法律法规和政策文件。对于金融从业机构…

【DC-DC升压电推剪方案】FP6277,FP6296电源升压芯片在电推剪中扮演着一个怎样的角色?带你深入了解电推剪的功能和应用及工作原理

随着人们对个人形象要求的不断提高&#xff0c;理发器作为一个必备的家居用品&#xff0c;也在不断进行技术升级。而其中的核心装备之一&#xff0c;电推剪理发器升压芯片FP6277、FP6296&#xff0c;正在引领着现代理发技术的突破。本文将给大家带来的是电推剪在传统意义上运用…

keil仿真,查看函数执行时间和执行次数

Execution Profiler执行档案器 The Execution Profiler records timing and execution statistics about instructions for the complete program code. To view the values in the Editor or Disassembly Window, use Show Time or Show Calls from the menu Debug — Executi…

6.18-6.26 旧c语言

第一章 概述 32关键字 9种控制语句 优点&#xff1a;能直接访问物理地址&#xff0c;位操作&#xff0c;代码质量高&#xff0c;执行效率高 可移植性好 面向过程&#xff1a;以事件为中心 面向对象&#xff1a;以实物为中心 printf&#xff1a;系统定义的标准函数 #include&l…

探索 JQuery EasyUI:构建简单易用的前端页面

介绍 当我们站在网页开发的浩瀚世界中&#xff0c;眼花缭乱的选择让我们难以抉择。而就在这纷繁复杂的技术海洋中&#xff0c;JQuery EasyUI 如一位指路明灯&#xff0c;为我们提供了一条清晰的航线。 1.1 什么是 JQuery EasyUI&#xff1f; JQuery EasyUI&#xff0c;简单来…

DM达梦数据库转换、条件函数整理

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; &#x1f49d;&#x1f49…

【乐吾乐2D可视化组态编辑器】画布

5.1 设置画布属性 默认颜色&#xff1a;预先设置默认颜色&#xff0c;拖拽到画布的节点&#xff08;基础图形、文字、icon&#xff09;自动统一默认颜色。 画笔填充颜色&#xff1a;预先设置画笔填充颜色&#xff0c;拖拽到画布的节点&#xff08;基础图形&#xff09;自动统…

QT自定义信号和槽函数

在QT中最重要也是必须要掌握的机制&#xff0c;就是信号与槽机制&#xff0c;在MFC上也就是类型的机制就是消息与响应函数机制 在QT中我们不仅要学会如何使用信号与槽机制&#xff0c;还要会自定义信号与槽函数&#xff0c;要自定义的原因是系统提供的信号&#xff0c;在一些情…

免费录制视频软件推荐,这3款软件超实用!

随着网络技术的发展&#xff0c;录制视频已经成为人们日常生活中的一个重要需求。无论是教学、会议、游戏还是娱乐&#xff0c;视频录制都为我们提供了极大的便利。然而&#xff0c;市场上的视频录制软件琳琅满目&#xff0c;如何选择一款适合自己的免费录制视频软件成为了一个…

Java基础知识-Map、HashMap、HashTable和TreeMap

1、HashMap 和 Hashtable 的区别&#xff1f; HashMap 和 Hashtable是Map接口的实现类&#xff0c;它们大体有一下几个区别&#xff1a; 1. 继承的父类不同。HashMap是继承自AbstractMap类&#xff0c;而HashTable是继承自Dictionary类。 2. 线程安全性不同。Hashtable 中的方…

MapStruct-JavaBean映射工具使用指南

在软件开发中&#xff0c;对象之间的转换是一项常见的任务&#xff0c;尤其是在处理数据模型间的映射时。传统的做法&#xff0c;如使用JavaBeanUtils&#xff0c;可能会导致性能下降&#xff0c;而手动编写转换代码则效率低下且易出错。为了解决这些问题&#xff0c;MapStruct…

为何整个 AI 领域都朝着 AI Agents 这一方向发展?

编者按&#xff1a; 当前大热的大语言模型和检索增强生成模型&#xff0c;虽然在语言理解和内容生成方面取得了突破性的进展&#xff0c;但仍然存在诸多限制。它们缺乏根据目标导引行为、持续学习和与环境交互的能力&#xff0c;难以应对复杂多变的现实场景需求。 今天为大家带…

Go Error 处理

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

HarmonyOS(39) Preferences 入门指南

Preferences Preferences应用实例初始化preferences保存并持久化数据获取数据 参考资料 Preferences Android开发程序员对此应该很熟悉&#xff0c;HarmonyOS里的Preferences跟Android里的SharePreference差不多&#xff0c;应用提供Key-Value键值型的数据处理能力&#xff0c…