Kafka - 生产者

生产者消息对象

public class ProducerRecord<K, V> {private final String topic; // 主题private final Integer partition; //分区号private final Headers headers; //消息头部private final K key; //键private final V value; //值private final Long timestamp; //消息的时间戳
}

其中key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号,进而让消息发往特定的分区,一般同一个key的消息会被划分到同一个分区中。
timestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。

创建生产者实例

public static Properties initConfig() {Properties props = new Properties();props.put(ProducerConfig.KEY_SERIALZER_CLASS_CONFIG,StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALZER_CLASS_CONFIG,StringSerializer.class.getName());
}KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

消息的发送

创建生产者实例

创建生产者实例的方法有很多种,其中最简单的是下面的构造方于除了topic和value外的属性,其他都置为null。

public ProducerRecord(String topic, V value);

发送消息主要有三种模式:发完即忘(fire-and-forget),同步(sync)及异步(async)。

KafkaProducer的sand()方法返回值并非是void类型,而是Future类型,send()方法有两个重载方法,具体定义如下:

public Future<RecordMetadata> send(ProducerRecord<K,V> record);
public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
  • 发完即忘
    它只管往Kafka中发送消息而并不关心消息是否正确到达。
    在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时),会造成消息的丢失。这种发送方式性能最高,但可靠性也最差。

  • 同步发送

