【Kafka】Kafka Producer 分区-05

【Kafka】Kafka Producer 分区-05

  • 1. 分区的好处
  • 2. 分区策略
    • 2.1 默认的分区器 DefaultPartitioner
  • 3. 自定义分区器

1. 分区的好处

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

在这里插入图片描述

see 2.4.1http://t.csdnimg.cn/m1O9u

2. 分区策略

2.1 默认的分区器 DefaultPartitioner

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {… …
}

策略如下:

  1. 如果记录中指定了分区,使用指定的分区。
    这意味着生产者在发送消息时明确指定了一个分区号,Kafka将直接使用这个分区。
  2. 如果没有指定分区但存在键,根据键的哈希值选择分区。
    如果消息中没有明确指定分区,但是提供了一个键,Kafka会使用键的哈希值来计算应该使用哪个分区。这可以保证具有相同键的消息被发送到相同的分区,从而保证消息的有序性。
  3. 如果既没有指定分区也没有键,选择一个“sticky partition”(粘性分区),在批次满时更换分区。
    如果消息既没有指定分区,也没有键,Kafka将使用“sticky partition”策略。这种策略会在消息批次满时更换分区,以便于提高效率和性能。

这个注释是对DefaultPartitioner类的说明,该类实现了Partitioner接口,用于定义消息分区的策略。DefaultPartitioner类的主要功能就是根据上述规则决定消息发送到哪个分区。以下是DefaultPartitioner类的示例实现结构:

public class DefaultPartitioner implements Partitioner {// 初始化方法@Overridepublic void configure(Map<String, ?> configs) {// 配置代码}// 计算分区的方法@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 分区计算逻辑}// 清理资源的方法@Overridepublic void close() {// 资源清理代码}
}

该实现中包括了三个主要的方法:

  • configure(Map<String, ?> configs):用于初始化和配置分区器。
  • partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster):用于根据给定的主题、键和值来计算应该使用哪个分区。
  • close():用于在分区器关闭时清理资源。

在这里插入图片描述

案例一: 将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

import org.apache.kafka.clients.producer.*;import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102
:9092 ");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = newKafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)kafkaProducer.send(new ProducerRecord<>("first",1, "", "atguigu " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e == null) {System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {e.printStackTrace();}}});}kafkaProducer.close();}
}

案例二: 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。

public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = newKafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0kafkaProducer.send(new ProducerRecord<>("first","a", "atguigu " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e == null) {System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {e.printStackTrace();}}});}kafkaProducer.close();}
}

