【Kafka】Kafka提高生产者吞吐量、数据可靠性-06

【Kafka】Kafka提高生产者吞吐量-06

  • 1. 提高生产者吞吐量
  • 2.数据可靠性
    • 2.1 回顾数据的发送流程
    • 2.2 ack应答级别
      • 2.2.1 acks:0
      • 2.2.2 acks:1
      • 2.2.2 acks:-1(all)
        • 2.2.2.1 数据可靠性分析
        • 2.2.2.2 数据完全可靠
    • 2.3 可靠性总结
    • 2.4 可靠性代码配置

1. 提高生产者吞吐量

在这里插入图片描述

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerParameters {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// batch.size:批次大小,默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms:等待时间,默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator:缓冲区大小,默认 32M:buffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(newProducerRecord<>("first", "atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

2.数据可靠性

2.1 回顾数据的发送流程

在这里插入图片描述

2.2 ack应答级别

在这里插入图片描述

2.2.1 acks:0

在这里插入图片描述
当应答级别为0时,生产者发送过来的数据,不需要等数据落盘应答。
工作:生产者将数据发送到leader中,如果leader突然挂掉了,leader还没有与follower同步,那么整个数据就全部都丢了。

2.2.2 acks:1

在这里插入图片描述
当应答级别为1时,生产者发送过来的数据,Leader收到数据后应答。

工作:如果应答完毕之后,leader还未与follower同步,leader挂了,新的leader会产生,原来的一条数据不会再次发送,造成了数据的丢失。

2.2.2 acks:-1(all)

在这里插入图片描述
当应答级别为-1时,生产者发送过来的数据,Leader+和isr队列
里面的所有节点收齐数据后应答。-1和all等价。

工作:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader同步,那这个问题怎么解决呢?

Leader维护了一个动态的in-sync replica set (ISR),意为和Leader保持问步的Folower + Leader集合(leader:0, isr:0,1,2)

如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)。

这样就不用等长期联系不上或者己经故障的节点。

2.2.2.1 数据可靠性分析

如果分区副本设置为1个,或者ISR里应答的最小副本数量(min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)


2.2.2.2 数据完全可靠

数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

2.3 可靠性总结

可靠性总结:

  1. acks=0,生产者发送过来数据就不管了,可靠性差,效率高

  2. acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等

  3. acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低

  4. 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

