1、布隆过滤
1.1、简介
由一个初值都为零的bit数组和多个哈希函数构成,可以用来快速判断集合中是否存在某个元素,减少占用内存,不保存数据信息,只是在内存中做出一个标记。
它实际上是一个很长的二进制数组(00000000)+一系列随机hash算法映射函数,主要用于判断一个元素是否在集合中。通常我们会遇到很多要判断一个元素是否在某个集合中的业务场景,一般想到的是将集合中所有元素保存起来,然后通过比较确定。
使用布隆过滤的话一个元素如果判断结果:存在时,元素不一定存在,但是判断结果为不存在时,则一定不存在。布隆过滤器可以添加元素,但是不能删除元素,由于涉及hashcode判断依据,删掉元素会导致误判率增加。
布隆过滤器(Bloom Filter) 是一种专门用来解决去重问题的高级数据结构。实质就是一个大型*位数组*和几个不同的无偏hash函数(无偏表示分布均匀)。由一个初值都为零的bit数组和多个个哈希函数构成,用来快速判断某个数据是否存在。但是跟 HyperLogLog 一样,它也一样有那么一点点不精确,也存在一定的误判概率
1.2、使用特点
添加key时使用多个hash函数对key进行hash运算得到一个整数索引值,对位数组长度进行取模运算得到一个位置,每个hash函数都会得到一个不同的位置,将这几个位置都置1就完成了add操作。
当有变量被加入集合时,通过N个映射函数将这个变量映射成位图中的N个点,把它们置为 1(假定有两个变量都通过 3 个映射函数)。
查询key时只要有其中一位是零就表示这个key不存在,但如果都是1,则不一定存在对应的key。如果这些点,有任何一个为零则被查询变量一定不在,如果都是 1,则被查询变量很可能存在,为什么说是可能存在,而不是一定存在呢?那是因为映射函数本身就是散列函数,散列函数是会有碰撞的。(见上图3号坑两个对象都1)
此外还有可能出现的误差是因为,哈希函数的概念是:将任意大小的输入数据转换成特定大小的输出数据的函数,转换后的数据称为哈希值或哈希编码,也叫散列值。
如果两个散列值是不相同的(根据同一函数)那么这两个散列值的原始输入也是不相同的。
这个特性是散列函数具有确定性的结果,具有这种性质的散列函数称为单向散列函数。
散列函数的输入和输出不是唯一对应关系的,如果两个散列值相同,两个输入值很可能是相同的,但也可能不同,这种情况称为“散列碰撞(collision)”。用 hash表存储大数据量时,空间效率还是很低,当只有一个 hash 函数时,还很容易发生哈希碰撞。
1.3、使用步骤
1.3.1、初始化bitmap
布隆过滤器 本质上 是由长度为 m 的位向量或位列表(仅包含 0 或 1 位值的列表)组成,最初所有的值均设置为 0。
1.3.2、添加占坑位
当我们向布隆过滤器中添加数据时,为了尽量地址不冲突,会使用多个 hash 函数对 key 进行运算,算得一个下标索引值,然后对位数组长度进行取模运算得到一个位置,每个 hash 函数都会算得一个不同的位置。再把位数组的这几个位置都置为 1 就完成了 add 操作。
例如,我们添加一个字符串wmyskxz,对字符串进行多次hash(key) → 取模运行→ 得到坑位
1.3.3、判断是否存在
向布隆过滤器查询某个key是否存在时,先把这个 key 通过相同的多个 hash 函数进行运算,查看对应的位置是否都为 1,只要有一个位为零,那么说明布隆过滤器中这个 key 不存在;如果这几个位置全都是 1,那么说明极有可能存在;因为这些位置的 1 可能是因为其他的 key 存在导致的,也就是前面说过的hash冲突。。。。。
就比如我们在 add 了字符串wmyskxz数据之后,很明显下面1/3/5 这几个位置的 1 是因为第一次添加的 wmyskxz 而导致的;此时我们查询一个没添加过的不存在的字符串inexistent-key,它有可能计算后坑位也是1/3/5 ,这就是误判了
2、缓存问题
2.1、缓存预热
Redis缓存预热是指在服务器启动或应用程序启动之前,将一些数据先存储到Redis中,以提高Redis的性能和数据一致性。这可以减少服务器在启动或应用程序启动时的数据传输量和延迟,从而提高应用程序的性能和可靠性。 (1)数据准备
在应用程序启动或服务器启动之前,准备一些数据,这些数据可以是静态数据、缓存数据或其他需要预热的数据。
(2)数据存储
将数据存储到Redis中,可以使用Redis的列表(List)数据类型或集合(Set)数据类型。
(3)数据预热
在服务器启动或应用程序启动之前,将数据存储到Redis中。可以使用Redis的客户端工具或命令行工具来执行此操作。
(4)数据清洗
在服务器启动或应用程序启动之后,可能会对存储在Redis中的数据进行清洗和处理。例如,可以删除过期的数据、修改错误的数据等。
需要注意的是,Redis缓存预热可能会增加服务器的开销,因此应该在必要时进行。同时,为了减少预热的次数,可以考虑使用Redis的其他数据类型,如哈希表(Hash)或有序集合(Sorted Set)。此外,为了提高数据一致性和性能,可以使用Redis的持久化功能,将数据存储到Redis中,并在服务器重启后自动恢复数据。
2.2、缓存雪崩
缓存雪崩是指在同一时间大量的缓存key同时失效或者Redis的服务器宕机,然后大量的请求到达数据库,然后带来很大的压力。
2.2.1、解决方法
2.2.1.1、 设置有效期均匀分布
将缓存的有效期设置为均匀分布,避免大量缓存同时失效。大概的意思就是在缓存数据的时候不要将缓存的数据存在Redis中的设置的过期是时间统一,应该让这些缓存的数据的过期时间分散开来,这样就不会导致在同一时间有大量的key过期。
2.2.1.2、 数据预热
在系统上线前,将可能会访问的数据预先加载到缓存中,避免缓存冷启动,这个做法还是有必要的,因为这样可以避免大量访问请求没有命中Redis后去访问数据库。
2.2.1.3、 保证Redis服务高可用
保证Redis服务的高可用性,避免Redis宕机的情况,这个方式也很好理解,Redis可以配置哨兵,用来监控Master主机的是否可以正常工作,如果宕机选出新的主机,然后如果是想要提高读写性能的话也可以配置成集群的模式,多台主机可以执行写操作,即使某一台主机挂掉然后还有其他的主机可以执行。
2.2.1.4、多级缓存
2.2.1.5、服务降级
2.3、缓存穿透
首先说一下缓存穿透,简单的讲就是请求的数据首先在Redis中查询没有找到,然后请求数据库去访问数据,但是在数据库中也没有找到,但是用户知道自己查询的数据不存在,所以此后的所有的请求的压力都会到数据库中, 但是事实上请求也没有预期的结果。
2.3.1、解决方法
2.3.1.1、 业务层校验
在缓存中没有命中的情况下,可以在业务层进行校验,如果校验不通过,直接返回错误信息,避免继续访问数据库。
还有一种做法就是,没有命中Redis和数据库这时候后就可以使用null来应对这种情况,大概的思路就是当第一次访问数据库时如果没有数据的话,然后将null存入Redis, 这样就可以解决下一次数据命中的是Redis而不是数据库,然后判断Redis命中如果是空值的话就要返回错误信息,但是对于这种的空值的设置的话,可以设置超时时间自动清除,不建议设置的时间过长, 这样会导致数据的不一致的时间过长,如果数据库中的数据更新但是Redis中的数据还是null,导致请求的参数无法获取正确值,这种方式的优点就是简单方便操作,但是这样会额外消耗内存以及数据不一致(所以要设置过期时间短一点) 。
还有对于一些访问数据的格式做出校验,不要让别人直接可以猜到数据然后直接大量的请求进行访问。此外是对于用户的权限验证。没有权限就直接拦截不让他访问。
2.3.1.2、 布隆过滤器
布隆过滤器是一种数据结构,可以用于判断一个元素是否在一个集合中。在缓存中没有命中的情况下,可以使用布隆过滤器判断该数据是否存在,如果不存在,直接返回错误信息。
#初始化
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/*** @auther zzyy* @create 2022-12-27 14:55* 布隆过滤器白名单初始化工具类,一开始就设置一部分数据为白名单所有,* 白名单业务默认规定:布隆过滤器有,redis也有。*/
@Component
@Slf4j
public class BloomFilterInit
{@Resourceprivate RedisTemplate redisTemplate;
@PostConstruct//初始化白名单数据,故意差异化数据演示效果......public void init(){//白名单客户预加载到布隆过滤器String uid = "customer:12";//1 计算hashcode,由于可能有负数,直接取绝对值int hashValue = Math.abs(uid.hashCode());//2 通过hashValue和2的32次方取余后,获得对应的下标坑位long index = (long) (hashValue % Math.pow(2, 32));log.info(uid+" 对应------坑位index:{}",index);//3 设置redis里面bitmap对应坑位,该有值设置为1redisTemplate.opsForValue().setBit("whitelistCustomer",index,true);}
}
#utils
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/*** @auther zzyy* @create 2022-12-27 14:56*/
@Component
@Slf4j
public class CheckUtils
{@Resourceprivate RedisTemplate redisTemplate;
public boolean checkWithBloomFilter(String checkItem,String key){int hashValue = Math.abs(key.hashCode());long index = (long) (hashValue % Math.pow(2, 32));boolean existOK = redisTemplate.opsForValue().getBit(checkItem, index);log.info("----->key:"+key+"\t对应坑位index:"+index+"\t是否存在:"+existOK);return existOK;}
}
#controller
import com.atguigu.redis7.entities.Customer;
import com.atguigu.redis7.service.CustomerSerivce;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Random;
import java.util.Date;
import java.util.concurrent.ExecutionException;
/*** @auther zzyy* @create 2022-07-23 13:55*/
@Api(tags = "客户Customer接口+布隆过滤器讲解")
@RestController
@Slf4j
public class CustomerController
{@Resource private CustomerSerivce customerSerivce;
@ApiOperation("数据库初始化2条Customer数据")@RequestMapping(value = "/customer/add", method = RequestMethod.POST)public void addCustomer() {for (int i = 0; i < 2; i++) {Customer customer = new Customer();
customer.setCname("customer"+i);customer.setAge(new Random().nextInt(30)+1);customer.setPhone("1381111xxxx");customer.setSex((byte) new Random().nextInt(2));customer.setBirth(Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()));
customerSerivce.addCustomer(customer);}}
@ApiOperation("单个用户查询,按customerid查用户信息")@RequestMapping(value = "/customer/{id}", method = RequestMethod.GET)public Customer findCustomerById(@PathVariable int id) {return customerSerivce.findCustomerById(id);}
@ApiOperation("BloomFilter案例讲解")@RequestMapping(value = "/customerbloomfilter/{id}", method = RequestMethod.GET)public Customer findCustomerByIdWithBloomFilter(@PathVariable int id) throws ExecutionException, InterruptedException{return customerSerivce.findCustomerByIdWithBloomFilter(id);}
}
#service
import com.atguigu.redis7.entities.Customer;
import com.atguigu.redis7.mapper.CustomerMapper;
import com.atguigu.redis7.utils.CheckUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/*** @auther zzyy* @create 2022-07-23 13:55*/
@Service
@Slf4j
public class CustomerSerivce
{public static final String CACHE_KEY_CUSTOMER = "customer:";
@Resourceprivate CustomerMapper customerMapper;@Resourceprivate RedisTemplate redisTemplate;
@Resourceprivate CheckUtils checkUtils;
public void addCustomer(Customer customer){int i = customerMapper.insertSelective(customer);
if(i > 0){//到数据库里面,重新捞出新数据出来,做缓存customer=customerMapper.selectByPrimaryKey(customer.getId());//缓存keyString key=CACHE_KEY_CUSTOMER+customer.getId();//往mysql里面插入成功随后再从mysql查询出来,再插入redisredisTemplate.opsForValue().set(key,customer);}}
public Customer findCustomerById(Integer customerId){Customer customer = null;
//缓存key的名称String key=CACHE_KEY_CUSTOMER+customerId;
//1 查询rediscustomer = (Customer) redisTemplate.opsForValue().get(key);
//redis无,进一步查询mysqlif(customer==null){//2 从mysql查出来customercustomer=customerMapper.selectByPrimaryKey(customerId);// mysql有,redis无if (customer != null) {//3 把mysql捞到的数据写入redis,方便下次查询能redis命中。redisTemplate.opsForValue().set(key,customer);}}return customer;}
/*** BloomFilter → redis → mysql* 白名单:whitelistCustomer* @param customerId* @return*/
@Resourceprivate CheckUtils checkUtils;public Customer findCustomerByIdWithBloomFilter (Integer customerId){Customer customer = null;
//缓存key的名称String key = CACHE_KEY_CUSTOMER + customerId;
//布隆过滤器check,无是绝对无,有是可能有//===============================================if(!checkUtils.checkWithBloomFilter("whitelistCustomer",key)){log.info("白名单无此顾客信息:{}",key);return null;}//===============================================
//1 查询rediscustomer = (Customer) redisTemplate.opsForValue().get(key);//redis无,进一步查询mysqlif (customer == null) {//2 从mysql查出来customercustomer = customerMapper.selectByPrimaryKey(customerId);// mysql有,redis无if (customer != null) {//3 把mysql捞到的数据写入redis,方便下次查询能redis命中。redisTemplate.opsForValue().set(key, customer);}}return customer;}
}
比如设置白名单,或者每日签到,再者是打卡等等,在数据存储的时候可以将该数据的key使用hash函数获取索引,然后结合bitmap将该索引的位设置成为true(1),然后在数据读取的时候,首先在布隆过滤器中拦截查询的key,并且计算出这个key的索引值,然后读取redis中的bitmap结构中该索引位置的数据是true还是false,如果为true说明可能存在这个key所对应的数据,但是如果为false则说明这个key一定不存在,直接拦截!
2.3.1.3、Guava布隆过滤器
import com.atguigu.redis7.service.GuavaBloomFilterService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/*** @auther zzyy* @create 2022-12-30 16:50*/
@Api(tags = "google工具Guava处理布隆过滤器")
@RestController
@Slf4j
public class GuavaBloomFilterController
{@Resourceprivate GuavaBloomFilterService guavaBloomFilterService;
@ApiOperation("guava布隆过滤器插入100万样本数据并额外10W测试是否存在")@RequestMapping(value = "/guavafilter",method = RequestMethod.GET)public void guavaBloomFilter(){guavaBloomFilterService.guavaBloomFilter();}
}
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/*** @auther zzyy* @create 2022-12-30 16:50*/
@Service
@Slf4j
public class GuavaBloomFilterService{public static final int _1W = 10000;//布隆过滤器里预计要插入多少数据public static int size = 100 * _1W;//误判率,它越小误判的个数也就越少(思考,是不是可以设置的无限小,没有误判岂不更好)//fpp the desired false positive probabilitypublic static double fpp = 0.03;// 构建布隆过滤器private static BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), size,fpp);public void guavaBloomFilter(){//1 先往布隆过滤器里面插入100万的样本数据for (int i = 1; i <=size; i++) {bloomFilter.put(i);}//故意取10万个不在过滤器里的值,看看有多少个会被认为在过滤器里List<Integer> list = new ArrayList<>(10 * _1W);for (int i = size+1; i <= size + (10 *_1W); i++) {if (bloomFilter.mightContain(i)) {log.info("被误判了:{}",i);list.add(i);}}log.info("误判的总数量::{}",list.size());}
}
2.4、缓存击穿
缓存击穿问题也叫做热点key问题,就是一个高并发访问并且缓存重建业务比较复杂的key突然失效啦,无数的请求访问会在瞬间给数据库带来巨大的冲击。
2.4.1、解决方法
2.4.1.1、设置热点数据永不过期
将热点数据设置为永不过期,这样可以避免热点数据失效的情况,既然你是热点key而且我还害怕你过期,奶奶的给你设置为永不过期!!!
2.4.1.2、差异失效时间
这种机制的话可以设置一个缓存的过期时间的不一致性,就是可以实现有时间的间隔来执行缓存更新数据,即使在某一时刻a缓存过期了还有b缓存用于命中旧的数据!
2.4.1.3、互斥锁
在热点数据失效的情况下,可以使用互斥锁,保证只有一个线程去访问数据库,其他线程等待。相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是 进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询
如果获取到了锁的线程,再去进行查询,查询后将数据写入redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿
2.4.1.4、逻辑过期
当用户开始查询redis时,判断是否命中,如果没有命中则直接返回空数据,不查询数据库,而一旦命中后,将value取出,判断value中的过期时间是否满足,如果没有过期,则直接返回redis中的数据,如果过期,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。
3、Redis锁
在单机的系统中同一个JVM虚拟机内可以使用synchronized或者lock接口来实现锁的功能,但是对于分布式的功能来讲,里面有者不同的JVM虚拟机,这就到导致之前的在JVM内部的锁的无法使用,所以我们要能够有一种分布式的锁来完成这种功能。
那么要成为一个靠谱的分布式的锁需要具备那些的前提条件:
-
独占性 任何时刻有且只有一个线程所持有
-
高可用 在redis集群的环境下,不能出现某一个节点挂掉而出现获取锁和释放锁失败的情况,在高并发的情况下依然耐造
-
防死锁 杜绝死锁,必须有超时控制机制或者撤销机制,有一个兜底终止跳出的方案
-
不乱抢 自己的锁自己释放,不能二话不说释放别人的锁
-
重入性 同一个节点的同一个线程如果获取锁之后,它可以再次获取这个锁
3.1、使用单机的redis锁
public String sale(){String retMessage = "";lock.lock();try{//1 查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//2 判断库存是否足够Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);//3 扣减库存if(inventoryNumber > 0) {stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber;System.out.println(retMessage);}else{retMessage = "商品卖完了,o(╥﹏╥)o";}}finally {lock.unlock();}return retMessage+"\t"+"服务端口号:"+port;}
使用nginx配置负载均衡(暂时没有学习,先使用后补课)
重新加载配置文件后启动服务器,然后使用jmeter进行高并发的测试。创建线程组,以及多少线程和持续时间,然后配置http请求,启动后看测式结果。我们再redsi中存储100个数据,然后两台服务器按道理说最后执行完毕刚好全部卖完,但是事实上redis中还剩余11个,另外在控制台中可以看出有卖出重复票的情况。
那么使用单机锁来应对分布式高并发的项目来讲,就会出现锁不管用的情况。在单机环境下,可以使用synchronized或Lock来实现。
但是在分布式系统中,因为竞争的线程可能不在同一个节点上(同一个jvm中),所以需要一个让所有进程都能访问到的锁来实现(比如redis或者zookeeper来构建)。不同进程jvm层面的锁就不管用了,那么可以利用第三方的一个组件,来获取锁,未获取到锁,则阻塞当前想要运行的线程。