kafka第三课-可视化工具、生产环境问题总结以及性能优化

一、可视化工具

https://pan.baidu.com/s/1qYifoa4 密码:el4o

  1. 下载解压之后,编辑该文件,修改zookeeper地址,也就是kafka注册的zookeeper的地址,如果是zookeeper集群,以逗号分开

vi conf/application.conf

在这里插入图片描述

  1. 启动命令
bin/kafka-manager
  1. 默认9000端口,直接访问,添加一个名字,名字只是方便开发者使用,随便起,地址是zookeeper的地址和端口
    在这里插入图片描述
    在这里插入图片描述
  2. 点进去可以看到对应的主体以及kafka的集群节点信息

在这里插入图片描述

二、线上环境规划

在这里插入图片描述
JVM参数设置
kafka是scala语言开发,运行在JVM上,需要对JVM参数合理设置,参看JVM调优专题
修改bin/kafka-start-server.sh中的jvm设置,假设机器是32G内存,可以如下设置:

export KAFKA_HEAP_OPTS="-Xmx16G -Xms16G -Xmn10G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M"
这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长时间,当然,因为像kafka,rocketmq,es这些中间件,写数据到磁盘会用到操作系统的page cache,所以JVM内存不宜分配过大,需要给操作系统的缓存留出几个G。

这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长时间,当然,因为像kafka,rocketmq,es这些中间件,写数据到磁盘会用到操作系统的page cache,所以JVM内存不宜分配过大,需要给操作系统的缓存留出几个G。

三、线上问题及优化

1、消息丢失情况:

消息发送端:
(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。
(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
(3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。

消息消费端:
如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了。

2、消息重复消费

消息发送端:
发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息
消息消费端:
如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理
一般消费端都是要做消费幂等处理的。

3、消息乱序

如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了
所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息发送的有序。
kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。

4、消息积压

1)线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。
此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。

2)由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。
此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。

5、延时队列

延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费,延时队列的使用场景有很多, 比如 :
1)在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延时队列来处理这些订单了。
2)订单完成1小时后通知用户进行评价。
实现思路:发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,…topic_2h,这个一般不能支持任意时间段的延时),然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。

6、消息回溯
如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消费,参见上节课的内容。

当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费latest(默认) :只消费自己启动之后发送到主题的消息earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)*///props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

7、分区数越多吞吐量越高吗

可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量

# 往test里发送一百万消息,每条设置1KB
# throughput 用来进行限流控制,当设定的值小于 0 时不限流,
#当设定的值大于 0 时,当发送的吞吐量大于该值时就会被阻塞一段时间
#一条消息1024个字节,也就是1kb
#acks=1 参考上满
bin/kafka-producer-perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=192.168.85.200:9092 acks=1

在这里插入图片描述
平均值是1秒多,最大发送时间是1.9秒,不过这个时间都是一批消息的时间,百万数据kafka是分批发的,具体参考上一个文档,看他的缓存区和bash区
网络上很多资料都说分区数越多吞吐量越高 , 但从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个临界值,当超过这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了。
当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。
注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台会报错"java.io.IOException : Too many open files"。
异常中最关键的信息是“ Too many open flies”,这是一种常见的 Linux 系统错误,通常意味着文件描述符不足,它一般发生在创建线程、创建 Socket、打开文件这些场景下 。 在 Linux系统的默认设置下,这个文件描述符的个数不是很多 ,通过 ulimit -n 命令可以查看:一般默认是1024,可以将该值增大,比如:ulimit -n 65535

8、消息传递保障

  • at most once(消费者最多收到一次消息,0–1次):acks = 0 可以实现。
  • at least once(消费者至少收到一次消息,1–多次):ack = all 可以实现。
  • exactly once(消费者刚好收到一次消息):at least once 加上消费者幂等性可以实现,还可以用kafka生产者的幂等性来实现。

kafka生产者的幂等性:因为发送端重试导致的消息重复发送问题,kafka的幂等性可以保证重复发送的消息只接收一次,只需在生产者加上参数 props.put(“enable.idempotence”, true) 即可,默认是false不开启。
具体实现原理是,kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和Sequence Number,如果相同不会再接收。

PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID 对用户完全是透明的。生产者如果重启则会生成新的PID。
Sequence Number:对于每个 PID,该 Producer 发送到每个 Partition 的数据都有对应的序列号,这些序列号是从0开始单调递增的。

9、kafka的事务(了解)

