Apche Kafka + Spring的消息监听容器

目录

  • 一、消息的接收
    • 1.1、消息监听器
  • 二、消息监听容器
    • 2.1、 实现方法
      • 2.1.1、KafkaMessageListenerContainer
        • 2.1.1.1、 基本概念
        • 2.1.1.2、如何使用 KafkaMessageListenerContainer
      • 2.1.2、ConcurrentMessageListenerContainer
    • 三、偏移
  • 四、监听器容器自动启动

一、消息的接收

消息的接收:可以通过配置MessageListenerContainer并提供消息侦听器或使用@KafkaListener注释来接收消息。本章我们主要说明通过配置MessageListenerContainer并提供消息侦听器的方式接收消息。

1.1、消息监听器

当使用消息监听容器时,就必须提供一个监听器来接收数据。目前有八个支持消息侦听器的接口:

public interface MessageListener<K, V> { // 当使用自动提交或容器管理的提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。void onMessage(ConsumerRecord<K, V> data);
}public interface AcknowledgingMessageListener<K, V> { // 当使用手动提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { // 当使用自动提交或容器管理的提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。提供对 Consumer 对象的访问。void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);}public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { //当使用手动提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。提供对 Consumer 对象的访问。void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);}public interface BatchMessageListener<K, V> { //当使用自动提交或容器管理的提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。使用此接口时不支持 AckMode.RECORD,因为侦听器会获得完整的批次。void onMessage(List<ConsumerRecord<K, V>> data);}public interface BatchAcknowledgingMessageListener<K, V> { // 当使用手动提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);}public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { // 当使用自动提交或容器管理的提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。使用此接口时不支持 AckMode.RECORD,因为侦听器会获得完整的批次。提供对 Consumer 对象的访问。void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);}public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { //当使用手动提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。提供对 Consumer 对象的访问。void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);}

注意:1、 Consumer对象不是线程安全的;2、不应执行任何Consumer<?, ?>影响消费者位置和/或监听器中已提交偏移量的方法;容器需要管理这些信息。

二、消息监听容器

2.1、 实现方法

MessageListenerContainer 提供了两种实现方式 :
1、KafkaMessageListenerContainer,
2、ConcurrentMessageListenerContainer

2.1.1、KafkaMessageListenerContainer

2.1.1.1、 基本概念

KafkaMessageListenerContainer在单个线程上接收来自所有主题或分区的所有消息。委托ConcurrentMessageListenerContainer给一个或多个KafkaMessageListenerContainer实例以提供多线程消费。

  • 从2.2.7版本开始,可以添加一个记录拦截器(RecordInterceptor)监听器容器;它将在调用侦听器之前调用,以允许检查或修改记录。如果拦截器返回 null,则不会调用侦听器。
  • 从版本 2.7 开始,它具有在侦听器退出后(通常或通过抛出异常)调用的附加方法。
  • 批处理拦截器(BatchInterceptor)为批量监听器(Batch Listeners)提供类似的功能。
  • 此外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供对 Consumer<?, ?> 的访问。 例如,这可以用于访问拦截器中的消费者指标。
  • CompositeRecordInterceptor and CompositeBatchInterceptor可以调用多个拦截器。
  • 默认情况下,当使用事务时,拦截器在事务启动后被调用。从版本 2.3.4 开始,可以设置侦听器容器的 interceptBeforeTx 属性在事务开始之前调用拦截器。
  • 从版本 2.3.8、2.4.6 开始,当并发大于 1 时 ConcurrentMessageListenerContainer 支持静态成员资格。 group.instance.id 后缀为 -n ,起始n于1。这与增加 session.timeout.ms 的值 一起可用于减少重新平衡事件,例如,当应用程序实例重新启动时。
  • 静态成员资格是指在提高流应用程序、消费者组和其他构建在组再平衡协议之上的应用程序的可用性。再平衡协议依赖组协调器为组成员分配实体 ID。这些生成的 ID 是短暂的,并且会在成员重新启动和重新加入时发生变化。对于基于消费者的应用程序,这种“动态成员资格”可能会导致在管理操作(例如代码部署、配置更新和定期重新启动)期间将大部分任务重新分配给不同的实例。对于大型状态应用程序,洗牌任务在处理之前需要很长时间才能恢复其本地状态,从而导致应用程序部分或完全不可用。受这一观察的启发,Kafka 的组管理协议允许组成员提供持久的实体 ID。根据这些 ID,组成员资格保持不变,因此不会触发重新平衡。

同样的,拦截器中不应该执行任何影响消费者的位置和/或提交的偏移量的方法,容器需要管理这些信息。

如果拦截器改变了记录(通过创建新记录),则topic、partition和offset必须保持不变,以避免意外的副作用,例如记录丢失。

2.1.1.2、如何使用 KafkaMessageListenerContainer

  • KafkaMessageListenerContainer 构造函数

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)
    

    该构造函数接收接收消费者工厂(ConsumerFactory)有关对象中主题和分区以及其他配置的信息。

  • 容器属性(ContainerProperties)包含3个构造函数,下面我们一个一个介绍它们。
    1、以TopicPartitionOffset为参数

    public ContainerProperties(TopicPartitionOffset... topicPartitions)
    

    该构造函数采用一个主题分区偏移量(TopicPartitionOffset)参数数组来显式指示容器要使用哪些分区(使用消费者assign()方法)并带有可选的初始偏移量。默认情况下,正值是绝对偏移量,负值是相对于分区内当前最后一个偏移量。TopicPartitionOffset提供了一个带有附加参数的构造函,boolean如果是true,则在容器启动时相对于该消费者的当前位置初始偏移(正或负)。
    2、以String为参数

    public ContainerProperties(String... topics)
    

    该构造函数采用主题数组,Kafka 根据属性分配分区group.id——在组中分配分区
    3、以Pattern为参数

    public ContainerProperties(Pattern topicPattern)
    

    该构造函数使用正则表达式Pattern来选择主题。

  • 如何将监听器分配给容器
    监听器有了容器也有了,如何将监听器分配给容器呢?。要将 MessageListener 分配给容器,可以在创建 Container 时使用 ContainerProps.setMessageListener 方法:

    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    containerProps.setMessageListener(new MessageListener<Integer, String>() {...
    });
    DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<>(consumerProps());
    KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);
    return container;
    

    要注意的是,在创建 DefaultKafkaConsumerFactory 时,使用仅接受上述属性的构造函数意味着从配置中选取键和值反序列化器类。 或者,反序列化器实例可以传递到 DefaultKafkaConsumerFactory 构造函数以获取键和/或值,在这种情况下,所有消费者共享相同的实例。 另一种选择是提供Supplier(从版本2.3开始),它将用于为每个消费者获取单独的Deserializer实例:

     DefaultKafkaConsumerFactory<Integer, CustomValue> cf =new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new      CustomValueDeserializer());KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);
    return container;
    

