Redis系列-5 Redis分布式锁

背景:

本文介绍Redis分布式锁的内容,包括Redis相关命令和Lua脚本的介绍,以及操作分布式锁的流程与消息,最后结合Redission源码介绍分布式锁的实现原理。

1.基本命令

1.1 基本键值对的设置

设值: set key value
取值: get key
删除: del key

>set key1 value1
"OK"
>get key1
"value1"
>del key1
"1"

1.2 setnx用法

setnx key value:
当key不存在时,进行设置,返回1(表示操作成功)
当key存在时,不进行设置,返回0(表示操作失败)

>setnx key1 value1
"1"
>setnx key1 value1
"0"

1.3 setex和psetex

setex key seoconds value
等价于原子性地执行了 set key value和expire key seconds
psetex用法与setex相同,区别是setex单位为秒,而psetex是毫秒;

>setex key1 1000 value1
"OK"
>ttl key1
"997"

1.4 set扩展用法

set key value [EX seconds | PX millSeconds] [NX | XX]
seconds EX 表示设置过期时间以秒为单位,millSeconds PX 表示设置过期时间以毫秒为单位;
NX表示当键不存在时执行,并返回OK;否则返回null
XX表示当键存在时执行,并返回OK;否则返回null

>set key1 value1 EX 1000 NX
"OK"
>set key1 value1 EX 1000 NX
null
>set key1 value1 EX 2000 XX
"OK"
>ttl key1
"1996"

2.lua脚本

由于redis是单线程执行的,因此可以原子性地执行lua脚本。因此可通过lua脚本对基本命令进行组合。
格式如下:

EVAL "lua脚本" n KEY... , ARGV...

(1) 通过EVAL命令执行lua脚本;
(2) 可对脚本进行传参,可以传多个KEY和多个ARGV,KEY和ARGV建议使用逗号(,)隔开;
(3) 需要显示指定KEY个数;
(4) lua脚本通过KEYS[i] 和 ARGV[j] 获取传入的参数,下标从1开始;
以下通过案例的方式介绍一下lua脚本的使用。

2.1 加锁

分布式锁的数据结构可以被定义为如下格式:

{"lockKey":  {"uuid: threadId": num}
}

lockKey表示分布式锁:数据库存中存在lockKey键时,表示已有客户端占据了lockKey锁,否则表示lockKey锁未被获取。
uuid: threadId结构包含了UUID唯一字符串,num为获取锁的次数。UUID用于保证上锁和解锁是同一个客户端,num用于实现锁的可重入。
案例:

// 如果锁不存在,则加锁并设置过期时间
if (redis.call('exists', KEYS[1]) == 0) then// 设置锁记录锁的获取次数为1redis.call('hincrby', KEYS[1], ARGV[2], 1);// 设置锁的过期时间redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;// 如果锁存在,且为自己,锁+1,并重新设置过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then// 设置锁记录锁的获取次数+1redis.call('hincrby', KEYS[1], ARGV[2], 1);// 重置锁的过期时间redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;// 锁已存在,不是自己,则返回锁到期时间
return redis.call('pttl', KEYS[1]);

说明:
上述LUA脚本返回空,说明锁获取成功;否则获取失败并得到锁的过期时间(毫秒)。

其中,redis.call('exists', KEYS[1])表示KEY[1]键是否存在,存在返回1,不存在返回0;redis.call('hincrby', KEYS[1], ARGV[2], 1)表示对哈希类型数据KEYS[1]和ARGV[2]键对应的值加1;redis.call('pexpire', KEYS[1], ARGV[1])表示设置KEYS[1]键的有效期为ARGV[1],单位毫秒;

在redis客户端进行如下操作:

>EVAL "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
null>hget myLock 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
"1">ttl myLock
"54">EVAL "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
null>ttl myLock
"56">hget myLock 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
"2"

给上述lua脚本的传参为1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

1表示只有一个Key, 其他为ARGV, 即
KEY[1] = myLock
ARGV[1]=60000
ARGV[2]=80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

