如何保证消息队列不丢失消息(以kafka为例)

目录

一、引言

二. 持久化存储

2.1持久化存储原理:

2.2使用示例:

1. 安装 Kafka:

2. 生产者代码:

3. 消费者代码:

三. 消息确认机制

3.1消息确认机制原理:

3.2使用示例:

1. 生产者代码:

2. 消费者代码:

四. 事务机制

4.1事务机制原理:

4.2使用示例:

1. 生产者代码:

2. 消费者代码:

五. 数据备份与复制

5.1数据备份与复制原理

5.2使用示例:

1. Kafka Broker配置:

2. 生产者代码

3. 消费者代码

六. 消息过期机制

总结


一、引言

消息队列(Message Queue)是一种用于在不同组件、服务或系统之间传递消息的通信方式。在分布式系统中,消息队列起到了缓冲和解耦的作用,但在使用过程中,如何保证消息不丢失是一个重要的问题。下面详细探讨一下消息队列如何保证消息不丢失的方法。Apache Kafka是一个分布式消息系统,设计和实现了一套机制来保证消息队列中的消息不丢失。以下是一些关键的配置和实践方法。

二. 持久化存储

为了防止消息在队列中丢失,消息队列系统通常会提供持久化存储的机制。这意味着一旦消息被接收,它会被存储在持久化存储中,即使系统崩溃或重启,消息仍然可以被恢复。这种机制通常使用文件系统或数据库来实现。

在Java中使用消息队列的持久化存储,我们以Apache Kafka为例进行演示。Kafka是一个分布式的、可持久化的消息队列系统,适用于大规模的数据流处理。

2.1持久化存储原理:

Kafka通过将消息写入磁盘上的日志文件(日志段)来实现持久化存储。每个消息都会被追加到日志文件的末尾,确保消息在写入后不会被修改,从而保证了消息的持久性。

2.2使用示例:

1. 安装 Kafka:

首先,确保你已经安装并启动了 Kafka。你可以从 Kafka官方网站 下载并按照官方文档进行安装和启动。

2. 生产者代码:

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息,将消息设置为持久化ProducerRecord<String, String> record = new ProducerRecord<>("example_topic", "Hello, Kafka!");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("Message sent successfully. Offset: " + metadata.offset());} else {exception.printStackTrace();}}});producer.close();}
}

3. 消费者代码:

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息,将消息设置为持久化while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}}
}

在上述代码中,通过将生产者和消费者配置中的acks属性设置为all(默认值),Kafka会等待消息被所有同步副本接收确认后再继续发送。这确保了消息在发送和接收时都会被持久化存储。

请注意,Kafka的配置和使用可能因版本而异,确保查阅相应版本的文档以获取准确的配置信息。

三. 消息确认机制

消息队列系统通常支持消息确认机制,确保消息在被消费者成功处理后才被标记为已处理。消费者在成功处理消息后发送确认给消息队列,然后消息队列才会将该消息从队列中移除。如果消费者处理失败,消息队列可以将消息重新投递给队列或者按照配置进行其他处理。

消息确认机制是确保消息在被消费者成功处理后才被标记为已处理的关键机制。在这里,我们将使用Apache Kafka作为示例进行演示,展示消息确认机制的实现。

3.1消息确认机制原理:

在Kafka中,消息确认机制主要通过Producer的acks参数和Consumer的手动确认来实现。acks参数表示生产者要求服务器确认消息的级别,而手动确认则是消费者在成功处理消息后通过调用特定的API来通知服务器。

3.2使用示例:

1. 生产者代码:

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "all");  // 设置为all表示等待所有副本确认// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息,等待确认ProducerRecord<String, String> record = new ProducerRecord<>("example_topic", "Hello, Kafka!");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("Message sent successfully. Offset: " + metadata.offset());} else {exception.printStackTrace();}}});producer.close();}
}

2. 消费者代码:

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());// 手动确认消息consumer.commitSync();}}}
}

在上述代码中,生产者的acks属性设置为all,表示等待所有副本确认。而消费者在处理完消息后,通过调用consumer.commitSync()手动确认消息。这确保了消息在被成功处理后才被标记为已处理。

请注意,Kafka的确认机制可能因版本而异,确保查阅相应版本的文档以获取准确的配置信息。

四. 事务机制

