Kafka原生API使用Java代码-生产者-分区策略-默认分区策略轮询分区策略

文章目录

  • 1、代码演示
  • 1.1、pom.xml
  • 1.2、KafkaProducerPartitioningStrategy.java
    • 1.2.1、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,不轮询
    • 1.2.2、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,轮询
    • 1.2.3、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,轮询
    • 1.2.4、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,不轮询
  • 2、分区策略
    • 2.1、linger.ms参数的含义
    • 2.2、linger milliseconds
    • 2.3、linger.ms配置参数的理解

1、代码演示

1.1、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.2、KafkaProducerPartitioningStrategy.java

1.2.1、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,不轮询

不等待,不轮询,默认分区策略

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {public static void main(String[] args) {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口// 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应props.put("acks", "all");// 消息发送失败时的重试次数,设置为0表示不重试props.put("retries", 0);// 发送缓冲区等待时间,等待1秒后,发送//props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++) {producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("recordMetadata.partition() = " + recordMetadata.partition());}}});}producer.close();}
}

在这里插入图片描述

1.2.2、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,轮询

不等待,立即发送,轮询策略

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {public static void main(String[] args) {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口// 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应props.put("acks", "all");// 消息发送失败时的重试次数,设置为0表示不重试props.put("retries", 0);// 发送缓冲区等待时间,设置为0表示不等待,立即发送props.put(ProducerConfig.LINGER_MS_CONFIG, 0);   props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++) {producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("recordMetadata.partition() = " + recordMetadata.partition());}}});}producer.close();}
}

在这里插入图片描述

1.2.3、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,轮询

等待1秒后发送,轮询策略

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {public static void main(String[] args) {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口// 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应props.put("acks", "all");// 消息发送失败时的重试次数,设置为0表示不重试props.put("retries", 0);// 发送缓冲区等待时间,等待1秒后,发送props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++) {producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("recordMetadata.partition() = " + recordMetadata.partition());}}});}producer.close();}
}

在这里插入图片描述

1.2.4、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,不轮询

等待1秒后发送,不轮询

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {public static void main(String[] args) {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口// 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应props.put("acks", "all");// 消息发送失败时的重试次数,设置为0表示不重试props.put("retries", 0);// 发送缓冲区等待时间,等待1秒后,发送props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++) {producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("recordMetadata.partition() = " + recordMetadata.partition());}}});}producer.close();}
}

在这里插入图片描述

2、分区策略

kafka的生产者分区策略

  1. 默认分区策略:减少重新建立分区连接的性能损耗 开发使用最多的分区方式,采用黏性分区,默认向第一次连接上的主题分区发送消息,直到消息累积到 batch.size大小(16kb)
  2. 轮询分区策略:每个分区接收一次消息(linger.ms决定生产者一次批量发送多少条消息 到一个分区中),开发中一定不会用轮询分区策略,顶多自定义,因为轮询性能太差,频繁跟不同的分区建立连接,大数据会用轮询策略

2.1、linger.ms参数的含义

在Kafka的生产者(Producer)配置中,props.put("linger.ms", 1); 这行代码是用于设置生产者的linger.ms参数的。

linger.ms参数的含义是:生产者会在发送消息之前等待更多消息被发送到同一个分区(partition)的额外时间(以毫秒为单位)。这样做的目的是为了提高吞吐量,因为将多个消息批量发送到同一个分区可以减少网络传输的开销和服务器端的I/O开销。

具体来说,当你设置了linger.ms参数(比如设置为1毫秒),Kafka生产者会尝试在发送消息之前等待1毫秒,看看是否还有其他的消息要发送到同一个分区。如果有,这些消息将会被合并成一个批次(batch)一起发送。

注意,设置linger.ms参数可能会增加消息的延迟,因为生产者会等待指定的时间以合并更多的消息。所以,这个参数需要在吞吐量和延迟之间进行权衡。

这里是一个简化的示例,展示如何在使用Java Kafka生产者时设置linger.ms参数:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 设置linger.msprops.put("linger.ms", 1);KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息...producer.close();}
}

在这个示例中,我们创建了一个KafkaProducer对象,并设置了包括linger.ms在内的多个配置参数。然后,你可以使用这个生产者对象来发送消息到Kafka集群。

2.2、linger milliseconds

linger.ms 的英文全称就是 “linger milliseconds”,其中 “linger” 是指延迟或等待,“milliseconds” 是毫秒的意思。

在 Kafka 的 Producer 配置中,linger.ms 参数用于控制 Producer 在发送消息之前等待更多消息到达相同分区(partition)的时间,以便可以将这些消息一起发送,从而提高吞吐量。默认情况下,linger.ms 的值为 0,这意味着 Producer 收到消息后会立即发送,不进行任何延迟。

linger.ms 参数与 batch.size 参数一起使用时,可以实现更复杂的消息发送策略。batch.size 参数定义了单个批次(batch)中允许的最大消息字节数。当 Producer 收到消息时,它会尝试将消息添加到当前批次中。如果linger.ms 大于 0,并且当前批次中的消息数量尚未达到 batch.size 的限制,那么 Producer 会等待 linger.ms 指定的时间,看看是否还有更多的消息要发送到相同的分区。如果有,这些消息将被添加到当前批次中;如果没有,那么在当前时间到达后,Producer 将发送当前批次中的所有消息。

需要注意的是,linger.ms 参数的值应该根据具体的业务场景和性能需求进行调整。较小的值可以提高消息的实时性,但可能会降低吞吐量;较大的值可以提高吞吐量,但可能会增加消息的延迟。因此,在实际应用中需要根据实际情况进行权衡和选择。

2.3、linger.ms配置参数的理解

在Kafka中,linger.ms是一个配置参数,用于控制生产者(producer)在发送消息到broker之前的等待时间,以便将更多的消息累积到同一批次中,从而提高吞吐量。linger.ms的取值可以是任何非负整数,表示毫秒数。

以下是关于linger.ms的一些关键点:

  • 如果linger.ms设置为0,生产者会立即发送消息到broker,不会等待其他消息来累积到同一批次。
  • 如果linger.ms设置为大于0的值,生产者会等待该指定的毫秒数,或者直到达到batch.size(批次大小)的限制,然后将累积的消息作为一个批次发送到broker。
  • 增大linger.ms的值可能会提高吞吐量,因为可以累积更多的消息到同一批次中,减少网络传输的次数。但是,这也会增加消息的延迟。
  • linger.ms的取值可以根据具体的应用场景和需求进行调整。在需要低延迟的场景中,可以将linger.ms设置为较小的值;在可以容忍一定延迟的场景中,可以尝试增大linger.ms的值以提高吞吐量。

综上所述,linger.ms的取值并没有固定的几个选项,而是可以根据实际需求设置为任何非负整数。在配置Kafka生产者时,需要根据具体的业务场景和需求来选择合适的linger.ms值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/19032.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

B/S架构+java语言+Mysqladr数 据 库ADR药物不良反应监测系统源码 ADR药物不良反应监测系统有哪些作用?

B/S架构&#xff0b;java语言&#xff0b;Mysqladr数 据 库ADR药物不良反应监测系统源码 ADR药物不良反应监测系统有哪些作用&#xff1f; 药物不良反应(ADR)是指在合格药物以正常用量和用法用于预防、诊断、治疗疾病或调节生理功能时所发生的意外的、与防治目的无关的、不利或…

AI Agent智能体概述及原理

AI Agent概述 AI Agent旨在理解、分析和响应人类输入&#xff0c;像人类一样执行任务、做出决策并与环境互动。它们可以是遵循预定义规则的简单系统&#xff0c;也可以是根据经验学习和适应的复杂、自主的实体&#xff1b;可以是基于软件的实体&#xff0c;也可以是物理实体。…

大模型“1元购”?AI公司加速奔向应用端“大航海时代”

自字节跳动发布豆包大模型&#xff0c;互联网大厂纷纷就位&#xff0c;击穿“地板价”的打法从C端向B端拓展。这也成为今年“618”最亮眼的价格战。 5月15日&#xff0c;字节跳动率先宣布豆包大模型已通过火山引擎开放给企业客户&#xff0c;大模型定价降至0.0008元/千Tokens&…

设计模式20——职责链模式

写文章的初心主要是用来帮助自己快速的回忆这个模式该怎么用&#xff0c;主要是下面的UML图可以起到大作用&#xff0c;在你学习过一遍以后可能会遗忘&#xff0c;忘记了不要紧&#xff0c;只要看一眼UML图就能想起来了。同时也请大家多多指教。 职责链模式&#xff08;Chain …

FreeRTOS【8】二值信号量使用

1.开发背景 FreeRTOS 提供了队列可以在线程间快速交换信息&#xff0c;那么还有没有其他交互渠道&#xff1f;答案是有的&#xff0c;相对于队列传递信息&#xff0c;还有更轻量级的线程唤醒操作&#xff0c;那就是信号量&#xff0c;而二值信号量就是最简单的一种。 二值信号量…

C++模拟实现stack和queue

1 stack 1.1概念 stl栈 1.2栈概念 1.3代码 2 queue 2.1概念 stl队列 2.2队列概念 2.3代码

网上打印资料A4纸一般多少钱一张

我们知道&#xff0c;在打印需求上A4纸&#xff08;210mmx297mm&#xff09;是较为常见的打印用纸&#xff0c;同时因为纸张的不同在价格上也存在一定的差异。当然&#xff0c;因在网上打印平台打印资料&#xff0c;能够降低一定的租金个人工成本。 因此&#xff0c;在网上打印…

Swift 中的Getter 和 Setter

目录 前言 1. 什么是Getter和Setter 1.定义 2.作用 2.属性 1.存储属性 2.计算属性 3.属性观察者 3. 使用 Getter 和 Setter 的场景 1.数据转换 2.懒加载 3.数据验证和限制 4.触发相关操作 4.自定义Getter 和 Setter 5. 参考资料 前言 属性是 Swift 编程中的基本…

关于 Spring 是什么

Spring 是什么 我们通常所说的 Spring 指的是 Spring Framework&#xff08;Spring 框架&#xff09;&#xff0c;它是⼀个开源框架&#xff0c;有着活跃⽽庞⼤的社区&#xff0c;这就是它之所以能⻓久不衰的原因。Spring ⽀持⼴泛的应⽤场景&#xff0c;它可以让 Java 企业级的…

九章云极DataCanvas公司DingoDB完成中国信通院权威多模数据库测试

2024年5月16日&#xff0c;九章云极DataCanvas公司自主研发和设计的开源多模向量数据库DingoDB顺利完成中国信息通信研究院&#xff08;以下简称中国信通院&#xff09;多模数据库产品测试。本次测试的成功标志着DingoDB在技术能力、性能表现和产品稳定性方面得到了权威机构的高…

python深入探索斐波那契数列:代码示例与不满足的外围条件

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、斐波那契数列的初步实现 二、外围条件的不满足情况 总结 一、斐波那契数列的初步实现 …

存储型XSS

前言 什么是存储型xss&#xff1a; 提交恶意xss数据&#xff0c;存入数据库中&#xff0c;访问时触发。 存储型xss和反射型xss区别&#xff1a; 存储型存入数据库中&#xff0c;可持续时间长&#xff0c;而反射型持续时间短&#xff0c;仅对本次访问有影响&#xff0c;反射型一…

中华活页文选高中版投稿发表

《中华活页文选&#xff08;高中版&#xff09;》创刊于1960年&#xff0c;是中宣部所属中国出版传媒股份有限公司主管、中华书局主办的国家级基础教育期刊&#xff0c;曾获得“中国期刊方阵双效期刊”、国家新闻出版广电总局推荐的“百种优秀报刊”等荣誉称号。本刊以高中学科…

Day 6:2981. 找出出现至少三次的最长特殊子字符串 I

Leetcode 2981. 找出出现至少三次的最长特殊子字符串 I 给你一个仅由小写英文字母组成的字符串 s 。 如果一个字符串仅由单一字符组成&#xff0c;那么它被称为 特殊 字符串。例如&#xff0c;字符串 “abc” 不是特殊字符串&#xff0c;而字符串 “ddd”、“zz” 和 “f” 是特…

性能猛兽:OrangePi Kunpeng Pro评测!

1.引言 随着物联网和嵌入式系统的不断发展&#xff0c;对于性能强大、资源消耗低的单板计算机的需求也日益增加。在这个快节奏的技术时代&#xff0c;单板计算机已成为各种应用场景中不可或缺的组成部分&#xff0c;从家庭娱乐到工业自动化&#xff0c;再到科学研究&#xff0…

差分曼彻斯特编码详解

这是一种双向码&#xff0c;和曼彻斯特编码不同的是&#xff0c;这种码元中间的电平转换边只作为定时信号&#xff0c;不表示数据。数据的表示在于每一位开始处是否有电平转换&#xff1a;有电平转换则表示0&#xff0c;无则表示1。然后这就出现一个问题&#xff0c;很多小伙伴…

App Inventor 2 低功耗蓝牙BLE 两种通信方式:扫描和广播

低功耗蓝牙&#xff0c;也称为蓝牙 LE 或简称 BLE&#xff0c;是一种类似于经典蓝牙的新通信协议&#xff0c;不同之处在于它旨在消耗更少的功耗&#xff0c;同时保持可比的功能。 因此&#xff0c;低功耗蓝牙是与耗电资源有限的物联网设备进行通信的首选。BluetoothLE 扩展需…

DiffBIR论文阅读笔记

这篇是董超老师通讯作者的一篇盲图像修复的论文&#xff0c;目前好像没看到发表在哪个会议期刊&#xff0c;应该是还在投&#xff0c;这个是arxiv版本&#xff0c;代码倒是开源了。本文所指的BIR并不是一个single模型对任何未知图像degradation都能处理&#xff0c;而是用同一个…

数据结构(十)图

文章目录 图的简介图的定义图的结构图的分类无向图有向图带权图&#xff08;Wighted Graph&#xff09; 图的存储邻接矩阵&#xff08;Adjacency Matrix&#xff09;邻接表代码实现 图的遍历深度优先搜索&#xff08;DFS&#xff0c;Depth Fisrt Search&#xff09;遍历抖索过程…

【搜索方法推荐】高效信息检索方法和实用网站推荐

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 本人就职于国际知名终端厂商&#xff0c;负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作&#xff0c;目前牵头6G算力网络技术标准研究。 博客…