得到的结果如下:

{"myLock":  {"80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12": 2}
}

表示"myLock"分布式锁已被占用, 获取锁的次数为2次。

2.2 解锁

案例:

// 解锁成功返回1,失败返回0
if (redis.call('del', KEYS[1]) == 1) then // 向Redis发布消息redis.call('publish', KEYS[2], ARGV[1]); return 1 
else return 0 
end

说明:
解锁成功后,该lua脚本返回1,解锁失败返回0;
其中: redis.call('del', KEYS[1])表示根据KEYS[1]键删除数据;redis.call('publish', KEYS[2], ARGV[1])表示发布消息 KEYS[2], ARGV[1];

2.3 释放一层锁

// 锁不是被自己占有,直接返回
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) thenreturn nil;
end;// 锁数量-1,如果还大于0,重新设置过期时间;否则删除锁
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then// 重置过期时间redis.call('pexpire', KEYS[1], ARGV[2]);return 0;
elseredis.call('del', KEYS[1]);redis.call('publish', KEYS[2], ARGV[1]);return 1;
end;
return nil;

上述Lua脚本返回1表示删除锁成功,返回0表示锁释放一层,返回空表示释放失败。

2.4 续期

// 锁被自己占用,重新设置过期时间,返回1;否则返回0if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) thenredis.call('pexpire', KEYS[1], ARGV[1]);return 1;
end;
return 0;

上述Lua脚本返回1表示续期成功,返回0表示续期失败(当前未获取锁)。

3.Redission用法

分布式锁可以直接使用开源的Redission
引入依赖:

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.16.1</version>
</dependency>

编码如下:

public static void lock1() {RedissonClient redisson = getRedissonClient();RLock lock = redisson.getLock("myLock");// 获取锁lock.lock();try {// 业务逻辑} catch (InterruptedException e) {e.printStackTrace();} finally {// 释放锁lock.unlock();}redisson.shutdown();System.out.println("Begin end");
}// 获取redis客户端实例
private static RedissonClient getRedissonClient() {Config config = new Config();config.setLockWatchdogTimeout(600*1000);config.useSingleServer().setAddress("redis://127.0.0.1:6001").setPassword("xxx");RedissonClient redisson = Redisson.create(config);return redisson;
}

说明:redisson.getLock(“myLock”)中的myLock即为分布式锁的键,多个客户端实例需要保证键相同。
lock.lock()用于执行获取锁的逻辑,获取成功后直接返回;获取失败后进入等待队列阻塞;lock.unlock();用于手动解锁。
getRedissonClient方法用于获取redis客户端实例,其中的setLockWatchdogTimeout方法用于设置看门狗的超时时间,单位毫秒,默认为30000(30秒)。
使用lock.lock()方法获取锁时不需要设置锁的过期时间,在获取锁成功后,Redisson通过看门狗机制,进行锁的续期,每经过WatchdogTimeout/3时间执行一次续期操作。
当lock.unlock()释放锁时,会同时关闭看门狗。

4.流程和消息

4.1 流程介绍

屏蔽底层Redis对锁的实现方式,仅用Lock和UnLock表示获取锁和释放锁,分布式锁的竞争流程可表示如下图所示:
在这里插入图片描述
[1] 客户端ClientA向Redis发送获取锁的消息,锁key为myLock(自定义);
[2] Redis响应成功,表示占锁成功;
[3] 客户端ClientB向Redis发送获取锁的消息,key为myLock;
[4] 服务器判断此时myLock锁已被ClientA占有,Redis响应失败;
[5] Client B 向Redis发送订阅消息订阅myChannel频道,等待收到通知;
[6-7] ClientA释放锁同时发布消息至Redis的myChannel频道;
[8] Redis收到publish消息后,向所有订阅了myChannel频道的客户端发送message通知消息;
[9] ClientB收到订阅的消息后,知道锁已被释放,再次获取锁;
其中:消息6和消息7是lua脚本执行的,因此具备原子性;当客户端收到message消息时,表明锁已被释放,可以重新竞争锁。
另外,对于客户端ClientA,在消息2-6之间,Redis的看门狗机制会自动为myLock续期。

