kafka发送消息-生产者发送消息的分区策略(消息发送到哪个分区中?是什么策略)

生产者发送消息的分区策略(消息发送到哪个分区中?是什么策略)

  • 1、默认策略,程序自动计算并指定分区
    • 1.1、指定key,不指定分区
    • 1.2、不指定key,不指定分区
  • 2、轮询分配策略RoundRobinPartitioner
    • 2.1、创建配置类
    • 2.2、application.yml文件
    • 2.3、生产者
    • 2.4、测试类
    • 2.5、执行结果
  • 3、自定义分区分配策略
    • 3.1、创建自定义分配策略类
    • 3.2、修改kafka配置类
    • 3.3、application.yml文件
    • 3.4、生产者
    • 3.5、测试类
    • 3.6、测试结果
    • 3.7、总结

在这里插入图片描述

1、默认策略,程序自动计算并指定分区

1.1、指定key,不指定分区

生产者:在编写代码发送消息时我们先不指定分区,即分区设为null,看看程序最终会把消息发送到哪个分区。

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void send9(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.send("heTopic",null,System.currentTimeMillis(),"k9",user);}
}

测试类:

package com.power;import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.Date;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid send9(){eventProducer.send9();}
}

程序最终是通过以下代码进行目标分区计算的:

Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

通过调试发现,程序是通过以下代码进行目标分区计算的:
程序自动读取生产者发送消息时的key(本次发送时值为“key9”),将key生成一个32位的HASH值,将该HASH值与默认分区数(这个topic中有9个分区)取余数(余数结果一定在0-8之间),进而计算得出消息默认发送到的分区值

在这里插入图片描述

在这里插入图片描述

1.2、不指定key,不指定分区

生产者:
在这里插入图片描述

测试类:

在这里插入图片描述
此时时通过随机数与默认分区取余数计算默认分区的

使用随机数 % numPartitions

2、轮询分配策略RoundRobinPartitioner

通过查看kafka源码发现,分区接口有一个轮询分配策略相关实现类。
在这里插入图片描述

在application.yml配置文件中生产者配置项,我发现并生产者并没有相关轮询分配策略的配置,那么该如何试下轮询指定分区的配置呢?
在这里插入图片描述

需要编写代码试下轮询指定分区策略:

在这里插入图片描述

2.1、创建配置类

package com.power.config;import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);return props;}public ProducerFactory<String, ?> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, ?> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}//第二次创建@Beanpublic NewTopic newTopic9() {return new NewTopic("heTopic", 9, (short) 1);}
}

2.2、application.yml文件

spring:application:#应用名称name: spring-boot-01-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置生产者(24个配置)producer:#key默认是StringSerializer序列化key-serializer: org.apache.kafka.common.serialization.StringSerializer#value默认是ToStringSerializer序列化value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer#配置消费者(24个配置)consumer:auto-offset-reset: earliesttemplate:default-topic: default-topic

在这里插入图片描述

2.3、生产者

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void send10(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.send("heTopic",user);}
}

在这里插入图片描述

2.4、测试类

在这里插入图片描述

package com.power;import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.Date;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid send10(){for (int i = 0; i <5 ; i++) {eventProducer.send10();}}}

2.5、执行结果

执行完测试类,发现5次请求分别发送到了kafka的heTopic主题的5个不同分区中:
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3、自定义分区分配策略

在这里插入图片描述

3.1、创建自定义分配策略类

package com.power.config;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;public class CustomerPartitioner implements Partitioner {private AtomicInteger nextPartition = new AtomicInteger(0);@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if(key==null){//使用轮询方式选择分区int next = nextPartition.getAndIncrement();if(next>=numPartitions){nextPartition.compareAndSet(next,0);}if(next>0){next--;}System.out.println("分区值:"+next);return next;}else {//如果key不为inull,则使用默认的分区策略return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

3.2、修改kafka配置类

指定使用自定义的分区分配类
在这里插入图片描述

3.3、application.yml文件

spring:application:#应用名称name: spring-boot-01-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置生产者(24个配置)producer:#key默认是StringSerializer序列化key-serializer: org.apache.kafka.common.serialization.StringSerializer#value默认是ToStringSerializer序列化value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer#配置消费者(24个配置)consumer:auto-offset-reset: earliesttemplate:default-topic: default-topic

在这里插入图片描述

3.4、生产者

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void send10(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.send("heTopic",user);}
}

在这里插入图片描述

3.5、测试类

在这里插入图片描述

package com.power;import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.Date;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid send10(){for (int i = 0; i <5 ; i++) {eventProducer.send10();}}}

3.6、测试结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.7、总结

使用自定义分区策略类尝试发送消息,发现发送的5次消息,并没有连续发送到5个挨着的分区中,查看kafka源码的org.apache.kafka.clients.producer.KafkaProducer类的doSend方法发现,每一次发送前,调用了两次计算分区的方法,导致第一个得到的分区并不会正在的发送消息。