一些消息队列系统支持事务机制,允许生产者发送一组消息,并且只有在这组消息都成功写入队列后才被提交。如果有任何一个消息写入失败,整个事务会被回滚,从而确保消息的一致性。

事务机制是确保消息队列中一组消息要么全部成功处理,要么全部回滚的重要机制。在这里,我们以Apache Kafka为例进行演示,展示事务机制的实现。

4.1事务机制原理:

Kafka的事务机制主要涉及Producer API的事务支持。生产者可以在一组消息的发送过程中开启事务,然后要么全部提交(所有消息发送成功),要么全部回滚(任何一个消息发送失败)。

4.2使用示例:

1. 生产者代码:

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaTransactionalProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "all");  // 设置为all表示等待所有副本确认props.put("enable.idempotence", "true");  // 开启幂等性props.put("transactional.id", "my-transactional-id");  // 设置事务ID// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 开启事务producer.initTransactions();try {producer.beginTransaction();// 发送消息ProducerRecord<String, String> record1 = new ProducerRecord<>("example_topic", "Message 1");ProducerRecord<String, String> record2 = new ProducerRecord<>("example_topic", "Message 2");producer.send(record1);producer.send(record2);// 提交事务producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// 处理异常,中止事务producer.close();} catch (KafkaException e) {// 处理其他Kafka异常,回滚事务producer.abortTransaction();}producer.close();}
}

在上述代码中,通过设置enable.idempotencetrue和配置transactional.id为唯一的事务ID,生产者开启了事务。然后,通过beginTransactioncommitTransactionabortTransaction来控制事务的提交和回滚。

请注意,生产者中使用了enable.idempotence开启幂等性,这对于确保消息不会被重复发送也是非常重要的。同时,确保事务ID是唯一的,以避免与其他事务冲突。

2. 消费者代码:

消费者的代码相对简单,与普通的消费者代码基本相同。消费者不直接参与生产者的事务,而是通过消费消息来处理相关业务逻辑。

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}}
}

在实际应用中,消费者的业务逻辑可能会与生产者的事务有关,例如在接收到特定消息时触发某些操作。在这种情况下,需要谨慎处理事务间的协调。

五. 数据备份与复制

数据备份与复制是确保消息队列系统可靠性和容错性的关键机制之一。在这里,我们以Apache Kafka为例进行演示,展示数据备份与复制的实现。

5.1数据备份与复制原理

Kafka通过数据备份与复制来防止因节点故障或灾难性事件导致的数据丢失。每个分区的数据会被复制到多个副本,这些副本分布在不同的节点上。这样即使一个节点发生故障,仍然可以从其他节点的副本中恢复数据。

5.2使用示例:

1. Kafka Broker配置:

在Kafka的server.properties配置文件中,可以配置副本的数量和复制策略。

# server.properties# 设置每个分区的副本数量
default.replication.factor=3# 设置副本的分布策略,可以选择不同的策略
# 可选值为: "rack-aware", "broker-aware", "0-1" (default)
# 具体策略的选择根据实际需求和环境
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

2. 生产者代码

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息ProducerRecord<String, String> record = new ProducerRecord<>("example_topic", "Hello, Kafka!");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("Message sent successfully. Offset: " + metadata.offset());} else {exception.printStackTrace();}}});producer.close();}
}

3. 消费者代码

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}}
}

在上述代码中,通过设置default.replication.factor来指定每个分区的副本数量,这里设置为3。副本的分布策略由replica.selector.class指定,这里选择了RackAwareReplicaSelector,可根据实际需求选择其他策略。

请注意,这里的代码示例主要是演示Kafka的配置和使用,实际上,Kafka会自动处理数据的备份和复制,你无需手动编写代码来执行这些操作。

六. 消息过期机制

消息过期机制是一种保证消息不会永远存在于消息队列中的重要机制。在消息队列系统中,可以设置消息的过期时间,一旦消息过期,系统会自动将其删除或标记为无效。消息过期机制有助于确保系统中的消息不会占用过多的资源并且能够及时清理不再需要的消息。

