kafka consumer配置拉取速度慢_Kafka消费者的使用和原理

这周我们学习下消费者,仍然还是先从一个消费者的Hello World学起:

public class Consumer {    public static void main(String[] args) {        // 1. 配置参数        Properties properties = new Properties();        properties.put("key.deserializer",                "org.apache.kafka.common.serialization.StringDeserializer");        properties.put("value.deserializer",                "org.apache.kafka.common.serialization.StringDeserializer");        properties.put("bootstrap.servers", "localhost:9092");        properties.put("group.id", "group.demo");        // 2. 根据参数创建KafkaConsumer实例(消费者)        KafkaConsumer consumer = new KafkaConsumer<>(properties);        // 3. 订阅主题        consumer.subscribe(Collections.singletonList("topic-demo"));        try {            // 4. 轮循消费            while (true) {                ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));                for (ConsumerRecord record : records) {                    System.out.println(record.value());                }            }        } finally {            // 5. 关闭消费者            consumer.close();        }    }}

前两步和生产者类似,配置参数然后根据参数创建实例,区别在于消费者使用的是反序列化器,以及多了一个必填参数group.id,用于指定消费者所属的消费组。关于消费组的概念在《图解Kafka中的基本概念》中介绍过了,消费组使得消费者的消费能力可横向扩展,这次再介绍一个新的概念“再均衡”,其意思是将分区的所属权进行重新分配,发生于消费者中有新的消费者加入或者有消费者宕机的时候。我们先了解再均衡的概念,至于如何再均衡不在此深究。

我们继续看上面的代码,第3步,subscribe订阅期望消费的主题,然后进入第4步,轮循调用poll方法从Kafka服务器拉取消息。给poll方法中传递了一个Duration对象,指定poll方法的超时时长,即当缓存区中没有可消费数据时的阻塞时长,避免轮循过于频繁。poll方法返回的是一个ConsumerRecords对象,其内部对多个分区的ConsumerRecored进行了封装,其结构如下:

public class ConsumerRecords implements Iterable> {        private final Map>> records;    // ...    }

而ConsumerRecord则类似ProducerRecord,封装了消息的相关属性:

public class ConsumerRecord {    private final String topic;  // 主题    private final int partition;  // 分区号    private final long offset;  // 偏移量    private final long timestamp;  // 时间戳    private final TimestampType timestampType;  // 时间戳类型    private final int serializedKeySize;  // key序列化后的大小    private final int serializedValueSize;  // value序列化后的大小    private final Headers headers;  // 消息头部    private final K key;  // 键    private final V value;  // 值    private final Optional leaderEpoch;  // leader的周期号

相比ProdercerRecord的属性更多,其中重点讲下偏移量,偏移量是分区中一条消息的唯一标识。消费者在每次调用poll方法时,则是根据偏移量去分区拉取相应的消息。而当一台消费者宕机时,会发生再均衡,将其负责的分区交给其他消费者处理,这时可以根据偏移量去继续从宕机前消费的位置开始。

186eb45cca4e4eec9f4981883d999062.png

而为了应对消费者宕机情况,偏移量被设计成不存储在消费者的内存中,而是被持久化到一个Kafka的内部主题__consumer_offsets中,在Kafka中,将偏移量存储的操作称作提交。而消费者在每次消费消息时都将会将偏移量进行提交,提交的偏移量为下次消费的位置,例如本次消费的偏移量为x,则提交的是x+1。

2d92c93c956090c42bcc559ab3a537a5.png

在代码中我们并没有看到显示的提交代码,那么Kafka的默认提交方式是什么?默认情况下,消费者会定期以auto_commit_interval_ms(5秒)的频率进行一次自动提交,而提交的动作发生于poll方法里,在进行拉取操作前会先检查是否可以进行偏移量提交,如果可以,则会提交即将拉取的偏移量。

下面我们看下这样一个场景,上次提交的偏移量为2,而当前消费者已经处理了2、3、4号消息,正准备提交5,但却宕机了。当发生再均衡时,其他消费者将继续从已提交的2开始消费,于是发生了重复消费的现象。

12bd84d6f35bffdd86cc77ae58e57299.png

我们可以通过减小自动提交的时间间隔来减小重复消费的窗口大小,但这样仍然无法避免重复消费的发生。

按照线性程序的思维,由于自动提交是延迟提交,即在处理完消息之后进行提交,所以应该不会出现消息丢失的现象,也就是已提交的偏移量会大于正在处理的偏移量。但放在多线程环境中,消息丢失的现象是可能发生的。例如线程A负责调用poll方法拉取消息并放入一个队列中,由线程B负责处理消息。如果线程A已经提交了偏移量5,而线程B还未处理完2、3、4号消息,这时候发生宕机,则将丢失消息。

cfbbeae2c67daf5776415208989f662a.png

从上述场景的描述,我们可以知道自动提交是存在风险的。所以Kafka除了自动提交,还提供了手动提交的方式,可以细分为同步提交异步提交,分别对应了KafkaConsumer中的commitSync和commitAsync方法。我们先尝试使用同步提交修改程序:

while (true) {    ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));    for (ConsumerRecord record : records) {        System.out.println(record.value());    }    consumer.commitSync();;}

在处理完一批消息后,都会提交偏移量,这样能减小重复消费的窗口大小,但是由于是同步提交,所以程序会阻塞等待提交成功后再继续处理下一条消息,这样会限制程序的吞吐量。那我们改为使用异步提交:

while (true) {    ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));    for (ConsumerRecord record : records) {        System.out.println(record.value());    }    consumer.commitAsync();;}

