kafka消费者api和分区分配和offset消费

kafka消费者

消费者的消费方式为主动从broker拉取消息,由于消费者的消费速度不同,由broker决定消息发送速度难以适应所有消费者的能力

拉取数据的问题在于,消费者可能会获得空数据

消费者组工作流程

Consumer Group(CG):消费者组

  • 由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响。所有的消费者都属于某个消费者组(即使只有一个消费者),即消费者组是逻辑上的一个订阅者
  • 分区和消费者的分配取决于具体的分配策略
  • 如果消费者组中的消费者数量超过分区数量,则会由部分消费者处于空闲状态,不会接受任何消息

在这里插入图片描述

初始化流程

在这里插入图片描述

  1. 每个broker上都有coordinator
  2. 选择coordinator作为消费者组的初始化和分区分配的协调者,使用消费者组的**groupid的hashcode%50(即__consumer_offsets的分区数量)**得到对应的broker id。该broker将作为整个消费者组的协调者。消费者组中的消费者向该分区提交offset
  3. 所有消费者向coordinator发送请求加入消费者组
  4. coordinator随机选择一个consumer作为leader
  5. 将要消费的topic情况发送给leader消费者
  6. leader制定消费方案,并将方案发送到coordinator
  7. coordinator把消费方案分发给哥哥消费者
  8. 每个消费者都会和coordinator保持心跳(默认3s)
    • 一旦超时 (session.timeout.ms=45s),该消费者会被移除,并触发再平衡;
    • 或者消费者处理消息的时间过长(max.poll.interval.ms 5分钟),也会触发再平衡
  9. 尽量避免消费者组的再平衡,非常消耗性能

消费流程

在这里插入图片描述

  1. 消费者创建ConsumerNetworkClient,用于和kafka集群进行通信
  2. 消费者开始初始化抓取数据的参数
    • fetch.min.bytes,每批次最小抓取大小,默认1字节
    • fetch.max.wait.ms,超时时间即使数据批次未达到大小也会抓取,默认500ms
    • fetch.max.bytes,每批次最大抓取大小,默认50m
  3. 参数初始化完成后,开始调用send方法发送请求
  4. 通过onSuccess回调拉取数据,存放在消息队列中
  5. 消费者开始拉取数据(max.poll.records,一次拉取数据返回消息的最大值,默认500条)
  6. 将消息进行反序列化和拦截器(kafka本身并不处理数据)

消费者相关参数

  • bootstrap.servers 向 Kafka 集群建立初始连接用到的 host/port 列表。
  • key.deserializer 和 value.deserializer 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。
  • group.id 标记消费者所属的消费者组。
  • enable.auto.commit 默认值为 true,消费者会自动周期性地向服务器提交偏移量
  • auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s
  • auto.offset.reset 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了),该如何处理?
    • earliest:自动重置偏移量到最早的偏移量。
    • latest:默认,自动重置偏移量为最新的偏移量。
    • none:如果消费组原来的(previous)偏移量 不存在,则向消费者抛异常。
    • anything:向消费者抛异常
  • offsets.topic.num.partitions ,即__consumer_offsets 的分区数,默认是 50 个分区。
  • heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于 session.timeout.ms (45s),也不应该高于 session.timeout.ms 的 1/3。
  • session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。 超过该值,该消费者被移除,消费者组执行再平衡

消费者API

创建topic

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic first --create --partitions 3 --replication-factor 3

独立消费者

  • 即使只有单独的消费者,也必须配置消费者组id
  • kafka命令行启动消费者,如果不填写消费者组id,则会被自动填充随机的消费者组id

订阅主题进行消费

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;public class SimpleConsumer {public static void main(String[] args) {// configureProperties properties = new Properties();// connectproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// key,value反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// create consumerproperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("first");kafkaConsumer.subscribe(topics);while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> ConsumerRecord : consumerRecords) {System.out.println(ConsumerRecord);}}}
}
output:
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 2,offset = 3, CreateTime = 1629169606820, serialized key size = -1,serialized value size = 8, headers = RecordHeaders(headers = [],isReadOnly = false), key = null, value = hello1)
ConsumerRecord(topic = test, partition = 1, leaderEpoch = 3,offset = 2, CreateTime = 1629169609524, serialized key size = -1,serialized value size = 6, headers = RecordHeaders(headers = [],isReadOnly = false), key = null, value = hello2)

订阅分区进行消费

...
ArrayList<TopicPartition> topics = new ArrayList<TopicPartition>();
topics.add(new TopicPartition("test", 0)); // 指定消费分区0的数据
kafkaConsumer.assign(topics);

消费者组消费数据

  • 只需要启动多个消费者即可,消费者按照消费者组的id自动归属于同一个消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