Kafka的事务不同于Rocketmq,Rocketmq是保障本地事务(比如数据库)与mq消息发送的事务一致性,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败),一般在kafka的流式计算场景用得多一点,比如,kafka需要对一个topic里的消息做不同的流式计算处理,处理完分别发到不同的topic里,这些topic分别被不同的下游系统消费(比如hbase,redis,es等),这种我们肯定希望系统发送到多个topic的数据保持事务一致性。Kafka要实现类似Rocketmq的分布式事务需要额外开发功能。

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("transactional.id", "my-transactional-id");Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());//初始化事务producer.initTransactions();try {//开启事务producer.beginTransaction();for (int i = 0; i < 100; i++){//发到不同的主题的不同分区producer.send(new ProducerRecord<>("hdfs-topic", Integer.toString(i), Integer.toString(i)));producer.send(new ProducerRecord<>("es-topic", Integer.toString(i), Integer.toString(i)));producer.send(new ProducerRecord<>("redis-topic", Integer.toString(i), Integer.toString(i)));}//提交事务producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// We can't recover from these exceptions, so our only option is to close the producer and exit.producer.close();} catch (KafkaException e) {// For all other exceptions, just abort the transaction and try again.//回滚事务producer.abortTransaction();}producer.close();

10、kafka高性能的原因

  • 磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,不会写入文件中的某个位置(随机写)保证了磁盘顺序写。
  • 数据传输的零拷贝
  • 读写数据的批量batch处理以及压缩传输

数据传输零拷贝原理:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/5616.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python 逻辑回归:理论与实践

文章目录 1. 介绍1.1 什么是逻辑回归&#xff1f;1.2 逻辑回归的应用领域 2. 逻辑回归的原理2.1 Sigmoid 函数2.2 决策边界2.3 损失函数 3. 逻辑回归的实现3.1 数据准备3.2 创建逻辑回归模型3.3 模型训练3.4 模型预测3.5 模型评估 4. 可视化决策边界4.1 绘制散点图4.2 绘制决策…

基于SaaS模式的Java基层卫生健康云HIS系统源码【运维管理+运营管理+综合监管】

云HIS综合管理平台 一、模板管理 模板分为两种&#xff1a;病历模板和报表模板。模板管理是运营管理的核心组成部分&#xff0c;是基层卫生健康云中各医疗机构定制电子病历和报表的地方&#xff0c;各医疗机构可根据自身特点特色定制电子病历和报表&#xff0c;制作的电子病历…

Docker 容器生命周期:创建、启动、暂停与停止----从创建到停止多角度分析

&#x1f337;&#x1f341; 博主 libin9iOak带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——libin9iOak的博客&#x1f390; &#x1f433; 《面试题大全》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33…

代码随想录| 图论04 查并集 ●查并集理论知识 ●1971.寻找图中是否存在路径 ●684.冗余连接 ●685.冗余连接II

#查并集理论知识 并查集用处&#xff1a;解决连通性问题 将两个元素添加到一个集合中。判断两个元素在不在同一个集合 思路&#xff1a;将三个元素A&#xff0c;B&#xff0c;C &#xff08;分别是数字&#xff09;放在同一个集合&#xff0c;其实就是将三个元素连通在一起&a…

Python 算法基础篇:插入排序和希尔排序

Python 算法基础篇&#xff1a;插入排序和希尔排序 引言 1. 插入排序算法概述2. 插入排序算法实现实例1&#xff1a;插入排序 3. 希尔排序算法概述4. 希尔排序算法实现实例2&#xff1a;希尔排序 5. 插入排序与希尔排序的对比总结 引言 插入排序和希尔排序是两种常用的排序算法…

017-从零搭建微服务-系统服务(四)

写在最前 如果这个项目让你有所收获&#xff0c;记得 Star 关注哦&#xff0c;这对我是非常不错的鼓励与支持。 源码地址&#xff08;后端&#xff09;&#xff1a;https://gitee.com/csps/mingyue 源码地址&#xff08;前端&#xff09;&#xff1a;https://gitee.com/csps…

【NLP】如何使用Hugging-Face-Pipelines?

一、说明 随着最近开发的库&#xff0c;执行深度学习分析变得更加容易。其中一个库是拥抱脸。Hugging Face 是一个平台&#xff0c;可为 NLP 任务&#xff08;如文本分类、情感分析等&#xff09;提供预先训练的语言模型。 本博客将引导您了解如何使用拥抱面部管道执行 NLP 任务…

超详细图文教程:3DS Max 中创建低多边形游戏长剑模型

推荐&#xff1a; NSDT场景编辑器助你快速搭建可二次开发的3D应用场景 在此&#xff0c;由两部分组成的教程的第一部分中&#xff0c;我将向您展示如何&#xff1a; 对剑柄进行建模剑的护手模型剑刃建模 1. 如何制作剑柄 步骤 1 在本教程中使用正交视图。要更改视图&#x…

AI时代带来的图片造假危机,该如何解决

