redis相关面试题
redis支持哪几种数据形式?
String,hash,set,zset,list
redis主要消费什么物理资源?
内存,key-value的形式, redis 具有快速和数据持久化的特征,如果不将数据放在内存中,磁盘 I/O 速度为严重影响 redis 的性能。
redis有哪几种策略模式
lru 淘汰掉使用最少的key,保证新数据的插入
random 随机淘汰掉key,保证新数据的插入
ttl 回收过期集合的健,并且回收存活时间较短的健,保证新数据插入
一个字符串类型的值能存储最大容量是多少?
512M
redis 如何做内存优化?
尽量使用hashes 散列表进行存储,如一个用户的相关信息不需要给每个设置单独的key,把相关信息放到一张hashes中
redis 缓存穿透?缓存雪崩?如何避免
缓存穿透:短时间内请求key不存在与redis中,直接去请求db造成数据库的压力比较大。
避免:对缓存中请求状态为空的时候也进行缓存,数据insert的时候去更新redis
缓存雪崩:当缓存集中在同一个时间段重启,或者很多key在同一个时间内失效会给系统造成很大的压力
避免:做二级缓存(A1为原始缓存、A2为拷贝缓存。A1失效的时候可以访问A2,A1设置为短期、A2设置为长期)、对key设置不同的失效时间,让失效时间均匀分布、在缓存失效的时候控制去访问db的线程数量
1.redis集群模式
redis单机版,出现单机故障后,导致redis无法使用,如果程序使用redis,间接导致程序出错。
redis的集群模式:
主从复制模式 一主多从模式。一个主节点,多个从节点,那么主节点可以负责:读操作,写操作。 从节点只能负责读操作,不能负责写操作。 这样就可以把读的压力从主节点分摊到从节点,以减少主节点的压力。当主节点执行完写命令,会把数据同步到从节点。
哨兵模式 由于主从模式,主节点单机后,从节点不会自动上位。 增加一个哨兵服务,该哨兵时刻监控master,如果master挂了,哨兵会在从节点中选举一位为主节点【哨兵投票机制】
集群化模式 不管上面的主从还是哨兵模式,都无法解决单节点写操作的问题。如果这时写操作的并发比较高。这是可以实验集群化模式【去中心化模式】
相关文档:https://blog.csdn.net/Ysuhang/article/details/126115275
分布式锁相关特性
互斥性:任意时刻,只能有一个客户端持有锁
锁超时释放:持有锁超时,可以释放,防止死锁
可重入性:一个线程获取了锁之后,可以再次对其请求加锁
高可用、高性能:加锁和解锁开销要尽可能低,同时保证高可用
安全性:锁只能被持有该锁的服务(或应用)释放。
容错性:在持有锁的服务崩溃时,锁仍能得到释放,避免死锁。
redis 事项分布式锁
Redis实现分布式锁
再回到上述获取库存的实例,使用Redis实现并发访问安全。
1)基本方案:Redis提供了setXX指令来实现分布式锁
SETNX
格式: setnx key value
将key 的值设为value ,当且仅当key不存在。
若给定的 key已经存在,则SETNX不做任何动作。
设置分布式锁后,能保证并发安全,但上述代码还存在问题,如果执行过程中出现异常,程序就直接抛出异常退出,导致锁没有释放造成最终死锁的问题。(即使将锁放在finally中释放,但是假如是执行到中途系统宕机,锁还是没有被成功的释放掉,依然会出现死锁现象)
2)方案改进:可以给锁设置一个超时时间,到时自动释放锁(锁的过期时间大于业务执行时间)
上述两行代码中,由于加锁和设置锁过期时间不是原子的,可能加锁完就宕机了,那死锁依然存在,所以需要保证两指令执行的原子性
连起来一起写可以原子执行。
3)改进三:再看看是否还有问题。假设有多个线程,锁的过期时间10s,线程1上锁后执行业务逻辑的时长超过十秒,锁到期释放锁,线程2就可以获得锁执行,此时线程1执行完删除锁,删除的就是线程2持有的锁,线程3又可以获取锁,线程2执行完删除锁,删除的是线程3的锁,如此往后,这样就会出问题。
解决办法就是让线程只能删除自己的锁,即给每个线程上的锁添加唯一标识(这里UUID实现,基本不会出现重复),删除锁时判断这个标识:
但上述红框中由于判定和释放锁不是原子的,极端情况下,可能判定可以释放锁,在执行删除锁操作前刚好时间到了,其他线程获取锁执行,前者线程删除锁删除的依然是别的线程的锁,所以要让删除锁具有原子性,可以利用redis事务或lua脚本实现原子操作判断+删除
//redis事务或lua脚本(lua脚本的执行是原子的),如下@RequestMapping(" /deduct_stock")public String deductStock() {String REDIS_LOCK = "good_lock";// 每个人进来先要进行加锁,key值为"good_lock"String value = UUID.randomUUID().toString().replace("-","");try{// 为key加一个过期时间Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);// 加锁失败if(!flag){return "抢锁失败!";}System.out.println( value+ " 抢锁成功");String result = template.opsForValue().get("goods:001");int total = result == null ? 0 : Integer.parseInt(result);if (total > 0) {// 如果在此处需要调用其他微服务,处理时间较长。。。int realTotal = total - 1;template.opsForValue().set("goods:001", String.valueOf(realTotal));System.out.println("购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8002");return "购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8002";} else {System.out.println("购买商品失败,服务端口为8002");}return "购买商品失败,服务端口为8002";}finally {// 谁加的锁,谁才能删除// 也可以使用redis事务// https://redis.io/commands/set// 使用Lua脚本,进行锁的删除Jedis jedis = null;try{jedis = RedisUtils.getJedis();String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +"then " +"return redis.call('del',KEYS[1]) " +"else " +" return 0 " +"end";Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));if("1".equals(eval.toString())){System.out.println("-----del redis lock ok....");}else{System.out.println("-----del redis lock error ....");}}catch (Exception e){}finally {if(null != jedis){jedis.close();}}// redis事务
// while(true){
// template.watch(REDIS_LOCK);
// if(template.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
// template.setEnableTransactionSupport(true);
// template.multi();
// template.delete(REDIS_LOCK);
// List<Object> list = template.exec();
// if(list == null){
// continue;
// }
// }
// template.unwatch();
// break;
// }}}
}
4)当然,也有不错的框架解决该问题,如Redission,Redisson是redis官网推荐实现分布式锁的一个第三方类库,通过开启另一个服务,后台进程定时检查持有锁的线程是否继续持有锁了,是将锁的生命周期重置到指定时间,即防止线程释放锁之前过期,所以将锁声明周期通过重置延长)
如下,先引入依赖,并在在主启动类中加入如下配置:
1、setnx 线程还没有释放锁、系统就挂了 造成死锁
2、setnx、setex 在加锁的同时给锁设置自动释放时间,但是、不是原子操作存在setnx后系统挂掉 也会造成死锁
3、set key value nx ex (ex是秒 px是毫秒) 进行原子操作 问题:现在系统都是分布式不同的线程线程1加锁后业务没有执行完,锁就自动释放了 线程2加锁 线程1业务执行完成删除锁 删除的就是 线程2的
4、加入uuid判断给每个线程一个唯一值需要保证其原子性 可以通过用lua脚本去进行实现
5、第三方组件实现redission watch dog 看门狗 加锁后 定时去查询锁(续命周期就是设置的超时时间的三分之一),如果线程还持有锁,就会不断的延长锁key的生存时间。因此,Redis就是使用Redisson解决了锁过期释放,业务没执行完问题。当业务执行完,释放锁后,再关闭守护线程,
1)setnx:redis提供的分布式锁
存在问题:线程还没释放锁系统宕机了,造成死锁
2)setnx +setex:给锁设置过期时间,到期自动删除。
存在问题:因为加锁和过期时间设置非原子,存在设置超时时间失败情况,导致死锁
3)set(key,value,nx,px):将setnx+setex变成原子操作
存在问题:加锁和释放锁不是同一个线程的问题。假如线程1业务还没执行完,锁过期释放,线程2获取锁执行,线程1执行完业务删除锁删除的就是线程2的,然后其他线程又可获取锁执行,线程2执行完释放锁删除的是别人的,如此往复,导致并发安全问题。
4).方法1:在value中存入uuid(线程唯一标识),删除锁时判断该标识,同时删除锁需保证原子性,否则还是有删除别人锁问题,可通过lua或者redis事务释放锁
方法2:利用redis提供的第三方类库,Redisson也可解决任务超时,锁自动释放问题。其通过开启另一个服务,后台进程定时检查持有锁的线程是否继续持有锁了,是将锁的生命周期重置到指定时间,即防止线程释放锁之前过期,所以将锁声明周期通过重置延长。
Redission也可解决不可重入问题(AQS,计数)
问题:但上述方案能保证单机系统下的并发访问安全,实际为了保证redis高可用,redis一般会集群部署。单机解决方案会出现锁丢失问题。如线程set值后成功获取锁但主节点还没来得及同步就宕机了,从节点选举成为主节点,没有锁信息,此时其他线程就可以加锁成功,导致并发问题。
5)redis集群解决方案,使用redlock解决:
顺序向5个节点请求加锁(5个节点相互独立,没任何关系)
根据超时时间来判断是否要跳过该节点
如果大于等于3节点加锁成功,并且使用时间小于锁有效期,则加锁成功,否则获取锁失败,解锁
————————————————
版权声明:本文为CSDN博主「三月不灭」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_46129192/article/details/126010250
package com.tulingxueyuan.locks;/*** @author liurong* @date 2023-07-21 14:09*/
public class RedisLockTest {private static RedisLockUtil demo = new RedisLockUtil();private static Integer NUM = 101;public static void main(String[] args) {for (int i = 0; i < 100; i++) {new Thread(() -> {String id = Thread.currentThread().getId() + "";boolean isLock = demo.lock(id);try {// 拿到锁的话,就对共享参数减一if (isLock) {NUM--;System.out.println(NUM);}} finally {// 释放锁一定要注意放在finallydemo.unlock(id);}}).start();}}
}
package com.tulingxueyuan.locks;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;import java.util.Collections;/*** @author liurong* @date 2023-07-21 14:07*/
public class RedisLockUtil {private String LOCK_KEY = "redis_lock";// key的持有时间,5msprivate long EXPIRE_TIME = 1;// 等待超时时间,1sprivate long TIME_OUT = 1000;// redis命令参数,相当于nx和px的命令合集private SetParams params = SetParams.setParams().nx().px(EXPIRE_TIME);// redis连接池,连的是本地的redis客户端JedisPool jedisPool = new JedisPool("127.0.0.1", 6379);/*** 加锁** @param id* 线程的id,或者其他可识别当前线程且不重复的字段* @return*/public boolean lock(String id) {Long start = System.currentTimeMillis();Jedis jedis = jedisPool.getResource();try {for (;;) {// SET命令返回OK ,则证明获取锁成功String lock = jedis.set(LOCK_KEY, id, params);if ("OK".equals(lock)) {return true;}// 否则循环等待,在TIME_OUT时间内仍未获取到锁,则获取失败long l = System.currentTimeMillis() - start;if (l >= TIME_OUT) {return false;}try {// 休眠一会,不然反复执行循环会一直失败Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}} finally {jedis.close();}}/*** 解锁** @param id* 线程的id,或者其他可识别当前线程且不重复的字段* @return*/public boolean unlock(String id) {Jedis jedis = jedisPool.getResource();// 删除key的lua脚本String script = "if redis.call('get',KEYS[1]) == ARGV[1] then" + " return redis.call('del',KEYS[1]) " + "else"+ " return 0 " + "end";try {String result =jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(id)).toString();return "1".equals(result);} finally {jedis.close();}}
}