try {producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {e.printStackTrace();
}

通过feature对象中的get()方法,来阻塞等待kafka的响应,直到发送成功,或者发生异常。

同步发送的可靠性高,但性能会差很多,因为需要阻塞等到一条消息发送完之后,才能发送下一条。

  • 异步发送
producer.send(record, new Callback()) {@overridepublic void onCompletion(RecordMetadata metadata, Excetion exception){if (excetion != null) {exception.printStackTrace();} else {...}}
}

当Kafka有响应时候,就会有回调,要么发送成功,要么抛出异常。

序列化器

生产者需要用序列化器把对象转换成字节数组,才能通过网络发送给Kafka。而消费者需要用反序列器把从Kafka中收到的字节数组转换成相应的对象。

分区器

分区器的作用是为消息分配分区。

消息经过序列化后,就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器,因为patition代表的就是要发往的分区号。如果没有指定partition,则需要依赖分区器,根据key字段来计算partition的值。

拦截器

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前,做一些定制化的需求,比如统计类工作。

原理分析

整体架构

在这里插入图片描述
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程Sender线程。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用后,缓存到消息收集器中(RecordAccumulator)。Sender现成负责从消息收集器中获取消息,并将其发送到kafka中。

RecordAccumulator
该收集器主要用来缓存消息,以便Sender线程可以批量发送,进而减少网络传输的资源消耗,以提高性能。

RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容为ProducerBatch,即Deque。消息写入缓存时候,追加到双端队列的尾部,读取消息时,从双端队列的头部读取。

Sender
Seender从RecordAccumulator中获取缓存的消息后,会进一步将原来<分区,Deque>的保存形式转变为<Node, List>的形式,其中Node表示集群的broken节点。

对于网络连接来说,生产者客户端是与具体的broken节点建立的连接,就是向具体的broken节点发送消息,而不关心消息属于哪一个分区;而对于KafkaProducer
的应用逻辑而言,我们只关注向哪个分区中发送消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。

在转换成<Node, List>后,Sender还会进一步封装成<Node, Request>的形式,这样就可以将Request发往各个Node了。

请求在从Sender线程发往Kafka之前还会保存到InFlightRequest中,InFlightRequest存对象的具体形式为Map<NodeId, Deque>,它的主要作用是缓存了已经发送出去但还是没有收到响应的请求。

元数据的更新

元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上follower副本又分配在哪些节点上等等信息。

假设我们通过如下的方式创建了一条消息ProducerRecord,

ProducerRecord<String, String> record = new ProducerRecord<>(topic, “xxx”);

这里的发送指令,我们只知道主题名称,和需要发送的内容,对其他信息却一无所知。例如要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后计算出目标分区,还需要知道leader副本所在的broken节点的地址、端口等信息才能建立链接,这些都属于元信息。

元数据的更新是在客户端进行的,对客户端的外部使用者不可见。更新操作是由Sender线程发起的,主线程也需要读取这些信息,这里的数据同步,是通过Synchronized和final关键字来保障。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/49520.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

opencv 按键开启连续截图,并加载提示图片

背景图小图 键盘监听使用的是pynput 库 保存图片时使用了年月日时分秒命名 原图&#xff1a; from pynput import keyboard import cv2 import time# 键盘监听 def on_press(key):global jieglobal guanif key.char a:jie Trueelif key.char d:jie Falseelif key.char…

配置三个Spring Boot应用并通过Nginx进行反向代理 讨论中

要配置三个Spring Boot应用并通过Nginx进行反向代理&#xff0c;你可以按照以下步骤操作&#xff1a; 步骤 1: 准备Spring Boot应用 确保每个Spring Boot应用都有不同的端口号。例如&#xff0c;你可以设置第一个应用监听8080端口&#xff0c;第二个监听8081端口&#xff0c;…

FPGA JTAG最小系统 EP2C5T144C8N

FPGA的文档没有相应的基础还真不容易看懂&#xff0c;下面是B站上对FPGA文档的解读(本文非对文档解读&#xff0c;只是为个人记录第三期&#xff1a;CycloneIV E最小系统板设计&#xff08;一&#xff09;从Datasheet上获取FPGA的基本参数_哔哩哔哩_bilibili 电源部份 核心电…

TS config

moduleResolution 是 TypeScript 编译器中的一个选项&#xff0c;用于控制如何解析模块导入。这个选项影响着 TypeScript 如何查找和解析 import 和 export 声明中指定的模块。 {"compilerOptions": {"moduleResolution": "Node"//小写也没问题…

SELinux的 getenforce setenforce 配置文件/etc/selinux/config的 SELINUX和SELINUXTYPE

SELinux&#xff08;Security-Enhanced Linux&#xff09;是一个为Linux系统提供访问控制安全策略的安全模块。它是Linux内核的一个功能强大的安全子系统&#xff0c;旨在提供访问控制安全策略机制&#xff0c;以限制程序中特定代码段的权限。SELinux超越了传统的UNIX权限模型&…

AI学习记录 -使用react开发一个网页,对接chatgpt接口,附带一些英语的学习prompt

实现了如下功能&#xff08;使用react实现&#xff0c;原创&#xff09; 实现功能&#xff1a; 1、对接gpt35模型问答&#xff0c;并实现了流式传输&#xff08;在java端&#xff09; 2、在实际使用中&#xff0c;我们的问答历史会经常分享给他人&#xff0c;所以下图的 copy …

Python酷库之旅-第三方库Pandas(042)

目录 一、用法精讲 141、pandas.Series.agg(regate)方法 141-1、语法 141-2、参数 141-3、功能 141-4、返回值 141-5、说明 141-6、用法 141-6-1、数据准备 141-6-2、代码示例 141-6-3、结果输出 142、pandas.Series.transform方法 142-1、语法 142-2、参数 142…

1196. 拐角I

问题描述 输入整数 &#x1d441;N &#xff0c;输出相应方阵。 输入一个整数 &#x1d441;N 。&#xff08; 0<&#x1d441;≤100) 输出一个方阵&#xff0c;每个数字的场宽为 3 附代码&#xff1a; #include<iostream> using namespace std; int main() { …

大屏数据看板一般是用什么技术实现的?

我们看到过很多企业都会使用数据看板&#xff0c;那么大屏看板的真正意义是什么呢&#xff1f;难道只是为了好看&#xff1f;答案当然不仅仅是。 大屏看板不仅可以提升公司形象&#xff0c;还可以提升企业的管理层次。对于客户&#xff0c;体现公司实力和品牌形象&#xff0c;…

域名解析到ipv6,并用CF隐藏端口

要求&#xff1a;域名解析到 IPv6 地址并隐藏端口 ‍ 效果&#xff1a;用域名 https://myhalo.soulio.top​ 访问http://[2409:8a62:867:4f12:56c7:5508:f7x6:8]:8080​。唯一缺点是延迟有点高。 ​​ ‍ 难度&#xff1a;需要有一定域名解析、cloudflare使用基础 ‍ 实…

org.springframework.context.annotation.DeferredImportSelector如何使用?

DeferredImportSelector 是 Spring 框架中一个比较高级的功能&#xff0c;主要用于在 Spring 应用上下文的配置阶段延迟导入某些组件或配置。这个功能特别有用&#xff0c;比如在处理依赖于其他自动配置的场景&#xff0c;或者当你想基于某些条件来决定是否导入特定的配置类时。…

一个简单的UAF/HeapOverflow fuzz demo学习

前言 偶然逛到的一篇&#xff0c;UAF_overflow_check | Note (gitbook.io) 最近正好也在初学fuzz&#xff0c;学习下对一些漏洞的检测fuzz思路。 刚好这个也指了一条学习方法&#xff1a;看成熟的fuzz工具的源码是如何处理的。 https://github.com/google/sanitizers/tree/m…

【Django+Vue3 线上教育平台项目实战】Elasticsearch实战指南:从基础到构建课程搜索与数据同步接口

文章目录 前言一、Elasticsearch倒排索引 二、Docker 搭建 ESDocker 安装Docker 搭建 ES 三、ES基础语法创建索引查看索引删除索引添加数据查询数据修改数据删除数据条件查询分页查询排序 多条件查询andor 范围查询 四、ES在项目中的应用示例 前言 在数据驱动的时代&#xff0c…

c#Action委托和Func委托

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace Action委托 {internal class Program{static void PrintString(){Console.WriteLine("hello world.");}static void PrintInt(int …

vue3页面编写-导入导出excel、展开查询项等

数据保持 <router-view v-slot"{ Component, route }"><keep-alive><component :is"Component" :key"route.name" v-if"route.meta.keepAlive" /></keep-alive><component :is"Component" :key…

HarmonyOS应用开发者高级认证,Next版本发布后最新题库 - 多选题序号4

基础认证题库请移步&#xff1a;HarmonyOS应用开发者基础认证题库 注&#xff1a;有读者反馈&#xff0c;题库的代码块比较多&#xff0c;打开文章时会卡死。所以笔者将题库拆分&#xff0c;单选题20个为一组&#xff0c;多选题10个为一组&#xff0c;题库目录如下&#xff0c;…

大语言模型-RetroMAE-检索预训练模型

一、背景信息&#xff1a; RetroMAE是2022年10月由北邮和华为提出的一种密集检索预训练策略。 RetroMAE主要应用于检索模型的预训练&#xff0c;模型架构为非对称的Encoder-Decode结构。 二、整体结构&#xff1a; RetroMAE的模型架构为非对称的Encoder-Decode结构。 Encod…

C++学习笔记-operator关键字:重载与自定义操作符

在C编程中&#xff0c;operator关键字扮演着极其重要且独特的角色。它允许开发者为内置类型或自定义类型重载或定义新的操作符行为。这一特性极大地增强了C的表达能力&#xff0c;使得代码更加直观、易于理解和维护。本文将深入探讨C中operator关键字的使用&#xff0c;包括操作…

Interesting bug caused by getattr

题意&#xff1a;由 getattr 引起的有趣的 bug 问题背景&#xff1a; I try to train 8 CNN models with the same structures simultaneously. After training a model on a batch, I need to synchronize the weights of the feature extraction layers in other 7 models. …

Elasticsearch:Golang ECS 日志记录 - zap

ECS 记录器是你最喜欢的日志库的格式化程序/编码器插件。它们可让你轻松地将日志格式化为与 ECS 兼容的 JSON。 编码器以 JSON 格式记录日志&#xff0c;并在可能的情况下依赖默认的 zapcore/json_encoder。它还处理 ECS 错误格式的错误字段记录。 默认情况下&#xff0c;会添…