Kafka原生API使用Java代码-生产者-分区策略-默认分区策略轮询分区策略

文章目录

  • 1、代码演示
  • 1.1、pom.xml
  • 1.2、KafkaProducerPartitioningStrategy.java
    • 1.2.1、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,不轮询
    • 1.2.2、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,轮询
    • 1.2.3、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,轮询
    • 1.2.4、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,不轮询
  • 2、分区策略
    • 2.1、linger.ms参数的含义
    • 2.2、linger milliseconds
    • 2.3、linger.ms配置参数的理解

1、代码演示

1.1、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.2、KafkaProducerPartitioningStrategy.java

1.2.1、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,不轮询

不等待,不轮询,默认分区策略

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {public static void main(String[] args) {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口// 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应props.put("acks", "all");// 消息发送失败时的重试次数,设置为0表示不重试props.put("retries", 0);// 发送缓冲区等待时间,等待1秒后,发送//props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++) {producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("recordMetadata.partition() = " + recordMetadata.partition());}}});}producer.close();}
}

在这里插入图片描述

1.2.2、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,轮询

不等待,立即发送,轮询策略

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {public static void main(String[] args) {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口// 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应props.put("acks", "all");// 消息发送失败时的重试次数,设置为0表示不重试props.put("retries", 0);// 发送缓冲区等待时间,设置为0表示不等待,立即发送props.put(ProducerConfig.LINGER_MS_CONFIG, 0);   props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++) {producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("recordMetadata.partition() = " + recordMetadata.partition());}}});}producer.close();}
}

在这里插入图片描述

1.2.3、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,轮询

等待1秒后发送,轮询策略

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {public static void main(String[] args) {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口// 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应props.put("acks", "all");// 消息发送失败时的重试次数,设置为0表示不重试props.put("retries", 0);// 发送缓冲区等待时间,等待1秒后,发送props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++) {producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("recordMetadata.partition() = " + recordMetadata.partition());}}});}producer.close();}
}

在这里插入图片描述

1.2.4、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,不轮询

等待1秒后发送,不轮询

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {public static void main(String[] args) {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口// 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应props.put("acks", "all");// 消息发送失败时的重试次数,设置为0表示不重试props.put("retries", 0);// 发送缓冲区等待时间,等待1秒后,发送props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++) {producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("recordMetadata.partition() = " + recordMetadata.partition());}}});}producer.close();}
}

在这里插入图片描述

2、分区策略

kafka的生产者分区策略

  1. 默认分区策略:减少重新建立分区连接的性能损耗 开发使用最多的分区方式,采用黏性分区,默认向第一次连接上的主题分区发送消息,直到消息累积到 batch.size大小(16kb)
  2. 轮询分区策略:每个分区接收一次消息(linger.ms决定生产者一次批量发送多少条消息 到一个分区中),开发中一定不会用轮询分区策略,顶多自定义,因为轮询性能太差,频繁跟不同的分区建立连接,大数据会用轮询策略

2.1、linger.ms参数的含义

在Kafka的生产者(Producer)配置中,props.put("linger.ms", 1); 这行代码是用于设置生产者的linger.ms参数的。

linger.ms参数的含义是:生产者会在发送消息之前等待更多消息被发送到同一个分区(partition)的额外时间(以毫秒为单位)。这样做的目的是为了提高吞吐量,因为将多个消息批量发送到同一个分区可以减少网络传输的开销和服务器端的I/O开销。

具体来说,当你设置了linger.ms参数(比如设置为1毫秒),Kafka生产者会尝试在发送消息之前等待1毫秒,看看是否还有其他的消息要发送到同一个分区。如果有,这些消息将会被合并成一个批次(batch)一起发送。

注意,设置linger.ms参数可能会增加消息的延迟,因为生产者会等待指定的时间以合并更多的消息。所以,这个参数需要在吞吐量和延迟之间进行权衡。

这里是一个简化的示例,展示如何在使用Java Kafka生产者时设置linger.ms参数:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 设置linger.msprops.put("linger.ms", 1);KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息...producer.close();}
}

在这个示例中,我们创建了一个KafkaProducer对象,并设置了包括linger.ms在内的多个配置参数。然后,你可以使用这个生产者对象来发送消息到Kafka集群。

2.2、linger milliseconds

linger.ms 的英文全称就是 “linger milliseconds”,其中 “linger” 是指延迟或等待,“milliseconds” 是毫秒的意思。

在 Kafka 的 Producer 配置中,linger.ms 参数用于控制 Producer 在发送消息之前等待更多消息到达相同分区(partition)的时间,以便可以将这些消息一起发送,从而提高吞吐量。默认情况下,linger.ms 的值为 0,这意味着 Producer 收到消息后会立即发送,不进行任何延迟。

linger.ms 参数与 batch.size 参数一起使用时,可以实现更复杂的消息发送策略。batch.size 参数定义了单个批次(batch)中允许的最大消息字节数。当 Producer 收到消息时,它会尝试将消息添加到当前批次中。如果linger.ms 大于 0,并且当前批次中的消息数量尚未达到 batch.size 的限制,那么 Producer 会等待 linger.ms 指定的时间,看看是否还有更多的消息要发送到相同的分区。如果有,这些消息将被添加到当前批次中;如果没有,那么在当前时间到达后,Producer 将发送当前批次中的所有消息。

需要注意的是,linger.ms 参数的值应该根据具体的业务场景和性能需求进行调整。较小的值可以提高消息的实时性,但可能会降低吞吐量;较大的值可以提高吞吐量,但可能会增加消息的延迟。因此,在实际应用中需要根据实际情况进行权衡和选择。

2.3、linger.ms配置参数的理解

在Kafka中,linger.ms是一个配置参数,用于控制生产者(producer)在发送消息到broker之前的等待时间,以便将更多的消息累积到同一批次中,从而提高吞吐量。linger.ms的取值可以是任何非负整数,表示毫秒数。

以下是关于linger.ms的一些关键点:

  • 如果linger.ms设置为0,生产者会立即发送消息到broker,不会等待其他消息来累积到同一批次。
  • 如果linger.ms设置为大于0的值,生产者会等待该指定的毫秒数,或者直到达到batch.size(批次大小)的限制,然后将累积的消息作为一个批次发送到broker。
  • 增大linger.ms的值可能会提高吞吐量,因为可以累积更多的消息到同一批次中,减少网络传输的次数。但是,这也会增加消息的延迟。
  • linger.ms的取值可以根据具体的应用场景和需求进行调整。在需要低延迟的场景中,可以将linger.ms设置为较小的值;在可以容忍一定延迟的场景中,可以尝试增大linger.ms的值以提高吞吐量。

综上所述,linger.ms的取值并没有固定的几个选项,而是可以根据实际需求设置为任何非负整数。在配置Kafka生产者时,需要根据具体的业务场景和需求来选择合适的linger.ms值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/19032.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python使用modbustcp协议与PLC进行简单通信

AI应用开发相关目录 本专栏包括AI应用开发相关内容分享&#xff0c;包括不限于AI算法部署实施细节、AI应用后端分析服务相关概念及开发技巧、AI应用后端应用服务相关概念及开发技巧、AI应用前端实现路径及开发技巧 适用于具备一定算法及Python使用基础的人群 AI应用开发流程概…

LFCDR:Latent mutual feature extraction for cross-domain recommendation

Latent mutual feature extraction for cross-domain recommendation Knowledge and Information Systems-Hoon Park , Jason J. Jung-2024 思路 大部分研究都是主要集中在同构域,所以在没有共同用户的情况下,项目和元数据的异构对推荐任务造成了限制。 于是提出一个异构的…

B/S架构+java语言+Mysqladr数 据 库ADR药物不良反应监测系统源码 ADR药物不良反应监测系统有哪些作用?

B/S架构&#xff0b;java语言&#xff0b;Mysqladr数 据 库ADR药物不良反应监测系统源码 ADR药物不良反应监测系统有哪些作用&#xff1f; 药物不良反应(ADR)是指在合格药物以正常用量和用法用于预防、诊断、治疗疾病或调节生理功能时所发生的意外的、与防治目的无关的、不利或…

AI Agent智能体概述及原理

AI Agent概述 AI Agent旨在理解、分析和响应人类输入&#xff0c;像人类一样执行任务、做出决策并与环境互动。它们可以是遵循预定义规则的简单系统&#xff0c;也可以是根据经验学习和适应的复杂、自主的实体&#xff1b;可以是基于软件的实体&#xff0c;也可以是物理实体。…

大模型“1元购”?AI公司加速奔向应用端“大航海时代”

自字节跳动发布豆包大模型&#xff0c;互联网大厂纷纷就位&#xff0c;击穿“地板价”的打法从C端向B端拓展。这也成为今年“618”最亮眼的价格战。 5月15日&#xff0c;字节跳动率先宣布豆包大模型已通过火山引擎开放给企业客户&#xff0c;大模型定价降至0.0008元/千Tokens&…

ubuntu22.04部署docker版zlmediakit和源码运行wvp-GB28181-pro

1 运行zlmediakit 1. 修改zlmediakit配置文件 先用run命令运行zlmediakit&#xff0c;将zlmediakit的配置文件拷贝出来 docker run -d -p 1935:1935 -p 8080:80 -p 8554:554 \ -p 10000:10000 -p 10000:10000/udp -p 8000:8000/udp \ --name zlmediakit \ zlmediakit/zlmedi…

嵌入式0基础开始学习 ⅠC语言(6)函数

0.问题引入 有时候&#xff0c;经常需要在一个程序中&#xff0c;对一个数组进行输入输出 如&#xff1a; int a[3][4]; int i,j; for(i0;i<3;i) { for(j0;j<4;j) …

风萧萧兮易水寒,壮士一去兮不复还 的 rm 命令

风萧萧兮易水寒&#xff0c;壮士一去兮不复还 的 rm 命令 风萧萧兮易水寒&#xff0c;壮士一去兮不复还 的 rm语法几个示例/bin/rm Argument list too long – Linux”配合find与xargs完成删除海量文件使用find的delete选项 快速删除大文件 风萧萧兮易水寒&#xff0c;壮士一去…

设计模式20——职责链模式

写文章的初心主要是用来帮助自己快速的回忆这个模式该怎么用&#xff0c;主要是下面的UML图可以起到大作用&#xff0c;在你学习过一遍以后可能会遗忘&#xff0c;忘记了不要紧&#xff0c;只要看一眼UML图就能想起来了。同时也请大家多多指教。 职责链模式&#xff08;Chain …

FreeRTOS【8】二值信号量使用

1.开发背景 FreeRTOS 提供了队列可以在线程间快速交换信息&#xff0c;那么还有没有其他交互渠道&#xff1f;答案是有的&#xff0c;相对于队列传递信息&#xff0c;还有更轻量级的线程唤醒操作&#xff0c;那就是信号量&#xff0c;而二值信号量就是最简单的一种。 二值信号量…

解读一下15.52.34.160/27

IP地址15.52.34.160/27可以分解为两部分来解读&#xff1a; IP地址: 15.52.34.160 这是分配给网络接口的地址&#xff0c;用于在网络中标识一个特定的设备。 子网掩码: /27 这表示子网掩码是27位长&#xff0c;意味着网络部分占据了IP地址的前27位&#xff0c;剩下的5位用于主…

C++模拟实现stack和queue

1 stack 1.1概念 stl栈 1.2栈概念 1.3代码 2 queue 2.1概念 stl队列 2.2队列概念 2.3代码

【python】实用性python脚本链接汇总

目标检测方向–python脚本 标签处理 链接备注基于YOLO目标检测任务相关将一张labelImg图片标注的xml标签文件检测框复制到其他图片目标检测任务的VOC标签&#xff0c;可复制xml修改目标检测的xml标签(VOC)类别名目标检测任务的VOC标签&#xff0c;修改标签文件的类别名称(nam…

网上打印资料A4纸一般多少钱一张

我们知道&#xff0c;在打印需求上A4纸&#xff08;210mmx297mm&#xff09;是较为常见的打印用纸&#xff0c;同时因为纸张的不同在价格上也存在一定的差异。当然&#xff0c;因在网上打印平台打印资料&#xff0c;能够降低一定的租金个人工成本。 因此&#xff0c;在网上打印…

Swift 中的Getter 和 Setter

目录 前言 1. 什么是Getter和Setter 1.定义 2.作用 2.属性 1.存储属性 2.计算属性 3.属性观察者 3. 使用 Getter 和 Setter 的场景 1.数据转换 2.懒加载 3.数据验证和限制 4.触发相关操作 4.自定义Getter 和 Setter 5. 参考资料 前言 属性是 Swift 编程中的基本…

关于 Spring 是什么

Spring 是什么 我们通常所说的 Spring 指的是 Spring Framework&#xff08;Spring 框架&#xff09;&#xff0c;它是⼀个开源框架&#xff0c;有着活跃⽽庞⼤的社区&#xff0c;这就是它之所以能⻓久不衰的原因。Spring ⽀持⼴泛的应⽤场景&#xff0c;它可以让 Java 企业级的…

九章云极DataCanvas公司DingoDB完成中国信通院权威多模数据库测试

2024年5月16日&#xff0c;九章云极DataCanvas公司自主研发和设计的开源多模向量数据库DingoDB顺利完成中国信息通信研究院&#xff08;以下简称中国信通院&#xff09;多模数据库产品测试。本次测试的成功标志着DingoDB在技术能力、性能表现和产品稳定性方面得到了权威机构的高…

python深入探索斐波那契数列:代码示例与不满足的外围条件

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、斐波那契数列的初步实现 二、外围条件的不满足情况 总结 一、斐波那契数列的初步实现 …

Presto 从提交SQL到获取结果 源码详解(2)

逻辑执行计划&#xff1a; //进入逻辑执行计划阶段 doAnalyzeQuery().new LogicalPlanner().plan(analysis);//createAnalyzePlan createAnalyzePlan(analysis, (Analyze) statement);//返回RelationPlan&#xff0c;&#xff08;返回root根节点&#xff0c;逻辑树上包含输出字…

docker容器中解决中文乱码

1. 找到dockerfile文件 2. 编辑Dockerfile 添加 ENV LANG en_US.UTF-8 ENV LANGUAGE en_US:en ENV LC_ALL en_US.UTF-8 3. 生成新的镜像文件 FROM java17_yinpeng:latest MAINTAINER YP <2064676101QQ.COM> ADD jiquan_online_chat.jar jiquan_online_chat #CM…