使用 Kafka 保证消息不丢失的策略及原理解析


✨✨祝屏幕前的小伙伴们每天都有好运相伴左右,一定要天天开心!✨✨ 
🎈🎈作者主页: 喔的嘛呀🎈🎈

目录

一、引言

二. 持久化存储

2.1持久化存储原理:

2.2使用示例:

1. 安装 Kafka:

2. 生产者代码:

3. 消费者代码:

三. 消息确认机制

3.1消息确认机制原理:

3.2使用示例:

1. 生产者代码:

2. 消费者代码:

四. 事务机制

4.1事务机制原理:

4.2使用示例:

1. 生产者代码:

2. 消费者代码:

五. 数据备份与复制

5.1数据备份与复制原理

5.2使用示例:

1. Kafka Broker配置:

2. 生产者代码

3. 消费者代码

六. 消息过期机制

总结



一、引言

消息队列(Message Queue)是一种用于在不同组件、服务或系统之间传递消息的通信方式。在分布式系统中,消息队列起到了缓冲和解耦的作用,但在使用过程中,如何保证消息不丢失是一个重要的问题。下面详细探讨一下消息队列如何保证消息不丢失的方法。Apache Kafka是一个分布式消息系统,设计和实现了一套机制来保证消息队列中的消息不丢失。以下是一些关键的配置和实践方法。

二. 持久化存储

为了防止消息在队列中丢失,消息队列系统通常会提供持久化存储的机制。这意味着一旦消息被接收,它会被存储在持久化存储中,即使系统崩溃或重启,消息仍然可以被恢复。这种机制通常使用文件系统或数据库来实现。

在Java中使用消息队列的持久化存储,我们以Apache Kafka为例进行演示。Kafka是一个分布式的、可持久化的消息队列系统,适用于大规模的数据流处理。

2.1持久化存储原理:

Kafka通过将消息写入磁盘上的日志文件(日志段)来实现持久化存储。每个消息都会被追加到日志文件的末尾,确保消息在写入后不会被修改,从而保证了消息的持久性。

2.2使用示例:

1. 安装 Kafka:

首先,确保你已经安装并启动了 Kafka。你可以从 Kafka官方网站 下载并按照官方文档进行安装和启动。

2. 生产者代码:

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息,将消息设置为持久化ProducerRecord<String, String> record = new ProducerRecord<>("example_topic", "Hello, Kafka!");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("Message sent successfully. Offset: " + metadata.offset());} else {exception.printStackTrace();}}});producer.close();}
}

3. 消费者代码:

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息,将消息设置为持久化while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}}
}

在上述代码中,通过将生产者和消费者配置中的acks属性设置为all(默认值),Kafka会等待消息被所有同步副本接收确认后再继续发送。这确保了消息在发送和接收时都会被持久化存储。

请注意,Kafka的配置和使用可能因版本而异,确保查阅相应版本的文档以获取准确的配置信息。

三. 消息确认机制

消息队列系统通常支持消息确认机制,确保消息在被消费者成功处理后才被标记为已处理。消费者在成功处理消息后发送确认给消息队列,然后消息队列才会将该消息从队列中移除。如果消费者处理失败,消息队列可以将消息重新投递给队列或者按照配置进行其他处理。

消息确认机制是确保消息在被消费者成功处理后才被标记为已处理的关键机制。在这里,我们将使用Apache Kafka作为示例进行演示,展示消息确认机制的实现。

3.1消息确认机制原理:

在Kafka中,消息确认机制主要通过Producer的acks参数和Consumer的手动确认来实现。acks参数表示生产者要求服务器确认消息的级别,而手动确认则是消费者在成功处理消息后通过调用特定的API来通知服务器。

3.2使用示例:

1. 生产者代码:

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "all");  // 设置为all表示等待所有副本确认// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息,等待确认ProducerRecord<String, String> record = new ProducerRecord<>("example_topic", "Hello, Kafka!");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("Message sent successfully. Offset: " + metadata.offset());} else {exception.printStackTrace();}}});producer.close();}
}

2. 消费者代码:

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());// 手动确认消息consumer.commitSync();}}}
}

