Spring Kafka常用配置详解

目录

    • 前言
    • producer配置
    • consumer 配置
    • listener 配置

前言

在 Spring Kafka 中,主要的配置分为三大块,分别是producer、consumer、listener,下面我们就按模块介绍各个模块的常用配置

producer配置

在 Spring Kafka 中,spring.kafka.producer 用于配置 Kafka 生产者相关属性。下面是一些常用的 spring.kafka.producer 配置项的详解:

spring.kafka.producer.bootstrap-servers: 指定 Kafka 服务器的地址列表,格式为 host:port,多个地址使用逗号分隔。

spring.kafka.producer.key-serializer: 用于配置 Kafka 生产者发送消息中键(key)的序列化器。可以是字符串形式的完全限定类名,也可以是一个实现 org.apache.kafka.common.serialization.Serializer 接口的自定义序列化器类。

常见的键序列化器包括:

org.apache.kafka.common.serialization.StringSerializer:将键对象作为字符串进行序列化。
org.apache.kafka.common.serialization.IntegerSerializer:将键对象作为整数进行序列化。
org.apache.kafka.common.serialization.ByteArraySerializer:将键对象直接作为字节数组进行序列化。
自定义的序列化器:根据自己的需求实现键的序列化逻辑。

在 Kafka 中,每条消息都由一个键(key)和一个值(value)组成。键是一个可选的、用于标识消息的数据,而值则是实际的消息内容。在发送消息时,Kafka 生产者需要将键和值进行序列化,以便能够在网络上传输和存储到 Kafka 服务器。

spring.kafka.producer.value-serializer: 用于配置 Kafka 生产者发送消息中值(value)的序列化器类,用法同上。

spring.kafka.producer.acks:生产者发送消息的确认模式。可选的值有 “0”(不需要任何确认)、“1”(只需要 Leader 确认)和 “all”(需要 Leader 和所有副本确认)。

spring.kafka.producer.retries:配置生产者在发生错误时的重试次数。

spring.kafka.producer.retry-backoff-ms:配置重试之间的延迟时间(默认为 100 毫秒)。重试的间隔时间会随着重试次数的增加而指数级增长,以避免过度负载和大量的重复请求。

spring.kafka.producer.batch-size:配置每个批次中包含的消息大小。当应用程序使用 Kafka 生产者发送消息时,发送单个消息可能会带来一些性能开销。为了减少这种开销,可以将多个消息进行批量发送。spring.kafka.producer.batch-size 参数就是用来指定每个批次中包含的消息大小。

spring.kafka.producer.buffer-memory:用于配置 Kafka 生产者的缓冲区内存大小的属性,Kafka 生产者在发送消息时,不会立即将消息发送到服务器,而是先将消息缓存在生产者的缓冲区中。当缓冲区中的消息达到一定大小或达到一定时间限制时,生产者才会批量地将消息发送到 Kafka 服务器。
该参数的单位是字节,默认值是 33554432 字节(32MB)。

spring.kafka.producer.client-id:配置生产者的客户端 ID,如果你没有显式地设置该属性,则 Kafka 生产者会自动生成一个随机的客户端 ID。使用自定义的客户端 ID 可以帮助你更好地追踪和监控不同的生产者实例

spring.kafka.producer.compression-type:指定生产者使用的消息压缩类型

常见的压缩类型包括:
none:表示不使用压缩,消息以原始的形式发送。
gzip:表示使用 GZIP 压缩算法对消息内容进行压缩。
snappy:表示使用 Snappy 压缩算法对消息内容进行压缩。
lz4:表示使用 LZ4 压缩算法对消息内容进行压缩。

spring.kafka.producer.enable-idempotence:启用生产者的幂等性,确保消息的唯一性和顺序性。

在消息系统中,幂等性是指多次执行同一个操作所产生的影响与执行一次操作的影响相同。而在 Kafka 中,启用幂等性可以确保生产者发送的消息具有幂等性特性,即无论发送多少次相同的消息,最终的影响都是一样的。

启用幂等性可以提供以下好处:

1、消息去重:当生产者发送重复的消息时,Kafka 会自动去重,保证只有一条消息被写入。
2、顺序保证:Kafka 会确保相同键的消息按照发送顺序进行处理,保证消息的顺序性。
3、提高可靠性:当发生网络故障或生产者重试时,启用幂等性可以确保消息不会被重复发送,避免出现重复消费的问题。