ArrayList<String> topics = new ArrayList<String>();
topics.add("first");
kafkaConsumer.subscribe(topics);

分区的分配和再平衡

分区的分配设计到同一个topic中的partition由那个consumer来消费的问题

Kafka有四种主流的分区分配策略(所谓的分区分配策略就是消费方案):

  • Range

  • RoundRobin

  • Sticky

  • CooperativeSticky

可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用 多个分区分配策略

range策略

Range 是对每个 topic 而言的。

  • 对同一个 topic 里面的分区按照序号进行排序,假如现在有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6
  • 对消费者按照字母顺序进行排序。 消费者排序完之后将会是C0,C1,C2。
  • 通过 partitions数/consumer数 来决定每个消费者应该 消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。

注意:如果只是针对 1 个 topic 而言,C0消费者多消费1 个分区影响不是很大。但是如果有 N 个 topic,那么针对每 个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。 容易产生数据倾斜

在这里插入图片描述

注意:

  • 修改主题的分区数,只能增加不能减少

  • 如果在消费过程中某个consumer挂掉,当超出45s后,则该consumer消费的所有分区都会整体分配给某一个其他消费者

  • 消费者被移出消费者组,消费策略按照存活的消费者重新分配分区

  • Kafka 默认的分区分配策略是 Range + CooperativeSticky

RoundRobin策略

针对所有topic而言

  • 所有的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序
  • 按照轮询算法将partition分配给消费者

注意

  • 需要修改分区分配策略

    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    
  • 如果在消费过程中某个consumer挂掉,超过45s后,该消费者的分区会重新按照轮询的方式在其他消费者中分配

  • 消费者被移出消费者组,消费策略按照存活的消费者重新分配分区

Sticky策略

在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销

粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化

如果有0,1,2,3,4,5,6分区和C0,C1,C2消费者,则最终分配比例仍旧是223,但是每个消费者分配中的partition是随机的

注意

  • 需要修改分区分配策略

    ArrayList<String> startegys = new ArrayList<>();
    startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,startegys);
    
  • 如果在消费过程中某个consumer挂掉,超过45s后,该消费者的分区会按照粘性规则,尽可能均衡分配给其他的消费者

  • 消费者被移出消费者组,消费策略按照存活的消费者重新分配分区

offset位移

offset维护的位置在不同版本的kafka中存在区别

  • 0.9版本之前存储在zk中,如果client和zk之前存在大量网络通信,则会导致性能我呢提
  • 0.9版本后存储在kafka集群中的_consumer_offsets主题中

在内部主题中采用kv的方式存储offset

  • key的值为,group.id+topic+ 分区号
  • value的值为,当前offset
  • 每隔一段时间,kafka对主题中的数据进行compact

默认内部主题不可消费

  • 修改config/comsumer.properties文件中的参数exclude.internal.topics=false, 默认是 true,表示不能消费系统主题

  • 查看消费者消费主题

    kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 127.0.0.1:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
    

自动提交offset

kafka提供了自动提交offset的功能,使用户专注于自身的业务逻辑

  • enable.auto.commit:是否开启自动提交offset功能,默认是true
  • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

在这里插入图片描述

在java消费者中添加消费者参数

// 是否自动提交 offset,实际上不用设置此参数,默认为true
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

手动提交offset

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)

相同点:都会将本次提交的一批数据最高的偏移量提交

不同点:同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故 有可能提交失败

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据

在这里插入图片描述

在java消费者中添加消费者参数

  • 同步提交存在重试机制,因此更加可靠,但是由于阻塞提交效率较低(吞吐量低)
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("first"));//5. 消费数据
while (true){// 读取消息ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 输出消息for (ConsumerRecord<String, String> consumerRecord :
consumerRecords) {System.out.println(consumerRecord.value());}// 同步提交 offsetconsumer.commitSync();// 异步提交 offset// consumer.commitAsync();}

指定offset消费

在命令行中创建消费者指定--from-beginning,表示从头开始消费。

当消费者组首次消费(没有初始偏移量时),根据以下参数进行消费行为

  • earliest,将偏移量重置为从头开始消费

  • latest(默认值),自动将偏移量重置为最新偏移量

  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

如何在java消费者中指定offset进行消费

// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new
KafkaConsumer<>(properties);// 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);Set<TopicPartition> assignment= new HashSet<>();while (assignment.size() == 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息(有了分区分配信息才能开始消费)assignment = kafkaConsumer.assignment();
}// 遍历所有分区,并指定 offset 从 1700 的位置开始消费
for (TopicPartition tp: assignment) {kafkaConsumer.seek(tp, 1700);
}// 开始消费
while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}
}

指定时间开始消费

逻辑上可以通过指定某个时刻的offset来实现

