kafka-生产者事务-数据传递语义事务介绍事务消息发送(SpringBoot整合Kafka)

文章目录

  • 1、kafka数据传递语义
  • 2、kafka生产者事务
  • 3、事务消息发送
    • 3.1、application.yml配置
    • 3.2、创建生产者监听器
    • 3.3、创建生产者拦截器
    • 3.4、发送消息测试
    • 3.5、使用Java代码创建主题分区副本
    • 3.6、屏蔽 kafka debug 日志 logback.xml
    • 3.7、引入spring-kafka依赖
    • 3.8、控制台日志

1、kafka数据传递语义

kafka发送消息时是否需要重试

  1. 仅发送一次:生产者发送消息后不重试,只发送一次 可能丢失消息 效率最高
  2. 至少一次:生产者发送消息后重试,可能重试多次 效率差
  3. 精准一次发送:生产者发送消息后无论是否重复发送 发送了多少次,在 kafka broker 中只保存一次消息,通过幂等性 + 生产者事务来实现

kafak天然支持幂等性,每个消息头中带了一个唯一的标志 kafka broker 根据此标志判断消息是否已经发送过,生产者事务可以保证数据没有最终发送成功时,消费者不可以消费,如果生产者发送消息时出现异常会自动回滚(清除之前发送的事务中的消息)

kafka天然幂等性:但是指的是生产者事务 生产消息时的幂等性,发送消息时消息中带唯一标识、broker接收到消息时如果重复不再保存,事务没提交消费者不能消费改消息

2、kafka生产者事务

保证消息生产的幂等性
一组消息要么一起成功 被消费者消息 要么一起失败都不能被消费者消费

  1. 配置ack为-1 分区所有副本均落盘成功
  2. 配置生产者重试(发送失败可以继续发送:需要保证发送失败后再次发送消息到kafka实现 精准一次发送)
  3. 需要给事务分配事务id(区分一个事务中的多条消息)

3、事务消息发送

3.1、application.yml配置

server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 1 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)transaction-id-prefix: tx_  # 事务id前缀:配置后producer自动开启事务batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器

3.2、创建生产者监听器

package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
@Component
public class MyKafkaProducerListener implements ProducerListener<String,String> {//生产者 ack 配置为 0 只要发送即成功//ack为 1  leader落盘  broker ack之后 才成功//ack为 -1 分区所有副本全部落盘  broker ack之后 才成功@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {//ProducerListener.super.onSuccess(producerRecord, recordMetadata);System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());}//消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息@Overridepublic void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());System.out.println("异常信息:" + exception.getMessage());}
}

3.3、创建生产者拦截器

package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {//kafka生产者发送消息前执行:拦截发送的消息预处理@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()+",partition:"+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value());return null;}//kafka broker 给出应答后执行@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {//exception为空表示消息发送成功if(e == null){System.out.println("消息发送成功:topic = "+ recordMetadata.topic()+",partition:"+recordMetadata.partition()+",offset="+recordMetadata.offset()+",timestamp="+recordMetadata.timestamp());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

3.4、发送消息测试

package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}//kafka事务支持spring-tx的事务注解//单元测试中的事务会自动回滚@Testvoid testTransaction() throws  IOException {//多个消息的发送在一个事务中执行kafkaTemplate.executeInTransaction((var1) -> {//通过一个事务中的operations对象来发送消息,执行事务操作var1.send("my_topic1",0,"", "spring-kafka-事务1");var1.send("my_topic1",0,"", "spring-kafka-事务2");int i = 1/0;var1.send("my_topic1",0,"", "spring-kafka-事务3");return "发送消息失败";});System.in.read();}
}

3.5、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}

3.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

3.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

3.8、控制台日志

生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务1
生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务2
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务1,offset = -1
异常信息:Failing batch since transaction was aborted
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务2,offset = -1
异常信息:Failing batch since transaction was abortedjava.lang.ArithmeticException: / by zero

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/25192.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于烫烫烫和屯屯屯