在上述代码中,生产者的acks属性设置为all,表示等待所有副本确认。而消费者在处理完消息后,通过调用consumer.commitSync()手动确认消息。这确保了消息在被成功处理后才被标记为已处理。

请注意,Kafka的确认机制可能因版本而异,确保查阅相应版本的文档以获取准确的配置信息。

四. 事务机制

一些消息队列系统支持事务机制,允许生产者发送一组消息,并且只有在这组消息都成功写入队列后才被提交。如果有任何一个消息写入失败,整个事务会被回滚,从而确保消息的一致性。

事务机制是确保消息队列中一组消息要么全部成功处理,要么全部回滚的重要机制。在这里,我们以Apache Kafka为例进行演示,展示事务机制的实现。

4.1事务机制原理:

Kafka的事务机制主要涉及Producer API的事务支持。生产者可以在一组消息的发送过程中开启事务,然后要么全部提交(所有消息发送成功),要么全部回滚(任何一个消息发送失败)。

4.2使用示例:

1. 生产者代码:

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaTransactionalProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "all");  // 设置为all表示等待所有副本确认props.put("enable.idempotence", "true");  // 开启幂等性props.put("transactional.id", "my-transactional-id");  // 设置事务ID// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 开启事务producer.initTransactions();try {producer.beginTransaction();// 发送消息ProducerRecord<String, String> record1 = new ProducerRecord<>("example_topic", "Message 1");ProducerRecord<String, String> record2 = new ProducerRecord<>("example_topic", "Message 2");producer.send(record1);producer.send(record2);// 提交事务producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// 处理异常,中止事务producer.close();} catch (KafkaException e) {// 处理其他Kafka异常,回滚事务producer.abortTransaction();}producer.close();}
}

在上述代码中,通过设置enable.idempotencetrue和配置transactional.id为唯一的事务ID,生产者开启了事务。然后,通过beginTransactioncommitTransactionabortTransaction来控制事务的提交和回滚。

请注意,生产者中使用了enable.idempotence开启幂等性,这对于确保消息不会被重复发送也是非常重要的。同时,确保事务ID是唯一的,以避免与其他事务冲突。

2. 消费者代码:

消费者的代码相对简单,与普通的消费者代码基本相同。消费者不直接参与生产者的事务,而是通过消费消息来处理相关业务逻辑。

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}}
}

在实际应用中,消费者的业务逻辑可能会与生产者的事务有关,例如在接收到特定消息时触发某些操作。在这种情况下,需要谨慎处理事务间的协调。

五. 数据备份与复制

数据备份与复制是确保消息队列系统可靠性和容错性的关键机制之一。在这里,我们以Apache Kafka为例进行演示,展示数据备份与复制的实现。

5.1数据备份与复制原理

Kafka通过数据备份与复制来防止因节点故障或灾难性事件导致的数据丢失。每个分区的数据会被复制到多个副本,这些副本分布在不同的节点上。这样即使一个节点发生故障,仍然可以从其他节点的副本中恢复数据。

5.2使用示例:

1. Kafka Broker配置:

在Kafka的server.properties配置文件中,可以配置副本的数量和复制策略。

# server.properties# 设置每个分区的副本数量
default.replication.factor=3# 设置副本的分布策略,可以选择不同的策略
# 可选值为: "rack-aware", "broker-aware", "0-1" (default)
# 具体策略的选择根据实际需求和环境
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

2. 生产者代码

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息ProducerRecord<String, String> record = new ProducerRecord<>("example_topic", "Hello, Kafka!");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("Message sent successfully. Offset: " + metadata.offset());} else {exception.printStackTrace();}}});producer.close();}
}

3. 消费者代码

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}}
}

在上述代码中,通过设置default.replication.factor来指定每个分区的副本数量,这里设置为3。副本的分布策略由replica.selector.class指定,这里选择了RackAwareReplicaSelector,可根据实际需求选择其他策略。

请注意,这里的代码示例主要是演示Kafka的配置和使用,实际上,Kafka会自动处理数据的备份和复制,你无需手动编写代码来执行这些操作。

六. 消息过期机制

