Kafka基本知识整理

首先Kafka是一个分布式消息队列中间件,Apache顶级项目,https://kafka.apache.org/   高性能、持久化、多副本备份、横向扩展。

生产者Producer往队列里发送消息,消费者Consumer从队列里消费消息,然后进行业务逻辑。应用场景主要有:解耦、削峰(缓冲)、异步处理、排队、分布式事务控制等等。

  1. Kafka对外使用Topic(主题)的概念,生产者往Topic里写消息,消费者从Topic中消费读消息。

  2. 为了实现水平扩展,一个Topic实际是由多个Partition(分区)组成的,遇到瓶颈时,可以通过增加Partition的数量来进行横向扩容。

  3. 单个Parition内是保证消息有序。持久化时,每收到一条消息,Kafka就是在对应的日志文件Append写,所以性能非常高。

Kafka Data Flow 消息流转图

640?wx_fmt=png

上图中,消息生产者Producers往Brokers里面的指定Topic中写消息,消息消费者Consumers从Brokers里面消费指定Topic的消息,然后进行业务处理。

在实际的部署架构中,Broker、Topic、Partition这些元数据保存在ZooKeeper中,Kafka的监控、消息路由(分区)由ZooKeeper控制。0.8版本的OffSet也由ZooKeeper控制。

一、消息生产/发送过程

640?wx_fmt=png

Kafka创建Message、发送时要指定对应的Topic和Value(消息体),Key(分区键)和Partition(分区)是可选参数。 

调用Producer的Send()方法后,消息先进行序列化(消息序列化器可自定义实现:例如:Protobuf),然后按照Topic和Partition,临时放到内存中指定的发送队列中。达到阈值后,然后批量发送。

发送时,当Partition没设置时,如果设置了Key-分区键(例如:单据类型),按照Key进行Hash取模,保证相同的Key发送到指定的分区Partition。如果未设置分区键Key,使用Round-Robin轮询随机选分区Partition。

二、分区Partition的高可用和选举机制

分区有副本的概念,保证消息不丢失。当存在多副本的情况下,会尽量把多个副本,分配到不同的broker上。

Kafka会为Partition选出一个Leader Broker(通过ZooKeeper),之后所有该Partition的请求,实际操作的都是Leader,然后再同步到其他的Follower。

当一个Kafka Broker宕机后,所有Leader在该Broker上的Partition都会重新选举,在剩余的Follower中选出一个Leader,继续提供服务。

正如上面所讲:Kafka使用ZooKeeper在多个Broker中选出一个Controller,用于Partition分配和Leader选举。以下是Partition的分配机制:

  • 将所有Broker(假设共n个Broker)和待分配的Partition排序

  • 将第i个Partition分配到第(i mod n)个Broker上 (这个就是leader)

  • 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上

Controller会在ZooKeeper的/brokers/ids节点上注册Watch,一旦有broker宕机,它就能知道。

当Broker宕机后,Controller就会给受到影响的Partition选出新Leader。

Controller从ZooKeeper的/brokers/topics/[topic]/partitions/[partition]/state中,读取对应Partition的ISR(in-sync replica已同步的副本)列表,选一个出来做Leader。

选出Leader后,更新ZooKeeper的存储,然后发送LeaderAndISRRequest给受影响的Broker进行通知。

如果ISR列表是空,那么会根据配置,随便选一个replica做Leader,或者干脆这个partition就是宕机了。

如果ISR列表的有机器,但是也宕机了,那么还可以等ISR的机器活过来。

多副本同步:

服务端这边的处理是Follower从Leader批量拉取数据来同步。但是具体的可靠性,是由生产者来决定的。

生产者生产消息的时候,通过request.required.acks参数来设置数据的可靠性。

640?wx_fmt=png

 在acks=-1的时候,如果ISR少于min.insync.replicas指定的数目,那么就会返回不可用。

 这里ISR列表中的机器是会变化的,根据配置replica.lag.time.max.ms,多久没同步,就会从ISR列表中剔除。以前还有根据落后多少条消息就踢出ISR,在1.0版本后就去掉了,因为这个值很难取,在高峰的时候很容易出现节点不断的进出ISR列表。  

 从ISA中选出leader后,follower会从把自己日志中上一个高水位后面的记录去掉,然后去和leader拿新的数据。因为新的leader选出来后,follower上面的数据,可能比新leader多,所以要截取。这里高水位的意思,对于partition和leader,就是所有ISR中都有的最新一条记录。消费者最多只能读到高水位;

 从leader的角度来说高水位的更新会延迟一轮,例如写入了一条新消息,ISR中的broker都fetch到了,但是ISR中的broker只有在下一轮的fetch中才能告诉leader。

 也正是由于这个高水位延迟一轮,在一些情况下,kafka会出现丢数据和主备数据不一致的情况,0.11开始,使用leader epoch来代替高水位。

