消息队列-kafka-消息发送流程(源码跟踪) 与消息可靠性

官方网址

源码:https://kafka.apache.org/downloads
快速开始:https://kafka.apache.org/documentation/#gettingStarted
springcloud整合

发送消息流程

在这里插入图片描述
主线程:主线程只负责组织消息,如果是同步发送会阻塞,如果是异步发送需要传入一个回调函数。
Map集合:存储了主线程的消息。
Sender线程:真正的发送其实是sender去发送到broker中。

源码阅读

1 首先打开Producer.send()可以看到里面的内容

// 返回值是一个 Future 参数为ProducerRecord
Future<RecordMetadata> send(ProducerRecord<K, V> record);
// ProducerRecord定义了这些信息
// 主题
private final String topic;
// 分区
private final Integer partition;
// header
private final Headers headers;
private final K key;
private final V value;
// 时间戳
private final Long timestamp;

2 发送之前的前置处理

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptions// 这里给开发者提供了前置处理的勾子ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);// 我们最终发送的是经过处理后的消息 并且如果是异步发送会有callback 这个是用户定义的return doSend(interceptedRecord, callback);}

3 进入真正的发送逻辑Future doSend()

  • 由于是网络通信,所以我们要序列化,在这个函数里面就做了序列化的内容。
try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer", cce);}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer", cce);}
  • 然后我们获取分区
// 然后这里又是一个策略者模式 也是由用户可以配置的  DefaultPartitioner UniformStickyPartitioner RoundRobinPartitioner 提供了这样三个分区器
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();return partition != null ?partition :partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

4 到了我们的RecordAccumulator,也就是先由主线程发送到了RecordAccumulator

// 也就是对图中的Map集合
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

我们发现里面是用一个MAP存储的一个分区和ProducerBatch 是讲这个消息写到内存里面MemoryRecordsBuilder 通过这个进行写入

// 可以看到是一个链表实现的双向队列,也就是消息会按append的顺序写到 内存记录中去
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

5 接着我们看,我们append了以后,会有一个判断去唤醒sender线程,见下面的注释

// 如果说哦我们当前的 这个batch满了或者 我们创建了一个新的batch 这个时候唤醒 sender线程去发送数据
if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);// 唤醒sender 去发送数据this.sender.wakeup();}
// 实现了Runnable 所以我们去看一下RUN方法的逻辑
public class Sender implements Runnable 

好上来就是一个循环

while (running) {try {runOnce();} catch (Exception e) {log.error("Uncaught error in kafka producer I/O thread: ", e);}
}

接着进入runOnece方法,直接看核心逻辑

// 从RecordAccumulator 拿数据 然后发送
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);addToInflightBatches(batches);
// 中间省去了非核心逻辑
sendProduceRequests(batches, now);

如果继续跟踪的话最终是走到了selector.send()里面:

Send send = request.toSend(destination, header);InFlightRequest inFlightRequest = new InFlightRequest(clientRequest,header,isInternalRequest,request,send,now);this.inFlightRequests.add(inFlightRequest);selector.send(send);

6 接着我们就要看返回逻辑了,可以看到在sendRequest里面sendProduceRequest方法是通过传入了一个回调函数处理返回的。

RequestCompletionHandler callback = new RequestCompletionHandler() {public void onComplete(ClientResponse response) {handleProduceResponse(response, recordsByPartition, time.milliseconds());}};
// 如果有返回
if (response.hasResponse()) {ProduceResponse produceResponse = (ProduceResponse) response.responseBody();for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {TopicPartition tp = entry.getKey();ProduceResponse.PartitionResponse partResp = entry.getValue();ProducerBatch batch = batches.get(tp);completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());}this.sensors.recordLatency(response.destination(), response.requestLatencyMs());} 

追踪到ProducerBatch

