Kafka之Producer原理

1. 生产者发送消息源码分析

public class  SimpleProducer {public static void main(String[] args) {Properties pros=new Properties();pros.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");
//        pros.put("bootstrap.servers","192.168.8.147:9092");pros.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");pros.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 0 发出去就确认 | 1 leader 落盘就确认| all(-1) 所有Follower同步完才确认pros.put("acks","1");// 异常自动重试次数pros.put("retries",3);// 多少条数据发送一次,默认16Kpros.put("batch.size",16384);// 批量发送的等待时间pros.put("linger.ms",5);// 客户端缓冲区大小,默认32M,满了也会触发消息发送pros.put("buffer.memory",33554432);// 获取元数据时生产者的阻塞时间,超时后抛出异常pros.put("max.block.ms",3000);// 创建Sender线程Producer<String,String> producer = new KafkaProducer<String,String>(pros);for (int i =0 ;i<1000000;i++) {producer.send(new ProducerRecord<String,String>("mytopic",Integer.toString(i),Integer.toString(i)));// System.out.println("发送:"+i);}//producer.send(new ProducerRecord<String,String>("mytopic","1","1"));//producer.send(new ProducerRecord<String,String>("mytopic","2","2"));producer.close();}

a. 首先我们是创建一些kafka的连接配置以及参数配置,然后先new出来一个生产者,创建一个sender线程,由下图源码可以看出,我们在new生产者的时候,kafak会帮我们船舰一个sender线程,并进行了命名和启动

 b.随后我们的main线程中,进行批量send发送,那么接下来我们看下send方法

可以看到,在send方法中,还有一个interceptors做了一个拦截器的处理, 那么拦截器应该怎么使用的呢?

我们只需要实现ProducerInterceptor中的onsend方法,并且在kafka send消息前进行配置就可以了

public class ChargingInterceptor implements ProducerInterceptor<String, String> {// 发送消息的时候触发@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {System.out.println("1分钱1条消息,不管那么多反正先扣钱");return record;
}

带有拦截器的kafka demo

