java 单例 读写锁_终极锁实战:单JVM锁+分布式锁

目录

1.前言

2.单JVM锁

3.分布式锁

4.总结

=========正文分割线=================

1.前言

锁就像一把钥匙,需要加锁的代码就像一个房间。出现互斥操作的典型场景:多人同时想进同一个房间争抢这个房间的钥匙(只有一把),一人抢到钥匙,其他人都等待这个人出来归还钥匙,此时大家再次争抢钥匙循环下去。

作为终极实战系列,本篇用java语言分析锁的原理(源码剖析)和应用(详细代码),根据锁的作用范围分为:JVM锁和分布式锁。如理解有误之处,还请指出。

2.单JVM锁(进程级别)

程序部署在一台服务器上,当容器启动时(例如tomcat),一台JVM就运行起来了。本节分析的锁均只能在单JVM下生效。因为最终锁定的是某个对象,这个对象生存在JVM中,自然锁只能锁单JVM。这一点很重要。如果你的服务只部署一个实例,那么恭喜你,用以下几种锁就可以了。

1.synchronized同步锁

2.ReentrantLock重入锁

3.ReadWriteLock读写锁

4.StampedLock戳锁

3.分布式锁(多服务节点,多进程)

3.1基于数据库锁实现

场景举例:

卖商品,先查询库存>0,更新库存-1。

1.悲观锁:select for update(一致性锁定读)

112508079_1_20171003105710826.png

查询官方文档如上图,事务内起作用的行锁。能够保证当前session事务所锁定的行不会被其他session所修改(这里的修改指更新或者删除)。对读取的记录加X锁,即排它锁,其他事不能对上锁的行加任何锁。

BEGIN;(确保以下2步骤在一个事务中:)

SELECT * FROM tb_product_stock WHERE product_id=1 FOR UPDATE--->product_id有索引,锁行.加锁(注:条件字段必须有索引才能锁行,否则锁表,且最好用explain查看一下是否使用了索引,因为有一些会被优化掉最终没有使用索引)

UPDATE tb_product_stock SET number=number-1 WHERE product_id=1--->更新库存-1.解锁

COMMIT;

2.乐观锁:版本控制,选一个字段作为版本控制字段,更新前查询一次,更新时该字段作为更新条件。不同业务场景,版本控制字段,可以0 1控制,也可以+1控制,也可以-1控制,这个随意。

BEGIN;(确保以下2步骤在一个事务中:)

SELECT number FROM tb_product_stock WHERE product_id=1--》查询库存总数,不加锁

UPDATE tb_product_stock SET number=number-1 WHERE product_id=1 AND number=第一步查询到的库存数--》number字段作为版本控制字段

COMMIT;

3.2基于缓存实现(redis,memcached)

原理:

redisson开源jar包,提供了很多功能,其中就包含分布式锁。是Redis官方推荐的顶级项目,官网飞机票

核心org.redisson.api.RLock接口封装了分布式锁的获取和释放。源码如下:

112508079_2_20171003105711341.gif

1 @Override

2 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {

3 long time = unit.toMillis(waitTime);

4 long current = System.currentTimeMillis();

5 final long threadId = Thread.currentThread().getId();

6 Long ttl = tryAcquire(leaseTime, unit, threadId);//申请锁,返回还剩余的锁过期时间7 //lock acquired

8 if (ttl == null) {

9 return true;

10 }

11

12 time -= (System.currentTimeMillis() - current);

13 if (time <= 0) {

14 acquireFailed(threadId);

15 return false;

16 }

17

18 current = System.currentTimeMillis();

19 final RFuture subscribeFuture = subscribe(threadId);

20 if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {

21 if (!subscribeFuture.cancel(false)) {

22 subscribeFuture.addListener(new FutureListener() {

23 @Override

24 public void operationComplete(Future future) throws Exception {

25 if (subscribeFuture.isSuccess()) {

26 unsubscribe(subscribeFuture, threadId);

27 }

28 }

29 });

30 }

31 acquireFailed(threadId);

32 return false;

33 }

34

35 try {

36 time -= (System.currentTimeMillis() - current);

37 if (time <= 0) {

38 acquireFailed(threadId);

39 return false;

40 }

41

42 while (true) {

43 long currentTime = System.currentTimeMillis();

44 ttl = tryAcquire(leaseTime, unit, threadId);

45 //lock acquired

46 if (ttl == null) {

47 return true;

48 }

49

50 time -= (System.currentTimeMillis() - currentTime);

51 if (time <= 0) {

52 acquireFailed(threadId);

53 return false;

54 }

55

56 //waiting for message

57 currentTime = System.currentTimeMillis();

58 if (ttl >= 0 && ttl < time) {

59 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

60 } else {

61 getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);

62 }

63

64 time -= (System.currentTimeMillis() - currentTime);

65 if (time <= 0) {

66 acquireFailed(threadId);

67 return false;

68 }

69 }

70 } finally {

71 unsubscribe(subscribeFuture, threadId);

72 }

73 //return get(tryLockAsync(waitTime, leaseTime, unit));

74 }

112508079_2_20171003105711341.gif

上述方法,调用加锁的逻辑就是在tryAcquire(leaseTime, unit, threadId)中,如下图:

1 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {

2 return get(tryAcquireAsync(leaseTime, unit, threadId));//tryAcquireAsync返回RFutrue

3 }