在Apache Kafka中,消息的过期机制并不是直接支持的特性,而是通过消费者在处理消息时判断消息的时间戳或其他属性来实现的。以下是一个简单的示例,展示了如何在消费者端处理消息的过期逻辑。

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerWithExpirationExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 判断消息是否过期(假设消息中包含时间戳字段)long timestamp = Long.parseLong(record.value());long currentTimestamp = System.currentTimeMillis();// 设置消息过期时间为10分钟long expirationTime = 10 * 60 * 1000;if (currentTimestamp - timestamp < expirationTime) {// 处理消息System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());} else {// 消息过期,可以进行相应的处理,例如记录日志或丢弃消息System.out.printf("Expired message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}}}
}

在上述代码中,假设消息中包含一个时间戳字段,消费者在处理消息时通过比较时间戳判断消息是否过期。如果消息过期,可以根据实际需求进行相应的处理,例如记录日志或丢弃消息。

请注意,这只是一个简单的示例,实际上,消息的过期机制可能需要根据具体的业务逻辑和消息队列系统的特性进行更复杂的处理。

总结

综上所述,消息队列通过持久化存储、消息确认机制、事务机制、数据备份与复制以及消息过期机制等手段,保证了消息在传递过程中不丢失。在设计分布式系统时,合理选择并配置这些机制可以有效地提高消息队列的可靠性和稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/647536.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity配置表xlsx/xls打包后读取错误问题

前言 代码如下&#xff1a; //文本解析private void ParseText(){//打开文本 读FileStream stream File.Open(Application.streamingAssetsPath excelname, FileMode.Open, FileAccess.Read, FileShare.Read);//读取文件流IExcelDataReader excelRead ExcelReaderFactory…

Win32 MDI 程序学习1

这个是从 Windows程序设计第五版 改来的;我还没完全理解;先初步看一下; #include <windows.h> #include "resource.h"#define INIT_MENU_POS 0 #define HELLO_MENU_POS 2#define IDM_FIRSTCHILD 50000LRESULT CALLBACK FrameWndProc(HWND, UINT, WP…

vue实现甘特图

目录 实现效果 一、安装依赖 二、使用 二、绕过license 实现效果 一、安装依赖 npm i --save vue-gantt-schedule-timeline-calendar 实现甘特图需先安装上述依赖&#xff0c;安装依赖实际上是通过gantt-schedule-timeline-calendar来实现的。所以node_module中因包含以下…

JQuery下载和一些语法

最近学了六种jQuery选择器&#xff0c;我想把学过案例和知识结合起来&#xff0c;给大家分享下&#xff01; 那么既然学jQuery选择器&#xff0c;肯定要先了解下jQuery是什么吧&#xff01;jQuery是一个快速、简洁的JavaScript框架&#xff0c;相当于用jQuery能更加高效的创建…

【快影】怎么制作卡拉OK字幕

您好&#xff0c;您添加了字幕之后可以添加动画&#xff0c;选择卡拉OK&#xff0c;其中 卡拉OK1是支持修改颜色的&#xff0c;卡拉OK2只支持修改文字的底色。

Denoising diffusion implicit models 阅读笔记2

Denoising diffusion probabilistic models (DDPMs)从马尔科夫链中采样生成样本&#xff0c;需要迭代多次&#xff0c;速度较慢。Denoising diffusion implicit models (DDIMs)的提出是为了在复用DDPM训练的网络的前提下&#xff0c;加速采样过程。 加速采样的基本思路是&#…

MyBatis 的一对多查询可以通过在 <resultMap> 标签中使用 <collection> 标签,将查询结果映射成 Java 对象的嵌套集合。

MyBatis 的一对多查询可以通过在 <resultMap> 标签中使用 <collection> 标签&#xff0c;将查询结果映射成 Java 对象的嵌套集合。具体步骤如下&#xff1a; 在 <resultMap> 中使用 <collection> 标签 在 <resultMap> 标签中使用 <collecti…

geemap学习笔记052:影像多项式增强

前言 下面介绍的主要内容是应用Image.polynomial() 对图像进行多项式增强。 1 导入库并显示地图 import ee import geemap ee.Initialize() Map geemap.Map(center[40, -100], zoom4)2 多项式增强 # 使用函数 -0.2 2.4x - 1.2x^2 对 MODIS 影像进行非线性对比度增强。# L…

创建第一个 Spring 项目(IDEA社区版)

文章目录 创建 Spring 项目创建一个普通的 Maven 项目添加 Spring 依赖IDEA更换国内源 运行第一个 Spring 项目新建启动类存储 Bean 对象将Bean注册到Spring 获取并使用 Bean 对象 创建 Spring 项目 创建一个普通的 Maven 项目 首先创建一个普通的 Maven 项目 添加 Spring 依…

