一、Zookeeper实现分布式锁
分布式锁主要用于在分布式环境中保证数据的一致性。
包括跨进程、跨机器、跨网络导致共享资源不一致的问题。
1.Zookeeper分布式锁的代码实现
新建一个maven项目ZK-Demo,然后在pom.xml里面引入相关的依赖
<dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.10</version></dependency>
2. Zookeeper分布式锁的核心代码实现
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/**
*
* @Description: Zookeeper分布式锁的核心代码实现
* @author leeSmall
* @date 2018年9月4日
*
*/
public class DistributedLock implements Lock {private static Logger logger = LoggerFactory.getLogger(DistributedLock.class);private static final String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";private static final String LOCK_PATH = "/LOCK";private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());private CountDownLatch cdl;private String beforePath;// 当前请求的节点前一个节点private String currentPath;// 当前请求的节点// 判断有没有LOCK目录,没有则创建public DistributedLock() {if (!this.client.exists(LOCK_PATH)) {this.client.createPersistent(LOCK_PATH);}}public void lock() {//尝试去获取分布式锁失败if (!tryLock()) {//对次小节点进行监听waitForLock();lock();} else {logger.info(Thread.currentThread().getName() + " 获得分布式锁!");}}public boolean tryLock() {// 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPathif (currentPath == null || currentPath.length() <= 0) {// 创建一个临时顺序节点currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");System.out.println("---------------------------->" + currentPath);}// 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400List<String> childrens = this.client.getChildren(LOCK_PATH);//由小到大排序所有子节点Collections.sort(childrens);//判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {return true;} //找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePathelse {int wz = Collections.binarySearch(childrens, currentPath.substring(6));beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);}return false;}//等待锁,对次小节点进行监听private void waitForLock() {IZkDataListener listener = new IZkDataListener() {public void handleDataDeleted(String dataPath) throws Exception {logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------");if (cdl != null) {cdl.countDown();}}public void handleDataChange(String dataPath, Object data) throws Exception {}};// 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcherthis.client.subscribeDataChanges(beforePath, listener);if (this.client.exists(beforePath)) {cdl = new CountDownLatch(1);try {cdl.await();} catch (InterruptedException e) {e.printStackTrace();}}this.client.unsubscribeDataChanges(beforePath, listener);}//完成业务逻辑以后释放锁public void unlock() {// 删除当前临时节点client.delete(currentPath);}// ==========================================public void lockInterruptibly() throws InterruptedException {}public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}public Condition newCondition() {return null;}
}3.2 在业务里面使用分布式锁
package com.study.demo.lock;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/**
*
* @Description: 在业务里面使用分布式锁
* @author leeSmall
* @date 2018年9月4日
*
*/
public class OrderServiceImpl implements Runnable {private static OrderCodeGenerator ong = new OrderCodeGenerator();private Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);// 同时并发的线程数private static final int NUM = 10;// 按照线程数初始化倒计数器,倒计数器private static CountDownLatch cdl = new CountDownLatch(NUM);private Lock lock = new DistributedLock();// 创建订单接口public void createOrder() {String orderCode = null;//准备获取锁lock.lock();try {// 获取订单编号orderCode = ong.getOrderCode();} catch (Exception e) {// TODO: handle exception} finally {//完成业务逻辑以后释放锁lock.unlock();}// ……业务代码logger.info("insert into DB使用id:=======================>" + orderCode);}public void run() {try {// 等待其他线程初始化cdl.await();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}// 创建订单createOrder();}public static void main(String[] args) {for (int i = 1; i <= NUM; i++) {// 按照线程数迭代实例化线程new Thread(new OrderServiceImpl()).start();// 创建一个线程,倒计数器减1cdl.countDown();}}
}
工具类:
import java.text.SimpleDateFormat;
import java.util.Date;public class OrderCodeGenerator {// 自增长序列private static int i = 0;// 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号public String getOrderCode() {Date now = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");return sdf.format(now) + ++i;}}
二.新建一个zookeeper配置中心类,从zookeeper动态获取数据库配置
import java.util.List;
import java.util.Properties;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.web.context.ContextLoader;
import org.springframework.web.context.WebApplicationContext;import com.zaxxer.hikari.HikariDataSource;/**
*
* @Description: zookeeper配置中心类,从zookeeper动态获取数据库配置
* @author leeSmall
* @date 2018年9月10日
*
*/
public class ZookeeperConfigurerCentral {//curator客户端private CuratorFramework zkClient;//curator事件监听private TreeCache treeCache;//zookeeper的ip和端口private String zkServers;//zookeeper上的/Jdbc路径private String zkPath;//超时设置private int sessionTimeout;//读取zookeeper上的数据库配置文件放到这里private Properties props;public ZookeeperConfigurerCentral(String zkServers, String zkPath, int sessionTimeout) {this.zkServers = zkServers;this.zkPath = zkPath;this.sessionTimeout = sessionTimeout;this.props = new Properties();//初始化curator客户端initZkClient();//从zookeeper的Jdbc节点下获取数据库配置存入propsgetConfigData();//对zookeeper上的数据库配置文件所在节点进行监听,如果有改变就动态刷新propsaddZkListener();}//初始化curator客户端private void initZkClient() {zkClient = CuratorFrameworkFactory.builder().connectString(zkServers).sessionTimeoutMs(sessionTimeout).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();zkClient.start();}//从zookeeper的Jdbc节点下获取数据库配置存入propsprivate void getConfigData() {try {List<String> list = zkClient.getChildren().forPath(zkPath);for (String key : list) {String value = new String(zkClient.getData().forPath(zkPath + "/" + key));if (value != null && value.length() > 0) {props.put(key, value);}}} catch (Exception e) {e.printStackTrace();}}//对zookeeper上的数据库配置文件所在节点进行监听,如果有改变就动态刷新propsprivate void addZkListener() {TreeCacheListener listener = new TreeCacheListener() {public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) {getConfigData();WebApplicationContext ctx = ContextLoader.getCurrentWebApplicationContext();HikariDataSource dataSource = (HikariDataSource) ctx.getBean("dataSource");System.out.println("================"+props.getProperty("url"));dataSource.setJdbcUrl(props.getProperty("url"));dataSource.setUsername(props.getProperty("uname"));dataSource.setPassword(props.getProperty("password "));dataSource.setDriverClassName(props.getProperty("driver "));}}};treeCache = new TreeCache(zkClient, zkPath);try {treeCache.start();treeCache.getListenable().addListener(listener);} catch (Exception e) {e.printStackTrace();}}public Properties getProps() {return props;}public void setZkServers(String zkServers) {this.zkServers = zkServers;}public void setZkPath(String zkPath) {this.zkPath = zkPath;}public void setSessionTimeout(int sessionTimeout) {this.sessionTimeout = sessionTimeout;}
}
新建一个加载props里面的数据库配置的类
import java.util.Properties;import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;/**
*
* @Description: 加载props里面的数据库配置,这个类等价于以前在xml文件里面的配置:
* <context:property-placeholder location="classpath:config/jdbc_conf.properties"/>
* @author leeSmall
* @date 2018年9月10日
*
*/
public class ZookeeperPlaceholderConfigurer extends PropertyPlaceholderConfigurer {private ZookeeperConfigurerCentral zkConfigurerCentral;@Overrideprotected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props)throws BeansException {System.out.println(zkConfigurerCentral.getProps());super.processProperties(beanFactoryToProcess, zkConfigurerCentral.getProps());}public void setzkConfigurerCentral(ZookeeperConfigurerCentral zkConfigurerCentral) {this.zkConfigurerCentral = zkConfigurerCentral;}
}