异步提交时,程序将不会阻塞,但异步提交在提交失败时也不会进行重试,所以提交是否成功是无法保证的。因此我们可以组合使用两种提交方式。在轮询中使用异步提交,而当关闭消费者时,再通过同步提交来保证提交成功。

try {    while (true) {        ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));        for (ConsumerRecord record : records) {            System.out.println(record.value());        }        consumer.commitAsync();    }} finally {    try {        consumer.commitSync();    } finally {        consumer.close();    }}

上述介绍的两种无参的提交方式都是提交的poll返回的一个批次的数据。若未来得及提交,也会造成重复消费,如果还想更进一步减少重复消费,可以在for循环中为commitAsync和commitSync传入分区和偏移量,进行更细粒度的提交,例如每1000条消息我们提交一次:

Map currentOffsets = new HashMap<>();int count = 0;while (true) {    ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));    for (ConsumerRecord record : records) {        System.out.println(record.value());        // 偏移量加1        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),                           new OffsetAndMetadata(record.offset() + 1));        if (count % 1000 == 0) {            consumer.commitAsync(currentOffsets, null);        }        count++;    }}

关于提交就介绍到这里。在使用消费者的代理中,我们可以看到poll方法是其中最为核心的方法,能够拉取到我们需要消费的消息。所以接下来,我们一起深入到消费者API的幕后,看看在poll方法中,都发生了什么,其实现如下:

public ConsumerRecords poll(final Duration timeout) {    return poll(time.timer(timeout), true);}

在我们使用设置超时时间的poll方法中,会调用重载方法,第二个参数includeMetadataInTimeout用于标识是否把元数据的获取算在超时时间内,这里传值为true,也就是算入超时时间内。下面再看重载的poll方法的实现:

private ConsumerRecords poll(final Timer timer, final boolean includeMetadataInTimeout) {    // 1. 获取锁并确保消费者没有关闭    acquireAndEnsureOpen();    try {        // 2.记录poll开始        this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());        // 3.检查是否有订阅主题        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");        }        do {            // 4.安全的唤醒消费者            client.maybeTriggerWakeup();            // 5.更新偏移量(如果需要的话)            if (includeMetadataInTimeout) {                updateAssignmentMetadataIfNeeded(timer, false);            } else {                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {                    log.warn("Still waiting for metadata");                }            }            // 6.拉取消息            final Map>> records = pollForFetches(timer);            if (!records.isEmpty()) {                // 7.如果拉取到了消息或者有未处理的请求,由于用户还需要处理未处理的消息                // 所以会再次发起拉取消息的请求(异步),提高效率                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {                    client.transmitSends();                }                // 8.调用消费者拦截器处理                return this.interceptors.onConsume(new ConsumerRecords<>(records));            }        } while (timer.notExpired());        return ConsumerRecords.empty();    } finally {        // 9.释放锁        release();        // 10.记录poll结束        this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());    }}

我们对上面的代码逐步分析,首先是第1步acquireAndEnsureOpen方法,获取锁并确保消费者没有关闭,其实现如下:

private void acquireAndEnsureOpen() {    acquire();    if (this.closed) {        release();        throw new IllegalStateException("This consumer has already been closed.");    }}

其中acquire方法用于获取锁,为什么这里会要上锁。这是因为KafkaConsumer是线程不安全的,所以需要上锁,确保只有一个线程使用KafkaConsumer拉取消息,其实现如下:

private static final long NO_CURRENT_THREAD = -1L;private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);private final AtomicInteger refcount = new AtomicInteger(0);private void acquire() {    long threadId = Thread.currentThread().getId();    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");    refcount.incrementAndGet();}

用一个原子变量currentThread作为锁,通过cas操作获取锁,如果cas失败,即获取锁失败,表示发生了竞争,有多个线程在使用KafkaConsumer,则会抛出ConcurrentModificationException异常,如果cas成功,还会将refcount加一,用于重入。

再看第2、3步,记录poll的开始以及检查是否有订阅主题。然后进入do-while循环,如果没有拉取到消息,将在不超时的情况下一直轮循。

第4步,安全的唤醒消费者,并不是唤醒,而是检查是否有唤醒的风险,如果程序在执行不可中断的方法或是收到中断请求,会抛出异常,这里我还不是很明白,先放一下。

第5步,更新偏移量,就是我们在前文说的在进行拉取操作前会先检查是否可以进行偏移量提交。

第6步,pollForFetches方法拉取消息,其实现如下:

private Map>> pollForFetches(Timer timer) {    long pollTimeout = coordinator == null ? timer.remainingMs() :    Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());    // 1.如果消息已经有了,则立即返回    final Map>> records = fetcher.fetchedRecords();    if (!records.isEmpty()) {        return records;    }    // 2.准备拉取请求    fetcher.sendFetches();    if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {        pollTimeout = retryBackoffMs;    }    Timer pollTimer = time.timer(pollTimeout);    // 3.发送拉取请求    client.poll(pollTimer, () -> {        return !fetcher.hasAvailableFetches();    });    timer.update(pollTimer.currentTimeMs());    // 3.返回消息    return fetcher.fetchedRecords();}

如果fetcher已经有消息了则立即返回,这里和下面将要讲的第7步对应。如果没有消息则使用Fetcher准备拉取请求然后再通过ConsumerNetworkClient发送请求,最后返回消息。

为啥消息会已经有了呢,我们回到poll的第7步,如果拉取到了消息或者有未处理的请求,由于用户还需要处理未处理的消息,这时候可以使用异步的方式发起下一次的拉取消息的请求,将数据提前拉取,减少网络IO的等待时间,提高程序的效率。

第8步,调用消费者拦截器处理,就像KafkaProducer中有ProducerInterceptor,在KafkaConsumer中也有ConsumerInterceptor,用于处理返回的消息,处理完后,再返回给用户。

第9、10步,释放锁和记录poll结束,对应了第1、2步。

对KafkaConsumer的poll方法就分析到这里。最后用一个思维导图回顾下文中较为重要的知识点:

2b77e380fd8dbd8f7b5e69582388de92.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/383065.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前缀和

前缀和 输入一个长度为n的整数序列。 接下来再输入m个询问&#xff0c;每个询问输入一对l, r。 对于每个询问&#xff0c;输出原序列中从第l个数到第r个数的和。 输入格式 第一行包含两个整数n和m。 第二行包含n个整数&#xff0c;表示整数数列。 接下来m行&#xff0c;…

子矩阵的和

题目描述 输入一个n行m列的整数矩阵&#xff0c;再输入q个询问&#xff0c;每个询问包含四个整数x1, y1, x2, y2&#xff0c;表示一个子矩阵的左上角坐标和右下角坐标。 对于每个询问输出子矩阵中所有数的和。 输入格式 第一行包含三个整数n&#xff0c;m&#xff0c;q。 …

jmeter 循环取值赋值给form_JMeter系列(三)逻辑控制器详解

循环控制器&#xff1a;指定迭代次数&#xff0c;可以用具体数字&#xff0c;也可以通过变量控制永远&#xff1a;表示无限循环点击查看示例&#xff1a;Jmeter实例(四)_图片爬虫简单控制器&#xff1a;这是最基础的一个控制器&#xff0c;它可以让脚本分层&#xff0c;变成一个…

c 复杂的前置后置面试题_OPPO Reno拆解:优秀工艺由外而内,复杂用料不负旗舰之名...

OPPO的新系列Reno手机最近吸引了不少注意力&#xff0c;不管是消费者还是手机极客都对其优秀的性能和强大的配置抱有极大的兴趣。最近&#xff0c;知名数码博主爱玩客对Reno十倍变焦版进行了拆解&#xff0c;从内部结构向我们揭示了这部手机的强大之处。并且点评道&#xff1a;…

差分矩阵

题目描述 输入一个n行m列的整数矩阵&#xff0c;再输入q个操作&#xff0c;每个操作包含五个整数x1, y1, x2, y2, c&#xff0c;其中(x1, y1)和(x2, y2)表示一个子矩阵的左上角坐标和右下角坐标。 每个操作都要将选中的子矩阵中的每个元素的值加上c。 请你将进行完所有操作后…

python常用的开发环境包括_Python语言主要包括哪些集成开发环境?_学小易找答案...

【填空题】Python的标准随机数生成器模块是【简答题】Why does critical thinking matter?【简答题】采集瓶子的外形进行创意设计 用点、线、面进行装饰填充 A4纸手绘,构图要有新意,要饱满【简答题】How can a lack of critical thinking cause a loss of personal freedom?【…

最长连续不重复子序列

题目描述 给定一个长度为n的整数序列&#xff0c;请找出最长的不包含重复数字的连续区间&#xff0c;输出它的长度。 输入格式 第一行包含整数n。 第二行包含n个整数&#xff08;均在0~100000范围内&#xff09;&#xff0c;表示整数序列。 输出格式 共一行&#xff0c;包…

ocp跟oce的区别 oracle_Oracle视频10g 11g认证视频教程 OCA/OCP 从入门到精通 数据库DBA...

一、认证Oracle OCP认证(Database 10g Administrator Certified Professional)为Oracle公司的数据库专家的认证。拥有OCP认证说明你拥有了大型Oracle数据库管理的技术能力&#xff0c;具备了成为大型企业核心数据库系统管理员的资格。OCE 1Z0-051&#xff1a;Oracle Database 1…

小爱同学app安卓版_小爱同学app下载-小米小爱同学下载2.9.21安卓版-西西软件下载...

小米小爱同学是小米AI音箱的配套软件&#xff0c;小爱同学是AI音箱的拟人虚拟形象&#xff0c;是一个二次元的萌妹子&#xff0c;如果你购买了小米AI音箱可以通过跟小爱同学交流来让小米智能音箱帮你完成你想要的服务。小爱同学支持海量互联网内容&#xff0c;包括在线音乐&…

python画太极八卦图_先天太极八卦图的唯一正确画法

我们先百度一下先天太极八卦图.↑&#xff0c;看看结果百度出来的图片第一页上半部分&#xff0c;结果非常惊人&#xff0c;40张图片&#xff0c;没有一张是正确的。错误原因分为两大类&#xff1a;1.太极图旋转方向或阴阳鱼所在位置错误 2.八卦中每卦的三爻画法错误1. 先天太极…

函数无法识别_PostgreSQL找不到最佳函数问题解析

最近给项目做支持&#xff0c;由于函数类型问题&#xff0c;加了几条函数定义。用户使用函数场景是func(string, string)。当时给用户添加了一条函数定义&#xff1a;func(text, text)。后来由于和其他函数冲突改成了func(varchar, varchar)。varchar和text同样都是字符串类型&…

Xshell链接不上云服务器的解决方案

1.ssh拒绝请求 先该配置文件 https://blog.csdn.net/u012206617/article/details/83026777?ops_request_misc&request_id&biz_id102&utm_termssh%E6%9C%8D%E5%8A%A1%E5%99%A8%E6%8B%92%E7%BB%9D%E4%BA%86%E5%AF%86%E7%A0%81%20%E8%AF%B7%E5%86%8D%E8%AF%95%E4%B8…

框架controller找不到_SpingBoot框架知识详解

Spring boot框架1、什么是Spring Boot&#xff1f;​ Spring Boot是Spring开源组织下的子项目&#xff0c;是Spring组件一站式解决方案&#xff0c;主要是简化了使用Spring的难度&#xff0c;简省了繁重的配置&#xff0c;提供了各种启动器&#xff0c;开发者能快速上手。Sprin…

架构的演变

基本概念 在介绍架构之前&#xff0c;为了避免部分读者对架构设计中的一些概念不了解&#xff0c;下面对几个最基础的概念进行介绍。 1.什么是分布式&#xff1f; 系统中的多个模块在不同服务器上部署&#xff0c;即可称为分布式系统&#xff0c;如Tomcat和数据库分别部署在…

axure8.0导出页面打不开问题_excel怎么转pdf?excel打不开?转换成PDF就行了

excel转pdf怎么做&#xff1f;年底最后一天了&#xff0c;我都被一堆的Excel文件搞得头疼&#xff0c;在这些时间里&#xff0c;要让我对几个G的文件进行操作&#xff0c;我已经是忙得不可开交&#xff0c;而在最后的最后&#xff0c;我的主管还说他的电脑无法打开我的Excel 了…

质数相关问题

试除法判定质数 题目描述 给定n个正整数ai&#xff0c;判定每个数是否是质数。 输入格式 第一行包含整数n。 接下来n行&#xff0c;每行包含一个正整数ai。 输出格式 共n行&#xff0c;其中第 i 行输出第 i 个正整数ai是否为质数&#xff0c;是则输出“Yes”&#xff0c…

python怎么爬虫理数据_Python神技能 | 使用爬虫获取汽车之家全车型数据

最近想在工作相关的项目上做技术改进&#xff0c;需要全而准的车型数据&#xff0c;寻寻觅觅而不得&#xff0c;所以就只能自己动手丰衣足食&#xff0c;到网上获&#xff08;窃&#xff09;得&#xff08;取&#xff09;数据了。汽车之家是大家公认的数据做的比较好的汽车网站…

linux运算_CentOS「linux」学习笔记22:算术运算符、逻辑运算符、关系运算符

​linux基础操作&#xff1a;主要介绍啦算术运算符、逻辑运算符、关系运算符1.算术运算符[主要用来计算数值]注意使用expr运算时运算符和数值之间需要有空格&#xff0c;其他方式运算时不能有空格。常用算术运算符号&#xff1a;表示相加&#xff0c;&#xff0d;表示相减&…

python实现小型搜索引擎设计_基于JAVA的中小型饭店餐饮管理系统的设计与实现...

好程序设计擅长JAVA(SSM,SSH,SPRINGBOOT)、PYTHON(DJANGO/FLASK)、THINKPHP、C#、安卓、微信小程序、MYSQL、SQLSERVER等&#xff0c;欢迎咨询今天将为大家分析一个中小型饭店餐饮管理系统(俗话说“民以食为天”,中国的饮食文化有着久远的历史。“吃”不仅仅指的是填饱肚子,它早…

评估报告有效期过期了怎么办_托福成绩过期了怎么办?

托福成绩是有期限的&#xff0c;考生申请美国大学的时候也只能在托福成绩有效期内。所以考托福的时候一定要关注一下托福成绩什么时候过期&#xff0c;以及大学申请的截止日期&#xff0c;提前做好安排。下面我们一起看看关于托福成绩有效期的相关问题。托福成绩有效期是多久&a…