【Kafka】Kafka提高生产者吞吐量、数据可靠性-06

【Kafka】Kafka提高生产者吞吐量-06

  • 1. 提高生产者吞吐量
  • 2.数据可靠性
    • 2.1 回顾数据的发送流程
    • 2.2 ack应答级别
      • 2.2.1 acks:0
      • 2.2.2 acks:1
      • 2.2.2 acks:-1(all)
        • 2.2.2.1 数据可靠性分析
        • 2.2.2.2 数据完全可靠
    • 2.3 可靠性总结
    • 2.4 可靠性代码配置

1. 提高生产者吞吐量

在这里插入图片描述

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerParameters {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// batch.size:批次大小,默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms:等待时间,默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator:缓冲区大小,默认 32M:buffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(newProducerRecord<>("first", "atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

2.数据可靠性

2.1 回顾数据的发送流程

在这里插入图片描述

2.2 ack应答级别

在这里插入图片描述

2.2.1 acks:0

在这里插入图片描述
当应答级别为0时,生产者发送过来的数据,不需要等数据落盘应答。
工作:生产者将数据发送到leader中,如果leader突然挂掉了,leader还没有与follower同步,那么整个数据就全部都丢了。

2.2.2 acks:1

在这里插入图片描述
当应答级别为1时,生产者发送过来的数据,Leader收到数据后应答。

工作:如果应答完毕之后,leader还未与follower同步,leader挂了,新的leader会产生,原来的一条数据不会再次发送,造成了数据的丢失。

2.2.2 acks:-1(all)

在这里插入图片描述
当应答级别为-1时,生产者发送过来的数据,Leader+和isr队列
里面的所有节点收齐数据后应答。-1和all等价。

工作:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader同步,那这个问题怎么解决呢?

Leader维护了一个动态的in-sync replica set (ISR),意为和Leader保持问步的Folower + Leader集合(leader:0, isr:0,1,2)

如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)。

这样就不用等长期联系不上或者己经故障的节点。

2.2.2.1 数据可靠性分析

如果分区副本设置为1个,或者ISR里应答的最小副本数量(min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)


2.2.2.2 数据完全可靠

数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

2.3 可靠性总结

可靠性总结:

  1. acks=0,生产者发送过来数据就不管了,可靠性差,效率高

  2. acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等

  3. acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低

  4. 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