tryAcquireAsync中commandExecutor.evalWriteAsync就是咱们加锁核心方法了

112508079_2_20171003105711341.gif

1 RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {

2 internalLockLeaseTime = unit.toMillis(leaseTime);

3

4 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,

5 "if (redis.call('exists', KEYS[1]) == 0) then " +

6 "redis.call('hset', KEYS[1], ARGV[2], 1); " +

7 "redis.call('pexpire', KEYS[1], ARGV[1]); " +

8 "return nil; " +

9 "end; " +

10 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +

11 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +

12 "redis.call('pexpire', KEYS[1], ARGV[1]); " +

13 "return nil; " +

14 "end; " +

15 "return redis.call('pttl', KEYS[1]);",

16 Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

17 }

112508079_2_20171003105711341.gif

如上图,已经到了redis命令了

加锁:

KEYS[1] :需要加锁的key,这里需要是字符串类型。

ARGV[1] :锁的超时时间,防止死锁

ARGV[2] :锁的唯一标识,(UUID.randomUUID()) + “:” + threadId

112508079_2_20171003105711341.gif

1 //检查是否key已经被占用,如果没有则设置超时时间和唯一标识,初始化value=1

2 if (redis.call('exists', KEYS[1]) == 0)

3 then

4 redis.call('hset', KEYS[1], ARGV[2], 1); //hset key field value 哈希数据结构5 redis.call('pexpire', KEYS[1], ARGV[1]); //pexpire key expireTime 设置有效时间

6 return nil;

7 end;

8 //如果锁重入,需要判断锁的key field 都一直情况下 value 加一

9 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)

10 then

11 redis.call('hincrby', KEYS[1], ARGV[2], 1);//hincrby key filed addValue 加112 redis.call('pexpire', KEYS[1], ARGV[1]);//pexpire key expireTime重新设置超时时间

13 return nil;

14 end;

15 //返回剩余的过期时间

16 return redis.call('pttl', KEYS[1]);

112508079_2_20171003105711341.gif

以上的方法,当返回空是,说明获取到锁,如果返回一个long数值(pttl 命令的返回值),说明锁已被占用,通过返回剩余时间,外部可以做一些等待时间的判断和调整。

不再分析解锁步骤,直接贴上解锁的redis 命令

解锁:

– KEYS[1] :需要加锁的key,这里需要是字符串类型。

– KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lock__channel__{” + getName() + “}”

– ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。

– ARGV[2] :锁的超时时间,防止死锁

– ARGV[3] :锁的唯一标识,(UUID.randomUUID()) + “:” + threadId

112508079_2_20171003105711341.gif

1 //如果key已经不存在,说明已经被解锁,直接发布(publihs)redis消息

2 if (redis.call('exists', KEYS[1]) == 0)

3 then

4 redis.call('publish', KEYS[2], ARGV[1]);//publish ChannelName message向信道发送解锁消息5 return 1;

6 end;

7 //key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。

8 if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)

9 then

10 return nil;

11 end;

12 //将value减1

13 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); //hincrby key filed addValue 减1

14 //如果counter>0说明锁在重入,不能删除key

15 if (counter > 0)

16 then

17 redis.call('pexpire', KEYS[1], ARGV[2]);

18 return 0;

19 else

20 //删除key并且publish 解锁消息

21 redis.call('del', KEYS[1]);

22 redis.call('publish', KEYS[2], ARGV[1]);

23 return 1;

24 end;

25 return nil;

112508079_2_20171003105711341.gif

特点:

逻辑并不复杂, 实现了可重入功能, 通过pub/sub功能来减少空转,性能极高。

实现了Lock的大部分功能,支持强制解锁。

实战:

1.创建客户端配置类:

这里我们最终只用了一种来测试,就是initSingleServerConfig单例模式。

112508079_2_20171003105711341.gif

1 package distributed.lock.redis;

2

3 import org.redisson.config.Config;

4

5 /**

6 *7 * @ClassName:RedissionConfig8 * @Description:自定义RedissionConfig初始化方法9 * 支持自定义构造:单例模式,集群模式,主从模式,哨兵模式。10 * 注:此处使用spring bean 配置文件保证bean单例,见applicationContext-redis.xml11 * 大家也可以用工厂模式自己维护单例:本类生成RedissionConfig,再RedissonClient redisson = Redisson.create(config);这样就可以创建RedissonClient12 *@authordiandian.zhang13 * @date 2017年7月20日下午12:55:5014 */