if (this.finalState.compareAndSet(null, tryFinalState)) {completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);return true;}
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {// Set the future before invoking the callbacks as we rely on its state for the `onCompletion` callproduceFuture.set(baseOffset, logAppendTime, exception);// execute callbacksfor (Thunk thunk : thunks) {try {if (exception == null) {RecordMetadata metadata = thunk.future.value();if (thunk.callback != null)thunk.callback.onCompletion(metadata, null);} else {if (thunk.callback != null)thunk.callback.onCompletion(null, exception);}} catch (Exception e) {log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);}}produceFuture.done();}

Thunk 这个其实就是我们在Append的时候的回调:
在这里插入图片描述
至此整个流程就完成了,从发送消息,到响应后回调我们的函数。

消息可靠性

// 所有消费者的配置都在ProducerConfig 里面
public static final String ACKS_CONFIG = "acks";

acks = 0:异步形式,单向发送,不会等待 broker 的响应
acks = 1:主分区保存成功,然后就响应了客户端,并不保证所有的副本分区保存成功
acks = all 或 -1:等待 broker 的响应,然后 broker 等待副本分区的响应,总之数据落地到所有的分区后,才能给到producer 一个响应

在可靠性的保证下,假设一些故障:

  • Broker 收到消息后,同步 ISR 异常:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制
  • Broker 收到消息,并同步 ISR 成功,但是响应超时:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制

可靠性能保证哪些,不能保障哪些?

  • 保证了消息不会丢失
  • 不保证消息一定不会重复(消息有重复的概率,需要消费者有幂等性控制机制)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/730473.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

APEX开发过程中需要注意的小细节3

【问题记录】在编辑数据信息时发现辩护的数据无法保存&#xff0c;提示 “ORA-01799: 列不能外部联接到子查询” 仅展示的数据来自主表&#xff0c;这部分都是关联子表重点内容&#xff0c;编辑时无法保存 于是想将扩展表作为主表&#xff0c;在主表进行修改试试 新的报错&a…

C#实现归并排序算法

C#实现归并排序算法 以下是 C# 中的归并排序算法实现示例&#xff1a; using System;class MergeSortAlgorithm {// 合并两个子数组static void Merge(int[] arr, int left, int mid, int right){// 计算左子数组和右子数组的长度int n1 mid - left 1;int n2 right - mid;/…

C语言从入门到精通 第十二章(程序的编译及链接)

写在前面&#xff1a; 本系列专栏主要介绍C语言的相关知识&#xff0c;思路以下面的参考链接教程为主&#xff0c;大部分笔记也出自该教程。除了参考下面的链接教程以外&#xff0c;笔者还参考了其它的一些C语言教材&#xff0c;笔者认为重要的部分大多都会用粗体标注&#xf…

读《文明之光》第1册总结

人类几千年的文明史和地球的历史相比&#xff0c;实在是太短暂了&#xff0c;大约相当于几分钟和一年的关系。人类已经走过的路&#xff0c;相比今后要走的漫漫长路&#xff0c;只能算是刚刚起步。如果跳出一个个具体事件&#xff0c;站在历史的高度去看&#xff0c;我们会发现…

支小蜜校园防欺凌系统听到声音之后会自动识别吗

在校园安全领域&#xff0c;特别是在预防和应对欺凌问题上&#xff0c;校园防欺凌系统作为新兴的技术应用&#xff0c;正在受到越来越多的关注和探索。那么当这样的系统听到声音之后&#xff0c;它是否能够自动识别并作出相应反应呢&#xff1f;本文将围绕这一问题展开探讨。 …

Python通过SFTP实现网络设备配置备份

一、背景 为了防止网络设备意外损坏&#xff0c;导致配置文件无法恢复&#xff0c;可以通过将网络设备的配置文件备份到本地电脑上。 一般情况下&#xff0c;设备支持通过FTP、TFTP、FTPS、SFTP和SCP备份配置文件。其中使用FTP和TFTP备份配置文件比较简单&#xff0c;但是存在…

K线实战分析系列之十八:十字线——判断行情顶部的有效信号

K线实战分析系列之十八&#xff1a;十字线——判断行情顶部的有效信号 一、十字线二、十字线总结三、三种特殊十字线四、长腿十字线五、墓碑十字线六、蜻蜓十字线七、特殊十字线总结 一、十字线 重要的反转信号 幅度较大的下跌&#xff0c;出现一根十字线&#xff0c;正好是在…

编译内核错误 multiple definition of `yylloc‘

编译内核错误 # make ARCHarm CROSS_COMPILEarm-mix410-linux- uImageHOSTLD scripts/dtc/dtc /usr/bin/ld: scripts/dtc/dtc-parser.tab.o:(.bss0x10): multiple definition of yylloc; scripts/dtc/dtc-lexer.lex.o:(.bss0x0): first defined here collect2: error: ld ret…

【Lazy ORM】 小工具 acw 本地客户端 你负责点击页面,他负责输出代码

介绍 wu-smart-acw-client 简称acw-client&#xff0c;是一个基于Lazy ORM定制的客户端代码生成小工具 Lazy ORM 小工具 acw 本地客户端 你负责点击页面&#xff0c;他负责输出代码安装 <dependency><groupId>top.wu2020</groupId><artifactId>wu-sma…

数据结构——lesson7二叉树 堆的介绍与实现

前言&#x1f49e;&#x1f49e; 啦啦啦~这里是土土数据结构学习笔记&#x1f973;&#x1f973; &#x1f4a5;个人主页&#xff1a;大耳朵土土垚的博客 &#x1f4a5; 所属专栏&#xff1a;数据结构学习笔记 &#x1f4a5;对于数据结构顺序表链表有疑问的都可以在上面数据结…

第四届信息通信与软件工程国际会议(ICICSE 2024)即将召开!

2024年第四届信息通信与软件工程国际会议&#xff08;ICICSE 2024&#xff09;将于2024年5月10-12日在中国北京举办。本次会议由北京工业大学、IEEE以及Comsoc 联合主办。随着当今社会信息化的高速发展&#xff0c;电子信息技术的应用更是随处可见。其中&#xff0c;信息通信与…

备份 ChatGPT 的聊天纪录

备份 ChatGPT 的聊天纪录 ChatGPT 在前阵子发生了不少次对话纪录消失的情况&#xff0c;让许多用户觉得困扰不已&#xff0c;也担心自己想留存的聊天记录消失不见。 好消息是&#xff0c;OpenAI 在 2023 年 4 月 11 日推出了 ChatGPT 聊天记录备份功能&#xff0c;无论是免费…

Redis什么这么快和Redis单线程模型和多线程

概述 1、完全基于内存&#xff0c;绝大部分请求是纯粹的内存操作&#xff0c;非常快速。数据存在内存中&#xff0c;类似于HashMap&#xff0c;HashMap的优势就是查找和操作的时间复杂度都是O(1)&#xff1b; 2、数据结构简单&#xff0c;对数据操作也简单&#xff0c;Redis中…

二叉树入门

这篇博客通过手动创建的一个简单二叉树&#xff0c;实现二叉树遍历&#xff0c;返回节点&#xff0c;叶子个数&#xff0c;查找结点等相关操作。 1. 二叉树的概念 二叉树不为空时&#xff0c;由根节点&#xff0c;左/右子树组成&#xff0c;逻辑结构如下&#xff0c;当二叉树…

Postman(注册,使用,作用)【详解】

目录 一、Postman 1. Postman介绍 2. 安装Postman 3. 注册帐号再使用(可保存测试记录) 4. 创建workspace 5. 测试并保存测试记录 一、Postman postman工具可以发送不同方式的请求,浏览器只能发送get请求(所有用这个工具) 在前后端分离开发模式下&#xff0c;前端技术人员…

Spring Boot中SQL语句报错

报错原因&#xff1a; You have an error in your SQL syntax 你的SQL语句出现错误 报错位置&#xff1a; check the manual that corresponds to your MySQL server version for the right syntax to use near :/sql/schema.sql.t_film at line 1 在:/sql/schema.sql附近使用…

主网NFT的发布合约

1.什么是nft? NFT:Non-fungible-token 非同质化货币 2.新建suimove项目 使用sui move new 项目名命令新建sui move项目 sui move new nft_qyx项目结构如下: 3.写nft合约 module qyx123::nft{use sui::object::{Self, UID};use sui::transfer;use sui::tx_context::{Sel…

【工具】Raycast – Mac提效工具

引入 以前看到同事们锁屏的时候&#xff0c;不知按了什么键&#xff0c;直接调出这个框&#xff0c;然后输入lock屏幕就锁了。 跟我习惯的按Mac开机键不大一样。个人觉得还是蛮炫酷的&#xff5e; 调研 但是由于之前比较繁忙&#xff0c;这件事其实都忘的差不多了&#xff0…

JVM-垃圾收集底层算法实现

三色标记 背景描述 在并发标记的过程中&#xff0c;因为标记期间应用线程还在继续跑&#xff0c;对象间的引用可能发生变化&#xff0c;多标和漏标的情况就有可能发生。 如何解决上面的问题&#xff1f; 引入“三色标记” 意思就是&#xff0c;把Gcroots可达性分析遍历对象过程…

BUUCTF---[MRCTF2020]你传你呢1

1.题目描述 2.打开题目链接 3.上传shell.jpg文件&#xff0c;显示连接成功&#xff0c;但是用蚁剑连接却连接不上。shell文件内容为 <script languagephp>eval($_REQUEST[cmd]);</script>4.用bp抓包&#xff0c;修改属性 5.需要上传一个.htaccess的文件来把jpg后缀…