2024-02-27(Kafka)

1.Kafka中所有的消息都是保存在主题中的,要生产消息到Kafka,首先必须要创建一个主题。

2.Kafka的生产者/消费者

安装kafka集群,可以测试如下:

        创建一个topic主题(消息都是存放在topic中,类似mysql建表的过程)

        基于kafka的内置测试生产者脚本来读取标准输入(键盘输入)的数据,并放入到topic中

        基于kafka的内置测试消费者脚本来消费topic中的数据

推荐在开发中使用kafka tool

        浏览kafka集群节点,多少个topic,多少个分区

        创建topic/删除topic

        浏览Zookeeper中的数据

3.Kafka的基准测试工具

Kafka中提供了内置的性能测试工具

        生产者:测试出来每秒传输的数据量(多少条数据,多少M的数据)

        消费者:测试消费者每秒拉取的数据量

对比生产者和消费者:消费者的速度更快

4.最简单的Kafka集群图

broker

一个Kafka集群通常是由多个broker组成,这样才能实现f负载均衡,以及容错机制。

broker是无状态(Stateless)的,它们是通过Zookeeper来维护集群状态。

一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不互相影响性能。

zookeeper

ZK用来管理协调broker的,并且存储了Kafka的元数据(例如:有多少个topic,partition,consumer)

ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入,或者Kafka集群中出现故障的broker。

题外话:Kafka正在逐步想办法将ZK剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉Zookeeper的依赖。“Kafka on Kafka”-----Kafka自己来管理自己的元数据。

生产者(producer)

生产者负责将数据推送给broker的topic。

消费者(consumer)

消费者负责从broker的topic中拉取数据,并自己处理。

消费者组(consumer group)

consumer group 是kafka提供的可扩展且具有容错机制的消费者机制。

一个消费者组可以包含多个消费者。

一个消费者组有一个唯一的ID(group id)

组内的消费者一起消费主题的所有分区数据。

分区(partition)

在Kafka集群中,主题被分为多个分区。

Kafka集群的分布式就是由这个分区来实现的。一个Topic中的数据(消息)可以分布在Topic中的不同partition中。

副本(Replicas)

副本用来实现Kafka集群的容错,其实就是分区partition的容错,一个topic应该至少包含大于1个的副本

副本可以确保某个服务器出现故障时,确保数据依然可以用。

在Kafka中,一般都会设计副本的数量 > 1。

主题(Topic)

一个topic可以包含多个分区(注意:这里是大数据里面的分区的概念),如下图所示:

主题是一个逻辑概念,用于生产者发布数据,消费者消费数据。

Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制。

在主题中的消息是有结构的,一般一个主题包含某一类消息。

一旦生产者发送消息到主题中,这些消息就不能被更新(更改)。

偏移量(offset)

offset记录着下一条要发送给Consumer的消息序号。

默认Kafka将offset存储在zookeeper中。

在一个分区中,消息是有顺序的方式存储则着,每个在分区的消费都是一个递增的id,这个就是偏移量offset。

偏移量在分区中才是最有意义的。在分区之间,offset是没有任何意义的。

5.Kafka生产者的幂等性

如上图所示:在生产者生产消息的时候,如果出现retry,有可能会一条消息被发送了多次,如果Kafka不具备幂等性,就有可能会在partition中保存多条一模一样的消息。

代码中配置幂等性:props.put("enable.idempotence",true); 

幂等性原理:

为了实现幂等性,Kafka引入了Producer ID(PID)和Sequence Number的概念:

a.PID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。

