Kafka之消费者客户端

1、历史上的二个版本

与生产者客户端一样,在Kafka的发展过程当中,消费者客户端主要有两个大的版本:

  • 旧消费者客户端(Old Consumer):基于Scala语言开发的版本,又称为Scala消费者客户端
  • 新消费者客户端(New Consumer):从Kafka 0.9.0版本之后基于Java语言开发的版本,又称为Java消费者客户端

2、必要的参数配置

  • bootstrap.servers

    用来指定连接Kafka集群所需的broker地址清单,形式为:host1:port1,host2:port2,…,多个broker之间以“,”隔开。

    不用将所有broker列出来,消费者可以根据一个broker查询到其他broker。

    建议至少配置2个或2个以上的broker,防止只有一个broker的话,宕机的时候就无法连接到Kafka集群了。

  • group.id

    消费者隶属消费组的名称。

  • key.deserializer 和 value.deserializer

    与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应。

    用来将字节数组中的key和value反序列化还原为原来的对象格式。

3、订阅主题与分区

一个消费者可以订阅一个或多个主题。

Kafka消费者客户端提供了三种订阅方式:集合订阅subscribe(Collection)、正则表达式订阅subscribe(Pattern)、指定分区订阅assign(Collection)。

这三种订阅方式分别代表了三种不同的订阅状态,依次为AUTO_TOPICS、 AUTO_PATTERN、USER_ASSIGNED。如果没有订阅,订阅状态为NONE。

其中的集合订阅subscribe(Collection)和正则表达式订阅subscribe(Pattern)这两种订阅方式有消费者自动再均衡的功能,可以根据分区分配策略自动的为消费者分配对应的分区。而指定分区订阅assign(Collection)方式则不具备消费者自动再均衡的功能。

综上所述梳理了一张关于订阅方式、订阅状态和再均衡功能的关系表:
在这里插入图片描述

4、消费消息

消息消费一般有两种方式:

  • 推模式:服务器主动将消息推送给消费者。
  • 拉模式:消费者主动向服务器发起请求来来取信息。

Kafka采用的消息消费模式是拉模式。

在拉取消息的时候有一个超时时间参数(timeout),如果消费者的缓存区中无可用数据(即没有要消费消息),我们可以通过这个timeout参数来设置等待的时长。如果timeout=0,则不管有无数据立刻返回结果。

5、位移提交

在Kafka的分区当中,每一个消息都有一个唯一的标识offset,我们可以用它来表示消息在分区中的位置。

对于消费者而言,也有一个offset的概念,我们可以用它来表示消费到分区中某消息的位置。

对于offset这个单词,我们既可以翻译为偏移量,也可以翻译为位移,并没有什么严格的区分。但是为了更好的区分不同的使用场景,我们可以将用来表示消息在分区中位置的offset称为偏移量。对于用来表示消费者消费到的消息所处位置的offset称为位移,更明确的话称为“消费位移”

通过下图希望能够帮助大家更清晰的理解:偏移量、消费位移、位移提交。
在这里插入图片描述
通过上图我们可以了解到如下信息:

  1. 正在消费的消息下标为3。
  2. 所以对于分区来说,它的偏移量为3;对于消费者来说,它的消费位移也为3。
  3. 对于分区来说,下标4则作为下一个消息要写入的位置。
  4. 对于消费者来说,将要提交的消费位移(即位移提交)是下标4。

Kafka默认情况下,消费位移的提交方式为自动提交,提交间隔时间默认为5秒。

根据位移提交的具体情况,可能会出现重复消费和消息丢失的现象。我们通过下面一个例子更详细介绍下重复消费和消息丢失是如何出现的。让我们先来看一张图:
在这里插入图片描述
根据上图,我们假设本次拉取的消息为x+2 ~ x+7,x+2为上一次的提交的消费位移,x+8为下一次要提交的消费位移,目前正在处理x+5。

  • 消息丢失

    假设我们在处理x+5之前(即在处理x+0或x+1或x+2…)就提交了本次的消费位移(即x+8),当到处理x+5的时候出现了异常,恢复后,就要从x+8开始拉取了,此时x+5、x+6、x+7实际上并没有被消费,这样便发生了消息丢失的现象。(在消费消息出现异常之前就执行了位移提交)。

  • 重复消费

    假设我们在处理x+5的时候出现了异常,此时还没有提交本次的消费位移(即x+8),恢复后,就还需要从x+2开始拉取消息,这样x+2 ~ x+4就又得再消费一次,这种现象就是重新消费。(在消费消息出现异常之前没有执行位移提交)。

通过以上的描述我们还可以发现:拉取线程和消息处理线程完全是两个独立的线程。

