章节内容
上节我们完成了:
- ZooKeeper的Leader选举机制
- ZooKeeper的选举过程
- ZooKeeper的ZAB协议
背景介绍
这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。
之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。
- 2C4G 编号 h121
- 2C4G 编号 h122
- 2C2G 编号 h123
分布式锁
出现问题1(单机器)
- 假设 Redis 里面的某个商品库存为1,此时两个用户同时下单,其中一个下单请求执行到第3步,更新数据库的库存为0,但是第4步还没执行。
- 而另外一个用户下单执行到了第二步,发现库存还是1,就会继续执行第3步。
- 但是此时库存已经为0了,所以数据库没有限制,此时会出现超卖的问题。
解决方案1
- 用锁把2、3、4步锁住,让他们执行完后,另一个线程才能够继续执行。
- 但是由于业务发展迅速,原来的单机已经不能够满足,此时增加一台机器后,会出现更严重的问题。
出现问题2(多机器)
假设有两个订单同时执行,分别有两个机器执行,那么这两个请求就是可以同时执行了,这样就依然出现了超卖的问题。
解决方案2
我们需要使用分布式锁来解决上面出现的问题。
分布式锁的作用就是在整个系统中提供一个全局的、唯一的锁,在分布式系统中每个系统进行相关的操作时都需要获取到该锁,才能够执行相应的操作。
ZK 分布式锁
实现思路
- 锁就是ZK指定目录下序号最小的临时节点,多个系统的多个线程都要在此目录下创建临时顺序节点,因为ZK会保证节点的顺序性,所以可以利用节点的顺序性进行锁判断。
- 每个线程都是先创建临时顺序节点,然后获取当前目录下最小的节点(序号),判断最小节点是不是当前节点,如果是那么获取锁成功,如果不是则获取锁失败。
- 获取锁失败的线程获取当前节点上一个临时顺序节点,并对此节点进行监听,当该节点删除时,代表释放了锁。
流程图
编写代码
LockTest
package icu.wzk.zk.demo02;public class LockTest {public static void main(String[] args) {for (int i = 0; i < 10; i ++) {// 启动10个new Thread(new LockRunnable()).start();}}static class LockRunnable implements Runnable {@Overridepublic void run() {final ClientTest clientTest = new ClientTest();clientTest.getLock();try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}clientTest.deleteLock();}}}
ClientTest
package icu.wzk.zk.demo02;import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class ClientTest {private ZkClient zkClient = new ZkClient("h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181");String beforeNodePath;String currentNodePath;CountDownLatch countDownLatch = null;public ClientTest() {synchronized (ClientTest.class) {if (!zkClient.exists("/lock")) {zkClient.createPersistent("/lock");}}}public boolean tryGetLock() {if (null == currentNodePath || currentNodePath.isEmpty()) {currentNodePath = zkClient.createEphemeralSequential("/lock/", "lock");}final List<String> childs = zkClient.getChildren("/lock");Collections.sort(childs);final String minNode = childs.get(0);if (currentNodePath.equals("/lock/" + minNode)) {return true;} else {final int i = Collections.binarySearch(childs, currentNodePath.substring("/lock/".length()));String lastNodeChild = childs.get(i - 1);beforeNodePath = "/lock/" + lastNodeChild;}return false;}public void waitForLock() {final IZkDataListener iZkDataListener = new IZkDataListener() {@Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {//}@Overridepublic void handleDataDeleted(String dataPath) throws Exception {countDownLatch.countDown();}};zkClient.subscribeDataChanges(beforeNodePath, iZkDataListener);if (zkClient.exists(beforeNodePath)) {countDownLatch = new CountDownLatch(1);try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}}zkClient.unsubscribeDataChanges(beforeNodePath, iZkDataListener);}public void deleteLock() {if (zkClient != null) {zkClient.delete(currentNodePath);zkClient.close();}}public void getLock() {final String threadName = Thread.currentThread().getName();if (tryGetLock()) {System.out.println(threadName + ": 获取到了锁!");} else {System.out.println(threadName + ": 没有获取到锁!");waitForLock();// 自己调用自己getLock();}}}