b.Sequence Number:针对每个生产者(对于PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number;

具体看下图:

综述

生产者消息重复问题:

        Kafka生产者生产消息到partition,如果直接发送消息,kafka会将消息保存到分区中,但是kafka会返回一个ack给生产者,表示当前操作是否成功,是否已经保存了这条消息。如果ack响应的过程失败了,此时生产者会重试,继续发送没有发送成功的消息,Kafka又会保存一条一模一样的消息。

在Kafka中可以开启幂等性:

        a.当Kafka的生产者生产消息时,会增加一个pid(生产者的唯一编号)和sequence number(针对消息的一个递增序列)

        b.发送消息,会连着pid和sequence number一并发送

        c.kafka收到消息,会将消息和pid,sequence number一并保存下来

        d.如果ack响应失败,生产者重试,再次发送消息时,kafka会根据pid,sequence number来判断是否需要在保存这条消息。

        e.判断条件:生产者发送过来的sequence number是否小于等于partition中消息对应的sequence number。

6.生产者分区写入策略

生产者写入消息到topic,Kafka将根据不同的策略将数据分配到不同分区中去。策略:

a.轮询分区策略(默认策略,key为null,就用这个策略)

b.随机分区策略(不用了)

c.按key分区分配策略(可能出现数据倾斜,key.hash()%分区数量)

d.自定义分区策略

7.消费者组的Rebalance机制

Kafka中的Rebalance称之为再均衡,是Kafka中确保消费者组中所有的消费者如何达成一致,分配订阅topic中每个分区的机制

Rebalance触发的时机:

1.消费者组中消费者的个数发生了变化,比如有新的消费者加入或者某个消费者停止了。

2.订阅的topic数量发生变化

3.订阅的topic分区数发生了变化

Rebalance的不良影响:

1.发生再分配(rebalance)时,消费者组下所有的消费者都会协调在一起共同参与,Kafka使用分配策略尽可能达到最公平的分配。

2.再分配过程会对消费者组产生非常严重的影响,再分配的过程中所有消费者都将停止工作,直到再分配的完成。

8.消费者的分区分配策略

目的是保证每个消费者尽量能够均衡的消费分区的数据,不能出现某个消费者消费分区的数量特别多,某个消费者消费分区的数量特别少

1.Range范围分配策略

range范围分配策略是Kafka默认的分配策略,它可以确保每个消费者消费的分区数量是均衡的。

注意:Range范围分配策略是针对每个Topic的。

算法公式:

m = 分区数量 / 消费者数量

n = 分区数量 % 消费者数量

前m个消费者消费n + 1 个

剩余消费者消费n个

2.RoundRobin轮询策略

RoundRobin轮询策略是将消费者组内所有消费者以及消费者所订阅的所有Topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询的方式逐个将分区以此分配给每个消费者。

3.Stricky粘性分配策略

Kafka 0.11x引入次策略。目的:

1)分区分配尽可能均匀

2)在发生rebalance的时候,分区的分配尽可能与上一次分配保持相同

没有发生rebalance时,Stricky粘性分配策略和RoundRobin分配策略类似。

9.副本机制

副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以保障数据可用。因为在其他的Broker上的副本是可用的

生产者会不断的往Kafka中写入数据,写入数据会有一个返回结果表示是否写入成功。这里对应有一个ACKs的配置。

producer的ACKs参数:

1)acks配置为0:

        不等待broker确认,直接发送下一条数据,性能最高,但可能会存在数据丢失的情况

2)acks配置为1:

        等待leader副本确认接收后,才会发送下一条数据,性能中等。

3)acks配置为-1或者all:

等待所有的副本已经将数据同步后,才会发送下一条数据,性能最慢。

根据业务情况来选择ack机制,是要求高性能,一部分数据丢失影响不大,可以选择0/1,如果要求数据一定不能丢失,就配置为-1/all。

分区中有leader和follower概念。为了确保消费者消费的数据是一致的,只能从分区leader去读写消息,follower做的事情就是同步数据。

10.Kafka-Eagle:kafka监控工具

11.分区的leader和follower

(注意:leader和follower这两个概念是针对分区来的,而不是broker)

在Kafka中,每个topic都可以配置多个分区以及多个副本。每个分区都有一个leader以及0个或者多个follower,在创建topic时,Kafka会将每个分区的leader均匀的分配在每个broker上。我们正常使用kafka是感受不到leader,follower的存在的。但其实,所有的读写操作都是由leader处理,而所有的follower都复制leader的日志数据文件,如果leader出现故障,follower就会被选举为leader。所以可以这样说:

Kafka中的leader负责处理读写操作,而follower只负责副本数据的同步。

如果leader出现故障,其他follower会被重新选举为leader。

follower像一个consumer一样,拉取leader对应分区的数据,并保存到日志数据文件中

12.AR,ISR,OSR

AR(Assigned Replicas------已分配的副本):表示一个Topic下的所有副本。

ISR(In-sync replicas------在同步中的副本):正在同步的副本(可以理解为当前有几个follower是存活的)。

OSR(Out-of-Sync Replicas):不在同步的副本。 

大白话总结:AR所有的,ISR正常的,OSR异常的

正常情况下,所有的follower副本都应该与leader副本保持同步,即AR = ISR,OSR集合为空。

13.Leader选举

kafka的吞吐量很高,延迟很低,所以要选举leader的话,必须要快。

Controller介绍

Kafka启动时,会在所有的broker中选择一个controller

前面leader和follower是针对partition分区,而controller是针对broker的

创建topic、或者添加分区、修改副本数量之类的管理任务都是由controller完成的

Kafka分区leader的选举,也是由Controller决定的

1)所有的分区的leader选举都由controller决定

2)controller会将leader的改变直接通过RPC的方式通知需为此做出响应的Broker

