Kafka核心原理

1、Topic的分片和副本机制

分片作用:

解决单台节点容量有限的问题,节点多,效率提升,吞吐量提升。通过分片,将一个大的容器分解为多个小的容器,分布在不同的节点上,从而实现分布式存储。

分片的数量没有限制,与节点数量没有关系,分片数量不会超过总节点数量的三倍。

副本作用:

提升数据的可靠性,副本越多数据越可靠,但是数据冗余越高。

副本数量有限制,最多和节点的数量相等,但是一般构建1~3个之间。

2、Kafka如何保证数据不丢失

数据传输的三个阶段:

生产者生产数据到broker

broker存储数据

消费者从broker上消费数据

<1> 生产端如何保证数据不丢失:
生产数据到broker之后的响应机制

当生产者生产数据到Broker后,Broker应该给于确认响应(ack)。

ack 确认机制,主要有三种方案,分别为0   1   -1(ALL)

0:生产者只管将数据生产到Borker ,不等待Broker返回的ack 信息

1:生产者将数据生产到Broker,需要等待Broker端Topic的对应分片上的主副本接收到消息后,即为成功发送消息。

-1: 生产者将数据生产到broker,需要等待broker端Topic的对应分片所有副本都接收到消息,即为成功发送

生产中一般根据消息重要情况以及生成和消费速率来选择相应的级别。一般来说,重要程度越高的,安全级别越高,速率越高,优先保证安全性,在此基础上,保持平衡。

相关问题思考:

1-生产者发送一条数据到Broker,Broker给于一次响应,如果Broker迟迟不予响应,怎么办?

先等待,然后重试,最后报错,先等待一段时间,当超时后,然后触发充重试策略,进行重试操作,当重试后依然没有响应,最后程序报错,停止发送。

2-生产者发送一条数据,Broker就要给予一次响应,那么这样是否会占用更多的带宽,如果占用,如何解决?

肯定会影响,可以引入缓存池,生产者在生产数据的时候,底层先将其放置到一个缓存池,当池子中消息数据达到一批数据大小后,会专门有一个子线程,触发执行,将数据生产到Broker,此时Broker只需要对这一批数据给予一次响应即可,异步发送。

3- 如果采用一批一批的发送,如果Broker又没有给予响应,但是,此时缓存池中数据已经满了,如何解决?

可以选择直接清空缓存池或者不清空,如果数据可以重复读,直接报错清空即可,后续重新读取数据即可,如果数据不可重复读,可以提前设置处理方案,将每一个消息提前先找一个容器进行备份存储,自己维护数据,当发送成功,删除一部分数据,如果出错,重启后,先从这个容器将剩余的数据发送即可,当然如果选择不清空,那么一直等待即可。

相关的一些参数设置:

buffer.memory 设置缓存池大小,默认值33554432(32M)

retries 重试的次数,默认值2147483647,最终的重试策略取决于超时设置

batch.size  表示一批数据大小,默认值16384(16KB)

delivery.timeout.ms 总超时时间,默认值120000(120S)

requesst.timeout.ms 每一次请求后的超时时间(等待时间),默认值 30000(30s )

<2>Broker 如何保证数据不丢失

Broker可以将每个分片的副本数量设置为多个,提供数据的可靠性同时还需要生产端将ACK设置为-1

<3>消费端如何保证数据不丢失

消费者连接kafka集群,kafka收到请求后,首先会根据group_id 查询上一次消费到了哪个消息偏移量,如果没有找到,默认从当前的位置开始消费数据,之前的消息默认不处理,如果找到了,就从记录的消息偏移量位置继续消费数据即可

消费者消费完数据后,会把对应的消息的偏移量信息重新提交给Broker记录

在提交偏移量的时候有两种提交方式:自动提交偏移量,手动提交偏移量

配置自动提交:

  consumer = KafkaConsumer("test",bootstrap_servers=['localhost:9092'],group_id='g_2',enable_auto_commit=True,auto_commit_interval_ms=1000)