Set<TopicPartition> assignment = new HashSet<>();while (assignment.size() == 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息(有了分区分配信息才能开始消费)assignment = kafkaConsumer.assignment();
}
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) {timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}// 获取从 1 天前开始消费的每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition tp : assignment) {OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);// 根据时间指定开始消费的位置if (offsetAndTimestamp != null){kafkaConsumer.seek(tp, offsetAndTimestamp.offset());}
}

漏消费和重复消费问题

重复消费问题,当前一次自动提交offset后,消费者开始消费数据2s后挂掉。此时重启consumer会从上一次自动提交的offset开始消费,导致重复消费的问题

在这里插入图片描述

漏消费问题,在手动提交offset模式下,当提交offset后如果消费者数据还未落盘出现宕机,则这部分未落盘的数据由于offset已经更新无法再次消费

在这里插入图片描述

生产环境的消费者

消费者事务

控制consumer端精准消费同样需要事务支持(要Kafka消费端将消费过程和提交offset 过程做原子绑定)

此时需要将offset保存到支持事务的介质中

在这里插入图片描述

数据积压问题

消费者能力不足造成积压(考虑扩充消费者数量)

下游数据处理不及时导致数据积压,提升每批次拉取数据的量

相关参数

  • fetch.max.bytes 默认 Default: 52428800(50 m)。消费者获取服务器端一批 消息最大的字节数。如果服务器端一批次的数据大于该值 (50m)仍然可以拉取回来这批数据,因此,这不是一个绝 对最大值。一批次的大小受
  • message.max.bytes (broker config)or max.message.bytes (topic config)影响。 max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/7048.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

git 提示 不能合并

今天A分支合并B分支&#xff0c;提示“不能合并” 最终发现&#xff0c;是另一个分支的版本落后导致&#xff0c;但是git并未提示出来 有遇到这种问题可以先检查下版本

如何有效跟踪你的计费时间?

对于自由职业者、小型企业和远程团队来说&#xff0c;时间跟踪是必需的。了解自己在单个项目或任务上投入了多少时间&#xff0c;可以帮助他们有效管理资源和优化工作流程。 然而&#xff0c;在向客户收费时&#xff0c;时间跟踪多了一层复杂性&#xff1a;不仅需要跟踪所花费…

Linux工具——vim

安装vim yum -y install vim 如果安装失败&#xff0c;提示Could not resolve host:mirrorlist.centos.org: Unkown error的问题&#xff0c;需要替换yum源&#xff0c;可以参考这个文章 配置vim root的vim配置文件在 /etc/vimrc 普通用户的vim配置文件在用户对应家目录下&a…

react实现页面动态表单设计器(自定义推拽表单)

react实现页面动态表单设计器&#xff08;自定义推拽表单&#xff09; 实现效果安装插件使用组件介绍基本设置&#xff0c;可设置控件标签&#xff0c;是否必填&#xff0c;校验规则校验规则有如下几种多选&#xff0c;下拉&#xff0c;单选可动态设置每个选择的label以及值 实…

一百三十三、Hive——Hive外部表加载含有JSON格式字段的CSV文件数据

一、目标 在Hive的ODS层建外部表&#xff0c;然后加载HDFS中的CSV文件数据 注意&#xff1a;CSV文件中含有未解析的JSON格式的字段数据&#xff0c;并且JSON字段中还有逗号 二、第一次建外部表&#xff0c;直接以&#xff0c;分隔行字段&#xff0c;结果JSON数据只显示一部分…

【1++的C++初阶】之list

&#x1f44d;作者主页&#xff1a;进击的1 &#x1f929; 专栏链接&#xff1a;【1的C初阶】 文章目录 一&#xff0c;什么是list二&#xff0c;构造与析构2.1 结点结构2.2 链表结构2.3 迭代器结构 三&#xff0c;部分重要接口的作用及其实现3.1 迭代器相关的接口3.2 list相关…

【VCS】(5)Fast RTL-level Verification

Fast RTL-level Verification General Coding GuidlinesLab --- simprofile$display() 输出彩色内容 前面的内容都是在说怎样进行仿真和验证&#xff0c;即如何使用 VCS 。 但是&#xff0c;仿真和验证是不是也有所讲究&#xff1f; 有没有一些标准来衡量设计代码和验证代码的质…

62. 不同路径

题目链接&#xff1a;力扣 解法一&#xff1a;动态规划 定义状态&#xff1a;对于m*n的网络&#xff0c;从最后一行到右下角&#xff0c;以及从最后一列到右下角&#xff0c;都只有一条不同路径&#xff1a;一直向右或一直向下&#xff0c;所以可以定义状态&#xff1a;dp[i][…

OpenCV系列__chapter2

