Kafka初步学习

kafka消息队列模式
点对点模式:一对一,消费者主动拉取数据,消息收到后消息清除
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息
queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

发布订阅模式
消息生产者将消息发布到topic中,可以有多个topic
同时有多个消费者订阅消费该消息,消费数据后不删除
每个消费者相互独立,都可以消费到数据

专业名词:
事件:当你向kafka读取或写入数据时,你以事件的形式执行此操作,从概念上讲,事件具有键,值,时间戳和可选的元数据表头。
生产者
消费者:在kafka中,生产者和消费者完全解耦并且彼此不可知,生产者永远不需要等待消费者
主题:事件被组织并持久的存储在主题中,kafka中的主题始终是多生产者和多订阅者:一个主题可以是N个向其写入事件的生产者,以及订阅这些事件的
N个消费者。主题中的事件可以根据需要随时读取,与传统的消息传递系统不同,事件在消费后不会被删除。相反,您可以通过每个主题的配置设置来定义
kafka应该将您的事件保留多长时间,之后旧事件将被丢弃,kafka的性能在数据大小方面实际是恒定的,因此长时间存储数据是非常好的。

主题是分区的,这意味着一个主题分布在位于不同的kafka集群上的多个broker中,数据的这种分布式放置对于可伸缩性非常重要,因为它允许客户端应用程序同时
从多个broker读取和写入数据,当一个新事件发布到一个主题时,它实际上时附加到主题的分区之一。具有相同事件键的事件被写入同一个分区。并且kafka保证给定
主题分区的任何消费者将始终以与写入事件完全相同的顺序读取该分区的事件

消费者组:由多个consumer组成,消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费(组中一个消费者消费,组里面别的消费者不行),消费者组之间互不影响
partition:为了实现扩展性,一个非常大的topic可以分布到多个broker上,一个topic可以分布到多个broker中,一个topic可以氛围多个partition,每个partition是一个有序队列
replica:副本,为了保证集群中某个节点发生故障,该节点的partition数据不会丢失,一个topic的每个分区都有若干个副本,一个leader和若干个follower
leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的leader。

拦截器:主要用于实现clients端的定制化控制逻辑,它可以使的用户在消息发送前以及producer回调逻辑前有机会对消息做出一些定制化需求,比如修改消息等,同时允许
指定多个interceptor按序作用于同一调消息从而形成一个拦截器链。
拦截器需要实现ProducerInterceptor接口:具体方法解释
configure:做一些初始化工作
onsend:运行在用户的main线程中,producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好不要修改所属的topic和分区,否则会
影响目标分区计算。
onAcknowledgement:该方法会在消息被应答之前或消息发送失败时调用,并且通常都是回调逻辑触发之前,该方法运行在producer的I/O线程中,因此不要在该方法中放入很重的
逻辑,否则会拖慢producer的消息发送效率
close:主要执行一些资源清理工作
Interceptor可能运行在多个线程中,因此在具体实现时用户需
要自行确保线程安全。另外,若指定了多个Interceptor,则
producer将按照指定顺序调用它们,同时把每个Interceptor中捕获的异常记录写入到错误日志中而不是向上传递。这在使用过程中要特别留意。

分区优势:1.便以合理使用存储资源,每个partition在一个broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台broker上
合理控制分区的任务,可以实现负载均衡的效果
2.提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费数据
3.分区后,更方便于做副本备份,提高数据安全性
分区原则一:指明partition的情况下,直接将指定的值作为分区值,例如partition=1,对应数据就如分区1
分区原则二:没有具体的partition值而有key的情况下,消息要被发送到的目标分区号partition=Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
分区原则三:既没有partition值也没有key的情况下,Kafka采用stickyPartitionCache.partition(topic, cluster) 黏性分区器,会随机选择一个分区,
并尽可能一直使用该分区,待该分区的batch已满或linger.ms设置的时间到了,再随机一个分区进行使用(通常和上一次的分区不同)。