消息过期机制是一种保证消息不会永远存在于消息队列中的重要机制。在消息队列系统中,可以设置消息的过期时间,一旦消息过期,系统会自动将其删除或标记为无效。消息过期机制有助于确保系统中的消息不会占用过多的资源并且能够及时清理不再需要的消息。

在Apache Kafka中,消息的过期机制并不是直接支持的特性,而是通过消费者在处理消息时判断消息的时间戳或其他属性来实现的。以下是一个简单的示例,展示了如何在消费者端处理消息的过期逻辑。

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerWithExpirationExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 判断消息是否过期(假设消息中包含时间戳字段)long timestamp = Long.parseLong(record.value());long currentTimestamp = System.currentTimeMillis();// 设置消息过期时间为10分钟long expirationTime = 10 * 60 * 1000;if (currentTimestamp - timestamp < expirationTime) {// 处理消息System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());} else {// 消息过期,可以进行相应的处理,例如记录日志或丢弃消息System.out.printf("Expired message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}}}
}

在上述代码中,假设消息中包含一个时间戳字段,消费者在处理消息时通过比较时间戳判断消息是否过期。如果消息过期,可以根据实际需求进行相应的处理,例如记录日志或丢弃消息。

请注意,这只是一个简单的示例,实际上,消息的过期机制可能需要根据具体的业务逻辑和消息队列系统的特性进行更复杂的处理。

总结

综上所述,消息队列通过持久化存储、消息确认机制、事务机制、数据备份与复制以及消息过期机制等手段,保证了消息在传递过程中不丢失。在设计分布式系统时,合理选择并配置这些机制可以有效地提高消息队列的可靠性和稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/794205.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构:详解【树和二叉树】

1. 树的概念及结构&#xff08;了解&#xff09; 1.1 树的概念 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝…

Qt --- 常用控件

目录 1. 前言 2. QWidget核心属性 2.1. enabled(控件是否可用) 2.2. geometry(尺寸) 2.2.1. windowframe的影响 2.3. windowTitle(窗口标题) 2.4. windowIcon(窗口图标) 2.5. 使用qrc文件资源管理 2.6. windowOpacity(半透明效果) 2.7. cursor(设置鼠标光标) 2.8. fo…

DAY15|二叉树的层序遍历,226.翻转二叉树,101.对称二叉树

文章目录 二叉树的层序遍历226.翻转二叉树&#xff08;层序遍历的衍生题&#xff09;101.对称二叉树 二叉树的层序遍历 文字讲解&#xff1a;二叉树的层序遍历 视频讲解&#xff1a;二叉树的层序遍历 状态&#xff1a;看了文字讲解后理解了&#xff0c;熟悉队列来遍历每层子节点…

哈佛大学商业评论 --- 第四篇:一家公司的AR经验

AR将全面融入公司发展战略&#xff01; AR将成为人类和机器之间的新接口&#xff01; AR将成为人类的关键技术之一&#xff01; 请将此文转发给您的老板&#xff01; --- 专题作者&#xff1a;Michael E.Porter和James E.Heppelmann 虽然物理世界是三维的&#xff0c;但大多…

LIN总线基础

文章目录 1 什么是LIN 总线&#xff1f;1.1 LIN总线的历史 2.LIN总线的特点2.1 LIN总线的电气特性 3. 应用4 LIN总线基本概念4.1 LIN报文帧结构4.1.1 主节点与从节点4.1.2 调度表4.1.3网络管理4.1.4 帧头结构4.1.4.1 电平4.1.4.2 同步间隔段&#xff08;间隔场&#xff09;4.1.…

算法学习18:动态规划

算法学习18&#xff1a;动态规划 文章目录 算法学习18&#xff1a;动态规划前言一、线性DP1.数字三角形&#xff1a;f[i][j] max(f[i - 1][j - 1] a[i][j], f[i - 1][j] a[i][j]);2.1最长上升子序列&#xff1a;f[i] max(f[i], f[j] 1);2.2 打印出最长子序列3.最长公共子序…

免版权素材库:在营销和宣传中的重要性与应用

title: 免版权素材库&#xff1a;在营销和宣传中的重要性与应用 date: 2024/4/5 18:21:43 updated: 2024/4/5 18:21:43 tags: 免版权素材库营销宣传高质量素材节省成本避免侵权创意启发数字营销 免版权素材库在宣传和营销中的重要性不言而喻。在当今数字化时代&#xff0c;图片…

