文章目录
- 1. 经典面试题
- 2. 双写一致性
- 3. 更新策略
- 4. canal简介
- 5. Redis与Mysql数据双写一致性工程落地案例
1. 经典面试题
- 上面的业务逻辑你用java代码如何实现?
- 你只要用缓存,就可能会涉及到redis缓存与数据库双存储双写,你只要是双写,就一定会有数据一致性问题,那么你如何解决数据一致性问题?
- 双写一致性,你先动redis缓存还是数据库mysql中的哪一个?为什么?
- 延时双删你做过吗?会有哪些问题?
- 有这个一种情况,微服务查询redis无mysql有,为保证数据双写一致性回写redis你需要注意什么?双检加锁策略你了解过吗?如何尽量避免缓存击穿问题?
- redis和mysql双写100%会出纰漏,做不到强一致性,你如何保证最终一致性?
2. 双写一致性
首先,如果redis中有数据,就需要和数据库中的值相同,如果redis中没有数据,数据库中的值要是最新值,且准备回写redis。redis按照操作来分,分为两种,即只读缓存,实际环境中只能读(实际业务中也是读的操作居多),另一种是读写缓存,redis此时不仅可以读数据,还可以写数据(充当redis的门神),而这种缓存,redis与mysql数据同步有两种策略:
- 同步直写策略
写数据库后也同步写redis,缓存和数据库中的数据一致,对于读写缓存来说,要想保证缓存和数据库中的数据一致,就要采用同步直写策略。
- 异步缓写策略
正常业务当中,mysql数据变动了,但是可以在业务上容许出现一定时间后才作用于redis,比如仓库、物流系统等。如果出现异常情况,不得不将失败的动作重新修补,有可能需要借助kafka或者RabbitMQ等消息中间件,实现重试重写
有这个一种情况,微服务查询redis无mysql有,为保证数据双写一致性回写redis你需要注意什么?双检加锁策略你了解过吗?如何尽量避免缓存击穿问题?
考虑一个场景,在QPS很高的一个项目中,有几百万个线程要访问redis,发现redis中没有这几个数据,那么这些线程又要去访问Mysql,Mysql此时在此海量的访问下可能会奔溃,此时,mysql查询完数据后要回写redis,可能会出现redis数据覆盖的问题(例如一个线程A已经回写了a这条数据到redis中,在回写的过程中另一个线程B正在读mysql,B也是读的a这条数据,然后回写又覆盖了A回写到redis的数据)。总结下来,上面的场景揭露了两个核心问题:
- Redis没有数据,海量访问可能会访问mysql,导致mysql奔溃
- 海量线程回写数据到redis,可能出现同一条数据多次回写,出现数据覆盖的问题
出现上面问题的本质原因就是访问mysql数据库和回写数据的redis,这两个操作并不是原子的。所以这时候就出现了双检加锁策略:
多个线程同时区去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上使用一个互斥锁锁住它。其他线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存,后面的线程进来发现已经有缓存了,就直接走缓存。
public String getData(String key) {// 先尝试从 Redis 中获取数据String data = redisClient.get(key);if (data == null) {// 如果 Redis 中没有数据,则加锁并从 MySQL 中获取数据synchronized (lock) {// 再次检查 Redis 中是否已经存在数据,因为在获取锁之前可能其他线程已经更新了缓存data = redisClient.get(key);if (data == null) {try {// 从 MySQL 中获取数据PreparedStatement statement = mysqlConn.prepareStatement("SELECT data FROM table WHERE key = ?");statement.setString(1, key);ResultSet resultSet = statement.executeQuery();if (resultSet.next()) {data = resultSet.getString("data");// 将数据存入 Redis,并设置过期时间redisClient.setex(key, 3600, data);}statement.close();} catch (SQLException e) {e.printStackTrace();}}}}return data;}
在加锁前进行一次检查的目的是避免不必要的加锁操作。如果在加锁之前就发现缓存中已经存在数据,那么就不需要加锁,直接返回缓存中的数据,从而减少了对锁资源的竞争,提高了并发性能。加锁后进行一次检查是为了处理竞态条件。即使在加锁前检查时发现缓存中没有数据,但在获取锁之前可能其他线程已经更新了缓存。因此,在获取锁之后,需要再次检查缓存中是否已经存在数据,以避免重复向缓存中写入数据或者以过期数据为准备数据返回。
3. 更新策略
- 你只要用缓存,就可能会涉及到redis缓存与数据库双存储双写,你只要是双写,就一定会有数据一致性问题,那么你如何解决数据一致性问题?
- 双写一致性,你先动redis缓存还是数据库mysql中的哪一个?为什么?
上面两个的问题的最终目的都是达到最终一致性。给缓存设置过期时间,定义清理缓存并回写,是保证最终一致性的解决方案。我们可以对存入缓存的数据设置过期时间,所有的写操作都以数据库为准,对缓存操作只是尽最大努力即可。也就是说,如果数据库写成功,缓存更新失败,那么只要达过期时间,则后面的请求自然会从数据库中读取到最新值然后回填到缓存,达到一致性,切记,要以Mysql数据库写入库为准
。
如何解决数据一致性问题,如果机器可以停机,那么就可以直接停机维护了,但一般公司的业务是不能随便停止的,所以这里我们主要探讨4种更新策略:(这一部分十分重要)
- 先更新数据库,再更新缓存
- 先更新缓存,再更新数据库
- 先删除缓存,再删除数据库
- 先更新数据库,再删除缓存
- 方案一:先更新数据库后更新缓存
这种方案存在两个问题:
问题一
例如先更新mysql的某商品的库存,当前商品的库存是100,更新为99个。先更新mysql修改为99,然后更新redis。此时假设异常出现(例如redis挂了),更新redis失败了,这导致mysql里面的库存是99,而redis里面还是100。上面问题发生,会导致数据库里面和缓存里面的数据不一致,客户端会读到redis的脏数据。
问题二
A、B两个线程发起调用:
A update mysql 100
A update redis 100
B update mysql 80
B update redis 80
但在多线程环境下,上面代码的执行顺序不是固定的,可能出现很多情况,例如下面情况:
A update redis 100
A update mysql 100
B update redis 80
B update mysql 80
现在就出现了mysql数据为100,而redis的数据为80的情况,出现了数据不一致问题,说到底这个问题就是高并发导致的问题。
- 方案二:先更新缓存再更新数据库
这种方案是不太推荐的,因为业务上通常把Mysql作为底单数据库
,保证最后解释。考虑下面这种情况
A update redis 100
A update mysql 100
B update redis 80
B update mysql 80
同样是多线程缓存可能出现下面情况:
A update redis 100
B update redis 80
B update mysql 80
A update mysql 100
上面现象同样导致了缓存不一致的情况
针对上面这种情况,有一些解决方案可以考虑:
使用乐观锁或版本控制:在更新数据时,使用乐观锁或版本控制来确保更新的原子性。例如,在更新数据库时,可以使用版本号字段或者时间戳来判断数据是否被其他线程修改过,从而避免覆盖其他线程的更新。
引入分布式锁:在更新缓存和数据库时,可以引入分布式锁来保证操作的原子性。这样可以确保在更新缓存和数据库时只有一个线程可以执行,从而避免并发更新导致的问题。
使用消息队列:将更新缓存和更新数据库的操作放入消息队列中按顺序执行,这样可以保证更新的顺序性。例如,先将更新缓存的消息发送到消息队列中,等待更新完成后再将更新数据库的消息发送到消息队列中,这样可以保证先更新缓存再更新数据库。
细化业务逻辑:根据具体业务情况,设计更加细化的业务逻辑来处理缓存和数据库的更新顺序。例如,可以将缓存和数据库的更新操作放在同一个事务中执行,以保证操作的原子性和一致性。
- 方案三:先删除缓存再更新数据库
这种方案同样有异常问题,例如:A线程先删除了redis里面的数据,然后去更新mysql,此时Mysql正在更新中,还没有结束(比如出现了网络延迟),此时B突然出现要来读取缓存。
此时redis里面的数据是空的,B线程来读,先去读redis里面的数据(但数据被A给删除了),此时就有两个问题了:
- B去读Mysql但是获取了旧值(A没更新完)
- B获取旧值后写回redis(刚被A删除的数据又写回redis了)
此时A更新完Mysql了,发现redis里面的缓存是脏数据,给A线程干的CPU直接烧了。于是缓存中的数据还是老数据,导致缓存中的数据是脏的,而且还会一直脏下去。
延时双删你做过吗?会有哪些问题?
在注释暂停两秒的地方,加上Thread.sleep(2)
,就是为了让b线程先从数据看中读取数据,再把缺失的数据写入缓存,然后线程A再进行删除,所以,线程A sleep的时间,就需要大雨线程B读取数据的时间,这样一来,其它线程读取数据时,会发现缓存缺失,所以会从数据看中读取最新的值,因为这个方案会在第一次删除缓存值后,延迟一段时间再次进行删除,所以我们也把它叫做延迟双删。
针对延迟双删也会出现一些问题:
这个删除该休眠多久?
确定这个时间有两种方法,第一种是在业务程序运行的时候,统计程序读数据和写缓存的操作时间,自行评估自己的项目的读数据业务逻辑的耗时。以此为基础进行估算,然后写数据的休眠时间则在读数据业务逻辑的耗时基础上加上100ms。这么做的目的就是为了确保请求结束,写请求可以删除读请求造成的缓存脏数据。另一种方案就是新启动一个后台监控程序,比如要讲解的WatchDog。
这种同步淘汰策略,吞吐量降低怎么办?
由于A线程会睡眠阻塞后面的业务,这确实会导致业务吞吐量降低。此时可以考虑第二次删的时候,再开一个线程来异步删除。
- 方案四:先更新数据库再删除缓存
这种方案相比上面三种方案是最好的,但它也存在它的问题。
上面缓存删除失败或者来不及删除,导致请求再次访问redis缓存命中的时候,读取到的缓存是旧的值。
redis和mysql双写100%会出纰漏,做不到强一致性,你如何保证最终一致性?
- 我们可以把要删除的缓存值或者是要更新的值暂存在消息队列中(Kafka/RabbitMQ)
- 当程序没有能够成功地删除缓存值或者是更新数据库值时,可以从消息队列中重写读取这些值面,然后再次进行删除或者更新
- 如果能够成功删除或更新,我们就要把这些值从消息队列中去除,以免重复操作,此时,我们也可以保证数据库和缓存的数据一致了,否则还需要重试
- 如果重试超过一定次数后还没有成功,我们就需要向业务层发送报错信息了,通知运维人员
业务指导思想:
微软云
阿里巴巴的canal
- 总结
4. canal简介
如何知道Mysql数据发生了变动?
Mysql在更新数据的时候所有的更新操作都会记录到binlog中,如果我们拿到了这个binlog我们相应就知道了Mysql数据库的变动情况。所以我们需要一个技术,能监听到mysql变动,且能通知到redis,这个中间件就是canal。
Canal是什么?
Canal是阿里巴巴的Mysql Binlog增量订阅和消费组件,主要用途是用于Mysql数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用java语言开发。Canal模拟MySQL的slave角色,向MySQL请求binlog,然后解析binlog成为DBChange对象。
Canal能干什么?
- 数据库景象
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务cache刷新
- 带业务逻辑的增量数据处理
Canal工作原理?
首先了解一个传统的Mysql主从复制的工作原理:
Mysql主从复制将经历下面这些过程:
- 当master主服务器上的数据发生改变时,则将其改变写入到二进制日志文件当中
- slave从服务器会在一定时间间隔内对master服务器上的二进制文件进行探测,探测其是否发生变化(offset偏移量),如果探测到master主服务器的二进制事件日志发生了改变,则开始一个I/O Thread请求master二进制事件日志
- 同时master主服务器为每个I/O thread启动一个dump Thread,用于向其发送二进制事件文件
参考Mysql主从复制的原理,canal就出现了。
canal模拟Mysql slave的交互协议,伪装自己为mysql slave,向mysql master 发送dump协议,Mysql master受到dump请求,开始推送binary log给slave(即canal),canal解析binary log对象(原始流为byte)。
5. Redis与Mysql数据双写一致性工程落地案例
- Mysql配置
查看Mysql版本
SELECT VERSION();
查看当前主机的binlog
SHOW MASTER STATUS;
查看当前binlog是否对外开放
SHOW VARIABLES LIKE 'log_bin'
默认时关闭的,这里我打开了
开启binlog的写入功能
打开Mysql的my.ini配置文件,进行以下配置
log-bin=mysql-bin #开启binlog
binlog-format=ROW #选择row模式
server_id=1 #配置Mysql replaction需要定义,不要和canal的slaveid相同
Row模式除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行记录的变化历史,但会占用更多空间
STATEMENT模式只记录sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会丢失数据
MIX模式比较灵活的记录,理论上说当遇到了表结构发生变更的情况,就会时statement模式,当遇到数据更新或者删除的情况就会变味row模式
授权canal连接mysql
DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
FLUSH PRIVILEGES;
- canal配置
下载地址
解压
解压后放在指定的一个目录就行
配置
修改conf/example
路径下的instance.properties文件
- 换成自己的mysql主机的master地址
- 换成自己的在mysql新建立的canal账户
启动canal
./startup.sh
查看是否启动成功
查看example.log和canal.log即可
- java程序开发
引入jar包
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.3</version></dependency>
添加配置
spring:datasource:master:url: jdbc:mysql://localhost:3306/atguigudb?characterEncoding=utf-8&useSSL=falsedriver-class-name: com.mysql.cj.jdbc.Driverusername: rootpassword: 123456
业务类
public class RedisUtils {public static final String REDIS_IP_ADDR="127.0.0.1";public static JedisPool jedispool;static {JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(20);jedispool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000);}public static Jedis getJedis() throws Exception {if(null!=jedispool){return jedispool.getResource();}throw new Exception("Jedis poll is not ok");}
}
package com.jack.mybatis_plus.biz;import java.net.InetSocketAddress;
import java.util.List;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.jack.mybatis_plus.utils.RedisUtils;
import redis.clients.jedis.Jedis;import java.util.List;
import java.util.UUID;public class SimpleCanalClientExample {public static final Integer _60SECONDS=60;public static final String REDIS_IP_ADDR="127.0.0.1";public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();//connector.subscribe(".*\\..*"); 订阅全部库全部表connector.subscribe("atguigudb.jobs"); //只订阅atguigudb数据库的jobs表connector.rollback();//这几个就指定了监听程序·10分钟结束int totalEmptyCount = 10*_60SECONDS;while (emptyCount < totalEmptyCount) {System.out.println("我是canal,每一秒一次正在监听:"+ UUID.randomUUID().toString());Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据,一次处理1000条long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//表示当前数据库没有变动emptyCount++;try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {//计数器置0emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启!");} finally {connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {//获取已经变更的数据rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {redisDelete(rowData.getAfterColumnsList());} else if (eventType == EventType.INSERT) {redisInsert(rowData.getAfterColumnsList());} else {redisUpdate(rowData.getAfterColumnsList());}}}}private static void redisUpdate(List<Column> columns) {JSONObject jsonObject=new JSONObject();for (Column column : columns) {System.out.println(column.getName()+" : "+column.getValue()+" update="+column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size()>0){try(Jedis jedis= RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());System.out.println("--------update after: "+jedis.get(columns.get(0).getValue()));}catch (Exception e){e.printStackTrace();}}}private static void redisDelete(List<Column> columns) {JSONObject jsonObject=new JSONObject();for (Column column : columns) {jsonObject.put(column.getName(),column.getValue());}if(columns.size()>0){try(Jedis jedis= RedisUtils.getJedis()){jedis.del(columns.get(0).getValue());}catch (Exception e){e.printStackTrace();}}}private static void redisInsert(List<Column> columns) {JSONObject jsonObject=new JSONObject();for (Column column : columns) {System.out.println(column.getName()+" : "+column.getValue()+" insert="+column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size()>0){try(Jedis jedis= RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());}catch (Exception e){e.printStackTrace();}}}
}
测试
修改数据
被正确同步到redis