5分钟带你了解Kafka的技术架构 | 技术头条


戳蓝字“CSDN云计算”关注我们哦!

640?wx_fmt=jpeg

技术头条:干货、简洁、多维全面。更多云计算精华知识尽在眼前,get要点、solve难题,统统不在话下!

大家都知道 Kafka 是一个非常牛逼的消息队列框架,阿里的 RocketMQ 也是在 Kafka 的基础上进行改进的。对于初学者来说,一开始面对这么一个庞然大物会不知道怎么入手。那么这篇文章就带你先了解一下 Kafka 的技术架构,让你从全局的视野认识 Kafka。了解了 Kafka 的整体架构和消息流程之后,脑海里就会有一个大致的结构,这时候再去学习每个部分就容易得多了。

我们先来看一下 Kafka 的整体架构图:

640?wx_fmt=png

Kafka 的架构图可以分为四个部分:

  • Producer Cluster:生产者集群。一般由许多个实际的业务项目组成,其不断地往 Kafka 集群中写入数据。

  • Kafka Cluster:Kafka 服务器集群。这里就是 Kafka 作为重要的一部分,这里负责接收生产者写入的数据,并将其持久化到文件里,最终将消息提供给 Consumer Cluster。

  • Zookeeper Cluster:Zookeeper 集群。Zookeeper 负责维护整个 Kafka 集群的 Topic 信息、Kafka Controller 等信息。

  • Consumer Cluster:消费者集群。与 Producer Cluster 一样,其一般是由许多个实际的业务项目组成,不断地从 Kafka Cluster 中读取数据。

了解了 Kafka 的整体架构,那一个消息是怎么从生产者到 Kafka Server,又是如何从 Kafka Server 到消费者的呢?一般来说,一个消息的流转可以分为下面几个阶段:

  • 服务器启动阶段

  • 生产者发送消息阶段

  • Kafka存储消息阶段

  • 消费者拉取消息阶段

服务器启动阶段

首先,我们会启动 Zookeeper 服务器,作为集群管理服务器。接着,启动 Kafka Server。Kafka Server 会向 Zookeeper 服务器注册信息,接着启动线程池监听客户端的连接请求。最后,启动生产者和消费者,连接到 Zookeeper 服务器,从 Zookeeper 服务器获取到对应的 Kafka Server 信息[1]。

生产者发送消息阶段

当需要将消息存入消息队列中时,生产者根据配置的分片算法,选择分到哪一个 partition 中。在发送一条消息时,可以指定这条消息的 key,Producer 根据这个 key 和 Partition 机制来判断应该将这条消息发送到哪个 Parition。

Paritition 机制可以通过指定 Producer 的 paritition.class 这一参数来指定,该 class 必须实现 kafka.producer.Partitioner 接口。如果不实现 Partition 接口,那么会使用默认的分区算法,即根据根据 key 哈希后取余[2]。

随后生产者与该 Partition Leader 建立联系,之后将消息发送至该 partition leader。之后生产者会根据设置的 request.required.acks 参数不同,选择等待或或直接发送下一条消息。

  • request.required.acks = 0 表示 Producer 不等待来自 Leader 的 ACK 确认,直接发送下一条消息。在这种情况下,如果 Leader 分片所在服务器发生宕机,那么这些已经发送的数据会丢失。

  • request.required.acks = 1 表示 Producer 等待来自 Leader 的 ACK 确认,当收到确认后才发送下一条消息。在这种情况下,消息一定会被写入到 Leader 服务器,但并不保证 Follow 节点已经同步完成。所以如果在消息已经被写入 Leader 分片,但是还未同步到 Follower 节点,此时Leader 分片所在服务器宕机了,那么这条消息也就丢失了,无法被消费到。

  • request.required.acks = -1 表示 Producer 等待来自 Leader 和所有 Follower 的 ACK 确认之后,才发送下一条消息。在这种情况下,除非 Leader 节点和所有 Follower 节点都宕机了,否则不会发生消息的丢失。

Kafka存储消息阶段

当 Kafka 接收到消息后,其并不直接将消息写入磁盘,而是先写入内存中。之后根据生产者设置参数的不同,选择是否回复 ack 给生产者。之后有一个线程会定期将内存中的数据刷入磁盘,这里有两个参数控制着这个过程:

  1. # 数据达到多少条就将消息刷到磁盘

  2. #log.flush.interval.messages=10000

  3. # 多久将累积的消息刷到磁盘,任何一个达到指定值就触发写入

  4. #log.flush.interval.ms=1000

