Kafka 消费者组 Rebalance 详解

Rebalance作用

Rebalance 本质上是一种协议,主要作用是为了保证消费者组(Consumer Group)下的所有消费者(Consumer)消费的主体分区达成均衡。

比如:我们有10个分区,当我们有一个消费者时,该消费者消费10个分区,当我们增加一个消费者,理论上每个消费者消费5个分区,这个分配的过程我们成为Rebalance(重平衡)

 

触发条件

常见的有三种情况会触发Rebalance:

  • 组成员数发生变更
  • 订阅主题数发生变更
  • 订阅主题的分区数发生变更

 

缺点

  • Rebalance时所有消费者无法消费数据
  • Rebalance速度慢
  • Rebalance 效率不高

Coordinator(协调者)介绍

Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。

 

如何避免 Rebalance

最简单粗暴的就是 : 减少组成员数量发生变化

每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。Consumer 端有个参数,叫 session.timeout.ms,就是被用来表征此事的。该参数的默认值是 10 秒,即如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。可以这么说,session.timout.ms 决定了 Consumer 存活性的时间间隔。

除了这个参数,Consumer 还提供了一个允许你控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。

除了以上两个参数,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。

 

触发Rebalance时Broker服务端与Consumer客户端日志

增加消费者时

当增加一个消费者进程时,broker server.log中GroupCoordinator 打印日志如下

[2020-03-28 23:03:59,453] INFO [GroupCoordinator 0]: Preparing to rebalance group test-consumer in state PreparingRebalance with old generation 7 (__consumer_offsets-23) (reason: Adding new member consumer-1-b1c600af-a17a-465d-9b43-01fd82df0b8f with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2020-03-28 23:04:02,005] INFO [GroupCoordinator 0]: Stabilized group test-consumer generation 8 (__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator)
[2020-03-28 23:04:02,008] INFO [GroupCoordinator 0]: Assignment received from leader for group test-consumer for generation 8 (kafka.coordinator.group.GroupCoordinator)

在Consumer客户端Debug日志中有以下信息提示,说明已经该group产生了Rebalance

23:04:07,379 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-1, groupId=test-consumer] Sending JoinGroup ((type: JoinGroupRequest, groupId=test-consumer, sessionTimeout=10000, rebalanceTimeout=10000, memberId=consumer-1-ccc4eca4-04a4-4547-910f-142c58506c14, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@442675e1)) to coordinator DESKTOP-I0EG1MJ.localdomain:9092 (id: 2147483647 rack: null)
23:04:07,379 DEBUG org.apache.kafka.clients.NetworkClient                        - [Consumer clientId=consumer-1, groupId=test-consumer] Sending JOIN_GROUP {group_id=test-consumer,session_timeout=10000,rebalance_timeout=10000,member_id=consumer-1-ccc4eca4-04a4-4547-910f-142c58506c14,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]} with correlation id 146 to node 2147483647

减少消费者时

当减少一个消费者组的进程时,broker server.log中GroupCoordinator 打印日志如下

[2020-03-28 23:04:39,367] INFO [GroupCoordinator 0]: Member consumer-1-b1c600af-a17a-465d-9b43-01fd82df0b8f in group test-consumer has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-03-28 23:04:39,367] INFO [GroupCoordinator 0]: Preparing to rebalance group test-consumer in state PreparingRebalance with old generation 8 (__consumer_offsets-23) (reason: removing member consumer-1-b1c600af-a17a-465d-9b43-01fd82df0b8f on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2020-03-28 23:04:43,765] INFO [GroupCoordinator 0]: Stabilized group test-consumer generation 9 (__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator)
[2020-03-28 23:04:43,768] INFO [GroupCoordinator 0]: Assignment received from leader for group test-consumer for generation 9 (kafka.coordinator.group.GroupCoordinator)

在未停止的Consumer客户端Debug日志中有以下信息提示,说明已经该group产生了Rebalance