手动提交:

  consumer = KafkaConsumer("test",bootstrap_servers=['localhost:9092'],group_id='g_2',enable_auto_commit=False,auto_commit_interval_ms=1000)consumer.commit()  #同步提交
consumer.commit_async()  #异步提交

3、Kafka中生产者的数据分发策略

分发策略:生产者生产数据到Broker的某一Topic,这个数据最终落入到那个分片的副本,即是分发策略

1、Hash策略 -- 支持
def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):

如果发送数据时指定了topic ,value , key, 即是采用hash 策略

相同key 的hash是一样的,会分发到同一个分区

2、随机分发策略---python客户端支持,Java客户端不支持

发送数据时,如果只传递了Topic和value,即是随机分发

3、轮询策略 -- 2.4版本以上修改为粘性策略,2.4版本以下支持,但是这两种方式Java客户端支持,python不支持
4、指定分区的策略 -- 支持

当发送数据时,如果指定了partition参数,即是采用指定分区策略,分区的编号从0 开始

5、自定义分区策略 -- 支持
from kafka.partitioner import DefaultPartitioner

参考Kafka的默认分发策略方法DefaultPartitioner

    def __call__(cls, key, all_partitions, available):"""Get the partition corresponding to key:param key: partitioning key:param all_partitions: list of all partitions sorted by partition ID:param available: list of available partitions in no particular order:return: one of the values from all_partitions or available"""if key is None:if available:return random.choice(available)return random.choice(all_partitions)idx = murmur2(key)idx &= 0x7fffffffidx %= len(all_partitions)return all_partitions[idx]

自定义实现:

class MyPartitioner(object):def __call__(self, key, all_partitions, available):# 实现分发策略return all_partitions[i]kafkaProducer = KafkaProducer(bootstrap_servers=[],ack=-1,partitioner=MyPartitioner()
)

4、kakfa的存储和查询机制

1、存储:

数据存储在磁盘中,位置取决于配置log.dirs参数

在对应目录下,以topic名称+分片号创建目录,目录下有2个主要文件log文件和index文件

log文件:存储消息数据,前面的数字代表消息从哪个偏移量开始存储

index文件:存索引数据,用于加速查询

默认情况下,当log文件达到1G时,会拆分数据,滚动形成一个新的log文件,index文件也会随之产生

问题:为什么不放在同一个文件?

一个文件过大,打开和关闭可能很耗费资源,从一个log中检索数据相对也很慢,影响效率。

Kafka本质上是一个消息队列的中间件,仅仅负责消息的临时存储,当消息超过一定的时间,Kafka就会执行删除数据操作,默认168小时

如果保存到一个文件,删除数据就会挺慢。当一个log永远不会达到1G时,就永远不会被删除。

2、查询

首先确定要读取的offset在那个segment 片段中

查询这个片段的index文件,根据offset确定这个消息在log文件中的什么位置

读取log文件,检索对应位置下的内容即可(底层是基于磁盘顺序查询)

获取最终的消息数据

5、kafka的消费者的负载均衡机制

假设生产者速率400条/分钟, 消费者速率400条/分钟

随着发展,生产者速率达到1200/分钟,但是消费者还是400/分钟,会造成什么问题?

会造成大量的数据在Broker积压,影响消息处理的时效性问题。增加消费者,必须保证都在同一个消费者组内

随着发展,生产者速率达到1600/分钟,但是消费者还是400/分钟,会造成什么问题?

会造成大量的数据在Broker积压,影响消息处理的时效性问题。此时再添加一个消费者,但是总有一个消费者无法消费数据,这是因为Kafka的消费者的负载均衡机制,只能增加topic的分片数量。

消费者的负载均衡机制:

1、在一个消费者组内,消费者的数量最多和所监听的topic 分片数量是相等的,如果有多余的消费者,那么会出现某些消费者处于限制的状态

2、在一个消费者组内,topic的一个分片的数据,只能被一个消费者所消费,不允许出现一个分片被组内的多个消费者消费的情况,但一个消费者可以消费多个分区的数据