 public static void main(String[] args) {Properties props=new Properties();props.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");
//        props.put("bootstrap.servers","192.168.8.147:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 0 发出去就确认 | 1 leader 落盘就确认| all 所有Follower同步完才确认props.put("acks","1");// 异常自动重试次数props.put("retries",3);// 多少条数据发送一次,默认16Kprops.put("batch.size",16384);// 批量发送的等待时间props.put("linger.ms",5);// 客户端缓冲区大小,默认32M,满了也会触发消息发送props.put("buffer.memory",33554432);// 获取元数据时生产者的阻塞时间,超时后抛出异常props.put("max.block.ms",3000);// 添加拦截器List<String> interceptors = new ArrayList<>();interceptors.add("com.zsc.mq.kafka.javaapi.interceptor.ChargingInterceptor");// 这个键就是拦截器的配置,因为拦截器是个list,因此可以实现多个拦截器props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);Producer<String,String> producer = new KafkaProducer<String,String>(props);producer.send(new ProducerRecord<String,String>("mytopic","1","1"));producer.send(new ProducerRecord<String,String>("mytopic","2","2"));producer.close();}}

 c.  send方法走完拦截器后,我们进入到dosend方法中,接着看

 

可以看到,kafka对我们的消息进行了一个序列化,那么序列化方式就是在我们初始配置参数的时候进行配置的,可以指定不同的序列化方式,并且也可以自定义序列化方式,实现序列化接口,增加到配置类中即可

d. 看完序列化,我们的消息发送接着往下面走, 进入到分区器流程

 

 

 由上面可知,我们的分区器如果指定了分区就会走我们指定的分区;消息没有指定分区但是自定义了消息分区器,就会走到消息分区器中,自定义消息器代码如下(实现partitioer接口即可):

public class SimplePartitioner implements Partitioner {public SimplePartitioner() {}@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String k = (String) key;System.out.println(k);if (Integer.parseInt(k) % 2 == 0){return 0;}else{return 1;}// return Integer.parseInt(k)%2;}@Overridepublic void close() {}

 还有第三个情况,既没有指定也没有自定义分区器。那么key不为空,那就是走hash取模算法;key也会空的话,就是采用粘连策略(根据topic来确定在哪里存储)

e. 当我们消息的分区器走完之后,就进入到我们的累加器,在上篇博客MQ之初识kafka-CSDN博客我们介绍组件的时候就提到过,kafka为了提升高吞吐,查询效率快,消息并不是堆积在一起的,而是一批一批去放的,因此经过一个累加器。

可以看到 按批次添加到累加器中,那么添加到累加器之后,是怎么触发流程的呢?

f . 顺着源码再往下看,可以看到一个判断,当累加器的批次满了的话或者是刚创建的批次,就会去唤醒sender线程,向Broker中发送消息。

生产者发送消息的整个流程图如下所示:

 

2. ACK应答机制与ISR机制

2.1 服务器端响应策略的必要性

如图所示,我们正常的执行流程是生产者producer向leader中发送消息,然后leader同步到两个follower副本中,那么当发送消息的过程中服务异常的话,我们的leader就接收不到消息了,因此需要一个应答机制来保证我们能够接收到消息,如果leader没有接收到消息,就触发重发机制,让producer重新发送消息给leader

 2.2 ACK应答机制

kafka中提供了三种可靠性级别,可以根据对可靠性和延迟性的要求进行选择

1.acks = 0  producer 不等待 broker动作、直接返回ack
2.acks = 1(默认) producer等待broker 动作、 leader 落盘成功、返回 ack
3.acks=-1(all)  producer 等待 broker 动作、 leader&follower 全部落盘成功、返回 ack

 props.put("ack","0");  不等待ACK

这种情况是说我们的producer发送消息给leader,leader异步返回ack给peoducer告诉消息已经发送成功了,这种正常存储的情况下肯定是没有问题的。但是如果还没有同步副本的情况下,我们的leader此时挂掉了,而producer已经收到了应答,因此不会再重发消息。当再次重启leader所在的服务器时,数据就丢失了

props.put("ack","1");  Leader落盘、返回ACK(默认)

这种下,我们peoducer确定了leader已经落盘了,但是如果极端情况下,leader还没有同步副本给follower,那么此时leader服务器挂了,数据是不是也就丢失了,因为也还没有进行备份

 props.put("ack","-1");  Leader和全部Follower落盘、返回ACK

这种情况是我们的producer等待leader和follower全部落盘成功后,进行ack响应,这种策略的可靠性最高,但是吞吐量是最低的,因此要根据具体业务具体配置。那么这种策略是不是就没有什么问题了呢?当然也有,比如当leader和follower都落盘后,再返回应答信号时,leader挂了,那么peoducer没有收到消息,就会任务leader没有接收到消息,还是会对消息进行重发,那这个问题怎么解决呢? 可以用消息幂等性(在第三章进行赘述说明)

 应答异常

如上图,当一个flower挂了的情况下,是不是我们的leader就没法同步了,没法同步,就会造成整个链路的阻塞,peoducer没收到应答信息还啥也不知道,又往leader发消息,如果这样持续下去,服务是不是就该崩了,因此引入了一个ISR机制。

2.3 ISR机制

 ISR是一组动态维护的同步副本集合,它的作用就是把leader和follower同时放到一个ISR队列中,比如上面的P0_R0挂掉了,同步不积极,那么就把它移除ISR队列,默认为30s,可以经过replica.lag.time.max.ms进行配置,当ISR中的队列都同步完了的话,就返回ACK应答信号

AR = ISR+OSR

3. 消息幂等性

发送消息情形-1: 正常发送

发送消息情形-2:消息发送失败,触发消息重发,造成消息重发写入

 

 发送消息情形-3:消息发送失败,触发消息重发,消息不重复写入

如上图所示,是怎么保证消息不被重复写入的呢?利用幂等性,在发送消息的时候新增两个参数PID与Sequence Number分别代表生产者ID和消息的编码,那么Broker存储的时候也会多加一点空间存储这两个值,当ack应答异常时,再次重发消息到队列中时,就会进行一次判断a.如果PID和sequence Number都相等,则消息写入队列失败,b.如果Sequence Number为1 则顺序写入 c.如果Sequence Number为2,则抛出异常,表示数据有丢失

幂等性生产者发送消息流程总结:

1 Producer 端发送消息(消息本身、 PID Sequence Number
2 Broker 端接收到消息(将消息和 PID Sequence Number 一起保存)
3 、若 ACK 响应失败,生产者重试,再次发送消息

 kafka是在Broker端完成的去重处理

4. Kafka生产者事务

生产者的幂等性只能保证在单分区单会话的场景下有效,因此对于多分区来说,kafka事务就提供了对多个分区写入操作的原子性。但是kafka事务的前提是开启幂等性。

kafka事务API的相关方法

initTransactions() 初始化事务
beginTransaction() 开启事务
commitTransaction() 提交事务
abortTransaction() 中止事务
sendOffsetsToTransaction()

事务的一个demo

    public static void main(String[] args) {Properties props=new Properties();//props.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");props.put("bootstrap.servers","192.168.8.147:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 0 发出去就确认 | 1 leader 落盘就确认| all或-1 所有Follower同步完才确认props.put("acks","all");// 异常自动重试次数props.put("retries",3);// 多少条数据发送一次,默认16Kprops.put("batch.size",16384);// 批量发送的等待时间props.put("linger.ms",5);// 客户端缓冲区大小,默认32M,满了也会触发消息发送props.put("buffer.memory",33554432);// 获取元数据时生产者的阻塞时间,超时后抛出异常props.put("max.block.ms",3000);props.put("enable.idempotence",true);// 事务ID,唯一props.put("transactional.id", UUID.randomUUID().toString());Producer<String,String> producer = new KafkaProducer<String,String>(props);// 初始化事务producer.initTransactions();try {producer.beginTransaction();producer.send(new ProducerRecord<String,String>("transaction-test","1","1"));producer.send(new ProducerRecord<String,String>("transaction-test","2","2"));
//            Integer i = 1/0;producer.send(new ProducerRecord<String,String>("transaction-test","3","3"));// 提交事务producer.commitTransaction();} catch (KafkaException e) {// 中止事务producer.abortTransaction();}producer.close();}
}

kafka事务操作的基本流程

最后标记消费状态后,就可以进行消费了

 kafka的事务细节流程:

5. 总结

        本文主要是介绍了kafka生产者端的一些原理,先是从源码出发,介绍了生产者发送消息到Broker经历的一系列过程:先是创建了一个sender线程,然后在发送消息的过程中一次经过拦截器、累加器、分区器最后根据分区的批量消息是否新建或者满了来触发sender线程发送到Broker服务器中。随后我们介绍了,peoducer跟broker服务器之间的交互采用的是应答机制,在这里有3种配置,可根据业务需要来具体配置,当配置-1的时候,我们分析了为什么会出现重发消息的问题,通过幂等性来保证,follower从节点挂了的情况下,应答异常,采用ISR队列机制进行避免。但是幂等性只能保证单分区单会话的场景,而针对多分区的情况下,kafka主要是采用分布式事务来解决,利用分布式ID,事务coordinattor和事务日志分二PC提交,并且对事务的状态进行存储标记,当事务的状态更改为可消费的时候,才会进行消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/23893.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

26-LINUX--I/O复用-select

一.I/O复用概述 /O复用使得多个程序能够同时监听多个文件描述符&#xff0c;对提高程序的性能有很大帮助。以下情况适用于I/O复用技术&#xff1a; ◼ TCP 服务器同时要处理监听套接字和连接套接字。 ◼ 服务器要同时处理 TCP 请求和 UDP 请求。 ◼ 程序要同时处理多个套接…

使用 Django 构建动态网页

文章目录 创建 Django 项目和应用程序创建 HTML 模板创建视图函数配置 URL 路由运行 Django 服务器使用 Django 模板语言 Django 是一个流行的 Python Web 框架&#xff0c;它能够帮助开发人员快速构建强大的 Web 应用程序。在 Django 中&#xff0c;HTML 是用于呈现网页内容的…

1. 计算机系统概述

1. 计算机系统概述 文章目录 1. 计算机系统概述1.1 计算机的发展硬件的发展软件的发展 1.2.1 计算机硬件的基本组成早期冯诺依曼的结构现代计算机的结构 1.2.2 各个硬件的工作原理主存储器运算器控制器计算机工作过程 1.2.3 计算机系统的多级层次结构1.3 计算机的性能指标存储器…

GD32如何配置中断优先级分组以及中断优先级

使用GD32 MCU的过程中&#xff0c;大家可能会有以下疑问&#xff1a;中断优先级如何配置和使用&#xff1f; 本文将会为大家解析中断优先级分组以及中断优先级的配置使用&#xff1a; 中断优先级分组配置 一个GD32 MCU系统需要大家明确系统中使用的中断优先级分组&#xff0…

代驾公司在市场竞争中如何保持优势?

在竞争激烈的市场中&#xff0c;代驾公司可以通过多种策略保持其竞争优势&#xff0c;包括利用市场潜力、创新服务模式、提高服务效率以及加强品牌建设等。以下是具体的策略&#xff1a; 利用市场潜力 汽车产业空间巨大&#xff1a;随着汽车保有量的增加&#xff0c;代驾行业…

扫地机器人:卷价格,不如卷技术

扫地机器人内卷的终点是技术和价值&#xff0c;价格只是附属品。 一路上涨的价格&#xff0c;一路下跌的销量 从价格飙升&#xff0c;到重新卷回价格&#xff0c;尴尬的背后是扫地机器人在骨感现实下的无奈抉择。 根据数据显示&#xff0c;2020中国扫地机器人线上市场零售均价…

通过可识别性和深度学习重建大脑功能网络

摘要 本研究提出了一种新的方法来重建代表大脑动力学的功能网络&#xff0c;该方法基于两个脑区在同一认知任务中的共同参与会导致其可识别性或其动力学特性降低的观点。这种可识别性是通过深度学习模型在监督分类任务中获得的分数来估计的&#xff0c;因此不需要对这种协同参…

Flink端到端的精确一次(Exactly-Once)

目录 状态一致性 端到端的状态一致性 端到端精确一次&#xff08;End-To-End Exactly-Once&#xff09; Flink内部的Exactly-Once 输入端保证 输出端保证 幂等写入 事务写入 Flink和Kafka连接时的精确一次保证 整体介绍 需要的配置 案例 状态一致性 流式计算本身就…

无线麦克风哪个牌子性价比高?一文告诉你无线领夹麦克风怎么挑选

​当我们谈论到演讲、表演或者录制视频时&#xff0c;一个高质量的无线麦克风能够使得整个体验提升至一个全新的水平。它不仅能够保证声音的清晰度和真实度&#xff0c;还能够让使用者在演讲或者表演时更加自信和舒适。基于对市场的深入研究和用户体验的考量&#xff0c;我挑选…

【Java】解决Java报错:IllegalArgumentException

文章目录 引言1. 错误详解2. 常见的出错场景2.1 非法的参数值2.2 空值或 null 参数2.3 非法的数组索引 3. 解决方案3.1 参数验证3.2 使用自定义异常3.3 使用Java标准库中的 Objects 类 4. 预防措施4.1 编写防御性代码4.2 使用注解和检查工具4.3 单元测试 结语 引言 在Java编程…

【NPS】微软NPS配置802.1x,验证域账号,动态分配VLAN(有线网络续篇)

继上一篇文章中成功实施了有线802.1x验证域账号并动态分配VLAN的策略之后&#xff0c;我们迎来了一个新的目标&#xff1a;在用户验证失败时&#xff0c;自动分配一个Guest VLAN&#xff0c;以确保用户至少能够访问基本的网络服务。这一改进将显著提升网络的灵活性和用户的上网…

Kafka的分区副本机制

目录 生产者的分区写入策略 轮询策略 随机策略 按key分配策略 乱序分区 自定义分区策略 实现步骤&#xff1a; 消费者组Rebalance机制 Rebalance触发时机 Rebalance的不良影响 消费者分区分配策略 Range范围分配策略 RoundRobin轮询策略 Stricky粘性分配策略 生产…

计算机网络-NAT配置与ACL

目录 一、ACL 1、ACL概述 2、ACL的作用 3、ACL的分类 4、ACL的配置格式 二、NAT 1、NAT概述 2、NAT分类 2.1 、 静态NAT 2.2 、 动态NAT 3、NAT的功能 4、NAT的工作原理 三、NAT配置 1、静态NAT配置 2、动态NAT配置 四、总结 一、ACL 1、ACL概述 ACL&#xff…

让编程变得更加直观与高效 “JAVA图形化编程”官网上线!

公测预约开启 我们历经了长达三年的时光&#xff0c;执着地坚守并潜心地进行探索&#xff0c;始终怀着一颗敬畏的心&#xff0c;最终极为谨慎地推出了这款图形化编程桌面。它能够使得业务与程序清晰明了地呈现&#xff0c;而且还能与传统的低零代码平台实现紧密…

新品发布 | 飞凌嵌入式RK3576核心板,为AIoT应用赋能

为了充分满足AIoT市场对高性能、高算力和低功耗主控日益增长的需求&#xff0c;飞凌嵌入式全新推出基于Rockchip RK3576处理器开发设计的FET3576-C核心板&#xff01; 集成4个ARM Cortex-A72和4个ARM Cortex-A53高性能核&#xff0c;内置6TOPS超强算力NPU&#xff0c;为您的AI…

LeetCode 两数之和 + 三数之和

两数之和 简单题 思路&#xff1a;一个Map&#xff0c;key是数值&#xff0c;value是该数值对应的下标&#xff0c;遍历的时候判断一下当前数组下标对应的值在map里有没有可组合成target的&#xff08;具体体现为在map里找target-nums【i】)&#xff0c;如果有&#xff0c;直接…

IDEA使用阿里通义灵码插件

在这个AI火热的时代&#xff0c;纯手工写代码已经有点out了&#xff0c;使用AI插件可以帮我们快速写代码&#xff0c;起码能省去写那些简单、重复性的代码&#xff0c;大大提高编码效率&#xff0c;在这里我推荐使用阿里的通义灵码 注册安装 安装注册好后&#xff0c;打开我们…

【流媒体】音频相关概念详解

文章目录 一、前言二、概述三、音频相关概念1、采样率&#xff08;Sampling rate&#xff09;2、位深度&#xff08;Bit depth&#xff09;3、比特率&#xff08;Bit rate&#xff09;4、声道&#xff08;Audio channel&#xff09;5、音频帧6、音频编码7、音频解码 一、前言 …

搭建自己的DNS服务器

个人名片 &#x1f393;作者简介&#xff1a;java领域优质创作者 &#x1f310;个人主页&#xff1a;码农阿豪 &#x1f4de;工作室&#xff1a;新空间代码工作室&#xff08;提供各种软件服务&#xff09; &#x1f48c;个人邮箱&#xff1a;[2435024119qq.com] &#x1f4f1…

腺苷调节合成高密度脂蛋白用于三阴性乳腺癌的化学免疫治疗

引用信息 文 章&#xff1a;Adenosine-modulating synthetic high-density lipoprotein for chemoimmunotherapy of triple-negative breast cancer 期 刊&#xff1a;Journal of Controlled Release&#xff08;影响因子&#xff1a;10.8&#xff09; 发表时间&am…