doSend方法;

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {TopicPartition tp = null;try {throwIfProducerClosed();// first make sure the metadata for the topic is availablelong nowMs = time.milliseconds();ClusterAndWaitTime clusterAndWaitTime;try {clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);} catch (KafkaException e) {if (metadata.isClosed())throw new KafkaException("Producer closed while send in progress", e);throw e;}nowMs += clusterAndWaitTime.waitedOnMetadataMs;long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);Cluster cluster = clusterAndWaitTime.cluster;byte[] serializedKey;try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer", cce);}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer", cce);}int partition = partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);setReadOnly(record.headers());Header[] headers = record.headers().toArray();int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),compressionType, serializedKey, serializedValue, headers);ensureValidRecordSize(serializedSize);long timestamp = record.timestamp() == null ? nowMs : record.timestamp();if (log.isTraceEnabled()) {log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);}// producer callback will make sure to call both 'callback' and interceptor callbackCallback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);if (transactionManager != null && transactionManager.isTransactional()) {transactionManager.failIfNotReadyForSend();}RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);if (result.abortForNewBatch) {int prevPartition = partition;partitioner.onNewBatch(record.topic(), cluster, prevPartition);partition = partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);if (log.isTraceEnabled()) {log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);}// producer callback will make sure to call both 'callback' and interceptor callbackinterceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);}if (transactionManager != null && transactionManager.isTransactional())transactionManager.maybeAddPartitionToTransaction(tp);if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);this.sender.wakeup();}return result.future;// handling exceptions and record the errors;// for API exceptions return them in the future,// for other exceptions throw directly} catch (ApiException e) {log.debug("Exception occurred during message send:", e);if (callback != null)callback.onCompletion(null, e);this.errors.record();this.interceptors.onSendError(record, tp, e);return new FutureFailure(e);} catch (InterruptedException e) {this.errors.record();this.interceptors.onSendError(record, tp, e);throw new InterruptException(e);} catch (KafkaException e) {this.errors.record();this.interceptors.onSendError(record, tp, e);throw e;} catch (Exception e) {// we notify interceptor about all exceptions, since onSend is called before anything else in this methodthis.interceptors.onSendError(record, tp, e);throw e;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/52620.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux网络:网络基础

Linux网络&#xff1a;网络基础 一、网络诞生背景及产生的诸多问题1. 1 网络诞生背景1.2 网络诞生面临的困境 二、网络协议栈&#xff08;OSI七层模型、CP/IP五层模型&#xff09;2.1 TCP/IP五层(或四层)模型 三、网络和系统关系四、网络传输流程4.1 同一个局域网中的两台主机进…

折腾 Quickwit,Rust 编写的分布式搜索引擎-官方教程

快速上手 在本快速入门指南中&#xff0c;我们将安装 Quickwit&#xff0c;创建一个索引&#xff0c;添加文档&#xff0c;最后执行搜索查询。本指南中使用的所有 Quickwit 命令都在 CLI 参考文档 中进行了记录。 https://quickwit.io/docs/main-branch/reference/cli 使用 Qui…

如何在Ubuntu 16.04上更新Firefox版本

如何在Ubuntu 16.04上更新Firefox版本 在Ubuntu 16.04上更新Firefox版本有多种方法&#xff0c;每种方法都有其优点。下面我们将介绍几种常见的方法&#xff0c;帮助您确保浏览器保持最新状态。 1. 使用官方PPA&#xff08;个人包档案&#xff09; 官方PPA提供了最新版本的F…

ubuntu22.04安装redis

更新包管理器的索引&#xff1a; sudo apt update安装Redis&#xff1a; sudo apt install redis-server确认Redis已经安装并且正在运行&#xff1a; sudo systemctl status redis-server

flutter 中 ssl 双向证书校验

SSL 证书&#xff1a; 在处理 https 请求的时候&#xff0c;通常可以使用 中间人攻击的方式 获取 https 请求以及响应参数。应为通常我们是 SSL 单向认证&#xff0c;服务器并没有验证我们的客户端的证书。为了防止这种中间人攻击的情况。我么可以通过 ssl 双向认证的方式。即…

用Python实现时间序列模型实战——Day1:时间序列的基本概念

一、学习内容 1. 时间序列数据的定义与特点 定义&#xff1a; 时间序列数据是一组按时间顺序排列的观测值。时间序列的每个观测值都与特定时间点相关联。例如&#xff0c;气温每天的记录、股票每日的收盘价等。 特点&#xff1a; 时间依赖性&#xff1a;时间序列数据的一个…

Eureka的生命周期管理:服务注册、续约与下线的完整流程解析

Eureka的生命周期管理&#xff1a;服务注册、续约与下线的完整流程解析 引言 在分布式系统中&#xff0c;服务发现是微服务架构的核心问题之一。Eureka是Netflix开源的一个服务发现框架&#xff0c;它能够有效地管理微服务的生命周期&#xff0c;包括服务注册、续约和下线。这…

8.27-dockerfile的应用+私有仓库的创建

一、dockerfile应用 通过dockerfile创建⼀个在启动容器时&#xff0c;就可以启动httpd服务的镜像 1.步骤 : 1.创建⼀个⽬录&#xff0c;⽤于存储Docker file所使⽤的⽂件2.在此⽬录中创建Docker file⽂件&#xff0c;以及镜像制作所使⽤的⽂件3.使⽤docker build创建镜像4.使…

基于x86 平台opencv的图像采集和seetaface6的图像质量评估功能

目录 一、概述二、环境要求2.1 硬件环境2.2 软件环境三、开发流程3.1 编写测试3.2 配置资源文件3.3 验证功能一、概述 本文档是针对x86 平台opencv的图像采集和seetaface6的图像质量评估功能,opencv通过摄像头采集视频图像,将采集的视频图像送给seetaface6的图像质量评估模块…

全新的大语言模型Grok-2,最新测评!!

埃隆马斯克再次引发轰动&#xff0c;他旗下的xAI公司推出了全新的大语言模型Grok-2&#xff01; 最新的Grok-2测试版已经发布&#xff0c;用户可以在&#x1d54f;平台上体验小版本的Grok-2 mini。 马斯克还通过一种谜语般的方式揭开了困扰大模型社区一个多月的谜团&#xff1a…

Java笔试面试题AI答之面向对象(2)

文章目录 7. Java中的组合、聚合和关联有什么区别&#xff1f;1. 关联&#xff08;Association&#xff09;2. 聚合&#xff08;Aggregation&#xff09;3. 组合&#xff08;Composition&#xff09;总结 8. 请设计一个符合开闭原则的设计模式的例子&#xff1f;策略模式示例1.…

每日刷力扣SQL(九)

1484.按日期分组销售产品 转载 首先&#xff0c;根据题目的描述以及给出的示例。我们能得到结果集中各个字段的含义如下&#xff1a; ① sell_date&#xff1a;卖出产品的日期&#xff08;应该不用解释了&#xff09; ② num_sold&#xff1a;当前这个日期下&…

工业软件架构2:(QT和C++实现)

工业软件架构 - 事件驱动 - 2 1. 命令模式的使用命令模式(Command Pattern)命令模式的基本概念命令模式的运作机制1. 定义命令接口2. 实现具体命令3. 调用者类4.扩展命令模式的功能撤销命令:宏命令:总结2. MVVM 模式的使用View(界面)部分则通过绑定与 ViewModel 交互:3.…

FFmpeg的入门实践系列三(基础知识)

欢迎诸位来阅读在下的博文~ 在这里&#xff0c;在下会不定期发表一些浅薄的知识和经验&#xff0c;望诸位能与在下多多交流&#xff0c;共同努力 文章目录 前期博客一、音视频常用术语二、FFmpeg库的结构介绍三、FFmpeg的常用函数初始化封装格式编解码器相关 四、FFmpeg常用的数…

FastCGI简述

FastCGI (FCGI) 是一种协议&#xff0c;用于改善 Web 服务器和应用程序之间的通信效率。它是在 CGI&#xff08;Common Gateway Interface&#xff09;的基础上发展起来的&#xff0c;旨在解决 CGI 在处理大量并发请求时存在的性能问题。 CGI的由来 最早的Web服务器只能简单地…

WHAT - Jamstack(JavaScript, APIs, and Markup 与 Serverless 无服务器架构)

目录 一、介绍1. JavaScript2. APIs3. Markup4. Jamstack 的优势5. Jamstack 架构的工作流程 二、常见的 Jamstack 技术栈三、无服务器架构&#xff08;Serverless&#xff09;无服务器架构的核心概念1. 函数即服务&#xff08;FaaS&#xff09;2. 后端即服务&#xff08;BaaS&…

解决 JS WebSocket 心跳检测 重连

解决 JS WebSocket 心跳检测 重连 文章目录 解决 JS WebSocket 心跳检测 重连一、WebSocket 心跳检测的作用二、心跳检测的处理方案1. 创建 WebSocket 连接2. 心跳参数设置3. 心跳检测逻辑4. 心跳包响应处理5. 断线重连机制 三、总结 一、WebSocket 心跳检测的作用 WebSocket 是…

序列化组件对比

1、msgpack介绍 1.MsgPack产生的数据更小&#xff0c;从而在数据传输过程中网络压力更小 2.MsgPack兼容性差&#xff0c;必须按照顺序保存字段 3.MsgPack是二进制序列化格式&#xff0c;兼容跨语言 官网地址&#xff1a; https://msgpack.org/ 官方介绍&#xff1a;Its lik…

C++ 移动构造函数为什么设置noexcept?

答案显然是&#xff1a; 移动构造函数设置了noexcept后STL的容器可以显著提高性能。 For example: class MyClass { public:MyClass(int v) { val v; }MyClass(const MyClass& o) {val o.val;std::cout << "Copy constructor " << val << …

Go 语言协程管理精解

1.基础 协程切换需要操作寄存器&#xff0c;这些操作需要通过汇编辅助实现。另外&#xff0c;每一个协程都有一个协程栈&#xff0c;实际上协程栈也是有结构的。汇编程序和栈结构这些概念可能大部分开发者都不太了解&#xff0c;在介绍协程管理之间&#xff0c;先简要介绍。 1…