所以点对点和发布订阅的实现

点对点:把监听这个topic的消费者全部放到同一个消费者组内,这样消息必然只能有一个消费者消费

发布订阅:把监听这个topic的消费者放置在不同的消费者组内,这样消息就可以被多个消费者消费了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/99834.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Scala第十九章节

Scala第十九章节 scala总目录 文档资料下载 章节目标 了解Actor的相关概述掌握Actor发送和接收消息掌握WordCount案例 1. Actor介绍 Scala中的Actor并发编程模型可以用来开发比Java线程效率更高的并发程序。我们学习Scala Actor的目的主要是为后续学习Akka做准备。 1.1 Ja…

C:数组传值调用和传地址调用

传地址调用 对数组进行修改&#xff1a;排序… #include <stdio.h>// 函数用于交换两个整数的值 void swap(int *a, int *b) {int temp *a;*a *b;*b temp; }// 函数用于对整数数组进行升序排序 void sortArray(int *arr, int size) {for (int i 0; i < size - 1…

开源ERP和CRM套件Dolibarr

什么是 Dolibarr &#xff1f; Dolibarr ERP & CRM 是一个现代软件包&#xff0c;用于管理您组织的活动&#xff08;联系人、供应商、发票、订单、库存、议程…&#xff09;。它是开源软件&#xff08;用 PHP 编写&#xff09;&#xff0c;专为中小型企业、基金会和自由职业…

【taro react】 ---- 常用自定义 React Hooks 的实现【四】之遮罩层

1. 问题场景 在实际开发中我们会遇到一个遮罩层会受到多个组件的操作影响,如果我们不采用 redux 之类的全局状态管理,而是选择组件之间的值传递,我们就会发现使用组件的变量来控制组件的显示和隐藏很不方便,更不要说像遮罩层这样一个项目多处使用的公共组件,他的隐藏和显示…

Python字典全解析:从基础到高级应用

更多资料获取 &#x1f4da; 个人网站&#xff1a;涛哥聊Python 字典是一种强大而多才多艺的数据类型&#xff0c;它以键-值对的形式储存信息&#xff0c;让我们能够以惊人的效率处理和管理数据。 字典能够将键和值关联在一起&#xff0c;使得数据的存储和检索变得非常高效。…

ansible条件示例剧本

备注&#xff1a;以下下示例主机组均为pxg ####系统符合就安装apache### --- - name: Install Apache on CentOS 7 hosts: pxg remote_user: root # 使用root身份执行命令 tasks: - name: Install Apache yum: name: httpd # Apache软件…

calc方法和vue中calc不生效踩坑

calc方法 calc()方法是css用来计算的,比如一个场景,上下固定高度,中间自适应,就可以使用这个方法。 预编译less也是可以使用这个方法的 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewp…

链表(7.27)

3.3 链表的实现 3.3.1头插 原理图&#xff1a; newnode为新创建的节点 实现&#xff1a; //头插 //让新节点指向原来的头指针&#xff08;节点&#xff09;&#xff0c;即新节点位于开头 newnode->next plist; //再让头指针&#xff08;节点&#xff09;指向新节点&#…

插入排序/折半插入排序

