Kafka - 生产者

生产者消息对象

public class ProducerRecord<K, V> {private final String topic; // 主题private final Integer partition; //分区号private final Headers headers; //消息头部private final K key; //键private final V value; //值private final Long timestamp; //消息的时间戳
}

其中key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号,进而让消息发往特定的分区,一般同一个key的消息会被划分到同一个分区中。
timestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。

创建生产者实例

public static Properties initConfig() {Properties props = new Properties();props.put(ProducerConfig.KEY_SERIALZER_CLASS_CONFIG,StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALZER_CLASS_CONFIG,StringSerializer.class.getName());
}KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

消息的发送

创建生产者实例

创建生产者实例的方法有很多种,其中最简单的是下面的构造方于除了topic和value外的属性,其他都置为null。

public ProducerRecord(String topic, V value);

发送消息主要有三种模式:发完即忘(fire-and-forget),同步(sync)及异步(async)。

KafkaProducer的sand()方法返回值并非是void类型,而是Future类型,send()方法有两个重载方法,具体定义如下:

public Future<RecordMetadata> send(ProducerRecord<K,V> record);
public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
  • 发完即忘
    它只管往Kafka中发送消息而并不关心消息是否正确到达。
    在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时),会造成消息的丢失。这种发送方式性能最高,但可靠性也最差。

  • 同步发送