2.4 可靠性代码配置

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerAck {public static void main(String[] args) throwsInterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数 retries,默认是 int 最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/28200.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

科技云报道:“元年”之后,生成式AI将走向何方?

科技云报道原创。 近两年&#xff0c;以大模型为代表的生成式AI技术&#xff0c;成为引爆数字原生最重要的技术奇点&#xff0c;人们见证了各类文生应用的进展速度。Gartner预测&#xff0c;到2026年&#xff0c;超过80%的企业将使用生成式AI的API或模型&#xff0c;或在生产环…

LeetCode 第402场周赛个人题解

目录 100304. 构成整天的下标对数目 I 原题链接 思路分析 AC代码 100301. 构成整天的下标对数目 II 原题链接 思路分析 AC代码 100316. 施咒的最大总伤害 原题链接 思路分析 AC代码 100317. 数组中的峰值 原题链接 思路分析 AC代码 100304. 构成整天的下…

File API以及相关概念

一、Blob Blob 对象表示一个不可变、原始数据的类文件对象。它的数据可以按文本或二进制的格式进行读取&#xff0c;也可以转换成 ​ReadableStream 来用于数据操作。 构造方法 new Blob(blobParts) new Blob(blobParts,options) blobParts是一个可迭代对象&#xff08;就是能…

深度学习:PyCharm中运行Bash脚本

GitHub上的开源代码有很多是用 Bash 脚本来自动化数据处理、模型训练和模型评估等任务的&#xff0c;如何使用PyCharm来运行Bash脚本&#xff0c;从而快速上手GitHub开源代码&#xff0c;是一个实用的技巧&#xff0c;本文主要介绍PyCharm中运行Bash脚本的方法。 PyCharm 是一个…

「实战应用」如何用图表控件LightningChart JS创建SQL仪表板应用(二)

LightningChart JS是Web上性能特高的图表库&#xff0c;具有出色的执行性能 - 使用高数据速率同时监控数十个数据源。 GPU加速和WebGL渲染确保您的设备的图形处理器得到有效利用&#xff0c;从而实现高刷新率和流畅的动画&#xff0c;常用于贸易&#xff0c;工程&#xff0c;航…

OpenCore 引导完美升级

备份原有 OC (做好回滚的准备下载新版 OpenCore https://github.com/acidanthera/OpenCorePkg/releases将 1, 3, 4 里面的文件使用新版进行替换 4 里面的文件严格来说并不需要, 只是留着方便使用不追求完美到这就可以收工了将 OC 复制到 U 盘 EFI U 盘格式化可以使用: diskutil…

微服务开发与实战Day09 - Elasticsearch

一、DSL查询 Elasticsearch提供了DSL&#xff08;Domain Specific Language&#xff09;查询&#xff0c;就是以JSON格式来定义查询条件。类似这样&#xff1a; DSL查询可以分为两大类&#xff1a; 叶子查询&#xff08;Leaf query clauses&#xff09;&#xff1a;一般是在特…

Fortran 编程整理

Fortran编程语法整理 01 Fortran 中的程序单元1-1 submodule&#xff08;子模块&#xff09; 02 子程序与函数2-1 概述2-2 函数与子程序特性&#xff08;1&#xff09;为函数结果指定不同名称 03 stop命令后跟说明04 do concurrent4-1 do concurrent 的使用 05 纯过程&#xff…

Docker Jenkins(改错版本)

Devops:它强调开发(Development)和运维(Operations)团队之间的协作.实现更快,更可靠的软件交付部署. JenKins是一个开源的自动化服务器,广泛用于构建,测试和部署软件项目.它是持续集成(CI)和持续交付/部署(CD)的工具.JenKins是实现DevOps实践的重要工具. 前端项目部署一般流程:…

Matlab|基于V图的配电网电动汽车充电站选址定容-可视化

1主要内容 基于粒子群算法的电动汽车充电站和光伏最优选址和定容 关键词&#xff1a;选址定容 电动汽车 充电站位置 仿真平台&#xff1a;MATLAB 主要内容&#xff1a;代码主要做的是一个电动汽车充电站和分布式光伏的选址定容问题&#xff0c;提出了能够计及地理因素和服…

ubantu 计算一个文件夹内的文件数量命令

ubantu 计算一个文件夹内的文件数量命令 在Ubuntu中&#xff0c;你可以使用find命令来计算一个文件夹内的文件数量。以下是一个基本的命令示例&#xff1a; find /path/to/directory -type f | wc -l这里的/path/to/directory是你想要计算文件数量的文件夹的路径。find命令会…

蓝队-溯源技巧

溯源技巧 大致思想 通常情况下&#xff0c;接到溯源任务时&#xff0c;获得的信息如下 攻击时间 攻击 IP 预警平台 攻击类型 恶意文件 受攻击域名/IP其中攻击 IP、攻击类型、恶意文件、攻击详情是溯源入手的点。 通过攻击类型分析攻击详情的请求包&#xff0c;看有没有攻击者…

Web前端网页源代码:深入剖析与实用技巧

Web前端网页源代码&#xff1a;深入剖析与实用技巧 在Web开发的浩瀚领域中&#xff0c;前端网页源代码扮演着至关重要的角色。它不仅是网页的骨架&#xff0c;更是实现各种交互和视觉效果的基石。本文将从四个方面、五个方面、六个方面和七个方面&#xff0c;对Web前端网页源代…

C# OpenCvSharp 矩阵计算-determinant、trace、eigen、calcCovarMatrix、solve

🚀 在C#中使用OpenCvSharp库进行矩阵操作和图像处理 在C#中使用OpenCvSharp库,可以实现各种矩阵操作和图像处理功能。以下是对所列函数的详细解释和示例,包括运算过程和结果。📊✨ 1. determinant - 计算行列式 🧮 定义: double determinant(InputArray mtx); 参数…

web前端网页实例:深度剖析与实践指南

web前端网页实例&#xff1a;深度剖析与实践指南 在数字化时代的浪潮中&#xff0c;web前端网页已成为企业与用户之间的桥梁&#xff0c;承载着信息的传递与交互的重任。本文将通过四个方面、五个方面、六个方面和七个方面的详细剖析&#xff0c;带您深入了解web前端网页实例的…

实在智能应邀出席中国移动科技工作者论坛,分享基于大模型+Agent智能体的“企业大脑”

为大力弘扬科学家精神&#xff0c;激励广大科技工作者践行科技报国、创新为民&#xff0c;争做高水平科技自立自强排头兵&#xff0c;6月6日&#xff0c;中国移动在线营销服务中心&#xff08;以下简称“在线中心”&#xff09;“2024年科技工作者大讲堂暨青年创新创效论坛”于…

Matlab|基于手肘法的kmeans聚类数的精确识别【K-means聚类】

主要内容 在电力系统调度研究过程中&#xff0c;由于全年涉及的风、光和负荷曲线较多&#xff0c;为了分析出典型场景&#xff0c;很多时候就用到聚类算法&#xff0c;而K-means聚类就是常用到聚类算法&#xff0c;但是对于K-means聚类算法&#xff0c;需要自行指定分类数&…

C/C++:指针用法详解

C/C&#xff1a;指针 指针概念 指针变量也是一个变量 指针存放的内容是一个地址&#xff0c;该地址指向一块内存空间 指针是一种数据类型 指针变量定义 内存最小单位&#xff1a;BYTE字节&#xff08;比特&#xff09; 对于内存&#xff0c;每个BYTE都有一个唯一不同的编号…

赶紧转行大模型,预计风口就今年一年,明年市场就饱和了!不是开玩笑

恕我直言&#xff0c;就这几天&#xff0c;各大厂都在裁员&#xff0c;什么开发测试运维都裁&#xff0c;只有大模型是急招人。 你说你不知道大模型是什么&#xff1f;那可太对了&#xff0c;你不知道说明别人也不知道&#xff0c;就是要趁只有业内部分人知道的时候入局&#…

深入理解Python中的多线程与多进程编程

深入理解Python中的多线程与多进程编程 在现代计算中,充分利用多核处理器和并行计算资源变得越来越重要。Python提供了多线程和多进程编程的支持,使开发者能够编写高效的并行程序。本文将深入探讨Python中的多线程与多进程编程,包括基本概念、使用方法以及实际应用场景。 …