1、看门狗的流程图
2、看门狗的代码实现
/****类说明:Redis的key-value结构*/
public class LockItem {private final String key;private final String value;public LockItem(String key, String value) {this.key = key;this.value = value;}public String getKey() {return key;}public String getValue() {return value;}
}
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/****类说明:存放到延迟队列的元素,比标准的delay的实现要提前一点时间*/
public class ItemVo<T> implements Delayed{/*到期时刻 20:00:35,234*/private long activeTime;/*业务数据,泛型*/private T data;/*传入的数值代表过期的时长,单位毫秒,需要乘1000转换为毫秒和到期时间* 同时提前100毫秒续期,具体的时间可以自己决定*/public ItemVo(long expirationTime, T data) {super();this.activeTime = expirationTime+System.currentTimeMillis()-100;this.data = data;}public long getActiveTime() {return activeTime;}public T getData() {return data;}/*** 返回元素到激活时刻的剩余时长*/public long getDelay(TimeUnit unit) {long d = unit.convert(this.activeTime- System.currentTimeMillis(),unit);return d;}/**按剩余时长排序*/public int compareTo(Delayed o) {long d = (getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS));if (d==0){return 0;}else{if (d<0){return -1;}else{return 1;}}}}
核心代码:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** 分布式锁,附带看门狗线程的实现:加锁,保持锁1秒*/
@Component
public class RedisDistLockWithDog implements Lock {//加锁的时间private final static int LOCK_TIME = 1*1000;private final static String LOCK_TIME_STR = String.valueOf(LOCK_TIME);//key的开头,用于标记是分布式锁使用private final static String RS_DISTLOCK_NS = "tdln2:";//释放锁的lua,释放的时候保持原子性private final static String RELEASE_LOCK_LUA ="if redis.call('get',KEYS[1])==ARGV[1] then\n" +" return redis.call('del', KEYS[1])\n" +" else return 0 end";/*还有并发问题,考虑ThreadLocal*/private ThreadLocal<String> lockerId = new ThreadLocal<>();private Thread ownerThread;private String lockName = "lock";@Autowiredprivate JedisPool jedisPool;public String getLockName() {return lockName;}public void setLockName(String lockName) {this.lockName = lockName;}public Thread getOwnerThread() {return ownerThread;}public void setOwnerThread(Thread ownerThread) {this.ownerThread = ownerThread;}//加锁的入口@Overridepublic void lock() {//自璇while(!tryLock()){try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}@Overridepublic void lockInterruptibly() throws InterruptedException {throw new UnsupportedOperationException("不支持可中断获取锁!");}//具体的加锁逻辑@Overridepublic boolean tryLock() {Thread t=Thread.currentThread();/*说明本线程正在持有锁*/if(ownerThread==t) {return true;}else if(ownerThread!=null){/*说明本进程中有别的线程正在持有分布式锁*/return false;}Jedis jedis = null;try {jedis = jedisPool.getResource();/*每一个锁的持有人都分配一个唯一的id,也可采用snowflake算法*/String id = UUID.randomUUID().toString();SetParams params = new SetParams();params.px(LOCK_TIME); //加锁时间1sparams.nx();//synchronized是为了防止本地多个线程争抢synchronized (this){//加锁if ((ownerThread==null)&&"OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))) {lockerId.set(id);setOwnerThread(t);if(expireThread == null){//看门狗线程启动expireThread = new Thread(new ExpireTask(),"expireThread");expireThread.setDaemon(true);//设置为守护线程expireThread.start();}//往延迟阻塞队列中加入元素(让看门口可以在过期之前一点点的时间去做锁的续期)delayDog.add(new ItemVo<>((int)LOCK_TIME,new LockItem(lockName,id)));System.out.println(Thread.currentThread().getName()+"已获得锁----");return true;}else{System.out.println(Thread.currentThread().getName()+"无法获得锁----");return false;}}} catch (Exception e) {throw new RuntimeException("分布式锁尝试加锁失败!",e);} finally {jedis.close();}}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {throw new UnsupportedOperationException("不支持等待尝试获取锁!");}@Overridepublic void unlock() {if(ownerThread!=Thread.currentThread()) {throw new RuntimeException("试图释放无所有权的锁!");}Jedis jedis = null;try {jedis = jedisPool.getResource();Long result = (Long)jedis.eval(RELEASE_LOCK_LUA,Arrays.asList(RS_DISTLOCK_NS+lockName),Arrays.asList(lockerId.get()));System.out.println(result);if(result.longValue()!=0L){System.out.println("Redis上的锁已释放!");}else{System.out.println("Redis上的锁释放失败!");}} catch (Exception e) {throw new RuntimeException("释放锁失败!",e);} finally {if(jedis!=null) jedis.close();lockerId.remove();setOwnerThread(null);}}@Overridepublic Condition newCondition() {throw new UnsupportedOperationException("不支持等待通知操作!");}/*看门狗线程*/private Thread expireThread;//通过delayDog 避免无谓的轮询,减少看门狗线程的轮序次数 阻塞延迟队列 刷1 没有刷2private static DelayQueue<ItemVo<LockItem>> delayDog = new DelayQueue<>();//续锁逻辑:判断是持有锁的线程才能续锁private final static String DELAY_LOCK_LUA ="if redis.call('get',KEYS[1])==ARGV[1] then\n" +" return redis.call('pexpire', KEYS[1],ARGV[2])\n" +" else return 0 end";private class ExpireTask implements Runnable{@Overridepublic void run() {System.out.println("看门狗线程已启动......");while(!Thread.currentThread().isInterrupted()) {try {LockItem lockItem = delayDog.take().getData();//只有元素快到期了才能take到 0.9sJedis jedis = null;try {jedis = jedisPool.getResource();Long result = (Long)jedis.eval(DELAY_LOCK_LUA,Arrays.asList(RS_DISTLOCK_NS+lockItem.getKey ()),Arrays.asList(lockItem.getValue(),LOCK_TIME_STR));if(result.longValue()==0L){System.out.println("Redis上的锁已释放,无需续期!");}else{delayDog.add(new ItemVo<>((int)LOCK_TIME,new LockItem(lockItem.getKey(),lockItem.getValue())));System.out.println("Redis上的锁已续期:"+LOCK_TIME);}} catch (Exception e) {throw new RuntimeException("锁续期失败!",e);} finally {if(jedis!=null) jedis.close();}} catch (InterruptedException e) {System.out.println("看门狗线程被中断");break;}}System.out.println("看门狗线程准备关闭......");}}// @PostConstruct
// public void initExpireThread(){
//
// }@PreDestroypublic void closeExpireThread(){if(null!=expireThread){expireThread.interrupt();}}
}
测试:
@SpringBootTest
public class TestRedisDistLockWithDog {@Autowiredprivate RedisDistLockWithDog redisDistLockWithDog;private int count = 0;@Testpublic void testLockWithDog() throws InterruptedException {int clientCount =3;CountDownLatch countDownLatch = new CountDownLatch(clientCount);ExecutorService executorService = Executors.newFixedThreadPool(clientCount);for (int i = 0;i<clientCount;i++){executorService.execute(() -> {try {redisDistLockWithDog.lock(); //锁的有效时间1秒System.out.println(Thread.currentThread().getName()+"准备进行累加。");Thread.sleep(2000);count++;} catch (InterruptedException e) {e.printStackTrace();} finally {redisDistLockWithDog.unlock();}countDownLatch.countDown();});}countDownLatch.await();System.out.println(count);}@Testpublic void testTryLock2() {int clientCount =1000;for (int i = 0;i<clientCount;i++) {if (redisDistLockWithDog.tryLock()) {System.out.println("已获得锁!");redisDistLockWithDog.unlock();} else {System.out.println("未能获得锁!");}}}}