try {producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {e.printStackTrace();
}

通过feature对象中的get()方法,来阻塞等待kafka的响应,直到发送成功,或者发生异常。

同步发送的可靠性高,但性能会差很多,因为需要阻塞等到一条消息发送完之后,才能发送下一条。

  • 异步发送
producer.send(record, new Callback()) {@overridepublic void onCompletion(RecordMetadata metadata, Excetion exception){if (excetion != null) {exception.printStackTrace();} else {...}}
}

当Kafka有响应时候,就会有回调,要么发送成功,要么抛出异常。

序列化器

生产者需要用序列化器把对象转换成字节数组,才能通过网络发送给Kafka。而消费者需要用反序列器把从Kafka中收到的字节数组转换成相应的对象。

分区器

分区器的作用是为消息分配分区。

消息经过序列化后,就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器,因为patition代表的就是要发往的分区号。如果没有指定partition,则需要依赖分区器,根据key字段来计算partition的值。

拦截器

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前,做一些定制化的需求,比如统计类工作。

原理分析

整体架构

在这里插入图片描述
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程Sender线程。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用后,缓存到消息收集器中(RecordAccumulator)。Sender现成负责从消息收集器中获取消息,并将其发送到kafka中。

RecordAccumulator
该收集器主要用来缓存消息,以便Sender线程可以批量发送,进而减少网络传输的资源消耗,以提高性能。

RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容为ProducerBatch,即Deque。消息写入缓存时候,追加到双端队列的尾部,读取消息时,从双端队列的头部读取。

Sender
Seender从RecordAccumulator中获取缓存的消息后,会进一步将原来<分区,Deque>的保存形式转变为<Node, List>的形式,其中Node表示集群的broken节点。

对于网络连接来说,生产者客户端是与具体的broken节点建立的连接,就是向具体的broken节点发送消息,而不关心消息属于哪一个分区;而对于KafkaProducer
的应用逻辑而言,我们只关注向哪个分区中发送消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。

在转换成<Node, List>后,Sender还会进一步封装成<Node, Request>的形式,这样就可以将Request发往各个Node了。

请求在从Sender线程发往Kafka之前还会保存到InFlightRequest中,InFlightRequest存对象的具体形式为Map<NodeId, Deque>,它的主要作用是缓存了已经发送出去但还是没有收到响应的请求。

元数据的更新

元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上follower副本又分配在哪些节点上等等信息。

假设我们通过如下的方式创建了一条消息ProducerRecord,

ProducerRecord<String, String> record = new ProducerRecord<>(topic, “xxx”);

这里的发送指令,我们只知道主题名称,和需要发送的内容,对其他信息却一无所知。例如要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后计算出目标分区,还需要知道leader副本所在的broken节点的地址、端口等信息才能建立链接,这些都属于元信息。

元数据的更新是在客户端进行的,对客户端的外部使用者不可见。更新操作是由Sender线程发起的,主线程也需要读取这些信息,这里的数据同步,是通过Synchronized和final关键字来保障。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/49520.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

opencv 按键开启连续截图,并加载提示图片

背景图小图 键盘监听使用的是pynput 库 保存图片时使用了年月日时分秒命名 原图&#xff1a; from pynput import keyboard import cv2 import time# 键盘监听 def on_press(key):global jieglobal guanif key.char a:jie Trueelif key.char d:jie Falseelif key.char…

FPGA JTAG最小系统 EP2C5T144C8N

FPGA的文档没有相应的基础还真不容易看懂&#xff0c;下面是B站上对FPGA文档的解读(本文非对文档解读&#xff0c;只是为个人记录第三期&#xff1a;CycloneIV E最小系统板设计&#xff08;一&#xff09;从Datasheet上获取FPGA的基本参数_哔哩哔哩_bilibili 电源部份 核心电…

AI学习记录 -使用react开发一个网页,对接chatgpt接口,附带一些英语的学习prompt

实现了如下功能&#xff08;使用react实现&#xff0c;原创&#xff09; 实现功能&#xff1a; 1、对接gpt35模型问答&#xff0c;并实现了流式传输&#xff08;在java端&#xff09; 2、在实际使用中&#xff0c;我们的问答历史会经常分享给他人&#xff0c;所以下图的 copy …

Python酷库之旅-第三方库Pandas(042)

目录 一、用法精讲 141、pandas.Series.agg(regate)方法 141-1、语法 141-2、参数 141-3、功能 141-4、返回值 141-5、说明 141-6、用法 141-6-1、数据准备 141-6-2、代码示例 141-6-3、结果输出 142、pandas.Series.transform方法 142-1、语法 142-2、参数 142…

1196. 拐角I

问题描述 输入整数 &#x1d441;N &#xff0c;输出相应方阵。 输入一个整数 &#x1d441;N 。&#xff08; 0<&#x1d441;≤100) 输出一个方阵&#xff0c;每个数字的场宽为 3 附代码&#xff1a; #include<iostream> using namespace std; int main() { …

大屏数据看板一般是用什么技术实现的?

我们看到过很多企业都会使用数据看板&#xff0c;那么大屏看板的真正意义是什么呢&#xff1f;难道只是为了好看&#xff1f;答案当然不仅仅是。 大屏看板不仅可以提升公司形象&#xff0c;还可以提升企业的管理层次。对于客户&#xff0c;体现公司实力和品牌形象&#xff0c;…

域名解析到ipv6,并用CF隐藏端口

要求&#xff1a;域名解析到 IPv6 地址并隐藏端口 ‍ 效果&#xff1a;用域名 https://myhalo.soulio.top​ 访问http://[2409:8a62:867:4f12:56c7:5508:f7x6:8]:8080​。唯一缺点是延迟有点高。 ​​ ‍ 难度&#xff1a;需要有一定域名解析、cloudflare使用基础 ‍ 实…

【Django+Vue3 线上教育平台项目实战】Elasticsearch实战指南:从基础到构建课程搜索与数据同步接口

文章目录 前言一、Elasticsearch倒排索引 二、Docker 搭建 ESDocker 安装Docker 搭建 ES 三、ES基础语法创建索引查看索引删除索引添加数据查询数据修改数据删除数据条件查询分页查询排序 多条件查询andor 范围查询 四、ES在项目中的应用示例 前言 在数据驱动的时代&#xff0c…

c#Action委托和Func委托

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace Action委托 {internal class Program{static void PrintString(){Console.WriteLine("hello world.");}static void PrintInt(int …

大语言模型-RetroMAE-检索预训练模型

一、背景信息&#xff1a; RetroMAE是2022年10月由北邮和华为提出的一种密集检索预训练策略。 RetroMAE主要应用于检索模型的预训练&#xff0c;模型架构为非对称的Encoder-Decode结构。 二、整体结构&#xff1a; RetroMAE的模型架构为非对称的Encoder-Decode结构。 Encod…

C++学习笔记-operator关键字:重载与自定义操作符

在C编程中&#xff0c;operator关键字扮演着极其重要且独特的角色。它允许开发者为内置类型或自定义类型重载或定义新的操作符行为。这一特性极大地增强了C的表达能力&#xff0c;使得代码更加直观、易于理解和维护。本文将深入探讨C中operator关键字的使用&#xff0c;包括操作…

Interesting bug caused by getattr

题意&#xff1a;由 getattr 引起的有趣的 bug 问题背景&#xff1a; I try to train 8 CNN models with the same structures simultaneously. After training a model on a batch, I need to synchronize the weights of the feature extraction layers in other 7 models. …

Elasticsearch:Golang ECS 日志记录 - zap

ECS 记录器是你最喜欢的日志库的格式化程序/编码器插件。它们可让你轻松地将日志格式化为与 ECS 兼容的 JSON。 编码器以 JSON 格式记录日志&#xff0c;并在可能的情况下依赖默认的 zapcore/json_encoder。它还处理 ECS 错误格式的错误字段记录。 默认情况下&#xff0c;会添…

OpenWrt 为软件包和docker空间扩容

参考资料 【openwrt折腾日记】解决openwrt固件刷入后磁盘空间默认小的问题&#xff0c;关联openwrt磁盘扩容空间扩容【openwrt分区扩容】轻松解决空间可用不足的尴尬丨老李一瓶奶油的YouTube 划分空间 参考一瓶奶油的YouTube 系统 -> 磁盘管理 -> 磁盘 -> 修改 格…

【机器学习】不同操作系统下如何安装Jupyter Notebook和Anaconda

引言 Jupyter Notebook 是一个非常流行的开源Web应用程序&#xff0c;允许你创建和共享包含代码、方程、可视化和解释性文本的文档 文章目录 引言一、如何安装Jupyter Notebook1.1 对于Windows用户1.2 对于macOS用户1.3 对于Linux用户&#xff1a; 二、如何安装Anaconda2.1 对于…

基于Element UI内置的Select下拉和Tree树形组件,组合封装的树状下拉选择器

目录 简述 效果 功能描述 代码实现 总结 简述 基于Element UI内置的Select下拉和Tree树形组件&#xff0c;组合封装的树状下拉选择器。 效果 先看效果&#xff1a; 下拉状态&#xff1a; 选择后状态&#xff1a; 选择的数据&#xff1a; 功能描述 1、加载树结构&…

Linux云计算 |【第一阶段】SERVICES-DAY2

主要内容&#xff1a; DNS服务基础及搭建、特殊解析(针对地址库文件&#xff1a;DNS轮询 DNS泛域名解析 DNS别名&#xff09;、缓存DNS&#xff08;全局转发forwarders&#xff09;、DNS递归迭代&#xff08;子域授权&#xff09;、DNS主从架构搭建、DNS主从数据同步 一、DNS工…

【技术升级】Docker环境下Nacos平滑升级攻略,安全配置一步到位

目前项目当中使用的Nacos版本为2.0.2&#xff0c;该版本可能存在一定的安全风险。软件的安全性是一个持续关注的问题&#xff0c;尤其是对于像Nacos这样的服务发现与配置管理平台&#xff0c;它在微服务架构中扮演着核心角色。随着新版本的发布&#xff0c;开发团队会修复已知的…

光伏模拟器应用

太阳能光伏 (PV) 模拟器是一种可编程电源&#xff0c;用于模拟太阳能电池板。模拟器具有快速瞬态响应&#xff0c;可响应负载条件的变化并保持电压-电流特性的输出。 用户可以根据系统规格定义太阳能电池板配置&#xff0c;并通过选择环境条件来选择适当的环境条件进行模拟。用…

pytest+allure

安装 下载&#xff1a;github win环境下载zip 环境变量&#xff1a; pycharm&#xff1a; pip install allure-pytest 验证安装 生成结果&#xff1a; if __name__ __main__:pytest.main([-s,test_createTag2.py,--alluredir,result]) 生成报告&#xff1a; allure gener…