Kafka入门及生产者详解

1. Kafka定义

传统定义:分布式的、基于发布/订阅模式消息队列,主要用于大数据实时处理领域。发布/订阅模式中,发布者不会直接将消息发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接受感兴趣的消息。

官网最新定义:开源的分布式事件流平台(Event Streaming Platform),用于高性能数据管道、流分析、数据集成。

2. 消息队列的应用场景及模式

传统的消息队列的主要应用场景包括:缓冲/消峰、解耦、异步通信

缓冲/消峰:有助于控制和优化数据流的速度,解决生产消息和消费消息的速度不一致的问题。

解耦:允许独立修改和扩展两边的处理过程,只需确保他们遵守相同的接口约束。

此时消息队列类似于一个超时,数据源是商品生产厂商,目的地是消费者,消费者无需跟各大厂商来往,而是去超市购物。

异步通信:允许用户把一个消息放入队列,不立即处理,再在需要的时候去处理。(比如发送验证码)

消息队列的两种模式:

1)点对点:消费者主动拉取数据,消息收到后清除消息

2)发布/订阅模式:有多个Topic主题;消费者消费完数据后,不删除数据,数据仍可以被其他消费者消费;每个消费者相互独立

3. Kafka基础架构

1)一个topic可以有多个分区,broker为服务器,即一份数据分为多个分区放在多个服务器

2)数据分为多块,消费者也有多个,组成一个消费者组,组内每个成员并行消费不同的分区

3)分区也有副本,不过分HDFS的副本有区别,HDFS的副本是相等的,而Kafka里的副本只有Leader的才能起作用,Follower的副本不能消费(除非Leader挂了,Follower成为Leader)

4)ZK里保存了Kafka的服务器id信息,以及每个topic的各个分区的Leader是哪个服务器,以及isr队列

4. Kafka命令行操作快速入门

针对Kafka基础架构的三大部分,分别有不同的脚本命令来操作。

生产者:kafka-console-producer.sh;

集群:kafka-topics.sh;

消费者:kafka-console-consumer.sh

1)kafka-topics.sh的命令参数如下

创建topic,1个分区,3个副本,并查看:

bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --create --partitions 1 --replication-factor 3bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --listbin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --describe

结果如下:

 修改分区数(分区数只能改大,不能改小)为3

bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --partitions 3bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --describe

结果如下: 

另外副本数也不能通过命令行修改

2) kafka-console-producer.sh

向指定分区发送数据

bin/kafka-console-producer.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first

3) kafka-console-consumer.sh

消费者消费指定分区的数据

bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first

再在生产端发送数据,消费者端可以收到数据,但不能收到历史数据(即生产者在消费者起来之前发送的数据),要想消费历史数据,加上参数:--from-beginning

bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --from-beginning

5. 生产者异步发送与同步发送

添加依赖:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version>
</dependency>

创建Kafka生产者对象:

Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRP_SERVERS_CONFIG, "hadoop102:9092");// 指定key和value的序列化类型
// StringSerializer.class.getName()相当于StringSerializer的全路径名称
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

发送数据:

// new ProducerRecord第一个参数是topic,第二个参数是值(key为null)
kafkaProducer.send(new ProducerRecord<>("first", "value"));

ProducerRecord的多个构造函数:

关闭资源:

kafkaProducer.close();

发送数据也可以带回调函数,返回主题、分区等信息:

kafkaProducer.send(new ProducerRecord<>("first", "value"), new CallBack() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("topic: " + metadata.topic() + ", partition: " + metadata.partition());}}
});

同步发送只需在send方法之后加上get方法:

// new ProducerRecord第一个参数是topic,第二个参数是值(key为null)
kafkaProducer.send(new ProducerRecord<>("first", "value")).get();

6. 生产者分区策略 

生产者的默认分区器:DefaultPatitioner,即如果指定分区,就发送到指定分区;如果没指定分区,指定了key,则将key的哈希值对分区数取模得到分区;如果也没指定key,选择粘性分区(sticky partition),即随机选取一个分区,本批次数据满了或者linger.ms时间到了,再次选择另一个分区。

