Springboot使用kafka事务-生产者方

前言

在上一篇文章中,我们使用了springboot的AOP功能实现了kafka的分布式事务,但是那样实现的kafka事务是不完美的,因为请求进来之后分配的是不同线程,但不同线程使用的kafka事务却是同一个,这样会造成多请求情况下的事务失效。

而解决这个问题的方法,就是每个线程都使用一个新的事务生产者去发送一条新的事务消息,然后这个事务还要和当前线程进行绑定,实现不同线程之间的事务隔离。

通常来说,这个繁杂的过程虽然我们可以实现,但是始终没有框架研发者做的那么完美,所以,我们首先要去看一下框架的作者有没有实现这个功能。

幸运地是,上述功能在kafka之中是有实现的,而且首次实现的时间是在2017年,所以我们可以直接使用作者提供的基于springboot的事务管理功能。

注入kafka事务

在springboot中启用kafka的事务,有两种方式,第一种方式为使用springboot提供的自动配置,第二种是自己往容器中注入。

方式一:springboot自动注入

想要使用自动注入,我们只需要在配置文件中加入transaction-id-prefix即可,配置文件如下:

spring:kafka:producer:bootstrap-servers: localhost:9092#bootstrap-servers: localhost:9010key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializertransaction-id-prefix: test

这样配置之后,就开启了kafka的事务。

方式一弊端

这样虽然可以直接使用springboot自动装配功能,但是却有下面两个弊端

  • 只能使用一个kafka的集群地址
  • 全局开启了事务,有的方法并不需要全局开启事务
    所以一旦有多个kafka的地址需要配置,或者只想让部分方法使用事务,那么就可以使用第二种方法来解决,那就是自己往容器里面添加kafka的事务管理器。

方式二:向spring容器中添加自定义kafka事务管理器

在kafka事务管理器中,有三个重要的对象,分别是ProducerFactory、KafkaTemplate、KafkaTransactionManager,他们的作用如下:

  • ProducerFactory,用来创建kafka的生产者对象
  • KafkaTemplate,springboot封装的kafka模版
  • KafkaTransactionManager,kafka的事务管理器
    想要往spring容器中添加自定义的kafka事务管理器,其实就是添加一个自定义的KafkaTransactionManager对象,那么我们只需要想办法构造一个KafkaTransactionManager就好。

利用springboot的配置类,我们能很轻松的做到这一点。
第一步,构造一个配置类KafkaAndDataTransactionConfig,加上@Configuration注解。

@Configuration
public class KafkaAndDataTransactionConfig {
}

第二步,构建一个ProducerFactory对象的Bean,交给spring容器。

	@ResourceNacosDiscoveryProperties nacosDiscoveryProperties;/*** 注入一个kafka生产者,这个生产者的transactional.id自定义,避免导致多个生产者的事务id相同* @param props yaml文件中的定义属性*/@BeanProducerFactory<String, String> pf1(KafkaProperties props) {Map<String, Object> pProps = props.buildProducerProperties();pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "product-transactional-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());pProps.put(ProducerConfig.CLIENT_ID_CONFIG, "product-client-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());return new DefaultKafkaProducerFactory<>(pProps);}

注意其中的nacosDiscoveryProperties变量,这是用来获取实例在nacos中的ip地址,因为在多实例的情况下需要保证每一个事务id的唯一,才不会被kafka的事务管理器识别为失效事务生产者,从而导致事务冲突失效。
第三步,创建一个KafkaTransactionManager对象的Bean,添加到spring容器。

	/*** 注入一个kafka事务管理器,这个事务管理器使用事务id* @param pf1* @return*/@BeanKafkaTransactionManager<String, String> kafkaTransactionManagerWithTxId(ProducerFactory<String, String> pf1) {return new KafkaTransactionManager<>(pf1);}

只需要将创建好的生产者bean,作为构造参数传入即可。
通过以上三步,我们就得到了一个支持事务的kafka事务管理器了,不过,此时我们还少创建了一个KafkaTemplate,没有这个对象我们将完不成事务发送的管控。

第四步,创建KafkaTemplate

	/*** 注入一个使用事务id的kafkaTemplate,这个kafkaTemplate可以使用事务* @param pf1* @return*/@BeanKafkaTemplate<String, String> kafkaTemplateWithTxId(ProducerFactory<String, String> pf1) {return new KafkaTemplate<>(pf1);}

