缓存和数据库一致性

前言:

项目的难点是如何保证缓存和数据库的一致性。无论我们是先更新数据库,后更新缓存还是先更新数据库,然后删除缓存,在并发场景之下,仍然会存在数据不一致的情况(也存在删除失败的情况,删除失败可以使用异步重试解决)。有一种解决方法是延迟双删的策略,先删除缓存,再更新数据库,然后休眠一会儿,再删除一次缓存,这样做可以提高提高数据的一致性,但是,延迟的时间是要根据业务需求决定的,需要谨慎设置,同时由于删除了两次缓存,导致性能下降。这个项目中选择的是

先更新数据库,后更新缓存

假设我们采用「先更新数据库,再更新缓存」的方案,并且两步都可以「成功执行」的前提下,如果存在并发,情况会是怎样的呢?

有线程 A 和线程 B 两个线程,需要更新「同一条」数据,会发生这样的场景:

  1. 线程 A 更新数据库(X = 1)

  2. 线程 B 更新数据库(X = 2)

  3. 线程 B 更新缓存(X = 2)

  4. 线程 A 更新缓存(X = 1)

最终 X 的值在缓存中是 1,在数据库中是 2,发生不一致。A 虽然先于 B 发生,但 B 操作数据库和缓存的时间,却要比 A 的时间短,执行时序发生「错乱」,最终这条数据结果是不符合预期的。

先更新数据库,后删除缓存

依旧是 2 个线程并发「读写」数据:

  1. 缓存中 X 不存在(数据库 X = 1)

  2. 线程 A 读取数据库,得到旧值(X = 1)

  3. 线程 B 更新数据库(X = 2)

  4. 线程 B 删除缓存

  5. 线程 A 将旧值写入缓存(X = 1)

最终 X 的值在缓存中是 1(旧值),在数据库中是 2(新值),也发生不一致。

相关代码:

    public R save(UserVO userVO) {User user = new User();BeanUtils.copyProperties(userVO, user);saveUser(user);//删除缓存redisTemplate.delete("userInfo:" + user.getUserName());return R.success("操作成功");}

在查询数据时,先从缓存获取,如果缓存没有,就从数据库查询,并同时存放到缓存上,这样保证了下次访问时数据能直接从缓存获取,减少了数据库压力

    public User getByUserName(String userName) {User user = (User) redisTemplate.opsForValue().get(userName);if (user != null) {return user;}user = this.getOne(Wrappers.<User>lambdaQuery().eq(User::getUserName, userName));redisTemplate.opsForValue().set("userInfo:" + userName, user);return user;}

但是执行redis的删除操作时,比如因为网络问题,或者redis本身服务问题,就会失败,而且多线程并发访问时,也会出现数据不一致的情况。

    //使用注解,直接实现先更新数据库,后删除缓存的操作@CacheEvict(value = "category",allEntries = true)       //删除某个分区下的所有数据

延迟双删:

  1. 首先,删除了 Redis 中的缓存数据,以确保接下来的读取操作会从数据库中读取最新的数据。
  2. 接着,更新数据库中的数据,将数据更新为最新的值。
  3. 在此之后,代码让当前线程休眠一段时间N,这个时间段是为了给数据库操作足够的时间来完成,确保数据已经持久化到数据库中。
  4. 最后,代码再次删除 Redis 中的缓存数据。这里是延迟双删的关键步骤。由于之前已经删除了缓存数据,再次删除的目的是为了防止在休眠的时间内有其他线程读取到旧的数据,加载到缓存中。

休眠时间的控制

  • 延迟时间要大于「主从复制」的延迟时间

  • 延迟时间要大于线程 B 读取数据库 + 写入缓存的时间

方案的选择:

  • 延时双删适用于对数据一致性要求较高的场景。它能够保证在数据库更新期间,读取请求不会读取到已经失效的缓存数据,从而保证数据的一致性。但是它需要进行两次缓存删除操作,可能会增加一定的资源开销;
  • 先更新数据库后删除缓存对性能要求较高的场景。它能够减少一次缓存删除的开销,但是在数据库更新期间,读取请求可能会读取到已经失效的缓存数据,从而导致数据不一致。
     

RedisUtils.del(key);// 先删除缓存    
updateDB(user);// 更新db中的数据    
Thread.sleep(N);// 延迟一段时间,在删除该缓存key    
RedisUtils.del(key);// 先删除缓存

最好的方法是开设一个线程池,在线程中删除key,而不是使用Thread.sleep进行等待,这样会阻塞用户的请求。

代码:

OrderController中新增接口:

/*** 下单接口:先更新数据库,再删缓存* @param sid* @return*/
@RequestMapping("/createOrderWithCacheV2/{sid}")
@ResponseBody
public String createOrderWithCacheV2(@PathVariable int sid) {int count = 0;try {// 完成扣库存下单事务orderService.createPessimisticOrder(sid);// 删除库存缓存stockService.delStockCountCache(sid);} catch (Exception e) {LOGGER.error("购买失败:[{}]", e.getMessage());return "购买失败,库存不足";}LOGGER.info("购买成功,剩余库存为: [{}]", count);return String.format("购买成功,剩余库存为:%d", count);
}

新增线程池接口

// 延时双删线程池
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());/*** 缓存再删除线程*/
private class delCacheByThread implements Runnable {private int sid;public delCacheByThread(int sid) {this.sid = sid;}public void run() {try {LOGGER.info("异步执行缓存再删除,商品id:[{}], 首先休眠:[{}] 毫秒", sid, DELAY_MILLSECONDS);Thread.sleep(DELAY_MILLSECONDS);stockService.delStockCountCache(sid);LOGGER.info("再次删除商品id:[{}] 缓存", sid);} catch (Exception e) {LOGGER.error("delCacheByThread执行出错", e);}}
}

异步重试:

将删除缓存的请求写到消息队列中,如果删除成功,则去除消息;如果删除失败,执行失败策略,重试服务从消息队列中重新读取这些值,然后再次进行删除重试,重试超过的一定次数,向业务层发送报错信息。但是这在一定程度上也会增加代码的耦合度和维护成本。

高内聚,低耦合:耦合指模块与模块之间的关系,依赖程度,尽量减少一个模块过度依赖另一个模块的情况(我们在A元素去调用B元素,当B元素有问题或者不存在的时候,A元素就不能正常的工作,那么就说元素A和元素B耦合)。内聚模块内部的功能职责的相关性,如果元素有高度的相关职责,除了这些职责在没有其他的工作,那么该元素就有高内聚。这样做是为了可读性,复用性,可维护性和易变更性。

代码实现(使用rocketmq):

pom.xml新增rocketmq依赖:

<rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.3</version>
</dependency>
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq-spring-boot-starter-version}</version>
</dependency>

