Kafka之Producer原理

1. 生产者发送消息源码分析

public class  SimpleProducer {public static void main(String[] args) {Properties pros=new Properties();pros.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");
//        pros.put("bootstrap.servers","192.168.8.147:9092");pros.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");pros.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 0 发出去就确认 | 1 leader 落盘就确认| all(-1) 所有Follower同步完才确认pros.put("acks","1");// 异常自动重试次数pros.put("retries",3);// 多少条数据发送一次,默认16Kpros.put("batch.size",16384);// 批量发送的等待时间pros.put("linger.ms",5);// 客户端缓冲区大小,默认32M,满了也会触发消息发送pros.put("buffer.memory",33554432);// 获取元数据时生产者的阻塞时间,超时后抛出异常pros.put("max.block.ms",3000);// 创建Sender线程Producer<String,String> producer = new KafkaProducer<String,String>(pros);for (int i =0 ;i<1000000;i++) {producer.send(new ProducerRecord<String,String>("mytopic",Integer.toString(i),Integer.toString(i)));// System.out.println("发送:"+i);}//producer.send(new ProducerRecord<String,String>("mytopic","1","1"));//producer.send(new ProducerRecord<String,String>("mytopic","2","2"));producer.close();}

a. 首先我们是创建一些kafka的连接配置以及参数配置,然后先new出来一个生产者,创建一个sender线程,由下图源码可以看出,我们在new生产者的时候,kafak会帮我们船舰一个sender线程,并进行了命名和启动

 b.随后我们的main线程中,进行批量send发送,那么接下来我们看下send方法

可以看到,在send方法中,还有一个interceptors做了一个拦截器的处理, 那么拦截器应该怎么使用的呢?

我们只需要实现ProducerInterceptor中的onsend方法,并且在kafka send消息前进行配置就可以了

public class ChargingInterceptor implements ProducerInterceptor<String, String> {// 发送消息的时候触发@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {System.out.println("1分钱1条消息,不管那么多反正先扣钱");return record;
}

带有拦截器的kafka demo