一、前言 当今&#xff0c;图片造假问题非常泛滥&#xff0c;已经成为现代社会中一个严峻的问题。随着AI技术不断的发展&#xff0c;人们可以轻松地通过图像编辑和AI智能生成来篡改和伪造图片&#xff0c;使其看起来真实而难以辨别&#xff0c;之前就看到过一对硕士夫妻为了骗…

Flink-端到端精确一次(End-To-End Exactly-Once)

1.总结 目的&#xff1a;想要在故障恢复后不丢数据 输入端 保证可以重复发送数据如果是kafka&#xff0c;Flink负责维护offset&#xff0c;不用kafka维护设置kafka的隔离级别为&#xff1a;读已提交flink 开启检查点采用对齐或者不对齐的精确一次输出端 kafka 幂等事务两阶段…

一文了解Python中的while循环语句

目录 &#x1f969;循环语句是什么 &#x1f969;while循环 &#x1f969;遍历猜数字 &#x1f969;while循环嵌套 &#x1f969;while循环嵌套案例 &#x1f990;博客主页&#xff1a;大虾好吃吗的博客 &#x1f990;专栏地址&#xff1a;Python从入门到精通专栏 循环语句是什…

Mysql表锁与行锁

Mysql锁实战 前言&#xff1a;什么是锁一&#xff1a;全局锁1.1 概念1.2 作用1.3 使用1.4 特点 二&#xff1a;表级锁2.1 概念2.2 分类2.2.1 表锁2.2.2 元数据锁 MDL2.2.3 意向锁 三&#xff1a;行级锁3.1 行锁(Record Lock)3.2 间隙锁(Gap Lock)3.3 临键锁(Next-Key Lock): 四…

C# 委托详解

一.委托的概念 C#中委托也叫代理&#xff0c;委托提供了后期绑定机制(官方解释)&#xff0c;功能类似于C中的函数指针&#xff0c;它存储的就是一系列具有相同签名和返回类型的方法的地址&#xff0c;调用委托的时候&#xff0c;它所包含的所有方法都会被执行。 二.委托的用法…

自然语言处理基础详解入门

1、自然语言的概念 自然语言是指人类社会约定俗成的&#xff0c;并且区别于人工语言&#xff08;如计算机程序&#xff09;的语言&#xff0c;&#xff0c;是自然而然的随着人类社会发展演变而来的语言&#xff0c;它是人类学习生活的重要工具。 2、自然语言处理概述 自然语言…

Redis【实践篇】之RedisTemplate基本操作

Redis 从入门到精通【应用篇】之RedisTemplate详解 文章目录 Redis 从入门到精通【应用篇】之RedisTemplate详解0. 前言1. RedisTemplate 方法1. 设置RedisTemplate的序列化方式2. RedisTemplate的基本操作 2. 源码浅析2.1. 构造方法2.2. 序列化方式2.3. RedisTemplate的操作方…

【数据可视化】基于Python和Echarts的中国经济发展与人口变化可视化大屏

1.题目要求 本次课程设计要求使用Python和ECharts实现数据可视化大屏。要求每个人的数据集不同&#xff0c;用ECharts制作Dashboard&#xff08;总共至少4图&#xff09;&#xff0c;要求输入查询项&#xff08;地点和时间&#xff09;可查询数据&#xff0c;查询的数据的地理…

Stable Diffusion如何生成高质量的图-prompt写法介绍

文章目录 Stable Diffusion使用尝试下效果prompt的编写技巧prompt 和 negative promptPrompt格式Prompt规则细节优化Guidance Scale 总结 Stable Diffusion Stable Diffusion是一个开源的图像生成AI系统,由Anthropic公司开发。它基于 Transformer模型架构,可以通过文字描述生成…

Asp.net Core配置CORS 跨域无效(记录一下)

问题 学习老杨的英语网站项目&#xff0c;运行项目时&#xff0c;发现出现了跨域的问题。 然后自己建一项目&#xff0c;进行配置&#xff0c;测试&#xff0c;发现配置CORS 跨域时&#xff0c;发现跨域的配置无效&#xff0c;依旧报错。 解决 网上找了一天&#xff0c;然后…

USG6000v防火墙的基本使用:制定安全策略让不同安全区域的设备进行访问

目录 一、首先配置环境&#xff1a; 二、实验拓扑及说明 拓扑&#xff1a; PC1和PC2配置ip地址&#xff1a;​编辑​编辑 r4路由器配置ip&#xff1a; 进行防火墙的设置&#xff1a; 1、创建trust1区域和untrust1区域 2、制定防火墙的策略&#xff1a; 3、为防火墙增加可以…

hive常用函数

行列转换 create table tmp_summer1(id string,name string brith string);insert into tmp_summer1 values(001,A,20211202); insert into tmp_summer1 values(001,B,20211202); insert into tmp_summer1 values(002,A,20211202); insert into tmp_summer1 values(001,B,20211…