自定义分区,主要是实现Partitioner接口,重写其中的partition方法:

@Overrride
public int partition(String topic, Object key, byte[] keybytes, Object value, byte[] valuebytes, Cluster cluster) {String valueStr = value.toString();if (valueStr.contains("xxx")) {return 0;} else if (valueStr.contains("zzz")) {return 1;} else {return 2;}
}

配置关联自定义分区器:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

 7. 如何提高吞吐量

batch.size:每批次数据大小,数据量达到这个值,就开始发送,默认为16K

linger.ms:等待时间,如果到了这个时间,无论数据量多大,立即发送,默认为0

如果linger.ms设置为0,意味着一旦有数据来就立马发送,这样效率并不高,所以适当提高linger.ms有利于提高吞吐量,但是不能太大,这样会造成较大的数据延迟。

也可以发送数据的过程中采用数据压缩(snappy)的方式,来提高实际发送的数据量。

还可以修改缓冲区大小RecordAccumulator

设置缓冲区大小:

properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32M

设置批次大小:

properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16K

设置linger.ms:

properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

设置压缩格式:

properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

8. 数据可靠性

生产者发送给Kakfa集群,会收到如下几种应答:

1)ack = 0:不需要等待数据落盘,可靠性最差,存在丢数风险,一般不会用这种模式

2)ack = 1:需要Leader收到数据并进行落盘,也有丢数风险,比如Leader刚应答完就挂了,还没来得及同步数据给Follower

3)ack = -1/all,需要Leader和isr队列中所有节点收到数据并进行落盘,可靠性最好,但是数据可能会重复。

所谓ISR队列,就是和Leader保持同步的Leader+Follower的集合,例如:leader:0; isr: 0,1。如果某个Follower长时间未与Leader通信,该Follower就会被提出isr队列,这样就不会出现Leader长期等待某个故障Follower节点的问题。

ack = -1,如果分区副本数为1,或者isr队列里只有一个节点,则与ack=1效果一样,仍有丢数风险。

数据完全可靠 = (ACK = -1)+ (分区副本数 >= 2) + (ISR队列里节点数 >= 2)

代码配置:

properties.put(ProducerConfig.ACK_CONFIG, -1);
//重试次数,默认为int最大值
properties.put(ProducerConfig.RETRIES_CONFIG, 3);

为解决ack = -1时的数据重复性问题,kafka引入了幂等性和事务的概念。所谓幂等性,就是Producer无论向broker发送多少次重复数据,broker都只会持久化一条。

精确依次(Exactly Once) = 幂等性 + 数据完全可靠

重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,broker只会持久化一条。PID是每次Kafka重启时会分配一个新的,Partition表示分区号,SeqNumber是单调递增的。所以能保证单分区单次会话数据不重复。

开启幂等性,只需将enable.idempodence设为true即可(默认就是true)。

Kafka事务原理:

 使用事务发送数据:

properties.put(ProducerConfig.TRANSACTION_ID_CONFIG, "01");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);kafkaProducer.initTransactions();
kafkaProducer.beginTransactions();
try {kafkaProducer.send(new ProducerRecord<>("first", "value"));kafkaProducer.commitTransactions();
} catch (Exception e) {kafkaProducer.abortTransactions();
} finnaly {kafkaProducer.close();
}

9. 数据乱序