 public static void main(String[] args) {Properties props=new Properties();props.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");
//        props.put("bootstrap.servers","192.168.8.147:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 0 发出去就确认 | 1 leader 落盘就确认| all 所有Follower同步完才确认props.put("acks","1");// 异常自动重试次数props.put("retries",3);// 多少条数据发送一次,默认16Kprops.put("batch.size",16384);// 批量发送的等待时间props.put("linger.ms",5);// 客户端缓冲区大小,默认32M,满了也会触发消息发送props.put("buffer.memory",33554432);// 获取元数据时生产者的阻塞时间,超时后抛出异常props.put("max.block.ms",3000);// 添加拦截器List<String> interceptors = new ArrayList<>();interceptors.add("com.zsc.mq.kafka.javaapi.interceptor.ChargingInterceptor");// 这个键就是拦截器的配置,因为拦截器是个list,因此可以实现多个拦截器props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);Producer<String,String> producer = new KafkaProducer<String,String>(props);producer.send(new ProducerRecord<String,String>("mytopic","1","1"));producer.send(new ProducerRecord<String,String>("mytopic","2","2"));producer.close();}}

 c.  send方法走完拦截器后,我们进入到dosend方法中,接着看

 

可以看到,kafka对我们的消息进行了一个序列化,那么序列化方式就是在我们初始配置参数的时候进行配置的,可以指定不同的序列化方式,并且也可以自定义序列化方式,实现序列化接口,增加到配置类中即可

d. 看完序列化,我们的消息发送接着往下面走, 进入到分区器流程

 

 

 由上面可知,我们的分区器如果指定了分区就会走我们指定的分区;消息没有指定分区但是自定义了消息分区器,就会走到消息分区器中,自定义消息器代码如下(实现partitioer接口即可):

public class SimplePartitioner implements Partitioner {public SimplePartitioner() {}@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String k = (String) key;System.out.println(k);if (Integer.parseInt(k) % 2 == 0){return 0;}else{return 1;}// return Integer.parseInt(k)%2;}@Overridepublic void close() {}

 还有第三个情况,既没有指定也没有自定义分区器。那么key不为空,那就是走hash取模算法;key也会空的话,就是采用粘连策略(根据topic来确定在哪里存储)

e. 当我们消息的分区器走完之后,就进入到我们的累加器,在上篇博客MQ之初识kafka-CSDN博客我们介绍组件的时候就提到过,kafka为了提升高吞吐,查询效率快,消息并不是堆积在一起的,而是一批一批去放的,因此经过一个累加器。

可以看到 按批次添加到累加器中,那么添加到累加器之后,是怎么触发流程的呢?

f . 顺着源码再往下看,可以看到一个判断,当累加器的批次满了的话或者是刚创建的批次,就会去唤醒sender线程,向Broker中发送消息。

生产者发送消息的整个流程图如下所示:

 

2. ACK应答机制与ISR机制

2.1 服务器端响应策略的必要性

如图所示,我们正常的执行流程是生产者producer向leader中发送消息,然后leader同步到两个follower副本中,那么当发送消息的过程中服务异常的话,我们的leader就接收不到消息了,因此需要一个应答机制来保证我们能够接收到消息,如果leader没有接收到消息,就触发重发机制,让producer重新发送消息给leader

 2.2 ACK应答机制

kafka中提供了三种可靠性级别,可以根据对可靠性和延迟性的要求进行选择

1.acks = 0  producer 不等待 broker动作、直接返回ack
2.acks = 1(默认) producer等待broker 动作、 leader 落盘成功、返回 ack
3.acks=-1(all)  producer 等待 broker 动作、 leader&follower 全部落盘成功、返回 ack

 props.put("ack","0");  不等待ACK

这种情况是说我们的producer发送消息给leader,leader异步返回ack给peoducer告诉消息已经发送成功了,这种正常存储的情况下肯定是没有问题的。但是如果还没有同步副本的情况下,我们的leader此时挂掉了,而producer已经收到了应答,因此不会再重发消息。当再次重启leader所在的服务器时,数据就丢失了

props.put("ack","1");  Leader落盘、返回ACK(默认)

这种下,我们peoducer确定了leader已经落盘了,但是如果极端情况下,leader还没有同步副本给follower,那么此时leader服务器挂了,数据是不是也就丢失了,因为也还没有进行备份

 props.put("ack","-1");  Leader和全部Follower落盘、返回ACK

这种情况是我们的producer等待leader和follower全部落盘成功后,进行ack响应,这种策略的可靠性最高,但是吞吐量是最低的,因此要根据具体业务具体配置。那么这种策略是不是就没有什么问题了呢?当然也有,比如当leader和follower都落盘后,再返回应答信号时,leader挂了,那么peoducer没有收到消息,就会任务leader没有接收到消息,还是会对消息进行重发,那这个问题怎么解决呢? 可以用消息幂等性(在第三章进行赘述说明)

 应答异常

如上图,当一个flower挂了的情况下,是不是我们的leader就没法同步了,没法同步,就会造成整个链路的阻塞,peoducer没收到应答信息还啥也不知道,又往leader发消息,如果这样持续下去,服务是不是就该崩了,因此引入了一个ISR机制。

2.3 ISR机制

 ISR是一组动态维护的同步副本集合,它的作用就是把leader和follower同时放到一个ISR队列中,比如上面的P0_R0挂掉了,同步不积极,那么就把它移除ISR队列,默认为30s,可以经过replica.lag.time.max.ms进行配置,当ISR中的队列都同步完了的话,就返回ACK应答信号

AR = ISR+OSR

3. 消息幂等性

发送消息情形-1: 正常发送

发送消息情形-2:消息发送失败,触发消息重发,造成消息重发写入

 

 发送消息情形-3:消息发送失败,触发消息重发,消息不重复写入

如上图所示,是怎么保证消息不被重复写入的呢?利用幂等性,在发送消息的时候新增两个参数PID与Sequence Number分别代表生产者ID和消息的编码,那么Broker存储的时候也会多加一点空间存储这两个值,当ack应答异常时,再次重发消息到队列中时,就会进行一次判断a.如果PID和sequence Number都相等,则消息写入队列失败,b.如果Sequence Number为1 则顺序写入 c.如果Sequence Number为2,则抛出异常,表示数据有丢失

幂等性生产者发送消息流程总结:

1 Producer 端发送消息(消息本身、 PID Sequence Number
2 Broker 端接收到消息(将消息和 PID Sequence Number 一起保存)
3 、若 ACK 响应失败,生产者重试,再次发送消息

 kafka是在Broker端完成的去重处理

4. Kafka生产者事务

生产者的幂等性只能保证在单分区单会话的场景下有效,因此对于多分区来说,kafka事务就提供了对多个分区写入操作的原子性。但是kafka事务的前提是开启幂等性。

kafka事务API的相关方法

initTransactions() 初始化事务
beginTransaction() 开启事务
commitTransaction() 提交事务
abortTransaction() 中止事务
sendOffsetsToTransaction()

事务的一个demo

    public static void main(String[] args) {Properties props=new Properties();//props.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");props.put("bootstrap.servers","192.168.8.147:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 0 发出去就确认 | 1 leader 落盘就确认| all或-1 所有Follower同步完才确认props.put("acks","all");// 异常自动重试次数props.put("retries",3);// 多少条数据发送一次,默认16Kprops.put("batch.size",16384);// 批量发送的等待时间props.put("linger.ms",5);// 客户端缓冲区大小,默认32M,满了也会触发消息发送props.put("buffer.memory",33554432);// 获取元数据时生产者的阻塞时间,超时后抛出异常props.put("max.block.ms",3000);props.put("enable.idempotence",true);// 事务ID,唯一props.put("transactional.id", UUID.randomUUID().toString());Producer<String,String> producer = new KafkaProducer<String,String>(props);// 初始化事务producer.initTransactions();try {producer.beginTransaction();producer.send(new ProducerRecord<String,String>("transaction-test","1","1"));producer.send(new ProducerRecord<String,String>("transaction-test","2","2"));
//            Integer i = 1/0;producer.send(new ProducerRecord<String,String>("transaction-test","3","3"));// 提交事务producer.commitTransaction();} catch (KafkaException e) {// 中止事务producer.abortTransaction();}producer.close();}
}

kafka事务操作的基本流程

最后标记消费状态后,就可以进行消费了

 kafka的事务细节流程:

5. 总结

        本文主要是介绍了kafka生产者端的一些原理,先是从源码出发,介绍了生产者发送消息到Broker经历的一系列过程:先是创建了一个sender线程,然后在发送消息的过程中一次经过拦截器、累加器、分区器最后根据分区的批量消息是否新建或者满了来触发sender线程发送到Broker服务器中。随后我们介绍了,peoducer跟broker服务器之间的交互采用的是应答机制,在这里有3种配置,可根据业务需要来具体配置,当配置-1的时候,我们分析了为什么会出现重发消息的问题,通过幂等性来保证,follower从节点挂了的情况下,应答异常,采用ISR队列机制进行避免。但是幂等性只能保证单分区单会话的场景,而针对多分区的情况下,kafka主要是采用分布式事务来解决,利用分布式ID,事务coordinattor和事务日志分二PC提交,并且对事务的状态进行存储标记,当事务的状态更改为可消费的时候,才会进行消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/23893.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OceanBase v4.2 特性解析:Lateral Derived Table 优化查询

前言 从传统规则来看&#xff0c;内联视图通常不允许引用在同一FROM子句中前面定义的表的列。但从OceanBase 4.2.2版本开始&#xff0c;这一限制得到了突破&#xff0c;允许内联视图作为Lateral Derived Table来定义&#xff0c;从而允许此类引用。Lateral Derived Table的语法…

26-LINUX--I/O复用-select

一.I/O复用概述 /O复用使得多个程序能够同时监听多个文件描述符&#xff0c;对提高程序的性能有很大帮助。以下情况适用于I/O复用技术&#xff1a; ◼ TCP 服务器同时要处理监听套接字和连接套接字。 ◼ 服务器要同时处理 TCP 请求和 UDP 请求。 ◼ 程序要同时处理多个套接…

Facebook广告素材如何测试?手把手教你!

广告素材对Facebook广告效果的影响是很大的&#xff0c;用对了素材&#xff0c;Facebook广告的价值就越高。广告主们通常会先通过广告测试&#xff0c;根据数据反馈来挑选素材。今天就手把手教你做Facebook素材测试的技巧&#xff0c;让你更有灵感和思路&#xff01; 创意测试 …

Hudi CLI 安装配置总结

前言 上篇文章 总结了Spark SQL Rollback, Hudi CLI 也能实现 Rollback,本文总结下 Hudi CLI 安装配置以及遇到的问题。 官方文档 https://hudi.apache.org/cn/docs/cli/ 版本 Hudi 0.13.0(发现有bug)、(然后升级)0.14.1Spark 3.2.3打包 mvn clean package -DskipTes…

使用 Django 构建动态网页

文章目录 创建 Django 项目和应用程序创建 HTML 模板创建视图函数配置 URL 路由运行 Django 服务器使用 Django 模板语言 Django 是一个流行的 Python Web 框架&#xff0c;它能够帮助开发人员快速构建强大的 Web 应用程序。在 Django 中&#xff0c;HTML 是用于呈现网页内容的…

Spring Boot 复习

2 3 5&#xff08;不考&#xff09; 9 (1)RestController 注解是一个组合注解&#xff0c;等同于Controller 和ResponseBody 两个注解结合使用的效果。主要作用是将当前类作为控制层的组件添加到 Spring 容器中&#xff0c;同时该类的方法无法返回 JSP 页面&#xff0c;而且…

Flutter 中的 RenderObjectToWidgetAdapter 小部件:全面指南

Flutter 中的 RenderObjectToWidgetAdapter 小部件&#xff1a;全面指南 Flutter 是一个功能强大的 UI 框架&#xff0c;由 Google 开发&#xff0c;允许开发者使用 Dart 语言构建跨平台的移动、Web 和桌面应用。在 Flutter 的渲染体系中&#xff0c;RenderObjectToWidgetAdap…

MyBatis面试题系列三

1、#{}和${}的区别是什么&#xff1f; #{}是预编译处理&#xff0c;${}是字符串替换。 Mybatis 在处理#{}时&#xff0c;会将 sql 中的#{}替换为?号&#xff0c;调用 PreparedStatement 的 set 方法来赋值&#xff1b; Mybatis 在处理${}时&#xff0c;就是把${}替换成变量的值…

SpringBoot项目启动时“jar中没有主清单属性”异常

资料参考 Spring Boot 启动时 “jar中没有主清单属性” 异常 - spring 中文网 (springdoc.cn) 实际解决 更详细的参考以上&#xff0c;我这边的话只需要在 pom文件 中加上 spring-boot-maven-plugin 插件就能解决该异常&#xff0c;具体如下&#xff1a; <build><p…

1. 计算机系统概述

1. 计算机系统概述 文章目录 1. 计算机系统概述1.1 计算机的发展硬件的发展软件的发展 1.2.1 计算机硬件的基本组成早期冯诺依曼的结构现代计算机的结构 1.2.2 各个硬件的工作原理主存储器运算器控制器计算机工作过程 1.2.3 计算机系统的多级层次结构1.3 计算机的性能指标存储器…

GD32如何配置中断优先级分组以及中断优先级

使用GD32 MCU的过程中&#xff0c;大家可能会有以下疑问&#xff1a;中断优先级如何配置和使用&#xff1f; 本文将会为大家解析中断优先级分组以及中断优先级的配置使用&#xff1a; 中断优先级分组配置 一个GD32 MCU系统需要大家明确系统中使用的中断优先级分组&#xff0…

代驾公司在市场竞争中如何保持优势?

在竞争激烈的市场中&#xff0c;代驾公司可以通过多种策略保持其竞争优势&#xff0c;包括利用市场潜力、创新服务模式、提高服务效率以及加强品牌建设等。以下是具体的策略&#xff1a; 利用市场潜力 汽车产业空间巨大&#xff1a;随着汽车保有量的增加&#xff0c;代驾行业…

扫地机器人:卷价格,不如卷技术

扫地机器人内卷的终点是技术和价值&#xff0c;价格只是附属品。 一路上涨的价格&#xff0c;一路下跌的销量 从价格飙升&#xff0c;到重新卷回价格&#xff0c;尴尬的背后是扫地机器人在骨感现实下的无奈抉择。 根据数据显示&#xff0c;2020中国扫地机器人线上市场零售均价…

通过可识别性和深度学习重建大脑功能网络

摘要 本研究提出了一种新的方法来重建代表大脑动力学的功能网络&#xff0c;该方法基于两个脑区在同一认知任务中的共同参与会导致其可识别性或其动力学特性降低的观点。这种可识别性是通过深度学习模型在监督分类任务中获得的分数来估计的&#xff0c;因此不需要对这种协同参…

零、测试开发前置知识

文章目录 1、什么是冒烟测试、回归测试&#xff1f;2、设计测试用例的方法有哪些&#xff1f;3、对于404或500&#xff0c;你会如何分析定位&#xff1f;4、什么是敏捷开发&#xff1f;敏捷开发流程是怎么样的&#xff1f;5、做接口测试过程中&#xff0c;下游接口需要上游数据…

Flink端到端的精确一次(Exactly-Once)

目录 状态一致性 端到端的状态一致性 端到端精确一次&#xff08;End-To-End Exactly-Once&#xff09; Flink内部的Exactly-Once 输入端保证 输出端保证 幂等写入 事务写入 Flink和Kafka连接时的精确一次保证 整体介绍 需要的配置 案例 状态一致性 流式计算本身就…

Java工作学习笔记

1、ConfigurationProperties注解是什么意思&#xff1f; ConfigurationProperties 可以将属性文件与一个Java类绑定&#xff0c;将属性文件中的变量值注入到该Java类的成员变量中 示例代码&#xff1a; /*** SSP配置** author mua*/ Component Data ConfigurationProperties…

如何提高接口响应速度

在非大数据&#xff08;几万以上记录&#xff09;的情况下&#xff0c;影响接口响应速度的因素中最大的是查询数据库的次数&#xff0c;其次才是数组遍历和简单数据处理&#xff08;如根据已有字段增加新的属性&#xff0c;或计算值&#xff09;。 一般一次数据库查询需要50毫秒…

Java Web应用,IPv6问题解决

在Java Web程序中&#xff0c;如果使用Tomcat并遇到了IPv6相关的问题&#xff0c;可以通过以下几种方式来解决&#xff1a; 1. 配置Tomcat以使用IPv4 默认情况下&#xff0c;Java可能会优先使用IPv6。如果你希望Tomcat使用IPv4&#xff0c;最简单的方法是通过设置系统属性来强…

无线麦克风哪个牌子性价比高?一文告诉你无线领夹麦克风怎么挑选

​当我们谈论到演讲、表演或者录制视频时&#xff0c;一个高质量的无线麦克风能够使得整个体验提升至一个全新的水平。它不仅能够保证声音的清晰度和真实度&#xff0c;还能够让使用者在演讲或者表演时更加自信和舒适。基于对市场的深入研究和用户体验的考量&#xff0c;我挑选…