kafka服务端允许生产者发送最大消息体大小

1、kafka config服务端配置文件server.properties

        server.properties中加上的message.max.bytes配置,我目前设置为5242880,即5MB,可以根据实际情况增大。

message.max.bytes=5242880

        在生产者端配置max.request.size,这是单个消息最大字节数,根据实际调整,max.request.size 必须小于 message.max.bytes 以及消费者的 max.partition.fetch.bytes。这样消息就能不断发送。

2、重启kafka服务

3、生产者配置

# 发送所有ISR
acks=all
# 重试次数
retries=2
# 批量发送大小(128KB)
batch.size=131072
# 消息体大小(5M)
max.request.size=5242880
# 缓存大小,根据本机内存大小配置(64M)
buffer.memory=67108864
# 发送频率,和batch.size参数满足任一条件发送
linger.ms=5
# 发送端id,便于统计(单线程时)
client.id=producer-asyn
# Key序列化
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# Value序列化
value.serializer=org.apache.kafka.common.serialization.StringSerializer

4、springboot kafka 配置 max.request.size

1、在 application.properties 文件中添加如下配置

spring.kafka.producer.max-request-size=5242880

 2、在 KafkaTemplate 中添加如下配置

        Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5242880);KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configs));

 3、在 KafkaProducer 中添加如下配置

        Properties props = new Properties();props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5242880);KafkaProducer<String, String> producer = new KafkaProducer<>(props);

5、配置类参数设置

        Properties props = new Properties();props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("buffer.memory", 67108864);props.put("batch.size", 131072);props.put("linger.ms", 100);props.put("max.request.size", 10485760);props.put("retries", 10);props.put("retry.backoff.ms", 500);props.put("acks", "1");KafkaProducer<String, String> producer = new KafkaProducer<>(props);

6、参数详解

  • buffer.memory

  Kafka的客户端发送数据到服务器,不是来一条就发一条,而是经过缓冲的,也就是说,通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的,这样性能才可能高。

        buffer.memory的本质就是用来约束KafkaProducer能够使用的内存缓冲的大小的,默认值32MB。

  如果buffer.memory设置的太小,可能导致的问题是:消息快速的写入内存缓冲里,但Sender线程来不及把Request发送到Kafka服务器,会造成内存缓冲很快就被写满。而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了。 

  所以“buffer.memory”参数需要结合实际业务情况压测,需要测算在生产环境中用户线程会以每秒多少消息的频率来写入内存缓冲。经过压测,调试出来一个合理值。

  • batch.size

  每个Batch要存放batch.size大小的数据后,才可以发送出去。比如说batch.size默认值是16KB,那么里面凑够16KB的数据才会发送。

  理论上来说,提升batch.size的大小,可以允许更多的数据缓冲在里面,那么一次Request发送出去的数据量就更多了,这样吞吐量可能会有所提升。

  但是batch.size也不能过大,要是数据老是缓冲在Batch里迟迟不发送出去,那么发送消息的延迟就会很高。

  一般可以尝试把这个参数调节大些,利用生产环境发消息负载测试一下。

  • linger.ms

  一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了。

  比如说batch.size是16KB,但是现在某个低峰时间段,发送消息量很小。这会导致可能Batch被创建之后,有消息进来,但是迟迟无法凑够16KB,难道此时就一直等着吗?

  当然不是,假设设置“linger.ms”是50ms,那么只要这个Batch从创建开始到现在已经过了50ms了,哪怕他还没满16KB,也会被发送出去。 

  所以“linger.ms”决定了消息一旦写入一个Batch,最多等待这么多时间,他一定会跟着Batch一起发送出去。 

  linger.ms配合batch.size一起来设置,可避免一个Batch迟迟凑不满,导致消息一直积压在内存里发送不出去的情况。

  • max.request.size

  决定了每次发送给Kafka服务器请求消息的最大大小。

  如果发送的消息都是大报文消息,每条消息都是数据较大,例如一条消息可能要20KB。此时batch.size需要调大些,比如设置512KB,buffer.memory也需要调大些,比如设置128MB。 

  只有这样,才能在大消息的场景下,还能使用Batch打包多条消息的机制。

  此时“max.request.size”也得同步增加。

  • retries和retries.backoff.ms

  重试机制,也就是如果一个请求失败了可以重试几次,每次重试的间隔是多少毫秒,根据业务场景需要设置。

  • acks