微较的msvc编译器&#xff0c;调试模式下为了方便检测内存的非法访问&#xff0c;对于不同的内存做了初始化&#xff0c; 未初始化栈&#xff1a; 0xCCCCCCCC 未初始化堆&#xff1a; 0xCDCDCDCD 已释放的堆&#xff1a; 0xDDDDDDDD 0xCCCC解释为GB2312字符即是烫&#xff…

Django 鸡与蛋问题

"Django 的鸡与蛋问题"通常指的是在开始 Django 项目时&#xff0c;你可能会遇到的一个困境&#xff1a;是先设计数据库模型还是先编写视图和控制器&#xff08;即视图函数&#xff09;&#xff1f; 这个问题的实质是在于&#xff0c;Django 的核心部分是由数据库模…

Qt5/6使用SqlServer用户连接操作SqlServer数据库

网上下载SQLServer2022express版数据库,这里没啥可说的,随你喜欢,也可以下载Develop版本。安装完后,我们可以直接连接尝试, 不过一般来说,还是下载SQLServer管理工具来连接数据更加方便。 所以直接下载ssms, 我在用的时候,一开始只能用Windows身份登录。 所以首先,我…

入门matlab

常识 如何建一个新文件 创建新文件&#xff0c;点击新建&#xff0c;我们就可以开始写代码了 为什么要在代码开头加入clear 假如我们有2个文件&#xff0c;第一个文件里面给x赋值100&#xff0c;第二个文件为输出x 依次运行&#xff1a; 结果输出100&#xff0c;这是因为它们…

ChatGPT Prompt技术全攻略-精通篇:Prompt工程技术的高级应用

系列篇章&#x1f4a5; No.文章1ChatGPT Prompt技术全攻略-入门篇&#xff1a;AI提示工程基础2ChatGPT Prompt技术全攻略-进阶篇&#xff1a;深入Prompt工程技术3ChatGPT Prompt技术全攻略-高级篇&#xff1a;掌握高级Prompt工程技术4ChatGPT Prompt技术全攻略-应用篇&#xf…

电脑缺失msvcp110.dll文件的解决方法,总结5种靠谱的方法

在计算机使用过程中&#xff0c;我们可能会遇到一些错误提示&#xff0c;其中之一就是“找不到msvcp110.dll”。这个错误提示通常出现在运行某些软件时&#xff0c;那么&#xff0c;它究竟会造成哪些问题呢&#xff1f; 一&#xff0c;msvcp110.dll文件概述 msvcp110.dll是Mic…

推荐云盘哪个好,各有各的优势

选择合适的云盘服务是确保数据安全、便捷分享和高效协作的关键。下面将从多个维度对目前主流的云盘服务进行详细的对比和分析&#xff1a; 速度性能 百度网盘青春版&#xff1a;根据测试&#xff0c;其上传和下载确实不限速&#xff0c;但主要定位是办公人群&#xff0c;适用于…

STM32F103C8T6 HAL库 USART1 DMA方式接收数据

前言&#xff1a; 前面的两篇文章都说关于发送的&#xff0c;HAL库发送数据可以调用现成的函数&#xff0c;而接收数据&#xff0c;现成函数不太好用。这里为了记录了一下自己参考了网上几个大佬的代码&#xff0c;整理了一下USART1 DMA方式接受数据的代码&#xff0c;…

Elasticsearch 认证模拟题 - 17

这两道题目非常具有代表性&#xff0c;分别是跨集群复制和跨集群检索&#xff0c;需要相应的 许可 这里在虚拟机上搭建集群完成这两道题目&#xff0c;这里补充一下 elasticsearch 和 kibana 的配置文件 # elasticsearch.yml cluster.name: cluster2 node.name: cluster2-node…

Linux之文件操作

目录 第1关&#xff1a;文件的创建 任务描述 相关知识 文件的创建 编程要求 答案&#xff1a; 第2关&#xff1a;文件打开与关闭 任务描述 相关知识 文件的打开 文件的关闭 编程要求 答案&#xff1a; 第3关&#xff1a;文件读写操作 任务描述 相关知识 文件的写操作 文件的读…

【Redis学习笔记05】Jedis客户端(中)

