Kafka知识总结(事务+数据存储+请求模型+常见场景)

文章收录在网站:http://hardyfish.top/

文章收录在网站:http://hardyfish.top/

文章收录在网站:http://hardyfish.top/

文章收录在网站:http://hardyfish.top/

在这里插入图片描述

事务

事务Producer保证消息写入分区的原子性,即这批消息要么全部写入成功,要么全失败。此外,Producer重启回来后,kafka依然保证它们发送消息的精确一次处理。

开启enable.idempotence = true

设置Producer端参数transctional.id

数据的发送需要放在beginTransaction和commitTransaction之间。

Consumer端的代码也需要加上isolation.level参数,用以处理事务提交的数据。

producer.initTransactions();
try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();
} catch (KafkaException e) {producer.abortTransaction();
}

事务Producer虽然在多分区的数据处理上保证了幂等,但是处理性能上相应的是会有一些下降的。

数据存储

Kafka 消息以 Partition 作为存储单元,每个 Topic 的消息被一个或者多个 Partition 进行管理。

  • Partition 是一个有序的,不变的消息队列,消息总是被追加到尾部。
  • 一个 Partition 不能被切分成多个散落在多个 Broker 上或者多个磁盘上。

Partition 又划分成多个 Segment 来组织数据。

Segment 在它的下面还有两个组成部分:

  • 索引文件:以 .index 后缀结尾,存储当前数据文件的索引。
  • 数据文件:以 .log 后缀结尾,存储当前索引文件名对应的数据文件。

在这里插入图片描述

请求模型

在这里插入图片描述

请求到Broker后,也会通过类似于请求转发的组件Acceptor转发到对应的工作线程上,Kafka中被称为网络线程池,一般默认每个Broker上为3个工作线程,可以通过参数 num.network.threads 进行配置。

并且采用轮询的策略,可以很均匀的将请求分发到不同的网络线程中进行处理。

但是实际的处理请求并不是由网络线程池进行处理的,而是会交给后续的IO线程池,当网络线程接受到请求的时候,会将请求写入到共享的请求队列中,而IO线程池会进行异步的处理,默认情况下是8个,可以通过 num.io.threads 进行配置。

常见场景

重复消费

consumer 在消费过程中,应用进程被强制kill掉或发生异常退出。

例如在一次poll500条消息后,消费到200条时,进程被强制kill消费到offset未提交,或出现异常退出导致消费到offset未提交。

下次重启时,依然会重新拉取500消息,造成之前消费到200条消息重复消费了两次。

消费者消费时间过长。

max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起 离开组 的请求,Coordinator 也会开启新一轮 Rebalance。

因为上次消费的offset未提交,再次拉取的消息是之前消费过的消息,造成重复消费。

提高消费能力,提高单条消息的处理速度;根据实际场景max.poll.interval.ms值设置大一点,避免不必要的rebalance;

可适当减小max.poll.records的值,默认值是500,可根据实际消息速率适当调小。

消息丢失

消费者程序丢失数据

Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移

假如某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。

最佳配置:

不要使用 producer.send(msg),而要使用 producer.send(msg, callback)

设置 acks = all:

  • 设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是 已提交。

设置 retries 为一个较大的值。

  • 当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

设置 unclean.leader.election.enable = false

设置 replication.factor >= 3

  • 防止消息丢失的主要机制就是冗余。

设置 min.insync.replicas > 1

  • 控制的是消息至少要被写入到多少个副本才算是 已提交 。
  • 设置成大于 1 可以提升消息持久性。
  • 在实际环境中千万不要使用默认值 1。

确保 replication.factor > min.insync.replicas

  • 如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。

确保消息消费完成再提交。

  • Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。

消息顺序

乱序场景一

因为一个topic可以有多个partition,kafka只能保证partition内部有序。

1、可以设置topic 有且只有一个partition。

2、根据业务需要,需要顺序的指定为同一个partition。

乱序场景二

对于同一业务进入了同一个消费者组之后,用了多线程来处理消息,会导致消息的乱序。

消费者内部根据线程数量创建等量的内存队列,对于需要顺序的一系列业务数据,根据key或者业务数据,放到同一个内存队列中,然后线程从对应的内存队列中取出并操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/51405.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

flask和python,navicat实现数据库和python的结合

app.py import pymysql from flask import Flask, render_template, requestapp Flask(__name__)app.route(/add/user, methods["GET", "POST"]) def add_user():if request.method "GET":return render_template(add_user.html) # 修正模板…

C语言 ——— 指针和字符数组的相关笔试题和解析(上篇)

目录 字符数组和指针的笔试题(sizeof篇) 字符数组和指针的笔试题(strlen篇) 字符数组和指针的笔试题(sizeof篇) 1. char arr[] { a, b, c, d, e, f }; printf("%d\n", sizeof(arr)); 首先要…

国内本地化OCSP服务的SSL证书:提升安全与效率的新选择

在数字化时代,网络安全成为企业运营和用户体验的重要基石。HTTPS(Hypertext Transfer Protocol Secure)作为一种安全的网络协议,通过SSL(Secure Sockets Layer)加密技术,保障了数据传输的机密性…

Windows 端口占用 Port 端口占用 如何发现端口占用并且强杀?

应用场景 场景 有时候本地测试,经常发现端口占用。 如何找到端口占用,并且 kill 掉呢? 端口占用情况 lsof -i:XXX查看 sudo netstat -apn | grep XXX查看 ps -aux | grep XXX详细信息 ps -ef | grep XXX根据分类条件查询信息 发现并…

