Kafka中的fetch-min-size、fetch-max-wait和request.timeout.ms配置

当前kafka的版本为2.8.11,Spring Boot的版本为2.7.6,在pom.xml中引入下述依赖: 

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version>
</dependency>

然后在yml配置文件进行如下配置:

spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: 0key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerfetch-max-wait: 25000msfetch-min-size: 10producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

1、fetch-min-size

fetch-min-size表示消费者一次拉取请求中,服务器应该返回的最小数据量,单位是字节,默认值为1个字节。如果服务器没有足够的数据返回,请求会等待。

在Spring Boot与Kafka整合中,yml文件中的fetch-min-size参数对应着kafka的fetch.min.bytes参数。

UTF-8编码下一个字符占用一个字节

2、fetch-max-wait

fetch-max-wait表示消费者的 fetch(拉取)请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置消费者最多等待response多久。

若是不满足fetch.min.bytes时,该参数就会生效,其值表示等待消费端请求的最长等待时间,默认是500ms。

在项目中创建一个生产者用于往主题 topic0 中投递消息,如下所示:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {// 自定义的主题名称public static final String TOPIC_NAME="topic0";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public String send(@RequestParam("msg")String msg) {log.info("准备发送消息为:{}",msg);// 1.发送消息ListenableFuture<SendResult<String,String>> future=kafkaTemplate.send(TOPIC_NAME,msg);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable throwable) {// 2.发送失败的处理log.error("生产者 发送消息失败:"+throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, String> stringObjectSendResult) {// 3.发送成功的处理log.info("生产者 发送消息成功:"+stringObjectSendResult.toString());}});return "接口调用成功";}
}

接着再在项目中创建一个消费者用于消息主题 topic0 中的消息,如下所示:

import java.util.Optional;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class KafkaConsumer {// 自定义主题名称,这里要注意的是主题名称中不能包含特殊符号:“.”、“_”public static final String TOPIC_NAME="topic0";@KafkaListener(topics = TOPIC_NAME, groupId = "ONE")public void topic_one(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional message = Optional.ofNullable(record.value());if (message.isPresent()) {Object msg = message.get();log.info("消费者组One消费了消息:Topic:" + topic + ",Record:" + record + ",Message:" + msg);}}
}

项目启动以后,这时控制台中会打印下述信息:

ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [127.0.0.1:9092]
client.id = consumer-ONE-1
enable.auto.commit = false
fetch.max.wait.ms = 25000
fetch.min.bytes = 10
group.id = ONE
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

接着利用 /kafka/send?msg=xxx 接口连续往主题 topic0 中生产 9 条消息,我们会发现 9 条消息发送完了以后消费者并不会立马就可以消费到这些数据,大约在等待 25 秒时长以上才能消费到这些数据。然后后面我们再调用/kafka/send?msg=xxx 接口连续往主题 topic0 中生产多条消息,不管你怎么发送数据,这时消费者都可以立马消费到这些数据。

重启项目利用生产者往主题 topic0 中投递 10 条消息,我们会发现 10 条消息发送完了以后消费者并不会立马就可以消费到这些数据,大约在 25 秒时长以内就能消费到这些数据。接着我们再调用/kafka/send?msg=xxx 接口连续往主题 topic0 中生产多条消息,不管你怎么发送数据,这时消费者也是都可以立马消费到这些数据。

通过上述示例我们可以看出fetch-min-size和fetch-max-wait这两个参数的配置项确实生效了,fetch-min-size值的设置仅仅是在项目启动以后Kafka客户端第一次发起Fetch请求时(不管消费者有没有获取到数据)才会生效,仅生效一次。一旦完成一次Fetch请求后续对于消息的消费都是即时消费。而fetch-max-wait的值是一直会生效的,kefka客户端每发起一次Fetch请求,在服务端没有足够数据返回时,该Fetch请求会处于等待状态,直到达到最长等待时间。

fetch-min-size这是一个调优相关的参数,默认为1字节,表示如果请求的数据量小于1字节,broker就是攒一攒,等足够1字节了再一起返回给Consumer,这个值建议可以适当调大一点,以提高服务的吞吐量。 

如果在fetch.max.wait.ms指定的时间内,数据量依然没有达到fetch.min.bytes所设置的值,那broker也不会再等了,将直接返回数据给Consumer,此值默认为500ms。 

3、request.timeout.ms

这个是生产者和消费者的通用参数配置,该配置控制客户端等待请求响应的最长时间。如果在超时时间过去之后客户端仍然未收到响应,则客户端将在必要时重新发送请求,或者在重试次数用尽时使请求失败,默认值为30秒。