这里写目录标题 1 图像加减乘除位运算1.1 加法 img cv2.add(img1, img2)1.2 减法 img cv2.subtract(img1, img2)1.3 乘法 img cv2.multiply(img1, img2)1.4 除法 img cv2.divide(img1, img2)1.5 位运算 2 图像增强2.1 线性变换2.2 非线性变换 3 图像几何变换3.1 裁剪、放大…

七大排序算法和计数排序

文章目录 一、直接插入排序二、希尔排序三、直接选择排序四、堆排序五、冒泡排序六、快速排序6.1递归实现快速排序6.2非递归实现快速排序 七、归并排序7.1递归实现归并排序7.2非递归实现归并排序 八、计数排序 以下排序以从小到大排序为例 一、直接插入排序 时间复杂度&#x…

文章审核之敏感词过滤

技术选型 DFA实现原理 DFA全称为&#xff1a;Deterministic Finite Automaton,即确定有穷自动机。 存储&#xff1a;一次性的把所有的敏感词存储到了多个map中&#xff0c;就是下图表示这种结构 敏感词&#xff1a;冰毒、大麻、大坏蛋 工具类 最下面的main方法是测试用的&a…

Java版本电子招标采购系统源代码—企业战略布局下的采购寻源

智慧寻源 多策略、多场景寻源&#xff0c;多种看板让寻源过程全程可监控&#xff0c;根据不同采购场景&#xff0c;采取不同寻源策略&#xff0c; 实现采购寻源线上化管控&#xff1b;同时支持公域和私域寻源。 询价比价 全程线上询比价&#xff0c;信息公开透明&#xff0c;可…

微信小程序-地图上的图标计算旋转值朝向经纬度计算

废话不多说&#xff0c;开整 // 参数为寄件人经纬度和收件人经纬度 // 根据寄收件人经纬度弧度π进行rotate旋转计算 const getRotate (po1, po2) > {if (!(po1 && po2)) return 0const lng_a po1.longitudeconst lat_a po1.latitudeconst lng_b po2.longitud…

MySQL使用

目录 1 MySQL的登录 1.1 服务的启动和终止 1.2 自带客户端的登录与退出 2 MySQL演示使用 2.1 MySQL的使用演示 2.2 MySQL的编码设置 1 MySQL的登录 1.1 服务的启动和终止 MySQL安装完毕以后&#xff0c;需要启动服务器进程&#xff0c;不然客户端无法连接数据库。 在前面…

vue-cli项目中,使用webpack-bundle-analyzer进行模块分析,查看各个模块的体积,方便后期代码优化

一、安装 npm install --save-dev webpack-bundle-analyzer 二、在vue.config.js中配置 const BundleAnalyzerPlugin require(webpack-bundle-analyzer).BundleAnalyzerPlugin plugins: [new BundleAnalyzerPlugin({analyzerMode: server,analyzerHost: 127.0.0.1,analyze…

Word2Vec实现文本识别分类

深度学习训练营之使用Word2Vec实现文本识别分类 原文链接环境介绍前言前置工作设置GPU数据查看构建数据迭代器 Word2Vec的调用生成数据批次和迭代器模型训练初始化拆分数据集并进行训练 预测 原文链接 &#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&…

探析国内数字孪生引擎技术现状

在数字孪生软件来发中&#xff0c;渲染引擎是一个关键点&#xff0c;国内大多数字孪生平台引擎通常使用的是自研的渲染引擎或者采用开源的渲染引擎。下面通过一些常见的渲染引擎在国内数字孪生引擎中的应用带大家了解数字孪生软件开发的方式。 自研渲染引擎&#xff1a;许多数…

HTTPS安全套接字层超文本传输协议

HTTPS安全套接字层超文本传输协议 HTTPS简介HTTPS和HTTP的主要区别客户端在使用HTTPS方式与Web服务器通信时的步骤SSL/TLS协议的加密&#xff08;握手&#xff09;过程为什么数据传输阶段使用对称加密HTTPS 的优点HTTPS 的缺点HTTPS 的优化证书优化会话复用 HTTPS简介 HTTP协议…

文件包含漏洞利用思路

简介 通过PHP函数引入文件时&#xff0c;传入的文件名没有经过合理的验证&#xff0c;从而操作了预想之外的文件&#xff0c;导致意外的文件泄漏甚至恶意代码注入。 常见的文件包含函数 php中常见的文件包含函数有以下四种&#xff1a; include()require()include_once()re…

苍穹外卖day05——Redis(被病毒入侵)+店铺营业状态设置

Redis被病毒入侵了 数据删光光然后只剩这四个玩意&#xff0c;乱下东西乱删东西&#xff0c;还好是docker部署&#xff0c;不然就寄了。 在服务器上部署redis记得一定要设置密码&#xff0c;不然被人扫肉鸡注入病毒整个服务器给你崩掉。 使用配置类的方式搭建相关程序 配置数…