4.2 消息介绍

Auth消息:

*2
$4
AUTH
$8
Root@123+OK

其中:*2 表示由两个输入字符串;
$4表示第一个字符串长度为4,即AUTH;
$8表示第二个字符串长度为8,即Root@123;
+OK为Redis返回的结构,表示鉴权成功;
解析后为:

client: AUTH Root@123
Redis: OK

PING/PONG消息:

*1
$4
PING+PONG

客户端向Redis发送PING心跳消息,Redis响应PONG消息。

QUITE消息:

*1
$4
QUIT+OK

客户端向Redis发送QUITE退出消息,Redis响应OK消息。

以下分场景介绍Redis消息,包括成功获取锁—锁的续期—锁的释放和发布通知以及获取锁失败—锁的订阅—收到通知消息—取消订阅等,为简化篇幅,将省略AUTH、PING/PONG、AUITE等重复的内容。

4.2.1 成功获取锁

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "exists"过滤条件,得到:

*6
$4 
EVAL
$339 
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
$1
1
$6
myLock
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

解析后为:

EVAL "lua脚本" 1 myLock 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

4.2.2 获取锁后,锁的续期

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*6
$4
EVAL
$120
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;
$1
1
$6
myLock
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

解析后为:

EVAL "lua脚本" 1 myLock 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

4.2.3 释放锁

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*8
$4
EVAL
$305
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
$1
2
$6
myLock
$31
redisson_lock__channel:{myLock}
$1
0
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

解析后为:

EVAL "lua脚本" 2 myLock redisson_lock__channel:{myLock} 0 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

4.2.4 获取锁失败后订阅

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*2
$9
SUBSCRIBE
$31
redisson_lock__channel:{myLock}

解析后为:

EVAL SUBSCRIBE redisson_lock__channel:{myLock}

4.2.5 订阅后收到通知消息

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*3
$7
message
$31
redisson_lock__channel:{myLock}
$1
0

解析后为:

message redisson_lock__channel:{myLock} 0

4.2.6 取消订阅

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*2
$11
UNSUBSCRIBE
$31
redisson_lock__channel:{myLock}

解析后为:

EVAL UNSUBSCRIBE redisson_lock__channel:{myLock}

5.源码

源码介绍围绕下图展开,如果对下图的逻辑线比较属性,直接跳过本章内容。
在这里插入图片描述
在介绍源码前,有必要了解一下两个概念: Redis的订阅发布机制和Semaphore.
Redis订阅和发布机制:
打开两个Redis客户端,分别执行subscribe myChannel订阅myChannel频道的消息:

>subscribe myChannel
切换到推送/订阅模式,关闭标签页来停止接收信息。
1) "subscribe"
2) "myChannel"
3) "1"

再打开一个客户端,执行publish myChannel key1告诉Redis,向订阅了myChannel的客户端发送消息:

>publish myChannel key1
"2"

返回值2表示有两个订阅客户端。

客户端收到Redis的通知消息:

1) "message"
2) "myChannel"
3) "key1"

Semaphore:

说明:由于在线程专题已经详细介绍过AQS,这里涉及AQS的内容不再展开介绍。

Semaphore是JUC中的一个并发工具类,内部维持了一个state的整数记录状态值,并提供了acquireXXX和release方法用于获取和释放锁(共享锁)。当state的值小于acquireXXX时,线程会进入AQS的等待队列,处于阻塞状态; release方法被时,state属性会增加,如果大于0,则从等待队列中唤醒一个。

如下案例中,Semaphore创建时,state设置为0,当客户端ClientA调用acquire方法获取锁时,进入Semaphore的等待队列处于阻塞状态;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7H3sF8Ns-1717209303379)(C:\Users\0216001379\AppData\Roaming\Typora\typora-user-images\1716255100424.png)]
当客户端ClientB调用release方法释放锁时(本质是对state值进行加法运算),此时Semaphore会自动唤醒处于等待队列中的ClientA.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n602PIUM-1717209303380)(C:\Users\0216001379\AppData\Roaming\Typora\typora-user-images\1716255109685.png)]
客户端A唤醒时,会再次尝试获取锁,此时Semaphore拥有共享锁的数量为1,与acquire方法获取数量相同(默认获取1个),获取锁正常。
在这里插入图片描述接下来,根据如下案例进行源码介绍:

public static void main(String[] args) {RedissonClient client = getClient();RLock lock = client.getLock("myLock");System.out.println("Lock1 Begin exec");lock.lock();try {System.out.println("Lock1 Begin...");Thread.sleep(1000 * 60);System.out.println("Lock1 End.");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}client.shutdown();
}

5.1 获取锁

源码入口lock.lock()->lock(-1, null, false)

说明:为减少代码重复度,会提取公共部分代码形成模板方法,模板方法相对于提取前的方法因存在扩展逻辑,导致可读性降低(虽然整体可维护性提升)。如lock(long leaseTime, TimeUnit unit, boolean interruptibly)方法支持响应中断与忽略中断两种情况,支持设置超时时间与不设置超时时间两种情况。

说明:是否响应中断(即被中断时抛出异常或者不抛出异常)不影响解析主线逻辑,在介绍源码时,认为interruptibly为false, 对interruptibly=true分为不进行说明。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();// part-1.执行lua脚本获取锁Long ttl = tryAcquire(-1, leaseTime, unit, threadId);if (ttl == null) {return;}// part-2.向Redis发布订阅请求RFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future);try {// part-3.while死循环中获取锁while (true) {ttl = tryAcquire(-1, leaseTime, unit, threadId);if (ttl == null) {break;}if (ttl >= 0) {try {future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {future.getNow().getLatch().acquireUninterruptibly();}}} finally {// part-4.获取锁成功后,取消订阅unsubscribe(future, threadId);}
}

上述lock方法的主体逻辑可以分为4个部分:
[1] part-1:执行Lua脚本尝试获取锁,获取锁成功,则直接返回;
[2] part-2: 获取锁失败后,向Redis发布订阅;
[3] part-3: while死循环,获取锁或者抛出异常后退出循环;
[4] part-4: 退出while循环(获取锁成功或抛出异常),向Redis发送取消订阅消息;
整体流程比较清晰,从逻辑上可以切分为两个部分:获取锁成功场景和获取锁失败场景。

5.1.1 获取锁成功:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();// part-1.执行lua脚本获取锁Long ttl = tryAcquire(-1, leaseTime, unit, threadId);if (ttl == null) {return;}// Ignore ...
}

tryAcquire(-1, leaseTime, unit, threadId)返回null时,获取锁成功,退出lock方法。进入tryAcquire方法:

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

其中tryAcquireAsync返回一个Future对象,get方法阻塞等待(通过Future的await方法)该Future执行完成并返回结果或者抛出异常(包装后的RedisException)。进入tryAcquireAsync方法:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// 1.尝试从Redis获取锁,返回一个异步的Future对象RFuture<Long> ttlRemainingFuture;if (leaseTime != -1) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}// 2.在Future对象添加回调函数,完成时回调ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {if (leaseTime != -1) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {// 3.通过看门狗对锁进行自动续期scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;
}

tryAcquireAsync方法整体逻辑较为简单:尝试从Redis获取锁,返回一个异步的Future对象;然后在Future对象添加回调逻辑,在Future完成时回调。

tryAcquireAsync方法仍然是个模板方法,支持设置过期时间和不设置过期时间:

[1]不设置时间: 向Redis申请锁时携带看门狗的超时时间(在3.Redission用法章节中通过Config对象的setLockWatchdogTimeout方法设置),之后通过看门狗续期。

[2]设置过期时间: 向Redis申请锁时携带执行的超时时间,超时后自动释放锁,因此不需要看门狗。

这里有两个重点方法需要关注一下:tryLockInnerAsync向Redis申请锁和scheduleExpirationRenewal开启看门狗。

