kafka个人笔记

大部分内容源于https://segmentfault.com/a/1190000038173886, 本人手敲一边加强印象方便复习

消息系统的作用

解耦
冗余
扩展性
灵活性(峰值处理
可恢复
顺序保证
缓冲
异步

  • 解耦:扩展两边处理过程,只需要让他们遵守约束即可
  • 冗余:持久化数据:规避丢失风险。采用 插入-获取-删除范式明确指出消息被处理完毕
  • 扩展性:解耦处理过程,容易扩展处理过程增大消息处理频率
  • 灵活性(峰值处理:访问激增情况不常见,无需投入过多标准资源。使用消息队列顶住访问压力
  • 可恢复:系统失效时仍可保证队列消息在系统恢复后处理
  • 顺序保证:kafka保证partition内消息有序
  • 缓冲:控制和优化 数据经过系统的速度,解决生产、消费速度不一致的问题
  • 异步:允许用户把一个或若干个消息放入队列,且不立即被处理

架构

在这里插入图片描述

  1. producer,消息生产者
  2. broker:kafka集群的服务器
  3. topic:消息的类别
  4. partition:kafka分配单位,一个topic包含一个或多个partition
  5. consumer:消息消费者,终端或服务
  6. comsumer group:
    high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
  7. replica:partition副本
  8. leader:特殊的replica,producer和consumer只和leader交互
  9. follower:除了leader的replica都为follwer,复制数据
  10. controller:服务器:用于leader选举和failover
  11. zookepper,存储集群meta信息

发布消息

producer用push发布到broker,消息被append到partition,顺序写磁盘

消息路由

//构造函数
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {if (topic == null)throw new IllegalArgumentException("Topic cannot be null");if (timestamp != null && timestamp < 0)throw new IllegalArgumentException("Invalid timestamp " + timestamp);this.topic = topic;this.partition = partition;this.key = key;this.value = value;this.timestamp = timestamp;
}private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();if (partition != null) {//指定了 partition 则直接使用List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());int lastPartition = partitions.size() - 1;if (partition < 0 || partition > lastPartition) {throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));}return partition;}//否则使用 key 计算return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {//轮询int nextValue = counter.getAndIncrement();List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {return DefaultPartitioner.toPositive(nextValue) % numPartitions;}} else {//对 keyBytes 进行 hash 选出一个 patitionreturn DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}
}
  1. 指定partition直接用
  2. 未指定partition但指定了key,对key进行hash得到partition
  3. 都未指定,使用轮询

写入流程

在这里插入图片描述

  1. producer从zk的/brokers/…/stateleader
  2. producer发消息给leader
  3. leader把消息写入log
  4. follower从leader拉取消息写入log后发送ACK给leader
  5. leader收到所有replica的ACK后,增加high watermark(位置信息,即位移(offset))给producer发送ack

投递保证

    ① At most once 消息可能会丢,但绝不会重复传递② At least one  消息绝不会丢,但可能会重复传递③ Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户想要的

默认 at least one

接收消息的行为

  1. comsumer从broker读取消息后,可以选择commit或处理消息
    1. 如果commit
      1. zookeeper存在comsumer在partition下读取消息的offset
      2. comsumer下次读取partition从下一条开始读取
    2. 未commit
      1. 下次读取位置和上次commit后开始位置相同

at most once

读完消息先commit再处理消息。
若commit后未处理消息系统崩坏,下次重新开始工作无法读到已提交但未处理的消息

At least once

读完消息先处理再commit消费状态(保存offset)
若处理消息后未commit系统崩坏,重新工作的时候会处理未commit的消息(处理两次)

Exactly once 两阶段提交

协调offset和实际操作的输出。但由于许多输出系统不支持两阶段提交,更为通用的方式是将offset和操作输入存在同一个地方

  1. consumer拿到数据后可能把数据放到HDFS
  2. 最新的offset和数据一起写到HDFS
  3. 保证offset更新和数据输出同时完成

(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)。

消息保存

topic分为多个partition,每个partition对应一个文件夹

无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据

  • 基于时间:log.retention.hours=168
  • 基于大小:log.retention.bytes=1073741824
log.cleanup.policy=delete启用删除策略
直接删除,删除后的消息不可恢复。可配置以下两个策略:清理超过指定时间清理: 
log.retention.hours=16超过指定大小后,删除旧的消息:
log.retention.bytes=1073741824

请添加图片描述

topic的创造

  1. controller在ZK的/brokers/topics 节点上注册 watcher
    ,topic被创建的时候,controller 会通过 watch 得到该 topic 的 partition/replica 分配
  2. controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
    1. 分配给partition的所有replica(称为AR)任选一个可用的broker作为leader并将AR设置为ISR
    2. 新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
  3. controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

请添加图片描述
删除 topic 的序列

  1. controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher
  2. topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配
  3. 若 delete.topic.enable=false,结束;反之controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest

kafka HA 高可用性

replica

同一个 partition 可能会有多个 replica —— erver.properties 配置中的 default.replication.factor=N

若没有replica,broker死机

  • patition 的数据都不可被消费
  • producer 也不能再将数据存于其上的 patition

引入replica,需要选取leader,leader与producer和consumer交互,其他replica与leader复制数据

分配规则

  1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
  2. 将第 i 个 partition 分配到第(i mod n)个 broker 上
  3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上

leader failover

partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader

新的 leader 必须拥有旧 leader commit 过的所有消息

zookeeper 中(/brokers/…/state)动态维护了一个 ISR(in-sync replicas)。只有 ISR 里面的成员才能选为 leader。若有f个replica,partition可以保证f-1个replica失效情况下消息不丢失

failover方案

  • 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
  • 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短
    多用第二种方式

broker failover

在这里插入图片描述

  1. controller在zookeeper的/brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
  2. controller从/brokers/ids 节点读取可用broker
  3. controller决定set_p,集合包含死机broker上所有partition
  4. 对set_p所有partition进行: 1. 读取/brokers/ids 节点读取可用broker的ISR 2. 决定新leader, 新leader ISR controller_epoch和leader_epoch信息写入state结点
  5. 通过RPC给broker发送 leaderAndISRRequest 命令

controller failover

controller 宕机时会触发 controller failover

  1. broker在zookeeper的controller节点注册watcher
  2. controller宕机时,zookeeper临时节点消失
  3. 所有存活broker收到fire通知
  4. 每个broker尝试创建新的controller path,其中一个竞选成功为controller
  5. 当选成功触发KafkaController.onControllerFailover
1. 读取并增加 Controller Epoch。
2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
5. 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
6. 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
7. 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
8. 启动 replicaStateMachine 和 partitionStateMachine。
9. 将 brokerState 状态设置为 RunningAsController。
10. 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。
11. 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

消费

kafka 提供了两套 consumer API:

The high-level Consumer API
The SimpleConsumer API

consumer API

high-level提供kafka消费数据的抽象

  1. 提供了 consumer group 的语义
  2. 消息只能被group内一个consumer消费
  3. 消费的时候不关注offset
  4. 最后一个offset由zookeeper保存

使用high-level consumer API可以是多线程应用

if(消费线程 > partition){部分线程收不到消息
}
if(消费线程 < partition){有些线程收到多个partition消息
}if(一个线程消费多个 patition){无法保证收到消息的顺序
}

** SimpleConsumer API**

适用以下情况

  • 多次读取一个消息
  • 只消费一个 patition 中的部分消息
  • 使用事务来保证一个消息仅被消费一次

partition, offset, broker, leader不透明,需要自己管理

  • 追踪offset确定下一条消费的信息
  • 找出每个partition的follower
  • 处理leader变更

流程如下

  1. 查找到一个“活着”的 broker,并且找出每个 partition 的 leader
  2. 找到partition的follower
  3. 定义好请求,该请求应该能描述应用程序需要哪些数据
  4. fetch数据
  5. 识别leader变化并做出响应

consumer group
kafka分配单位是partition,consumer属于一个group
一个partition被一个group内的一个consumer消费(但是多个group可以同时消费这个partition)

实现离线处理与实时处理

  • spark 实时处理
  • hadoop 离线处理

消费方法

consumer用pull模式从broker读数据

push 模式很难适应消费速率不同的消费者

  • 消息发送速率是由 broker 决定的
  • 尽可能以最快速度传递消息
  • 容易造成 consumer 来不及处理消息(拒绝服务、网络拥塞

pull模式,consumer根据自己的能力消费信息

pull的优点

  • 简化broker设计
  • consumer自主控制消费速率
  • consumer自主控制消费方式 —— 批量/逐条
  • 选择不同提交方式

消费者递送保证

consumer 设置为 autocommit,consumer 一旦读到数据立即自动 commit(Exactly once

实际使用过程中,并不是consumer读完消息就结束了,还需要进一步处理。
处理和commit顺序决定了 consumer delivery guarantee

  • 先commit,后处理消息(At most once
    • consumer 在 commit 后还没来得及处理消息就 crash
    • 重新开始工作后就无法读到刚刚已提交而未处理的消息
  • 先处理再commit( At least once
    • 处理完消息之后 commit 之前 consumer crash
    • 恢复工作:处理刚刚未 commit 的消息
  • 两阶段提交
    (offset 和操作输入存在同一个地方,会更简洁和通用)
    (若不支持,consumer 拿到数据后可能把数据放到 HDFS,如果把最新的 offset 和数据本身一起写到 HDFS,那就可以保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once) —— high-level API里面offset存于zookeeper中,无法存于HDFS,simple可以存于HDFS

consumer rebalance

触发机制

  • consumer加入退出
  • partition改变(broker 加入退出

算法如下

  1. 目标topic的partition排序,存于PT
  2. 选择consumer group下所有consumer排序, 存于CG
  3. N = ⌈ s i z e ( P T ) / s i z e ( C G ) ⌉ N = \lceil size(PT)/size(CG)\rceil N=size(PT)/size(CG)⌉
  4. 对group内原本的分配partition解除关系
  5. 然后每N个partition分配给一个consumer

consumer调整了单个partition后,为了保证一致性,group内其他consumer也应触发balance

导致以下问题

herd effect

  • broker,comsumer增减触发rebalance

split brain

  • 每个consumer单独通过zk判断broker和consumer宕机,不同的consumer同时从zookeeper看到的view可能不一致 —— 导致不正确的rebalance
  • 所有consumer不知道其他consumer的rebalance是否成功,导致kafka工作状态不正确
  • 因此0.9开始使用中心coordinator空值rebalance,计划在consumer客户端分配方案

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/145091.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode刷题详解——衣橱整理

1. 题目链接&#xff1a;LCR 130. 衣橱整理 2. 题目描述&#xff1a; 家居整理师将待整理衣橱划分为 m x n 的二维矩阵 grid&#xff0c;其中 grid[i][j] 代表一个需要整理的格子。整理师自 grid[0][0] 开始 逐行逐列 地整理每个格子。 整理规则为&#xff1a;在整理过程中&am…

Git常用操作-MD

文章目录 1. 本地创建分支&#xff0c;编写代码&#xff0c;提交本地分支到远程仓库2. 提交本地代码到本地仓库3. 提交本地代码到本地dev分支4. 提交本地dev分支到远程仓库5. 本地dev分支拉取远程master分支&#xff0c;并将master分支内容合并到本地dev6. 同义命令7. 撤销上次…

计算机视觉的应用16-基于pytorch框架搭建的注意力机制,在汽车品牌与型号分类识别的应用

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下计算机视觉的应用16-基于pytorch框架搭建的注意力机制&#xff0c;在汽车品牌与型号分类识别的应用&#xff0c;该项目主要引导大家使用pytorch深度学习框架&#xff0c;并熟悉注意力机制模型的搭建&#xff0c;这个…

PDF文件中更改 PDF 文本颜色的最有效解决方案

PDF 是最常用的文档类型之一&#xff0c;也是商业中使用的首选文档。在工作中&#xff0c;我们经常需要修改PDF的文本内容&#xff0c;转换格式&#xff08;如PDF转Word&#xff0c;PDF转Excel等&#xff09;&#xff0c;合并PDF&#xff0c;以达到更好的工作效果。 然而&…

【精选】JavaScript语法大合集【附代码和超详细介绍以及使用】

JavaScript语法大合集 JavaScript引入到文件 嵌入到HTML文件中 <body><script>var num10;console.log(num);</script> </body>引入本地独立JS文件 <body><script src"./hello.js"></script> </body>引入网络来源…

基于SpringBoot+Vue的新能源汽车充电桩管理系统

基于SpringBootVue的新能源汽车充电桩管理系统的设计与实现~ 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBootMyBatisVue工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 主页 充电桩详情 管理员界面 摘要 本项目是基于Spring Boot 和 …

【深度学习】吴恩达课程笔记(五)——超参数调试、batch norm、Softmax 回归

笔记为自我总结整理的学习笔记&#xff0c;若有错误欢迎指出哟~ 【吴恩达课程笔记专栏】 【深度学习】吴恩达课程笔记(一)——深度学习概论、神经网络基础 【深度学习】吴恩达课程笔记(二)——浅层神经网络、深层神经网络 【深度学习】吴恩达课程笔记(三)——参数VS超参数、深度…

前端安全策略保障

文章目录 前言后台管理系统网络安全XSSCSRFSQL注入 后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;前端系列文章 &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握&#xff0c;正在不断努力填补技术短板。(如果出…

AR贴纸特效SDK,无缝贴合的虚拟体验

增强现实&#xff08;AR&#xff09;技术已经成为了企业和个人开发者的新宠。它通过将虚拟元素与现实世界相结合&#xff0c;为用户提供了一种全新的交互体验。然而&#xff0c;如何将AR贴纸完美贴合在人脸的面部&#xff0c;同时支持多张人脸的检测和标点及特效添加&#xff0…

遵循开源软件安全路线图

毫无疑问&#xff0c;开源软件对于满足联邦任务所需的开发和创新至关重要&#xff0c;因此其安全性至关重要。 OSS&#xff08;运营支持系统&#xff09; 支持联邦政府内的每个关键基础设施部门。 联邦政府认识到这一点&#xff0c;并正在采取措施优先考虑 OSS 安全&#xff…

LeetCode算法题解|LeetCode738. 单调递增的数字、LeetCode968. 监控二叉树

一、LeetCode738. 单调递增的数字 题目链接&#xff1a;738. 单调递增的数字 题目描述&#xff1a; 当且仅当每个相邻位数上的数字 x 和 y 满足 x < y 时&#xff0c;我们称这个整数是单调递增的。 给定一个整数 n &#xff0c;返回 小于或等于 n 的最大数字&#xff0c…

新晋“学霸”夸克大模型拿下C-Eval和CMMLU双榜第一

11月16日&#xff0c;根据最新成绩&#xff0c;千亿级参数的夸克大模型登顶C-Eval和CMMLU两大权威评测榜单&#xff0c;多项性能优于GPT-4。在国内大模型赛道火热的当下&#xff0c;夸克自研大模型凭借过硬的研发能力及数据、行业、平台等优势成为新晋“学霸”。 作为国内最权…

[C++]:8.C++ STL引入+string(介绍)

C STL引入string(介绍&#xff09; 一.STL引入&#xff1a;1.什么是STL2.什么是STL的版本&#xff1a;2-1&#xff1a;原始版本&#xff1a;2-2&#xff1a;P. J 版本&#xff1a;2-3&#xff1a;RW 版本&#xff1a;2-4&#xff1a;SGL版本&#xff1a; 3.STL 的六大组件&…

为React Ant-Design Table增加字段设置 | 京东云技术团队

最近做的几个项目经常遇到这样的需求&#xff0c;要在表格上增加一个自定义表格字段设置的功能。就是用户可以自己控制那些列需要展示。 在几个项目里都实现了一遍&#xff0c;每个项目的需求又都有点儿不一样&#xff0c;迭代了很多版&#xff0c;所以抽时间把这个功能封装了…

【Electron】electron-builder打包失败问题记录

文章目录 yarn下载的包不支持require()winCodeSign-2.6.0.7z下载失败nsis-3.0.4.1.7z下载失败待补充... yarn下载的包不支持require() 报错内容&#xff1a; var stringWidth require(string-width)^ Error [ERR_REQUIRE_ESM]: require() of ES Module /stuff/node_modules/…

一文浅入Springboot+mybatis-plus+actuator+Prometheus+Grafana+Swagger2.9.2开发运维一体化

Swagger是一个规范和完整的框架,用于生成、描述、调用和可视化 RESTFUL风格的Web服务,是非常流行的API表达工具。 Swagger能够自动生成完善的 RESTFUL AP文档,,同时并根据后台代码的修改同步更新,同时提供完整的测试页面来调试API。 Prometheus 是一个开源的服务监控系统和时…

设计模式解码:软件工程架构的航标

引言 软件工程领域的设计模式&#xff0c;就像是建筑师手中的设计蓝图&#xff0c;它们是经验的总结&#xff0c;指导开发者如何在面对层出不穷的编程难题时&#xff0c;构建出既稳固又灵活的软件结构。就像一座经过精心设计的大厦能够经受住风雨的考验一样&#xff0c;一个利用…

智慧城市怎么实时监测内涝积水的发生及解决办法?

随着城市化进程步伐不断加快&#xff0c;城市内涝问题越来越受到人们的关注。内涝不仅不便于人们的生活&#xff0c;还可能危害城市之中的基础设施比如路面等。因此实时监测内涝积水的发生并采取有效的解决办法是市政府的紧急任务&#xff0c;同时解决城市内涝也利于城市生命线…

OpenCV中的像素重映射原理及实战分析

引言 映射是个数学术语&#xff0c;指两个元素的集之间元素相互“对应”的关系&#xff0c;为名词。映射&#xff0c;或者射影&#xff0c;在数学及相关的领域经常等同于函数。 基于此&#xff0c;部分映射就相当于部分函数&#xff0c;而完全映射相当于完全函数。 说的简单点…

linux高级篇基础理论二(详细文档、LAMP、SHELL、sed正则表达式)

♥️作者&#xff1a;小刘在C站 ♥️个人主页&#xff1a; 小刘主页 ♥️不能因为人生的道路坎坷,就使自己的身躯变得弯曲;不能因为生活的历程漫长,就使求索的 脚步迟缓。 ♥️学习两年总结出的运维经验&#xff0c;以及思科模拟器全套网络实验教程。专栏&#xff1a;云计算技…