经过以上代码,我们就得到了一个完整的kafka事务管理器了。
全部代码如下:

@Configuration
public class KafkaAndDataTransactionConfig {@ResourceNacosDiscoveryProperties nacosDiscoveryProperties;/*** 注入一个kafka生产者,这个生产者的transactional.id自定义,避免导致多个生产者的事务id相同* @param props yaml文件中的定义属性*/@BeanProducerFactory<String, String> pf1(KafkaProperties props) {Map<String, Object> pProps = props.buildProducerProperties();pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "product-transactional-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());pProps.put(ProducerConfig.CLIENT_ID_CONFIG, "product-client-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());return new DefaultKafkaProducerFactory<>(pProps);/*** 注入一个kafka事务管理器,这个事务管理器使用事务id* @param pf1* @return*/@BeanKafkaTransactionManager<String, String> kafkaTransactionManagerWithTxId(ProducerFactory<String, String> pf1) {return new KafkaTransactionManager<>(pf1);}/*** 注入一个使用事务id的kafkaTemplate,这个kafkaTemplate可以使用事务* @param pf1* @return*/@BeanKafkaTemplate<String, String> kafkaTemplateWithTxId(ProducerFactory<String, String> pf1) {return new KafkaTemplate<>(pf1);}}

增加DataSourceTransaction事务管理器

默认情况,DataSourceTransaction事务管理器springboot会帮我们自动配置,但是在使用了kafka的事务之后,会存在一个类的加载冲突,导致DataSourceTransaction没有被springboot自动加载到,所以我们还需要自己将DataSourceTransaction事务管理加入进来。
在上面的代码中,再加入以下代码

	//构造器注入DataSource和transactionManagerCustomizersprivate final DataSource dataSource;private final TransactionManagerCustomizers transactionManagerCustomizers;KafkaAndDataTransactionConfig(DataSource dataSource,ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {this.dataSource = dataSource;this.transactionManagerCustomizers = transactionManagerCustomizers.getIfAvailable();}/*** @Bean 去掉了ConditionalOnMissingBean 避免注入了kafka事务管理器后,springboot不再注入DataSourceTransactionManager* @Primary  作为主事务管理器,这样在使用@Transactional时,就会使用DataSourceTransactionManager* @param properties* @return*/@Bean@Primarypublic DataSourceTransactionManager dstm(DataSourceProperties properties) {DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(this.dataSource);if (this.transactionManagerCustomizers != null) {this.transactionManagerCustomizers.customize(transactionManager);}return transactionManager;}

增加ChainedKafkaTransactionManager管理器

在实际开发中,有时候一个方法需要既支持kafka的事务,又需要支持JDBC的事务,这个时候为了兼容两者的事务,我们需要将两者的事务放到同一个事务管理器中,让他们两个构成一个事务。kafka的作者为我们提供了ChainedKafkaTransactionManager这个对象,来支持这个操作,只需要加入以下代码即可

	 //多个事务管理器构成一个事务,使用ChainedKafkaTransactionManager管理,是因为可以自动偏移kafka事务给消费者@Bean public ChainedKafkaTransactionManager kafkaAndDataSourceTransactionManager(DataSourceTransactionManager transactionManager,@Autowired @Qualifier("kafkaTransactionManagerWithTxId") KafkaTransactionManager<?, ?> kafkaTransactionManager){return new ChainedKafkaTransactionManager<>(transactionManager, kafkaTransactionManager);}

以上,就是kafka集成springboot的方案,接下来,看看怎么使用

使用

基于以上的配置,一共有三种使用方式

  • 只使用kafka事务
  • 只使用JDBC事务
  • 同时使用kafka和JDBC事务

针对于上面的三种情况的切换,其实就是使用不同Transactional注解中的value值切换不同的事务管理器,事务的指定都在service层的实现类中。

只使用kafka事务

	//指定事务模版为自定义模版@Resource(name = "kafkaTemplateWithTxId")private KafkaTemplate<String, String> kafkaTemplate;@Transactional(rollbackFor = Exception.class,value = "kafkaAndDataSourceTransactionManager")public void transation() {ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("test-topic", "test");kafkaTemplate.send(stringStringProducerRecord);}

只使用JDBC事务