3. 自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器

  1. 需求
    例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区。

  2. 实现步骤
    定义类实现 Partitioner 接口
    重写 partition()方法

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*** 1. 实现接口 Partitioner* 2. 实现 3 个方法:partition,close,configure* 3. 编写 partition 方法,返回分区号*/
public class MyPartitioner implements Partitioner {/*** 返回信息对应的分区** @param topic 主题* @param key 消息的 key* @param keyBytes 消息的 key 序列化后的字节数组* @param value 消息的 value* @param valueBytes 消息的 value 序列化后的字节数组* @param cluster 集群元数据可以查看分区信息* @return*/@Overridepublic int partition(String topic, Object key, byte[]keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue = value.toString();// 创建 partitionint partition;// 判断消息是否包含 atguiguif (msgValue.contains("atguigu")) {partition = 0;} else {partition = 1;}// 返回分区号return partition;}// 关闭资源@Overridepublic void close() {}// 配置方法@Overridepublic void configure(Map<String, ?> configs) {}
}

使用分区器的方法,在生产者的配置中添加分区器参数。

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallbackPartitions {public static void main(String[] args) throws InterruptedException {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 添加自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");KafkaProducer < String, String > kafkaProducer = newKafkaProducer<>(properties);for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e == null) {System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {e.printStackTrace();}}});}kafkaProducer.close();}
}

测试
①在 hadoop102 上开启 Kafka 消费者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 控制台观察回调信息

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/853397.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MyBatis的逆向工程详细步骤操作

1. MyBatis的逆向工程详细步骤操作 文章目录 1. MyBatis的逆向工程详细步骤操作2. 逆向工程配置与生成2.1 MyBatis3Simple&#xff1a;基础版&#xff0c;只有基本的增删改查2.1.1 第一步&#xff1a;在pom.xml 中添加逆向工程插件2.1.2 第二步&#xff1a;配置 generatorConfi…

Ubuntu 18.04下普通用户的一次提权过程

Ubuntu 18.04下普通用户的一次提权过程 一.背景介绍:二.主要调试过程:三.相关命令:1.设置BMC密码,获取BMC IP2.找一台ubuntu搭建TFTP服务,用来替换grub.cfg文件3.从调试服务器的/boot/grub/grub.cfg中提取出recovery mode的配置,简化并生成新的配置文件grub.cfg,放在tftp服务的…

万能破题方法包(3)暴力破解法

一、前言 暴力破解法是指通过尝试所有可能的密码组合来破解密码 1.1、概念 暴力破解法是一种通过尝试所有可能的密码组合来破解密码的方法。它基于暴力的方式&#xff0c;不依赖于任何密码漏洞或特殊技巧&#xff0c;而是通过穷举所有可能性来找到正确的密码。 1.2、解决步骤 …

Qt项目天气预报(2) - 重写事件函数

鼠标右键实现退出界面 知识点QMenu: QMenu 弹出对话框 --> 相对QMessageBox 更加轻量点 QMenu是Qt库中用于创建弹出式菜单的类&#xff0c;它通常出现在应用程序的顶部菜单栏、按钮的右键菜单或自定义上下文菜单中。以下是关于QMenu的详细介绍&#xff1a; 1. 类的基本特…

【WEB前端2024】3D智体编程:乔布斯3D纪念馆-第38课-密室逃脱-3D互动剧情

【WEB前端2024】3D智体编程&#xff1a;乔布斯3D纪念馆-第38课-密室逃脱 使用dtns.network德塔世界&#xff08;开源的智体世界引擎&#xff09;&#xff0c;策划和设计《乔布斯超大型的开源3D纪念馆》的系列教程。dtns.network是一款主要由JavaScript编写的智体世界引擎&…

热镀锌钢板耐液体性能测 彩钢板抗拉强度检测

钢板检测范围&#xff1a;钢板、彩钢板、不锈钢板、耐磨钢板、合金钢板、压型钢板、冷轧钢板、弹簧钢板、碳钢板、热轧钢板、厚钢板、热镀锌钢板、冲孔钢板、船用钢板、硅钢板、花纹钢板、压力容器钢板、耐候钢板、 钢板检测项目包括化学性能检测、性能检测、机械性能检测、老…

AI办公自动化:kimi批量搜索提取PDF文档中特定文本内容

工作任务&#xff1a;PDF文档中有资料来源这一行&#xff0c;比如&#xff1a; 资料来源&#xff1a;moomoo tech、The Information、Bloomberg、Reuters&#xff0c;浙商证券研究所 数据来源&#xff1a;CSDN、浙商证券研究所 数据来源&#xff1a;CSDN、arXiv、浙商证券研…

C++ 19 之 封装

c19封装.cpp #include <iostream> #include <string.h> using namespace std;// 封装&#xff1a;将行为和属性作为一个整体来表现生活中的事物// 人&#xff1a; 行为&#xff1a; 吃饭 属性&#xff1a; 姓名、年龄 struct person {char name[20];int age;…

掌握特劳特定位理论核心,明晰企业战略定位之重

在当今瞬息万变的市场环境中&#xff0c;企业战略定位的重要性日益凸显。它不仅是企业在激烈竞争中保持优势的关键&#xff0c;更是企业实现长期可持续发展的基石。 哈佛大学战略学教授迈克尔波特&#xff08;Michael Porter&#xff09;指出战略就是形成一套独具的运营活动&a…

为什么微信输入法是比搜狗输入法更好的选择?

微信输入法官网&#xff1a;https://z.weixin.qq.com/ 最近使用搜狗输入法时&#xff0c;频繁弹出广告&#xff0c;实在令人烦恼&#xff0c;于是我干脆卸载了它。然而&#xff0c;电脑上没有输入法是不行的。经过在网上对比了许多输入法软件后&#xff0c;我发现了微信输入法。…

skywalking9.4 链路追踪

下载&#xff0c;很慢很慢很慢&#xff01;&#xff01;&#xff01;&#xff01; jdk 使用jdk17 skywalking-apm 9.4 java-agent 9.0 idea 本地开发配置 第1行配置按实际来&#xff1b; 第2行自定义&#xff0c;一般和微服务名称相同&#xff1b; 第3行ip写安装的机器ip,端…

装备名称检索与推荐

1、引言 在这个信息爆炸的时代&#xff0c;无论是军事爱好者、科研工作者&#xff0c;还是户外探险者&#xff0c;他们都需要快速准确地获取特定装备的信息。装备名称检索推荐系统正是为了应对这一挑战而生。它像一位经验丰富的向导&#xff0c;引领用户穿越复杂的装备海洋&am…

微信公众号打通与登录的实现

今天实现一下与微信公众号进行对接&#xff0c;通过扫描二维码的方式来进行注册与登录&#xff0c;获取用户的微信唯一标识作为用户的username&#xff0c;下面我们开始编写。 骨架建立&#xff1a; 建包&#xff1a; 第一步还是先将骨架建好&#xff0c;与网关骨架差不多&a…

玄机平台应急响应—MySQL应急

前言 这个是比较简单的&#xff0c;其实和MySQL没啥太大的关系&#xff0c;没涉及太多MySQL的知识。看一下它的flag要求吧。 flag1 它说黑客写入的shell&#xff0c;那我们就去它的网站目录去看看&#xff0c;果然有一个叫sh.php的文件。 flag1{ccfda79e-7aa1-4275-bc26-a61…

DeepDriving | CUDA编程-05:流和事件

本文来源公众号“DeepDriving”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;CUDA编程-05&#xff1a;流和事件 1 CUDA流 在CUDA中有两个级别的并发&#xff1a;内核级并发和网格级并发。前面的文章DeepDriving | CUDA编程-04&…

ESP32 BLE学习(0) — 基础架构

前言 &#xff08;1&#xff09;学习本文之前&#xff0c;需要先了解一下蓝牙的基本概念&#xff1a;BLE学习笔记&#xff08;0.0&#xff09; —— 基础概念&#xff08;0&#xff09; &#xff08;2&#xff09; 学习一款芯片的蓝牙肯定需要先简单了解一下该芯片的体系结构&a…

园区地图导航系统:技术原理、部署方案与智能化应用解析

随着智能化时代的到来&#xff0c;园区管理面临诸多挑战。维小帮园区地图导航系统&#xff0c;采用前沿技术&#xff0c;为园区提供全面的导航解决方案&#xff0c;极大提升了园区管理效率和用户体验。 一、园区地图导航系统的功能特点 维小帮园区地图导航系统&#xff0c;以其…

MongoDB 多层级查询

多层级查询 注意&#xff1a;要注意代码顺序 查询层级数据代码放前面&#xff0c;查询条件放后面 if (StringUtils.isBlank(params.getDocType())) {params.setDocType(DOC_TDCTYPE);}String docName mapper.findByDocInfo(params.getDocType());List<ExpertApprovalOpin…

智能化状态管理:自动状态流转处理模块

目录 基本背景介绍 具体实现 基本数据准备 基本数据表 状态转换常量 状态转换注解 任务处理模版 各任务实现逻辑 开启比对任务进行处理 降噪字段处理任务处理 开启业务数据比对处理 业务数据比对处理 开始核对数据生成最终报告处理 核对数据生成最终报告处理 状…

AI项目二十二:行人属性识别

若该文为原创文章&#xff0c;转载请注明原文出处。 分享一个行人属性分析系统&#xff0c;识别行人&#xff0c;并标记每个人的属性。 项目代码来自公众号渡码的项目。 本人用Win10复现完整项目&#xff0c;并记录过程。 源码会上传到github,可以自行下载测试。 Yinyifen…