15 public class RedissionConfig {

16 private RedissionConfig() {

17 }

18

19 public static Config initSingleServerConfig(String redisHost, String redisPort, String redisPassword) {

20 return initSingleServerConfig(redisHost, redisPort, redisPassword, 0);

21 }

22

23 /**

24 *25 * @Description 使用单例模式初始化构造Config26 *@paramredisHost27 *@paramredisPort28 *@paramredisPassword29 *@paramredisDatabase redis db 默认0 (0~15)有redis.conf配置文件中参数来控制数据库总数:database 16.30 *@return

31 *@authordiandian.zhang32 * @date 2017年7月20日下午12:56:2133 *@sinceJDK1.834 */

35 public static Config initSingleServerConfig(String redisHost, String redisPort, String redisPassword,Integer redisDatabase) {

36 Config config = new Config();

37 config.useSingleServer().setAddress(redisHost + ":" + redisPort)

38 .setPassword(redisPassword)

39 .setDatabase(redisDatabase);//可以不设置,看业务是否需要隔离40 //RedissonClient redisson = Redisson.create(config);

41 return config;

42 }

43

44 /**

45 *46 * @Description 集群模式47 *@parammasterAddress48 *@paramnodeAddressArray49 *@return

50 *@authordiandian.zhang51 * @date 2017年7月20日下午3:29:3252 *@sinceJDK1.853 */

54 public static Config initClusterServerConfig(String masterAddress, String[] nodeAddressArray) {

55 String nodeStr = "";

56 for(String slave:nodeAddressArray){

57 nodeStr +=","+slave;

58 }

59 Config config = new Config();

60 config.useClusterServers()

61 .setScanInterval(2000) //cluster state scan interval in milliseconds

62 .addNodeAddress(nodeStr);

63 return config;

64 }

65

66 /**

67 *68 * @Description 主从模式69 *@parammasterAddress 一主70 *@paramslaveAddressArray 多从71 *@return

72 *@authordiandian.zhang73 * @date 2017年7月20日下午2:29:3874 *@sinceJDK1.875 */

76 public static Config initMasterSlaveServerConfig(String masterAddress, String[] slaveAddressArray) {

77 String slaveStr = "";

78 for(String slave:slaveAddressArray){

79 slaveStr +=","+slave;

80 }

81 Config config = new Config();

82 config.useMasterSlaveServers()

83 .setMasterAddress(masterAddress)//一主

84 .addSlaveAddress(slaveStr);//多从"127.0.0.1:26389", "127.0.0.1:26379"

85 return config;

86 }

87

88 /**

89 *90 * @Description 哨兵模式91 *@parammasterAddress92 *@paramslaveAddressArray93 *@return

94 *@authordiandian.zhang95 * @date 2017年7月20日下午3:01:3596 *@sinceJDK1.897 */

98 public static Config initSentinelServerConfig(String masterAddress, String[] sentinelAddressArray) {

99 String sentinelStr = "";

100 for(String sentinel:sentinelAddressArray){

101 sentinelStr +=","+sentinel;

102 }

103 Config config = new Config();

104 config.useSentinelServers()

105 .setMasterName("mymaster")

106 .addSentinelAddress(sentinelStr);

107 return config;

108 }

109

110

111 }

112508079_2_20171003105711341.gif

2.分布式锁实现类

112508079_2_20171003105711341.gif

1 package distributed.lock.redis;

2

3

4

5 import java.text.SimpleDateFormat;

6 import java.util.Date;

7 import java.util.concurrent.CountDownLatch;

8 import java.util.concurrent.TimeUnit;

9

10 import org.redisson.Redisson;

11 import org.redisson.api.RLock;

12 import org.redisson.api.RedissonClient;

13 import org.slf4j.Logger;

14 import org.slf4j.LoggerFactory;

15

16

17 public class RedissonTest {

18 private static final Logger logger = LoggerFactory.getLogger(RedissonTest.class);

19 static SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

20 //这里可自定义多种模式,单例,集群,主从,哨兵模式。为了简单这里使用单例模式

21 private static RedissonClient redissonClient = Redisson.create(RedissionConfig.initSingleServerConfig("192.168.50.107", "6379", "password"));

22

23 public static void main(String[] args) {

24 CountDownLatch latch = new CountDownLatch(3);

25 //key

26 String lockKey = "testkey20170802";

27 try {

28 Thread t1 = new Thread(() -> {

29 doWithLock(lockKey,latch);//函数式编程

30 }, "t1");

31 Thread t2 = new Thread(() -> {

32 doWithLock(lockKey,latch);

33 }, "t2");

34 Thread t3 = new Thread(() -> {

35 doWithLock(lockKey,latch);

36 }, "t3");

37 //启动线程

38 t1.start();

39 t2.start();

40 t3.start();

41 //等待全部完成

42 latch.await();

43 System.out.println("3个线程都解锁完毕,关闭客户端!");

44 redissonClient.shutdown();

45 } catch (Exception e) {

46 e.printStackTrace();

47 }

48 }

49

50 /**

51 *52 * @Description 线程执行函数体53 *@paramlockKey54 *@authordiandian.zhang55 * @date 2017年8月2日下午3:37:3256 *@sinceJDK1.857 */

58 private static void doWithLock(String lockKey,CountDownLatch latch) {

59 try {

60 System.out.println("进入线程="+Thread.currentThread().getName()+":"+time.format(new Date()));

61 //获取锁,30秒内获取到返回true,未获取到返回false,60秒过后自动unLock

62 if (tryLock(lockKey, 30, 60, TimeUnit.SECONDS)) {

63 System.out.println(Thread.currentThread().getName() + " 获取锁成功!,执行需要加锁的任务"+time.format(new Date()));

64 Thread.sleep(2000L);//休息2秒模拟执行需要加锁的任务65 //获取锁超时

66 }else{

67 System.out.println(Thread.currentThread().getName() + " 获取锁超时!"+time.format(new Date()));

68 }

69 } catch (Exception e) {

70 e.printStackTrace();

71 } finally {

72 try {

73 //释放锁

74 unLock(lockKey);

75 latch.countDown();//完成,计数器减一

76 } catch (Exception e) {

77 e.printStackTrace();

78 }

79 }

80 }

81

82 /**

83 *84 * @Description 获取锁,锁waitTime时间内获取到返回true,未获取到返回false,租赁期leaseTime过后unLock(除非手动释放锁)85 *@paramkey86 *@paramwaitTime87 *@paramleaseTime88 *@paramtimeUnit89 *@return

90 *@authordiandian.zhang91 * @date 2017年8月2日下午3:24:0992 *@sinceJDK1.893 */

94 public static boolean tryLock(String key, long waitTime, long leaseTime, TimeUnit timeUnit) {

95 try {

96 //根据key获取锁实例,非公平锁

97 RLock lock = redissonClient.getLock(key);

98 //在leaseTime时间内阻塞获取锁,获取锁后持有锁直到leaseTime租期结束(除非手动unLock释放锁)。

99 return lock.tryLock(waitTime, leaseTime, timeUnit);

100 } catch (Exception e) {

101 logger.error("redis获取分布式锁异常;key=" + key + ",waitTime=" + waitTime + ",leaseTime=" + leaseTime +

102 ",timeUnit=" + timeUnit, e);

103 return false;

104 }

105 }

106

107 /**

108 *109 * @Description 释放锁110 *@paramkey111 *@authordiandian.zhang112 * @date 2017年8月2日下午3:25:34113 *@sinceJDK1.8114 */

115 public static void unLock(String key) {

116 RLock lock = redissonClient.getLock(key);

117 lock.unlock();

118 System.out.println(Thread.currentThread().getName() + " 释放锁"+time.format(new Date()));

119 }

120 }

