Kafka3.0.0版本——消费者(Sticky分区分配策略以及再平衡)

目录

    • 一、Sticky分区分配策略原理
    • 二、Sticky分区分配策略 示例需求
    • 三、Sticky分区分配策略代码案例
      • 3.1、创建带有7个分区的sevenTopic主题
      • 3.2、创建三个消费者 组成 消费者组
      • 3.3、创建生产者
      • 3.4、测试
      • 3.5、Sticky分区分配策略代码案例说明
    • 四、Sticky分区分配再平衡案例
      • 4.1、停止某一个消费者后,(45s 以内)重新发送消息示例
      • 4.2、停止某一个消费者后,(45s 以后)重新发送消息示例
      • 4.3、Sticky分区分配再平衡案例说明

一、Sticky分区分配策略原理

  • 粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
  • 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化

二、Sticky分区分配策略 示例需求

  • 设置主题为 sevenTopic,7个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察
    消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

三、Sticky分区分配策略代码案例

3.1、创建带有7个分区的sevenTopic主题

  • 在 Kafka 集群控制台,创建带有7个分区的sevenTopic主题

    bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 7 --replication-factor 1 --topic sevenTopic
    

    在这里插入图片描述

3.2、创建三个消费者 组成 消费者组

  • 复制 CustomConsumer1类,创建 CustomConsumer2和CustomConsumer3。这样可以由三个消费者组成消费者组,组名都为“test2”,设置分区分配策略为 Sticky。

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");// 设置分区分配策略properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 sevenTopicArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
    }
    

3.3、创建生产者

  • 创建CustomProducer生产者。

    package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//5、调用 send 方法,发送消息for (int i = 0; i < 200; i++) {kafkaProducer.send(new ProducerRecord<>("sevenTopic", "hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
    }
    

3.4、测试

  • 首先,在 IDEA中分别启动消费者1、消费者2和消费者3代码
    在这里插入图片描述

  • 然后,在 IDEA中分别启动生产者代码
    在这里插入图片描述

  • 在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据,如下图所示:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

3.5、Sticky分区分配策略代码案例说明

  • 由上述测试输出结果截图可知: 消费者1消费0、2、4分区的数据;消费者2消费1、3分区的数据;消费者3消费5、6分区的数据。
  • 说明:Kafka 采用修改后的Sticky分区分配策略。

四、Sticky分区分配再平衡案例

4.1、停止某一个消费者后,(45s 以内)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 2号分区数据。
    在这里插入图片描述

  • 由下图控制台输出可知:3号消费者 消费到 0、4号分区数据。
    在这里插入图片描述

4.2、停止某一个消费者后,(45s 以后)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 1、2、3号分区数据。
    在这里插入图片描述

  • 由下图控制台输出可知:3号消费者 消费到 0、4、5、6号分区数据。
    在这里插入图片描述

4.3、Sticky分区分配再平衡案例说明

  • 1 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 2号消费者或者 3号消费者消费。

  • 1号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

  • 消费者 1 已经被踢出消费者组,所以重新按照粘性方式分配。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/75124.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis 删除策略

文章目录 Redis 删除策略一、过期数据二、数据删除策略1、定时删除2、惰性删除3、定期删除4、删除策略对比 三、逐出算法 Redis 删除策略 一、过期数据 Redis是一种内存级数据库&#xff0c;所有数据均存放在内存中&#xff0c;内存中的数据可以通过TTL指令获取其状态 XX &a…

【李自然说】在创业路演现场,3分钟打动投资人 (notebook)[Updating]

【李自然说】在创业路演现场&#xff0c;3分钟打动投资人_哔哩哔哩_bilibili 融资的核心问题&#xff08;三分钟内讲清楚&#xff09; - 竞争者做什么&#xff0c;我们做什么 &#xff08;是否有&#xff0c;没有那潜在competitor是谁&#xff0c;有没有可能变成合作伙伴&#…

构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合

当今的数据技术生态系统中&#xff0c;实时数据处理已经成为许多企业不可或缺的一部分。为了满足这种需求&#xff0c;Apache Flink、Apache Kafka和CnosDB等开源工具的结合应运而生&#xff0c;使得实时数据流的收集、处理和存储变得更加高效和可靠。本篇文章将介绍如何使用 F…

java的动态代理如何实现

一. JdkProxy jdkproxy动态代理必须基于接口(interface)实现 接口UserInterface.java public interface UserService {String getUserName(String userCde); }原始实现类&#xff1a;UseServiceImpl.java public class UserServiceImpl implements UserSerice {Overridepub…

举例说明PyTorch函数torch.cat与torch.stack的区别

一、torch.cat与torch.stack的区别 torch.cat用于在给定的维度上连接多个张量&#xff0c;它将这些张量沿着指定维度堆叠在一起。 torch.stack用于在新的维度上堆叠多个张量&#xff0c;它会创建一个新的维度&#xff0c;并将这些张量沿着这个新维度堆叠在一起。 二、torch.…

原生js之dom表单改变和鼠标常用事件

那么好,本次我们聊聊表单改变时如何利用onchange方法来触发input改变事件以及鼠标常用的滑入滑出,点击down和点击up事件. 关于onchange方法 onchange方法在鼠标输入完后点击任何非输入框位置时触发.触发时即可改变原有输入框的值. out 、leave、over、down、up鼠标方法 当用…

