1. 商户查询缓存
2. 知识储备和课程内容
2.1 什么是缓存
缓存是数据交换的缓冲区,是存贮数据的临时地方,一般读写性能较高。
- 浏览器缓存
- 应用层缓存
- 数据库缓存
- CPU缓存
- 磁盘缓存
缓存的作用:
- 降低后端负载
- 提高读写效率,降低响应时间
缓存的成本:
- 数据的一致性成本
- 代码维护成本
- 运维成本
2.2 缓存更新策略
业务查询:
- 低一致性需求:使用内存淘汰策略。例如店铺类型的查询缓存
- 高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存。
2.2.1 主动更新策略
Cache Aside Pattern(旁路缓存模式)(企业中用的比较多)
- Cache Aside Pattern
- 指缓存调用者在更新数据库的同时完成对缓存的更新。(一致性良好,实现难度一般)
- Read/Write Through Pattern
- 缓存和数据库集成为一个服务,由服务来保证两者的一致性,对外暴露API接口。调用者调用API,无序知道自己操作的是数据库还是缓存,不关心一致性。(一致性优秀,实现复杂,性能一般)
- Write Behind Caching Pattern
- 缓存调用者的CRUD都针对缓存完成,由独立线程异步的将缓存数据写到数据库,实现最终一致(一致性差,性能好,实现复杂)
Cache Aside Pattern基本思想:
- 当需要获取数据时,首先在缓存中查找数据。
- 如果在缓存中找到了数据,则直接返回给客户端。
- 如果在缓存中没有找到数据,则从后端存储系统(如数据库)中读取数据,并将数据存储到缓存中。
- 在写入数据时,首先更新后端存储系统中的数据,然后让缓存中的数据失效或更新,以便下次读取时从后端存储系统中获取最新数据。
特点:
- 简单直观:模式简单易懂,易于实现。
- 读性能提升:大部分读操作可以直接从缓存中获取数据,减少了对后端存储系统的访问。
- 数据一致性:通过手动管理缓存和后端存储系统中的数据一致性,确保数据的准确性。
需要思考的问题!
- 删除缓存还是更新缓存?
- 更新缓存:每次更新数据库都更新缓存,无效写操作较多(❎)
- 删除操作:更新数据库时让缓存失效,查询时再更新缓存(✅)
- 如何保证缓存与数据库的操作同时成功或失败?
- 单体系统,将缓存与数据库操作放在一个事务
- 分布式系统,利用TCC等分布式事务方案
- 先操作缓存还是先操作数据库?
- 先删除缓存,在操作数据库
- 先操作数据库,在删除缓存
先操作缓存还是先操作数据库(重要)?
相比较而言方案二安全性更高一些:
原因:方案二需要满足,线程1查询时缓存恰好失效,且更新数据库的操作间隔要比写入缓存的时间短。(但还是有可能),需要赋予超时剔除作为兜底方案。
2.3 缓存穿透
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这些请求都会打到数据库,给数据库带来巨大的压力。
解决方案
- 缓存空对象
- 布隆过滤
- 增强id的复杂度,避免被猜测id规律
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数的限流
2.4 缓存雪崩
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
- 给不同的Key的TTL添加随机值
- 利用Redis集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存
2.5 缓存击穿
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务比较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
2.5.1 解决方案1:互斥锁
存在的问题:需要等待阻塞
利用setnx来模拟简单的分布式锁。
# 获得锁(一般上会设置有效期)
setnx lock 1
# 删除锁
del lock
2.5.2 解决方案2:逻辑过期
基于逻辑过期的方式会存在一段时间内的不一致性,一旦线程完成了缓存重建,就能够得到一致性的结果。
2.5.3 解决方案对比
2.6 缓存工具封装
基于StringRedisTemplate封装一个缓存工具类,满足下列需求:
- 方法1: 将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
- 方法2: 将任意java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
- 方法3: 根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
- 方法4: 根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
@Slf4j
@Component
public class CacheClient {private final StringRedisTemplate stringRedisTemplate;private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);public CacheClient(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public void set(String key, Object value, Long time, TimeUnit unit){stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);}public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){// 设置逻辑过期RedisData redisData = new RedisData();redisData.setData(value);redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));// 写入RedisstringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));}public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback, Long time, TimeUnit unit){String key = keyPrefix + id;// 1. 从Redis中查询缓存String json = stringRedisTemplate.opsForValue().get(key);// 2. 判断是否存在if (StrUtil.isNotBlank(json)){// 3. 存在,直接返回return JSONUtil.toBean(json,type);}// 判断命中的是否是空值if (json!=null){return null;}// 4. 不存在,根据id查询数据库R r = dbFallback.apply(id);// 5. 数据库不存在,返回错误if (r==null){stringRedisTemplate.opsForValue().set(key,"",RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);return null;}// 6. 存在,写入Redisthis.set(key,r,time,unit);// 7. 返回return r;}public <R,ID> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback, Long time, TimeUnit unit){String key = keyPrefix + id;// 1. 从Redis中查询缓存String json = stringRedisTemplate.opsForValue().get(key);// 2. 判断是否存在if (StrUtil.isBlank(json)){// 3. 不存在直接返回return null;}// 4.命中,需要先把json反序列化对象RedisData redisData = JSONUtil.toBean(json, RedisData.class);R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);LocalDateTime expireTime = redisData.getExpireTime();// 5. 判断是否过期if(expireTime.isAfter(LocalDateTime.now())){// 5.1 未过期,直接返回店铺信息return r;}// 5.2 已过期,需要缓存重建// 6 缓存重建// 6.1 获取互斥锁String lockKey = RedisConstants.LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);// 6.2 判断是否获取锁成功if(isLock){// 6.3 成功,开启独立线程,实现缓存重建CACHE_REBUILD_EXECUTOR.submit(()->{// TODO 重建缓存,需要修改过期时间为1800秒try {// 查询数据库R r1 = dbFallback.apply(id);// 写入redisthis.setWithLogicalExpire(key,r1,time,unit);}catch (Exception e){throw new RuntimeException(e);}finally {unlock(lockKey);}});}// 6.4 先返回过期的商铺信息return r;}private boolean tryLock(String key){Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);}private void unlock(String key){stringRedisTemplate.delete(key);}
}
3. 问题汇总
3.1 基于互斥锁的递归是否存在问题?
递归调用 queryWithMutex(id) 可能会导致栈溢出,因为没有任何条件来终止递归。在这种情况下,如果无法获取锁,线程会无限制地尝试递归调用自身,并且每次递归都会消耗一些栈空间,最终导致栈溢出异常。
public Shop queryWithMutex(Long id){String key = RedisConstants.CACHE_SHOP_KEY + id;// 1. 从Redis中查询缓存String shopJson = stringRedisTemplate.opsForValue().get(key);// 2. 判断是否存在if (StrUtil.isNotBlank(shopJson)){// 3. 存在,直接返回return JSONUtil.toBean(shopJson,Shop.class);}// 判断命中的是否是空值if (shopJson!=null){return null;}// 4. 实现缓存重建// 4.1 获取互斥锁String lockKey = RedisConstants.LOCK_SHOP_KEY + id;Shop shop = null;try {boolean isLock = tryLock(lockKey);// 4.2 判断是否获取成功if (!isLock){// 4.3 失败,则休眠并重试Thread.sleep(50);// TODO 感觉这里代码有问题。建议重新修改return queryWithMutex(id);}// 4.4 成功,根据id查询数据库shop = getById(id);// TODO 模拟重建的延时(正常运行时需要删除)Thread.sleep(200);// 5. 数据库不存在,返回错误if (shop==null){stringRedisTemplate.opsForValue().set(key,"",RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);return null;}// 6. 存在,写入RedisstringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);}catch (Exception e){throw new RuntimeException(e);}finally {// 7.释放获取锁unlock(lockKey);}// 8. 返回return shop;}
修改后的代码如下:采用了循环替代了递归,并设置了最大循环次数。达到最大循环后没有成功即返回null.在并发为200/s的时候平均每个请求需要在循环中执行的次数为7次。
public Shop queryWithMutex2(Long id){String key = RedisConstants.CACHE_SHOP_KEY + id;// 从Redis中查询缓存String shopJson = stringRedisTemplate.opsForValue().get(key);if (StrUtil.isNotBlank(shopJson)){return JSONUtil.toBean(shopJson,Shop.class);}// 判断命中的是否是空值,即是否等于空字符串if (shopJson!=null){return null;}// 尝试准备从数据库中获取数据int MAX_RETRY_COUNT = 10;boolean isLock = false;int retryCount = 0;String lockKey = RedisConstants.LOCK_SHOP_KEY + id;Shop shop = null;try{// 4.2 循环重试直至获取锁成功或达到最大重试次数while (!isLock && retryCount < MAX_RETRY_COUNT) {isLock = tryLock(lockKey);if (!isLock) {// 4.3 失败,则休眠并重试Thread.sleep(50);retryCount++;}//休眠结束后尝试从缓存中查询数据shopJson = stringRedisTemplate.opsForValue().get(key);if (StrUtil.isNotBlank(shopJson)){System.out.println("Thread尝试的次数为:"+retryCount);return JSONUtil.toBean(shopJson,Shop.class);}// 否者继续循环获得锁}// 判断是否获取锁成功,或者超过最大重试次数if (!isLock || retryCount == MAX_RETRY_COUNT){return null;}// 获取锁成功,根据id查询数据库shop = getById(id);// TODO 模拟重建的延时(正常运行时需要删除)Thread.sleep(200);// 5. 数据库不存在,返回错误if (shop==null){stringRedisTemplate.opsForValue().set(key,"",RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);return null;}// 6. 存在,写入RedisstringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);}catch (Exception e){throw new RuntimeException(e);}finally {// 7.释放获取锁unlock(lockKey);}// 8. 返回return shop;}