Redis学习(十)|使用消息队列的重试机制实现 MySQL 和 Redis 的数据一致性

文章目录

  • 介绍
  • 原理
  • 整体方案
  • 实现步骤
  • 示例代码
  • 总结
  • 其他:Kafka 重试策略配置
    • 1. 生产者重试策略配置
    • 2. 消费者重试策略配置

介绍

在分布式系统中,保持 MySQL 和 Redis 之间的数据一致性是至关重要的。为了确保数据的一致性,我们通常采取先更新数据库,再删除缓存的方案。然而,在实际应用中,由于网络问题、服务故障等原因,可能会导致数据库更新成功而缓存删除失败,进而导致数据不一致。为了解决这个问题,我们可以引入消息队列的重试机制,以确保缓存删除成功。

原理

重试机制是一种容错机制,用于在消息发送失败或者处理失败时进行重试。通过将数据更新操作封装成消息,并发送到消息队列中,在消费者处理消息时进行重试,可以提高系统的可靠性和稳定性。我们将使用消息队列的重试机制来实现 MySQL 和 Redis 的数据一致性。

整体方案

整体方案如下:

  1. 应用程序首先将数据更新操作发送到消息队列中。
  2. 消费者从消息队列中获取消息,并根据消息中的数据删除 Redis 中的缓存数据。
  3. 如果应用删除缓存失败,可以从消息队列中重新读取数据,然后再次删除缓存,这个就是重试机制。当然,如果重试超过的一定次数,还是没有成功,就需要向业务层发送报错信息了。
  4. 如果删除缓存成功,就要把数据从消息队列中移除,避免重复操作,否则就继续重试。

实现步骤

以下是使用消息队列的重试机制实现 MySQL 和 Redis 数据一致性的基本步骤:

  1. 将数据更新操作封装成消息:在应用程序中,将数据更新操作封装成消息,并发送到消息队列中。消息中应包含数据更新操作的类型(如插入、更新或删除)以及相关的数据。
  2. 消费者消息处理和失败重试:消费者从消息队列中获取消息,并根据消息中的数据更新操作来删除 Redis 中的缓存数据。消息消费失败后自动进行重试。可以根据重试次数和重试间隔来配置重试机制,例如指数退避策略。
  3. 消息确认机制:消费者在成功处理消息后,需要发送确认消息给消息队列,告知消息队列可以删除或标记消息为已处理。这样可以确保消息在成功处理后不会被重新处理,避免重复处理的情况。

示例代码