2.4 可靠性代码配置

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerAck {public static void main(String[] args) throwsInterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数 retries,默认是 int 最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/28200.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

科技云报道:“元年”之后,生成式AI将走向何方?

科技云报道原创。 近两年&#xff0c;以大模型为代表的生成式AI技术&#xff0c;成为引爆数字原生最重要的技术奇点&#xff0c;人们见证了各类文生应用的进展速度。Gartner预测&#xff0c;到2026年&#xff0c;超过80%的企业将使用生成式AI的API或模型&#xff0c;或在生产环…

「实战应用」如何用图表控件LightningChart JS创建SQL仪表板应用(二)

LightningChart JS是Web上性能特高的图表库&#xff0c;具有出色的执行性能 - 使用高数据速率同时监控数十个数据源。 GPU加速和WebGL渲染确保您的设备的图形处理器得到有效利用&#xff0c;从而实现高刷新率和流畅的动画&#xff0c;常用于贸易&#xff0c;工程&#xff0c;航…

OpenCore 引导完美升级

备份原有 OC (做好回滚的准备下载新版 OpenCore https://github.com/acidanthera/OpenCorePkg/releases将 1, 3, 4 里面的文件使用新版进行替换 4 里面的文件严格来说并不需要, 只是留着方便使用不追求完美到这就可以收工了将 OC 复制到 U 盘 EFI U 盘格式化可以使用: diskutil…

微服务开发与实战Day09 - Elasticsearch

一、DSL查询 Elasticsearch提供了DSL&#xff08;Domain Specific Language&#xff09;查询&#xff0c;就是以JSON格式来定义查询条件。类似这样&#xff1a; DSL查询可以分为两大类&#xff1a; 叶子查询&#xff08;Leaf query clauses&#xff09;&#xff1a;一般是在特…

Docker Jenkins(改错版本)

Devops:它强调开发(Development)和运维(Operations)团队之间的协作.实现更快,更可靠的软件交付部署. JenKins是一个开源的自动化服务器,广泛用于构建,测试和部署软件项目.它是持续集成(CI)和持续交付/部署(CD)的工具.JenKins是实现DevOps实践的重要工具. 前端项目部署一般流程:…

Matlab|基于V图的配电网电动汽车充电站选址定容-可视化

1主要内容 基于粒子群算法的电动汽车充电站和光伏最优选址和定容 关键词&#xff1a;选址定容 电动汽车 充电站位置 仿真平台&#xff1a;MATLAB 主要内容&#xff1a;代码主要做的是一个电动汽车充电站和分布式光伏的选址定容问题&#xff0c;提出了能够计及地理因素和服…

蓝队-溯源技巧

溯源技巧 大致思想 通常情况下&#xff0c;接到溯源任务时&#xff0c;获得的信息如下 攻击时间 攻击 IP 预警平台 攻击类型 恶意文件 受攻击域名/IP其中攻击 IP、攻击类型、恶意文件、攻击详情是溯源入手的点。 通过攻击类型分析攻击详情的请求包&#xff0c;看有没有攻击者…

实在智能应邀出席中国移动科技工作者论坛,分享基于大模型+Agent智能体的“企业大脑”

为大力弘扬科学家精神&#xff0c;激励广大科技工作者践行科技报国、创新为民&#xff0c;争做高水平科技自立自强排头兵&#xff0c;6月6日&#xff0c;中国移动在线营销服务中心&#xff08;以下简称“在线中心”&#xff09;“2024年科技工作者大讲堂暨青年创新创效论坛”于…

Matlab|基于手肘法的kmeans聚类数的精确识别【K-means聚类】

主要内容 在电力系统调度研究过程中&#xff0c;由于全年涉及的风、光和负荷曲线较多&#xff0c;为了分析出典型场景&#xff0c;很多时候就用到聚类算法&#xff0c;而K-means聚类就是常用到聚类算法&#xff0c;但是对于K-means聚类算法&#xff0c;需要自行指定分类数&…

C/C++:指针用法详解

C/C&#xff1a;指针 指针概念 指针变量也是一个变量 指针存放的内容是一个地址&#xff0c;该地址指向一块内存空间 指针是一种数据类型 指针变量定义 内存最小单位&#xff1a;BYTE字节&#xff08;比特&#xff09; 对于内存&#xff0c;每个BYTE都有一个唯一不同的编号…

赶紧转行大模型,预计风口就今年一年,明年市场就饱和了!不是开玩笑

恕我直言&#xff0c;就这几天&#xff0c;各大厂都在裁员&#xff0c;什么开发测试运维都裁&#xff0c;只有大模型是急招人。 你说你不知道大模型是什么&#xff1f;那可太对了&#xff0c;你不知道说明别人也不知道&#xff0c;就是要趁只有业内部分人知道的时候入局&#…

String常用方法详解

auth&#xff1a;别晃我的可乐 date&#xff1a;2024年06月16日 比较大小 equals(Object obj): 用于比较字符串内容是否相等。compareTo(String anotherString): 按字典顺序比较两个字符串。 String str1 "hello"; String str2 "world";boolean isEqual …

配置Linux DNS服务器作为自己的windows 的 DNS服务器和 配置遇到的问题

安装DNS 库 和 DNS工具&#xff1a; # bind 是用于创建 dns服务的&#xff0c; bind-utils是用于测试DNS服务的工具 yum -y install bind bind-utils配置主配置文件&#xff1a; # 下载好后就已经有DNS服务&#xff0c;但是需要你自己去配置DNS服务信息# 配置主配置文件 [rootl…

SylixOS下UDP组播测试程序

SylixOS下UDP组播测试 测试效果截图如下: udp组播发送测试程序。 /********************************************************************************************************* ** ** 中国软件开源组织 ** ** …

vue的安装配置并创建项目

npm 工具的安装 安装node.js之后&#xff0c;npm工具会自动安装到系统环境中 网址:https://nodejs.org/en vue的安装 安装vue并创建项目 npm create vuelatest 进入项目之后&#xff0c;然后启动npm run dev 解决方法&#xff1a; npm install -g create-vite 再次启动 通…

13.泛型、trait和生命周期(下)

目录 6. 生命周期与引用有效性6.1 生命周期引入6.2 得到长度最大的String值6.3 生命周期标注语法1&#xff09;说明2&#xff09;普通标注示例3&#xff09;函数参数中的生命周期标注 6.4 深入理解生命周期6.5 结构体定义中的生命周期标注6.6 生命周期省略 6.7 方法定义中的生命…

应急响应 | 基本技能 | 01-系统排查

系统排查 目录 系统基本信息 Windows系统Linux系统 用户信息 Windows系统 1、命令行方式2、图形界面方法3、注册表方法4、wmic方法 Linux系统 查看所有用户信息分析超级权限账户查看可登录的用户查看用户错误的登录信息查看所有用户最后的登录信息查看用户最近登录信息查看当…

机器学习周报第46周

目录 摘要Abstract一、文献阅读1.1 摘要1.2 研究背景1.3 论文方法1.4 模块分析1.5 网络规格1.6 高效的端到端对象检测1.7 mobile former模块代码 目录 摘要Abstract一、文献阅读1.1 摘要1.2 研究背景1.3 论文方法1.4 模块分析1.5 网络规格1.6 高效的端到端对象检测1.7 mobile f…

C++ 58 之 计算器案例

虚函数,vitual function C动态多态性是通过虚函数来实现的&#xff0c;虚函数允许子类&#xff08;派生类&#xff09;重新定义父类&#xff08;基类&#xff09;成员函数&#xff0c;而子类&#xff08;派生类&#xff09;重新定义父类&#xff08;基类&#xff09;虚函数的做…

day38-39| 509. 斐波那契数 70. 爬楼梯 746. 使用最小花费爬楼梯 62.不同路径 343. 整数拆分 96.不同的二叉搜索树

文章目录 前言动态规划理论基础509. 斐波那契数思路方法一 完整动态规划方法二 dp简化版方法三 使用递归 70. 爬楼梯思路方法一 动态规划方法一2 教程里面的简化方法方法二 拓展 746. 使用最小花费爬楼梯思路方法一方法二 拓展 62.不同路径思路 动态规划方法一方法二 递归 63. …