【Spring连载】使用Spring访问 Apache Kafka(二十一)----提示,技巧和例子

【Spring连载】使用Spring访问 Apache Kafka(二十一)----提示,技巧和例子Tips, Tricks and Examples

  • 一、手动分配所有分区Manually Assigning All Partitions
  • 二、Kafka事务与其他事务管理器的例子Examples of Kafka Transactions with Other Transaction Managers
  • 三、定制 JsonSerializer 和 JsonDeserializer

一、手动分配所有分区Manually Assigning All Partitions

假设你希望始终从所有分区读取所有记录(例如,当使用compacted topic加载分布式缓存时),手动分配分区而不使用Kafka的组管理会很有用。当有很多分区时,这样做可能会很困难,因为必须列出分区。如果分区数量随着时间的推移而变化,这也是一个问题,因为每次分区数量变化时都必须重新编译应用程序。
以下是如何在应用程序启动时使用SpEL表达式的强大功能动态创建分区列表的示例:

@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",partitions = "#{@finder.partitions('compacted')}"),partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {...
}@Bean
public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {return new PartitionFinder(consumerFactory);
}public static class PartitionFinder {private final ConsumerFactory<String, String> consumerFactory;public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {this.consumerFactory = consumerFactory;}public String[] partitions(String topic) {try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {return consumer.partitionsFor(topic).stream().map(pi -> "" + pi.partition()).toArray(String[]::new);}}}

将其与“ConsumerConfig.AUTO_OFFSET_RESET_CONFIG=earliest”结合使用,将在每次启动应用程序时加载所有记录。你还应该将容器的AckMode设置为MANUAL,以防止容器为null消费者组提交偏移量。但是,从2.5.5版本开始,如上所示,你可以在所有分区应用初始偏移量;有关详细信息,请参阅明确的分区分配。

二、Kafka事务与其他事务管理器的例子Examples of Kafka Transactions with Other Transaction Managers

下面的Spring Boot应用程序是一个数据库和Kafka连锁事务的例子。监听器容器启动Kafka事务,@Transactional注解启动DB事务。首先提交数据库事务;如果Kafka事务提交失败,记录将被重新deliver,因此数据库更新应该是幂等的。