从版本 2.3.5 开始,引入了一个名为authorizationExceptionRetryInterval 的新容器属性。 这会导致容器在从 KafkaConsumer 获取任何 AuthorizationException 后重试获取消息。 例如,当配置的用户被拒绝读取特定主题时,就会发生这种情况。 定义authorizationExceptionRetryInterval应该有助于应用程序在授予适当的权限后立即恢复。

2.1.2、ConcurrentMessageListenerContainer

ConcurrentMessageListenerContainer只有一个构造函数与构造函数类似 KafkaListenerContainer。

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)

它有一个concurrency属性,这个属性的作用是创建几个 KafkaMessageListenerContainer 实例。例如:container.setConcurrency(3) 创建三个 KafkaMessageListenerContainer 实例。

当监听多个主题时,默认的分区分布可能不是我们所期望的。 例如,如果有 3 个主题,每个主题有 5 个分区,并且我们想要使用 concurrency=15,但是我们只会看到 5 个活动使用者,每个使用者从每个主题分配一个分区,而其他 10 个使用者处于空闲状态。 这是因为默认的 Kafka PartitionAssignor 是 RangeAssignor。 对于这种情况,我们需要考虑使用 RoundRobinAssignor,它将分区分配给所有使用者。 然后,为每个消费者分配一个主题或分区。 我们可以在提供给DefaultKafkaConsumerFactory的属性中设置partition.assignment.strategy消费者属性来更改要更改PartitionAssignor。(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。
在springboot中可以这样:
spring.kafka.consumer.properties.partition.assignment.strategy=
org.apache.kafka.clients.consumer.RoundRobinAssignor

当使用 TopicPartitionOffset 配置容器属性时,ConcurrentMessageListenerContainer 会在委托 KafkaMessageListenerContainer 实例之间分发 TopicPartitionOffset 实例。

假设提供了 6 个 TopicPartitionOffset 实例,并发度为 3; 每个容器有两个分区。 对于五个
TopicPartitionOffset 实例,两个容器获得两个分区,第三个容器获得一个分区。 如果并发数大于TopicPartition的数量,则降低并发数,使每个容器获得一个分区。

三、偏移

spring提供了几个偏移选项, 如果 enable.auto.commit 消费者属性为 true,Kafka会根据其配置自动提交偏移量。 如果为 false,则容器支持多种 AckMode 设置。 默认 AckMode 为 BATCH。

从版本 2.3 开始,框架将 enable.auto.commit 设置为 false,除非在配置中明确设置。以前,如果未设置该属性,则使用 Kafka 默认值 (true)。

消费者 poll() 方法返回一个或多个 ConsumerRecord。 为每条记录调用 MessageListener。 以下列表描述了容器对每个 AckMode 采取的操作(当未使用事务时):

  • RECORD:当侦听器处理记录后返回时提交偏移量。

  • BATCH:当 poll() 返回的所有记录都已处理完毕时提交偏移量。

  • TIME:当 poll() 返回的所有记录都处理完毕后,只要超过了自上次提交以来的 ackTime,就提交偏移量。

  • COUNT:当 poll() 返回的所有记录都已处理完毕时,提交偏移量,只要自上次提交以来已收到 ackCount 条记录。

  • COUNT_TIME:与 TIME 和 COUNT 类似,但如果任一条件为真,则执行提交。

  • MANUAL:消息侦听器负责acknowledge() 确认。 之后,应用与 BATCH 相同的语义。

  • MANUAL_IMMEDIATE:当侦听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。

使用事务(transactions)时,偏移量将发送到事务,语义相当于 RECORD 或 BATCH,具体取决于侦听器类型(记录或批处理)。MANUAL 和 MANUAL_IMMEDIATE 要求侦听器是 AcknowledgingMessageListener 或 BatchAcknowledgingMessageListener。

根据syncCommits容器属性,使用消费者上的commitSync()或commitAsync()方法。 默认情况下,syncCommits 为 true。

作者个人建议建议设置:ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 为 false。

从版本 2.3 开始,Acknowledgment 接口增加了两个方法 nack(long sleep) 和 nack(int index, long sleep)。 第一个与记录侦听器一起使用,第二个与批处理侦听器一起使用。 为侦听器类型调用错误的方法将引发 IllegalStateException。在此之前他是这样:

public interface Acknowledgment {void acknowledge();}
  • 如果要提交部分批次,使用 nack()。
  • 使用事务时,将 AckMode 设置为 MANUAL;
  • 调用 nack() 会将成功处理的记录的偏移量发送到事务。
  • nack() 只能在调用侦听器的消费者线程上调用。
  • 当调用 nack() 时,将提交所有挂起的偏移量,丢弃上次轮询的剩余记录,并在其分区上执行查找,以便在下一次轮询时重新传递失败的记录和未处理的记录( )。
  • 通过设置 sleep 参数,消费者线程可以在重新交付之前暂停。 这与在容器配置了 SeekToCurrentErrorHandler 时抛出异常的功能类似。

当通过组管理使用分区分配时,确保 sleep 参数(加上处理先前轮询的记录所花费的时间)小于使用者 max.poll.interval.ms属性,这个非常重要

四、监听器容器自动启动

侦听器容器实现 SmartLifecycle,并且 autoStartup 默认为 true。 容器在后期启动 (Integer.MAX-VALUE - 100)。 实现 SmartLifecycle 来处理来自侦听器的数据的其他组件应在早期阶段启动。 -100 为后续阶段留出了空间,使组件能够在容器之后自动启动。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/40023.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【机器学习】sklearn数据集的使用,数据集的获取和划分

「作者主页」&#xff1a;士别三日wyx 「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」&#xff1a;对网络安全感兴趣的小伙伴可以关注专栏《网络安全入门到精通》 sklearn数据集 二、安装sklearn二、获取数据集三、…

mac录屏工具,录屏没有声音的解决办法

mac录屏工具&#xff0c;录屏没有声音的解决办法 在使用macbook录制屏幕时&#xff0c;发现自带的录屏工具QuickTime Player没有声音&#xff0c;于是尝试了多款录屏工具&#xff0c;对其做一些经验总结&#xff08;省流&#xff1a;APP Store直接可以免费下载使用Omi录屏专家…

第三课-界面介绍SD-Stable Diffusion 教程

前言 我们已经安装好了SD&#xff0c;这篇文章不介绍难以理解的原理&#xff0c;说使用。以后再介绍原理。 我的想法是&#xff0c;先学会画&#xff0c;然后明白原理&#xff0c;再去提高技术。 我失败过&#xff0c;知道三天打鱼两天晒网的痛苦&#xff0c;和很多人一样试了…

TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

TiDB数据库从入门到精通系列之六&#xff1a;使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka 一、技术流程二、搭建环境三、创建Kafka changefeed四、写入数据以产生变更日志五、配置 Flink 消费 Kafka 数据 一、技术流程 快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群创建 c…

【网络编程系列】网络编程实战

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…

使用Vue.js框架的指令和事件绑定实现一个购物车的页面布局

使用了v-model指令来实现全选/全不选的功能&#xff0c;当全选框被点击时&#xff0c;isAllChecked的值会被改变。使用了v-if指令来判断购物车中是否有商品&#xff0c;如果有商品则渲染商品列表&#xff0c;否则显示购物车为空的提示。使用了v-for指令来遍历datalist数组&…

jvm内存溢出排查(使用idea自带的内存泄漏分析工具)

文章目录 1.确保生成内存溢出文件2.使用idea自带的内存泄漏分析工具3.具体实验一下 1.确保生成内存溢出文件 想分析堆内存溢出&#xff0c;一定在运行jar包时就写上参数-XX:HeapDumpOnOutOfMemoryError&#xff0c;可以看我之前关于如何运行jar包的文章。若你没有写。可以写上…

Keepalived入门指南:实现故障转移和负载均衡

文章目录 一、简介1. Keepalived概述2. 高可用性和负载均衡的重要性 二、故障转移1. 什么是故障转移2. Keepalived的故障转移原理a) VRRP协议b) 虚拟路由器ID和优先级 3. 配置Keepalived实现故障转移a) 主备服务器的设置b) 监控网络接口c) 虚拟IP的配置d) 备份服务器接管流程 三…

