kafka发送消息-生产者发送消息的分区策略(消息发送到哪个分区中?是什么策略)

生产者发送消息的分区策略(消息发送到哪个分区中?是什么策略)

  • 1、默认策略,程序自动计算并指定分区
    • 1.1、指定key,不指定分区
    • 1.2、不指定key,不指定分区
  • 2、轮询分配策略RoundRobinPartitioner
    • 2.1、创建配置类
    • 2.2、application.yml文件
    • 2.3、生产者
    • 2.4、测试类
    • 2.5、执行结果
  • 3、自定义分区分配策略
    • 3.1、创建自定义分配策略类
    • 3.2、修改kafka配置类
    • 3.3、application.yml文件
    • 3.4、生产者
    • 3.5、测试类
    • 3.6、测试结果
    • 3.7、总结

在这里插入图片描述

1、默认策略,程序自动计算并指定分区

1.1、指定key,不指定分区

生产者:在编写代码发送消息时我们先不指定分区,即分区设为null,看看程序最终会把消息发送到哪个分区。

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void send9(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.send("heTopic",null,System.currentTimeMillis(),"k9",user);}
}

测试类:

package com.power;import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.Date;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid send9(){eventProducer.send9();}
}

程序最终是通过以下代码进行目标分区计算的:

Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

通过调试发现,程序是通过以下代码进行目标分区计算的:
程序自动读取生产者发送消息时的key(本次发送时值为“key9”),将key生成一个32位的HASH值,将该HASH值与默认分区数(这个topic中有9个分区)取余数(余数结果一定在0-8之间),进而计算得出消息默认发送到的分区值

在这里插入图片描述

在这里插入图片描述

1.2、不指定key,不指定分区

生产者:
在这里插入图片描述

测试类:

在这里插入图片描述
此时时通过随机数与默认分区取余数计算默认分区的

使用随机数 % numPartitions

2、轮询分配策略RoundRobinPartitioner

通过查看kafka源码发现,分区接口有一个轮询分配策略相关实现类。
在这里插入图片描述

在application.yml配置文件中生产者配置项,我发现并生产者并没有相关轮询分配策略的配置,那么该如何试下轮询指定分区的配置呢?
在这里插入图片描述

需要编写代码试下轮询指定分区策略:

在这里插入图片描述

2.1、创建配置类

package com.power.config;import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);return props;}public ProducerFactory<String, ?> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, ?> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}//第二次创建@Beanpublic NewTopic newTopic9() {return new NewTopic("heTopic", 9, (short) 1);}
}

2.2、application.yml文件

spring:application:#应用名称name: spring-boot-01-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置生产者(24个配置)producer:#key默认是StringSerializer序列化key-serializer: org.apache.kafka.common.serialization.StringSerializer#value默认是ToStringSerializer序列化value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer#配置消费者(24个配置)consumer:auto-offset-reset: earliesttemplate:default-topic: default-topic

在这里插入图片描述

2.3、生产者

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void send10(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.send("heTopic",user);}
}

在这里插入图片描述

2.4、测试类

在这里插入图片描述

package com.power;import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.Date;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid send10(){for (int i = 0; i <5 ; i++) {eventProducer.send10();}}}

2.5、执行结果

执行完测试类,发现5次请求分别发送到了kafka的heTopic主题的5个不同分区中:
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3、自定义分区分配策略

在这里插入图片描述

3.1、创建自定义分配策略类

