Kafka官方提供的RoundRobinPartitioner出现奇偶数据不均匀

Kafka官方提供的RoundRobinPartitioner出现奇偶数据不均匀

参考:
https://www.cnblogs.com/cbc-onne/p/18140043

  1. 使用RoundRobinPartitioner
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at**    http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.kafka.clients.producer;import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** The "Round-Robin" partitioner - MODIFIED TO WORK PROPERLY WITH STICKY PARTITIONING (KIP-480)* <p>* This partitioning strategy can be used when user wants to distribute the writes to all* partitions equally. This is the behaviour regardless of record key hash.*/
public class RoundRobinPartitioner implements Partitioner {private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinPartitioner.class);private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();private final ConcurrentMap<String, Queue<Integer>> topicPartitionQueueMap = new ConcurrentHashMap<>();public void configure(Map<String, ?> configs) {}/*** Compute the partition for the given record.** @param topic      The topic name* @param key        The key to partition on (or null if no key)* @param keyBytes   serialized key to partition on (or null if no key)* @param value      The value to partition on or null* @param valueBytes serialized value to partition on or null* @param cluster    The current cluster metadata*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {Queue<Integer> partitionQueue = partitionQueueComputeIfAbsent(topic);Integer queuedPartition = partitionQueue.poll();if (queuedPartition != null) {LOGGER.trace("Partition chosen from queue: {}", queuedPartition);return queuedPartition;} else {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();int nextValue = nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (!availablePartitions.isEmpty()) {int part = Utils.toPositive(nextValue) % availablePartitions.size();int partition = availablePartitions.get(part).partition();LOGGER.trace("Partition chosen: {}", partition);return partition;} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}}}private int nextValue(String topic) {AtomicInteger counter =topicCounterMap.computeIfAbsent(topic,k -> {return new AtomicInteger(0);});return counter.getAndIncrement();}private Queue<Integer> partitionQueueComputeIfAbsent(String topic) {return topicPartitionQueueMap.computeIfAbsent(topic, k -> {return new ConcurrentLinkedQueue<>();});}public void close() {}/*** Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,* this method can change the chosen sticky partition for the new batch.** @param topic         The topic name* @param cluster       The current cluster metadata* @param prevPartition The partition previously selected for the record that triggered a new*                      batch*/@Overridepublic void onNewBatch(String topic, Cluster cluster, int prevPartition) {LOGGER.trace("New batch so enqueuing partition {} for topic {}", prevPartition, topic);Queue<Integer> partitionQueue = partitionQueueComputeIfAbsent(topic);partitionQueue.add(prevPartition);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/35469.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何提高网页加载速度?

如何以闪电般的速度加载网站&#xff1f; 看看这 8 个提升前端性能的技巧&#xff1a; 01 压缩 在传输之前压缩文件可以减少其大小&#xff0c;减少需要传输的数据量&#xff0c;从而加快加载时间。 实现方法&#xff1a; Gzip/Brotli 压缩: 配置你的 web 服务器&#xff08…

[Linux] 历史根源

UNIX系统&#xff1a; 1969年&#xff0c;由贝尔实验室的K.Thompson和D.M.Ritchie为PDP-7机器编写的一个分时操作系统&#xff0c; 最初使用汇编语言编写&#xff0c; 后来1972年C语言出世以后&#xff0c;二人由使用C写了UNIX3&#xff0c; 此后UNIX大为流行开来 UNIX流派树&a…

一个简单的盐值md5破解

声明&#xff0c;仅供学习&#xff0c;请勿用于非法用途&#xff01; 首先需要获取到salt值和密文&#xff0c;自己有字典 我是做vulhub的cmsms复现的时候&#xff0c;用他的poc跑出来密文和盐值&#xff0c;发现这个是做了字段拼接再加密&#xff0c;也就是加盐了&#xff0c…

华为交换机的堆叠-Stack配置(基于业务口普通线缆的堆叠配置)

不想看原理请跳过一、二、三、四&#xff0c; 直接到配置五&#xff0c;干完活有时间在慢慢看原理。 一、什么是堆叠-Stack 指将多台交换机通过堆叠线缆连接在一起&#xff0c;逻辑上变成一台交换设备&#xff0c;作为一个整体参与数据转发。即&#xff1a;1 1 一 二、堆叠…

ChatGPT 宣布终止对中国提供 API 服务? 来用国产大模型吧

国产大模型 阿里云 将为 OpenAI API 用户提供更具性价比的中国大模型替代方案&#xff0c;同时宣布中国开发者提供 2200 万免费 tokens 和专属迁移服务 硅基流动 SiliconCloud 免费开放 7 款大模型 智谱 AI 正式推出 OpenAI API 用户特别搬家计划&#xff0c;同时还会为开…

如何通过待办工具提升个人效率 减轻压力提升效率的待办app

在快节奏的现代社会中&#xff0c;工作任务繁重&#xff0c;人们的压力日益增大。为了减轻压力并提升工作效率&#xff0c;我们急需找到一种有效的方法来管理日常任务。幸运的是&#xff0c;随着科技的进步&#xff0c;各种新兴工具应运而生&#xff0c;为我们提供了便捷的解决…

qt报错:“QtRunWork”任务返回了 false,但未记录错误。

qt报错&#xff1a;“QtRunWork”任务返回了 false&#xff0c;但未记录错误。 说明情况一 说明 这个报错可能的原因有很多&#xff0c;这里只写一种&#xff0c;以后遇到再进行补充。 情况一 如果 Q_OBJECT 宏未正确处理&#xff0c;通常会出现类似的错误。 要使用信号与槽…

3.优化算法之二分查找1

二分查找简介 1.特点 最简单最恶心&#xff0c;细节最多&#xff0c;最容易写出死循环的算法 2.学习中的侧重点 1&#xff09;算法原理 数组有序的情况 2&#xff09; 模板 不要死记硬背 ->理解之后再记忆 1.朴素的二分模板 2.查找左边界的二分模板 3.查找右边界的二分模板 …

24年了 直播带货的未来如何?

32 个国家在取消电商&#xff0c; 那我国的电商呢&#xff0c;首先电商是不会被取缔的。直播电商会被严格的控制&#xff0c;比如有一家饼店&#xff0c;它线下的销售是 3000 万&#xff0c;线上抖音的销售是 5, 000 万。 这一类型小而精又专业的品牌企业&#xff0c;未来在抖…

java图片处理(图片逆时针旋转90度,图片剪裁截取)

base64字符串转化成图片给定坐标点&#xff0c;以及宽高范围&#xff0c;然后截取图片图片逆时针旋转90度 import org.apache.commons.codec.binary.Base64; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; import j…

Sensei for Mac:一键清理,系统如新!

Sensei for Mac是一款高效且易于使用的系统优化清理工具。它能够深入Mac系统内部&#xff0c;智能识别并清理无用的缓存文件、临时文件、垃圾邮件等&#xff0c;从而释放磁盘空间&#xff0c;提升系统性能。无论是日常使用还是长时间工作后&#xff0c;Sensei都能帮助你的Mac恢…

鸿蒙 HarmonyOS NEXT星河版APP应用开发阶段三-热门组件使用及案例

一、样式和结果重用 介绍 /* Extend:扩展组件&#xff08;样式、事件&#xff09; Styles: 抽取通用数据、事件 Builder:自定义构建函数&#xff08;结构、样式、事件&#xff09; */Extend /* 作用&#xff1a;扩展组件&#xff08;样式、事件&#xff09; 场景&#xff1a;…

实现异步操作sleep

这两个函数 sleep 和 delay 都是使用 JavaScript 的 Promise 和 setTimeout 来实现异步操作的。不过它们的功能略有不同&#xff0c;下面我为你详细解释&#xff1a; sleep 函数 sleep 函数接受一个秒数 seconds&#xff0c;并返回一个 Promise&#xff0c;该 Promise 在指定的…

封装图片占位图组件

<laze-image class="image" :url="item.image" :game_name="item.game_name" :placeholder="require(@/static/images/common/placeholder.png)"></laze-image> 1.通过调用组件实现 先加载预览图片,再加载真实的图片 2…

Vscode连接远程服务器中的docker容器

配置远程docker容器 docker run -itd --name ubuntu-hkx --mount typebind,source/home/huangkx/docker,target/docker -p2001:22 ubuntu-hkx-image 进入docker docker exec -it ubuntu-hkx bash docker安装ssh 安装完成后启动 service ssh start 配置key 把本地的~/.ssh…

中国杀出全球首个烹饪大模型

什么&#xff1f;烹饪也有大模型&#xff1f;&#xff01; 没有听错&#xff0c;这就是国产厨电龙头老板电器最新发布——“食神”大模型。 数十亿级行业数据&#xff0c;数千万级知识图谱加持&#xff0c;据称还是全球首个。 它能为每个人提供个性化量身定制的解决方案&…

Kubernetes面试整理-如何配置和使用Service, Ingress?

在 Kubernetes 中,Service 和 Ingress 是用于管理和暴露应用程序的网络访问的主要资源。以下是如何配置和使用 Service 和 Ingress 的详细指南: Service Service 是一种抽象,用于定义一组 Pod 的逻辑集合,并提供一种访问这些 Pod 的策略。Service 可以使应用程序内部或外部…

TikTok短视频矩阵系统

随着数字化时代的到来&#xff0c;短视频已成为人们获取信息、娱乐消遣的重要渠道。TikTok&#xff0c;作为全球最受欢迎的短视频平台之一&#xff0c;其背后的短视频矩阵系统是支撑其成功的关键因素。本文将深入探讨TikTok短视频矩阵系统的构成、功能以及它在新媒体时代中的影…

什么领夹麦的音质最好又降噪?揭秘多款降噪出色的无线领夹麦克风

随着短视频的兴起&#xff0c;将视频拍摄方面的外设推到了风口浪尖上&#xff0c;麦克风作为视频拍摄或者现场直播使用的主要拾音工具&#xff0c;自然成为了大家非常关注的一个摄影外设工具&#xff0c;毕竟一款好的拾音工具能够给视频创作者或者直播博主带来更好的使用体验。…

汇川H5u小型PLC作modbusRTU从站设置及测试

目录 新建工程COM通讯参数配置协议选择协议配置 查看手册Modbus地址对应关系仿真测试 新建工程 新建一个H5U工程&#xff0c;不使用临时工程 系列选择H5U即可 COM通讯参数配置 协议选择 选择ModbusRTU从站 协议配置 端口号默认不可选择 波特率这里使用9600 数据长度&…