112508079_2_20171003105711341.gif

执行结果如下:

112508079_2_20171003105711341.gif

1 进入线程=t3:2017-08-02 16:33:19

2 进入线程=t1:2017-08-02 16:33:19

3 进入线程=t2:2017-08-02 16:33:19

4 t2 获取锁成功!,执行需要加锁的任务2017-08-02 16:33:19--->T2 19秒时获取到锁

5 t2 释放锁2017-08-02 16:33:21--->T2任务完成,21秒时释放锁

6 t1 获取锁成功!,执行需要加锁的任务2017-08-02 16:33:21--->T1 21秒时获取到锁

7 t1 释放锁2017-08-02 16:33:23--->T2任务完成,23秒时释放锁

8 t3 获取锁成功!,执行需要加锁的任务2017-08-02 16:33:23--->T3 23秒时获取到锁

9 t3 释放锁2017-08-02 16:33:25--->T2任务完成,25秒时释放锁

10 3个线程都解锁完毕,关闭客户端!

112508079_2_20171003105711341.gif

如上图,3个线程共消耗25-19=6秒,验证通过,确实互斥锁住了。

我们用Redis Desktop Manger来看一下redis中数据:

112508079_2_20171003105711341.gif

1 192.168.50.107:0>hgetall "testkey20170802"--->用key查询hash所有的值

2 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:30--->T2获取到锁field=uuid:线程号

3 2) 1 --->value=1代表重入次数为1

4 192.168.50.107:0>hgetall "testkey20170802"--->T2释放锁,T1获取到锁

5 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:29

6 2) 1

7 192.168.50.107:0>hgetall "testkey20170802"--->T1释放锁,T3获取到锁

8 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:31

9 2) 1

10 192.168.50.107:0>hgetall "testkey20170802"--->最后一次查询时,T3释放锁,已无数据

112508079_2_20171003105711341.gif

2)基于zookeeper实现

原理:

每个客户端(每个JVM内部共用一个客户端实例)对某个方法加锁时,在zookeeper上指定节点的目录下,生成一个唯一的瞬时有序节点。判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。当释放锁的时候,只需将这个瞬时节点删除即可。

我们使用apache的Curator组件来实现,一般使用Client、Framework、Recipes三个组件。

curator下,InterProcessMutex可重入互斥公平锁,源码(curator-recipes-2.4.1.jar)注释如下:

A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is "fair" - each user will get the mutex in the order requested (from ZK's point of view)

即一个在JVM上工作的可重入互斥锁。使用ZK去持有这把锁。在所有JVM中的进程组,只要使用相同的锁路径将会获得进程间的临界资源。进一步说,这个互斥锁是公平的-因为每个线程将会根据请求顺序获得这个互斥量(对于ZK来说)

主要方法如下:

112508079_2_20171003105711341.gif

1 //构造方法

2 public InterProcessMutex(CuratorFramework client, String path)

3 public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)

4 //通过acquire获得锁,并提供超时机制:

5 public void acquire() throws Exception

6 public boolean acquire(long time, TimeUnit unit) throws Exception

7 //撤销锁

8 public void makeRevocable(RevocationListener listener)

9 public void makeRevocable(final RevocationListener listener, Executor executor)

112508079_2_20171003105711341.gif

我们主要分析核心获取锁acquire方法如下:

112508079_2_20171003105711341.gif

1 @Override

2 public boolean acquire(long time, TimeUnit unit) throws Exception

3 {

4 return internalLock(time, unit);5 }

6

7 private boolean internalLock(long time, TimeUnit unit) throws Exception

8 {

9 /*

10 Note on concurrency: a given lockData instance11 can be only acted on by a single thread so locking isn't necessary12 */

13

14 Thread currentThread = Thread.currentThread();

15 //线程安全map:private final ConcurrentMap   threadData = Maps.newConcurrentMap();

16 LockData lockData =threadData.get(currentThread);

17 if ( lockData != null )

18 {

19 //这里实现了可重入,如果当前线程已经获取锁,计数+1,直接返回true

20 lockData.lockCount.incrementAndGet();

21 return true;

22 }

23 //获取锁,核心方法

24 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());