如果我们设置 log.flush.interval.messages=1,那么每次来一条消息,就会刷一次磁盘。通过这种方式,就可以达到消息绝对不丢失的目的,这种情况我们称之为同步刷盘。反之,我们称之为异步刷盘。

于此同时,Kafka 服务器也会进行副本的复制,该 Partition 的 Follower 会从 Leader 节点拉取数据进行保存。然后将数据存储到 Partition 的 Follower 节点中。

消费者拉取消息阶段

在消费者启动时,其会连接到 zk 注册节点,之后根据所连接 topic 的 partition 个数和消费者个数,进行 partition 分配。一个 partition 最多只能被一个线程消费,但一个线程可以消费多个 partition。其分配算法如下:

  1. 1. 将目标 topic 下的所有 partirtion 排序,存于PT

  2. 2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci

  3. 3. N=size(PT)/size(CG),向上取整

  4. 4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始)

  5. 5. 将第i*N到(i+1)*N-1个 partition 分配给 Ci

我们用例子简单描述下这个算法的内容:假设我们连接的 topic 有 8 个 partition,此时有 3 个消费线程。那么 partition 的分配过程大致是这样的:

  • 8/3=2.667,向上取整就是3,也就是说每个consumer分配3个分区。

  • 那么给第一个消费者分配p0/p1/p2三个分区。

  • 给第二个消费者分配p3/p4/p5三个分区。

  • 给第三个消费者分配p6/p7两个分区。

接着消费者连接对应分区的 Kafka Server,并从该分区服务器拉取数据。

总结

这篇文章简单介绍了 Kafka 框架的技术架构以及消息流转过程,并介绍了其中的某些细节。通过这篇文章,相信大家对 Kafka 框架应该有个大致的了解。

参考资料

  • [1].kafka broker启动流程和server结构

  • [2].kafka发送消息分区选择策略详解

 

640?wx_fmt=png


福利

扫描添加小编微信,备注“姓名+公司职位”,加入【云计算学习交流群】,和志同道合的朋友们共同打卡学习!


640?wx_fmt=jpeg


推荐阅读:

  • Elastic Jeff Yoshimura:开源正在开启新一轮的创新 | 人物志

  • 深入浅出Docker 镜像 | 技术头条

  • 19岁当老板, 20岁ICO失败, 21岁将项目挂到了eBay, 为何初创公司如此艰难?

  • 码二代的出路是什么?

  • 机器学习萌新必备的三种优化算法 | 选型指南

  • 小程序的侵权“生死局”

  • @996 程序员,ICU 你真的去不起!


640?wx_fmt=png真香,朕在看了!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/523920.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mysql和Oracle 数据库操作工具类

适配Mysql和Oracle数据源 文章目录1. 适配Mysql和Oracle数据源2. 适配于Mysql数据源3. 适配Oeacle数据源1. 适配Mysql和Oracle数据源 package cn.stylefeng.guns.generator.core.util; import cn.stylefeng.guns.generator.modular.entity.DatabaseInfo; import lombok.extern…

QString类型转换为bool类型

方法 直接上代码 QString str "12.345";QVariant onLineTemp str;bool isValue onLineTemp.toBool();

Spark精华问答 | Spark的计算方法是什么?

戳蓝字“CSDN云计算”关注我们哦!Spark是一个针对超大数据集合的低延迟的集群分布式计算系统,比MapReducer快40倍左右,是hadoop的升级版本,Hadoop作为第一代产品使用HDFS,第二代加入了Cache来保存中间计算结果&#xf…

qt中创建控件布局以及删除原有布局和控件

引言 当根据数据来创建控件并布局时,如果数据更新,那么之前创建的控件便需要删除后重新创建布局。该文主要说明将原来的布局和控件删除,重新创建并布局。 示例 先看一下ui文件: 下面是实现代码: void StatusViewDi…

Spring精华问答 | Spring 能帮我们做什么?

Spring框架是一个开源的Java平台,它提供了非常容易,非常迅速地开发健壮的Java应用程序的全面的基础设施支持。今天就让我们一起来看看关于Spring的精华问答吧。1Q:什么是Spring框架?A:Spring框架是一个为Java应用程序的开发提供了综合、广泛的…

qt自定义控件的样式