例如将上述yml配置文件修改为如下所示,fetch-max-wait 的值为31秒(超过了30秒):

spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: 0key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerfetch-max-wait: 31000msfetch-min-size: 10producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

那么在项目启动以后,不管客户端有没有获取到消息,控制台中每隔一段时间就会报下述提示信息:

[Consumer clientId=consumer-ONE-1, groupId=ONE] Disconnecting from node 1001 due to request timeout.
[Consumer clientId=consumer-ONE-1, groupId=ONE] Cancelled in-flight FETCH request with correlation id 21 due to node 1001 being disconnected (elapsed time since creation: 30015ms, elapsed time since send: 30011ms, request timeout: 30000ms)
[Consumer clientId=consumer-ONE-1, groupId=ONE] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1001:

意思就是说由于客户端和服务端的连接被断开了,取消客户端的在线fetch请求,所以一般情况下,request.timeout.ms 的默认值未改变时,建议 fetch-max-wait 这个值不要超过30秒。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/225546.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

非暴力沟通-情绪篇

我相信&#xff0c;如果我们能够问自己两个问题&#xff0c;我们将会看到&#xff0c;惩罚永远不会以建设性的方式让我们的需要真正得到满足。 第一个问题&#xff1a; 你想要对方去做哪些和现在不一样的事情呢&#xff1f; 如果我们之问这一个问题&#xff0c;可能有时候看上…

vue3组件的基本结构

<template><div class"login_wrap"><div class"form_wrap"> <!-- 账号输入--> <el-form ref"formRef" :model"user" class"demo-dynamic" > <!--prop要跟属性名称对应-->…

微服务组件Sentinel的学习(2)

限流规则 流控模式直接模式关联模式链路模式 流控效果快速失败warm up排队等待 热点参数限流 流控模式 添加限流规则&#xff0c;可点击高级选项&#xff0c;有三种流控模式选择&#xff1a; 直接:统计当前资源的请求&#xff0c;触发闻值时对当前资源直接限流&#xff0c;也是…

Axure之动态面板轮播图

目录 一.介绍 二.好处 三.动态面板轮播图 四.动态面板多方式登录 五.ERP登录 六.ERP的左侧菜单栏 七.ERP的公告栏 今天就到这了哦&#xff01;&#xff01;&#xff01;希望能帮到你了哦&#xff01;&#xff01;&#xff01; 一.介绍 Axure中的动态面板是一个非常有用的组…

【数据结构】——查找、散列表简答题模板

目录 一、查找的概念&#xff08;一&#xff09;静态查找和动态查找&#xff08;二&#xff09;二分查找的适用情况&#xff08;三&#xff09;查找算法中的监视哨 二、散列查找&#xff08;一&#xff09;同义词&#xff08;二&#xff09;构造哈希函数&#xff08;三&#xf…

2024年视频监控行业发展趋势预测及EasyCVR视频分析技术应用

随着技术的改进&#xff0c;视频监控领域在过去十年迅速发展。与此同时&#xff0c;该行业正在通过先进创新技术&#xff08;如人工智能和云计算等技术&#xff09;的积极商业化&#xff0c;获得了新的增长机会。视频监控系统不再仅仅用于记录图像&#xff0c;而是已经成为全球…

力扣题目学习笔记(OC + Swift) 12. 整数转罗马数字

12. 整数转罗马数字 罗马数字包含以下七种字符&#xff1a; I&#xff0c; V&#xff0c; X&#xff0c; L&#xff0c;C&#xff0c;D 和 M。 字符 数值 I 1 V 5 X 10 L 50 C 100 D 500 M 1000 例如&#xff0c; 罗马数字 2 写做 II &#xff0c;即为两个并列的 1。12 写做 XI…

LVM异常分析

环境信息 硬件环境 软件环境 相关软件包 云上鲲鹏RH220 操作系统&#xff1a;麒麟V10sp1-0711 系统自带多路径&#xff1a;multipath-tools-0.8.4-6 光纤连接华为存储Oceanstor18500 v5 内核版本&#xff1a;4.19.90 故障描述 云上鲲鹏RH220安装系统麒麟V10sp1-071…

ssm+vue的高校智能培训管理系统分析与设计(有报告)。Javaee项目,ssm vue前后端分离项目。

演示视频&#xff1a; ssmvue的高校智能培训管理系统分析与设计&#xff08;有报告&#xff09;。Javaee项目&#xff0c;ssm vue前后端分离项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09…