自定义分区器:
一:
1.实现接口Partitioner
2.覆盖接口中的方法,主要分区逻辑在方法partition中完成
二:
在用于构造kafkaproducer的peoperties对象中设置partitioner.class参数

消息无丢失
acks:0生产者发送过来的数据,不需要等leader数据持久化就应答完成。数据可靠性最低,丢失数据概率较高,数据就丢失情景:leader未持久化down掉,使用最少
acks:1生产者发送过来的数据,leader收到持久化后应答完成。数据可靠性中等,丢失数据概率中等。丢失数据情景:leader持久化后,所有的follower还没同步数据,
leader挂了。用于传输普通日志。
acks:-1或all 生产者发送过来的数据,leader和isp队列里面的所有节点都持久化数据后才应答。数据可靠性最高,不会丢失数据。传输重要数据,比如和钱相关的数据
思考:在acks=-1或all的情况下,Leader接收到数据并持久化后,所有Follower开始同步Leader刚刚持久化的数据,但是有一个Follower因故障迟迟不能进行数据同步,该问题应该怎么解决?
解决方案:
Leader维护了一个动态的 in-sync replica set(ISR) ,意为和Leader保
持同步的Follower+Leader集合(leader:0,isr:0,1,2)。如果Follower长时间未向Leader发送通信请求或同步数据,则该
Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30000ms。例如1超时,(leader:0, isr:0,2)。这样就不用等长期联系不上或者已经故障的节点。
数据完全可靠性条件:
1ACK级别设置为-1
2分区副本>=2
3ISR应答的最小副本数>=2 (最小副本数有min.insync.replicas设置,默认为1)
生成者中配置响应级别:
// 设置 acks
prop.put(ProducerConfig.ACKS_CONFIG, “all”);
//重试次数 retries,默认是 int 最大值,2147483647
prop.put(ProducerConfig.RETRIES_CONFIG, 3);

消息去重
至少一次(At Least Once):ACK级别设置为-1 + 分区副本
大于等于2 + ISR里应答的最小副本数量大于等于2最多一次(At Most Once):ACK级别设置为0
精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
1.At Least Once可以保证数据不丢失,但是不能保证数据不重复;
2.At Most Once可以保证数据不重复,但是不能保证数据不丢失。
生产者_ 数据去重_幂等性
幂等性就是指producer不论向broker发送多少次重复数据,broker都只会持久化一条,保证了不重复
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 +分区副本数>=2 + ISR最小副本数量>=2) 。
重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence
Number是单调自增的。所以幂等性只能保证的是在单分区单会话内不重复。
如何使用幂等性?
开启参数 enable.idempotence 默认为 true,false 关闭。
生产者_ 数据去重_事务

消费者
消费者有两种消费方式:
push推:模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的,它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,具体表现就是
决绝服务以及网络拥塞
consumer采用pull模式从broker中读取数据,pull模式则可以根据consumer的消费能力以适当的速率消费消息。pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一致返回
空数据,针对这一点,kafka消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可消费,consumer会等待一段时间之后在返回,这段时长就是timeout
offser刨析
每个消费者的offset由消费者提交到kafka系统主题_consumer_offsets.
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic名称+分区号,value 就是当前 offset 的值。
每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic名称+分区号就保留最新数据。
手动提交offset的方法有两种方式:
commitSync 同步提交:必须等待offset提交完毕,再去消费下一批数据。
commitAsync 异步提交:发送完提交offset请求后,就开始消费下一批数据了。
两者的区别:
相同点是,都会将本次消费的一批数据的最高的偏移量提交;
不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/23776.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【DevOps】网站安全事件分析方法详解和实战分析

目录 一、网站安全事件分析流程 二、常用的网站安全事件分析方法 三、网站被植入挖矿脚本事件分析 1、 事件发现与确认 2、 事件隔离与取证 3、 事件分析与溯源 4、 事件处置与恢复 5、 事件总结与改进 四、网站安全事件分析的关键点 五、提高网站安全事件分析能力的…