3)controller读取到当前分区的ISR,只要有一个Replica还存活,就选择其中一个作为leader,否则,则任意选择一个Replica作为leader。

4)如果该分区的所有Replica都已经宕机,则新的leader为-1。

Controller:controller是kafka集群的老大,是针对broker的一个角色

        controller是高可用的,是通过ZK来选举的

Leader:是针对partition分区的一个角色

        Leader是通过ISR来快速选举的

14.Kafka中生产者的数据写入流程

生产者先从ZK的 "/brokers/topics/主题名/partitions/分区名/state"节点找到该分区的leader。

生产者在ZK中找到该ID对应的broker

broker进程上的leader将消息写入到本地log中

follower从leader上拉取消息,写入本地log,并向leader发送ACK

leader接收到所有ISR中的Replica的ACK后,并向生产者返回ACK

15.Kafka的读写流程

写流程:

        通过ZK找到分区对应的leader,leader是负责读写的

        生产者开始写入数据

        ISR里面的follower开始同步数据,并返回给leader ACK确认

        最后返回给生产者ACK

读流程:

        通过ZK找到分区对应的leader,leader是负责读写的

        通过ZK找到消费者对应的offset

        从offset往后顺序拉取数据

        提交offset

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/706294.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

iMazing 3.0.0.3 for mac 中文破解版2024最新图文安装教程

我们刚刚发布了iMazing 3.0.0.3 for mac 中文版本。Windows和macOS用户现在都可以试驾并体验iPhone管理的未来。 备受期待的第一个Windows版本得益于过去几个月macOS测试版的所有改进,使其成为一个稳定的初始版本。 我们的开发团队创造了一种无缝的外观和体验&#…

Android日历提醒增删改查事件、添加天数不对问题

Android日历提醒是非常好的提醒功能&#xff0c;笔者在做的过程中&#xff0c;遇到的一些问题&#xff0c;现整理出来&#xff0c;以供参考。 一、申请日历的读写权限 <uses-permission android:name"android.permission.WRITE_CALENDAR" /> <uses-permiss…

Lua速成(5)table

一、table table 是 Lua 的一种数据结构用来帮助我们创建不同的数据类型&#xff0c;如&#xff1a;数组、字典等。 Lua table 使用关联型数组&#xff0c;你可以用任意类型的值来作数组的索引&#xff0c;但这个值不能是 nil。 Lua table 是不固定大小的&#xff0c;你可以…

springboot-基础-eclipse集成mybatis+使用方法+排错

备份笔记。所有代码都是2019年测试通过的&#xff0c;如有问题请自行搜索解决&#xff01; 目录 集成mybatis安装mybatis的jar包安装插件&#xff1a;mybatis-generator安装方法生成方法报错&#xff1a;java.lang.RuntimeException: Exception getting JDBC Driver mybatis注解…

数据之美:用山海鲸展现数据魅力

在数据分析领域&#xff0c;数据可视化是一个至关重要的环节。作为一名资深的数据分析师&#xff0c;我深知一个直观、易于理解的数据可视化看板对于传达信息、辅助决策的重要性。今天&#xff0c;我将以自己在用的山海鲸可视化软件为例&#xff0c;与大家分享如何制作一个高效…

LeetCode 热题 100 | 图论(上)

目录 1 200. 岛屿数量 2 994. 腐烂的橘子 2.1 智障遍历法 2.2 仿层序遍历法 菜鸟做题&#xff0c;语言是 C 1 200. 岛屿数量 解题思路&#xff1a; 遍历二维数组&#xff0c;寻找 “1”&#xff08;若找到则岛屿数量 1&#xff09;寻找与当前 “1” 直接或间接连接在…

项目登录方案选型

一.Cookie + Session 登录 大家都知道,HTTP 是一种无状态的协议。无状态是指协议对于事务处理没有记忆能力,服务器不知道客户端是什么状态。即我们给服务器发送 HTTP 请求之后,服务器根据请求返回数据,但不会记录任何信息。为了解决 HTTP 无状态的问题,出现了 Cookie。Co…

离线数仓(四)【数仓数据同步策略】

前言 今天来把数仓数据同步解决掉&#xff0c;前面我们已经把日志数据到 Kafka 的通道打通了。 1、实时数仓数据同步 关于实时数仓&#xff0c;我们的 Flink 直接去 Kafka 读取即可&#xff0c;我们在学习 Flink 的时候也知道 Flink 提供了 Kafka Source&#xff0c;所以这里不…

协议-http协议-基础概念02-请求应答过程-请求响应报文结构-头部字段-请求方法-响应方式