6、指定位移消息

首先提出一个问题:当消费者遇到无法获取所记录的消费位移的时候该怎么办?

为了要解决这个问题,消费者客户端提供了auto.offset.reset参数,用来在遇到这种情况的时候告诉消费者客户端从哪里开始拉取消息消费,该参数的值有几种选择:

  • latest:默认值,意为从分区末尾开始消费消息(即分区中下一条消息要写入的位置)。
  • earliest:意为消费者会从起始处也就是0开始消费。
  • none:直接抛出NoOffsetForPartitionException异常。

7、再均衡

所谓再均衡就是将一个分区的所属权从一个消费者转移到另外一个消费者。

再均衡的过程中,消费组内的消费者无法读取消息。

再均衡后,可能会出现重复消费的情况。因为再均衡的时候,消费者会丢掉当前的状态。如果在上一个消费者(即具有分区所属权的消费者)正在消费消息(已消费了一部分消息了)还没有来得及提交消费位移的时候就发生了再均衡,那么新的消费者(分区所属权转移后的消费者)会重新拉取曾经消费过的消息再消费一遍。

8、消费者拦截器

我们可以通过消费者拦截器在poll返回消息之前消费位移提交之后进行一些特定的处理。

9、多线程实现

为了提高整体的消费能力,我们对消费者客户端采取多线程来实现。

有三种多线程的实现方式:

  1. 线程封闭,即为每一个线程实现一个KafkaConsumer对象,如下图: 在这里插入图片描述
  2. 多个消费线程同时消费一个分区,通过assign()、seek()等方法实现,打破了原有的消费线程的个数不能超过分区个数的限制。但是这种实现方式会使位移提交和顺序控制变得非常负责,实际场景中很少会用到。
  3. 将处理消息的逻辑改为多线程实现,也就是在一个KafkaConsumer对象中有多个处理消息的handler线程,如下图: 在这里插入图片描述
    在这种实现方式中,为了能够正确的完成位移提交,引入了一个共享变量offsets来参与提交,如下图:
    在这里插入图片描述
    基于这种实现方式提供以下两种实现方案:
    • 通过消费者拉取一个批次的消息,然后再将这些消息交给多线程去处理。
    • 基于滑动窗口来实现,将拉取的消息以批次为单位暂存起来,多个消费线程拉取暂存的消息消费,如下图: 在这里插入图片描述
      窗口滑动过程描述:上一次滑动窗口的范围是2 ~ 5,startOffset为2,当2中的消息都被消费完成后,提交2中的消费位移,窗口向前滑动一格,范围变为3 ~ 6,startOffset变为3。

上一篇:Kafka之消费组与消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/58087.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蚁剑连接本地木马文件报错