MATLAB基础应用精讲-【数模应用】Poisson 回归分析(附R语言代码实现)

目录 前言 知识储备 基于泊松回归、负二项回归模型 数据分布介绍 模型介绍 模型的选择 案例介绍 算法原理 泊松回归 数学模型 适用条件 参数估计与假设检验 SPSSAU Poisson 回归案例 1、背景 2、理论 3、操作 4、SPSSAU输出结果 5、文字分析 6、剖析 疑难解…

OpenSSL SSL_connect: Connection was reset in connection to github.com:443

OpenSSL SSL_connect: Connection was reset in connection to github.com:443 目录 OpenSSL SSL_connect: Connection was reset in connection to github.com:443 【常见模块错误】 【解决方案】 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&…

kuberneter管理GUI工具Lens

从github上可以知道,lens的前端是用electron做的客户端工具,打开安装路径你会发现kubectl.exe,没错,就是你经常用的kubectl命令行的客户端工具。kubectl本来就能输出json的数据类型,集成前端更方便了。看到这里你是不是发现&#…

前端Long类型精度丢失:后端处理策略

文章目录 精度丢失的具体原因解决方法1. 使用 JsonSerialize 和 ToStringSerializer2. 使用 JsonFormat 注解3. 全局配置解决方案 结论 开发商城管理系统的品牌管理界面时,发现一个问题,接口返回品牌Id和页面展示的品牌Id不一致,如接口返回的…

Transformer--输入部分

🏷️上文我们简单介绍了Transformer模型的总体架构,本章我们主要介绍其输入部分 📖前言 📖文本嵌入层的作用 📖位置编码器的作用 📖前言 输入部分主要包括源文本嵌入层以及位置编码器,目标文本…

HX1838红外接收模块-红外遥控(外部中断+状态机)

目录 红外遥控 模块介绍 HX1838红外接收二极管 红外发射遥控器 遥控器键码 模块接线 NEC协议编码 状态机分析 驱动代码 IR.h IR.c main.c 红外遥控 红外遥控是利用红外光进行通信的设备,由红外LED将调制后的信号发出,由专用的红外接收头进行…

Unity + Hybridclr + Addressable + 微信小程序 热更新报错

报错时机: Generate All 怎么All 死活就是报错 生成微信小程序,并启动后 报错内容: MissingMethodException:AoT generic method notinstantiated in aot.assembly:Unity.ResourceManager:dll, 原因: Hybridclr 开发文档 解…

使用 Baklib 构建多语言知识库

技术的快速发展使企业能够更轻松地走向全球。在国外市场寻找新机会的关键方面之一是了解并与新客户群互动。此外,如果您在特定人群中拥有任何现有客户,那么让您的企业网站和支持也以他们可以参与的语言提供是合乎逻辑的。 假设您的公司以英语为主导市场…

SSL/TLS和SSL VPN

1、SSL/TLS SSL安全套接字层:是一种加密协议,用于在网络通信中建立安全连接。它在应用层和传输层(TCP/IP)之间提供数据加密、服务器身份验证以及信息完整性验证 SSL只保护TCP流量,不保护UDP协议 TLS:传输层…

GORM:优雅的Go语言ORM库

文章目录 引言GORM原理基础使用安装GORM定义模型连接数据库CRUD操作 高级使用关联事务回调 优点结论 引言 在Go语言开发中,数据库操作是不可或缺的一部分。虽然直接使用SQL语句可以灵活地与数据库交互,但随着项目规模的扩大,SQL语句的编写、…

成为git砖家(4): git status 命令简介

1. untracked 和 tracked 状态 Remember that each file in your working directory can be in one of two states: tracked or untracked. Tracked files are files that were in the last snapshot, as well as any newly staged files; they can be unmodified, modified, o…

华为od-开发-终端云面试总结

华为OD - 终端云 资面 主要问一些在校经历,做过那些项目,大学期间觉得做过的最有价值的事情,大学期间令你感到最有成就感的事情,期望薪资。 技术一面(1h20min)7.18 1、项目里使用到RPC去取代HTTP&#x…

Nginx周末部署

背景 Nginx是本人学习的一类中间件,上次完成了vue的搭建,所以顺便把项目加入Nginx吧 1. 镜像拉取与测试 查询dockerHub,选择最新最稳定的版本 docker pull nginx:stable-perl 执行下载 docker run -d --name mynginx -p 8080:80 -v D:\IM…

基于bert的自动对对联系统

目录 概述 演示效果 核心逻辑 使用方式 1.裁剪数据集 根据自己的需要选择 2.用couplet数据集训练模型 模型存储在model文件夹中 3.将模型转换为ONNX格式 4.打开index.html就可以在前端使用此自动对对联系统了。 本文所涉及所有资源均在传知代码平台可获取。 概述 这个生成器利用…

【Python检查两个列表是不是有重复项有关案例】

以下是一些具体的例子,展示了如何使用不同的方法来检查两个列表是否有重复项: 例子1:使用集合 list1 [1, 2, 3, 4, 5] list2 [4, 5, 6, 7, 8]# 转换为集合并求交集 duplicates list(set(list1) & set(list2))if duplicates:print(&q…

面完英伟达算法岗,心态崩了。。。

最近这一两周看到不少互联网公司都已经开始秋招提前批了。不同以往的是,当前职场环境已不再是那个双向奔赴时代了。求职者在变多,HC 在变少,岗位要求还更高了。 最近,我们又陆续整理了很多大厂的面试题,帮助一些球友解…