三、消息消费过程

订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。

换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。因此,如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。

 消息Offset偏移量(消息的顺序号)管理

 一个消费组消费partition,需要保存offset记录消费到哪,以前保存在zk中,由于zk的写性能不好,以前的解决方法都是consumer每隔一分钟上报一次。

 ZooKeeper的性能严重影响了消费的速度,而且很容易出现重复消费。

 在0.10版本后,Kafka把这个offset的保存,从ZooKeeper总剥离,保存在一个名叫__consumeroffsets topic的Topic中。

 消息的key由groupid、topic、partition组成,value是偏移量offset。topic配置的清理策略是compact。总是保留最新的key,其余删掉。

 一般情况下,每个key的offset都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历partition建立缓存,然后查询返回。

 Partitin的Rebalance

 生产过程中broker要分配partition,消费过程这里,也要分配partition给消费者。类似broker中选了一个controller出来,消费也要从broker中选一个coordinator,用于分配partition。

 coordinator的选举过程

  1. 看offset保存在那个partition

  2. 该partition leader所在的broker就是被选定的coordinator

 交互流程 

  1. consumer启动、或者coordinator宕机了,consumer会任意请求一个broker,发送ConsumerMetadataRequest请求,broker会按照上面说的方法,选出这个consumer对应coordinator的地址。

  2. consumer 发送heartbeat请求给coordinator,返回IllegalGeneration的话,就说明consumer的信息是旧的了,需要重新加入进来,进行reblance。返回成功,那么consumer就从上次分配的partition中继续执行。

  Rebalance

  1. consumer给coordinator发送JoinGroupRequest请求。

  2. 这时其他consumer发heartbeat请求过来时,coordinator会告诉他们,要reblance了。

  3. 其他consumer发送JoinGroupRequest请求。

  4. 所有记录在册的consumer都发了JoinGroupRequest请求之后,coordinator就会在这里consumer中随便选一个leader。然后回JoinGroupRespone,这会告诉consumer你是follower还是leader,对于leader,还会把follower的信息带给它,让它根据这些信息去分配partition

  5. consumer向coordinator发送SyncGroupRequest,其中leader的SyncGroupRequest会包含分配的情况。

  6. coordinator回包,把分配的情况告诉consumer,包括leader。

   当partition或者消费者的数量发生变化时,都得进行reblance。

   列举一下会reblance的情况:

  1. 增加Partition

  2. 增加消费者

  3. 消费者主动关闭

  4. 消费者宕机

  5. coordinator宕机

四、消息投递语义

kafka支持3种消息投递语义,
At most once:最多一次,消息可能会丢失,但不会重复
At least once:最少一次,消息不会丢失,可能会重复
Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现,仅限于下游也是kafka)

At least once:(业务中使用比较多)

先获取数据,再进行业务处理,业务处理成功后commit offset。

  1. 生产者生产消息异常,消息是否成功写入不确定,重做,可能写入重复的消息

  2. 消费者处理消息,业务处理成功后,更新offset失败,消费者重启的话,会重复消费

At most once:

先获取数据,再commit offset,最后进行业务处理。

  1. 生产者生产消息异常,不管,生产下一个消息,消息就丢了

  2. 消费者处理消息,先更新offset,再做业务处理,做业务处理失败,消费者重启,消息就丢了。

Exactly once:

首先要保证消息不丢,再去保证不重复。所以盯着At least once的原因来搞。 

  1. 生产者重做导致重复写入消息----生产保证幂等性

  2. 消费者重复消费---消灭重复消费,或者业务接口保证幂等性重复消费也没问题

业务处理的幂等性非常重要。Kafka控制不了,需要业务来实现。比如所判断消息是否已经处理。

解决重复消费有两个方法:

  1. 下游系统保证幂等性,重复消费也不会导致多条记录。

  2. 把commit offset和业务处理绑定成一个事务。

生产的幂等性:

为每个producer分配一个pid,作为该producer的唯一标识。producer会为每一个<topic,partition>维护一个单调递增的seq。类似的,broker也会为每个<pid,topic,partition>记录下最新的seq。当req_seq == broker_seq+1时,broker才会接受该消息。因为:

  1. 消息的seq比broker的seq大超过时,说明中间有数据还没写入,即乱序了。

  2. 消息的seq不比broker的seq小,那么说明该消息已被保存。

640?wx_fmt=png

  事务性和原子性

   场景是这样的:

  1. 先从多个源topic中获取数据。

  2. 做业务处理,写到下游的多个目的topic。

  3. 更新多个源topic的offset。

   其中第2、3点作为一个事务,要么全成功,要么全失败。这里得益与offset实际上是用特殊的topic去保存,这两点都归一为写多个topic的事务性处理。

   640?wx_fmt=png

   引入tid(transaction id),和pid不同,这个id是应用程序提供的,用于标识事务,和producer是谁并没关系。就是任何producer都可以使用这个tid去做事务,这样进行到一半就死掉的事务,可以由另一个producer去恢复。
   同时为了记录事务的状态,类似对offset的处理,引入transaction coordinator用于记录transaction log。在集群中会有多个transaction coordinator,每个tid对应唯一一个transaction coordinator。
   注:transaction log删除策略是compact,已完成的事务会标记成null,compact后不保留。
   启动事务时,先标记开启事务,写入数据,全部成功就在transaction log中记录为prepare commit状态,否则写入prepare abort的状态。之后再去给每个相关的partition写入一条marker(commit或者abort)消息,标记这个事务的message可以被读取或已经废弃。成功后     在transaction log记录下commit/abort状态,至此事务结束。

   整体的数据流是这样的:

   640?wx_fmt=png

  1. 首先使用tid请求任意一个broker(代码中写的是负载最小的broker),找到对应的transaction coordinator。

  2. 请求transaction coordinator获取到对应的pid,和pid对应的epoch,这个epoch用于防止僵死进程复活导致消息错乱,当消息的epoch比当前维护的epoch小时,拒绝掉。tid和pid有一一对应的关系,这样对于同一个tid会返回相同的pid。

  3. client先请求transaction coordinator记录<topic,partition>的事务状态,初始状态是BEGIN,如果是该事务中第一个到达的<topic,partition>,同时会对事务进行计时;client输出数据到相关的partition中;client再请求transaction coordinator记录offset的<topic,partition>事务状态;client发送offset commit到对应offset partition。

  4. client发送commit请求,transaction coordinator记录prepare commit/abort,然后发送marker给相关的partition。全部成功后,记录commit/abort的状态,最后这个记录不需要等待其他replica的ack,因为prepare不丢就能保证最终的正确性了。

     这里prepare的状态主要是用于事务恢复,例如给相关的partition发送控制消息,没发完就宕机了,备机起来后,producer发送请求获取pid时,会把未完成的事务接着完成。

     当partition中写入commit的marker后,相关的消息就可被读取。所以kafka事务在prepare commit到commit这个时间段内,消息是逐渐可见的,而不是同一时刻可见。

    消息消费事务

    消费时,partition中会存在一些消息处于未commit状态,即业务方应该看不到的消息,需要过滤这些消息不让业务看到,kafka选择在消费者进程中进行过来,而不是在broker中过滤,主要考虑的还是性能。kafka高性能的一个关键点是zero copy,如果需要在broker中过 滤,那么势必需要读取消息内容到内存,就会失去zero copy的特性。


  五、 Kafka文件组织

  kafka的数据,实际上是以文件的形式存储在文件系统的。topic下有partition,partition下有segment,

segment是实际的一个个文件

,topic和partition都是抽象概念。

  在目录/${topicName}-{$partitionid}/下,存储着实际的log文件(即segment),还有对应的索引文件。

  每个segment文件大小相等,文件名以这个segment中最小的offset命名,文件扩展名是.log;segment对应的索引的文件名字一样,扩展名是.index。有两个index文件,一个是offset index用于按offset去查message,一个是time index用于按照时间去查,其实这里可以优化合到一起,下面只说offset index。总体的组织是这样的:

640?wx_fmt=png

  为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个message都记录下具体位置,而是每隔一定的字节数,再建立一条索引。 索引包含两部分,分别是baseOffset,还有position。

  baseOffset:意思是这条索引对应segment文件中的第几条message。这样做方便使用数值压缩算法来节省空间。例如kafka使用的是varint。

  position:在segment中的绝对位置。

  查找offset对应的记录时,会先用二分法,找出对应的offset在哪个segment中,然后使用索引,在定位出offset在segment中的大概位置,再遍历查找message。