npm ,yarn 更换使用国内镜像源,阿里源,清华大学源

在平时开发当中&#xff0c;我们经常会使用 Npm&#xff0c;yarn 来构建 web 项目。但是npm默认的源的服务器是在国外的&#xff0c;如果没有梯子的话。会感觉特别特别慢&#xff0c;所以&#xff0c;使用国内的源是非常有必要的。 在这里插入图片描述 Nnpm&#xff0c; yarn …

vue+react题集整理

1.Typescript中 interface 和 type 的差别是什么&#xff1f; interface只能用来描述对象类型 type可以描述任何类型组合 type后边需要有 interface后边没有 当多次使用相同名称定义一个 interface 时&#xff0c;它们会自动合并为一个接口。同名属性的不能进行类型覆盖修改&am…

1130 - Host “WIN-CA4FHERGO9J‘ is not allowed to connect to this MySQL server

1、知识小课堂 1.1 Mysql MySQL是一个关系型数据库管理系统&#xff0c;由瑞典MySQL AB公司开发&#xff0c;属于Oracle旗下产品。它是最流行的关系型数据库管理系统之一&#xff0c;在WEB应用方面&#xff0c;MySQL是最好的RDBMS (Relational Database Management System&am…

elementui + vue2实现表格行的上下移动

场景&#xff1a; 如上&#xff0c;要实现表格行的上下移动 实现&#xff1a; <el-dialogappend-to-bodytitle"条件编辑":visible.sync"dialogVisible"width"60%"><el-table :data"data1" border style"width: 100%&q…

GEE(ccdc)——连续变化检测和分类 (CCDC)概述

连续变化检测和分类 (CCDC) 1 背景 1.1 土地变化监测 土地覆盖变化影响自然和人为环境,被全球气候观测系统视为基本气候变量。例如,荒漠化导致土地覆盖从植物生态系统转变为沙漠,森林砍伐导致森林转变为人类改变的土地用途,城市发展可以将自然环境转变为被建筑物和道路覆…

MVVM和MVC以及MVP的原理以及它们的区别

MVVM、MVC 和 MVP 都是前端架构模式&#xff0c;它们各自有不同的原理和特点。 MVC&#xff08;Model-View-Controller&#xff09; 原理&#xff1a;MVC 将应用程序分为三个部分&#xff1a;模型&#xff08;Model&#xff09;、视图&#xff08;View&#xff09;和控制器&a…

antd+vue:tree组件:父级节点禁止选择并不展示选择框——基础积累

antdvue:tree组件&#xff1a;父级节点禁止选择并不展示选择框——基础积累 1.判断哪些是父节点&#xff0c;给父节点添加disabled属性——this.permissionList是数据源2.通过css样式来处理disabled的父节点3.完整代码如下&#xff1a; 最近在写后台管理系统的时候&#xff0c;…

[GXYCTF2019]Ping Ping Ping (文件执行漏洞)

本题考点&#xff1a; 1、命令联合执行 2、命令绕过空格方法 3、变量拼接 1、命令联合执行 ; 前面的执行完执行后面的| 管道符&#xff0c;上一条命令的输出&#xff0c;作为下一条命令的参数&#xff08;显示后面的执行结果&#xff09;|| 当前面的执行出错时&#xff08;为…

从计时器失效到判断页面可见性

目录 1&#xff0c;问题 - 计时器失效2&#xff0c;解决 - 页面可见性判断1&#xff0c;页面可见性2&#xff0c;visibilitychange3&#xff0c;终极解决方案 - lifecycle 3&#xff0c;精准计时 1&#xff0c;问题 - 计时器失效 问题复现&#xff1a;移动端必现&#xff0c;p…

BMS常见名词解释

&#xff08;1&#xff09;阻抗&#xff1a;在具有电阻、电感和电容的电路里&#xff0c;对电路中的电流所起的阻碍作用叫作阻抗。阻抗常用Z表示&#xff0c;是一个复数&#xff0c;实部称为电阻&#xff0c;虚部称为电抗。电容在电路中对交流电所起的阻碍作用称为容抗 ,电感在…

C#winform实现单页面自由切换窗口

一、介绍 这是效果图&#xff0c;由于视频压缩画质很差&#xff0c;看个效果就好。 左侧是打开界面的按钮&#xff0c;点击左侧按钮右侧打开不同窗口&#xff0c;点击右侧窗口中的按钮&#xff0c;也可以切换页面&#xff0c;可以方便的进行返回、下一页等操作。 每个窗口打开…