消息中间件篇之Kafka-消息不丢失

一、 正常工作流程

        生产者发送消息到kafka集群,然后由集群发送到消费者。

        但是可能中途会出现消息的丢失。下面是解决方案。

二、 生产者发送消息到Brocker丢失

1. 设置异步发送

    //同步发送RecordMetadata recordMetadata = kafkaProducer.send(record).get();//异步发送kafkaProducer.send(record,new Callback() {@Override public void onCompletion (RecordMetadata recordMetadata, Exception e){if (e != null) {System.out.println("消息发送失败 | 记录日志");}long offset = recordMetadata.offset();int partition = recordMetadata.partition();String topic = recordMetadata.topic();}});

2.消息重试

//设置重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);

三、消息在Brocker中存储丢失

        发送确认机制acks。消息首先Topic是key,到达Topic以后才选择分区Partition(默认就一个分区,0号分区),默认连接的就是分区的Leader节点,由leader分区同步到follower区中。

四、消费者从Brocker接收消息丢失

1.分区机制

        1. Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition)。

        2. topic分区中消息只能由消费者组中的唯一一个消费者处理,不同的分区分配给不同的消费者(同一个消费者组)。

2.消费方式

        消费者默认是自动按期提交已经消费的偏移量,默认是每隔5s提交一次,如果出现重平衡的情况,可能会重复消费或丢失数据

3.那如何解决重复消费

        禁用自动提交偏移量,改为手动: 1. 同步提交。  2. 异步提交。 3. 同步+异步组合提交。

       

五、面试题

面试官:Kafka是如何保证消息不丢失?

候选人:嗯,这个保证机制很多,在发送消息到消费者接收消息,在每个阶段都有可能会丢失消息,所以我们解决的话也是从多个方面考虑:

第一个是生产者发送消息的时候,可以使用异步回调发送,如果消息发送失败,我们可以通过回调获取失败后的消息信息,可以考虑重试或记录日志,后边再做补偿都是可以的。同时在生产者这边还可以设置消息重试,有的时候是由于网络抖动的原因导致发送不成功,就可以使用重试机制来解决。

第二个在broker中消息有可能会丢失,我们可以通过kafka的复制机制来确保消息不丢失,在生产者发送消息的时候,可以设置一个acks,就是确认机制。我们可以设置参数为all,这样的话,当生产者发送消息到了分区之后,不仅仅只在leader分区保存确认,在follwer分区也会保存确认,只有当所有的副本都保存确认以后才算是成功发送了消息,所以,这样设置就很大程度了保证了消息不会在broker丢失。

第三个有可能是在消费者端丢失消息,kafka消费消息都是按照offset进行标记消费的,消费者默认是自动按期提交已经消费的偏移量,默认是每隔5s提交一次,如果出现重平衡的情况,可能会重复消费或丢失数据。我们一般都会禁用掉自动提价偏移量,改为手动提交,当消费成功以后再报告给broker消费的位置,这样就可以避免消息丢失和重复消费了。

面试官:Kafka中消息的重复消费问题如何解决的?

候选人:kafka消费消息都是按照offset进行标记消费的,消费者默认是自动按期提交已经消费的偏移量,默认是每隔5s提交一次,如果出现重平衡的情况,可能会重复消费或丢失数据。我们一般都会禁用掉自动提价偏移量,改为手动提交,当消费成功以后再报告给broker消费的位置,这样就可以避免消息丢失和重复消费了。

为了消息的幂等,我们也可以设置唯一主键来进行区分,或者是加锁,数据库的锁,或者是redis分布式锁,都能解决幂等的问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/705371.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Java程序设计】【C00296】基于Springboot的4S车辆管理系统(有论文)

基于Springboot的4S车辆管理系统(有论文) 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的4S店车辆管理系统 本系统分为销售员功能模块、管理员功能模块以及维修员功能模块。 管理员功能模块:管理员登录进入4S…

Langchain 加载网络信息实现RAG以及UnstructuredURLLoader的使用

以下实现了从 wikipedia 加载 Android 的网页然后保存在本地的向量数据库,然后通过上下文发给大模型,让他来总结什么是android 。 from langchain_community.vectorstores import Chroma from langchain_core.prompts import ChatPromptTemplate from l…

少儿编程热潮背后的冷思考、是不是“智商税”?

在科技飞速发展的今天,编程已成为一项基础技能,如同数学和语言一样,被认为是未来社会的重要通行证。随之而来的是少儿编程教育的火爆,各种编程班、在线课程如雨后春笋般涌现,吸引了无数家长的目光。然而,这…

【JS】【Vue3】【React】获取滚轮位置的方法:JavaScript、Vue 3和React示例

目录 使用JavaScript原生方法在Vue 3中获取滚轮位置在React中获取滚轮位置 随着Web应用程序的发展,滚轮位置的获取变得越来越重要,可以用于实现页面的滚动效果、导航条的隐藏和显示等功能。本文将探讨在JavaScript、Vue 3和React中获取滚轮位置的不同方法…

C语言——switch 语句的基本格式是什么?