六、Kafka常用配置项

  Broker配置

640?wx_fmt=png

  Topic配置

640?wx_fmt=png

  参考链接:123archu 链接:https://www.jianshu.com/p/d3e963ff8b70 

原文地址:https://www.cnblogs.com/tianqing/p/10808717.html

.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com 
640?wx_fmt=jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/316022.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

牛客练习赛89——牛牛小数点(未解决)

牛牛小数点 题意&#xff1a; 题解&#xff1a; 本题先说结论&#xff1a; 对于一个数x2a∗5b∗px2^a*5^b*px2a∗5b∗p 如果p1,也就是质因子只有2和5&#xff0c;则x是不循环小数&#xff0c;即f(x)0如果p!1,则x是循环的&#xff0c;且循环开始于小数点后第1max{p2,p5p_{2},…

针对.NET Core, Xamarin以及.NET的自动类型安全Rest库: Refit

本文大部分内容是针对Refit官网的翻译。官网地址&#xff1a; https://github.com/reactiveui/refitRefit是一个类似于Retrofit的Restful Api库&#xff0c;使用它&#xff0c;你可以将你的Restful Api定义在接口中。例如&#xff1a;public interface IGitHubApi { [Get(&quo…

用ProGet搭建本地私有NuGet仓库

搭建ProGet下载官网下载Windows版本的Inedo Hub &#xff08;https://inedo.com/proget/download&#xff09;下载下来的软件名&#xff1a; ProGetInstaller.exe安装点击ProGetInstaller.exe&#xff0c;出现如下安装界面Registration 选项选择 Free ;SQL Sever 选项选择 Spec…

CQRS架构下Equinox开源项目分析

一.DDD分层架构介绍本篇分析CQRS架构下的Equinox开源项目。该项目在github上star占有2.4k。便决定分析Equinox项目来学习下CQRS架构。再讲CQRS架构时&#xff0c;先简述下DDD风格&#xff0c;在DDD分层架构中&#xff0c;一般包含表现层、应用程序层(应用服务层)、领域层(领域服…

仿B站(一) 目的分析以及创建 WebAPI + Angular7 项目

前言&#xff1a;本系列文章主要为对所学 Angular 框架的一次微小的实践&#xff0c;对 b站页面作简单的模仿。本系列文章主要参考资料&#xff1a;微软文档&#xff1a;    https://docs.microsoft.com/zh-cn/aspnet/core/getting-started/?viewaspnetcore-2.1&tabsw…

Mac中搭建Kubernetes

Kubernetes是Google和RadHat公司共同主导的开源容器编排项目&#xff0c;功能非常强大&#xff0c;也非常的火热和流行&#xff0c;但同时里面也有很多的概念和名词需要我们去学习和理解。学习任何一个技术先需要把基础环境搭建起来&#xff0c;本篇就介绍怎样在Mac中启动单节点…

树莓派也跑Docker和.NET Core

树莓派就是一个卡片大小的迷你电脑。有了电脑&#xff0c;我们当然得先安装系统。系统下载https://www.raspberrypi.org/downloads/raspbian/ &#xff0c;我选择的Raspbian Stretch Lite&#xff0c;不带界面的最小安装。下载win32diskimager&#xff08;烧录系统&#xff09;…

开源]OSharpNS 步步为营系列 - 1. 业务模块设计

OSharpNS全称OSharp Framework with .NetStandard2.0&#xff0c;是一个基于.NetStandard2.0开发的一个.NetCore快速开发框架。这个框架使用最新稳定版的.NetCore SDK&#xff08;当前是.NET Core 2.2&#xff09;&#xff0c;对 AspNetCore 的配置、依赖注入、日志、缓存、实体…

CF1479A Searching Local Minimum

CF1479A Searching Local Minimum 题意&#xff1a; 题解&#xff1a; 先说结论&#xff1a; 若l&#xff0c;r满足&#xff1a; al−1>al,ar<ar1a_{l-1}>a_{l},a_{r}<a_{r1}al−1​>al​,ar​<ar1​al,al1,....,ara_{l},a_{l1},....,a_{r}al​,al1​,....…

C#8.0的两个有趣的新特性以及gRPC

