kafka学习笔记(三、生产者Producer使用及配置参数)

在这里插入图片描述


1.简介

1.1.producer介绍

生产者就是负责向kafka发送消息的应用程序。消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)序列化器(Serializer)分区器(Partitioner)的一系列作用后才能被真正的发往broker
demo:

public class KafkaClient {private static final String brokerList = "localhost:9092";private static final String tipic = "topic-test";public static Properties initConfig() {Properties props = new Properties();props.put("bootstrap.servers", brokerList);props.put("key.serializer", "org.apache.kafka.common.seralization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.seralization.StringSerializer");props.put("clinet.id", "producer.client.id.test");return props;} public static void main(String[] args) {Properties props = initConfig();KafkaProducer<String, String> producer = new KafkaProducer<>(props);ProducerRecord<String, String> record = new ProducerRecord<>(topic, "kafka producer test.");try {producer.send(record);} catch (Exception e) {e.printStackTrace();}}
}

ProducerRecord类:

public class ProducerRecord<K, V> {private final String topic; // 主题private final Integer partition; // 分区号private final Headers headers; // 消息头部private final K key; // 键private final V value; // 值private final Long timestamp; // 消息的时间戳
}
  • ProducerRecord类中的key属性:
    key用来指定消息的键,不仅是附加消息还可以用来计算分区号而可以让消息发往特定的分区。

    • 同一个key的消息会被划分到同一分区
    • 有key的消息还可以支持日志压缩的功能
  • 必要参数:

    • bootstrap.servers:客户端连接kafka集群所属broker地址(host1:port1),并非需要所有的broker地址,生产者会从给定的broker中查到其他broker信息,建议至少设置两个以上。
    • key.serializer和value.serializer:broker端接收的消息必须以字节数组(byte[])的形式存在。发往broker之前需要将消息中对应的key和value做相应的序列化操作来转换整字节数组。
  • KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程使用。
  • 消息发送有三种模式
    • 发后即忘(fire-and-forget):producer.send(record);
    • 同步(sync):producer.send(record).get();
    • 异步(async):producer.send(record, callback); // 回调函数的调用可以保证分区有序。

1.2.生产者拦截器

用来在消息发送前做一些准备工作,比如按照某个规则过滤、修改消息内容等,也可以用来在发送回调逻辑中做一些定制化的需求。

实现: 需自定义实现org.apache.kafka.clinets.producer.ProducerInterception接口。接口中有三个方法:

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record); // 对消息进行相应的定制化处理// 在消息被应答或发送失败时调用,优先于Callback之前执行。此方法运行在Producer的IO线程中,所以
// 此方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。
public void onAcknowledgement(RecordMetadata metadata, Exception exception);public void close();

KafkaProducer可以指定多个拦截器形成拦截链。拦截链会按照interceptor.classes参数配置的拦截器顺序来执行。

1.3.序列化

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给kafka。而在对侧,消费者需要反序列化器(Deserializer)把从kafka中收到的字节数组转换成对应的对象。

客户端自带的序列化器都实现了org.apache.kafka.common.serialization.Serializer接口,此接口有三个方法:

public void configure(Map<String, ?> configs, boolean isKey); // 配置当前类,主要确定编码类型
public byte[] serialize(String topic, T data); // 将类型T的数据转换为byte[]
public void close(); // 关闭当前的序列化器,一般情况下为空方法

生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的。

1.4.分区器

为消息分配分区。如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。

kafka提供的默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了org.apache.kafka.clients.producer.Partitioner接口,此接口中定义了两个方法:

// 用来计算分区号
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
public void close(); // 关闭分区器的时候用来回收一些资源

Partitioner接口还有一个父接口org.apache.kafka.common.Configurable,此接口只有一个方法:

void configure(Map<String, ?> configs); // 获取配置信息机初始化数据

kafka中除了使用默认的分区器进行分区外还可以使用自定义的分区器,只要实现Partitioner即可。

2.整体架构

在这里插入图片描述

2.1.RecordAccumulator讲解

主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源以便提升性能。(缓存大小通过生产者客户端参数buffer.memory配置,默认32MB)。

如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,此时KafkaProducer的send()方法调用要么被堵塞要么抛出异常,这个取决于参数max.block.ms的配置,默认为60秒。

  • 主线程发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,RecordAccumulator内部为每个分区都维护了一个双端队列,队列的内容就是ProducerBatch,即Deque<ProducerBatch>

    注意: ProducerBatch中包含一个或多个ProducerRecord。ProducerRecord是生产者创建的消息,ProducerBatch是一个消息批次,ProducerRecord会被包含在ProducerBatch中,使字节的使用更加紧凑,也可以减少网络请求的次数以提高整体的吞吐量。

  • 消息写入缓存时,追加到Deque的尾部;Sender读取消息时,从Deque的头部读取。
  • Kafka生产者客户端通过java.io.ByteBuffer实现消息内存的创建和释放,不过频繁的创建和释放是比较耗费资源的,所以在RecordAccumulator内部存在一个BufferPool来实现ByteBuffer的复用,达到缓存高效利用的目的。

    BufferPool只针对特定大小(通过batch.size参数指定,默认16KB)的ByteBuffer进行管理,其他大小的ByteBuffer不会缓存到BufferPool中,我们可以适当的调大batch.size参数以便多缓存一些消息。