需要注意的是,启用幂等性会对性能产生一定的影响,因为 Kafka 生产者会为每个分区维护序列号和重试缓冲区。因此,在性能和可靠性之间需要进行权衡,根据具体的业务需求来决定是否启用幂等性。

spring.kafka.producer.max-in-flight-requests-per-connection:指定在单个连接上允许的未完成请求的最大数目。

consumer 配置

pring.kafka.consumer 用于配置 Kafka 消费者相关属性,下面是一些常见的 spring.kafka.consumer 配置属性及其作用:

spring.kafka.consumer.bootstrap-servers:指定 Kafka 服务器的地址列表,格式为 host:port,多个地址使用逗号分隔。

spring.kafka.consumer.group-id:指定消费者所属的消费组的唯一标识符。

在 Kafka 中,每个消费者都必须加入一个消费组(Consumer Group)才能进行消息的消费。消费组的作用在于协调多个消费者对消息的处理,以实现负载均衡和容错机制。

具体来说,spring.kafka.consumer.group-id 的作用包括以下几点:

消费者协调:Kafka 会根据 group-id 将不同的消费者分配到不同的消费组中,不同的消费组之间相互独立。消费组内的消费者协调工作由 Kafka 服务器自动完成,确保消息在消费组内得到均匀地分发。

负载均衡:当多个消费者加入同一个消费组时,Kafka 会自动对订阅的主题进行分区分配,以实现消费者之间的负载均衡。每个分区只会分配给消费组内的一个消费者进行处理,从而实现并行处理和提高整体的消息处理能力。

容错机制:在消费组内,如果某个消费者出现故障或者新的消费者加入,Kafka 会自动重新平衡分区的分配,确保各个分区的消息能够被有效地消费。

需要注意的是,同一个消费组内的消费者共享消费位移(offset),即每个分区的消息只会被消费组内的一个消费者处理。因此,同一个主题下的不同消费组是相互独立的,不会进行负载均衡和消费位移的共享。

spring.kafka.consumer.key-deserializer:指定键(key)的反序列化器。将从 Kafka 中读取的键字节流反序列化为对象。

spring.kafka.consumer.value-deserializer:指定值(value)的反序列化器。将从 Kafka 中读取的值字节流反序列化为对象。

spring.kafka.consumer.enable-auto-commit:指定是否开启自动提交消费位移(offset)的功能。设置为 true 则开启自动提交,设置为 false 则需要手动调用 Acknowledgment 接口的 acknowledge() 方法进行位移提交。

spring.kafka.consumer.auto-commit-interval:当开启自动提交时,指定自动提交的间隔时间(以毫秒为单位)。

spring.kafka.consumer.auto-offset-reset:指定当消费者加入一个新的消费组或者偏移量无效时的重置策略。常见的取值有 earliest(从最早的偏移量开始消费)和 latest(从最新的偏移量开始消费)。

spring.kafka.consumer.auto-offset-reset 属性有以下几种取值:

latest:表示从当前分区的最新位置开始消费,即只消费从启动之后生产的消息,不消费历史消息。
earliest:表示从该分区的最早位置开始消费,即包含历史消息和当前的消息。
none:表示如果没有找到先前的消费者偏移量,则抛出异常。

需要注意的是,spring.kafka.consumer.auto-offset-reset 的默认值是 latest,如果不设置该属性,则新加入消费组的消费者将从该主题的最新位置开始消费。

spring.kafka.consumer.max-poll-records:指定每次拉取的最大记录数。用于控制每次消费者向服务器拉取数据的数量,默认为 500。

listener 配置

在 Spring 中,是使用 Kafka 监听器来进行消息消费的,spring.kafka.listener用来配置监听器的相关配置,以下是一些常见的 spring.kafka.listener 相关配置及作用:

spring.kafka.listener.concurrency:指定监听器容器中并发消费者的数量。默认值为 1。通过设置并发消费者的数量,可以实现多个消费者同时处理消息,提高消息处理的吞吐量。

spring.kafka.listener .autoStartup:指定容器是否在启动时自动启动。默认值为 true。可以通过设置为 false 来在应用程序启动后手动启动容器。

spring.kafka.listener.clientIdPrefix:指定用于创建消费者的客户端 ID 的前缀。默认值为 “spring”.

spring.kafka.listener .ackMode:指定消息确认模式,包括 RECORD、BATCH 和 MANUAL_IMMEDIATE等。可根据需求选择不同的确认模式,用于控制消息的确认方式。

