kafka如何保证消息不丢失 不重复消费 消息的顺序

如何保证消息的不丢失

消息为什么会丢失

在这里插入图片描述
想要保证消息不丢失就要首先知道消息为什么会丢失,在哪个环节会丢失,然后在丢失的环节做处理

1.生产者生产消息发送到broker,broker收到消息后会给生产者发送一个ack指令.生产者接收到broker发送成功的指令,这个时候我们就可以认为消息发送成功了.没有接收到ack指令我们就认为消息发送失败.

 public <T,Throwable> void sendEventByKafka(String topic, String content ,T t, KafkaSendErrorCallback<T, java.lang.Throwable> function) {kafkaTemplate.send(topic, content).addCallback(success -> {log.info("执行kafka消息发送kafka成功!");log.info(content);}, failure -> {log.error("执行kafka消息发送kafka失败!");//失败的消息保存到消息表function.saveMqDb(t,failure);});}

上述的逻辑有个前提条件就是,确定broker确实是接受并保存了消息.需要设置ack的级别
acks=0(不等待确认):
在这种模式下,生产者发送消息后不会等待来自Broker的任何确认。它会立即继续发送下一条消息。
这是最低延迟的选项,但也是最不可靠的,因为生产者无法知道消息是否已经成功到达Broker。
acks=1(Leader确认):
在这种模式下,生产者发送消息后会等待Broker的领导者(Leader)确认。领导者会确认消息已经被接收,但不一定已经被完全复制到所有的副本。
这种模式提供了一定程度的可靠性,因为生产者知道消息至少已经被领导者接收,但仍然可能丢失消息,因为它们可能还没有被复制到其他副本。
acks=all(全部确认):
这是最可靠的确认模式,在这种模式下,生产者发送消息后会等待所有的ISR(In-Sync Replicas,同步副本)确认。ISR是分区的所有副本中与领导者保持同步的副本集合。
在这种模式下,消息只有在被领导者和所有同步副本都确认接收后才被视为已提交。这确保了消息的可靠性。

如何保证不重复消费

2.不重复消费,在处理业务时,用唯一建来处理,如果没有唯一建,可以借助消息表来做,处理完了之后给这条消息打个已处理的标记.

.消费者接受消息处理业务给broker发送ack,broker认为消息消费成功,删除这条消息

    @KafkaListener(id = KafkaConstants.MESSAGE_GROUP, topics = KafkaConstants.MESSAGE_TOPIC, concurrency = "5")public void listen(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {try{//处理业务数据}catch (Exception e){//消费失败后的处理,保存到消息表}finally {//ack确认acknowledgment.acknowledge();}}

如何保证消息的顺序

为什么顺序会乱.kafka在生产者生产消息的时候使我们代码控制的,可以保证顺序,比如付款成功后我先发送一个修改订单状态的消息,再发送一个扣减库存的消息,再发送一个物流通知的消息 一个topic 一个partion 代码写入的顺序就是消息的顺序.如果只有一个消费者监听一个partion也是可以保证顺序的.但是多个消费者监听同一个partion消费者2执行完成 消费者1.3还没有执行.这样顺序就乱了.
一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。(我们就是这么干的,相同的key的数据在一个队列里面,然后使用多线程开worker按照key不同进行分别消费)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/178056.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot中如何优雅地使用重试

1 缘起 项目中使用了第三方的服务&#xff0c; 第三方服务偶尔会出现不稳定、连接不上的情况&#xff0c; 于是&#xff0c;在调用时为了保证服务的相对高可用&#xff0c;添加了超时连接重试&#xff0c; 当连接第三方服务超时时&#xff0c;多重试几次&#xff0c;比如3次&a…

GPTS-生成一个动漫图像GPT

介绍 GPTs是ChatGPT的定制版本,用户可以通过组合指令、知识和功能来定制用于特定任务或主题的GPT。它们可以根据需要简单或复杂,解决从语言学习到技术支持等各种事情。 创建GPTs Plus和Enterprise用户可以在chat.openai.com/create上开始创建GPTs。 您可以通过在ChatGPT上的…

「有问必答」秒杀系统 Go并发编程实践!

有问必答 摘要 本文将介绍如何使用Go语言的并发原语来构建一个简单的高并发秒杀系统。 我们将使用Go语言的原生库和一些常见的技术手段&#xff0c;包括互斥锁、通道、计数器等&#xff0c;来解决并发访问和数据一致性的问题。 本文只是一个简单的示例&#xff0c;重点是Go语…

MySQL数据库如何应对故障恢复与数据恢复回滚

一个最基本的数据库&#xff0c;应当可以做到以下几点 数据持久化&#xff0c;可以将数据保存到磁盘&#xff0c;服务重启数据依然存在。 可以按照某种关系存储数据&#xff0c;如果你用过IO流&#xff0c;那么你会发现整理数据也是一件复杂的事情。我是该追加写呢还是找到某条…

K 最近邻算法

K 最近邻算法 简单 KNN海伦约会手写数字识别KNN 算法的优缺点 K 最近邻&#xff08;K-NearestNeighbor&#xff0c;KNN&#xff09;算法&#xff0c;是 1967 年由 Cover T 和 Hart P 提出的一种用于分类与回归的方法。 基本原理&#xff1a;存在一个带标签的数据集&#xff08;…

漏电保护器工作原理

漏电保护器 漏电保护器是低压线路中最常用的保护器之一&#xff0c;简称漏保&#xff0c;又称漏电开关或漏电断路器。漏电保护器除了具有空开的所有保护功能外&#xff0c;还具备漏电保护功能。 需要了解 一根通电导线可以产生磁场&#xff0c;磁场与电流方向遵循右手螺旋关…

swingbench造数失败可能原因及解决方法

swingbench造数失败解决方法 1.临时表空间文件内存不足&#xff0c;扩展临时表空间文件内存 alter database tempfile/home/oracle/oradata/orcl/temp01.dbf resize 30G;&#xff08;这里扩展完temp临时表空间后可以造数成功&#xff0c;则不需要扩展soe用户表空间&#xff09…

【MATLAB】VMD分解+FFT+HHT组合算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 VMD&#xff08;Variational Mode Decomposition&#xff09;是一种信号分解方法&#xff0c;基于HHT&#xff08;Hilbert-Huang Transform&#xff0c;希尔伯特-黄变换&#xff09;。HH…

智能学习台灯_AI摄像头学习机基于MTk8175方案

智能学习台灯是一款专为中小学生设计的学习辅助工具&#xff0c;具有多项突出的参数和功能。首先&#xff0c;它采用了基于联发科MTK平台的解决方案&#xff0c;内置了12纳米四核Cortex-A53处理器&#xff0c;提供了稳定而高效的性能。操作系统方面&#xff0c;智能学习台灯运行…

ATFX汇市:英、日、欧央行行长同日发声,均强调2%通胀目标尚未达成

ATFX动态&#xff1a;11月27日&#xff0c;英国央行行长贝利表示&#xff0c;“我们必须将&#xff08;通胀&#xff09;降到2%&#xff0c;这就是为什么我最近一直在给有关我们正在讨论降息或者我们将在可预见的未来降息的假设泼冷水&#xff0c;因为现在谈这个问题还太早”。…

springcloud nacos配置优先级研究及配置管理最佳实践

目录 背景工具版本SpringCloud配置存放位置及相应优先级代码中nacosjar包外挂 多种配置共同存在时的优先级项目配置管理最佳实践无nacos的情况有nacos的情况 参考文献 背景 公司有很多应用是基于SpringBoot/SpringCloud开发。由于在配置文件中经常会涉及数据库账号密码之类的敏…

Boot工程快速启动【Linux】

Boot工程快速启动【Linux】 在idea中打包cd usr/在local文件夹下mkdir app进入app文件夹把打包好的文件&#xff08;只上传其中的jar&#xff09;上传到app文件下检查linux中的Java版本&#xff0c;保证和项目的Java 版本保持一致运行 java -jar sp补全***.jar想看效果得查询当…

UniApp项目中 使用微信小程序原生语言 进行开发

看效果 wxcomponents 下放的是微信小程序原生代码写的组件。我进行了封装 上干货 在你下uniApp 项目的根目录创建一个 wxcomponents 名字千万不要错 京东、支付宝灯参考下面图片 官方文档也有介绍 然后在你需要引入原生功能的页面里面引入你的组件&#xff08;我这里提前已经放…

初识Java 18-2 泛型

目录 构建复杂模型 类型擦除 C中的泛型 迁移的兼容性 类型擦除存在的问题 边界的行为 对类型擦除的补偿 创建类型实例 泛型数组 本笔记参考自&#xff1a; 《On Java 中文版》 构建复杂模型 泛型的一个优点就是&#xff0c;能够简单且安全地创建复杂模型。 【例子&am…

nginx反向代理解决跨域前端实践

需求实现 本地请求百度的一个搜索接口&#xff0c;用nginx代理解决跨域思路&#xff1a;前端和后端都用nginx代理到同一个地址8080&#xff0c;这样访问接口就不存在跨域限制 本地页面 查询一个百度搜索接口&#xff0c;运行在http://localhost:8035 index.js const path …

elment Loading 加载组件动态变更 text 值bug记录

先上效果图: 倒计时4分钟组件方法 // 倒计时 4分钟getSencond() {this.countDown 4分00秒this.interval setInterval(() > {this.maxTime--;let minutes Math.floor(this.maxTime / 60);let seconds Math.floor(this.maxTime % 60);minutes minutes < 10 ? 0 minu…

台式机加独显引发的故事

弄到一块NVIDIA1660显卡&#xff0c;想要加到台式机&#xff0c;枯树逢春&#xff1b;中间引发不少事情&#xff0c;记录下来共勉 1.台式机插入显卡 1&#xff09;拆开主机后部的接口片 2&#xff09;显卡插入显卡巢&#xff0c;很内存条结构类似&#xff08;长短布局&#xff…

电子学会C/C++编程等级考试2022年06月(二级)真题解析

C/C++等级考试(1~8级)全部真题・点这里 第1题:小白鼠再排队 N只小白鼠(1 < N < 100),每只鼠头上戴着一顶有颜色的帽子。现在称出每只白鼠的重量,要求按照白鼠重量从小到大的顺序输出它们头上帽子的颜色。帽子的颜色用 “red”,“blue”等字符串来表示。不同的小白…

React 之 airbnb - 项目实战

一、开发前言 1. 规范 2. 创建项目 node -v > 18.0.0 npm -v > 8.6.0 create-react-app star-airbnb 3. 项目基本配置 配置jsconfig.json {"compilerOptions": {"target": "es5","module": "esnext","ba…

kafka2.x常用命令:创建topic,查看topic列表、分区、副本详情,删除topic,测试topic发送与消费

原创/朱季谦 接触kafka开发已经两年多&#xff0c;也看过关于kafka的一些书&#xff0c;但一直没有怎么对它做总结&#xff0c;借着最近正好在看《Apache Kafka实战》一书&#xff0c;同时自己又搭建了三台kafka服务器&#xff0c;正好可以做一些总结记录。 本文主要是记录如…