一、问题 C语⾔中有两个构成选择结构的语句,即构成双分⽀的让if..else 语句和构成多分⽀的 switch..case 语句,switch 语句的基本格式是什么? 二、解答 switch (表达式) { case 常量表达式 1:语句1;break;case 常量表达式 n:语句n;break;de…

嵌入式学习day26 Linux

1.exec函数族 extern char **environ; int execl(const char *path, const char *arg, ... /* (char *) NULL */); int execlp(const char *file, const char *arg, ... /* (char *) NULL */); int execle(const char *…

【深度好文】simhash文本去重流程

对于类似于头条客户端而言,推荐的每一刷的新闻都必须是不同的新闻,这就需要对新闻文本进行排重。传统的去重一般是对文章的url链接进行排重,但是对于抓取的网页来说,各大平台的新闻可能存在重复,对于只通过文章url进行排重是不靠谱的,为了解决这个痛点于是就提出了用simh…

测试环境搭建整套大数据系统(七:集群搭建kafka(2.13)+flink(1.14)+dinky+hudi)

一:搭建kafka。 1. 三台机器执行以下命令。 cd /opt wget wget https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz tar zxvf kafka_2.13-3.6.1.tgz cd kafka_2.13-3.6.1/config vim server.properties修改以下俩内容 1.三台机器分别给予各自的broker_id…

MapGIS农业信息化解决方案(2)

农业资源采集与调查 农业各项生产活动与农业资源息息相关,对农业资源进行调查,摸清农业家底, 为构筑农业“一张图”核心数据库奠定数据基础。MapGIS 农业资源采集与调查系统集成遥感、手持终端等调查技术,为农业资源采集提供实用、简捷的采集调查和信息录入工具,实现农田…

PCB设计十大黄金准则

PCB设计十大黄金准则 控制走线长度控制走线长度,顾名思义,即短线规则,在进行PCB设计时应该控制布线长度尽量短,以免因走线过长引入不必要的干扰,特别是一些重要信号线,如时钟信号走线,务必将其…

GoLevelDB构建数据字典

GoLevelDB 是一个开源的键值存储数据库,可以用于构建数据字典,下面是一些示例代码,展示了如何使用 GoLevelDB 来生成数据字典。 首先,你需要在 Go 中导入 GoLevelDB 包,并创建一个数据库实例。可以使用 leveldb.OpenF…

C# 找出两个Rectangle或是矩形的相互重合与非重合部分?

一、找出两个Rectangle或是矩形的相互重合与非重合部分? 示例代码1,求非重合部分: 使用GraphicsPath获取到非重合的路径,然后使用FillPath填充非重合部分Brush颜色。 using System.Drawing; using System.Drawing.Drawing2D; u…

linux查看socket信息

netstat netstat 是一个用于显示网络相关信息的命令行工具。它可以显示当前系统的网络连接状态、路由表、接口统计信息等。 下面是一些常见的 netstat 命令选项和用法: 显示所有活动的网络连接: netstat -a 显示所有正在监听的端口: ne…

深度学习 精选笔记(4)线性神经网络-交叉熵回归与Softmax 回归

学习参考: 动手学深度学习2.0Deep-Learning-with-TensorFlow-bookpytorchlightning ①如有冒犯、请联系侵删。 ②已写完的笔记文章会不定时一直修订修改(删、改、增),以达到集多方教程的精华于一文的目的。 ③非常推荐上面(学习参考&#x…

现代化数据架构升级:毫末智行自动驾驶如何应对年增20PB的数据规模挑战?

毫末智行是一家致力于自动驾驶的人工智能技术公司,其前身是长城汽车智能驾驶前瞻分部,以零事故、零拥堵、自由出行和高效物流为目标,助力合作伙伴重塑和全面升级整个社会的出行及物流方式。 在自动驾驶领域中,是什么原因让毫末智行…

【设计模式】5种创建型模式详解

创建型模式提供创建对象的机制,能够提升已有代码的灵活性和复用性。 常用的有:单例模式、工厂模式(工厂方法和抽象工厂)、建造者模式。不常用的有:原型模式。一、单例模式 1.1 单例模式介绍 1 ) 定义 单例模式(Singleton Pattern)是 Java 中最简单的设计模式之一,此模…

Jupyterlab 和 JupyternoteBook 修改默认路径

Jupyterlab 和 JupyternoteBook 修改默认路径 在使用 JupyterLab 或 Jupyter Notebook 进行数据分析、机器学习项目时,经常会遇到需要修改默认工作目录的需求。默认情况下,JupyterLab 和 Jupyter Notebook 会在启动时打开你的用户目录(例如&…

Linux 不同架构、不同系统的问题

文章目录 一、麒麟V10(kylin)操作系统中,sudo执行程序后,其环境变量依然为用户家目录。(1)背景(2)原因(3)解决办法 二、统信(UOS)操作…

.norm() 范数

(A- B).norm().item() 是默认计算A与B的第二范数,如果你想计算差向量的第一范数(也称为L1范数),可以在norm()方法中传递p1参数,这样就会计算出L1范数。例如: (A- B).norm(p1).item() 其中,使用…

【CMake】(13)流程控制

条件判断 基本语法 条件判断的基本语法如下: if(<condition>)<commands> elseif(<condition>)<commands> else()<commands> endif()if(<condition>):检查条件是否满足。如果满足,则执行随后的命令直到遇到elseif、else或endif。else…