package com.power.config;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;public class CustomerPartitioner implements Partitioner {private AtomicInteger nextPartition = new AtomicInteger(0);@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if(key==null){//使用轮询方式选择分区int next = nextPartition.getAndIncrement();if(next>=numPartitions){nextPartition.compareAndSet(next,0);}if(next>0){next--;}System.out.println("分区值:"+next);return next;}else {//如果key不为inull,则使用默认的分区策略return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

3.2、修改kafka配置类

指定使用自定义的分区分配类
在这里插入图片描述

3.3、application.yml文件

spring:application:#应用名称name: spring-boot-01-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置生产者(24个配置)producer:#key默认是StringSerializer序列化key-serializer: org.apache.kafka.common.serialization.StringSerializer#value默认是ToStringSerializer序列化value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer#配置消费者(24个配置)consumer:auto-offset-reset: earliesttemplate:default-topic: default-topic

在这里插入图片描述

3.4、生产者

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void send10(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.send("heTopic",user);}
}

在这里插入图片描述

3.5、测试类

在这里插入图片描述

package com.power;import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.Date;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid send10(){for (int i = 0; i <5 ; i++) {eventProducer.send10();}}}

3.6、测试结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.7、总结

使用自定义分区策略类尝试发送消息,发现发送的5次消息,并没有连续发送到5个挨着的分区中,查看kafka源码的org.apache.kafka.clients.producer.KafkaProducer类的doSend方法发现,每一次发送前,调用了两次计算分区的方法,导致第一个得到的分区并不会正在的发送消息。