python使用appium截图手机屏幕图片

要使用 Appium 截取手机屏幕图片并在电脑上显示&#xff0c;你需要执行几个步骤。以下是一个基本的流程&#xff1a; 设置 Appium 环境&#xff1a;确保你已经安装了 Appium 服务器和 Python 客户端库&#xff08;如 appium-python-client&#xff09;。编写 Python 脚本&…

详解大厂实时数仓建设V4.0

一、实时数仓建设背景 1. 实时需求日趋迫切 目前各大公司的产品需求和内部决策对于数据实时性的要求越来越迫切&#xff0c;需要实时数仓的能力来赋能。传统离线数仓的数据时效性是 T1&#xff0c;调度频率以天为单位&#xff0c;无法支撑实时场景的数据需求。即使能将调度频…

参数传递和剪枝,从修剪二叉树谈起

669. 修剪二叉搜索树 - 力扣&#xff08;LeetCode&#xff09; 一、参数传递 Java中的参数传递方式只有一种&#xff0c;那就是值传递。如果我们传的是基本数据类型&#xff0c;那么函数接收到的就是该数据的副本&#xff0c;如果我们传的是对象&#xff0c;那么函数接收到的就…

Uniapp与第三方应用数据通讯

首先说明一点&#xff0c;这个只是uniapp代码编写的应用之间相互传递数据&#xff0c;uniapp编写的与其他语言编写的我尚不知道能不能传递。 应用1&#xff1a; plus.runtime.launchApplication({pname: "应用的appid",// extra 中可以自定数据&#xff0c;url和da…

【Qt知识】部分QWidget属性表格

QWidget是Qt库中所有图形用户界面组件的基类&#xff0c;它提供了大量属性以供自定义和配置控件的行为和外观。下面列出了一些主要的QWidget属性及其作用。 属性 作用 accessibleName 控件的辅助技术名称&#xff0c;用于无障碍访问。 accessibleDescription 控件的辅助技…

详解MySQL的间隙锁

MySQL 的间隙锁&#xff08;Gap Lock&#xff09; 间隙锁&#xff08;Gap Lock&#xff09;是 InnoDB 存储引擎中的一种锁机制&#xff0c;用于防止幻读现象。幻读是指在一个事务中&#xff0c;两次读取同一个范围的数据时&#xff0c;第二次读取出现了第一次没有出现的数据行…

CSS真题合集(一)

CSS真题合集&#xff08;一&#xff09; 1. 盒子模型1.1 盒子模型的基本组成1.2 盒子模型的实际大小1.3 盒子模型的两种类型1.4 设置盒子模型1.5 弹性盒子模型 2. BFC2.1 主要用途2.2 触发BFC的方法2.2 解决外边距的塌陷问题&#xff08;垂直塌陷&#xff09; 3. 响应式布局3.1…

接口自动化框架封装思想建立(全)

httprunner框架&#xff08;上&#xff09; 一、什么是Httprunner&#xff1f; 1.httprunner是一个面向http协议的通用测试框架&#xff0c;以前比较流行的是2.X版本。 2.他的思想是只需要维护yaml/json文件就可以实现接口自动化测试&#xff0c;性能测试&#xff0c;线上监…

spring aop小记

一、aop概念 面向切面编程 参考&#xff1a;https://blog.csdn.net/lhj520cb/article/details/125820513 常用术语解释&#xff08;根据代码理解的&#xff09;&#xff1a; Aspect(切面)&#xff1a;Advice 通知(即增强)和 Pointcut 切点的结合。&#xff08;数学上可以理…

#define 和 const 定义常量的区别