不需要指定任何的事务管理器

	@Override@Transactional(rollbackFor = Exception.class)public void transationOfJdbc() {xxxService.update(user);}

同时使用kafka和JDBC事务

指定自定义的事务管理器

	//指定事务模版为自定义模版@Resource(name = "kafkaTemplateWithTxId")private KafkaTemplate<String, String> kafkaTemplate;@Transactional(rollbackFor = Exception.class,value = "kafkaAndDataSourceTransactionManager")public void transationAll() {xxxService.update(user);spreadMonitorService.sendMsg();ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("test-topic", "test");kafkaTemplate.send(stringStringProducerRecord);}

结语

以上,就是在springboot中生产端实现事务的方法,总结一下,一共分为以下几步

  • 增加kafka事务管理器
  • 增加JDBC事务管理器
  • 增加事务链事务管理器
  • 使用三种事务管理器

下一篇,将写springboot中消费端如何配置。


引用资料:
kafka官网
kafka的github
spring-kafka官网

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/55674.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT ListQvector at赋值出错以及解决办法 QT基础入门【QT存储结构】

1、问题 error: passing const QString as this argument discards qualifiers error: assignment of read-only location vec.QVector<int>::at(0) 在Qt中QList,Qvector一般获取元素都是通过at(index)来获取,但是at()的返回是一个const & 常引用,也就是元素不支…

[shell] $@ 与 eval

$ 与 $* $的用法以及与$*的区别可以参考 What does $ mean in a shell script? 测试脚本如下 #!/bin/bashset -e set -xbash -c $ bash -c "$" bash -c $* bash -c "$*"一般情况下&#xff0c;我们可能会这样执行脚本 ./test.sh Y324 X1 echo hello输…

微信小程序canvas type=2d生成海报保存到相册、文字换行溢出显示...、文字删除线、分享面板

一、简介 做个简单的生成二维码海报分享&#xff0c;我做的时候也找简单的方法看能不能实现页面直接截图那种生成图片&#xff0c;原生小程序不支持&#xff0c;不多介绍下面有全部代码有注释、参数自行替换运行看看&#xff0c;还有需要优化的地方&#xff0c;有问题可以咨询…

squid服务器

目录 squid初识 安装squid代理 常用命令 主要配置文件 正向代理 环境配置 linux服务器设置 windows客户端设置 反向代理 环境配置 在web服务器配置服务 linux服务器配置 squid初识 含义&#xff1a;squid cache是一个流行的自由软件&#xff08;GNU通用公共许可证…

低代码与低代码平台的概念解析

随着数字化转型和软件需求的不断增长&#xff0c;传统的手写代码开发方式已经无法满足迅速推出应用程序的需求。为了加快软件开发的速度并降低技术门槛&#xff0c;低代码开发模式应运而生。本文将介绍低代码的概念&#xff0c;探讨什么是低代码什么是低代码平台&#xff1f; 一…

poi带表头多sheet导出

导出工具类 package com.hieasy.comm.core.excel;import com.hieasy.comm.core.excel.fragment.ExcelFragment; import com.hieasy.comm.core.utils.mine.MineDateUtil; import org.apache.poi.hssf.usermodel.*; import org.apache.poi.ss.usermodel.*; import org.apache.po…

Git基本操作(Idea版)

第一次发布项目&#xff08;本地->远程&#xff09; 方式一 通过push的方式推送本地库到远程库&#xff08;远程已创建好仓库&#xff09; 这种方式需要提前创建好仓库。 右键点击项目&#xff0c;可以将当前分支的内容 push 到 GitHub 的远程仓库中。 注意&#xff1a…

Readl DOM (真实DOM) 和 Virtual DOM (虚拟DOM)之间的区别,以及优缺点

文章目录 一、Readl DOM (真实DOM) 和 Virtual DOM (虚拟DOM)之间的区别&#xff1f;二、优点缺点1.优点2.缺点 一、Readl DOM (真实DOM) 和 Virtual DOM (虚拟DOM)之间的区别&#xff1f; 两者之间的区别&#xff1a; 1、真实DOM&#xff1a; 真实DOM是浏览器中的实际DOM结构…

CSDN编程题-每日一练(2023-08-27)

CSDN编程题-每日一练&#xff08;2023-08-27&#xff09; 一、题目名称&#xff1a;异或和二、题目名称&#xff1a;生命进化书三、题目名称&#xff1a;熊孩子拜访 一、题目名称&#xff1a;异或和 时间限制&#xff1a;1000ms内存限制&#xff1a;256M 题目描述&#xff1a; …