以下是一个简单的示例代码,演示了如何使用 Java 实现消息队列的重试机制,以确保 MySQL 和 Redis 的数据一致性:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class MySQLToKafka {private static final String TOPIC_NAME = "mysql_updates";private static final String BOOTSTRAP_SERVERS = "localhost:9092";private static final String GROUP_ID = "cache-deletion-group";public static void main(String[] args) {// 模拟 MySQL 更新后发送消息到 KafkasendMySQLUpdateToKafka("data_update");// 模拟从 Kafka 拉取消息删除 Redis 缓存pullMessageFromKafkaAndDeleteCache();}// 发送 MySQL 更新消息到 Kafkaprivate static void sendMySQLUpdateToKafka(String message) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<>(props);ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);try {producer.send(record).get();System.out.println("Message sent to Kafka successfully: " + message);} catch (Exception e) {e.printStackTrace();} finally {producer.close();}}// 从 Kafka 拉取消息并删除 Redis 缓存private static void pullMessageFromKafkaAndDeleteCache() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(TOPIC_NAME));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());if (deleteCacheFromRedis(record.value())) {System.out.println("Cache deleted from Redis successfully.");// 处理消息成功后手动提交偏移量consumer.commitAsync();} else {System.out.println("Cache deletion from Redis failed. Kafka will retry.");}}}} catch (Exception e) {e.printStackTrace();} finally {consumer.close();}}// 模拟删除 Redis 缓存private static boolean deleteCacheFromRedis(String message) {// 这里省略删除 Redis 缓存的逻辑,直接模拟删除成功和失败// 模拟删除成功的概率为 0.7if (Math.random() < 0.7) {return true;}return false;}
}

以上代码完成了以下几个功能:

  1. 发送 MySQL 更新消息到 Kafka: sendMySQLUpdateToKafka 方法模拟了 MySQL 更新后发送消息到 Kafka 的过程。它使用 Kafka 生产者将消息发送到指定的 Kafka 主题中。
  2. 从 Kafka 拉取消息并删除 Redis 缓存: pullMessageFromKafkaAndDeleteCache 方法模拟了从 Kafka 拉取消息并删除 Redis 缓存的过程。它使用 Kafka 消费者订阅指定的 Kafka 主题,并轮询获取消息。对于每条消息,它尝试删除对应的 Redis 缓存。如果删除失败,就会打印一条消息表示失败,然后 Kafka 将自动重试该消息。只有在成功删除缓存后,才会手动提交偏移量。
  3. 模拟删除 Redis 缓存: deleteCacheFromRedis 方法用于模拟删除 Redis 缓存的过程。它返回一个布尔值,表示缓存删除操作的成功或失败。在实际应用中,这个方法应该被替换为真正的删除 Redis 缓存的逻辑。

通过这样的流程,模拟了一个简单的数据更新、消息发送、消息消费、缓存删除的完整流程,并且在处理消息失败时利用 Kafka 的自动重试机制进行了处理。

总结

通过引入消息队列的重试机制,可以有效地实现 MySQL 和 Redis 的数据一致性。使用重试机制,可以确保数据在 MySQL 更新后 Redis 的对应缓存能够成功删除,从而保持数据的一致性。这种方法适用于需要处理大量数据更新和异步消息传输的场景,同时也提高了系统的可靠性和稳定性。

其他:Kafka 重试策略配置

1. 生产者重试策略配置

对于 Kafka 生产者,可以通过配置以下参数来定义重试策略:

  • retries: 设置生产者在发生可重试的异常时重试的最大次数。默认值为 2147483647(即最大整数)。
  • retry.backoff.ms: 设置生产者在重试之间等待的时间。默认值为 100 毫秒。

示例配置:

# 设置最大重试次数为 3 次
retries=3
# 设置重试之间的等待时间为 500 毫秒
retry.backoff.ms=500

2. 消费者重试策略配置

对于 Kafka 消费者,可以通过以下参数来定义重试策略:

  • enable.auto.commit: 指定消费者是否自动提交偏移量。默认为 true,表示自动提交。
  • auto.commit.interval.ms: 如果启用了自动提交偏移量,可以通过该参数设置自动提交的间隔时间。默认值为 5000 毫秒。
  • max.poll.interval.ms: 设置消费者在拉取消息之间的最大时间间隔。如果消费者在此间隔内没有发送心跳,将被认为失败,并且将其分区重新分配给其他消费者。默认值为 300000 毫秒(5 分钟)。
  • max.poll.records: 设置消费者在单次调用 poll 方法中拉取的最大记录数。默认值为 500 条。

示例配置:

# 禁用自动提交偏移量
enable.auto.commit=false
# 设置自动提交偏移量的间隔时间为 1000 毫秒
auto.commit.interval.ms=1000
# 设置拉取消息之间的最大时间间隔为 10 秒
max.poll.interval.ms=10000
# 设置单次 poll 方法拉取的最大记录数为 100 条
max.poll.records=100

通过以上配置,可以定制 Kafka 生产者和消费者的重试策略,以适应不同的业务需求和性能要求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/7964.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

去中心化金融与Web3:科技驱动的金融革命

随着区块链技术的发展和普及&#xff0c;去中心化金融&#xff08;DeFi&#xff09;作为其重要应用之一&#xff0c;正在成为金融领域的一场革命。结合Web3技术&#xff0c;去中心化金融正在以前所未有的方式重新定义着金融服务和产品的交付方式&#xff0c;推动着金融领域的创…

bfs之八数码

文章目录 八数码解题思路图解举例算法思路 代码CPP代码Java代码 八数码 在一个 33的网格中&#xff0c;1∼8这 8个数字和一个 x 恰好不重不漏地分布在这 33 的网格中。 例如&#xff1a; 1 2 3 x 4 6 7 5 8在游戏过程中&#xff0c;可以把 x 与其上、下、左、右四个方向之一…

【Oracle】Linux x86-64 安装Oracle 23AI指南

本文为云贝教育 刘峰 原创&#xff0c;请尊重知识产权&#xff0c;转发请注明出处&#xff0c;不接受任何抄袭、演绎和未经注明出处的转载。 前言 在信息技术日新月异的今天&#xff0c;企业级数据库系统扮演着数据管理与业务支撑的核心角色。Oracle数据库&#xff0c;作为全球…

IAP15W4K61S4单片机EEPROM读写程序

/*-------------关闭IAP----------------*/ void IapIdle() { IAP_CONTR 0; //关闭IAP功能 IAP_CMD 0; //清除命令寄存器 IAP_TRIG 0; …

专业软件测试会议

全国软件测试会议&#xff1a;这是一个系列性的专业会议&#xff0c;由中国的学术机构或专业组织主办&#xff0c;例如中国计算机学会的容错计算专业委员会。此会议自2005年起开始举办&#xff0c;历届会议地点包括北京、昆明和武汉等地。会议内容覆盖软件测试理论、实践、工具…

跟TED演讲学英文:4 pillars of college success in science by Freeman Hrabowski

4 pillars of college success in science Link: https://www.ted.com/talks/freeman_hrabowski_4_pillars_of_college_success_in_science Speaker: Freeman Hrabowski Date: February 2013 文章目录 4 pillars of college success in scienceIntroductionVocabularyTranscr…

uniapp打包的程序在Xcode中运行到模拟器报错的解决方法

uniapp打包的程序在Xcode中运行到模拟器报错的解决方法 问题描述&#xff1a; Building for iOS-simulator, but linking in object file (/Users/hori/Documents/SDK/SDK/Libs/DCUniRecord.framework/DCUniRecord[arm64][3](PGRecord.o)) built for iOS Linker command fail…

Day 25 数据库查询

数据库查询 一&#xff1a;基本查询 1.简介 ​ 单表查询 ​ 简单查询 ​ 通过条件查询 ​ 查询排序 ​ 限制查询记录数 ​ 使用集合函数查询 ​ 分组查询 ​ 使用正则表达式查询 2.案例 创建案例所需表&#xff1a;company.employee5 雇员编号 id int雇…

ISIS的工作原理

1.邻居关系建立 &#xff08;1&#xff09;IS-IS领接关系建立原则 1、通过将以太网接口模拟成点到点接口&#xff0c;可以建立点到点链路邻接关系。 2、当链路两端IS-IS接口的地址不在同一网段时&#xff0c;如果配置接口对接收的Hello报文不作IP地址检查&#xff0c;也可以建…

【AI】Tavily

Tavily是一个为人工智能代理&#xff08;如大型语言模型&#xff0c;LLMs&#xff09;和检索增强生成&#xff08;RAG&#xff09;应用优化的搜索引擎。它旨在提供高效、快速和持久的搜索结果。Tavily Search API 允许人工智能开发人员轻松地将他们的应用程序与实时在线信息集成…

深入理解MySQL的Purge机制

在MySQL中&#xff0c;尤其是在使用InnoDB存储引擎时&#xff0c;Purge机制起着至关重要的作用。它主要负责清理那些因为早期的版本或删除操作而不再需要的数据行版本。本文将详细介绍MySQL中的Purge机制&#xff0c;包括其作用、工作原理、如何配置以及优化步骤。 1. Purge机…

网络基础(1)详解

目录 1.计算机网络背景 2.网络协议 3.网络中的地址管理 1.计算机网络背景 1.1 网络发展 (1)计算机从独立模式到网络互联(多态计算机连接共享数据)再到局域网LAN(通过交换机和路由器连接)接着是广域网WAN 1.2 协议 协议就是双方的一种约定. 为什么要有协议? 因为在数据长距…

LeetCode 面试经典150题 252.会议室

题目&#xff1a;给定一个会议时间安排的数组 intervals &#xff0c;每个会议时间都会包括开始和结束的时间 intervals[i] [starti, endi] &#xff0c;请你判断一个人是否能够参加这里面的全部会议。 思路&#xff1a;因为一个人在同一时刻只能参加一个会议&#xff0c;因此…

Unit5

Unit5 1. main&#xff0c;man 停留 maintain maintenance remain remaining remainder permanent 2. place 地方&#xff1b;放置 place placement plaza palace replace replace A with B replacement irreplaceable birthplace workplace marketplace misplace mis…

一起刷C语言菜鸟教程100题(15-26含解析)

五一过的好快&#xff0c;五天假期说没就没&#xff0c;因为一些事情耽搁到现在&#xff0c;不过还是要继续学习的&#xff0c;之后就照常更新&#xff0c;先说一下&#xff0c;这个100题是菜鸟教程里面的&#xff0c;但是有一些题&#xff0c;我加入了自己的理解&#xff0c;甚…

网络1--通信过程的理解

1.封装与解包 通信的过程就是不断的封装和解包的过程 封装即就是按照“应用”“传输” “网络” “链路” 层&#xff0c;封装给每一层都加上相应的包头&#xff08;每一层都有协议&#xff0c;&#xff09;解包就是接受到的包文被一层层去掉相对应的包头。 任何一层的协议都…

JavaScript解决精度问题-math.js-使用入门

JavaScript精度失真案例 0.1+0.2 结果是:0.300000000000000041-0.9 结果是:0.099999999999999984.10*100 结果是:409.999999999999946.10/0.1 结果是:60.99999999999999大数计算 9007199254740992+1 结果是9007199254740992 JavaScript 浮点数运算结果不对,因浮点数的存储…

ResNet神经网络搭建

一、定义残差结构 BasicBlock 18层、34层网络对应的残差结构 浅层网络主线由两个3x3的卷积层链接&#xff0c;相加后通过relu激活函数输出。还有一个shortcut捷径 参数解释 expansion 1 &#xff1a; 判断对应主分支的残差结构有无变化 downsampleNone &#xff1a; 下…

Minio(官方docker版)容器部署时区问题研究记录

文章目录 感慨&概述补充&#xff1a;MINIO_REGION和容器时间的关系 问题一&#xff1a;minio容器和本地容器时间不一致问题说明原因探究解决方法结果验证 问题二&#xff1a;minio修改时间和本地查询结果不一致具体问题原因探究解决办法时间转化工具类调用测试和验证上传文…

Unit6

Unit6 1. val 强壮 valid invalid validate invalidate prevail prevailing prevalent 2. pri 主要的 prime Prime Minister premier primary primary school prior prior to sth prioritize priority principle principal prince princess 3. nov 新 news newspap…