yaml配置

rocketmq:name-server: xxx.xxx.xxx.174:9876;xxx.xxx.xxx.246:9876producer:group: shopDataGroup

创建服务:

@Override
@Transactional
public Result updateShopById(Shop shop) {Long id = shop.getId();if(ObjectUtil.isNull(id)){return Result.fail("====>店铺ID不能为空");}log.info("====》开始更新数据库");//更新数据库updateById(shop);String shopRedisKey = SHOP_CACHE_KEY + id;Message message = new Message(TOPIC_SHOP,"shopRe",shopRedisKey.getBytes());//异步发送MQtry {rocketMQTemplate.getProducer().send(message);} catch (Exception e) {log.info("=========>发送异步消息失败:{}",e.getMessage());}//stringRedisTemplate.delete(SHOP_CACHE_KEY + id);//int i = 1/0;  验证异常流程后,return Result.ok();
}

创建消费者:

package com.hmdp.mq;
/*** @author xbhog* @describe:* @date 2022/12/21*/
@Slf4j
@Component
@RocketMQMessageListener(topic = TOPIC_SHOP,consumerGroup = "shopRe",messageModel = MessageModel.CLUSTERING)
public class RocketMqNessageListener  implements RocketMQListener<MessageExt> {@Resourceprivate StringRedisTemplate stringRedisTemplate;@SneakyThrows@Overridepublic void onMessage(MessageExt message) {log.info("========>异步消费开始");String body = null;body = new String(message.getBody(), "UTF-8");stringRedisTemplate.delete(body);int reconsumeTimes = message.getReconsumeTimes();log.info("======>重试次数{}",reconsumeTimes);if(reconsumeTimes > 3){log.info("消费失败:{}",body);return;}throw new RuntimeException("模拟异常抛出");}}