文章目录 一、数组定义1.1 全局作用域中定义数组1.2 局部作用域定义数组 二、细节补充2.1 #define 和 const 的区别2.2 全局数组和局部数组的区别2.3 编译时常量 vs 运行时常量2.4 为什么局部作用域不要求编译时常量 总结 一、数组定义 1.1 全局作用域中定义数组 在全局作用域…

【Android面试八股文】Java异常机制中,异常Exception与错误Error区别是什么?

Java异常机制中,异常Exception与错误Error区别是什么? 这道题想考察什么? 在开发时需要时候需要自定义异常时,应该选择定义Excption还是Error? 编写的代码触发Excption或者Error分别代表什么? 考察的知识点 Java异常机制 考生应该如何回答 在Java中存在一个 Throwa…

Git - 详解 创建一个新仓库 / 推送现有文件夹 / 推送现有的 Git 仓库 到私有Gitlab

文章目录 【推送现有文件夹】详细步骤指令说明Git 全局设置设置Git全局用户名设置Git全局电子邮件地址 推送现有文件夹1. 进入现有文件夹2. 初始化Git仓库并设置初始分支为main3. 添加远程仓库4. 添加所有文件到暂存区5. 提交更改6. 推送代码到远程仓库并设置上游分支 创建一个…

ESXi内安装OpenWrt

目录 0、前言 1、环境 2、转换格式 3、创建虚拟机 4、OpenWrt设置 5、单臂流量测试 6、总结 0、前言 前几天在ESXi中先安装了PVE,然后在PVE中安装OpenWrt,没有来得及深入测试,仅仅作为安装和熟悉PVE的过程。后来转念一想为什么不在ES…

js之this

最近写代码学习了this&#xff0c;这个非常好用啊&#xff0c;今天拿出来给大家分享一下&#xff0c;根据我的理解以及前辈们的讲解 this 关键字代表当前执行上下文中的对象。它通常指向调用函数的对象&#xff0c;但其值可能取决于函数的调用方式。 全局上下文中的 this&…

项目经验,用什么思路去叙述?

文章目录 前言一、项目经验二、关于自学 前言 如果你的项目掌握的不高&#xff0c;面试官对项目的拷打&#xff0c;hold不住怎么办&#xff1f;首先一定要把项目说通透&#xff0c;一定要会说&#xff0c;就算可能没真实做过一个项目&#xff0c;会说出来讲通透就行。写在简历…

CS1061 “HtmlHelper”未包含“Partial”的定义,并且找不到可接受第一个“HtmlHelper”类型参数的可访问扩展方法“Partial”

严重性 代码 说明 项目 文件 行 禁止显示状态 错误 CS1061 “HtmlHelper”未包含“Partial”的定义&#xff0c;并且找不到可接受第一个“HtmlHelper”类型参数的可访问扩展方法“Partial”(是否缺少 using 指令或程序集引用?) 14_Views_Message_E…

找嵌入式软件工作,freertos要掌握到什么程度?

对于嵌入式软件工程师来说&#xff0c;掌握RTOS&#xff08;实时操作系统&#xff09;的程度并不是决定性因素&#xff0c;而更重要的是工程思维和解决问题的能力。我这里有一套嵌入式入门教程&#xff0c;不仅包含了详细的视频讲解&#xff0c;项目实战。如果你渴望学习嵌入式…

GitHub个人访问令牌登录

生成个人访问令牌 登录 GitHub。访问 Personal Access Tokens 页面。点击 “Generate new token”。为令牌选择一个描述性名称。选择 repo 权限。点击 “Generate token” 生成令牌。复制生成的令牌。 推送命令 设置存储库 URL&#xff08;如果需要&#xff09;&#xff1a;…

Angular封装高德地图组件实现输入框搜索,地图点击选地点

Angular封装高德地图组件实现输入框搜索,地图点击选地点(Angular17版本) 话不多说直接上代码 创建一个独立组件 html代码: <div style"position: relative;"><input #searchInput nz-input placeholder"请输入地址"/><div #mapContaine…