缓存预热
定义
缓存预热是一种优化方案,它可以提高用户的使用体验。
缓存预热是指在系统启动的时候,先把查询结果预存到缓存中,以便用户后面查询时可以直接从缓存中读取,节省用户等待时间
实现思路
- 把需要缓存的方法写在初始化方法中,让程序启动时自动加载数据并缓存数据。
- 把需要缓存的方法挂在某个页面或是后端接口上,手动触发缓存预热。
- 设置定时任务,定时进行缓存预热。
解决方案
使用 @PostConstruct 初始化白名单数据
缓存雪崩(大量数据同时失效/Redis 崩了,没有数据了)
定义
缓存雪崩是指在短时间内大量缓存同时过期,导致大量请求直接查询数据库, 从而对数据库造成巨大压力,严重情况下可能会导致数据库宕机
解决方案
- 加锁排队:起到缓冲作用,防止大量请求同时操作数据库,但缺点是增加了系统的响应时间,降低了系统的吞吐量,牺牲一部分用户体验。
- 设置二级缓存:二级缓存是除了 Redis 本身的缓存,再设置一层缓存,当 Redis 失效后,就先去查询二级缓存
- 随机化过期时间:为了避免缓存同时过期,可以设置缓存时添加随机时间,这样就可以极大的避免大量缓存同时失效
- redis 缓存集群实现高可用
- 主从 + 哨兵
- Redis 集群
- 开启Redis 持久化机制 aof / rdb,尽快恢复缓存集群
- 服务降级
- Hystrix 或者 sentinel 限流 & 降级
// 缓存原本的失效时间
int exTime = 10 * 60;
// 随机数⽣成类
Random random = new Random();
// 缓存设置
jedis.setex(cacheKey, exTime+random.nextInt(1000) , value);
缓存穿透(黑客攻击/空数据/穿过 Redis 和数据库)
定义
- 缓存穿透是指查询数据库和缓存都无数据,因此每次请求都会去查询数据库
解决方案
- **缓存空结果:**对查询的空结果也进行缓存,如果是集合,可以缓存一个空的的集合,如果是缓存单个对象,可以字段标识来区分,避免请求穿透到数据库。
- **布隆过滤器处理:**将所有可能对应的数据为空的 key 进行统一的存放,并在请求前做拦截,避免请求穿透到数据库(这样的方式实现起来相对麻烦,比较适合命中不高,但是更新不频繁的数据)。
- 双锁锁策略机制
package com.redis.redis01.service;import com.redis.redis01.bean.RedisBs;
import com.redis.redis01.mapper.RedisBsMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.beans.Transient;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;@Slf4j
@Service
public class RedisBsService {//定义key前缀/命名空间public static final String CACHE_KEY_USER = "user:";@Autowiredprivate RedisBsMapper mapper;@Resourceprivate RedisTemplate<String, Object> redisTemplate;private static ReentrantLock lock = new ReentrantLock();/*** 业务逻辑没有写错,对于中小长(qps<=1000)可以使用,但是大厂不行:大长需要采用双检加锁策略** @param id* @return*/@Transactionalpublic RedisBs findUserById(Integer id,int type,int qps) {//qps<=1000if(qps<=1000){return qpsSmall1000(id);}//qps>1000return qpsBig1000(id, type);}/*** 加强补充,避免突然key失效了,或者不存在的key穿透redis打爆mysql,做一下预防,尽量不出现缓存击穿的情况,进行排队等候* @param id* @param type 0使用synchronized重锁,1ReentrantLock轻量锁* @return*/private RedisBs qpsBig1000(Integer id, int type) {RedisBs redisBs = null;String key = CACHE_KEY_USER + id;//1先从redis里面查询,如果有直接返回,没有再去查mysqlredisBs = (RedisBs) redisTemplate.opsForValue().get(key);if (null == redisBs) {switch (type) {case 0://加锁,假设请求量很大,缓存过期,大厂用,对于高qps的优化,进行加锁保证一个请求操作,让外面的redis等待一下,避免击穿mysqlsynchronized (RedisBsService.class) {//第二次查询缓存目的防止加锁之前刚好被其他线程缓存了redisBs = (RedisBs) redisTemplate.opsForValue().get(key);if (null != redisBs) {//查询到数据直接返回return redisBs;} else {//数据缓存//查询mysql,回写到redis中redisBs = mapper.findUserById(id);if (null == redisBs) {// 3 redis+mysql都没有数据,防止多次穿透(redis为防弹衣,mysql为人,穿透直接伤人,就是直接访问mysql),优化:记录这个null值的key,列入黑名单或者记录或者异常return new RedisBs(-1, "当前值已经列入黑名单");}//4 mysql有,回写保证数据一致性//setifabsentredisTemplate.opsForValue().setIfAbsent(key, redisBs,7l, TimeUnit.DAYS);}}break;case 1://加锁,大厂用,对于高qps的优化,进行加锁保证一个请求操作,让外面的redis等待一下,避免击穿mysqllock.lock();try {//第二次查询缓存目的防止加锁之前刚好被其他线程缓存了redisBs = (RedisBs) redisTemplate.opsForValue().get(key);if (null != redisBs) {//查询到数据直接返回return redisBs;} else {//数据缓存//查询mysql,回写到redis中redisBs = mapper.findUserById(id);if (null == redisBs) {// 3 redis+mysql都没有数据,防止多次穿透(redis为防弹衣,mysql为人,穿透直接伤人,就是直接访问mysql),优化:记录这个null值的key,列入黑名单或者记录或者异常return new RedisBs(-1, "当前值已经列入黑名单");}//4 mysql有,回写保证数据一致性redisTemplate.opsForValue().set(key, redisBs);}} catch (Exception e) {e.printStackTrace();} finally {//解锁lock.unlock();}}}return redisBs;}private RedisBs qpsSmall1000(Integer id) {RedisBs redisBs = null;String key = CACHE_KEY_USER + id;//1先从redis里面查询,如果有直接返回,没有再去查mysqlredisBs = (RedisBs) redisTemplate.opsForValue().get(key);if (null == redisBs) {//2查询mysql,回写到redis中redisBs = mapper.findUserById(id);if (null == redisBs) {// 3 redis+mysql都没有数据,防止多次穿透(redis为防弹衣,mysql为人,穿透直接伤人,就是直接访问mysql),优化:记录这个null值的key,列入黑名单或者记录或者异常return new RedisBs(-1, "当前值已经列入黑名单");}//4 mysql有,回写保证数据一致性redisTemplate.opsForValue().set(key, redisBs);}return redisBs;}}
package com.redis.redis01.service;import com.google.common.collect.Lists;
import com.redis.redis01.bean.RedisBs;
import com.redis.redis01.mapper.RedisBsMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;@Slf4j
@Service
public class BitmapService {@Resourceprivate RedisTemplate<String, Object> redisTemplate;private static ReentrantLock lock = new ReentrantLock();@Autowiredprivate RedisBsMapper redisBsMapper;/*** 场景一:布隆过滤器解决缓存穿透问题(null/黑客攻击);利用redis+bitmap实现* 有可能有,没有一定没有* 无-------------》mysql查询* 有--------》redis查询----------》有-----------》返回* 请求-----》布隆过滤器-----------》* 无-------终止** @param type:0初始化,1常规查询*/public void booleanFilterBitmap(int type, Integer id) {switch (type) {case 0://初始化数据for (int i = 0; i < 10; i++) {RedisBs initBs = RedisBs.builder().id(i).name("赵三" + i).phone("1580080569" + i).build();//1 插入数据库redisBsMapper.insert(initBs);//2 插入redisredisTemplate.opsForValue().set("customer:info" + i, initBs);}//3 将用户id插入布隆过滤器中,作为白名单for (int i = 0; i < 10; i++) {String booleanKey = "customer:booleanFilter:" + i;//3.1 计算hashvalueint abs = Math.abs(booleanKey.hashCode());//3.2 通过abs和2的32次方取余,获得布隆过滤器/bitmap对应的下标坑位/indexlong index = (long) (abs % Math.pow(2, 32));log.info("坑位:{}", index);//3.3 设置redis里面的bitmap对应类型的白名单redisTemplate.opsForValue().setBit("whiteListCustomer", index, true);}break;case 1://常规查询//1 获取当前传过来的id对应的哈希值String inputBooleanKey = "customer:booleanFilter:" + id;int abs = Math.abs(inputBooleanKey.hashCode());long index = (long) (abs % Math.pow(2, 32));Boolean whiteListCustomer = redisTemplate.opsForValue().getBit("whiteListCustomer", index);//加入双检锁//加锁,大厂用,对于高qps的优化,进行加锁保证一个请求操作,让外面的redis等待一下,避免击穿mysqllock.lock();try {if (null == whiteListCustomer) {whiteListCustomer = redisTemplate.opsForValue().getBit("whiteListCustomer", index);if (null != whiteListCustomer && whiteListCustomer) {//布隆过滤器中存在,则可能存在//2 查找redisObject queryCustomer = redisTemplate.opsForValue().get("customer:info" + id);if (null != queryCustomer) {log.info("返回客户信息:{}", queryCustomer);break;} else {//3 redis没有查找mysqlRedisBs userById = redisBsMapper.findUserById(id);if (null != userById) {log.info("返回客户信息:{}", queryCustomer);redisTemplate.opsForValue().set("customer:info" + id, userById);break;} else {log.info("当前客户信息不存在:{}", id);break;}}} else {//redis没有,去mysql中查询//3 redis没有查找mysqlRedisBs userById = redisBsMapper.findUserById(id);if (null != userById) {log.info("返回客户信息:{}", userById);redisTemplate.opsForValue().set("customer:info" + id, userById);break;} else {log.info("当前客户信息不存在:{}", id);break;}}}} finally {lock.unlock();}log.info("当前客户信息不存在:{}", id);break;default:break;}}
}
缓存击穿(热点数据/刚失效/定点打击)
定义
缓存击穿是指某个经常使用的缓存,在某一个时刻恰好失效了(例如缓存过期),并且此时刚好有大量的并发请求,这些请求就会给数据库造成巨大的压力
解决方案
- **加锁排队:**和处理缓存雪崩的加锁类似,都是在查询数据库的时候加锁排队,缓存操作请求以此来减少服务器的运行压力。
- **设置永不过时:**对于某些经常使用的缓存,我们可以设置为永不过期,这样就能保证缓存的稳定性,但要注意在数据更改后,要及时更新此热点缓存,否则就会造成查询结果误差。
总结
脑裂
分布式session
分布式锁
- 分布式锁需要的条件和刚需
- 独占性
- 任何时刻有且只有一个线程持有这个锁
- 高可用
- 若redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况
- 高并发请求下,依旧性能很好
- 防死锁
- 不能出现死锁问题,必须有超时重试机制或者撤销操作,有个终止跳出的途径
- 不乱抢
- 防止张冠李戴,只能解锁自己的锁,不能把别人的锁给释放了
- 重入性
- 同一节点的同一线程如果获得锁之后,他可以再次获取这个锁
- 独占性
v 8.0 其实面对不是特别高的并发场景足够用了,单机redis也够用了
- 要兼顾锁的重入性
- setnx不满足了,需要hash结构的hset
- 上锁和解锁都用 Lua 脚本来实现原子性
- 引入工厂模式 DistributedLockFactory, 实现 Lock 接口,实现redis的可重入锁
- lock() 加锁的关键逻辑
- 加锁 实际上就是在redis中,给Key键设置一个值,为避免死锁,并给定一个过期时间
- 自旋
- 续期
- unlock() 解锁关键逻辑
- 将 Key 键删除,但是也不能乱删,只能自己删自己的锁
- lock() 加锁的关键逻辑
- 实现自动续期功能的完善,后台自定义扫描程序,如果规定时间内没有完成业务逻辑,会调用加钟自动续期的脚本
@Autowiredprivate DistributedLockFactory distributedLockFactory;/*** v8.0 实现自动续期功能的完善,后台自定义扫描程序,如果规定时间内没有完成业务逻辑,会调用加钟自动续期的脚本** @return*/public String sale() {String retMessage = "";Lock redisLock = distributedLockFactory.getDistributedLock("redis");redisLock.lock();try {//1 查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//2 判断库存是否足够Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);//3 扣减库存if (inventoryNumber > 0) {stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;// 演示自动续期的的功能
// try {
// TimeUnit.SECONDS.sleep(120);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }} else {retMessage = "商品卖完了,o(╥﹏╥)o";}} finally {redisLock.unlock();}return retMessage + "\t" + "服务端口号:" + port;}/*** v7.0 兼顾锁的可重入性 setnx不满足了,需要hash结构的hset* 上锁和解锁都用 Lua 脚本实现原子性* 引入工厂模式 DistributedLockFactory 实现Lock接口 ,实现 redis的可重入锁** @return*/
// //private Lock redisDistributedLock = new RedisDistributedLock(stringRedisTemplate, "xfcyRedisLock");
//
// public String sale() {
// String retMessage = "";
//
// Lock redisLock = distributedLockFactory.getDistributedLock("redis");
// redisLock.lock();
//
// //redisDistributedLock.lock();
// try {
// //1 查询库存信息
// String result = stringRedisTemplate.opsForValue().get("inventory001");
// //2 判断库存是否足够
// Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
// //3 扣减库存
// if (inventoryNumber > 0) {
// stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
// retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
// System.out.println(retMessage);
//
// // 测试可重入性
// //testReEntry();
//
// } else {
// retMessage = "商品卖完了,o(╥﹏╥)o";
// }
// } finally {
// redisLock.unlock();
// //redisDistributedLock.unlock();
// }
// return retMessage + "\t" + "服务端口号:" + port;
// }
//
// private void testReEntry() {
// Lock redisLock = distributedLockFactory.getDistributedLock("redis");
// redisLock.lock();
//
// //redisDistributedLock.lock();
// try {
// System.out.println("测试可重入锁");
// } finally {
// redisLock.unlock();
// //redisDistributedLock.unlock();
// }
// }
package com.xfcy.mylock;import cn.hutool.core.util.IdUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.locks.Lock;/*** @author 晓风残月Lx* @date 2023/4/1 22:14*/
@Component
public class DistributedLockFactory {@Autowiredprivate StringRedisTemplate stringRedisTemplate;private String lockName;private String uuidValue;public DistributedLockFactory() {this.uuidValue = IdUtil.simpleUUID();}public Lock getDistributedLock(String lockType) {if (lockType == null) {return null;}if (lockType.equalsIgnoreCase("REDIS")) {this.lockName = "xfcyRedisLock";return new RedisDistributedLock(stringRedisTemplate, lockName, uuidValue);}else if (lockType.equalsIgnoreCase("ZOOKEEPER")) {this.lockName = "xfcyZookeeperLock";// TODO zoookeeper 版本的分布式锁return null;}else if (lockType.equalsIgnoreCase("MYSQL")){this.lockName = "xfcyMysqlLock";// TODO MYSQL 版本的分布式锁return null;}return null;}}
package com.xfcy.mylock;import cn.hutool.core.util.IdUtil;
import com.sun.org.apache.xpath.internal.operations.Bool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** @author 晓风残月Lx* @date 2023/4/1 21:38* 自研的redis分布式锁,实现 Lock 接口*/
// @Component 引入DistributedLockFactory工厂模式,从工厂获得即可
public class RedisDistributedLock implements Lock {@Autowiredprivate StringRedisTemplate stringRedisTemplate;private String lockName; // KEYS[1]private String uuidValue; // ARGV[1]private long expireTime; // ARGV[2]public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuidValue) {this.stringRedisTemplate = stringRedisTemplate;this.lockName = lockName;this.uuidValue = uuidValue + ":" + Thread.currentThread().getId();this.expireTime = 30L;}@Overridepublic void lock() {tryLock();}@Overridepublic boolean tryLock() {try {tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time == -1L) {String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +"redis.call('hincrby',KEYS[1],ARGV[1],1) " +"redis.call('expire',KEYS[1],ARGV[2]) " +"return 1 " +"else " +"return 0 " +"end";System.out.println("lockName = " + lockName +"\t" + "uuidValue = " + uuidValue);while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) {// 暂停 60msThread.sleep(60);}// 新建一个后台扫描程序,来监视key目前的ttl,是否到我们规定的 1/2 1/3 来实现续期resetExpire();return true;}return false;}@Overridepublic void unlock() {String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " +"return nil " +"elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +"return redis.call('del',KEYS[1]) " +"else " +"return 0 " +"end";// nil = false 1 = true 0 = falseLong flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));if (null == flag) {throw new RuntimeException("this lock doesn't exists0");}}private void resetExpire() {String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " +"return redis.call('expire',KEYS[1],ARGV[2]) " +"else " +"return 0 " +"end";new Timer().schedule(new TimerTask() {@Overridepublic void run() {if (stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) {resetExpire();}}}, (this.expireTime * 1000) / 3);}// 下面两个用不上// 下面两个用不上// 下面两个用不上@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic Condition newCondition() {return null;}
}