25 if ( lockPath != null )

26 { //得到锁,塞进线程安全map

27 LockData newLockData = new LockData(currentThread, lockPath);

28 threadData.put(currentThread, newLockData);

29 return true;

30 }

31

32 return false;

33 }

112508079_2_20171003105711341.gif

核心获取锁的方法attemptLock源码如下:

112508079_2_20171003105711341.gif

1 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception

2 {

3 final long startMillis = System.currentTimeMillis();

4 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;

5 final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;

6 int retryCount = 0;

7

8 String ourPath = null;

9 boolean hasTheLock = false;

10 boolean isDone = false;

11 while ( !isDone )

12 {

13 isDone = true;

14

15 try

16 {

17 if ( localLockNodeBytes != null )

18 {

19 ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);

20 }

21 else

22 { //创建瞬时节点(客户端断开连接时删除),节点名追加自增数字

23 ourPath =client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);24 }

//自循环等待时间,并判断是否获取到锁

25 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);

26 }

27 catch ( KeeperException.NoNodeException e )

28 {

29 //gets thrown by StandardLockInternalsDriver when it can't find the lock node30 //this can happen when the session expires, etc. So, if the retry allows, just try it all again

31 if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )

32 {

33 isDone = false;

34 }

35 else

36 {

37 throw e;

38 }

39 }

40 }

41 //获取到锁返回节点path

42 if ( hasTheLock )

43 {

44 return ourPath;

45 }

46

47 return null;

48 }

112508079_2_20171003105711341.gif

自循环等待时间:

112508079_2_20171003105711341.gif

1 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception

2 {

3 boolean haveTheLock = false;

4 boolean doDelete = false;

5 try

6 {

7 if ( revocable.get() != null )

8 {

9 client.getData().usingWatcher(revocableWatcher).forPath(ourPath);

10 }

11

12 while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )//如果状态是开始且未获取到锁

13 {

14 List children = getSortedChildren();//获取父节点下所有线程的子节点

15 String sequenceNodeName = ourPath.substring(basePath.length() + 1); //获取当前节点名称

16//核心方法:判断是否获取到锁

17 PredicateResults predicateResults =driver.getsTheLock(client, children, sequenceNodeName, maxLeases);

18 if ( predicateResults.getsTheLock() )//获取到锁,置true,下一次循环退出

19 {

20 haveTheLock = true;

21 }

22 else//没有索取到锁

23 {

24 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();//这里路径是上一次获取到锁的持有锁路径

25

26 synchronized(this)//强制加锁

27 {

//让线程等待,并且watcher当前节点,当节点有变化的之后,则notifyAll当前等待的线程,让它再次进入来争抢锁

28 Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);

29 if ( stat != null )

30 {

31 if ( millisToWait != null )

32 {

33 millisToWait -= (System.currentTimeMillis() - startMillis);

34 startMillis = System.currentTimeMillis();

35 if ( millisToWait <= 0 )

36 {

37 doDelete = true; //等待超时,置状态为true,后面会删除节点

38 break;

39 }

40 //等待指定时间

41 wait(millisToWait);

42 }

43 else

44 { //一直等待

45 wait();

46 }

47 }

48 }

49 //else it may have been deleted (i.e. lock released). Try to acquire again

50 }

51 }

52 }

53 catch ( Exception e )

54 {

55 doDelete = true;

56 throw e;

57 }

58 finally

59 {

60 if ( doDelete )//删除path

61 {

62 deleteOurPath(ourPath);

63 }

64 }

65 return haveTheLock;

66 }

112508079_2_20171003105711341.gif

112508079_2_20171003105711341.gif

1 @Override

2 public PredicateResults getsTheLock(CuratorFramework client, List children, String sequenceNodeName, int maxLeases) throws Exception

3 {

4 int ourIndex = children.indexOf(sequenceNodeName);//先根据子节点名获取children(所有子节点升序集合)中的索引

5 validateOurIndex(sequenceNodeName, ourIndex);//校验如果索引为负值,即不存在该子节点

6 //maxLeases允许同时租赁的数量,这里源代码写死了1,但这种设计符合将来拓展,修改maxLeases即可满足多租赁

7 boolean getsTheLock = ourIndex

8 String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);//获取到锁返回null,否则未获取到锁,获取上一次的获取到锁的路径。后面会监视这个路径用以唤醒请求线程

9

10 return new PredicateResults(pathToWatch, getsTheLock);

11 }

112508079_2_20171003105711341.gif

特点:

1.可避免死锁:zk瞬时节点(Ephemeral Nodes)生命周期和session一致,session结束,节点自动删除。

2.依赖zk创建节点,涉及文件操作,开销较大。

实战:

1.创建客户端client

2.生成互斥锁InterProcessMutex

3.开启3个线程去获取锁

112508079_2_20171003105711341.gif

1 package distributed.lock.zk;

2

3 import java.text.SimpleDateFormat;

4 import java.util.Date;

5 import java.util.concurrent.TimeUnit;

6

7 import org.apache.curator.framework.CuratorFramework;

8 import org.apache.curator.framework.CuratorFrameworkFactory;

9 import org.apache.curator.framework.recipes.locks.InterProcessMutex;

10 import org.apache.curator.retry.ExponentialBackoffRetry;