doSend方法;

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {TopicPartition tp = null;try {throwIfProducerClosed();// first make sure the metadata for the topic is availablelong nowMs = time.milliseconds();ClusterAndWaitTime clusterAndWaitTime;try {clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);} catch (KafkaException e) {if (metadata.isClosed())throw new KafkaException("Producer closed while send in progress", e);throw e;}nowMs += clusterAndWaitTime.waitedOnMetadataMs;long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);Cluster cluster = clusterAndWaitTime.cluster;byte[] serializedKey;try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer", cce);}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer", cce);}int partition = partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);setReadOnly(record.headers());Header[] headers = record.headers().toArray();int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),compressionType, serializedKey, serializedValue, headers);ensureValidRecordSize(serializedSize);long timestamp = record.timestamp() == null ? nowMs : record.timestamp();if (log.isTraceEnabled()) {log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);}// producer callback will make sure to call both 'callback' and interceptor callbackCallback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);if (transactionManager != null && transactionManager.isTransactional()) {transactionManager.failIfNotReadyForSend();}RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);if (result.abortForNewBatch) {int prevPartition = partition;partitioner.onNewBatch(record.topic(), cluster, prevPartition);partition = partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);if (log.isTraceEnabled()) {log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);}// producer callback will make sure to call both 'callback' and interceptor callbackinterceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);}if (transactionManager != null && transactionManager.isTransactional())transactionManager.maybeAddPartitionToTransaction(tp);if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);this.sender.wakeup();}return result.future;// handling exceptions and record the errors;// for API exceptions return them in the future,// for other exceptions throw directly} catch (ApiException e) {log.debug("Exception occurred during message send:", e);if (callback != null)callback.onCompletion(null, e);this.errors.record();this.interceptors.onSendError(record, tp, e);return new FutureFailure(e);} catch (InterruptedException e) {this.errors.record();this.interceptors.onSendError(record, tp, e);throw new InterruptException(e);} catch (KafkaException e) {this.errors.record();this.interceptors.onSendError(record, tp, e);throw e;} catch (Exception e) {// we notify interceptor about all exceptions, since onSend is called before anything else in this methodthis.interceptors.onSendError(record, tp, e);throw e;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/52620.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux网络:网络基础

Linux网络&#xff1a;网络基础 一、网络诞生背景及产生的诸多问题1. 1 网络诞生背景1.2 网络诞生面临的困境 二、网络协议栈&#xff08;OSI七层模型、CP/IP五层模型&#xff09;2.1 TCP/IP五层(或四层)模型 三、网络和系统关系四、网络传输流程4.1 同一个局域网中的两台主机进…

折腾 Quickwit,Rust 编写的分布式搜索引擎-官方教程

快速上手 在本快速入门指南中&#xff0c;我们将安装 Quickwit&#xff0c;创建一个索引&#xff0c;添加文档&#xff0c;最后执行搜索查询。本指南中使用的所有 Quickwit 命令都在 CLI 参考文档 中进行了记录。 https://quickwit.io/docs/main-branch/reference/cli 使用 Qui…

flutter 中 ssl 双向证书校验

SSL 证书&#xff1a; 在处理 https 请求的时候&#xff0c;通常可以使用 中间人攻击的方式 获取 https 请求以及响应参数。应为通常我们是 SSL 单向认证&#xff0c;服务器并没有验证我们的客户端的证书。为了防止这种中间人攻击的情况。我么可以通过 ssl 双向认证的方式。即…

用Python实现时间序列模型实战——Day1:时间序列的基本概念

一、学习内容 1. 时间序列数据的定义与特点 定义&#xff1a; 时间序列数据是一组按时间顺序排列的观测值。时间序列的每个观测值都与特定时间点相关联。例如&#xff0c;气温每天的记录、股票每日的收盘价等。 特点&#xff1a; 时间依赖性&#xff1a;时间序列数据的一个…

8.27-dockerfile的应用+私有仓库的创建

一、dockerfile应用 通过dockerfile创建⼀个在启动容器时&#xff0c;就可以启动httpd服务的镜像 1.步骤 : 1.创建⼀个⽬录&#xff0c;⽤于存储Docker file所使⽤的⽂件2.在此⽬录中创建Docker file⽂件&#xff0c;以及镜像制作所使⽤的⽂件3.使⽤docker build创建镜像4.使…

基于x86 平台opencv的图像采集和seetaface6的图像质量评估功能

目录 一、概述二、环境要求2.1 硬件环境2.2 软件环境三、开发流程3.1 编写测试3.2 配置资源文件3.3 验证功能一、概述 本文档是针对x86 平台opencv的图像采集和seetaface6的图像质量评估功能,opencv通过摄像头采集视频图像,将采集的视频图像送给seetaface6的图像质量评估模块…

全新的大语言模型Grok-2,最新测评!!

埃隆马斯克再次引发轰动&#xff0c;他旗下的xAI公司推出了全新的大语言模型Grok-2&#xff01; 最新的Grok-2测试版已经发布&#xff0c;用户可以在&#x1d54f;平台上体验小版本的Grok-2 mini。 马斯克还通过一种谜语般的方式揭开了困扰大模型社区一个多月的谜团&#xff1a…

FFmpeg的入门实践系列三(基础知识)

欢迎诸位来阅读在下的博文~ 在这里&#xff0c;在下会不定期发表一些浅薄的知识和经验&#xff0c;望诸位能与在下多多交流&#xff0c;共同努力 文章目录 前期博客一、音视频常用术语二、FFmpeg库的结构介绍三、FFmpeg的常用函数初始化封装格式编解码器相关 四、FFmpeg常用的数…

FastCGI简述

FastCGI (FCGI) 是一种协议&#xff0c;用于改善 Web 服务器和应用程序之间的通信效率。它是在 CGI&#xff08;Common Gateway Interface&#xff09;的基础上发展起来的&#xff0c;旨在解决 CGI 在处理大量并发请求时存在的性能问题。 CGI的由来 最早的Web服务器只能简单地…

解决 JS WebSocket 心跳检测 重连

解决 JS WebSocket 心跳检测 重连 文章目录 解决 JS WebSocket 心跳检测 重连一、WebSocket 心跳检测的作用二、心跳检测的处理方案1. 创建 WebSocket 连接2. 心跳参数设置3. 心跳检测逻辑4. 心跳包响应处理5. 断线重连机制 三、总结 一、WebSocket 心跳检测的作用 WebSocket 是…

序列化组件对比

1、msgpack介绍 1.MsgPack产生的数据更小&#xff0c;从而在数据传输过程中网络压力更小 2.MsgPack兼容性差&#xff0c;必须按照顺序保存字段 3.MsgPack是二进制序列化格式&#xff0c;兼容跨语言 官网地址&#xff1a; https://msgpack.org/ 官方介绍&#xff1a;Its lik…

一、undo log、Buffer Pool、WAL、redo log

目录 1、undo log2、Buffer Pool3、WAL4、redo log5、总结6、问题 1、undo log undo log日志是一种用于撤销回退的逻辑日志&#xff0c;在事务未提交前会记录相反的操作到undo log&#xff0c;当事务回滚&#xff0c;使用undo log 进行回滚&#xff0c;保证了事务的原子性。MV…

C++ TinyWebServer项目总结(8. 高性能服务器程序框架)

《Linux 高性能服务器编程》一书中&#xff0c;把这一章节作为全书的核心&#xff0c;同时作为后续章节的总览。这也意味着我们在经历了前置知识的学习后&#xff0c;正式进入了 Web 服务器项目的核心部分&#xff01; 前置内容回顾&#xff1a; 1. C TinyWebServer项目总结&…

等保测评中的安全测试方法

等保测评&#xff0c;即信息安全等级保护测评&#xff0c;是我国网络安全领域的重要评估机制&#xff0c;用于验证网络系统或应用是否满足相应的安全保护等级要求。在等保测评中&#xff0c;安全测试方法扮演着至关重要的角色。本文将详细介绍等保测评中常用的安全测试方法及其…

LinkedHashMap和TreeMap的基本使用

一.LinkedHashMap集合&#xff1a;&#xff08;是HashMap集合的儿子&#xff0c;Map集合的孙子&#xff09; 1.特点&#xff1a; 2.代码实现&#xff1a; 1)键的唯一性&#xff1a; package com.itheima.a01myMap; ​ import java.util.LinkedHashMap; ​ public class A07_…

YOLOv8目标检测部署RK3588全过程,附代码pt->onnx->rknn,附【详细代码】

目录 一、训练yolov8模型&#xff0c;得到最佳权重文件best.pt 二、pt转onnx,即best.pt->best11.onnx 1、对下载的YOLOv8代码修改 2、加入模型导出功能&#xff0c; 3、导出指令采用如下代码 三、ONNX转RKNN 四、RK3588部署 1、拷贝rknn文件到rk3588板子内 2、执行…

48.x86游戏实战-封包抓取进图call

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 本次游戏没法给 内容参考于&#xff1a;微尘网络安全 工具下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1rEEJnt85npn7N38Ai0_F2Q?pwd6tw3 提…

递归神经网络 (RNN) 简介

文章目录 一、介绍二、什么是递归神经网络 &#xff08;RNN&#xff09;&#xff1f;三、展开递归神经网络四、训练递归神经网络五、RNN 的类型六、现实生活中的 RNN 用例七、RNN 的两个主要限制八、RNN的变体8.1 双向递归神经网络 &#xff08;BRNN&#xff09;8.2 长短期记忆…

YOLOv8改进 | 融合改进 | C2f融合Faster-GELU模块提升检测速度【完整代码 + 主要代码解析】

秋招面试专栏推荐 &#xff1a;深度学习算法工程师面试问题总结【百面算法工程师】——点击即可跳转 &#x1f4a1;&#x1f4a1;&#x1f4a1;本专栏所有程序均经过测试&#xff0c;可成功执行&#x1f4a1;&#x1f4a1;&#x1f4a1; 专栏目录 &#xff1a;《YOLOv8改进有效…

基于信号量和环形队列的生产者消费者模型

文章目录 POSIX信号量信号量接口初始化信号量销毁信号量等待信号量发布信号量 基于环形队列的生产者消费者模型单生产单消费多生产多消费 POSIX信号量 POSIX信号量和SystemV信号量作用相同&#xff0c;都是用于同步操作&#xff0c;达到无冲突的访问共享资源目的。 但POSIX可以…