acks

含义
0 Producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
1 Producer 往集群发送数据只要 Leader 应答就可以发送下一条,只确保 Leader 接收成功。
-1 或 all Producer 往集群发送数据需要所有的ISR Follower 都完成从 Leader 的同步才会发送下一条,确保 Leader 发送成功和所有的副本都成功接收。安全性最高,但是效率最低。

附:

Kafka参数调优,解决The request included a message larger than the max message size the server will accept._Pallas_Cat的博客-CSDN博客

转载请注明出处:BestEternity亲笔。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/11399.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【React】精选5题

第1题&#xff1a;简述下 React 的生命周期&#xff1f;每个生命周期都做了什么&#xff1f; React 组件的生命周期可以分为三个阶段&#xff1a;挂载阶段、更新阶段和卸载阶段。每个生命周期方法都有特定的目的和功能。 挂载阶段&#xff1a; constructor&#xff1a;组件的构…

protobuf入门实践2

如何在proto中定义一个rpc服务? syntax "proto3"; //声明protobuf的版本package fixbug; //声明了代码所在的包 &#xff08;对于C来说就是namespace)//下面的选项&#xff0c;表示生成service服务类和rpc方法描述&#xff0c; 默认是不生成的 option cc_generi…

如何正确使用npm常用命令

npm常用命令&#xff1a; 官方文档&#xff1a;CLI Commands | npm Docs 1. npm -v&#xff1a;查看 npm 版本 2. npm init&#xff1a;初始化后会出现一个 Package.json 配置文件&#xff0c;可以在后面加上 -y&#xff0c;快速跳到问答界面 3. npm install&#xff1a;会…

Iceberg 合并datafiles、manifest files,清除过期snapshot、删除孤立文件

本文Iceberg使用的为HiveCataLog&#xff0c;依赖HiveMemstore 1、首先获取要操作的表对象及SparkSession import org.apache.iceberg.{CatalogProperties, Table} import org.apache.iceberg.spark.actions.SparkActions ...... ...... ...... //获取表 val tabled: TableIden…

6个高清图片素材网站,免费下载,值得推荐~

关于图片素材网站&#xff0c;我一直都在推荐这几个&#xff0c;免费下载&#xff0c;可商用&#xff0c;建议收藏起来~ 菜鸟图库 https://www.sucai999.com/pic.html?vNTYwNDUx 网站主要是为新手设计师提供免费素材的&#xff0c;素材的质量都很高&#xff0c;类别也很多&a…

在C++中 ,什么时候用:: ?什么时候用. ?什么时候用->?

在C中 &#xff0c;什么时候用:: ?什么时候用. ?什么时候用->?在 C 中&#xff0c;::、. 和 -> 是三种不同的运算符&#xff0c;用于访问类、结构体、命名空间、指针等的成员。它们的使用场景如下&#xff1a; ::&#xff08;作用域解析运算符&#xff09;&#xff1a…

Git移除commit过的大文件

前言&#xff1a;在提交推送本地更改至仓库时&#xff0c;误将大文件给提交了&#xff0c;导致push时报错文件过大&#xff0c;因此需要将已经commit的大文件移除后再push 若已知要删除的文件或文件夹路径&#xff0c;则可以从第4步开始 1.对仓库进行gc操作 $ git gc 2.查询…

23款奔驰S400豪华型升级后排电动腿托系统,提升后排乘坐舒适性

奔驰S400L后排座椅是不带腿托和脚托的&#xff0c;也没有一键躺平功能&#xff0c;相对于奔驰S级高配车型上配置的右边老板位座椅&#xff0c;舒适性就差强了一些。

【基于Spark的电影推荐系统】环境准备

概览 本科毕设做过电影推荐系统&#xff0c;但当时的推荐算法只有一个&#xff0c;现在已经忘记大部分了&#xff0c;当时也没有记录&#xff0c;因此写这个博客重新来记录一下。此外&#xff0c;技术栈由于快秋招原因来不及做过多的部分&#xff0c;因此只实现简单的功能&…