基于Python的微博舆论分析,微博评论情感分析可视化系统,附源码

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

刷题之Leetcode34题(超级详细)

34. 在排序数组中查找元素的第一个和最后一个位置 力扣链接(opens new window)https://leetcode.cn/problems/find-first-and-last-position-of-element-in-sorted-array/ 给定一个按照升序排列的整数数组 nums&#xff0c;和一个目标值 target。找出给定目标值在数组中的开始…

c# wpf template ItemsPanel 简单试验

1.概要 2.代码 <Window x:Class"WpfApp2.Window9"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:d"http://schemas.microsoft.com/expression/blend/…

【御控物联】JavaScript JSON结构转换(14):对象To数组——规则属性重组

文章目录 一、JSON结构转换是什么&#xff1f;二、术语解释三、案例之《JSON对象 To JSON数组》四、代码实现五、在线转换工具六、技术资料 一、JSON结构转换是什么&#xff1f; JSON结构转换指的是将一个JSON对象或JSON数组按照一定规则进行重组、筛选、映射或转换&#xff0…

中医肝胆笔记

目录 肝胆的经络足厥阴肝经足少阳胆经 疏肝健脾的药舒肝益脾颗粒&#xff1a;逍遥丸&#xff1a;疏肝颗粒 -> 疏肝理气的力度大-> 肝郁的程度深&#xff0c;逍遥丸没用的是时候用这个加味逍遥丸 -> 清热的力度最大->适用 肝郁火大&#xff0c;舌苔黄丹栀逍遥丸->…

LangChain Demo | 如何调用stackoverflow并结合ReAct回答代码相关问题

背景 楼主决定提升与LLM交互的质量&#xff0c;之前是直接prompt->answer的范式&#xff0c;现在我希望能用上ReAct策略和能够检索StackOverflow&#xff0c;让同一款LLM发挥出更大的作用。 难点 1. 怎样调用StackOverflow step1 pip install stackspi step 2 from la…

基于单片机的有害气体检查系统设计

**单片机设计介绍&#xff0c;基于单片机的有害气体检查系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的有害气体检查系统设计旨在实现对环境中各种有害气体的实时监测与报警&#xff0c;保障人员健康和环境…

如何使用NumPy处理数组翻转与变形

NumPy是Python中一个强大的库&#xff0c;主要用于处理大型多维数组和矩阵的数学运算。处理数组翻转与变形是NumPy的常用功能。 1.对多维数组翻转 n np.random.randint(0,100,size(5,6))n# 执行结果array([[ 9, 48, 20, 85, 19, 93], [ 1, 63, 20, 25, 19, 44], …

用 Wireshark 解码 H.264

H264&#xff0c;你不知道的小技巧-腾讯云开发者社区-腾讯云 这篇文章写的非常好 这里仅做几点补充 init.lua内容&#xff1a; -- Set enable_lua to false to disable Lua support. enable_lua trueif not enable_lua thenreturn end-- If false and Wireshark was start…

OpenCV入门例程:裁剪图片、模糊检测、黑屏检测

初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github&#xff1a;codetoys&#xff0c;所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的&#xff0c;可以在任何平台上使用。 本例程运行环境为CentOS7&…

JS详解-函数柯里化

简介&#xff1a; 柯里化&#xff08;Currying&#xff09;是一种关于函数的高阶技术。柯里化是一种函数的转换&#xff0c;它是指将一个函数从可调用的 f(a, b, c) 转换为可调用的 f(a)(b)(c)。柯里化不会调用函数。它只是对函数进行转换。 举个例子&#xff1a; 已最简单的…

图像处理入门 3(how to get the pixel pitch / 如何获得单个像素的尺寸)

在这里一节里面&#xff0c;将记录如何获得一个相机传感器中单个像素点的尺寸&#xff0c;为了实现不同相机照片之间的匹配。 如果我们知道了相机传感器的尺寸和分辨率的大小&#xff0c;自然就可以求出单个像素的大小。 在这里插入图片描述&#xff1a; 如何获得相机传感器的…