23:04:56,507 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-1, groupId=test-consumer] Sending JoinGroup ((type: JoinGroupRequest, groupId=test-consumer, sessionTimeout=10000, rebalanceTimeout=10000, memberId=consumer-1-ccc4eca4-04a4-4547-910f-142c58506c14, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@49e202ad)) to coordinator DESKTOP-I0EG1MJ.localdomain:9092 (id: 2147483647 rack: null)
23:24:56,507 DEBUG org.apache.kafka.clients.NetworkClient                        - [Consumer clientId=consumer-1, groupId=test-consumer] Sending JOIN_GROUP {group_id=test-consumer,session_timeout=10000,rebalance_timeout=10000,member_id=consumer-1-ccc4eca4-04a4-4547-910f-142c58506c14,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]} with correlation id 321 to node 2147483647

注意:注意对于不同topic,使用相同consumer group,如果有一个消费者程序停止或新增,所有相同consumer group都会Rebalance

 

所以在我们日常开发中,不想干的业务也要避免Consumer Group 设置成不相同的

 

Rebalance 学习思维导图

下面是我大概整理的Rebalance的知识点参考《Kafka核心技术与实战》

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509742.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kafka JMX 监控 之 jmxtrans + influxdb + grafana

目录 效果图 环境准备 安装 influxdb 安装我们刚刚下载 influxdb rpm文件 查看默认配置 修改参数 启动 influxdb 查看启动状态 设置基本配置 influxdb 其他命令扩展 安装 jmxtrans 可能遇到的异常 验证jmxtrans是否成功运行 安装 Grafana 安装 influxDB 与 Grafa…

实例浅析epoll的水平触发和边缘触发,以及边缘触发为什么要使用非阻塞IO

一.基本概念 我们通俗一点讲: Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如…

UML序列图

UML学习(三)-----序列图 UML的模型中可分为两种,动态模型和静态模型。用例图、类图和对象图都是UML中的静态结构模型。而在UML系统动态模型的其中一种就是交互视图,它描述了执行系统功能的各个角色之间相互传递消息的顺序关系。序…

OpenTSDB 开发指南之 查询数据

前面博主写了一篇文章去介绍opentsdb的http接口的使用方法,但是某一些接口的使用还是比较复杂,这篇文章会通过example来详细讲述opentsdb的一些特性。 本文的举的例子有这些: 基本的写入和查询数据的注释和说明子查询查询中的filters使用查询数据的rat…

libcurl使用方法

原文地址:http://curl.haxx.se/libcurl/c/libcurl-tutorial.html 译者:JGood(http://blog.csdn.net/JGood ) 译者注:这是一篇介绍如何使用libcurl的入门教程。文档不是逐字逐句按原文翻译,而是根据笔者对libcurl的理解&#xff0c…

OpenTSDB 开发指南之 Grafana 展示OpenTSDB监控数据

目录 准备数据 在Grafana创建OpenTSDB连接 创建一个仪表盘 统计 准备数据 将数据插入OpenTSDB {"metric":"jast.data","value":1023,"timestamp":1588742563,"tags":{"type":"jast-graph-data"}}…

CDH 版本 Kafka 外网设置