2.2.消息进入RecordAccumulator的逻辑

当一条ProducerRecord进入RecordAccumulator时:

  1. 先寻找与消息分区对应的Deque(没有则创建);
  2. 再从这个Deque尾部获取一个ProducerBatch(没有则创建);
  3. 查看ProducerBatch中是否可以写入这个ProducerRecord,可以则写入不可以则需新建ProducerBatch再写入;

    新建ProducerBatch时需评估这条消息是否超过batch.size参数的大小,如不超过则以batch.size参数大小来创建ProducerBatch(使用完这段内存区域后可以通过BufferPool的管理来复用),若超过则以评估的大小来创建(此段内存区域不会被复用)。

2.3.Sender步骤

Sender线程从RecordAccumulator中获取缓存的消息后:

  1. <分区, Deque<ProducerBatch>>转换为<Node, List<ProducerBatch>>,其中Node表示kafka集群的节点;

    对网络连接来说,生产者客户端是与具体的broker节点建立连接,不关心消息的分区;而KafkaProducer的应用逻辑则只关注往哪个分区发送那些消息,所以这里需要做一个应用逻辑层面到网络IO层面的转换

  2. 将转换的<Node, List<ProducerBatch>>进一步封装成<Node, Request>的形式,这样就可以将Request发往各个Node了。

    这里的Request指kafka的各种协议请求,对于消息的发送而言就是指具体的ProducerRequest。

  3. 请求在从Sender线程发往kafka之前还会以Map<NodeId, Deque<Request>>形式保存到InFlightRequest中,以缓存已经发出去但还没有收到相应的请求。

    通过配置参数可以限制每个连接(客户端与Node之间的连接)最多缓存的请求数。此配置参数为:max.in.flight.requests.per.connection,默认为5,即每个连接最多只能缓存5个未响应的请求,超过此值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应。

2.4.元数据

值kafka集群的元数据,这些数据包括集群中有主题信息、主题上的分区信息、分区的leader副本信息、follower副本信息、副本的AR和ISR集合信息、集群的节点信息以及控制器节点信息等。

更新元数据信息的条件:

  • 当客户端中没有需要使用的元数据信息,比如没有指定的Topic信息;
  • 超过metadata.max.age.ms时间没有更新元数据信息,此配置默认为5分钟;

更新元数据信息的步骤:

  1. 元数据更新操作时在是在客户端内部进行的,对外部不可见。
  2. 当需要更新元数据信息时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。
  3. 这个跟新操作由Sender线程发起,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤和发送消息类似。

元数据信息Sender线程负责更新,主线程需要读取,所以这里的数据同步通过synchronizedfinal关键字来保障。

Producer配置参数

参数默认值含义
bootstarp.servers“”指定连接kafka集群的broker地址(可以只有部分broker地址)
key.serializer“”消息中key对应的序列化类,需实现org.apahce.kafka.common.serialization.Serialiaer
value.serializer“”消息中value对应的序列化类,需实现org.apahce.kafka.common.serialization.Serialiaer
client.id“”指定kafkaProducer对应的客户端id(用来标记消息是从哪个客户端发来的)
acks“1”指定分区中必须有多少个副本收到此条消息生产者才会认为这条消息是成功写入的。它涉及消息的可靠性和吞吐量之间的权衡。asks=1:leader成功写入则返回成功;acks=0:发送消息后不需要等待服务器的响应;scks=-1/all:ISK集合中所有副本成功写入才能收到服务器的成功响应。
buffer.memory32MB生产者客户端中用于缓存消息的缓冲区大小。详见本章节#2.1
batch.size16KB用于指定producerBatch可以复用的内存区域的大小。
max.request.size1MB限制生产者客户端能发送消息的最大值,一般不建议盲目的增大,因为此参数与broker端的message.max.bytes参数有联动。
retries0生产者重试次数
retry.backoff.ms100设定两次重试之间的时间间隔
metadata.max.age30000ms如果在这个时间内无数据没有更新的话会被强制更新
compression.type“none”指定消息的压缩方式,默认情况下消息不会被压缩,,该参数还可以配置为"gzip"、"snappy"和"l24"。消息压缩可极大的减少网络传输量,减低网络的IO,提高整体的性能,是一种以时间换空间的优化方式。
connections.max.idle.ms540000ms指定多久之后关闭闲置的连接
linger.ms0指定生产者发送producerBatch之前等待更过的producerRecord加入ProducerBatch的时间。生产者客户端会在producerBatch被填满或等待时间超过linger.ms值时发送出去。
receive.buffer.bytes32KB设置socket接收消息缓冲区(SO_RECBUF)的大小。如果设置为-1,则使用操作系统的默认值;如果Producer与kafka处于不同的机房,则可以适当调大这个参数值。
send.buffer.bytes128KB设置socket发送消息缓冲区的大小…
request.timeout.ms30000ms配置Producer等待请求响应的最大时间。注意此参数需要比broker端参数replica.lag.time.max.ms的值大,这样可以减少因客户端重试而引起的消息重复的概率
max.block.ms60000用来控制kafkaProducersend()方法和partitionsFor()方法的阻塞时间。当生产者的送缓冲区已满或者没有可用的元数据时,这些方法就会阻塞。
partitioner.class~Defaultpartitioner用来指定分区器,需要实现org.apache.kafka.clients.producer.partitioner
enable.idempotencefalse是否开启幂等性功能。所谓幂等简单说就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复的写入消息,而使用kafka的幂等性功能之后就可以避免这种情况。
interceptor.classes“”用来设定生产者拦截器,需实现org.apache.kafka.clients.producer.ProducerInterceptor接口。
max.in.fligh.request5限制客户端与Node之间的连接最多缓存的请求数。
per.connection.transactional.idnull设置事务id,必须唯一

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/832918.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode算法题:7. 整数反转