@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}@Beanpublic ApplicationRunner runner(KafkaTemplate<String, String> template) {return args -> template.executeInTransaction(t -> t.send("topic1", "test"));}@Beanpublic DataSourceTransactionManager dstm(DataSource dataSource) {return new DataSourceTransactionManager(dataSource);}@Componentpublic static class Listener {private final JdbcTemplate jdbcTemplate;private final KafkaTemplate<String, String> kafkaTemplate;public Listener(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {this.jdbcTemplate = jdbcTemplate;this.kafkaTemplate = kafkaTemplate;}@KafkaListener(id = "group1", topics = "topic1")@Transactional("dstm")public void listen1(String in) {this.kafkaTemplate.send("topic2", in.toUpperCase());this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");}@KafkaListener(id = "group2", topics = "topic2")public void listen2(String in) {System.out.println(in);}}@Beanpublic NewTopic topic1() {return TopicBuilder.name("topic1").build();}@Beanpublic NewTopic topic2() {return TopicBuilder.name("topic2").build();}}
spring.datasource.url=jdbc:mysql://localhost/integration?serverTimezone=UTC
spring.datasource.username=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driverspring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.isolation.level=read_committedspring.kafka.producer.transaction-id-prefix=tx-#logging.level.org.springframework.transaction=trace
#logging.level.org.springframework.kafka.transaction=debug
#logging.level.org.springframework.jdbc=debug
create table mytable (data varchar(20));

对于仅生产者的事务,事务同步将起效:

@Transactional("dstm")
public void someMethod(String in) {this.kafkaTemplate.send("topic2", in.toUpperCase());this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
}

KafkaTemplate会将自己的事务与DB的事务同步,提交/回滚发生在数据库的行为之后。如果你希望首先提交Kafka事务,并且只在Kafka事务成功时提交DB事务,可以使用嵌套的@Transactional方法:

@Transactional("dstm")
public void someMethod(String in) {this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");sendToKafka(in);
}@Transactional("kafkaTransactionManager")
public void sendToKafka(String in) {this.kafkaTemplate.send("topic2", in.toUpperCase());
}

三、定制 JsonSerializer 和 JsonDeserializer

序列化器和反序列化器支持许多使用属性的自定义,请参阅JSON了解更多信息。kafka-clients代码,而不是Spring,会实例化这些对象,除非你将它们直接注入消费者和生产者工厂。如果希望使用属性配置序列化器/反序列化器,但又希望使用自定义的ObjectMapper,只需创建一个子类并将自定义映射器传递给super构造函数。例如:

public class CustomJsonSerializer extends JsonSerializer<Object> {public CustomJsonSerializer() {super(customizedObjectMapper());}private static ObjectMapper customizedObjectMapper() {ObjectMapper mapper = JacksonUtils.enhancedObjectMapper();mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);return mapper;}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/654686.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[m1pro ] ssh: connect to host localhost port 22: Connection refused

在学习Hadoop 的时候&#xff0c;使用 ssh localhost 遇到以下问题 原因&#xff1a; 本地没有打开远程登录 解决办法&#xff1a;打开远程登录 成功结果

C++ 蓝桥杯历届试题 —— 小苹果题目情景及代码

信奥真题情景 【题目描述】 小 Y 的桌子上放着 n 个苹果从左到右排成一列&#xff0c;编号为从 1 到 n。 小苞是小 Y 的好朋友&#xff0c;每天她都会从中拿走一些苹果。 每天在拿的时候&#xff0c;小苞都是从左侧第 1 个苹果开始、每隔 2 个苹果拿走 1 个苹果。 随后小苞…

three.js加载的stl模型的坐标位置(postion)与boundingbox计算的位置不一致怎么办?

three.js加载的stl模型的坐标位置[postion]与boundingbox计算的位置不一致怎么办? 问题原因问题解决运用在项目中 问题原因 在处理Three.js中加载的STL模型时&#xff0c;如果发现模型的坐标位置&#xff08;通过模型的 position 属性获取&#xff09;与通过其 BoundingBox 计…

防御保护---防火墙的用户认证

文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 一.用户认证概述 防火墙用户认证是一种安全措施&#xff0c;用于验证和授权网络用户的身份。它是防火墙的一部分&#xff0c;旨在确保只有经过身份验证的用户才能访问网络资源。 防火墙用户认证…

JDK1.8新特性(Day24)

Lambda表达式 介绍 Lambda表达式是一种没有名字的函数,也可称为闭包&#xff0c;是Java 8 发布的最重要新特性。本质上是一段匿名内部类&#xff0c;也可以是一段可以传递的代码。还有叫箭头函数的... 闭包 闭包就是能够读取其他函数内部变量的函数,比如在java中,方法内部的局…

智慧文旅:打造无缝旅游体验的关键

随着科技的快速发展和消费者需求的不断升级&#xff0c;旅游业正面临着前所未有的变革压力。智慧文旅作为数字化转型的重要领域&#xff0c;旨在通过智能化、数据化手段为游客提供更加优质、便捷、个性化的服务&#xff0c;打造无缝的旅游体验。本文将深入探讨智慧文旅在打造无…

第九节HarmonyOS 常用基础组件17-ScrollBar

1、描述 滚动条组件ScrollBar&#xff0c;用于配合可滚动组件使用&#xff0c;如List、Grid、Scroll。 2、接口 可包含子组件 ScrollBar(value:{scroller:Scroller, direction?: ScrollBarDirection, state?: BarState}) 3、参数 参数名 参数类型 必填 描述 scrolle…

R-YOLO

Abstract 提出了一个框架&#xff0c;名为R-YOLO&#xff0c;不需要在恶劣天气下进行注释。考虑到正常天气图像和不利天气图像之间的分布差距&#xff0c;我们的框架由图像翻译网络&#xff08;QTNet&#xff09;和特征校准网络&#xff08;FCNet&#xff09;组成&#xff0c;…

spire.doc合并word文档

文章目录 spire.doc合并word文档1. 引入maven依赖2. 需要合并的word3. 合并文档代码4. 合并结果5. 合并产生段落&#xff0c;table样式混乱问题 spire.doc合并word文档 1. 引入maven依赖 <repositories><repository><id>com.e-iceblue</id><name&g…

CH395Q之CH395Q简介(一)

本节主要介绍以下内容&#xff1a; 1、TCP/IP协议栈是什么&#xff08;了解&#xff09; 2、CH395Q是什么&#xff08;了解&#xff09; 3、CH395Q工作命令&#xff08;熟悉&#xff09; 4、CH395Q & W5500 一、TCP/IP协议栈是什么 是一系列网络协议的总和&#xff0…

C语言标准的输入输出

目录 1. 格式化输入输出 2. 控制字符串长度 3. 混合格式化输出 4. 格式化浮点数 5. 格式化日期和时间 在C语言编程中&#xff0c;输入输出格式非常重要&#xff0c;它决定了程序如何向用户展示数据以及如何从用户接收数据。本篇博客将介绍C语言输入输出格式的一些基本概念…

阿里云混合云事业部更名为政企事业部;香港法院向中国恒大发出清盘令;抖音将与周星驰合作开发微短剧;特朗普被指提及对华进口商品加征60%关税

今日精选 • 阿里云混合云事业部更名为政企事业部• 香港法院向中国恒大发出清盘令• 抖音将与周星驰合作开发微短剧• 特朗普被指提及对华进口商品加征60%关税 科技动态 • 33岁女作家用AI写《东京共鸣塔》 获顶级文学奖• 科大讯飞星火座舱荣获第四届《中国汽车风云盛典》…

17. 使用 tslib 库

17. 使用 tslib 库 1. tslib 简介2. tslib 移植2.1 下载 tslib 源码2.2 编译 tslib 源码2.3 tslib 安装目录下的文件夹介绍2.4 在开发板上测试 tslib 3. tslib 库函数介绍3.1 打开触摸屏设备3.2 配置触摸屏设备3.3 读取触摸屏设备 4. 基于 tslib 编写触摸屏应用程序4.1 单点触摸…

打破静态安全扫描工具分析结果孤岛的桥梁-SARIF 详解

目录 SARIF 的产生背景 SARIF 的结构 SARIF 的优势 SARIF 的应用场景 小结 在软件开发过程中&#xff0c;静态分析是保障代码质量的一个重要方法。静态分析工具可以在不运行程序的情况下检查源代码&#xff0c;发现代码中的质量和安全问题。然而不同的静态分析工具可能会产…

强化学习-google football 实验记录

google football 实验记录 1. gru模型和dense模型对比实验 实验场景&#xff1a;5v5(控制蓝方一名激活球员)&#xff0c;跳4帧&#xff0c;即每个动作执行4次 实验点&#xff1a; 修复dense奖励后智能体训练效果能否符合预期 实验目的&#xff1a; 对比gru 长度为16 和 dens…

Mysql-存储引擎-InnoDB

数据文件 下面这条SQL语句执行的时候指定了ENGINE InnoDB存储引擎为InnoDB: CREATE TABLE tb_album (id bigint(20) NOT NULL AUTO_INCREMENT COMMENT 编号,title varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 相册名称,image varc…

Flink Checkpoint 超时问题详解

第一种、计算量大&#xff0c;CPU密集性&#xff0c;导致TM内线程一直在processElement&#xff0c;而没有时间做CP【过滤掉部分数据&#xff1b;增大并行度】 代表性作业为算法指标-用户偏好的计算&#xff0c;需要对用户在商城的曝光、点击、订单、出价、上下滑等所有事件进…

给信息安全专业想做网络安全方面的人一些忠告

别一直打CTF 打CTF是为了打基础&#xff0c;大概知道一些基础就出来吧&#xff0c;千万不要一直打下去出不来了。简历上实习经历&#xff0c;项目经历以及漏洞成果才能构成一个不错的背景&#xff0c;只有ctf比赛会很尴尬。要知道有些人是py打比赛&#xff0c;面试官知道情况&…

小迪安全24WEB 攻防-通用漏洞SQL 注入MYSQL 跨库ACCESS 偏移

#知识点&#xff1a; 1、脚本代码与数据库前置知识 2、Access 数据库注入-简易&偏移 3、MYSQL 数据库注入-简易&权限跨库 #前置知识&#xff1a; -SQL 注入漏洞产生原理分析 -SQL 注入漏洞危害利用分析 -脚本代码与数据库操作流程 -数据库名&#xff0c…

探索设计模式的魅力:深入了解适配器模式-优雅地解决接口不匹配问题

设计模式专栏&#xff1a;http://t.csdnimg.cn/nolNS 目录 一、引言 1. 概述 2. 为什么需要适配器模式 3. 本文的目的和结构 二、简价 1. 适配器模式的定义和特点 定义 特点 2. 适配器模式的作用和适用场景 作用 适用场景 3. 适配器模式与其他设计模式的比较 三、适配…