spring.kafka.listener.ackCount:当ackMode为"COUNT”或者"COUNT_TIME"时,处理多少个消息后才进行消息确认。

ackMode的详细介绍可以看我上一篇文章: ackMode详解

spring.kafka.listener.missing-topics-fatal:配置当消费者订阅的主题不存在时的行为

当将 spring.kafka.listener.missing-topics-fatal 设置为 true 时,如果消费者订阅的主题在 Kafka 中不存在,应用程序会立即失败并抛出异常,阻止消费者启动。这意味着应用程序必须依赖于确保所有订阅的主题都存在,否则应用程序将无法正常运行。

当将 spring.kafka.listener.missing-topics-fatal 设置为 false 时,如果消费者订阅的主题在 Kafka 中不存在,应用程序将继续启动并等待主题出现。一旦主题出现,消费者将开始正常地消费消息。这种情况下,应用程序需要能够处理主题缺失的情况,并在主题出现后自动适应。

默认情况下,spring.kafka.listener.missing-topics-fatal 属性的值为 false,这意味着如果消费者订阅的主题不存在,应用程序将会等待主题出现而不会立刻失败。

spring.kafka.listener.syncCommits:指定是否在关闭容器时同步提交偏移量。默认值为 false。可以通过设置为 true 来确保在关闭容器时同步提交偏移量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/206235.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小红书蒲公英平台开通后,有哪些注意的地方,以及如何进行报价?

今天来给大家聊聊当小红书账号过1000粉后,开通蒲公英需要注意的事项。 蒲公英平台是小红书APP中的一个专为内容创作者设计的平台。它为品牌和创作者提供了一个完整的服务流程,包括内容的创作、推广、互动以及转换等多个方面。 2.蒲公英平台的主要功能 &…

【C语言】vfprintf函数

vfprintf 是 C 语言中的一个函数,它是 fprintf 函数的变体,用于格式化输出到文件中。vfprintf 函数接受一个格式化字符串和一个指向可变参数列表的指针,这个列表通常是通过 va_list 类型来传递的。vfprintf 函数的主要用途是在需要处理不定数…

NGINX安装升级

nginx介绍 Nginx的版本分为开发版、稳定版和过期版,nginx以功能丰富著称,它即可以作为http服务器,也可以作为反向代理服务器或者邮件服务器,也可以作为反向代理服务器或者邮件服务器,能够快速的响应静态网页的请求&am…

vue+django 开发环境跨域前后端联调配置

vue环境是127.0.0.1:8080,django环境是127.0.0.1:8000 要解决url相对路径和Axios跨域权限问题。 注意:程序发起了一个 POST 请求,但请求的 URL 没有以斜杠结尾。Django 默认设置是无法执行重定向到带斜杠 URL的。例如:url http:/…

远传智能水表一般应用于哪些场景?

远传智能水表是一种在水表领域应用广泛的创新技术,它利用物联网和无线通信技术使水表具备了远程监测和数据传输的能力。这种智能水表的应用场景多种多样,可适用于各个领域和环境。那么,远传智能水表一般应用于哪些场景呢? 首先&am…

ElasticSearch之Refresh API

使用本方法,显式的执行refresh操作。 默认情况下,ElasticSearch启动后台任务,周期性执行refresh操作,周期使用参数index.refresh_interval控制。 本方法触发的refresh为同步操作,运行完毕之后才会返回任务的执行结果。…

CPU密集型和IO密集型任务

1. CPU密集型任务 1.1 定义 CPU密集型任务是指在任务执行过程中,主要由计算操作占用大部分时间,而不是等待外部资源的任务类型。 1.2 特点 计算密集性: 需要大量的数学运算、逻辑判断和数据处理。高CPU利用率: 任务执行期间&a…

9.关于Java的程序设计-基于Springboot的家政平台管理系统设计与实现

摘要 随着社会的进步和生活水平的提高,家政服务作为一种重要的生活服务方式逐渐受到人们的关注。本研究基于Spring Boot框架,设计并实现了一种家政平台管理系统,旨在提供一个便捷高效的家政服务管理解决方案。系统涵盖了用户注册登录、家政服…

mybatis数据输出-map类型输出

1、建库建表 create table emp (empNo varchar(10) null,empName varchar(100) null,sal int null,deptno varchar(10) null ); 2、pom.xml <dependencies><dependency><groupId>org.mybatis</groupId><artifactId>mybatis<…