kafuka的删除重试机制:Kafka 生产者负责将删除缓存的请求发送到指定主题,而 Kafka 消费者则监听该主题,处理删除缓存的逻辑。在处理失败时,通过不提交偏移量来实现消息的重试。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class CacheDeletionConsumer {private static final String TOPIC = "cache-deletion-topic";private static final String GROUP_ID = "cache-deletion-group";private static final String BOOTSTRAP_SERVERS = "your_bootstrap_servers";public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {try {// 处理删除缓存的业务逻辑processCacheDeletion(record);// 如果删除成功,手动提交偏移量consumer.commitSync();} catch (Exception e) {// 处理删除缓存失败,不提交偏移量,消息将在下次拉取时重新获取handleCacheDeletionFailure(record, e);}}}}}private static void processCacheDeletion(ConsumerRecord<String, String> record) {// 实际的删除缓存逻辑String cacheKey = record.value().substring("DELETE_CACHE:".length());System.out.println("Deleting cache for key: " + cacheKey);}private static void handleCacheDeletionFailure(ConsumerRecord<String, String> record, Exception e) {// 处理删除缓存失败的逻辑,可以记录日志、进行重试等System.err.println("Error deleting cache for key: " + record.value() + ". Exception: " + e.getMessage());}
}

参考:两难!先更新数据库再删缓存?还是先删缓存再更新数据库?-CSDN博客

https://www.cnblogs.com/xbhog/p/17004151.html

订阅binlog异步删除,使用canal

流程如下图所示:

(1)更新数据库数据

(2)数据库会将操作信息写入binlog日志当中

(3)canal订阅程序提取出所需要的数据以及key

(4)另起一段非业务代码,获得该信息

(5)尝试删除缓存操作,发现删除失败

(6)将这些信息发送至消息队列

(7)重新从消息队列中获得该数据,重试操作

canal基础知识:

Canal组件是一个基于MysQL数据库增量日志解析,提供增量数据订阅和消费,支持将增量数据投递到下游消费者(如Kafka、RocketMQ等)或者存储(如Elasticsearch、HBase等)的组件。

binlog的格式:

canal工作原理

  1. Canal将自己伪装为MysQL slave(从库),向MysQL master(住库)发送dump协议
  2. MysQLmaster(主库)收到dump请求,开始推送binarylog给slave(即canal).
  3. Canal接收并解析Binlog日志,得到变更的数据,执行后续逻辑

修改配置支持binlog:

修改canal.properties配置:

修改mysql文件的instance.properties配置:

代码:

//1.获取canal连接对象
CanalConnector canalConnector=CanalConnectors.newSingleConnector(new InetSocketAddress(   hostname:"localhost",port:11111),destination:"example",username:"",password:"");while(true){//2.获取连接canalConnector.connect(;//3.指定要监控的数据库canalConnector.subscribe( s:"canal-demo.*");//4.获取MessageMessage message =canalConnector.get(100);List<CanalEntry.Entry>entries =message.getEntries();if(entries.size()<=0){System.out.println("没有数据,休息一会");try {Thread.sleep(millis:1000);}catch(InterruptedException e){e.printStackTrace();}}else{for(CanalEntry.Entry entry:entries){//获取表名String tableName =entry.getHeader().getTableName();//Entry类型CanalEntry.EntryType entryType =entry.getEntryType();//判断entryType 是否为ROWDATAif (CanalEntry.EntryType.ROWDATA.equals(entryType)){    //序列化数据ByteString storeValue =entry.getStoreValue();// 反序列化CanalEntry.RowChange rowChange =CanalEntry.RowChange.parseForm(storeValue);//获取事件类型CanalEntry.EventType eventType =rowChange.getEventType();//获取具体的数据List<CanalEntry.RowData>rowDatasList =rowChange.getRowDatasList();//遍历并打卬for (CanalEntry.RowData rowData : rowDatasList){//获取变更前的列List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();Map<String,0bject>bMap=new HashMap<>();for (CanalEntry.CoLumn coLumn : beforeCoLumnsList)//列名和对应的值Map.put(column.getName(O),column.getValue(0);Map<String,0bject> afMap =new HashMap>();//变更后的List<CanalEntry.ColumnafterColumnsList=rowData.getAfterColumnsList();        

问:Canal是什么?有哪些特性?
答:Canal是阿里巴巴开源的一款基于Netty实现的分布式、高性能、可靠的消息队列,在实时数据同步和数据分发场景下有着广泛的应用。Canal具有以下特性:支持MysQL、Oracle等数据库的日志解析和订阅;支持多种数据输出方式,如Kafka、RocketMQ、ActiveMQ等;支持支持数据过滤和格式转换;拥有低延迟和高可靠性等优秀的性能指标。
问:Canal的工作原理是什么?
答:Canal主要通过解析数据库的binlog日志来获取到数据库的增、删、改操作,然后将这些变更事件发送给下游的消费者。Canal核心组件包括Client和Server两部分,Client负责连接数据库,并启动日志解析工作,将解析出来的数据发送给Server;Server则负责接收Client发送的数据,并进行数据过滤和分发。Canal还支持多种数据输出器,如Kafka、RocketMQ、ActiveMQ等,可以将解析出来的数据发送到不同的消息队列中,以便进行进一步的处理和分析。
问:Canal的优缺点是什么?
答:Canal的优点主要包括:高性能、分布式、可靠性好、支持数据过滤和转换、跨数据库类型(如MysQL、Oracle等)等。缺点包括:使用难度较大、对数据库的日志产生一定的影响、不支持数据的回溯(即无法获取历史数据)等。
问:Canal在业务中有哪些应用场景?
答:Canal主要用于实时数据同步和数据分发场景,常见的应用场景包括:数据备份与灾备、增量数据抽取和同步、数据实时分析、在线数据迁移等。特别是在互联网大数据场景下,Canal已经成为了各种数据处理任务的重要工具之一。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/620310.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何高效进行项目文档的编制及管理?

“做完一个项目到底会产出多少份文档&#xff1f;” 今天看到这样一个吐槽贴&#xff1a;小李作为刚入行的项目经理&#xff0c;每天上班期间电话、会议、邮件各种不停歇&#xff0c;晚上还要加班做各种文档&#xff1b;由于经验不足&#xff0c;熬到十一二点还做不完是常态。…

Vue学习笔记五--路由

1、什么是路由 2、VueRouter 2、1VueRouter介绍 2、2使用步骤 2、3路由封装 3、router-link 3.1两个类名 3.2声明式导航传参 4、路由重定向、404 当找不到路由时&#xff0c;跳转配置到404页面 5、路由模式 6、通过代码跳转路由---编程式导航&传参 路由跳转时传参 跳转方式…

Java 并发之《深入理解 JVM》关于 volatile 累加示例的思考

在周志明老师的 《深入理解 JVM》一书中关于 volatile 关键字线程安全性有一个示例代码&#xff08;代码有些许改动&#xff0c;语义一样&#xff09;&#xff1a; public class MyTest3 {private static volatile int race 0;private static void increase() {race;}public …

亚马逊时尚如何运用人工智能帮助您找到合适的尺码

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

Docker五部曲之一:容器术语介绍

文章目录 前言背景基本术语容器镜像容器镜像格式容器引擎容器容器主机注册中心容器编排 进阶术语容器运行时镜像层标签存储库名称空间 参考 前言 本文内容翻译自参考文献。 背景 要理解容器术语&#xff0c;重要的是要精确地理解容器是什么。容器实际上是两个不同的东西。像…

Linux驱动学习—I2C总线

1、应用层实现I2C通信 1.1 I2C简介 I2C是很常见的一种总线协议&#xff0c;I2C是NXP公司设计的&#xff0c;I2C使用两条线在主控制器和从机之间进行数据通信。一条是SCL&#xff08;串行时钟线&#xff09;&#xff0c;另外一条是SDA(串行数据线)&#xff0c;因为I2C这两条数…

Windows Server 2019配置多用户远程桌面登录服务器

正文共&#xff1a;1234 字 21 图&#xff0c;预估阅读时间&#xff1a;2 分钟 很久之前&#xff0c;在介绍显卡直通的时候我们简单介绍过RDP&#xff08;Remote Desktop Protocol&#xff0c;远程桌面协议&#xff09;&#xff08;前人栽树&#xff1a;失败的服务器显卡操作&a…

第 3 场 蓝桥杯小白入门赛 解题报告 | 珂学家 | 单调队列优化的DP + 三指针滑窗

前言 整体评价 T5, T6有点意思&#xff0c;这场小白入门场&#xff0c;好像没真正意义上的签到&#xff0c;整体感觉是这样。 A. 召唤神坤 思路: 前后缀拆解 #include <iostream> #include <algorithm> #include <vector> using namespace std;int main()…

Android平台RTMP推送|轻量级RTSP服务|GB28181设备接入模块之实时快照保存JPG还是PNG?

JPG还是PNG&#xff1f; JPG和PNG是两种常见的图片文件格式&#xff0c;在压缩方式、图像质量、透明效果和可编辑性等方面存在显著差异。 压缩方式&#xff1a;JPG是一种有损压缩格式&#xff0c;通过丢弃图像数据来减小文件大小&#xff0c;因此可能会损失一些图像细节和质量…

【AIGC】IP-Adapter:文本兼容图像提示适配器,用于文本到图像扩散模型

前言 IPAdapter能够通过图像给Stable Diffusion模型以内容提示&#xff0c;让其生成参考该图像画风&#xff0c;可以免去Lora的训练&#xff0c;达到参考画风人物的生成效果。 摘要 通过文本提示词生成的图像&#xff0c;往往需要设置复杂的提示词&#xff0c;通常设计提示词变…

系列十一、Spring Security登录接口兼容JSON格式登录

一、Spring Security登录接口兼容JSON格式登录 1.1、概述 前后端分离中&#xff0c;前端和后端的数据交互通常是JSON格式&#xff0c;而Spring Security的登录接口默认支持的是form-data或者x-www-form-urlencoded的&#xff0c;如下所示&#xff1a; 那么如何让Spring Securi…

Open3D 反算点云缩放系数(21)

Open3D 反算点云缩放系数(21) 一、算法介绍二、算法实现1.方法12.方法2(通用)一、算法介绍 上一章按照指定的系数,对点云进行了等比例缩放,这里输入缩放后的两块点云,反算二者之间的缩放系数。 二、算法实现 已知使用的俩点云是1/2的缩放关系,用于验证计算结果是否…

【数据结构】串,数组,广义表 | 笔记整理 | C/C++实现

文章目录 前言一、串1.1、串的定义1.2、案例引入1.3、串的类型定义和存储结构1.4、串的模式匹配算法1.4.1、BF算法1.4.2、KMP算法 二、数组2.1、数组的定义2.2、数组的抽象数据类型定义2.3、数组的顺序存储2.4、特殊矩阵的压缩存储 三、广义表四、病毒案例 前言 参考视频&…

【C++】wxWidgets库实现窗体程序

一、安装wxWidgets库 在Debian系统上使用wxWidgets库来创建一个基本的窗体程序&#xff0c;首先需要确保已经安装了wxWidgets相关的库和开发工具。下面是安装wxWidgets的步骤&#xff1a; 打开终端&#xff0c;使用下述命令安装wxWidgets库及其开发文件&#xff1a; sudo ap…

MySQL之导入、导出远程备份

一、Navicat工具导入、导出 1.1 导入 第一步&#xff1a; 右键&#xff0c;点击运行SQL文件 第二步&#xff1a; 选择要运行的SQL&#xff0c;点击开始 第三步&#xff1a; 关闭即可 1.2 导出 第一步&#xff1a; 右键选择&#xff0c;导出向导 第二步&#xff1a; 选择SQL脚…

1.3MATLAB变量及其操作

变量 变量是内存单元的一个抽象&#xff0c;在MATLAB中&#xff0c;变量以字母开头&#xff0c;后接数字下划线构成&#xff0c;MATLAB中变量名最多占据 63 个字符。变量区分大小写标准函数及命令一般使用小写字母 赋值语句 变量 表达式(;)表达式(;)总结&#xff1a;加分号&…

C++ 实现游戏(例如MC)键位显示

效果&#xff1a; 是不是有那味儿了&#xff1f; 显示AWSD&#xff0c;空格&#xff0c;Shift和左右键的按键情况以及左右键的CPS。 彩虹色轮廓&#xff0c;黑白填充。具有任务栏图标&#xff0c;可以随时关闭字体是Minecraft AE Pixel&#xff0c;如果你没有装&#xff08;大…

使用numpy处理图片——灰阶影像

大纲 载入图像灰阶处理lightnessaverageluminosity 灰阶&#xff08;Gray scale&#xff09;影像是每个像素只有一个采样颜色的图像。 载入图像 import numpy as np import PIL.Image as Imageimg Image.open(lena.png) data np.array(img)灰阶处理 我们有三种方法来生成这…

Linux中常使用的命令之ls、cd、pwd、mkdir、rmdir

ls: 列出目录 cd&#xff1a;切换目录 pwd&#xff1a;显示目前的目录 mkdir&#xff1a;创建一个新的目录 -m &#xff1a;配置文件的权限-p &#xff1a;帮助你直接将所需要的目录(包含上一级目录)递归创建起来&#xff01; rmdir&#xff1a;删除一个空的目录 注意这…

基于springboot时间管理系统源码和论文

在Internet高速发展的今天&#xff0c;我们生活的各个领域都涉及到计算机的应用&#xff0c;其中包括时间管理系统的网络应用&#xff0c;在外国时间管理系统已经是很普遍的方式&#xff0c;不过国内的管理系统可能还处于起步阶段。时间管理系统具有时间管理功能的选择。时间管…