给你一个 32 位的有符号整数 x &#xff0c;返回将 x 中的数字部分反转后的结果。 如果反转后整数超过 32 位的有符号整数的范围 [−2^31, 2^31 − 1] &#xff0c;就返回 0。 假设环境不允许存储 64 位整数&#xff08;有符号或无符号&#xff09;。 示例 1&#xff1a; 输…

武汉星起航:自运营团队—亚马逊平台运营典范,优势凸显业绩斐然

武汉星起航电子商务有限公司&#xff0c;作为跨境电商领域的佼佼者&#xff0c;凭借自运营团队多年的深耕经验与对亚马逊市场规则的深刻理解&#xff0c;成功在亚马逊平台开设多家自营店铺&#xff0c;并取得显著成绩。公司月流水达到几百万的辉煌业绩&#xff0c;不仅彰显了其…

嵌入式学习<1>:建立工程、GPIO

嵌入式学习_part1 本部分笔记用于学习记录&#xff0c;笔记源头 >>b站江科大_STM32入门教程 建立工程、GPIO 开发环境&#xff1a;keil MDK、STM32F103C8T6 1 &#xff09;建立工程 &#xff08;1&#xff09;基于寄存器开发、基于标准库 或者 基于HAL库开发; &…

IDEA无法下载远程仓库jar包问题

问题描述&#xff1a; idea无法下载远程仓库jar包&#xff0c;最奇怪的是idea有多个项目&#xff0c;有些项目可以下载&#xff0c;有些项目不行。报错如下&#xff1a; 一开始&#xff1a; unable to find valid certification path to requested target Try run Maven impo…

UV胶是什么材料制成的?

UV胶是一种特殊的胶水&#xff0c;由丙烯酸酯单体、活性稀释剂、光引发剂、助剂等材料制成的。它是指在紫外线照射下能够迅速固化的胶水。UV胶的主要成分是丙烯酸酯单体&#xff0c;它在没有紫外线照射时是液体状态&#xff0c;但一旦受到紫外线照射&#xff0c;就会迅速发生聚…

docker安装Debian:11 freeswitch1.10.5

文章目录 一、生成一个镜像二、切换一个镜像源为阿里源三、安装一些相关依赖和freeswitch3.1第一步&#xff1a;安装freeswitch-mod和下载所需的依赖项3.2 设置密钥3.3 安装freeswitch所需的依赖项3.4 报错3.4.1 报错13.4.2 报错23.4.3 报错3 四、运行4.1 通话三十秒自动挂断 一…

我独自升级崛起下载方法分享 下载教程

《我独自升级&#xff1a;崛起》这款精彩绝伦的动作角色扮演游戏&#xff0c;灵感来源于大热网络漫画&#xff0c;让玩家亲自踏上主角程肖宇的征途&#xff0c;从觉醒初阶到实力飞跃&#xff0c;每一步成长都扣人心弦。值得注意的是&#xff0c;尽管全球正式发布日期定在了五月…

谷歌推广和seo收录是一回事吗?

那自然不是一回事&#xff0c;谷歌推广一般指的是谷歌的广告服务&#xff0c;通过购买广告位&#xff0c;以便用户在谷歌搜索特定关键词时显示您的广告&#xff0c;这种方式通常基于点击收费&#xff0c;意味着您只有在有人点击您的广告时才需要支付费用。谷歌推广可以让您的网…