Kafka生产者发送数据给broker,每个broker默认缓存5个请求,如果其中一个请求发送失败,不影响后面请求发送,加入失败的请求后来又重试成功了,那么broker收到的数据会是乱序的。只需将max.in.flight.requests.per.connection设置小于等于5,broker就会自动排序。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/731506.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux系统Docker部署DbGate并结合内网穿透实现公网管理本地数据库

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法|MySQL| ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-66GkyG9g7oNq7tl8 {font-family:"trebuchet ms",verdana,arial,sans-serif;f…

基于yolov5的草莓成长期检测系统,可进行图像目标检测,也可进行视屏和摄像检测(pytorch框架)【python源码+UI界面+功能源码详解】

功能演示&#xff1a; 基于yolov5的草莓成长期检测系统&#xff0c;支持图像检测&#xff0c;视频检测和实时摄像检测功能_哔哩哔哩_bilibili &#xff08;一&#xff09;简介 基于yolov5的草莓成长期检测系统是在pytorch框架下实现的&#xff0c;成长期分为3类&#xff1a;…

加密与安全_使用Java代码操作RSA算法生成的密钥对

文章目录 Pre概述什么是非对称加密算法&#xff1f;如何工作&#xff1f;示例&#xff1a;RSA算法特点和优势ECC&#xff1a;另一种非对称加密算法 Code生成公钥和私钥私钥加密私钥加密私钥解密 ( 行不通 )私钥加密公钥解密公钥加密和公钥解密 &#xff08;行不通&#xff09;保…

字符指针变量

在指针的类型中我们知道有⼀种指针类型为字符指针 char* 1.一般使用方法如下&#xff1a; int main() { char ch w; char *pc &ch; *pc w; return 0; } 2.还有⼀种使⽤⽅式如下&#xff1a; int main() { const char* pstr "hello world."; printf("…

吴恩达深度学习笔记:深度学习引言1.1-1.5

目录 第一门课&#xff1a;神经网络和深度学习 (Neural Networks and Deep Learning)第一周&#xff1a;深度学习引言(Introduction to Deep Learning)1.1 欢迎(Welcome)1.2 什么是神经网络&#xff1f;(What is a Neural Network)1.3 神经网络的监督学习(Supervised Learning …

Linux系统——Haproxy高性能负载均衡软件

目录 一、Haproxy介绍 1.Haproxy定义 2.Haproxy主要特性 二、安装Haproxy 1.yum安装 2.第三方rpm包安装 3.编译安装 3.1解决Lua环境 3.2编译安装Haproxy 三、配置文件详解 1.状态页 2.日志管理 2.1定义日志到其他主机站点 3.指定进程线程个数 4.cpu亲缘性 5.多进…

【IP178G】分享一个IP178G的8口交换机电路图

前提 IP178G是一个集成度非常高的8口交换机芯片&#xff0c;还可以通过外部EEPROM配置交换机芯片实现一些简单的网管功能**(VLAN/端口镜像etc)**&#xff0c;拿来作为一个网络调试工具来使用非常方便&#xff0c;一个TYPE-C的USB口的线就可以供电(5V USB就可以带的了)

职场成功的秘诀:如何高效管理时间

在职场中&#xff0c;时间管理是一项至关重要的技能。高效的时间管理不仅能够提高工作效率&#xff0c;还能够帮助我们更好地平衡工作与生活&#xff0c;实现职场成功。本文将分享一些职场成功人士都在使用的时间管理秘诀&#xff0c;帮助你更好地管理时间&#xff0c;提升职场…

小红书brief怎么创作?品牌营销技巧

在小红书平台进行品牌传播&#xff0c;首先需要具备的能力&#xff0c;就是与达人进行沟通。那么就一定要学会写小红书brief。今天我们和大家分享一下小红书brief怎么创作&#xff1f;品牌营销技巧&#xff01; 一、什么是小红书brief brief是工作简报的意思。小红书brief是品牌…

基于Docker搭建Maven私服仓库(Linux)详细教程

文章目录 1. 下载镜像并启动容器2. 配置Nexus3. 配置本地Maven仓库 1. 下载镜像并启动容器 下载Nexus3镜像 docker pull sonatype/nexus3查看Nexus3镜像是否下载成功 docker images创建Nexus3的挂载文件夹 mkdir /usr/local/nexus-data && chown -R 200 /usr/local…

07.axios封装实例

一.简易axios封装-获取省份列表 1. 需求&#xff1a;基于 Promise 和 XHR 封装 myAxios 函数&#xff0c;获取省份列表展示到页面 2. 核心语法&#xff1a; function myAxios(config) {return new Promise((resolve, reject) > {// XHR 请求// 调用成功/失败的处理程序}) …

微服务韧性工程:利用Sentinel实施有效服务容错与限流降级

目录 一、雪崩效应 二、Sentinel 服务容错 2.1 Sentinel容错思路 2.2 内部异常兼容 2.3 外部流量控制 三、Sentinel 项目搭建 四、Sentinel 工作原理 服务容错是微服务设计中一项重要原则和技术手段&#xff0c;主要目标是在服务出现故障、网络波动或其他不可预见的异常情况…

刷题日记——约数的个数KY3

分析 用例的0超过9个&#xff0c;需要使用long long&#xff0c;为了保险&#xff0c;我用的是unsigned long long判断约数有这样的规律&#xff1a;任何正整数a&#xff0c;如果存在约数对<m,n>&#xff0c;即amn&#xff0c;设mmin{m,n},nmax{m,n}&#xff0c;即设m是…

数据库备份.....

一.环境准备 数据库备份&#xff0c;数据库为school&#xff0c;素材如下 >create database school; >use school1.创建student和score表CREATE TABLE student ( id INT(10) NOT NULL UNIQUE PRIMARY KEY , name VARCHAR(20) NOT NULL , sex VARCHAR(4) , bi…

【论文阅读】Mamba:选择状态空间模型的线性时间序列建模(一)

文章目录 Mamba:选择状态空间模型的线性时间序列建模介绍状态序列模型选择性状态空间模型动机&#xff1a;选择作为一种压缩手段用选择性提升SSM 选择性SSM的高效实现先前模型的动机选择扫描总览&#xff1a;硬件感知状态扩展 Mamba论文 Mamba:选择状态空间模型的线性时间序列建…

SaulLM-7B: A pioneering Large Language Model for Law

SaulLM-7B: A pioneering Large Language Model for Law 相关链接&#xff1a;arxiv 关键字&#xff1a;Large Language Model、Legal Domain、SaulLM-7B、Instructional Fine-tuning、Legal Corpora 摘要 本文中&#xff0c;我们介绍了SaulLM-7B&#xff0c;这是为法律领域量…

服务器-->网站制作-->接口开发,一篇文章一条龙服务(1)

作者&#xff1a;q: 1416279170v: lyj_txd前述&#xff1a;本人非专业&#xff0c;兴趣爱好自学自研&#xff0c;很多没有说清楚的地方见谅&#xff0c;欢迎一起讨论的小伙伴~ 概述 不想看概述&#xff0c;直接点击服务器部分 三者之间的关系 服务器、网站制作、接口开发这…

Redis进阶--一篇文章带你走出Redis

目录 什么是Redis?? Redis有哪些使用场景? Redis是单线程还是多线程? 为什么Redis是单线程速度还是很快?? Redis持久化 RDB机制:(Redis DataBase) [是redis中默认的持久化方式] AOF机制:(Append Only File) Redis和MySQL如何保持数据一致????…

NodeJS实现堆排序算法

NodeJS实现堆排序算法 以下是使用 Node.js 实现堆排序算法的示例代码&#xff1a; // 堆排序函数 function heapSort(arr) {// 构建最大堆buildMaxHeap(arr);// 依次取出最大堆的根节点&#xff08;最大值&#xff09;&#xff0c;并调整堆结构for (let i arr.length - 1; i…

STM32 SDRAM知识点

1.SDRAM和SRAM的区别 SRAM不需要刷新电路即能保存它内部存储的数据。而SDRAM&#xff08;Dynamic Random Access Memory&#xff09;每隔一段时间&#xff0c;要刷新充电一次&#xff0c;否则内部的数据即会消失&#xff0c;因此SRAM具有较高的性能&#xff0c;但是SRAM也有它…