Vue.nextTick函数的用法及在异步更新中的应用

在Vue.js中&#xff0c;我们经常会遇到需要在DOM更新之后执行一些操作的情况。但是由于Vue的响应式更新是异步执行的&#xff0c;直接在更新数据后立即操作DOM可能得不到正确的结果。为了解决这个问题&#xff0c;Vue提供了Vue.nextTick函数。 Vue.nextTick函数是一个异步方法…

从零开始 Spring Cloud 7:Gateway

从零开始 Spring Cloud 7&#xff1a;Gateway 图源&#xff1a;laiketui.com Spring Cloud Gateway 是 Spring Cloud 的一个全新项目&#xff0c;该项目是基于 Spring 5.0&#xff0c;Spring Boot 2.0 和 Project Reactor 等响应式编程和事件流技术开发的网关&#xff0c;它旨…

服务都挂了你还在打代码?

服务挂了&#xff1f; 线上服务在疯狂的报错&#xff0c;你还在悠哉悠哉的打代码&#xff0c;等到用户开始反馈问题&#xff0c;这时候才去线上查日志&#xff0c;黄花菜都凉了。老板&#xff1a;“去财务结一下账吧”。 异常告警 对于很多基础设施比较完善的公司&#xff0…

ICC2删除所有电源的方法

我正在「拾陆楼」和朋友们讨论有趣的话题&#xff0c;你⼀起来吧&#xff1f;知识星球入口 remove_pg_patterns -all remove_pg_strategies -all remove_pg_strategy_via_rules -all remove_pg_via_master_rules -all remove_pg_regions -all remove_routes -net_types {p…

网安周报 | 银行业成为开源软件供应链攻击的目标

网安周报是棱镜七彩推出的安全资讯专栏&#xff0c;旨在通过展示一周内发生的与开源安全、软件供应链安全相关攻击事件&#xff0c;让用户了解开源及软件供应链威胁&#xff0c;提高对安全的重视&#xff0c;做好防御措施。 1、银行业成为开源软件供应链攻击的目标 网络安全研…

微服务入门---SpringCloud(二)

微服务入门---SpringCloud&#xff08;二&#xff09; 1.Nacos配置管理1.1.统一配置管理1.1.1.在nacos中添加配置文件1.1.2.从微服务拉取配置 1.2.配置热更新1.2.1.方式一1.2.2.方式二 1.3.配置共享1&#xff09;添加一个环境共享配置2&#xff09;在user-service中读取共享配置…

vue2+wangEditor5富文本编辑器(图片视频自定义上传七牛云/服务器)

1、安装使用 安装 yarn add wangeditor/editor # 或者 npm install wangeditor/editor --save yarn add wangeditor/editor-for-vue # 或者 npm install wangeditor/editor-for-vue --save在main.js中引入样式 import wangeditor/editor/dist/css/style.css在使用编辑器的页…

Android 截图功能实现

Android 截图功能实现 简介效果图功能实现1. 截取当前可见范围屏幕2. 截取当前可见范围屏幕&#xff08;不包含状态栏&#xff09;3. 截取某个控件4. 截取ScrollView5. 长截图6. 截屏动画效果7. 显示截屏结果&#xff0c;自动消失6. 完整代码 简介 在Android应用中开发截图功能…

顺序表的实现

文章目录 1.概念及结构 2.接口实现 3.数组相关oj题 4.顺序表的问题及思考 文章内容 1.概念及结构 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使 用的数据结构&#xff0c;常见的线性表&#xff1a;顺序…

【物理】模拟粒子在电场和磁场中的轨迹研究(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f308;4 Matlab代码实现 &#x1f4a5;1 概述 模拟粒子在电场和磁场中的轨迹研究是物理学中的一个重要研究领域&#xff0c;涉及到电磁场、粒子运动、轨迹分析等多个方面。在这个研究中&…

Python爬虫实例之淘宝商品页面爬取(api接口)

可以使用Python中的requests和BeautifulSoup库来进行网页爬取和数据提取。以下是一个简单的示例&#xff1a; import requests from bs4 import BeautifulSoupdef get_product_data(url):# 发送GET请求&#xff0c;获取网页内容headers {User-Agent: Mozilla/5.0 (Windows NT…