局域网监控软件能干什么|有哪些好用的局域网监控软件

企业局域网已成为日常工作中不可或缺的一部分。 然而&#xff0c;网络环境的复杂性和员工上网行为的多样性&#xff0c;使得企业面临着诸多安全风险和管理挑战。 因此&#xff0c;高效局域网监控上网记录监测成为了企业保障信息安全和提升工作效率的重要手段。 高效局域网监控…

自然语言(NLP)

It’s time for us to learn how to analyse natural language documents, using Natural Language Processing (NLP). We’ll be focusing on the Hugging Face ecosystem, especially the Transformers library, and the vast collection of pretrained NLP models. Our proj…

电脑显示丢失mfc140u.dll怎么修复,总共有7个方法

mfc140u.dll 是一个动态链接库&#xff08;Dynamic Link Library&#xff09;文件&#xff0c;它是Microsoft Foundation Class (MFC)库的一部分&#xff0c;专为使用C编程语言开发Windows应用程序而设计。MFC库由微软提供&#xff0c;作为一个高级的应用程序框架&#xff0c;旨…

XSS漏洞---XSS-labs通关教程

文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 Level-1 过滤源码&#xff1a;无 pyload&#xff1a; name<script>alert(1)</script> Level-2 过滤源码&#xff1a;利用转译函数将特殊字符转译为实体字符 $str $_GET["…

【完美解决】使用git时候出现error setting certificate verify locations: CAfile:问题

1、出现场景&#xff1a; 在使用idea的时候&#xff0c;进行git下的push&#xff0c;出现下面的错误&#xff1a; 2、原因分析&#xff1a; 可能因为重装过系统&#xff0c;或者是安装git的位置发生了变化等情况出现。 3、解决方案&#xff1a; 找到git的安装路径&#xf…

翻译《The Old New Thing》- Rendering menu glyphs is slightly trickier

Rendering menu glyphs is slightly trickier - The Old New Thing (microsoft.com)https://devblogs.microsoft.com/oldnewthing/20050802-13/?p34743 Raymond Chen 2005年08月02日 上次&#xff0c;我们看到了如何绘制有主题和无主题的单选按钮&#xff0c;我提到菜单位图更…

Centos 7.9 配置VNCServer实现远程vnc连接

文章目录 1、Centos安装图形界面1.1、安装X Windows System图形界面1.2、安装GNOME图形界面 2、VNC SERVER配置2.1、VNC SERVER安装2.2、VNC SERVER配置1&#xff09;创建vnc配置文件2&#xff09;修改配置文件内容3&#xff09;完整配置文件参考 2.3、设置vnc密码2.4、配置防火…

PyQt5中重要的概念:信号与槽

PyQt中信号与槽概念定义如下&#xff08;网络上引用的&#xff09;&#xff1a; 信号&#xff08;signal&#xff09;和槽&#xff08;slot&#xff09;是Qt的核心机制&#xff0c;也是在PyQt编程中对象之间进行通信的机制。在创建事件循环之后&#xff0c;通过建立信号和槽的…

postcss-px-to-viewport 从入坑到放弃 (nuxt3搭建响应式官网解决方案 )

前沿 什么是 postcss-px-to-viewport 将px单位转换为视口单位的 (vw, vh, vmin, vmax) 的 PostCSS 插件。 为什么使用 postcss-px-to-viewport 在pc端盛行的时代 &#xff0c;如果你不想去适配更多的pc端代码&#xff0c;可以采用它。 由于nuxt3本身已带postcss&#xff0c;所…

六西格玛项目的核心要素:理论学习、实践应用与项目经验

许多朋友担心&#xff0c;没有项目经验是否就意味着无法考取六西格玛证书。针对这一疑问&#xff0c;张驰咨询为大家详细解答。 首先&#xff0c;需要明确的是&#xff0c;六西格玛项目不仅仅是一种管理工具或方法&#xff0c;更是一种追求卓越、持续改进的思维方式。它强调通…

git与gitlab

目录 gitlab 下载与安装 重置管理员密码 gitlab命令 git远程gitlab相关命令 认证 补充 git git 分布式版本控制 安装 git的四个区域与文件的四个状态 使用git 常用命令 git 分布式管理系统 gitlab 企业私有库 github 公网共享库&#xff0c;全球…

PHP基础教程

&#x1f40c;博主主页&#xff1a;&#x1f40c;​倔强的大蜗牛&#x1f40c;​ &#x1f4da;专栏分类&#xff1a;PHP &#x1f4da;参考教程&#xff1a;菜鸟\编程网❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、PHP语法 基本的 PHP 语法 PHP 注释 PHP空白不敏…