Redisson
源码研究
主要分析RedissonLock
@Override
public boolean tryLock(long waitTime/** 等待时间**/, long leaseTime/** TTL时间 **/, TimeUnit unit/** 时间单位 **/) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);try {subscribeFuture.get(time, TimeUnit.MILLISECONDS);} catch (TimeoutException e) {if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. " +"Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {subscribeFuture.whenComplete((res, ex) -> {if (ex == null) {unsubscribe(res, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;} catch (ExecutionException e) {acquireFailed(waitTime, unit, threadId);return false;}try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);}
}
关键代码Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync0(waitTime, leaseTime, unit, threadId));
}
private RFuture<Long> tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);
}
再往下看lua
脚本eval
命令保证了对redis
操作的原子性=setex key expireTime value
若leaseTime>0
,则自动超时过期
若leaseTime<=0
,则是redisson
的看门狗机制,后端定时任务每隔10s
看下缓存是否过期,若存在自动设置TTL为30s
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {// lua evalreturn commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,"if ((redis.call('exists', KEYS[1]) == 0) " +"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {// leaseTime <= 0 定时续期scheduleExpirationRenewal(threadId);}}return ttlRemaining;
});
protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);try {// 核心代码renewExpiration();} finally {if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);}}}
}
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = getServiceManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}// 关键代码 CompletionStage<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (e != null) {log.error("Can't update lock {} expiration", getRawName(), e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {// reschedule itselfrenewExpiration();} else {cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);}
internalLockLeaseTime
主要这个时间是多少我们再细看下源码
org.redisson.config.Config
定义了private long lockWatchdogTimeout = 30 * 1000;
也就是上述说的每隔10s触发一次, internalLockLeaseTime=30s
,自动续期30s
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getRawName()),internalLockLeaseTime, getLockName(threadId));
}