参考来源&#xff1a; 极客时间-透视HTTP协议(作者&#xff1a;罗剑锋)&#xff1b; web抓包实战课-陶辉&#xff1b; 01-HTTP协议请求-应答过程 最简单的浏览器 HTTP 请求过程 浏览器从地址栏的输入中获得服务器的 IP 地址和端口号&#xff1b;浏览器用 TCP 的三次握手与服…

CSS复合选择器(一)

CSS复合选择器&#xff08;一&#xff09; 1.交集选择器2. 并集选择器3. 后代选择器4. 子代选择器5.兄弟选择器5.1相邻兄弟选择器&#xff1a;5.2通用兄弟选择器&#xff1a; 6.属性选择器 1.交集选择器 作用&#xff1a;选中同时符合多个条件的元素。 交集有并且的含义&#…

基于springboot的4S店车辆管理系统源码和论文

随着信息技术和网络技术的飞速发展&#xff0c;人类已进入全新信息化时代&#xff0c;传统管理技术已无法高效&#xff0c;便捷地管理信息。为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的管理系统应运而生&#xff0c;各行各业相继进入信息管理时代&#xf…

python中写monogo的uri参数里,为什么有多个IP,是连接多个服务器吗

问题来源&#xff1a; 代码如下&#xff1a; from pymongo import MongoClientmongo_uri "mongodb://admin:password192.168.93.23:32725,192.132.9.35:32740,192.22.16.42:30538/?" # 创建MongoClient实例 client MongoClient(mongo_uri)为什么uri中会存在多个…

win10开机黑屏,只有鼠标,解决方案

问题描述 win10进不去桌面&#xff0c;可以进去锁屏&#xff0c;只有鼠标&#xff0c;也能进去任务管理器&#xff08;ctrlwindelete&#xff09;, 问题分析 进入任务管理器->文件->运行新任务 然后输入 explorer.exe 发现找不到了 原因&#xff1a;误删explorer.exe …

Linux系统---nginx(1)服务

目录 一.Nginx概述 1.定义 2.Nginx模块作用 &#xff08;1&#xff09;main模块 &#xff08;2&#xff09;stream服务模块 &#xff08;3&#xff09;邮件服务模块 &#xff08;4&#xff09;第三方模块 &#xff08;5&#xff09;events模块 &#xff08;6&#xff0…

300分钟吃透分布式缓存-16讲:常用的缓存组件Redis是如何运行的?

Redis 基本原理 Redis 简介 Redis 是一款基于 ANSI C 语言编写的&#xff0c;BSD 许可的&#xff0c;日志型 key-value 存储组件&#xff0c;它的所有数据结构都存在内存中&#xff0c;可以用作缓存、数据库和消息中间件。 Redis 是 Remote dictionary server 即远程字典服务…

SpringCloud有哪些组件

什么是SpringCloud&#xff1f; Spring Cloud是基于Spring Boot的分布式系统开发工具&#xff0c;它提供了一系列开箱即用的、针对分布式系统开发的特性和组件&#xff0c;用于帮助开发人员快速构建和管理云原生应用程序。 Spring Cloud的主要目标是解决分布式系统中的常见问题…

linux使用 busybox microcom AT指令测试4G/5G模块

1、busybox microcom命令使用方法如下&#xff1a; 参数&#xff1a; -d&#xff1a;表示延时时间&#xff0c;一般不设置。 -t&#xff1a;表示超时时间&#xff0c;超多长时间后该命令自动退出。单位为ms。 -s&#xff1a;表示传输速度&#xff0c;即串口波特率。 -X&#…

阿里云oss工具ossutil使用手册(windows)

一、下载安装 下载地址和教程 安装ossutil。 单击下载链接下载Windows安装包。 将工具解压&#xff0c;并双击运行ossutil.bat文件。 配置ossutil。 输入配置命令。 ossutil config 根据提示设置配置文件路径。 请输入配置文件名&#xff0c;文件名可以带路径&#xff08;…

代码随想录算法训练营第二十七天补|39. 组合总和 ● 40.组合总和II ● 131.分割回文串

组合问题&#xff1a;集合内元素的组合&#xff0c;不同集合内元素的组合 分割问题&#xff1a;本质还是组合问题&#xff0c;注意一下如何分割字符串 回溯模板伪代码 void backtracking(参数) {if (终止条件) {存放结果;return;}for (选择&#xff1a;本层集合中元素&#xf…

HTML5 增加了辅助 DOM 焦点管理的功能

焦点管理 ​ HTML5 增加了辅助 DOM 焦点管理的功能。 ​ activeElement 可以用来查询文档&#xff0c;确定哪个元素拥有焦点&#xff0c;hasFocus() 方法可以查询文档是否获得了焦点&#xff0c; 而这对于保证 Web 应用程序的无障碍使用是非常重要的。无障碍 Web 应用程序的一…