Kafka生产问题总结及性能优化实践

1、消息丢失情况

消息发送端:
(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。
(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
(3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。

消息消费端:
如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了。

2、消息重复消费

消息发送端:
发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息。

消息消费端:
如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理。
一般消费端都是要做消费幂等处理的。

3、消息乱序

如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了。所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息从发送端到消费端全链路有序。
kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。

4、消息积压

1)线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。
2)由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。
此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。

5、延时队列

延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费,延时队列的使用场景有很多, 比如 :
1)在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延时队列来处理这些订单了。
2)订单完成1小时后通知用户进行评价。

实现思路:发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,…topic_2h,这个一般不能支持任意时间段的延时),然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。

6、消息回溯

如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消费,参见上节课的内容。

7、分区数越多吞吐量越高吗

可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量

# 往test里发送一百万消息,每条设置1KB
# throughput 用来进行限流控制,当设定的值小于 0 时不限流,当设定的值大于 0 时,当发送的吞吐量大于该值时就会被阻塞一段时间
bin/kafka‐producer‐perf‐test.sh ‐‐topic test ‐‐num‐records 1000000 ‐‐record‐size 1024 ‐‐throughput ‐1
‐‐producer‐props bootstrap.servers=192.168.65.60:9092 acks=1

在这里插入图片描述
网络上很多资料都说分区数越多吞吐量越高 , 但从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个临界值,当超过这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了。
当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。
注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台会报错"java.io.IOException : Too many open files"。
异常中最关键的信息是“ Too many open flies”,这是一种常见的 Linux 系统错误,通常意味着文件描述符不足,它一般发生在创建线程、创建 Socket、打开文件这些场景下 。 在 Linux系统的默认设置下,这个文件描述符的个数不是很多 ,通过 ulimit -n 命令可以查看:一般默认是1024,可以将该值增大,比如:ulimit -n 65535

8、消息传递保障

at most once(消费者最多收到一次消息,0-1次):acks = 0 可以实现。
at least once(消费者至少收到一次消息,1-多次):ack = all 可以实现。
exactly once(消费者刚好收到一次消息):at least once 加上消费者幂等性可以实现,还可以用kafka生产者的幂等性来实
现。

kafka生产者的幂等性::因为发送端重试导致的消息重复发送问题,kafka的幂等性可以保证重复发送的消息只接收一次,只需在生产者加上参数 props.put(“enable.idempotence”, true) 即可,默认是false不开启。
具体实现原理是,kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和
Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和Sequence Number,如果相同不会再接收。

PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID 对用户完全是透明的。生产者如果重启则会生成新的PID。
Sequence Number:对于每个 PID,该 Producer 发送到每个 Partition 的数据都有对应的序列号,这些序列号是从0开始单调递增的。

9、kafka的事务

