分布式锁之redis实现

docker安装redis

拉取镜像

docker pull redis:6.2.6

查看镜像

87d429bb8dfa467baedf8733e62ac37b.png

启动容器并挂载目录

需要挂在的data和redis.conf自行创建即可

docker run --restart always -d -v /usr/local/docker/redis/redis.conf:/usr/local/etc/redis/redis.conf -v /usr/local/docker/redis/data:/data --name redis -p 6379:6379 redis:6.2.6 redis-server /usr/local/etc/redis/redis.conf

查看运行状态 

不要忘记开放端口6379

b8ff2272d9354ac39d198b5819e62aef.png

进入容器内部使用redis-cli

docker exec -it 13829d3f335a /bin/bashredis-cli

[可选]用密码登录 

修改redis.conf配置文件,设置 requirepass xxxxx

spring boot 集成redis

添加依赖

      <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

添加redis配置 


server.port= 10010spring.datasource.driver-class-name= com.mysql.cj.jdbc.Driver
spring.datasource.url= jdbc:mysql://39.106.53.30:3306/lock_db?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root123456spring.redis.host=39.106.53.30
spring.redis.port=6379

使用StringRedisTemplate

如果直接使用RedisTemplate使用的序列化器是jdk的,存的是二进制,使用StringRedisTemplate默认初始化序列化器就是String类型

    public StringRedisTemplate() {this.setKeySerializer(RedisSerializer.string());this.setValueSerializer(RedisSerializer.string());this.setHashKeySerializer(RedisSerializer.string());this.setHashValueSerializer(RedisSerializer.string());}

redis演示超卖问题

执行票数存入redis指令

set ticket 5000

 编写代码演示超卖问题

/*** @Author sl*/
@Servicepublic class TicketServiceImpl implements TicketService {@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic  void sellTicket(){//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}}
}

 5000请求压测,结果为4895,发生了超卖问题

a7fc4d7f1ee548169fd7f08b0fc6ca0a.png
e1d1f4af1d704938b8188b32b6ccbc36.png
redis解决超卖问题 

解决方案

解决方案

  •         本地jvm锁(这种情况仅限单机,不做介绍)
  •         redis乐观锁 watch  multi exec(性能低)
  •         分布式锁(redis+lua手动实现或者通过redission实现)

redis乐观锁实现 

watch: 监控一个或者多个key,如果这些key在提交事务(exec)之前被其他用户修改过,那么事务将执行失败,需要重新获取最新数据重头操作

multi: 开启事务,使用该命令,标记一个事务块的开始,redis会将这些操作放入队列中

exec: 执行事务

720c867464d145e68e27ad877dc0f155.png