11 import org.apache.curator.retry.RetryNTimes;

12 import org.jboss.netty.channel.StaticChannelPipeline;

13 import org.omg.CORBA.PRIVATE_MEMBER;

14

15 /**

16 *17 * @ClassName:CuratorDistrLockTest18 * @Description:Curator包实现zk分布式锁:利用了zookeeper的临时顺序节点特性,一旦客户端失去连接后,则就会自动清除该节点。19 *@authordiandian.zhang20 * @date 2017年7月11日下午12:43:4421 */

22

23 public class CuratorDistrLock {

24 private static final String ZK_ADDRESS = "192.168.50.253:2181";//zk

25 private static final String ZK_LOCK_PATH = "/zktest";//path

26 static SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

27

28 public static void main(String[] args) {

29 try {

30 //创建zk客户端31 //CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(3, 1000));

32 CuratorFramework client = CuratorFrameworkFactory.builder()

33 .connectString(ZK_ADDRESS)

34 .sessionTimeoutMs(5000)

35 .retryPolicy(new ExponentialBackoffRetry(1000, 10))

36 .build();

37 //开启

38 client.start();

39 System.out.println("zk client start successfully!"+time.format(new Date()));

40

41 Thread t1 = new Thread(() -> {

42 doWithLock(client);//函数式编程

43 }, "t1");

44 Thread t2 = new Thread(() -> {

45 doWithLock(client);

46 }, "t2");

47 Thread t3 = new Thread(() -> {

48 doWithLock(client);

49 }, "t3");

50 //启动线程

51 t1.start();

52 t2.start();

53 t3.start();

54 } catch (Exception e) {

55 e.printStackTrace();

56 }

57 }

58

59 /**

60 *61 * @Description 线程执行函数体62 *@paramclient63 *@paramlock64 *@authordiandian.zhang65 * @date 2017年7月12日下午6:00:5366 *@sinceJDK1.867 */

68 private static void doWithLock(CuratorFramework client) {

69 //依赖ZK生成的可重入互斥公平锁(按照请求顺序)

70 InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);

71 try {

72 System.out.println("进入线程="+Thread.currentThread().getName()+":"+time.format(new Date()));

73

74 //花20秒时间尝试获取锁

75 if (lock.acquire(20, TimeUnit.SECONDS)) {

76 System.out.println(Thread.currentThread().getName() + " 获取锁成功!,执行需要加锁的任务"+time.format(new Date()));

77 Thread.sleep(2000L);//休息2秒模拟执行需要加锁的任务78 //获取锁超时

79 }else{

80 System.out.println(Thread.currentThread().getName() + " 获取锁超时!"+time.format(new Date()));

81 }

82 } catch (Exception e) {

83 e.printStackTrace();

84 } finally {

85 try {

86 //当前线程获取到锁,那么最后需要释放锁(实际上是删除节点)

87 if (lock.isAcquiredInThisProcess()) {

88 lock.release();

89 System.out.println(Thread.currentThread().getName() + " 释放锁"+time.format(new Date()));

90 }

91 } catch (Exception e) {

92 e.printStackTrace();

93 }

94 }

95 }

96

97 }

112508079_2_20171003105711341.gif

执行结果:

112508079_2_20171003105711341.gif

zk client start successfully!

进入线程=t2:2017-07-13 11:13:23

进入线程=t1:2017-07-13 11:13:23

进入线程=t3:2017-07-13 11:13:23

t2 获取锁成功!,执行需要加锁的任务2017-07-13 11:13:23----》起始时间23秒

t2 释放锁2017-07-13 11:13:25

t3 获取锁成功!,执行需要加锁的任务2017-07-13 11:13:25----》验证耗时2秒,T2执行完,T3执行

t3 释放锁2017-07-13 11:13:27

t1 获取锁成功!,执行需要加锁的任务2017-07-13 11:13:27----》验证耗时2秒,T3执行完,T1执行

t1 释放锁2017-07-13 11:13:29----》验证耗时2秒,T1执行完,3个任务共耗时=29-23=6秒,验证互斥锁达到目标。

112508079_2_20171003105711341.gif

查看zookeeper节点:

1.客户端连接

zkCli.sh -server 192.168.50.253:2181

2.查看节点

[zk: 192.168.50.253:2181(CONNECTED) 80] ls /-----》查看根目录

[dubbo, zktest, zookeeper, test]

[zk: 192.168.50.253:2181(CONNECTED) 81] ls /zktest -----》查看我们创建的子节点

[_c_034e5f23-abaf-4d4a-856f-c27956db574e-lock-0000000007, _c_63c708f1-2c3c-4e59-9d5b-f0c70c149758-lock-0000000006, _c_1f688cb7-c38c-4ebb-8909-0ba421e484a4-lock-0000000008]

[zk: 192.168.50.253:2181(CONNECTED) 82] ls /zktest-----》任务执行完毕最终释放了子节点

[]

4.总结比较

一级锁分类

二级锁分类

锁名称

特性

是否推荐

单JVM锁

基于JVM源生synchronized关键字实现

synchronized同步锁

适用于低并发的情况,性能稳定。

新手推荐

基于JDK实现,需显示获取锁,释放锁

ReentrantLock可重入锁

适用于低、高并发的情况,性能较高

需要指定公平、非公平或condition时使用。

ReentrantReadWriteLock

可重入读写锁

适用于读多写少的情况。性能高。

老司机推荐