tryLockInnerAsync方法就是将Lua脚本的执行包装为异步执行,返回一个Future对象:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

这里的lua脚本在2.1章节已进行结合,不再赘述。

scheduleExpirationRenewal功能是开启一个看门狗线程,定期(1/3的看门狗超时时间)向Redis续期,主线逻辑如下所示:

protected void scheduleExpirationRenewal(long threadId) {//...// scheduleExpirationRenewal的核心逻辑在于调用renewExpirationrenewExpiration();//...
}private void renewExpiration() {//...Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {          //...// 调用lua脚本-为锁续期RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {   //...// 每internalLockLeaseTime/3时间,回调自身,开始循环renewExpiration();   //...                    });}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//...
}// 对lua脚本异步执行的封装,与2.4中介绍的lua脚本相同
protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getRawName()),internalLockLeaseTime, getLockName(threadId));
}

5.1.2 获取锁失败:

当锁已被其他客户端占有,获取锁失败,进入订阅-阻塞等待锁释放流程:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {//...// part-2.向Redis发布订阅请求RFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future);try {// part-3.while死循环中获取锁while (true) {ttl = tryAcquire(-1, leaseTime, unit, threadId);if (ttl == null) {break;}if (ttl >= 0) {try {future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {future.getNow().getLatch().acquireUninterruptibly();}}} finally {// part-4.获取锁成功后,取消订阅unsubscribe(future, threadId);}
}

[1] 先看一下向Redis发布订阅请求部分:

// 向lua发送订阅消息,并注册一个监听器监听订阅频道的通知消息
RFuture<RedissonLockEntry> future = subscribe(threadId);
// 与前文结合的get方法逻辑相似,阻塞等待Future执行完成,即等待订阅消息发送给Redis并说道订阅成功消息
commandExecutor.syncSubscription(future);

重点在于第一个方法subscribe(threadId):向Redis发送订阅消息,并注册一个监听器,监听通知消息。
向redis发送订阅消息, 消息内容如章节4.2.4中的SUBSCRIBE redisson_lock__channel:{myLock}, 该消息表示客户端订阅redisson_lock__channel:{myLock}频道, 当Redis服务器收到该频道的通知消息后,会以Message类型的消息通知给订阅的客户端,消息内容为:message redisson_lock__channel:{myLock} 0.
当Redssion客户端收到message redisson_lock__channel:{myLock} 0后,监听器被调用,监听器的注册和监听器的内容后面介绍。
[2] 接着进入while死循环,只有抛出异常或者获取锁成功才会退出:

while (true) {// tryAcquire前文已介绍:获取锁成功返回null, 否则返回锁的超时时间ttl = tryAcquire(-1, leaseTime, unit, threadId);if (ttl == null) {break;}if (ttl >= 0) {try {// 根据锁的超时时间,定时阻塞等待锁,超时后,自动苏醒future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// 当锁没有设置超时时间时,阻塞等待,直到被唤醒future.getNow().getLatch().acquireUninterruptibly();}
}

这里的future.getNow().getLatch()返回的是一个Semaphore对象,初始化时state设置为0,因此调用acquire方法会陷入等待队列。唤醒逻辑在5.3 订阅和通知章节中介绍。
[3] unsubscribe(future, threadId)用于取消订阅,能进入finnally说明有异常抛出或者已经获取锁,从而不需要再监听redis的通知,unsubscribe核心是删除注册的监听器。

5.2 释放锁

Redssion提供的分布式锁支持可重入,因此多次获取需要多次释放。根据案例的lock.unlock()进入unlock方法:

public void unlock() {try {get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause();} else {throw e;}}
}

get是5.1章中结合过,这里直接进入unlockAsync方法,主线逻辑如下:

public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<>();//  向Redis发送解锁消息RFuture<Boolean> future = unlockInnerAsync(threadId);future.onComplete((opStatus, e) -> {// 关闭看门狗cancelExpirationRenewal(threadId);//...});return result;
}

核心逻辑在于unlockInnerAsync:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;",Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