Kafka的事务不同于Rocketmq,Rocketmq是保障本地事务(比如数据库)与mq消息发送的事务一致性,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败),一般在kafka的流式计算场景用得多一点,比如,kafka需要对一个topic里的消息做不同的流式计算处理,处理完分别发到不同的topic里,这些topic分别被不同的下游系统消费(比如hbase,redis,es等),这种我们肯定希望系统发送到多个topic的数据保持事务一致性。Kafka要实现类似Rocketmq的分布式事务需要额外开发功能。
kafka的事务处理可以参考官方文档:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my‐transactional‐id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
//初始化事务
producer.initTransactions();try {
//开启事务
producer.beginTransaction();
for (int i = 0; i < 100; i++){
//发到不同的主题的不同分区
producer.send(new ProducerRecord<>("hdfs‐topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("es‐topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("redis‐topic", Integer.toString(i), Integer.toString(i)));
}
//提交事务
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
//回滚事务
producer.abortTransaction();
}
producer.close();

10、kafka高性能的原因

磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,
不会写入文件中的某个位置(随机写)保证了磁盘顺序写。
数据传输的零拷贝。
读写数据的批量batch处理以及压缩传输。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/214965.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaCV之rtmp推流(FLV和M3U8)

JavaCV与FFmpeg FFmpeg是一款开源的多媒体处理工具集&#xff0c;它包含了一系列用于处理音频、视频、字幕等多媒体数据的库和工具。 JavaCV集成了FFmpeg库&#xff0c;使得Java开发者可以使用FFmpeg的功能&#xff0c;比如视频解码、编码、格式转换等。 除了FFmpeg&#xff0…

LeetCode力扣每日一题(Java):35、搜索插入位置

一、题目 二、解题思路 1、我的思路&#xff08;又称&#xff1a;论API的重要性&#xff09; 读完题目之后&#xff0c;我心想这题目怎么看着这么眼熟&#xff1f;好像我之前学过的一个API呀&#xff01; 于是我回去翻了翻我之前写的博客&#xff1a;小白备战蓝桥杯&#xf…

通用的AGI 安全风险

传统安全风险 平台基础设施安全风险 模型与数据层安全风险 应用层安全风险 平台基础设施安全风险 &#xff08;1&#xff09;物理攻击&#xff1a;机房管控不到位 &#xff08;2&#xff09;网络攻击 &#xff08;3&#xff09;计算环境&#xff1a;自身安全漏洞&#xf…

编辑器Sublime text 常用快捷命令 列模式 替换空行

平替notepad 下载可取官网 www.sublimetext.com 据说可以无限试用&#xff0c;没有功能限制 1、快速删除空行 ctrl h选择正则表达式 .*Find输入&#xff1a; ^(\t)*$\nReplace输入&#xff1a;点击Replace All 2、快速选择指定字符 用鼠标选中alt f3修改 3、列编辑模式 ct…

宇视科技视频监控 main-cgi 文件信息泄露漏洞复现

0x01 产品简介 宇视(Uniview)高清网络摄像机是一种高性能的网络摄像机,它可以通过网络进行视频传输和监控。该摄像机采用先进的视频技术,具有高清晰度、低照度、宽动态等特点,能够提供高质量的视频图像。 0x02 漏洞概述 宇视(Uniview)高清网络摄像机存在信息泄露漏洞…

ppt编辑密码如何设置?

大家在PPT中设置了限制编辑&#xff0c;发现后面任然可以编辑文件。那么如何将PPT文件设置成禁止修改模式呢&#xff1f;今天分享几个方法给大家。 方法一 将PPT文件直接保存或者另存为一份文件&#xff0c;在保存时&#xff0c;将文件格式选择为PowerPoint图片演示文稿 方法…

.NET 8 编写 LiteDB vs SQLite 数据库 CRUD 接口性能测试(测试篇)

WebAppDbTest 项目测试 测试工具 ltt介绍安装使用方式1、Drill2、Hammer3、Nailgun 测试主机规格配置CRUD 性能测试对比1、ltt 工具测试1.1、AddSingle 单条数据添加1.2、AddBulk 批量数据&#xff08;1000&#xff09;条添加1.3、GetSingle 单条数据查询1.4、GetAll 多条&…

多合一iPhone 解锁工具:iMyFone LockWiper iOS

多合一iPhone 解锁工具 无需密码解锁 iPhone/iPad/iPod touch 上所有类型的屏幕锁定 在几分钟内解锁 iPhone Apple ID、Touch ID 和 Face ID 立即绕过 MDM 并删除 iPhone/iPad/iPod touch 上的 MDM 配置文件 支持所有 iOS 版本和设备&#xff0c;包括最新的 iOS 17 和 iPhone 1…

JAVA实操经验

零&#xff1a; 按照需要&#xff0c;可以使用需要某个类下&#xff08;主要是java提供的&#xff09;的方法来实现某个功能。&#xff08;主要是用在不同类下的方法会进行重写功能不同&#xff09; 方法和构造方法不同&#xff1a;方法是方法&#xff0c;构造方法是构造器&a…

基于FPGA的视频接口之高速IO

简介 相对于其他视频接口来说,高速IO接口(以Xilinx公司为例,spartan 6系列的GTP、Artix7系列的GTP,KENTEX7系列的GTX和GTH等)具有简化设计、充分利用FPGA资源、降低设计成本等功能。 高速IO接口传输视频,一般会被拓展为万兆以太网、40G以太网、10G光纤、40G光纤、3G-SDI、…

c语言插入排序及希尔排序详解

目录 前言&#xff1a; 插入排序&#xff1a; 希尔排序&#xff1a; 前言&#xff1a; 排序在我们生活中无处不在&#xff0c;比如学生成就排名&#xff0c;商品价格排名等等&#xff0c;所以排序在数据结构的学习中尤为重要&#xff0c;今天就为大家介绍两个经典的排序算法&…

深入解析C++中的虚函数和虚继承:实现多态性与继承关系的高级特性

这里写目录标题 虚函数虚函数实现动态绑定虚继承抽象类 虚函数 虚函数是在C中用于实现多态性的一种特殊函数。它通过使用关键字"virtual"进行声明&#xff0c;在基类中定义&#xff0c;可在派生类中进行重写。虚函数允许在运行时根据对象的实际类型来调用相应的函数…

BigData之Google Hadoop中间件安装

前言 Hadoop / Zookeeper / Hbase 因资源有限 这三个都是安装在同一台Centos7.9的机器上 但通过配置 所以在逻辑上是distributed模式 1 Java安装 1.1 下载java11 tar/opt/java/jdk-11.0.5/ 1.2 环境配置修改 文件/etc/profile export JAVA_HOME/opt/java/jdk-11.0.5/ e…

HarmonyOS编译开源native库(OpenSSL实例)

前言 近期项目要开始做鸿蒙版本&#xff0c;有一部分依赖native的代码也需要迁移&#xff0c;某个native模块依赖openssl&#xff0c;需要在鸿蒙下重新编译openssl才行。一开始找了很多相关文档都没有得到方法&#xff0c;无奈只能自己凭经验慢慢试&#xff0c;最后还是成功了…

JS基础之执行上下文

JS基础之执行上下文 执行上下文顺序执行可执行代码执行上下文栈回顾上文 执行上下文 顺序执行 写个JavaScript的开发者都会有个直观的印象&#xff0c;那就是顺序执行&#xff1a; var foo function(){console.log(foo1) } foo(); //foo1 var foo function(){console.log(…

HTML面试题---专题一

文章目录 一、前言二、 HTML5 中 <header> 和 <footer> 标签的用途是什么&#xff1f;三、如何在 HTML 中嵌入 SVG&#xff08;可缩放矢量图形&#xff09;文件&#xff1f;四、解释 contenteditable 属性的用途五、如何创建随屏幕尺寸缩放的响应式图像&#xff1f…

线上扭蛋机小程序搭建,扭蛋与科技的完美结合

扭蛋机作为当下比较热门的一种盲盒玩法&#xff0c;在年轻人群体中非常受欢迎。随着经济的增长和人们生活水平的提高&#xff0c;人们对娱乐消费需求也在增加&#xff0c;扭蛋机的受众群体也在扩大。 目前线上扭蛋机小程序也获得了大众的青睐&#xff0c;扭蛋机小程序就是把线…

记录一下快速上手Springboot登录注册项目

本教程需要安装以下工具&#xff0c;如果不清楚怎么安装的可以看下我的这篇文章 链接: https://blog.csdn.net/qq_30627241/article/details/134804675 管理工具&#xff1a; maven IDE&#xff1a; IDEA 数据库&#xff1a; MySQL 测试工具&#xff1a; Postman 打开IDE…

AR-LDM原理及代码分析

AR-LDM原理AR-LDM代码分析pytorch_lightning(pl)的hook流程main.py 具体分析TrainSampleLightningDatasetARLDM blip mm encoder AR-LDM原理 左边是模仿了自回归地从1, 2, ..., j-1来构造 j 时刻的 frame 的过程。 在普通Stable Diffusion的基础上&#xff0c;使用了1, 2, .…