StampedLock戳锁

JDK8才有,适用于高并发且读远大于写时,支持乐观读,票据校验失败后可升级悲观读锁,性能极高!

老司机推荐

分布式锁

基于数据库锁实现

悲观锁:select for update

sql直接使用,但水很深。涉及数据库ACID原理+隔离级别+不同数据库规范

不推荐

乐观锁:版本控制

自己实现字段版本控制

新手推荐

基于缓存实现

org.redisson

性能极高,支持除了分布式锁外还实现了分布式对象、分布式集合等极端强大的功能

老司机推荐

基于zookeeper实现

org.apache.curator zookeeper

性能较高,除支持分布式锁外,还实现了master选举、节点监听()、分布式队列、Barrier、AtomicLong等计数器

老司机推荐

=====附Redis命令=======

SETNX key value (SET if Not eXists):当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。详见:SETNX commond

GETSET key value:将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。详见:GETSET commond

GET key:返回 key 所关联的字符串值,如果 key 不存在那么返回 nil 。详见:GET Commond

DEL key [KEY …]:删除给定的一个或多个 key ,不存在的 key 会被忽略,返回实际删除的key的个数(integer)。详见:DEL Commond

HSET key field value:给一个key 设置一个{field=value}的组合值,如果key没有就直接赋值并返回1,如果field已有,那么就更新value的值,并返回0.详见:HSET Commond

HEXISTS key field:当key 中存储着field的时候返回1,如果key或者field至少有一个不存在返回0。详见HEXISTS Commond

HINCRBY key field increment:将存储在 key 中的哈希(Hash)对象中的指定字段 field 的值加上增量 increment。如果键 key 不存在,一个保存了哈希对象的新建将被创建。如果字段 field 不存在,在进行当前操作前,其将被创建,且对应的值被置为 0。返回值是增量之后的值。详见:HINCRBY Commond

PEXPIRE key milliseconds:设置存活时间,单位是毫秒。expire操作单位是秒。详见:PEXPIRE Commond

PUBLISH channel message:向channel post一个message内容的消息,返回接收消息的客户端数。详见PUBLISH Commond

======参考======

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/507230.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux java socket编程_深入学习socket网络编程,以java语言为例

了解java的socket编程与Linux Socket API之间的关系一、java的网络编程1、socket原理socket通信就是通过IP和端口号将两台主机建立连接&#xff0c;提供通信。主机A的应用程序要能和服务器B进行通信&#xff0c;必须通过Socket建立连接&#xff0c;而建立Socket连接本质上就是依…

python去掉空白行_python去掉空白行的多种实现代码

这篇文章主要介绍了python去掉空白行实现代码,需要的朋友可以参考下 测试代码 php.txt1:www.php.cn 2:www.php.cn 3:www.php.cn 4:www.php.cn 5:www.php.cn 6:www.php.cn 7:www.php.cn 8:www.php.cn 9:www.php.cn 10:www.php.cn 11:www.php.cn 12:www.php.cn 13:www.php.cn 14:…

html get请求_99% 的人都理解错了 HTTP 中 GET 与 POST 的区别【面试必问】

先分析一波&#xff1a;1、GET和POST与数据如何传递没有关系&#xff1f;&#xff1f;GET和POST是由HTTP协议定义的。那么使用哪个方式与应用层的数据如何传输是没有相互关系的。从而&#xff0c;HTTP就没有要求&#xff0c;POST一定要放到请求体里面&#xff0c;GET就一定要放…

hive load data外部表报错_生产SparkSQL如何读写本地外部数据源及排错

https://spark-packages.org/里有很多third-party数据源的package&#xff0c;spark把包加载进来就可以使用了csv格式在spark2.0版本之后是内置的&#xff0c;2.0之前属于第三方数据源一、读取本地外部数据源1.直接读取一个json文件[hadoophadoop000 bin]$ ./spark-shell --mas…

ffmpeg命令_使用ffmpeg命令为多个短视频修改视频备注说明

今天主要给大家讲一下使用视频剪辑高手中的ffmpeg命令为多个短视频修改备注说明的详细步骤&#xff0c;有需要和感兴趣的宝贝们可以跟随小编一起来试试。收集视频将需要剪辑的短视频保存到同一文件夹上进入软件双击进入视频剪辑高手&#xff0c;选择“批量剪辑视频”功能添加视…

从事python需要掌握哪些知识和技能_零基础想转行从事Python?需要掌握如下技能...

零基础python能找到工作吗&#xff1f;需要掌握哪些技能&#xff1f;对于大部分零基础学编程半路出家的人来说&#xff0c;无非是想改变现状换一门新职业&#xff0c;所谓技术大牛不过是比小白们更早接触编程罢了&#xff0c;选择好自己有兴趣的职业技能&#xff0c;并为之学习…

java byte 判断相等_你真的了解Java中quot;==quot;和equals()的区别?

部分面试资料链接:https://pan.baidu.com/s/1qDb2YoCopCHoQXH15jiLhA密码:jsam想获得全部面试必看资料&#xff0c;关注公众号&#xff0c;大家可以在公众号后台回复“知乎”即可。“判断两个事物是否相等”&#xff0c;是编程中最常见的操作之一&#xff0c;在Java中&#xff…

数据通信原理_同网段主机通信原理