插入排序/折半插入排序 插入排序 插入排序(英语&#xff1a;Insertion Sort)是一种简单直观的排序算法。它的工作原理是通过构建有序序列&#xff0c;对于未排序数据&#xff0c;在已排序序列中从后向前扫描&#xff0c;找到相应位置并插入。插入排序在实现上&#xff0c;通常…

盲盒商城源码 盲盒开箱源码 潮物盲盒商城源码 仿CSGO盲盒开箱源码

百度seo的要求对于网页内容的伪原创是相对严格的,需要进行一定的修改使其符合百度seo的标准 根据您的要求,我们将对后面的这段话进行伪原创修改,只修改文字符号,不对字数进行调整: 盲盒商城源码、盲盒开箱源码、潮物盲盒商城源码、仿CSGO盲盒开箱源码 带有Vue源代码,前端…

AF_UNIX和127.0.0.1(AF_INET)回环地址写数据速度对比

在linux下&#xff0c;存在着这样的情况&#xff0c;本地的进程间通信&#xff0c;并且其中一个是服务端&#xff0c;另外的都是客户端。 服务端通过绑定端口&#xff0c;客户端往127.0.0.1的对应端口发送&#xff0c;即可办到&#xff0c;不过这样会浪费一个端口&#xff0c;同…

如何实现MySQL的增删改查

MySQL是一种关系型数据库管理系统&#xff0c;提供了增删改查&#xff08;CRUD&#xff09;操作来管理数据库中的数据。下面是一些关于MySQL的增删改查的简要说明&#xff1a; 增加&#xff08;Create&#xff09;: 使用INSERT语句向数据库表中插入新的数据行。语法如下&#x…

git常用命令和开发常用场景

git命令 git init 创建一个空的git仓库或者重新初始化已有仓库 git clone [url] 将存储库克隆到新目录 git add 添加内容到索引 git status 显示工作树状态 git commit -m "" 记录仓库的修改 git reset 重置当前HEAD到指定的状态 git reset –-soft&#xff1a;…

2020架构真题(四十六)

、以下关于操作系统微内核架构特征的说法&#xff0c;不正确的是&#xff08;&#xff09;。 微内核的系统结构清晰&#xff0c;利于协作开发微内核代码量少&#xff0c;系统具有良好的可移植性微内核有良好的的伸缩性和扩展性微内核功能代码可以互相调用&#xff0c;性能很高…

3款国产办公软件,不仅好用,还支持linux国产操作系统

当提到国产办公软件并支持Linux国产操作系统时&#xff0c;以下是三款备受好评的软件&#xff1a; 1. WPS Office&#xff08;金山办公套件&#xff09; WPS Office是中国知名的办公软件套件&#xff0c;也是一款跨平台的应用程序。它包含文字处理、表格编辑和演示文稿等常见办…

06-进程间通信

学习目标 熟练使用pipe进行父子进程间通信熟练使用pipe进行兄弟进程间通信熟练使用fifo进行无血缘关系的进程间通信使用mmap进行有血缘关系的进程间通信使用mmap进行无血缘关系的进程间通信 2 进程间通信相关概念 2.1 什么是进程间通信 Linux环境下&#xff0c;进程地址空间…

STM32F030在使用内部参考电压 (VREFINT)时与STM32G070的区别

背景&#xff1a; 之前使用过STM32G070的内部参考电压来提升ADC采集的准确度&#xff08;STM32使用内部参考电压提高ADC采集准确度&#xff09;&#xff0c;所以本次使用STM32F030的芯片时直接把之前G070的代码拿过来用了&#xff0c;但是出现了问题。 查找资料发现两者不同&am…

STM32CubeMX学习笔记-USART_DMA

STM32CubeMX学习笔记-USART_DMA 一、DMA的概念二、数据传输方式普通模式循环模式 三、以串口方式讲解串口DMA方式发送函数&#xff1a;HAL_UART_Transmit_DMA串口DMA方式接收函数&#xff1a;HAL_UART_Receive_DMA获取未传输数据个数函数&#xff1a;__HAL_DMA_GET_COUNTER关闭…

解析navicate数据库密码

在线运行地址:代码在线运行 - 在线工具 <?php class NavicatPassword {protected $version 0;protected $aesKey libcckeylibcckey;protected $aesIv libcciv libcciv ;protected $blowString 3DC5CA39;protected $blowKey null;protected $blowIv null;public func…

spring boot整合Minio

MinIO 安装MinIo # 先创建minio 文件存放的位置 mkdir -p /opt/docker/minio/data# 启动并指定端口 docker run \-p 9000:9000 \-p 5001:5001 \--name minio \-v /opt/docker/minio/data:/data \-e "MINIO_ROOT_USERminioadmin" \-e "MINIO_ROOT_PASSWORDmini…