 乐观锁的代码需要包在SessionCallback中实现

package com.test.lockservice.service.impl;import com.test.lockservice.service.TicketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;import java.util.List;/*** @Author sl*/
@Servicepublic class TicketServiceImpl implements TicketService {@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic  void sellTicket(){redisTemplate.execute(new SessionCallback<Object>() {@Overridepublic  Object execute(RedisOperations redisOperations) throws DataAccessException {// 开启监听redisOperations.watch("ticket");//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 开启事务redisOperations.multi();Integer integer = Integer.valueOf(ticket);// 扣减票数redisOperations.opsForValue().set("ticket",String.valueOf(--integer));// 提交事务List exec = redisOperations.exec();// 如果获取锁失败 ,重试if(exec == null || exec.size() == 0){try {// 减少锁争抢,避免栈内存溢出Thread.sleep(40);sellTicket();} catch (InterruptedException e) {e.printStackTrace();}}}return null;}});}
}

 1000请求压测,结果为4000,没有发生超卖,但性能极低

b975be5edee2453597b57ccd66d557f1.pngredis实现分布式锁

分布式锁的实现方案中redis的实现主要思想就是独占排他使用,在redis中可以使用setnx命令进行独占排他使用

  • 加锁 setnx 
  • 解锁 del
  • 重试:递归(容易造成栈内存溢出),这里使用循环

 

package com.test.lockservice.service.impl;import com.test.lockservice.service.TicketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;/*** @Author sl*/
@Servicepublic class TicketServiceImpl implements TicketService {@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic  void sellTicket(){// setnx 排他使用,如果获取锁不成功,则重试while(!redisTemplate.opsForValue().setIfAbsent("lock", "111")){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}}try {//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}} finally {// 解锁操作redisTemplate.delete("lock");}}
}

压测1000,显示无超卖现象 

5629d991544d4b7c9b77472abca1c682.png042eaa1342124922a9d528dc660a16ec.png

添加过期时间防止死锁问题

当前代码存在问题,假如现在有4台服务器争抢锁,编号为1的服务器抢到了锁,但是没来得及释放锁,就宕机啦,其他2,3,4服务器就永远拿不到锁,这就是产生的死锁问题,解决方案是给锁添加过期时间来解决

4affdb3b239141e78d695451512263fb.png

要保证枷锁和设置过期时间具有原子性,否则加了锁,没来得及给过期时间就宕机啦,又会产生死锁问题

expire key 20指令和枷锁指令是两条指令不具有原子性,在这里使用 set key ex 20 nx命令设置过期时间来保证原子性

9d6becfe33a84ebf93433896a4d65bcf.png

添加过期时间和获取锁的原子性

redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS)

 // setnx 排他使用,如果获取锁不成功,则重试while(!redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS)){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}}

通过UUID防止误删

因为已经加了过期时间,如果加了3秒过期时间,第一个请求到了第3秒还没执行完毕,锁就失效了,这时第二个请求获取锁,执行1s的时候,第一个请求执行到del指令,就把第二个锁删除掉啦(误删)

解决方案:通过uuid标识是自己的锁,通过判断是自己的锁,在删除

84e258e5fb574c91a78b4244a41923d9.png

添加uuid防止误删

    public  void sellTicket(){String uuid = UUID.randomUUID().toString();// setnx 排他使用,如果获取锁不成功,则重试while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}}try {//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}} finally {// 判断是自己的锁在删除if(uuid.equals(redisTemplate.opsForValue().get("lock"))){redisTemplate.delete("lock");}}}

使用Lua脚本解决防误删的原子性问题

判断和删除锁之间需要保证原子性第一个请求因为如果判断的时候,发现是自己的锁,然后此时锁超过了过期时间,此时,第二个请求获取到锁,第一个请求执行del指令,删除的是第二个请求的锁,所以需要在判断和删除锁之间保持原子性

解决方案:使用Lua脚本保证原子性,Lua脚本将多条命令一次性发给redis,redis单线程的特性可以保证原子性操作

Lua脚本介绍和redis执行Lua脚本 

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能,编译后仅仅一百余K,可以很方便的嵌入别的程序里

菜鸟地址:https://www.runoob.com/lua/lua-variables.html

Lua脚本流程控制和变量定义

--[ 定义全局变量a 局部变量用local a --]
a = 100;
--[ 检查条件 --]
if( a < 20 )
then--[ if 条件为 true 时执行该语句块 --]print("a 小于 20" )
else--[ if 条件为 false 时执行该语句块 --]print("a 大于 20" )
end
print("a 的值为 :", a)

在redis中执行Lua脚本

redis中继承了Lua脚本,lua-time-limit参数现在脚本最长运行时间,默认是5秒,执行指令为:

eval script numkeys key [key ...] arg [arg ...]

numkeys:标识key的数量 不能省略

hello word

eval "return 'hello world'" 0

分支语句KEYS和ARGV必须大写

eval "if KEYS[1]==1 then return KEYS[1] else return  ARGV[1] end" 1 0 3 

4c5257c42e834f25958f88987fdfb151.png解决判断和删除之间的原子性问题

// 如果是自己的锁,则删除,否则返回0为false
if redis.call('get',KEYS[1]) == ARGV[1]
thenreturn redis.call('del',KEYS[1])
elsereturn 0
endkeys:lockargv: uuid
 public  void sellTicket(){String uuid = UUID.randomUUID().toString();// setnx 排他使用,如果获取锁不成功,则重试while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}}try {//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}} finally {String script="if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";this.redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Collections.singletonList("lock"),uuid);}}

 压测1000 显示无超卖现象

daf44ce416a4420294d1bd923028b763.png

4b48b5d2be32427c9ec2bf1033598291.pnghash+Lua解决锁的可重复入问题

由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加 锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行

第一个就是锁的重入问题

当前方法a获取锁,在方法之中调用b方法,b方法也需要获取锁,这个时候造成了死锁问题,采用hash+Lua脚本解决

第二个就是锁的自动续期问题:后续会解决续期问题

探讨ReentrantLock的可重入原理

ReentrantLock继承了aqs,aqs是锁的基石

可重入锁加锁流程

  • CAS获取锁,如果没有线程占用锁(state==0),加锁成功并记录当前线程是有锁线程
  • 如果state的值不为0,说明锁已经被占用。则判断当前线程是否是有锁线程,如果是则重入 (state + 1)
  • 否则加锁失败,入队等待

可重入锁解锁流程

  • 判断当前线程是否是有锁线程,不是则抛出异常
  • 对state的值减1之后,判断state的值是否为0,为0则解锁成功,返回true
  • 如果减1后的值不为0,则返回false

hash+Lua实现可重复入锁

参照ReentrantLock中的非公平可重入锁实现分布式可重入锁: hash + lua脚本
加锁

  •     判断锁是否存在 (exists),则直接获取锁 hset key field value
  •     如果锁存在则判断是否自己的锁 (hexists),如果是自己的锁则重入: hincrby key field increment
  •     否则重试:递归 循环
加锁
如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次
数加1if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 
then redis.call('hincrby',KEYS[1], ARGV[1], 1)redis.call('expire',KEYS[1],ARGV[2])return 1
else return 0
endkeys lock
argv uuid 30解锁
判断 hash set 可重入 key 的值是否等于 0
如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
如果为 0 代表 可重入次数被减 1
如果为 1 代表 该可重入 key 解锁成功
1 代表解锁成功,锁被释放
0 代表可重入次数被减 1
null 代表其他线程尝试解锁,解锁失败
if redis.call('hexists',KEYS[1],ARGV[1])==0 
then return nil 
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 
then return 0 
else redis.call('del',KEYS[1]) return 1 
endkeys lock
argv uuid

exists判断lock是否存在,hexists lock uuid 判断filed是否存在

通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1,hincrby命令,如果增加的key filed 不存在则新增并加1

ef39ebc7bb9840cd8169db5e279dc80a.png

加锁工具类

package com.test.lockservice.utils;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Collections;
import java.util.UUID;public class RedisLock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private Integer expire = 30;private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();public RedisLock(StringRedisTemplate redisTemplate, String lockName) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = THREAD_LOCAL.get();if (uuid == null) {this.uuid = UUID.randomUUID().toString();THREAD_LOCAL.set(uuid);}this.expire = expire;}public void lock(){this.lock(expire);}public void lock(Integer expire){this.expire = expire;String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 then redis.call('hincrby',KEYS[1], ARGV[1], 1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";System.out.println(script);if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, expire.toString())){try {// 没有获取到锁,重试Thread.sleep(60);lock(expire);} catch (InterruptedException e) {e.printStackTrace();}}}public void unlock(){String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 then return 0 else redis.call('del',KEYS[1]) return 1 end";/*** 如果返回值没有使用Boolean,spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断* 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功*/Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),  Collections.singletonList(lockName), uuid);// 如果未返回值,代表尝试解其他线程的锁if (result == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);} else if (result == 1) {THREAD_LOCAL.remove();}}
}

测试可重入 

@Overridepublic  void checkAndLock(){RedisLock lock = new RedisLock(redisTemplate, "lock");lock.lock();// 查询票数Ticket ticket = ticketMapper.selectOne(new QueryWrapper<Ticket>().eq("sell_company", "12306"));// 判断不为空和票数大于0if(ticket!=null&& ticket.getCount() > 0){ticket.setCount(ticket.getCount()-1);ticketMapper.updateById(ticket);}// 测试可重入testRepeatEntry();lock.unlock();}public void testRepeatEntry(){RedisLock lock = new RedisLock(redisTemplate, "lock");lock.lock();System.out.println("redis分布式锁测试可重入");lock.unlock();}

 压测1000,未发现超卖问题,并解决可重入的问题

7dfd3282120641e2a34d09da730a07c6.png

d920dd12a6cb4e7dbce13ea0c1b15000.png锁的自动续期

如果在锁还在使用过程中,锁还未使用完,就失效了,也就产生了锁如何自动添加过期时间的问题 

实现方案: 定时器 + Lua脚本定时续期


自动续期if redis.call('hexists',KEYS[1],ARGV[1])==1
thenredis.call('expire',KEYS[1],ARGV[2]) return 1 
else return 0 
end

这里没有选用线程池的原因在于释放锁之后没有取消定时任务的方法,所以选用jdk自带的

Timer作为定时任务 

package com.test.lockservice.utils;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.*;public class RedisLock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private Integer expire = 30;@SuppressWarnings("all")private static final Timer timer = new Timer();private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();public RedisLock(StringRedisTemplate redisTemplate, String lockName) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = THREAD_LOCAL.get();if (uuid == null) {this.uuid = UUID.randomUUID().toString();THREAD_LOCAL.set(uuid);}this.expire = expire;}public void lock(){this.lock(expire);}public void lock(Integer expire){this.expire = expire;String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 then redis.call('hincrby',KEYS[1], ARGV[1], 1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, expire.toString())){try {// 没有获取到锁,重试Thread.sleep(60);lock(expire);} catch (InterruptedException e) {e.printStackTrace();}}// 自动续期renewExpire();}public void unlock(){String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 then return 0 else redis.call('del',KEYS[1]) return 1 end";/*** 如果返回值没有使用Boolean,spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断* 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功*/Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),  Collections.singletonList(lockName), uuid);// 如果未返回值,代表尝试解其他线程的锁if (result == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);} else if (result == 1) {THREAD_LOCAL.remove();}// 释放锁成功this.uuid = null;}@SuppressWarnings("all")private void renewExpire() {String script = "if redis.call('hexists',KEYS[1],ARGV[1])==1 then redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";timer.schedule(new TimerTask() {@Overridepublic void run() {if (uuid != null) {redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), RedisLock.this.uuid, expire.toString());renewExpire();}}},expire * 1000 / 3);}
}

红锁算法 

利用红锁算法解决集群下锁的问题:

  • 1、应用程序获取当前系统时间
  • 2、应用程序以相同的kv值依次从多个redis实例中获取锁,如果某一个节点超过了一定时间(小于过期时间)没有获取到锁,则放弃,尽快从其他节点获取锁,避免一个节点宕机阻塞
  • 3、计算锁的消耗时间= 客户端当前时间-step1中的事件,获取锁的时间小于总的锁定时间,并且半数以上节点获取锁成功,认为获取锁成功
  • 4、如果获取锁失败,对所有节点释放锁

redis分布式锁小结

redis分布式锁最开始采用setnex+Lua脚本的方式,我们发现存在不可重入的问题,于是使用hash+Lua脚本解决可重入问题,并解决了自动续期问题,但是还存在一个重要问题,就是redis集群部署所带来的并发问题,所以使用Redission作为最终的分布式锁解决方案

redis集群状态下的问题:
  • 客户端A从master获取到锁
  • 在master将锁同步到slave之前,master宕掉了
  • slave节点被晋级为master节点
  • 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁

redisson中的分布式锁  

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅 提供了一系列的分布式的Java常用对象,还提供了许多分布式服务
Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上

Redisson引入依赖

 <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.11.2</version>
</dependency>

Redission配置 

package com.test.lockservice.config;import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author sl*/
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){Config config = new Config();
//        config.useClusterServers()config.useSingleServer().setAddress("redis://39.106.53.30:6379").setPassword("12345");return Redisson.create(config);}
}

Redission使用

@Autowiredprivate RedissonClient redissonClient;public void userRedisson(){// 获取锁RLock lock = redissonClient.getLock("lock");try {// 加锁lock.lock();//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}} finally {// 解锁lock.unlock();}}

1000并发压测,发现并无超卖问题 

60440ac508af4380ba2ecc5de16754fa.png

c017a7803ee2469c9e89014ef00362c7.png

RLock原理

 RLock对象实现了 java.util.concurrent.locks.Lock 接口,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗检查锁的超时时间 是30秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定

  • RLock 对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出 IllegalMonitorStateException 错误
  • 另外Redisson还通过加锁的方法提供了 leaseTime 的参数来指定加锁的时间。超过这个时间后锁便自动解开了

其实Redisson底层的实现思路同样是hash+Lua脚本的实现方式,在源码中可以看到,下面列举一下加锁的源码

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}

 公平锁 

基于Redis的Redisson分布式可重入公平锁也是实现了 java.util.concurrent.locks.Lock 接口的一 种 RLock 对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了 当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队 列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个 线程都处于等待状态,那么后面的线程会等待至少25秒
public void useFairLock() {RLock fairLock = redissonClient.getFairLock("fairLock");
//        fairLock.lock();// 10秒钟以后自动解锁// 无需调用unlock方法手动解锁fairLock.lock(10, TimeUnit.SECONDS);System.out.println("加锁成功"+Thread.currentThread().getName());// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
//        boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
//        fairLock.unlock();}加锁成功http-nio-10010-exec-5
加锁成功http-nio-10010-exec-10

可以看到,公平锁会维护一个队列,按发送顺序依次加锁

22641eac255c4cd78315c6cda75863ee.png 

联锁

   在多个redis实例上获取锁,联锁所有的锁都上锁成功才算成功

  @Overridepublic void useMutiLock() {RLock lock1 = redissonClient.getLock("lock1");
//        RLock lock2 = redissonClient.getLock("lock2");//联锁所有的锁都上锁成功才算成功RedissonMultiLock redissonMultiLock = new RedissonMultiLock(lock1);redissonMultiLock.lock();System.out.println("业务内容");redissonMultiLock.unlock();}

红锁

在多个节点上加锁,大部分节点获取锁成功就算成功

public void useRedLock() {RLock lock1 = redissonClient.getLock("lock1");
//        RLock lock2 = redissonClient.getLock("lock2");RedissonRedLock readLock = new RedissonRedLock(lock1);// 红锁在大部分节点上加锁成功就算成功readLock.lock();System.out.println("业务内容");readLock.unlock();}

读写锁

对读和写上锁,RReadWriteLock实现了java.util.concurrent.locks.ReadWriteLock接口,读-读不阻塞

 public void useReadWriteLock() {/*** 读-读 不阻塞 读-写 阻塞 写-写 阻塞* RReadWriteLock实现了java.util.concurrent.locks.ReadWriteLock接口*/RReadWriteLock rwlock = redissonClient.getReadWriteLock("readWrite");// 最常见的读锁rwlock.readLock().lock();// 写锁rwlock.writeLock().lock();// 10秒钟以后自动解锁无需调用unlock方法手动解锁rwlock.readLock().lock(10, TimeUnit.SECONDS);rwlock.writeLock().lock(10, TimeUnit.SECONDS);// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁// boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);rwlock.readLock().unlock();rwlock.writeLock().unlock();}

 信号量

资源限流并发工具类,java.util.concurrent.semaphore是单机版限流,RSemaphore是分布式限流,下面的Semaphore会始终限流3个资源

单机版 

package com.test.lockservice.service.impl;import java.util.concurrent.Semaphore;/*** @Author sl*/
public class SemaphoreTest {public static void main(String[] args) {// 3个有限资源Semaphore semaphore = new Semaphore(3);for (int i = 0; i < 6; i++) {new Thread(()->{try{// 获取资源semaphore.acquire();System.out.println(Thread.currentThread().getName() + "抢到车位");Thread.sleep(1000);System.out.println(Thread.currentThread().getName()  +"离开车位");}catch (Exception e){e.printStackTrace();}finally {// 释放资源semaphore.release();}}).start();}}
}Thread-1抢到车位
Thread-0抢到车位
Thread-4抢到车位
Thread-0离开车位
Thread-1离开车位
Thread-4离开车位
Thread-3抢到车位
Thread-5抢到车位
Thread-2抢到车位
Thread-5离开车位
Thread-3离开车位
Thread-2离开车位

分布式版

  public void useSemaphore() {/*** RSemaphore 采用了与java.util.concurrent.semaphore相似的接口* 资源限流信号量, 3个资源 6个线程, semaphore是单机版限流,RSemaphore是分布式限流*/RSemaphore semaphore = redissonClient.getSemaphore("semaphore");try{semaphore.acquire();}catch(Exception e){e.printStackTrace();}finally {semaphore.release();}}

闭锁(CountDownLatch

CountDownLatch并发工具类,一个线程等待一组线程结束是一个做减法的倒计时器,RCountDownLatch 采用了与java.util.concurrent.CountDownLatch 相似的接口和用法,

 单机版

package com.test.lockservice.service.impl;import java.util.concurrent.CountDownLatch;/*** @Author sl*/
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(6);for (int i = 1; i <= 6; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName() + "\t上完自习");countDownLatch.countDown();},String.valueOf(i)).start();}// 班长等待所有线程同学走完在锁门countDownLatch.await();System.out.println(Thread.currentThread().getName() + "\t班长离开,锁门");}
}1	上完自习
3	上完自习
4	上完自习
5	上完自习
2	上完自习
6	上完自习
main	班长离开,锁门

顺道介绍一下CyclicBarrier并发工具类,与CountDownLatch正好相反,它做的是加法

package com.test.lockservice.service.impl;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;/*** @Author sl*/
public class CyclicBarrierTest {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{System.out.println("集齐了卡片,开始召唤神龙");});for (int i = 0; i < 7; i++) {String s = String.valueOf(i);new Thread(()->{System.out.println(Thread.currentThread().getName() + "\t 收集到第"+s+"卡片");try {cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}},String.valueOf(i)).start();}}
}0	 收集到第0卡片
6	 收集到第6卡片
2	 收集到第2卡片
1	 收集到第1卡片
5	 收集到第5卡片
4	 收集到第4卡片
3	 收集到第3卡片
集齐了卡片,开始召唤神龙

分布式版

 public void useCountDownLatch() {/*** RCountDownLatch 采用了与java.util.concurrent.CountDownLatch 相似的接口和用法* 一个线程 等待一组线程完事* 班长等待所有同学走出门口在锁门 CountDownLatch是单机版 RCountDownLatch是分布式版*/RCountDownLatch latch = redissonClient.getCountDownLatch("anyCountDownLatch");latch.trySetCount(6);latch.countDown();try{latch.await();}catch (Exception e){e.printStackTrace();}}

关于zookeeper实现分布式锁,在本专栏zookeeper章节中做了简单介绍,就是创建临时顺序节点,值最小的就是锁。 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/69425.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用 【jacoco】对基于 SpringBoot 和 Dubbo RPC 的项目生成测试覆盖率报告:实践+原理

基于 Dubbo RPC 的项目中有一个提供者项目backend、一个消费者项目gateway、以及注册中心nacos。本篇文章记录在windows本地对该框架的测试过程&#xff0c;以及介绍jacoco的基本原理 测试过程 官网下载安装包解压到本地&#xff0c;https://www.jacoco.org/jacoco/ 只需要用…

11. Junit

我们主要学习的是 Junit5. 1. selenium 和 Junit 之间的关系 selenium 和 Junit 之间的关系 就像 Java 和 JavaScript 之间的关系&#xff0c;也就是没有关系。 为什么学习了 selenium 还要学习 Junit 呢&#xff1f; 举个例子&#xff0c;如果 Selenium 编写的自动化测试用…

论坛系统公共组件部分

1.在Java⽬录下创建包&#xff0c;在Resources⽬录下创建⽂件夹&#xff0c;结构如下 ├─java # java⽂件区 │ └─com │ └─example │ └─demo │ ├─common # 公共类 │ ├─config # 配置…

MySQL表操作

目录 一、创建mysql表的结构 1.在mydb数据库下创建一个表格名字为stu_info&#xff0c;里面结构包含了学号和姓名的名称&#xff0c;字符型以及他的引擎为innodb 字符集为gbk 校队规则为gbk_chinese_ci 二、数据库表查看的基本用法语句 1.查看数据库表明 2.查看数据库表的结…

数字图像处理:亮度对比度-几何变换-噪声处理

文章目录 数字图像增强亮度与对比度转换几何变换图像裁剪尺寸变换图像旋转 噪声处理添加噪声处理噪声 数字图像增强 亮度与对比度转换 图像变换可分为以下两种&#xff1a; 点算子&#xff1a;基于像素变换&#xff0c;在这一类图像变换中&#xff0c;仅仅根据输入像素值计算…

【计算机网络】 ARP协议和DNS协议

文章目录 数据包在传输过程中的变化过程单播组播和广播ARP协议ARP代理免费ARP路由数据转发过程DNS协议 数据包在传输过程中的变化过程 在说ARP和DNS之前&#xff0c;我们需要知道数据包在传输过程的变化过程 从图片中可以看到&#xff0c;发送方的原数据最开始是在应用层&…

vscode使用delve调试golang程序

环境配置 delve仓库&#xff0c;含有教程&#xff1a;https://github.com/go-delve/delve golang的debugging教程&#xff1a;https://github.com/golang/vscode-go/wiki/debugging > go version go version go1.20 windows/amd64> go install github.com/go-delve/de…

华为云Stack的学习(五)

六、华为云stack服务简介 1.云服务在华为云Stack中的位置 云服务对接多个数据中心资源池层提供的资源&#xff0c;并向各种行业应用提供载体。 2.华为云Stack通用服务 2.1 云计算的服务模式 2.2 计算相关的云服务 2.3 存储相关的云服务 2.4 网络相关的云服务 3.云化案例 **…

如何取消KEIL-MDK工程中出现的CMSIS绿色图标

如何取消KEIL-MDK工程中出现的CMSIS绿色图标&#xff1f;我以前经常遇到&#xff0c;不知道怎么搞&#xff0c;好像也不影响编译结果。以前问过其他人&#xff0c;但是不知道怎么搞&#xff0c;相信很多人也遇到过。水平有限&#xff0c;表达不清楚&#xff0c;见下图&#xff…

Bootstrap的标题类(标题样式h1~h6)

Bootstrap 的标题字体大小通常遵循以下样式规则&#xff1a; h1 标题的字体大小为 2.5rem&#xff08;40像素&#xff09;。h2 标题的字体大小为 2rem&#xff08;32像素&#xff09;。h3 标题的字体大小为 1.75rem&#xff08;28像素&#xff09;。h4 标题的字体大小为 1.5re…

【trie树】CF Edu12 E

Problem - E - Codeforces 题意&#xff1a; 思路&#xff1a; 这其实是一个套路题 区间异或转化成前缀异或&#xff0c;然后枚举 i 对于每一个 i&#xff0c;ai ^ x k&#xff0c;对 x 计数 先建一棵字典树&#xff0c;然后在字典树上计数 先去对 > k 的部分计数&a…

国际版阿里云/腾讯云:弹性高性能计算E-HPC入门概述

入门概述 本文介绍E-HPC的运用流程&#xff0c;帮助您快速上手运用弹性高性能核算。 下文以创立集群&#xff0c;在集群中安装GROMACS软件并运转水分子算例进行高性能核算为例&#xff0c;介绍弹性高性能核算的运用流程&#xff0c;帮助您快速上手运用弹性高性能核算。运用流程…

易云维®医院后勤管理系统软件利用物联网智能网关帮助实现医院设备实现智能化、信息化管理

近年来&#xff0c;我国医院逐渐意识到医院设备信息化管理的重要性&#xff0c;逐步建立医院后勤管理系统软件&#xff0c;以提高信息化管理水平。该系统是利用数据库技术&#xff0c;为医院的中央空调、洁净空调、电梯、锅炉、医疗设备等建立电子档案&#xff0c;把设备监控、…

性能炸裂c++20协程+iocp/epoll,超轻量高性能异步库开发实战

前言&#xff1a; c20出来有一段时间了。其中一大功能就是终于支持协程了&#xff08;c作为行业大哥大级别的语言&#xff0c;居然到C20才开始支持协程&#xff0c;我也是无力吐槽了&#xff0c;让多少人等了多少年&#xff0c;等了多少青春&#xff09;但千呼万唤他终于还是来…

基于Matlab实现多个图像融合案例(附上源码+数据集)

图像融合是将多幅图像合成为一幅图像的过程&#xff0c;旨在融合不同图像的信息以获得更多的细节和更丰富的视觉效果。在本文中&#xff0c;我们将介绍如何使用Matlab实现图像融合。 文章目录 简单案例源码数据集下载 简单案例 首先&#xff0c;我们需要了解图像融合的两种主…

【STM32】IIC的初步使用

IIC简介 物理层 连接多个devices 它是一个支持设备的总线。“总线”指多个设备共用的信号线。在一个 I2C 通讯总线中&#xff0c;可连接多个 I2C 通讯设备&#xff0c;支持多个通讯主机及多个通讯从机。 两根线 一个 I2C 总线只使用两条总线线路&#xff0c;一条双向串行数…

Android Studio 汉化

一、汉化&#xff1a; 查看版本号&#xff0c;查看Android Studio版本&#xff0c;根据版本下载对应的汉化包。例如我的是223。 下载汉化包&#xff1a; 中文语言包下载地址 找到对应的版本 回到Android Studio 1、进入设置 2、从磁盘安装插件 3、选择下载好的包点击OK 4、…

介绍OpenCV

OpenCV是一个开源计算机视觉库&#xff0c;可用于各种任务&#xff0c;如物体识别、人脸识别、运动跟踪、图像处理和视频处理等。它最初由英特尔公司开发&#xff0c;目前由跨学科开发人员社区维护和支持。OpenCV可以在多个平台上运行&#xff0c;包括Windows、Linux、Android和…

AJAX学习笔记6 JQuery对AJAX进行封装

AJAX学习笔记5同步与异步理解_biubiubiu0706的博客-CSDN博客 AJAX请求相关的代码都是类似的&#xff0c;有很多重复的代码&#xff0c;这些重复的代码能不能不写&#xff0c;能不能封装一个工具类。要发送ajax请求的话&#xff0c;就直接调用这个工具类中的相关函数即可。 用J…

深圳-海岸城购物中心数据分析

做数据分析的时候&#xff0c;如果要对商场进行分析&#xff0c;可以从这些数据纬度进行分析&#xff0c;如下图所示&#xff1a; 截图来源于数位观察&#xff1a;https://www.swguancha.com/