Python学习笔记_基础篇(九)_面向对象编程

本篇内容: 1、反射2、面向对象编程3、面向对象三大特性4、类成员5、类成员修饰符6、类的特殊成员7、单例模式 反射 python中的反射功能是由以下四个内置函数提供&#xff1a;hasattr、getattr、setattr、delattr&#xff0c;改四个函数分别用于对对象内部执行&#xff1a;检…

el-form自定义校验规则

Vue 的 el-form 组件可以使用自定义校验规则进行表单验证。自定义校验规则可以通过传递一个函数来实现&#xff0c;该函数接受要校验的字段的值作为参数&#xff0c;并返回一个布尔值或一个 Promise 对象。 下面是一个示例&#xff0c;演示如何在 el-form 中使用自定义校验规则…

若依前端npm run dev启动时报错

本文主要解决问题:若依前端npm run dev启动时报错,解决办法。 目录 1、第1种解决方案(亲测有效) 2、第2种解决方案(亲测有效) Error: error:0308010C:digital envelope routines::unsupportedat new Hash (node:internal/crypto/hash:67:19)at Object.createHash (node…

解决 adb install 错误INSTALL_FAILED_UPDATE_INCOMPATIBLE

最近给游戏出包&#xff0c;平台要求 v1 签名吧&#xff0c;AS 打包后&#xff0c;adb 执行安装到手机&#xff0c;我用的设备是google pixel6 , android 系统 13&#xff0c; 提示如下&#xff1a; adb install -r v5_android_202308161046.apk Performing Streamed Install a…

centos 安装.net 6 sdk

按照以下步骤在 CentOS 上安装 .NET 6 SDK&#xff1a; 更新系统&#xff1a; sudo yum update安装依赖项&#xff1a; sudo yum install -y curl libunwind libicu下载并添加 Microsoft 的软件包存储库密钥&#xff1a; sudo rpm -Uvh https://packages.microsoft.com/config/…

单片机第一季:零基础13——AD和DA转换

1&#xff0c;AD转换基本概念 51 单片机系统内部运算时用的全部是数字量&#xff0c;即0 和1&#xff0c;因此对单片机系统而言&#xff0c;无法直接操作模拟量&#xff0c;必须将模拟量转换成数字量。所谓数字量&#xff0c;就是用一系列0 和1 组成的二进制代码表示某个信号大…

Linux -- 进阶 Autofs自动挂载服务 实验详解

服务端创建共享目录&#xff0c; 客户端实现自动挂载 第一步 &#xff1a; 客户端&#xff0c;服务端 均关闭安全软件 [rootserver ~]# setenforce 0 [rootserver ~]# systemctl stop firewalld [rootnode1 ~]# setenforce 0 [rootnode1 ~]# systemctl stop firewalld 第二…

在K8s上处理nginx

基本说明 创建一个名为ssl的TLS类型的Secret对象&#xff0c;用于存储证书和密钥信息。 kubectl create secret tls ssl --certserver.crt --keyserver.key配置Nginx的events块&#xff0c;设置worker连接数为1024。 events {worker_connections 1024; }配置Nginx的http块&a…

MyBaits(单独使用,与整合无关)小白版

文章目录 概述比较配置写xml加载上面配置并执行加载配置的方法方式一 执行方法方式一方式二(MyBatis映射器) 写配置文件的映射文件设置对象的别名&#xff08;简写&#xff09;获取自动生成的主键 查询结果和java的映射规则基本类型映射&#xff1a;简单对象映射&#xff1a;嵌…

加盐加密算法

MD5加密加盐加密项目密码升级 MD5加密 MD5一系列公式进行复杂数学运算&#xff1b;特点&#xff1a;&#xff08;用途校验和、计算hash值方式、加密&#xff09; 1&#xff1a;定长&#xff1b;无论原始数据多长&#xff1b;算出的结果都是4或者8字节的版本。 2&#xff1a;冲…

Java多线程实战

Java多线程实战 java多线程&#xff08;超详细&#xff09; java自定义线程池总结 Java创建线程方式 方法1&#xff0c;继承Thread类 方法2&#xff0c;实现Runable接口 方法2-2&#xff0c;匿名内部类形式lambda表达式 方法3&#xff0c;实现Callable接口&#xff0c;允许…

【深入理解Linux内核锁】三、原子操作

我的圈子: 高级工程师聚集地 我是董哥,高级嵌入式软件开发工程师,从事嵌入式Linux驱动开发和系统开发,曾就职于世界500强企业! 创作理念:专注分享高质量嵌入式文章,让大家读有所得! 文章目录 1、原子操作思想2、整型变量原子操作2.1 API接口2.2 API实现2.2.1 原子变量结…