登陆CDH页面,进入Kafka配置页面 搜索 advertised 修改advertised.host.name,这里我们有三台Broker,我们把每台的外网ip填写到对应的机器上 advertised.port不填写 我们kafka的端口设置的是9099 将外网端口9099开放,允许外网访问 (这里不做介绍

OpenTSDB 安装

下载目录 https://github.com/OpenTSDB/opentsdb/releases https://github.com/OpenTSDB/opentsdb/releases/download/v2.4.0/opentsdb-2.4.0.noarch.rpm 安装 GnuPlot yum install gnuplot -y 直接安装OpenTSDB会报错 [rootecs-t-001-0001 openTSDB]# rpm -ivh opentsdb-2.…

HBase原理 – snapshot 快照

目录 snapshot(快照)基础原理 snapshot能实现什么功能? hbase snapshot用法大全 hbase snapshot分布式架构-两阶段提交 snapshot核心实现 clone_snapshot如何实现呢? 其他需要注意的 参考文献 更多信息可参考《…

linux如何自动化部署脚本实现免密登录并访问资源

任务把weijie主机jdk文件安装到weijie1中。 首先再各台主机中安装必要的命令: expect、wget、httpd、ssh 执行命令 如:expect提示命令不存在,则分别安装命令 yum install expect yum install wget yum install httpd yum install ssh 开…

时序数据库技术体系 – InfluxDB TSM存储引擎之数据读取

任何一个数据库系统内核关注的重点无非:数据在内存中如何存储、在文件中如何存储、索引结构如何存储、数据写入流程以及数据读取流程。关于InfluxDB存储内核,笔者在之前的文章中已经比较全面的介绍了数据的文件存储格式、倒排索引存储实现以及数据写入流…

java多线程之生产者和消费者问题

线程通信:不同的线程执行不同的任务,如果这些任务有某种关系,线程之间必须能够通信,协调完成工作. 经典的生产者和消费者案例(Producer/Consumer):分析案例:1):生产者和消费者应该操作共享的资源(实现方式来做).2):使用一个或多个线程来表示生产者(Producer).3):使用一个或多个…

时序数据库技术体系 – InfluxDB TSM存储引擎之数据写入

之前两篇文章笔者分别从TSM File文件存储格式、倒排索引文件存储格式这两个方面对InfluxDB最基础、最底层也最核心的存储模块进行了介绍,接下来笔者会再用两篇文章在存储文件的基础上分别介绍InfluxDB是如何处理用户的写入(删除)请求和读取请…

zookeeper结构和命令详解

1.1. zookeeper特性1、Zookeeper:一个leader,多个follower组成的集群 2、全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的 3、分布式读写,更新请求转发&#xf…

时序数据库技术体系 – InfluxDB 多维查询之倒排索引

在时序数据库概述一文中,笔者提到时序数据库的基础技术栈主要包括高吞吐写入实现、数据分级存储|TTL、数据高压缩率、多维度查询能力以及高效聚合能力等,上文《时序数据库技术体系 – InfluxDB存储引擎TSM》基于InfluxDB存储引擎TSM介绍了时序…

OSG框架分析

本文参考<<osg最长一帧>>, <<OpenSceneGraph三维渲染引擎编程指南>>, <<OpenSceneGraph三维渲染引擎设计与实践>> 整理而来, 感谢大牛们的精彩著作. 相比Ogre来说, Ogre代码很规范, 只是入门资料较少,如果能在学习之前能总体上对架构有个…

在Eclipse中如何操作zookpeer

导入jar包 jar包下载链接 代码解析 package com.itcast.zookpeer.zk;import java.io.IOException; import java.util.List;import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apac…

Linux 系统进程守护工具 cesi + superviosr

一、安装 Supervisor pip install supervisor 使用 echo_supervisord_conf 命令生成默认配置文件 echo_supervisord_conf > /etc/supervisord.conf 配置文件说明 位置&#xff1a;etc/supervisord.conf内容&#xff1a;# 指定了socket file的位置 [unix_http_server] f…

Docker 服务器安装(一)

使用官方安装脚本自动安装 安装命令如下&#xff1a; curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun 也可以使用国内 daocloud 一键安装命令&#xff1a; curl -sSL https://get.daocloud.io/docker | sh 设置docker 加速器 sudo curl -sSL https…

Docker 入门使用 (二)

配置国内的源 > /etc/docker/daemon.json{"registry-mirrors" : ["https://mirror.ccs.tencentyun.com","http://registry.docker-cn.com","http://docker.mirrors.ustc.edu.cn","http://hub-mirror.c.163.com"],"…