css3滤镜属性filter让网页变黑白

今天是特殊的日子&#xff0c;抗击疫情全国哀悼日&#xff0c;向英雄们致敬&#xff0c;一路走好&#xff01;应该发现了今天很多网站页面都是黑白色的&#xff0c;我的博客今天都是黑白色&#xff0c;用css3滤镜属性filter让网页马上变黑白&#xff0c;一行代码就搞定。 在你…

R包开发-2.1:在RStudio中使用Rcpp制作R-Package(更新于2023.8.23)

目录 0-前言 1-在RStudio中创建R包项目 2-创建R包 2.1通过R函数创建新包 2.2在RStudio通过菜单来创建一个新包 2.3关于R包创建的说明 3-添加R自定义函数 4-添加C函数 0-前言 目标&#xff1a;在RStudio中创建一个R包&#xff0c;这个R包中包含C函数&#xff0c;接口是Rc…

c++中的const与constexpt的区别

c中的const与constexpr的区别 const const 是一种修饰符&#xff0c;用于声明一个只读的常量。它可以用于变量、函数参数和函数返回类型。声明为 const 的变量的值在初始化后就不能再改变。 适用场景&#xff1a; const 适用于声明运行时常量&#xff0c;在编译时无法确定值…

如何使用HTML5新增的标签来构建语义化的页面结构?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ <header>&#xff1a;⭐ <nav>&#xff1a;⭐ <main>&#xff1a;⭐ <section>&#xff1a;⭐ <article>&#xff1a;⭐ <aside>&#xff1a;⭐ <footer>&#xff1a;⭐ <figure> 和 &l…

剑指Offer --- 字符串篇

剑指Offer — 字符串篇 — 剑指的题解K神已经写的已经非常详细了&#xff0c;并且Github上开源的电子书目前热度也非常高&#xff0c;这个12天12个模块系列就当作自己的秋招刷题汇总了&#xff0c;欢迎大家交流。 剑指 Offer 05. 替换空格 思路 **(线性扫描) ** O(n) 这个…

登录校验-Filter-详解

目录 执行流程 拦截路径 过滤器链 小结 执行流程 过滤器Filter拦截到请求之后&#xff0c;首先执行方放行之前的逻辑&#xff0c;然后执行放行操作&#xff08;doFilter&#xff09;&#xff0c;然后会访问对应的Web资源&#xff08;对应的Controller类&#xff09;&#…

【图像分类】基于卷积神经网络和主动学习的高光谱图像分类(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

Web应用登录验证的几种方式

一、SessionCookie登录 传统的sessioncookie登录是一种有状态 的登录 1、传统的sessioncookie 流程 浏览器登录发送账号和密码&#xff0c;服务端查找数据库验证用户验证成功后&#xff0c;服务端把用户状态&#xff08;登录状态&#xff0c;角色&#xff0c;权限等信息&…

【python】jupyter notebook导出pdf和pdf不显示中文问题

文章目录 写在前面1. 使用jupyter notebook导出pdf1.1 安装Pandoc1.2 安装MiKTex1.3 示例导出pdf 2. 中文显示问题2.1 显示中文问题示例2.2 解决办法1&#xff1a;修改tex2.3 解决办法2&#xff1a;修改内置文件 写在前面 使用jupyter notebook导出pdf时&#xff0c;出现了一些…

31、springboot 配置HTTP服务端口及如何通过WebServer实例动态获取项目中的HTTP端口

配置HTTP服务端口及如何通过WebServer实例动态获取项目中的HTTP端口 ★ 设置HTTP服务端口&#xff1a; - server.port或者SERVER_PORT环境变量——总结来说&#xff0c;其实就是要配置server.port外部配置属性。▲ 同样遵守如下优先级&#xff1a; 这些都是外部配置源&#x…

Win 11 电脑的 Win + E 快捷键失效

报的错误信息如下&#xff1a; 该文件没有与之关联的应用来执行该操作。请安装应用&#xff0c;若已经安装应用&#xff0c;请在”默认应用设置"页面中创建关联。 报错原因&#xff1a;系统注册表被改写了导致的出错 解决办法&#xff1a; 1、首先&#xff0c;按键盘上…