引言 自定义控件创建后,有时需要设置样式,下面记录一下设置样式时需要注意的点。 注意 1.设置自定义控件的样式时,需要下面的代码: void paintEvent(QPaintEvent *event) {Q_UNUSED(event);QStyleOption opt;opt.init(this);Q…

linux 上传文件 rz命令 提示command not found 解决方法

-bash: rz: command not found rz命令没找到? 执行sz,同样也没找到。 安装lrzsz: yum -y install lrzsz现在就可以正常使用rz、sz命令上传、下载数据了。 使用方法: 上传文件 rz filename下载文件 sz filename

IoT与大数据 如何激发数字营销最大潜能?

戳蓝字“CSDN云计算”关注我们哦!技术头条:干货、简洁、多维全面。更多云计算精华知识尽在眼前,get要点、solve难题,统统不在话下!译者:风车云马 物联网与大数据概述物联网(IOT)简单理解,除了电…

qt中生成含有中文的json文件,读取含有中文的json文件

引言 之前将变量保存并在本地生成json文件,由于其中含有中文,导致生成的json文件出现乱码,或者就是生成的json文件没有乱码,但是读取生成的json文件时出现乱码,不能正常解析json. 示例 运行效果: 下面是…

年初新立Flag,新华三解决方案部做了点儿啥?

戳蓝字“CSDN云计算”关注我们哦!极客头条:速递、最新、绝对有料。这里有企业新动、这里有业界要闻,打起十二分精神,紧跟fashion你可以的!人人都提及的数字化时代,企业不想方设法提升效率怎么行&#xff1f…

qt中判断文件是否存在

实现 判断一个文件是否存在 bool isExistSpecificFile(QString strPath) {if (QFile::exists(strPath)) {return true;}return false; }注意: 这里的路径strPath是指文件所在的绝对路径,即完整的文件路径。

Docker - 实战TLS加密通讯

使用说明 演示环境(centos7,docker17.06.0-ce) 创建一个文件夹 mkdir /sslcd /ssl创建ca密钥 openssl genrsa -aes256 -out ca-key.pem 4096创建ca证书 openssl req -new -x509 -days 1000 -key ca-key.pem -sha256 -subj "/CN*" -out ca.pem创建服务器私钥 open…

qt中拖动窗口widget

提要 继承与QDialog的窗口,窗口原本按住标题栏可以拖动窗口,但是设置了窗口的隐藏标题栏属性后,窗口不再能够拖动。或者继承于QWidget的窗体,不具有窗口拖动功能。 本文实现继承于窗口widget或者继承于QDialog隐藏窗口标题栏的窗…

idea gblfy常用快捷键

gblfy日常快捷键: 关键词说明idea中对应的操作CTRL左方向光标向左跳跃一个单词Editor Actions -> Move Caret to Previous WordCTRL右方向光标向右跳跃一个单词Editor Actions -> Move Caret to Next Wordmianmian方法输入main后按着(自动提示)alt/sout快速打…

qt实现窗口拖动的两种思路

提要 窗口按下鼠标不放拖动窗口移动,鼠标释放的时候,停止拖动。这个过程可以用两种方法来实现。 1.鼠标点击后,获取鼠标按下点的坐标和起初窗口左上角的坐标,用鼠标按下点的坐标减去鼠标左上角的坐标,求出这个固定值。…

Linux怎么取消ftp的匿名访问功能

编辑vsftpd.conf: vim /etc/vsftpd/vsftpd.conf修改anonymous_enableYES 为 NO 保存退出 重起服务生效: /etc/init.d/vsftpd restart

看华为生态大学 如何玩转人才生态?

戳蓝字“CSDN云计算”关注我们哦!极客头条:速递、最新、绝对有料。这里有企业新动、这里有业界要闻,打起十二分精神,紧跟fashion你可以的!从孔子兴私学开始,千百年来,中国人在私塾中开始或完成自…

银河麒麟通过命令行安装软件没有安装上

提要 安装软件时出现: nigulasinigulasi-virtual-machine:~$ dpkg -L fcitx-frontend-qt5 | grep .so dpkg-query: 软件包 fcitx-frontend-qt5 没有被安装 使用 dpkg --info ( dpkg-deb --info) 来检测打包好的文件, 还可以通过 dpkg --contents ( dpk…

Hadoop精华问答 | 关于Hadoop核心技术的精华问答

戳蓝字“CSDN云计算”关注我们哦!随着科技时代的发展,大数据与云计算已势不可挡的架势席卷未来,不可否认,大数据时代已经来临,并将深刻地改变着我们的工作和生活。学习大数据技术,是时代的召唤,…

如何将本地代码推送至远程仓库

文章目录一、现在远程仓库创建仓库二、本地操作流程1. 用idea打开项目2. 选择需要打开项目3. 选择这个窗口打开或者用一个新的窗口打开都可以4. 导入成功的项目结构三、将本地仓库的代码推送远程仓库1. 初始化本地git仓库2. 将项目代码提交到暂存区3. 将暂存区的代码提交到本地…