最近每天忙着跑很多地方&#xff0c;回家就不想动了&#xff0c;没什么心情写东西。今天有空&#xff0c;稍微写一点。下文中&#xff1a;关于C#语法特性的部分需要Visual Studio 2019支持。关于.NET Core的部分需要安装.NET 3.0 Preview4&#xff0c;低版本或许也可以但我没实…

CF1479C Continuous City

CF1479C Continuous City 题意&#xff1a; 给定 L, R. 构造一个有向带权图, 其中点数不大于 32, 且所有边都是从较小的点指向较大的点. 假设这个有向图有 n 个点, 你需要保证从 1到n 的所有路径的权值都在 [L, R]内且不存在 x∈[L,R], 使得不存在或存在多于一条从 1 到 n 的…

Office转PDF,Aspose太贵,怎么办?

在程序开发中经常需要将Office文件转换成PDF&#xff0c;著名的Aspose的三大组件可以很容易完成这个功能&#xff0c;但是Aspose的每个组件都单独收费&#xff0c;而且每个都卖的不便宜。在老大的提示下&#xff0c;换了一种思路来解决这个问题。环境dotNetCore:2.1CentOS:7.5D…

收起.NET程序的dll来

作为上床后需要下床检查好几次门关了没有的资深强迫症患者&#xff0c;有一个及其搞我的问题&#xff0c;就是dll问题。曾几何时&#xff0c;在没有nuget的年代&#xff0c;当有依赖项需要引用的时候&#xff0c;只能通过文件引用来管理引用问题&#xff0c;版本问题&#xff0…

从壹开始 [ Ids4实战 ] 之三║ 详解授权持久化 用户数据迁移

哈喽大家周三好&#xff0c;今天终于又重新开启 IdentityServer4 的落地教程了&#xff0c;不多说&#xff0c;既然开始了&#xff0c;就要努力做好?。书接上文&#xff0c;在很久之前的上篇文章《二║ 基础知识集合 & 项目搭建一》中&#xff0c;我们简单的说了说 Identi…

微软XAML Studio - WPF, UWP, Xamarin等技术开发者的福音

最近在继续倒腾WPF的项目&#xff0c;继续使用Caliburn.Micro和Xceed来堆代码。每次调试xaml上的binding&#xff0c;都有种要疯的赶脚。今天路过 https://channel9.msdn.com/ 浏览 WPF相关的学习视频时&#xff0c;遇到微软推荐的相关视频 - XAML sutdio简介https://channel9.…

AddMvc 和 AddMvcCore 的区别

目录本文出自《从零开始学 ASP.NET CORE MVC》目录 视频课程效果更佳&#xff1a;从零开始学 Asp.Net Core MVC ASP.NET Core 为什么有 AddMvc 和 AddMvcCore 他们是什么关系&#xff1f;在本视频中&#xff0c;我们将讨论 AddMvc()和 AddMvcCore()方法之间的区别。要在 ASP.NE…

浅谈容量规划

俗话说&#xff0c;”人无远虑&#xff0c;必有近忧”&#xff0c;容量规划就是”远虑”。所谓容量规划&#xff0c;是一个产品满足用户目标需求而决定生产能力的过程。当产品发展到一个较为稳定成熟的阶段&#xff0c;产品的整体处理能力的把控自然是不可或缺&#xff0c;尽管…

CF1063C Dwarves, Hats and Extrasensory Abilities

CF1063C Dwarves, Hats and Extrasensory Abilities 题意&#xff1a; 首先题目会给出 n &#xff0c;表示要输入多少点。 然后你输出n 个点的坐标&#xff0c;每输出一个点会告诉你这个点的颜色是黑色或者白色。 最后你需要输出两个点的坐标代表一条直线&#xff0c;这条直线…

Blazor——Asp.net core的新前端框架

Blazor是微软在Asp.net core 3.0中推出的一个前端MVVM模型&#xff0c;它可以利用Razor页面引擎和C#作为脚本语言来构建WEB页面&#xff0c;如下代码简单演示了它的基本功能&#xff1a;和Angular JS和VUE的模型非常类似&#xff0c;Blazor 支持大多数应用所需的核心方案&#…

CF1149B Three Religions

CF1149B Three Religions 题意&#xff1a; 给定长度为 n 的母串和三个子串s1,s2,s3s_1,s_2,s_3s1​,s2​,s3​ 。初始时子串均为空。有 q 次询问。你需要支持两种操作&#xff1a;向某个子串末尾添加一个字母&#xff0c;或者删去某个子串末尾的字母。在每次操作后&#xff…