本篇文章介绍数据通信中最基础&#xff0c;最关键的原理之一&#xff0c;两台通网段的主机如何通信。获得更多技术资料和免费学习视频&#xff0c;加入讨论群&#xff1a;752160765适合两台普通电脑之间&#xff0c;两台服务器之间&#xff0c;两台手机之间&#xff0c;电脑和打…

java jdk 未知错误_解决JAVA JDK安装出错的最常见问题,帮你排除困扰

一般来说&#xff0c;安装JAVA JDK的整个流程是很简单的&#xff0c;只要按照提示进行操作即可&#xff0c;就不会出现问题。但是呢&#xff0c;有小伙伴反映说&#xff0c;之前安装了JAVA JDK&#xff0c;进行卸载重装的时候出现错误提示&#xff0c;“正在进行另一Java安装”…

定义const变量是不可以赋值_JavaScript的声明方法和作用范围,常见的结构赋值类型和使用场景...

链接&#xff1a;https://juejin.im/post/5d9bf530518825427b27639d声明const命令&#xff1a;声明常量 let命令&#xff1a;声明变量作用作用域全局作用域函数作用域&#xff1a;function() {}块级作用域&#xff1a;{}作用范围var 命令在全局代码中执行const命令和let命令只能…

java社区活跃度_Java并发编程-活跃度问题

在讲问题前&#xff0c;我先说明一下什么是活跃度&#xff1f;一个并发应用及时执行的能力称作活跃度。我主要讲死锁问题&#xff0c;顺带介绍一下饥饿&#xff0c;弱响应性和活锁。死锁死锁这个词大家都听过&#xff0c;我先来罗列一下产生死锁的四个必要条件&#xff1a;(1) …

java8 di_java8 多个list对象用lambda求差集操作

业务场景&#xff1a;调用同步接口获取当前全部有效账户&#xff0c;数据库已存在部分账户信息&#xff0c;因此需要筛选同步接口中已存在本地的帐户。调用接口获取的数据集合List list response.getData().getItems();本地查询出来的账户集合List towList accountRepository…

怎么抓python程序的包_如何在AWS上部署python应用程序

如何在AWS上部署python应用程序&#xff0c;学姐呕心沥血亲自总结&#xff0c;亲测有效&#xff0c;比网上看网上大把大把的文档要快得多&#xff01;作者&#xff1a;蕾学姐亚马逊云计算服务&#xff08;Amazon Web Services&#xff0c;缩写为AWS&#xff09;&#xff0c;由亚…

【Hadoop】Zookeeper架构/特点

Zookeeper 中的角色主要有以下三类&#xff1a; Zookeeper需要保证高可用性和强一致性为了支持更多的客户端&#xff0c;需要增加更多Server&#xff0c;但是Server增多&#xff0c;意味着投票阶段延迟增大&#xff0c;会影响整个系统的性能。所以在3.3.0中ZK引入的新角色&…

wpf 按钮样式_键盘 | 01.在程序集间引用样式

设置Button和TextBox的特定颜色和字体的样式和默认样式&#xff0c;并在程序集间引用。从零开始用WPF/C#开发一个键盘指示器项目完整开源、免费&#xff0c;不依赖第三方库编译好的先行版程序在微信公众号(香辣恐龙蛋)下载。文章同时发布在微信公众号(香辣恐龙蛋)、B站(香辣恐龙…

怎么去除表中的系统导出的字符_EXCEL非常有用的字符函数LEN、LENB,财务工作者的必备利器...

LEN函数与LENB函数是比较常用的函数&#xff0c;在实际中应用那是相当广的&#xff0c;尤其在财务工作中的使用频率是相当的高。我就一起看看实际工作中哪些地方用到了它。我们先简单说下它的用法&#xff0c;很简单&#xff0c;LEN(text)、LENB(text)&#xff0c;两个用法是一…

java软尾山地车碳_JAVA FURIA 27.5入门软尾山地车评测

铝合金车架、前后100mm避震行程、超短把立、长款燕把、27.5轮径……之前跟大家讲过&#xff0c;在这台车上你能拥有全避震车型所应该具备的所有基础元素。2个月的时间已经过去&#xff0c;这台车到底怎么样呢&#xff1f;我们一起来看一下。优点&#xff1a;质量靠谱&#xff0…

java 模拟路由表_Router的路由表

Router中使用routers字段表示路由表&#xff0c;这是一个数组&#xff0c;每个元组的类型是[desnination,nexthop],其中destination表示目的网段(cidr)&#xff0c;nexthop表示下一跳的IP&#xff0c;举例如下&#xff1a;“routes”:[ { “destination”:”10.50.10.0/24” “…

无符号有符号乘法_刘帅嵌入式系统-乘法指令

ARM有两类乘法指令&#xff1a;一类为32位的乘法指令&#xff0c;即乘法操作的结果为32位&#xff1b;另一类为64位的乘法指令&#xff0c;即乘法操作的结果为64位。两类指令共有以下6条。MUL&#xff1a;32位乘法指令MLA&#xff1a;32位带加数的乘法指令SMULL&#xff1a;64位…

php导出csv_原生PHP实现导出csv格式Excel文件的方法示例【附源码下载】

本文实例讲述了原生PHP实现导出csv格式Excel文件的方法。分享给大家供大家参考&#xff0c;具体如下&#xff1a;效果图源码分析index.phprequire_once "./Export.php";//测试数据$headerList [列名1,列名2,列名3];$data [[值1,值2,值3],[值11,值22,值33],[值111,值…