微信小程序打卡定位实现方案

1背景 业务场景是考勤打卡,在考勤打卡这个业务场景中有两个关键技术点:定位和人员识别。用户界面初步确定是用微信小程序来实现,本文就定位问题做了技术上的调研。 2调研内容 平台注意事项 获取位置 选择位置 查看位置 距离计算 定位精度 防作弊 Demo 3调研结果 3.1平台注…

全面解析开源大语言模型:BLOOM

大型语言模型 &#xff08;LLM&#xff09; 的兴起一直是自然语言处理 &#xff08;NLP&#xff09; 领域的一个决定性趋势&#xff0c;导致它们在各种应用程序中的广泛采用。然而&#xff0c;这种进步往往是排他性的&#xff0c;大多数由资源丰富的组织开发的 LLM 仍然无法向公…

什么是servlet

什么是servlet 什么是servlet Servlet&#xff08;Server Applet&#xff09;是 Java Servlet 的简称&#xff0c;称为小服务程序或服务连接器&#xff0c;用 Java 编写的服务器端程序&#xff0c;具有独立于平台和协议的特性&#xff0c;主要功能在于交互式地浏览和生成数据…

java数据结构与算法刷题-----LeetCode769. 最多能完成排序的块

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 解题思路 这道题可以理解为&#xff0c;只能保证块内有序的情况下&#xf…

大模型学习笔记一:大模型应用开发基础

文章目录 一、大模型一些概念介绍 一、大模型一些概念介绍 1&#xff09;产品和大模型的区别&#xff08;产品通过调用大模型来具备的能力&#xff09; 2&#xff09;AGI定义 概念&#xff1a;一切问题可以用AI解决 3&#xff09;大模型通俗原理 根据上文&#xff0c;猜测下…

WordPress反垃圾评论插件Akismet有什么用?如何使用Akismet插件?

每次我们成功搭建好WordPress网站后&#xff0c;都可以在后台 >> 插件 >> 已安装的插件&#xff0c;在插件列表中可以看到有一个“Akismet反垃圾邮件&#xff1a;垃圾邮件保护”的插件&#xff08;个人觉得是翻译错误&#xff0c;应该是反垃圾评论&#xff09;。具…

vue实现在线Excel表格功能

目录 1.安装x-data-spreadsheet xlsx 2.引入 3.使用 1.安装x-data-spreadsheet xlsx npm i x-data-spreadsheet xlsx2.引入 import zhCN from "x-data-spreadsheet/src/locale/zh-cn"; import Spreadsheet from "x-data-spreadsheet"; import * as X…

【c++】高精度算法(洛谷刷题2024)乒乓球详解

系列文章目录 第一题 乒乓球 视频&#xff1a;http://【洛谷题单 - 算法 - 高精度】https://www.bilibili.com/video/BV1Ym4y1s7BD?vd_source66a11ab493493f42b08b31246a932bbb 目录 系列文章目录 第一题 乒乓球 前言 一、题目以及引领思考 二、题解与代码 1.输入输出案例 …

C语言实现快速排序算法(附带源代码)

快速排序 在区间中随机挑选一个元素作基准&#xff0c;将小于基准的元素放在基准之前&#xff0c;大于基准的元素放在基准之后&#xff0c;再分别对小数区与大数区进行排序。 动态效果过程演示&#xff1a; 快速排序&#xff08;Quick Sort&#xff09;是一种常用的排序算法&…

WIFI电路原理时序检修思路

uart是串口&#xff0c;bt是蓝牙&#xff0c;hsic是高速接口。pcm是音频接口。时序图的第五步是发出就绪信号&#xff0c;然后第六步与门发出就绪信号。 wifi芯片是u8_rf。 特别说明&#xff1a;short表示短接。xw表示实际是看不到物体的&#xff0c;是直接相连的。 找信号50_…

nginx部署https域名ssl证书

1、在你服务器nginx文件夹下创建ssl文件夹存放证书文件和秘钥文件 把.crt和.key证书放上 2、在nginx.conf文件中配置 在nginx.conf文件中server下加入listen 443 ssl; server {listen 443 ssl;charset utf-8;index index.html index.htm index.jsp index.do;add_heade…