Jedis客户端 1. 命令 1.1 String类型 1.1.1 常见命令 SET命令 语法&#xff1a;SET key value [EX seconds | PX milliseconds] [NX|XX] 说明&#xff1a;将string类型的value值设置到指定key中&#xff0c;如果之前该key存在&#xff0c;则会覆盖原先的值&#xff0c;原先…

前端计网面试题(二)

一、在浏览器中输入url并且按下回车之后发生了什么&#xff1f; 首先解析url&#xff0c;判断url是否合法&#xff0c;如果合法再判断是否完整。如果不合法&#xff0c;则使用用户默认的搜索引擎进行搜索。DNS域名解析获取URL对应的ip地址。&#xff08;首先看本地是否有缓存&…

HTML开发 Vue2.x + Element-UI 动态生成表单项并添加表单校验

基于vue2.x 和element-ui 动态生成表单项并添加表单校验&#xff1b; 1、需求问题 如下图&#xff0c;项目有个需求&#xff0c;点击添加按钮&#xff0c;新增一行设备信息&#xff0c;且每项信息必填&#xff1b; 2、代码 看到这个需求&#xff0c;首先想到要使用v-for的形…

使用 flask + qwen 实现 txt2sql 流式输出

前言 一般的大模型提供的 api 都是在提问之后过很久才会返回对话内容&#xff0c;可能要耗时在 3 秒以上了&#xff0c;如果是复杂的问题&#xff0c;大模型在理解和推理的耗时会更长&#xff0c;这种展示结果的方式对于用户体验是很差的。 其实大模型也是可以进行流式输出&a…

Vue3 一 快速启动基于Vite 创建项目

编码规范 TypeScript 组合式API setup语法糖 基于Vite 创建项目 WinR输入 CMD 回车后打开CMD命令行 已安装 18.3以上版本的NodeJS,js(安装) 我们用 NPM 方式安装 输入命令npm create vuelatest PS D:\WORK\NodeJS> npm create vuelatest Need to install the following …

《软件定义安全》之一:SDN和NFV:下一代网络的变革

第1章 SDN和NFV&#xff1a;下一代网络的变革 1.什么是SDN和NFV 1.1 SDN/NFV的体系结构 SDN SDN的体系结构可以分为3层&#xff1a; 基础设施层由经过资源抽象的网络设备组成&#xff0c;仅实现网络转发等数据平面的功能&#xff0c;不包含或仅包含有限的控制平面的功能。…

Python语言读取图像

import cv2 import numpy as np width 640 # 图像宽度height 480 # 图像高度channels 3 # 颜色通道数imgEmpty np.empty((height, width, channels), np.uint8) # 创建空白数组imgBlack np.zeros((height, width, channels), np.uint8) # 创建黑色图像 RGB0imgWhite …

STM32 uc/OS-III多任务程序

目录 一、项目创建 二、代码移植 1、uC/OS-III源码处理 2、KEIL文件配置 ​编辑3、文件修改 启动文件 ​编辑app_cfg.h includes.h bsp.c和bsp.h main.c lib_ cfg.h app.c和app.h 三、总结 学习目标&#xff1a; 学习嵌入式实时操作系统&#xff08;RTOS&#xf…

覆盖路径规划经典算法 The Boustrophedon Cellular Decomposition 论文及代码详解

2000年一篇论文 Coverage of Known Spaces: The Boustrophedon Cellular Decomposition 横空出世&#xff0c;解决了很多计算机和机器人领域的覆盖路径问题&#xff0c;今天我来详细解读这个算法。 The Boustrophedon Cellular Decomposition 算法详解 这篇论文标题为"C…

办理公司诉讼记录删除行政处罚记录删除

企业行政处罚记录是可以做到撤销消除的&#xff0c;一直被大多数企业忽略&#xff0c;如果相关诉讼记录得不到及时删除&#xff0c;不仅影响企业招投标&#xff0c;还影响企业的贷款申请&#xff0c;严重的让企业资金链断裂&#xff0c;影响企业长远发展和企业形象。行政处罚是…