氧化性低密度脂蛋白抗体原料——博迈伦生物

氧化性低密度脂蛋白抗体原料——博迈伦生物 引言 氧化性低密度脂蛋白&#xff08;oxLDL&#xff09;是动脉粥样硬化等心血管疾病的关键因素之一。抗体作为诊断和研究工具&#xff0c;在oxLDL的检测和相关疾病的研究中发挥着重要作用。本文将深入探讨氧化性低密度脂蛋白抗体原料…

Elasticsearch 8.9 flush刷新缓存中的数据到磁盘源码

一、相关API的handler1、接收HTTP请求的hander2、每一个数据节点(node)执行分片刷新的action是TransportShardFlushAction 二、对indexShard执行刷新请求1、首先获取读锁&#xff0c;再获取刷新锁&#xff0c;如果获取不到根据参数决定是否直接返回还是等待2、在刷新之后transl…

芯知识 | 如何选择合适的单片机语音芯片?

随着科技的飞速发展&#xff0c;单片机语音芯片已经广泛应用于各个领域。然而&#xff0c;在众多的芯片产品中&#xff0c;如何选择合适的单片机语音芯片成为了一个重要的问题。本文将为您提供一些建议&#xff0c;助您找到最适合您需求的单片机语音芯片。 一、明确需求 在选…

Android Audio实战——音频链路分析(二十五)

在 Android 系统的开发过程当中,音频异常问题通常有如下几类:无声、调节不了声音、爆音、声音卡顿和声音效果异常(忽大忽小,低音缺失等)等。尤其声音效果这部分问题通常从日志上信息量较少,相对难定位根因。想要分析此类问题,便需要对声音传输链路有一定的了解,能够在链…

AI发展下服务器的选择非常重要

在AI发展下&#xff0c;服务器的选择非常重要。以下是一些选择服务器时需要考虑的因素&#xff1a; 1. 计算能力&#xff1a;AI需要大量的计算资源来进行训练和推理。因此&#xff0c;选择具有强大计算能力的服务器是至关重要的。 2. 内存容量&#xff1a;AI需要大量的内存来…

cryptojs加密和java解密:AES算法

试了一下午终于跑通了&#xff0c;一开始尝试RC4算法生成的密文在java中解密不出来&#xff0c;放弃了&#xff0c;改用AES。 js代码 import aes from crypto-js/aes; import base from crypto-js/enc-base64;function encrypt(plaintext: string) {const iv base.parse(ZGY…

【论文解读】:大模型免微调的上下文对齐方法

本文通过对alignmenttuning的深入研究揭示了其“表面性质”&#xff0c;即通过监督微调和强化学习调整LLMs的方式可能仅仅影响模型的语言风格&#xff0c;而对模型解码性能的影响相对较小。具体来说&#xff0c;通过分析基础LLMs和alignment-tuned版本在令牌分布上的差异&#…

测试:SSE VS WebSocket

SSE&#xff08;Server-Sent Events&#xff09; SSE&#xff08;Server-Sent Events&#xff09;接口是一种实现服务器到客户端单向实时通信的技术。通过SSE&#xff0c;服务器可以主动向客户端推送数据&#xff0c;而不需要客户端不断地向服务器请求数据。这种技术特别适合于…

100多种视频转场素材|专业胶片,抖动,光效电影转场特效PR效果预设

100多种 Premiere Pro 效果预设&#xff0c;包含&#xff1a;“胶片框架”、“胶片烧录”、“彩色LUT”、“相机抖动”、“电影Vignette”和“胶片颗粒”。非常适合制作复古风格的视频&#xff0c;添加独特的色彩。包括视频教程。 来自PR模板网&#xff1a;https://prmuban.com…

git 本地有改动,远程也有改动,且文件是自动生成的配置文件

在改动过的地方 文件是.lock文件&#xff0c;自动生成的。想切到远程的分支&#xff0c;但是远程的分支也有改动过。这时候就要解决冲突&#xff0c;因为这是两个分支&#xff0c;代码都是不一样的&#xff0c;要先把这改动的代码提交在本地或者提交在本分支的远程才可以切到其…

在Arch Linux上安装yay

有点麻烦。 准备 # pacman -Syu # pacman -S --needed base-devel git 变身为普通用户 不能使用root下载代码。所以要变身为普通用户&#xff1a; # sueradd tsit # su tsit 下载代码 $ git clone https://aur.archlinux.org/yay.git 编译安装 $ cd yay $ makepkg -si…