目录
- 缓存的处理流程
- 缓存穿透
- 解释
- 产生原因
- 解决方案
- 1.针对不存在的数据也进行缓存
- 2.设置合适的缓存过期时间
- 3. 对缓存访问进行限流和降级
- 4. 接口层增加校验
- 5. 布隆过滤器
- 原理
- 优点
- 缺点
- 关于扩容
- 其他使用场景
- SpringBoot 整合 布隆过滤器
- 缓存击穿
- 产生原因
- 解决方案
- 1.设置热点数据永不过期
- 优点
- 缺点
- 示例
- 2.使用互斥锁
- 优点
- 缺点
- 示例
- 3. 使用分布式缓存
- 4. 设置随机过期时间
- 缓存雪崩
- 产生原因
- 解决方案
- 对于大量的缓存key同时失效
- 对于redis服务宕机
缓存的处理流程
- 客户端发起请求,服务端先尝试从缓存中取数据,若取到直接返回结果
- 若取不到,则从数据库中取,然后将取出的数据更新到缓存,并返回结果
- 若数据库也没取到,那直接返回空结果。
缓存穿透
解释
- Redis缓存穿透指的是针对一个不存在于缓存中的数据进行查询操作,由于缓存中不存在该数据,所以会直接访问数据库获取数据。
- 这种情况下,如果有大量并发查询这个不存在于缓存中的数据,就会导致大量的查询请求直接访问数据库,增加了数据库的负载,并且没有起到缓存的作用。
产生原因
- 缓存穿透的主要原因是恶意攻击或者查询频率极高的缓存击穿。
- 恶意攻击指的是有人故意请求不存在于缓存中的数据,以此来进行攻击。
- 查询频率极高的缓存击穿是指某个热点数据在缓存中过期后,大量并发访问该数据,导致缓存失效并且直接访问数据库。
解决方案
1.针对不存在的数据也进行缓存
针对不存在的数据也进行缓存:即使查询的数据不存在于数据库中,也可以将查询的结果缓存起来,标记为不存在。这样下次查询同样的数据时,就可以直接从缓存中获取结果,而不需要再查询数据库。
2.设置合适的缓存过期时间
设置合适的缓存过期时间:对于热点数据,可以设置较长的缓存过期时间,让它在过期之前不会被失效。这样可以减少缓存击穿的风险。
3. 对缓存访问进行限流和降级
对缓存访问进行限流和降级:限制并发访问缓存的请求数量,避免由于高并发访问导致缓存失效。当缓存失效时,可以选择使用备用方案,如返回默认值或者降级处理。
4. 接口层增加校验
接口层增加校验:如用户鉴权校验;查询数据的id做基础校验,id<=0的直接拦截;
5. 布隆过滤器
- 使用布隆过滤器(Bloom Filter)来过滤掉不存在的数据:布隆过滤器是一种高效的数据结构,可以用来判断一个元素是否存在于一个集合中,如果布隆过滤器判断一个元素不存在,则可以避免进行数据库查询操作。
- 它由位数组(Bit Array)和一系列哈希函数组成。
原理
- 初始化时,位数组的所有位都被设置为0。
- 当要插入一个元素时,使用预先设定好的多个独立、均匀分布的哈希函数对元素进行哈希运算,每个哈希函数都会计算出一个位数组的索引位置。
- 将通过哈希运算得到的每个索引位置的位设置为1。
- 查询一个元素是否存在时,同样用相同的哈希函数对该元素进行运算,并检查对应位数组的位置是否都是1。
- 如果所有位都为1,则认为该元素可能存在于集合中;
- 如果有任何一个位为0,则可以确定该元素肯定不在集合中。
- 由于哈希碰撞的存在,当多位同时为1时,可能出现误报(False Positive),即报告元素可能在集合中,但实际上并未被插入过。但布隆过滤器不会出现漏报(False Negative),即如果布隆过滤器说元素不在集合中,则这个结论是绝对正确的。
- 由于哈希碰撞的存在,在实际应用中,随着更多元素被插入,相同哈希值对应的位会被多次置1,这就导致原本未出现过的元素经过哈希运算后也可能指向已经置1的位置,从而产生误报。不过,通过调整位数组大小、哈希函数的数量以及负载因子等参数,可以在误报率和存储空间之间取得平衡。
总之,布隆过滤器提供了一种空间效率极高但牺牲了精确性的解决方案,特别适合用于那些能够容忍一定误报率的大规模数据处理场景。
优点
- 布隆过滤器的主要优点是占用空间较小,查询速度快。
- 由于只需要位数组和哈希函数,不需要存储实际的元素,因此占用的空间相对较小。
- 同时,布隆过滤器查询一个元素的时间复杂度为O(k),与集合的大小无关,查询速度非常快。
缺点
- 由于哈希函数的关系,布隆过滤器一旦出现误判元素存在于集合中的情况,就无法修正,也无法直接删除其中的元素。
- 此外,布隆过滤器存在一定的误判率,即有一定的概率将不存在的元素误判为存在,这取决于位数组的大小和哈希函数的个数。
关于扩容
- 布隆过滤器步进无法直接删除,而且在布隆过滤器设计时,其容量是固定的,因此不支持直接扩容。
- 传统的布隆过滤器一旦创建,它的位数组大小就无法改变,这意味着如果需要处理的数据量超过了初始化时预设的容量,将导致误报率增加,且无法通过简单地增大位数组来解决这个问题
- 在实际应用中,为了应对数据增长的需求,可以采用以下策略来进行扩容
- 并行布隆过滤器:
- 可以维护多个独立的布隆过滤器,随着数据增长,当一个过滤器填满后,新加入的数据放入新的布隆过滤器中。
- 查询时,需要对所有布隆过滤器进行查询,只有当所有的过滤器都表明元素可能不存在时,才能确定元素肯定不在集合中。
- 可扩展布隆过滤器:
- 一些变种如 Scalable Bloom Filter 或 Dynamic Bloom Filter 允许添加额外的空间,并重新哈希已有数据到更大的位数组中,从而维持较低的误报率。
- 扩容过程通常涉及构造一个新的更大容量的布隆过滤器,然后迁移旧数据到新过滤器,并从这一刻起在新过滤器中插入新数据。
- 层次结构布隆过滤器: 创建一个多层的布隆过滤器结构,新数据首先被插入到最顶层(最小容量)的过滤器中,当某个层级的过滤器接近饱和时,再启用下一个容量更大的过滤器。
- 并行布隆过滤器:
其他使用场景
- 数据库索引优化:对于大型数据库,可以利用布隆过滤器作为辅助索引结构,提前过滤掉大部分肯定不在结果集中的查询条件,减轻主索引的压力。
- 推荐系统:在个性化推荐系统中,用于快速排除用户已经浏览过或者不感兴趣的内容。
- 垃圾邮件过滤:用于电子邮件系统的垃圾邮件地址库,快速判断收到的邮件是否可能来自已知的垃圾邮件发送者。
- 数据分析与挖掘:在大规模数据清洗阶段,用来剔除重复样本或无效数据。
- 实时监控与报警系统:当大量事件流经系统时,可以用于快速识别并过滤出已知异常事件,降低报警系统误报率。
- 重复数据检测:
- 在爬虫抓取网页或者日志分析中,用于URL去重,确保不会重复抓取相同的页面或记录。
- 在大数据处理中,比如在Hadoop等框架中,用来过滤掉重复的数据块或者记录,减少计算和存储负担。
- 网络安全: 网络防火墙和入侵检测系统中,用于过滤已知恶意IP或攻击特征。
- 社交网络和互联网服务:在社交网络中,用于快速检测用户上传的内容是否存在违规信息,或是检查用户ID、账号是否存在黑名单中。
SpringBoot 整合 布隆过滤器
-
依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- 引入Redisson的Spring Boot启动器 --><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.16.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
-
配置
spring:datasource:username: xxpassword: xxxxxxdriver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/smbms?useUnicode=true&characterEncoding=utf-8&serverTimezone=CTTcache:type: redisredis:database: 0port: 6379 # Redis服务器连接端口host: localhostpassword: 123456timeout: 5000 # 超时时间 mybatis:mapper-locations: classpath:mapper/*.xmlconfiguration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
-
工具类
package com.kgc.utils; import org.redisson.api.RBloomFilter; import org.redisson.api.RedissonClient; import org.springframework.stereotype.Component; import javax.annotation.Resource; /*** @author: zjl* @datetime: 2024/6/7* @desc: 复兴Java,我辈义不容辞*/ @Component public class BloomFilterUtil {@Resourceprivate RedissonClient redissonClient;/*** 创建布隆过滤器** @param filterName 过滤器名称* @param expectedInsertions 预测插入数量* @param falsePositiveRate 误判率*/public <T> RBloomFilter<T> create(String filterName, long expectedInsertions, double falsePositiveRate) {RBloomFilter<T> bloomFilter = redissonClient.getBloomFilter(filterName);bloomFilter.tryInit(expectedInsertions, falsePositiveRate);return bloomFilter;} }
-
业务示例
package com.kgc.service;import com.kgc.mapper.UserMapper; import com.kgc.pojo.User; import com.kgc.utils.BloomFilterUtil; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBloomFilter; import org.redisson.api.RedissonClient; import org.redisson.client.codec.StringCodec; import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.CachePut; import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Service;import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit;/*** @author: zjl* @datetime: 2024/6/7* @desc: 复兴Java,我辈义不容辞*/ @Service @Slf4j public class UserService {// 预期插入数量static long expectedInsertions = 200L;// 误判率static double falseProbability = 0.01;// 非法请求所返回的JSONstatic String illegalJson = "[{\"id\":0,\"userName\":\"null\",\"userCode\":null,\"userRole\":0}]";private RBloomFilter<Long> bloomFilter = null;@Resourceprivate BloomFilterUtil bloomFilterUtil;@Resourceprivate RedissonClient redissonClient;@Resourceprivate UserMapper userMapper;@PostConstruct // 项目启动的时候执行该方法,也可以理解为在spring容器初始化的时候执行该方法public void init() {// 启动项目时初始化bloomFilterList<User> userList = userMapper.selectUserList(null,0L);bloomFilter = bloomFilterUtil.create("idWhiteList", expectedInsertions, falseProbability);for (User user : userList) {bloomFilter.add(user.getId());}}@Cacheable(cacheNames = "user", key = "#id", unless = "#result==null")public User findById(Long id) {// bloomFilter中不存在该key,为非法访问if (!bloomFilter.contains(id)) {log.info("所要查询的数据既不在缓存中,也不在数据库中,为非法key");/*** 设置unless = "#result==null"并在非法访问的时候返回null的目的是不将该次查询返回的null使用* RedissonConfig-->RedisCacheManager-->RedisCacheConfiguration-->entryTtl设置的过期时间存入缓存。** 因为那段时间太长了,在那段时间内可能该非法key又添加到bloomFilter,比如之前不存在id为1234567的用户,* 在那段时间可能刚好id为1234567的用户完成注册,使该key成为合法key。** 所以我们需要在缓存中添加一个可容忍的短期过期的null或者是其它自定义的值,使得短时间内直接读取缓存中的该值。** 因为Spring Cache本身无法缓存null,因此选择设置为一个其中所有值均为null的JSON,*/redissonClient.getBucket("user::" + id, new StringCodec()).set(illegalJson, new Random().nextInt(200) + 300, TimeUnit.SECONDS);return null;}// 不是非法访问,可以访问数据库log.info("数据库中得到数据.......");return userMapper.selectById(id);}// 先执行方法体中的代码,成功执行之后删除缓存@CacheEvict(cacheNames = "user", key = "#id")public boolean delete(Long id) {// 删除数据库中具有的数据,就算此key从此之后不再出现,也不能从布隆过滤器删除return userMapper.deleteById(id) == 1;}// 如果缓存中先前存在,则更新缓存;如果不存在,则将方法的返回值存入缓存@CachePut(cacheNames = "user", key = "#user.id")public User update(User user) {userMapper.updateById(user);// 新生成key的加入布隆过滤器,此key从此合法,因为该更新方法并不更新id,所以也不会产生新的合法的keybloomFilter.add(user.getId());return user;}@CachePut(cacheNames = "user", key = "#user.id")public User insert(User user) {userMapper.insert(user);// 新生成key的加入布隆过滤器,此key从此合法bloomFilter.add(user.getId());return user;} }
-
省略实体类、mapper、测试
缓存击穿
- Redis缓存击穿是指在使用Redis作为缓存服务时,当一个缓存键失效时,大量的请求同时涌入,导致数据库负载激增,造成性能下降甚至崩溃的情况。
产生原因
-
热点数据失效:当一个热点数据的缓存键失效时,大量的请求会同时请求该数据,导致数据库负载过高。
-
频繁的缓存失效:如果某个应用频繁地对某个缓存键进行更新或者删除操作,那么每次失效后都会触发大量的请求同时请求该数据。
-
缓存击穿攻击:恶意用户故意请求不存在的缓存键,造成大量的请求同时涌入。
解决方案
1.设置热点数据永不过期
- 设置热点数据永不过期:对于一些热点数据,可以设置其缓存键永不过期,转为逻辑过期,以避免缓存失效时大量请求涌入。
优点
- 逻辑过期的优点是可以减少缓存的更新次数,避免在没有必要的情况下过多地读取后端数据源,并且在数据本身有频繁更新的情况下可以避免缓存数据过时;
缺点
- 逻辑过期的缺点是在某些极端情况下会出现缓存为空的情况,如果此时恰巧有大量请求同时访问缓存,则可能导致缓存击穿,
- 并且无法避免大量的并发请求直接落到后端,并且实现起来也是比较复杂和数据无法保证一致性(因为可能返回旧数据)。
示例
@Data
public class RedisData {private LocalDateTime expireTime;private Object data;
}
/*** 给热点key缓存预热* @param id* @param expireSeconds*/
private void saveShop2Redis(Long id, Long expireSeconds) {// 1.查询店铺数据Shop shop = getById(id);// 2.封装逻辑过期时间RedisData redisData = new RedisData();redisData.setData(shop);redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));// 3.写入RedisstringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}
/*** 缓存击穿(逻辑过期)功能封装* @param id* @return*/
public Shop queryWithLogicalExpire(Long id) {String key = CACHE_SHOP_KEY + id;//1. 从Redis中查询商铺缓存String shopJson = stringRedisTemplate.opsForValue().get(key);//2. 判断是否存在if(StrUtil.isBlank(shopJson)){//3. 不存在,直接返回(这里做的事热点key预热,所以已经假定热点key已经在缓存中)return null;}//4. 存在,需要判断过期时间,需要先把json反序列化为对象RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);LocalDateTime expireTime = redisData.getExpireTime();//5. 判断是否过期if(expireTime.isAfter(LocalDateTime.now())) {//5.1 未过期,直接返回店铺信息return shop;}//5.2 已过期,需要缓存重建//6. 缓存重建//6.1 获取互斥锁String lockKey = LOCK_SHOP_KEY + id;//6.2 判断是否获取锁成功boolean isLock = tryLock(lockKey);if(isLock) {// 二次验证是否过期,防止多线程下出现缓存重建多次String shopJson2 = stringRedisTemplate.opsForValue().get(key);// 这里假定key存在,所以不做存在校验// 存在,需要判断过期时间,需要先把json反序列化为对象RedisData redisData2 = JSONUtil.toBean(shopJson2, RedisData.class);Shop shop2 = JSONUtil.toBean((JSONObject) redisData2.getData(), Shop.class);LocalDateTime expireTime2 = redisData2.getExpireTime();if(expireTime2.isAfter(LocalDateTime.now())) {// 未过期,直接返回店铺信息return shop2;}//6.3 成功,开启独立线程,实现缓存重建CACHE_REBUILD_EXECUTOR.submit(() -> {try {// 重建缓存,这里设置的值小一点,方便观察程序执行效果,实际开发应该设为30minthis.saveShop2Redis(id, 20L);} catch (Exception e) {throw new RuntimeException(e);} finally {// 释放锁unLock(lockKey);}});}//7. 返回return shop;
}
测试
@SpringBootTest
class HmDianPingApplicationTests {@Resourceprivate ShopServiceImpl shopService;@Testvoid testSaveShop() throws InterruptedException {shopService.saveShop2Redis(1L, 10L);}
}
2.使用互斥锁
- 使用互斥锁:在缓存失效时,可以通过加锁的方式只允许一个请求去更新缓存,其他请求等待直到缓存更新完成。
优点
- 互斥锁的优点是它可以确保只有一个线程在访问缓存内容,并且在缓存中没有命中时,只会读取一次后端数据库(或其他数据源),
- 其余线程会等待读取完毕后再次读取缓存,这避免了大量的并发请求直接落到后端,从而减少了并发压力,保证系统的稳定性,并且可以保证数据的一致性;
缺点
- 互斥锁的缺点是会增加单个请求的响应时间,因为只有一个线程能够读取缓存值,其他线程则需要等待,这可能会在高并发场景下导致线程池饱和。
示例
/*** 获取锁* @param key* @return
*/
private boolean tryLock(String key) {// setnx 就是 setIfAbsent 如果存在Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.MINUTES);// 装箱是将值类型装换成引用类型的过程;拆箱就是将引用类型转换成值类型的过程// 不要直接返回flag,可能为nullreturn BooleanUtil.isTrue(flag);
}/*** 释放锁* @param key*/
private void unLock(String key) {stringRedisTemplate.delete(key);
}
/*** 互斥锁解决缓存击穿 queryWithMutex()* @param id* @return*/
public Shop queryWithMutex(Long id) {// 1.从redis查询商铺缓存String key = CACHE_SHOP_KEY + id;String shopJson = stringRedisTemplate.opsForValue().get(key);// 2.判断是否存在if (StrUtil.isNotBlank(shopJson)) {return JSONUtil.toBean(shopJson, Shop.class);}// 判断空值if (shopJson != null) {// 返回一个错误信息return null;}String lockKey = "lock:shop:" + id;Shop shop = null;try {// 4.实现缓存重建// 4.1获取互斥锁boolean isLock = tryLock(lockKey);// 4.2判断是否成功if (!isLock) {// 4.3失败,则休眠并重试Thread.sleep(50);// 递归return queryWithMutex(id);}// 4.4成功,根据id查询数据库shop = getById(id);// 模拟延迟Thread.sleep(200);// 5.不存在,返回错误if (shop == null) {stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);return null;}// 6.存在,写入redisstringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL,TimeUnit.MINUTES);} catch (InterruptedException ex) {throw new RuntimeException(ex);} finally {// 7.释放锁unLock(lockKey);}// 8.返回return shop;
}
合并
/*** 缓存击穿和缓存穿透功能合并封装* @param id* @return*/
public Shop queryWithMutex(Long id) {String key = CACHE_SHOP_KEY + id;//1. 从Redis中查询商铺缓存String shopJson = stringRedisTemplate.opsForValue().get(key);//2. 判断是否存在if(StrUtil.isNotBlank(shopJson)){//3. 存在,直接返回return JSONUtil.toBean(shopJson, Shop.class);}// 这里要先判断命中的是否是null,因为是null的话也是被上面逻辑判断为不存在// 这里要做缓存穿透处理,所以要对null多做一次判断,如果命中的是null则shopJson为""if("".equals(shopJson)){return null;}//4. 实现缓存重建//4.1 获取互斥锁String lockKey = LOCK_SHOP_KEY + id;Shop shop = null;try {boolean isLock = tryLock(lockKey);//4.2 判断获取是否成功if(!isLock) {//4.3 失败,则休眠并重试Thread.sleep(50);// 递归重试return queryWithMutex(id);}//4.4 成功,根据id查询数据库shop = getById(id);if(shop == null) {//5. 不存在,将null写入redis,以便下次继续查询缓存时,如果还是查询空值可以直接返回false信息stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);return null;}//6. 存在,写入RedisstringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {//7. 释放互斥锁unLock(lockKey);}//8. 返回return shop;
}
3. 使用分布式缓存
- 使用分布式缓存:将缓存分散到多个Redis实例中,以减轻单个Redis实例的负载压力。
4. 设置随机过期时间
- 设置随机过期时间:在设置缓存键的过期时间时,可以增加一个随机的过期时间,以避免大量的缓存同时失效。
缓存雪崩
- Redis缓存雪崩是指在使用Redis作为缓存服务时,当缓存的大量键同时失效或者Redis服务发生故障时,导致大量请求直接访问数据库,造成数据库负载过高,甚至导致数据库崩溃的情况。
产生原因
-
缓存键过期时间一致:如果大量的缓存键设置了相同的过期时间,同时失效,那么所有请求都会直接访问数据库,导致数据库压力过大。
-
Redis服务宕机:当Redis服务发生故障无法提供缓存服务时,所有的请求都会直接访问数据库,造成数据库负载过高。
-
大规模数据更新:在进行大规模数据更新时,如果更新操作直接修改数据库,而不是通过更新缓存,那么所有请求都会直接访问数据库,导致数据库负载过高。
解决方案
对于大量的缓存key同时失效
- 给不同的Key的TTL添加随机值,比如将缓存失效时间分散开,可以在原有的失效时间基础上增加一个随机值
比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
对于redis服务宕机
- 利用Redis集群提高服务的可用性,比如哨兵模式、集群模式;
- 给缓存业务添加降级限流策略,比如可以在ngxin或spring cloud gateway中处理;
- 降级也可以做为系统的保底策略,适用于穿透、击穿、雪崩
- 给业务添加多级缓存,比如使用Guava或Caffeine作为一级缓存,redis作为二级缓存等;