项目场景: 本地搭建php和蚁剑环境,连接本地木马文件ma.php 问题描述 使用蚁剑连接localhost时报错 错误{ "address":"127.0.0.1" "code":"ECONNREFUSED", "errno":"ECONNREFUSED", &qu…

【JVM】——JVM运行机制、类加载机制、内存划分

阿华代码,不是逆风,就是我疯 你们的点赞收藏是我前进最大的动力!! 希望本文内容能够帮助到你!! 目录 一:JVM引入 1:编程语言 2:JAVA运行机制 二:JVM中内存…

1U服务器和Hyper-V虚拟机使用记录

记录最近接触服务器和虚拟机的一些使用操作知识 背景:1U服务器上架使用,备份其他服务器vm虚拟机,Hyper-V管理虚拟机使用测试 设备:IBM3550服务器交换机, 移动硬盘:附加存储盘, u盘1&#xff1…

Openshift上使用Elasticsearch (ECK) Operator部署ES

部署 7.16.2 版本 Elasticsearch (ECK) Operator部署ES oc new-project middleware-elasticsearchapiVersion: elasticsearch.k8s.elastic.co/v1 kind: Elasticsearch metadata:name: es-testnamespace: middleware-elasticsearch spec:http:tls:selfSignedCertificate:disab…

go高并发之路——本地缓存

一、使用场景 试想一个场景,有一个配置服务系统,里面存储着各种各样的配置,比如直播间的直播信息、点赞、签到、红包、带货等等。这些配置信息有两个特点: 1、并发量可能会特别特别大,试想一下,一个几十万…

Anchor DETR:Transformer-Based目标检测的Query设计

写在前面 文中指出之前DETR-like算法存在以下问题: 之前DETR-liked检测算法里,object query是一组可学习的嵌入表示(就是一组256-d的向量),缺乏明确的物理意义,不能解释它们会关注什么地方。每个object q…

探索现代软件开发中的持续集成与持续交付(CI/CD)实践

探索现代软件开发中的持续集成与持续交付(CI/CD)实践 随着软件开发的飞速进步,现代开发团队已经从传统的开发模式向更加自动化和灵活的开发流程转变。持续集成(CI) 与 持续交付(CD) 成为当下主…

【SSM-Day5】SpringMVC

【SSM-Day5】SpringMVC Web->Servlet->Servlet容器MVC档案SpringMVC档案SpringMVC核心操作📢建立连接RequestMapping:实现路由映射Controller/ResponseBody:表示Spring某个类是否可以接收HTTP请求 📢接收请求1. 直接接收一个…

【skywalking 】选择Elasticsearch存储

介绍 skywalking支持 Elasticsearch 和 OpenSearch 作为存储。 OpenSearch 是 ElasticSearch 7.11 的一个分支,但在 Apache 2.0 中获得许可。 OpenSearch 存储与 ElasticSearch 共享相同的配置。为了激活 OpenSearch 作为存储,请将存储提供程序设置为e…

MySQL中的Redo Log、Undo Log和Binlog:深入解析

引言 在数据库管理系统中,日志是保障数据一致性和完整性的关键机制。MySQL作为一种广泛使用的关系型数据库管理系统,提供了多种日志类型来满足不同的需求。本文将详细介绍MySQL中的Redo Log、Undo Log和Binlog,从背景、业务场景、功能、底层…

【QT】Qt窗口(上)

个人主页~ Qt窗口 一、菜单栏二、工具栏三、状态栏四、浮动窗口 Qt窗口是通过QMainWindow类来实现的,我们之前的学习是通过QWidget类实现的 QMainWindow包含一个菜单栏Menu Bar②,多个工具栏Tool Bars③,多个浮动窗口Dock Widgets&#xff0c…

CISC(Complex Instruction Set Computer)和RISC(Reduced Instruction Set Computer)

CISC(Complex Instruction Set Computer)和RISC(Reduced Instruction Set Computer)是两种不同类型的指令集架构(ISA),它们在设计理念、指令复杂性、寻址方式、实现方式以及应用场景上存在显著差…

关闭windows更新方法

在windows更新里选择暂停windows更新 然后按下winr,输入regedit 在注册表里找到 计算机\HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\WindowsUpdate\UX\Settings\PauseUpdatesExpiryTime 修改时间即可

什么是事件冒泡?如何阻止事件冒泡和浏览器默认事件?

事件冒泡是浏览器事件处理模型中的一个重要概念。当一个事件发生在某个元素上时,它会首先在该元素上触发,然后逐层向上冒泡到其父元素,直到根元素(通常是 document)为止。这意味着如果在一个嵌套的元素上触发了事件&am…

【数据分享】中国汽车市场年鉴(2013-2023)

数据介绍 在这十年里,中国自主品牌汽车迅速崛起。吉利、长城、比亚迪等品牌不断推出具有竞争力的车型,在国内市场乃至全球市场都占据了一席之地。同时,新能源汽车的发展更是如日中天。随着环保意识的提高和政策的大力支持,电动汽车…

PCL库中的算法封装详解

摘要 Point Cloud Library(PCL)是一个广泛应用于三维点云处理的开源库,涵盖了从基础数据结构到高级算法的丰富功能。PCL通过面向对象的设计和模块化的架构,将各种算法封装成独立的类,使得用户能够方便地调用和组合这些…

第十八届联合国世界旅游组织/亚太旅游协会旅游趋势与展望大会在广西桂林开幕

10月19日,第十八届联合国世界旅游组织/亚太旅游协会旅游趋势与展望大会(以下简称“大会”)在广西桂林开幕,来自美国、英国、德国、俄罗斯、柬埔寨等25个国家约120名政府官员、专家学者和旅游业界精英齐聚一堂,围绕“亚洲及太平洋地区旅游业&a…

Git 创建SSH秘钥

1、命令行输入 ssh-keygen -t rsa -b 4096 2、系统提示你“Enter a file in which to save the key”,直接按回车键 3、再提示你输入密码的时候直接按回车键,创建没有密码的SSH密钥 4、密钥对创建后,可以在自己电脑对应的 ~/.ssh 目录下找到…

java的String方法

lastIndexOf() 源码: public int lastIndexOf(String str) {return lastIndexOf(str, length());} lastIndexOf(String str):用于在一个字符串中查找指定字符最后一次出现的位置 subString() 源码: public String substring(int beginIn…

数据库如何保证主键唯一性

数据库保证主键(Primary Key)的唯一性主要通过以下机制实现: 1. **主键约束(PRIMARY KEY Constraint)**: 这是保证主键唯一性的核心机制。在数据库表中,通过定义主键约束,可以确…