React refers to UMD global, but the current file is a module vite初始化react项目

vite搭建react项目 初始化项目 npm create vite 在执行完上面的命令后&#xff0c;npm 首先会自动下载create-vite这个第三方包&#xff0c;然后执行这个包中的项目初始化逻辑。输入项目名称之后按下回车&#xff0c;此时需要选择构建的前端框架&#xff1a; ✔ Project na…

932. 漂亮数组

932. 漂亮数组 原题链接&#xff1a;完成情况&#xff1a;解题思路&#xff1a;参考代码&#xff1a; 原题链接&#xff1a; 932. 漂亮数组 https://leetcode.cn/problems/beautiful-array/description/ 完成情况&#xff1a; 解题思路&#xff1a; nums 是由范围 [1, n] 的…

jmeter如何压测和存储

一、存储过程准备&#xff1a; 1、建立一个空表&#xff1a; 1 CREATE TABLE test_data ( id NUMBER, name VARCHAR2(50), age NUMBER ); 2、建立一个存储过程&#xff1a; CREATE OR REPLACE PROCEDURE insert_test_data(n IN NUMBER) ASBEGIN--EXECUTE IMMEDIATE trunca…

4个维度讲透ChatGPT技术原理,揭开ChatGPT神秘技术黑盒!(文末送书)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

浏览器进程,性能指标,性能优化

目录 浏览器进程&#xff1a;多进程 主进程&#xff1a;显示、交互&#xff0c;增删进程 UI进程&#xff1a;控制地址栏、书签、前进后退 存储进程&#xff1a;cookie&#xff0c;webstorage&#xff0c;indexDB 渲染进程&#xff1a;每个标签页或窗口都有一个独立的渲染进…

【深度学习】分类损失函数解析

【深度学习】分类相关的损失解析 文章目录 【深度学习】分类相关的损失解析1. 介绍2. 解析3. 代码示例 1. 介绍 在分类任务中&#xff0c;我们通常使用各种损失函数来衡量模型输出与真实标签之间的差异。有时候搞不清楚用什么&#xff0c;下面是几种常见的分类相关损失函数及其…

Boost搜索引擎

项目背景 先说一下什么是搜索引擎,很简单,就是我们平常使用的百度,我们把自己想要所有的内容输入进去,百度给我们返回相关的内容.百度一般给我们返回哪些内容呢?这里很简单,我们先来看一下. 搜索引擎基本原理 这里我们简单的说一下我们的搜索引擎的基本原理. 我们给服务器发…

Fast RCNN

【简介】 Fast RCNN[6]网络是RCNN和SPPNet的改进版&#xff0c;该网路使得我们可以在相同的网络配置下同时训练一个检测器和边框回归器。该网络首先输入图像&#xff0c;图像被传递到CNN中提取特征&#xff0c;并返回感兴趣的区域ROI&#xff0c;之后再ROI上运用ROI池化层以保证…

如何用Jmeter提取和引用Token

1.执行获取token接口 在结果树这里&#xff0c;使用$符号提取token值。 $根节点&#xff0c;$.data.token表示提取根节点下的data节点下的token节点的值。 2.使用json提取器&#xff0c;提取token 变量路径就是把在结果树提取的路径写上。 3.使用BeanShell取样器或者BeanShell后…

浅谈泛在电力物联网、能源互联网与虚拟电厂

导读&#xff1a;从能源互联网推进受阻&#xff0c;到泛在电力物联网名噪一时&#xff0c;到虚拟电厂再次走向火爆&#xff0c;能源领域亟需更进一步的数智化发展。如今&#xff0c;随着新型电力系统建设推进&#xff0c;虚拟电厂有望迎来快速发展。除了国网和南网公司下属的电…

常见排序算法

排序简介常见排序算法插入排序直接插入排序希尔排序 选择排序选择排序堆排序 交换排序冒泡排序快速排序hoare版挖坑法前后指针法非递归实现快排优化 归并排序非递归实现归并排序海量数据排序问题 基数排序&#xff08;不用比较就能够排序&#xff09;桶排序计数排序&#xff08…

常用消息中间件有哪些

RocketMQ 阿里开源&#xff0c;阿里参照kafka设计的&#xff0c;Java实现 能够保证严格的消息顺序 提供针对消息的过滤功能 提供丰富的消息拉取模式 高效的订阅者水平扩展能力 实时的消息订阅机制 亿级消息堆积能力 RabbitMQ Erlang实现&#xff0c;非常重量级&#xff0c;更适…

一百七十三、Flume——Flume写入HDFS后的诸多小文件问题

一、目的 在用Flume采集Kafka中的数据写入HDFS后&#xff0c;发现写入HDFS的不是每天一个文件&#xff0c;而是一个文件夹&#xff0c;里面有很多小文件&#xff0c;浪费namenode的宝贵资源 二、Flume的配置文件优化&#xff08;参考了其他博文&#xff09; &#xff08;一&a…

OpenCV(二十四):可分离滤波

目录 1.可分离滤波的原理 2.可分离滤波函数sepFilter2D() 3.示例代码 1.可分离滤波的原理 可分离滤波的原理基于滤波器的可分离性。对于一个二维滤波器&#xff0c;如果它可以表示为水平方向和垂直方向两个一维滤波器的卷积&#xff0c;那么它就是可分离的。也就是说&#x…