这里的lua脚本用于释放一层锁,如果释放完一层锁后,锁的数量为0,则删除对应的key(释放锁), 此时还会发布一条消息通知Redis,可参考2.3 释放一层锁

5.3 订阅和通知

章节5.2中介绍了客户端获取锁失败后Redis订阅,然后进入等待队列阻塞;在章节5.3中介绍了释放锁以及向Redis发布通知消息,本章节的内容是将二者衔接起来。

订阅和通知的核心功能是:阻塞在等待队列中的客户端将会因为通知消息而被唤醒。

Redis通知Redission:

客户端与Redis服务器底层的通讯是基于TCP链,Redssion使用Netty进行了封装,在pipeline中添加了CommandPubSubDecoder解码器,该解码器中存在如下逻辑:

protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,Object result) throws IOException {//...if (result instanceof Message) {//...if (result instanceof PubSubMessage) {pubSubConnection.onMessage((PubSubMessage) result);}//...}// ...
}

当接收到Message且是PubSubMessage类型的消息(即前文介绍的message redisson_lock__channel:{myLock} 0消息)时,调用pubSubConnection.onMessage((PubSubMessage) result),进入该方法:

public void onMessage(PubSubMessage message) {for (RedisPubSubListener<Object> redisPubSubListener : listeners) {redisPubSubListener.onMessage(message.getChannel(), message.getValue());}
}

这里会调用注册的监听器,包括5.1.2 获取锁失败章节中注册的监听器。

Redission注册监听器:
继续看一下5.1.2 获取锁失败章节RFuture<RedissonLockEntry> future = subscribe(threadId)逻辑:

protected RFuture<RedissonLockEntry> subscribe(long threadId) {return pubSub.subscribe(getEntryName(), getChannelName());
}public RFuture<E> subscribe(String entryName, String channelName) {//...RedisPubSubListener<Object> listener = createListener(channelName, value);service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);//...
}

通过createListener创建监听器,然后将监听器注册到RedisPubSubConnection对象的listeners属性中:

public class RedisPubSubConnection extends RedisConnection {// 监听器容器final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();// 添加监听器的方法public void addListener(RedisPubSubListener<?> listener) {listeners.add((RedisPubSubListener<Object>) listener);}
}

至于创建listener后,如何调用RedisPubSubConnection的addListener方法可通过代码追踪和Bebug进行了解,不是重点内容;这里重点关注的是这个监听器的内部逻辑,即当这个监听器被调用时触发的逻辑:

private RedisPubSubListener<Object> createListener(String channelName, E value) {RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {@Overridepublic void onMessage(CharSequence channel, Object message) {if (!channelName.equals(channel.toString())) {return;}PublishSubscribe.this.onMessage(value, (Long) message);}//...};return listener;
}

channelName.equals(channel.toString())用于判断收到的消息是否是订阅的消息,如前文介绍的订阅的频道是redisson_lock__channel:{myLock}, 此时会校验channelName。
核心逻辑在于PublishSubscribe.this.onMessage(value, (Long) message):

protected void onMessage(RedissonLockEntry value, Long message) {//...value.getLatch().release();//...
}

value.getLatch()获取的是前文介绍的Semaphore对象,调用release时会修改state值,并唤醒一个等待的线程。
基于上述介绍:线程获取分布式锁失败后,向Redis订阅消息,并注册监听器,然后陷入Semaphore的等待队列;当其他客户端释放锁时,同时会发布通知消息给Redis服务器;Redis服务器收到消息后,向订阅的客户端发送通知消息;客户端收到通知消息后,触发监听器逻辑,监听器基于Semaphore机制,唤醒阻塞的线程。线程被唤醒后再次尝试获取分布式锁。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/25458.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SPI通信协议

SPI通信结介绍 W25Q64是一个Flash存储器芯片&#xff0c;内部可以存储8M字节的数据&#xff0c;并且是掉电不丢失的。 四根通信线&#xff1a;SCK&#xff08;Serial Clock&#xff09;串行时钟线、MOSI&#xff08;Master Output Slave Input&#xff09;主机输出从机输入、M…

【十大排序算法】快速排序

在乱序的世界中&#xff0c;快速排序如同一位智慧的园丁&#xff0c; 以轻盈的手法&#xff0c;将无序的花朵们重新安排&#xff0c; 在每一次比较中&#xff0c;沐浴着理性的阳光&#xff0c; 终使它们在有序的花园里&#xff0c;开出绚烂的芬芳。 文章目录 一、快速排序二、…

profile-3d-contrib,github三维立体图的使用

图片展示: 提示: 这个profile-3d-contrib存储库有时候会出现问题,导致又有使用这个存储库svg的用户显示出现问题. 参考: https://zhuanlan.zhihu.com/p/681786778 原仓库链接&#xff1a; GitHub - yoshi389111/github-profile-3d-contrib: This GitHub Action creates a Gi…

【算法刷题 | 动态规划08】6.9(单词拆分、打家劫舍、打家劫舍||)

文章目录 21.单词拆分21.1题目21.2解法&#xff1a;动规21.2.1动规思路21.2.2代码实现 22.打家劫舍22.1题目22.2解法&#xff1a;动规22.2.1动规思路22.2.2代码实现 23.打家劫舍||23.1题目23.2解法&#xff1a;动规23.2.1动规思路23.2.2代码实现 21.单词拆分 21.1题目 给你一…

java中的异常-异常处理(try、catch、finally、throw、throws)+自定义异常

一、概述 1、java程序员在编写程序时提前编写好对异常的处理程序&#xff0c;在程序发生异常时就可以执行预先设定好的处理程序&#xff0c;处理程序执行完之后&#xff0c;可以继续向后执行后面的程序 2、异常处理程序是在程序执行出现异常时才执行的 二、5个关键字 1、tr…

Redis实战篇02

1.分布式锁Redisson 简单介绍&#xff1a; 使用setnx可能会出现的极端问题&#xff1a; Redisson的简介&#xff1a; 简单的使用&#xff1a; 业务代码的改造&#xff1a; private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId voucherOrder.getUserI…

2024真机项目

项目需求&#xff1a; 1. 172.25.250.101 主机上的 Web 服务要求提供 www.exam.com 加密站点&#xff0c;该站点在任何路由可达 的主机上被访问&#xff0c;页面内容显示为 "Hello&#xff0c;Welcome to www.exam.com !"&#xff0c;并提供 content.exam.com/yum/A…

数据:人工智能的基石 | Scale AI 创始人兼 CEO 亚历山大·王的创业故事与行业洞见

引言 在人工智能领域&#xff0c;数据被誉为“新石油”&#xff0c;其重要性不言而喻。随着GPT-4的问世&#xff0c;AI技术迎来了新的浪潮。众多年轻创业者纷纷投身这一领域&#xff0c;Scale AI的创始人兼CEO亚历山大王&#xff08;Alexander Wang&#xff09;就是其中的佼佼…

什么是Java?

什么是Java&#xff1f;java是什么&#xff1f;下面我们来总结一下。 java是什么&#xff1f; java是一个静态编程语言&#xff0c;具有强大的多线程特征&#xff0c;目前java不仅采用c语言的优点&#xff0c;还去掉了一些多继承指针&#xff0c;等复杂的概念&#xff0c;我们…

Git配置 安装及使用

团队开发的神 找工作必备 环境变量 配置好环境后 打开终端环境 winr cmd 我习惯在桌面打开&#xff0c;然后进入相应的文件夹 &#xff08;文件夹结构&#xff09; &#xff08;个人感觉能用cmd不用git&#xff0c;cmd更好用一些&#xff09; 进入对应的文件夹 填写自己对…

docker安装rabbitmq详解

目录 1、安装 1-1.查看rabbitmq镜像 1-2.下载Rabbitmq的镜像 1-3.创建并运行rabbitmq容器 1-4.查看启动情况 1-5.启动web客户端 1-6.访问rabbitmq的客户端 2..遇到的问题 解决方法: 1、安装 1-1.查看rabbitmq镜像 docker search rabbitmq 1-2.下载Rabbitmq的镜像 拉…

国标GB/T 28181详解:校时流程详细说明

目录 一、定义 二、作用 1. 时间同步性 2. 事件记录的准确性 3. 跨平台、跨设备协作 4. 降低时间误差 5. 安全性提升 三、基本要求 四、命令流程 五、协议接口 六、校时效果 1、未校时的情况 2、校时后的效果 七、参考 一、定义 GB28181协议要求所有的监控设…

python后端结合uniapp与uview组件tabs,实现自定义导航按钮与小标签颜色控制

实现效果&#xff08;红框内&#xff09;&#xff1a; 后端api如下&#xff1a; task_api.route(/user/task/states_list, methods[POST, GET]) visitor_token_required def task_states(user):name_list [待接单, 设计中, 交付中, 已完成, 全部]data []color [#F04864, …

CPP初阶:CPP的内存管理模式

目录 一.new和delete操作自定义类型 1.1C语言的内存管理 1.2CPP的内存管理方式 1.3C与CPP内存管理的差异 二.operator new和operator delete函数 三.CPP空间操作符使用深化 3.1 连续内存开辟与释放 3.2 非连续内存开辟与释放 四.new和delete的实现原理 4.1内置类型 4.2…

100道面试必会算法-32-二叉树右视图用栈实现队列

100道面试必会算法-32-二叉树右视图&用栈实现队列 给定一个二叉树的 根节点 root&#xff0c;想象自己站在它的右侧&#xff0c;按照从顶部到底部的顺序&#xff0c;返回从右侧所能看到的节点值。 示例 1: 输入: [1,2,3,null,5,null,4] 输出: [1,3,4]示例 2: 输入: [1,n…

【内网攻防实战】红日靶场(一)续篇_金票与银票

红日靶场&#xff08;一&#xff09;续篇_权限维持 前情提要当前位置执行目标 PsExec.exe拿下域控2008rdesktop 远程登录win7msf上传文件kail回连马连上win7upload上传PsExec.exe PsExec.exe把win7 带到 2008&#xff08;域控hostname&#xff1a;owa)2008开远程、关防火墙Win7…

OpenCV绘制直线

一 绘制图形 画线 画矩形 画圆 画椭圆 画多边形 绘制字体 二 画线 line(img,开始点&#xff0c;结束点&#xff0c;颜色…) 参数结束 img&#xff1a;在那个图像上画线 开始点,结束点&#xff1a;指定线的开始与结束位置&#xff1b; 颜色&#xff0c;线宽&#xff0c;线体…

Linux系统编程(十二)线程同步、锁、条件变量、信号量

线程同步&#xff1a; 协同步调&#xff0c;对公共区域数据按序访问。防止数据混乱&#xff0c;产生与时间有关的错误。数据混乱的原因 一、互斥锁/互斥量mutex 1. 建议锁&#xff08;协同锁&#xff09;&#xff1a; 公共数据进行保护。所有线程【应该】在访问公共数据前先拿…

文心一言 VS 讯飞星火 VS chatgpt (277)-- 算法导论20.3 4题

四、如果调用 vEB-TREE-INSERT 来插入一个已包含在 vEB 树中的元素&#xff0c;会出现什么情况&#xff1f;如果调用 vEB-TREE-DELETE 来删除一个不包含在 vEB 树中的元素&#xff0c;会出现什么情况&#xff1f;解释这些函数为什么有相应的运行状况&#xff1f;怎样修改 vEB 树…

vs - vs2015编译gtest-v1.12.1

文章目录 vs - vs2015编译gtest-v1.12.1概述点评笔记将工程迁出到本地后&#xff0c;如果已经编译过工程&#xff0c;将工程Revert, Clean up 干净。编译用的CMake, 优先用VS2019自带的打开VS2015X64本地命令行编译gtest工程测试安装自己写个测试工程&#xff0c;看看编译出来的…