In a single-machine environment you can rely on JVM-level locks such as synchronized or Lock. Once the application is deployed across multiple machines, you need a shared data store to implement a distributed lock.
I. Distributed Lock Implementations
1. Database-Based Distributed Locks
This can be implemented with a database unique index: inserting a row with the lock name acquires the lock, a duplicate-key error means someone else holds it, and deleting the row releases it.
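A minimal JDBC sketch under these assumptions (a table distributed_lock with a unique lock_name column; the table and class names are illustrative, not from the original):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLIntegrityConstraintViolationException;

public class DbLock {
    // Assumed schema: CREATE TABLE distributed_lock (lock_name VARCHAR(64) PRIMARY KEY);
    private final Connection conn;

    public DbLock(Connection conn) {
        this.conn = conn;
    }

    /** Try to acquire: the unique index rejects a second insert of the same name. */
    public boolean tryLock(String lockName) throws Exception {
        try (PreparedStatement ps =
                 conn.prepareStatement("INSERT INTO distributed_lock (lock_name) VALUES (?)")) {
            ps.setString(1, lockName);
            ps.executeUpdate();
            return true;
        } catch (SQLIntegrityConstraintViolationException dup) {
            return false; // row already exists: the lock is held by someone else
        }
    }

    /** Release by deleting the row. */
    public void unlock(String lockName) throws Exception {
        try (PreparedStatement ps =
                 conn.prepareStatement("DELETE FROM distributed_lock WHERE lock_name = ?")) {
            ps.setString(1, lockName);
            ps.executeUpdate();
        }
    }
}

Note the obvious weaknesses of this approach: there is no automatic expiry if the holder crashes before deleting the row, and waiting contenders must poll.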
2. Redis-Based Distributed Locks
A Redis-based distributed lock always leaves a window for problems. Even with majority-write schemes (e.g. RedLock), if the master crashes before the lock key is replicated, the lock data is lost and another client can acquire the same lock; the original holder has no way to detect that the master went down.
3. ZooKeeper-Based Distributed Locks
1) Approach 1
The client that successfully creates an ephemeral node holds the lock; every other client watches that node. The problem is the herd effect: with, say, 1000 contending clients only one acquires the lock, and when the ephemeral node is deleted the remaining 999 are all woken up to compete again.
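A minimal sketch of this approach with the raw ZooKeeper client (the lock path /locks/my-lock and class name are assumptions for illustration):

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class SimpleZkLock {
    private static final String LOCK_PATH = "/locks/my-lock"; // assumed path
    private final ZooKeeper zk;

    public SimpleZkLock(ZooKeeper zk) {
        this.zk = zk;
    }

    public void lock() throws Exception {
        while (true) {
            try {
                // Whoever creates the ephemeral node first holds the lock
                zk.create(LOCK_PATH, new byte[0],
                          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return;
            } catch (KeeperException.NodeExistsException held) {
                // Lock is taken: watch the node and block until it is deleted.
                // Every waiter wakes up here at once -- the herd effect.
                CountDownLatch latch = new CountDownLatch(1);
                if (zk.exists(LOCK_PATH, event -> latch.countDown()) != null) {
                    latch.await();
                }
            }
        }
    }

    public void unlock() throws Exception {
        zk.delete(LOCK_PATH, -1);
    }
}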
2) Approach 2
A fair lock: each client creates an ephemeral sequential node and watches only the node immediately ahead of its own, so a release wakes exactly one waiter instead of the whole herd. This is the scheme Curator implements, analyzed below.
4. How Curator's Reentrant Distributed Lock Works
Start from the acquire() method of the InterProcessMutex class:
public void acquire() throws Exception {
    if (!this.internalLock(-1L, (TimeUnit) null)) {
        throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
    }
}
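For context, a typical usage sketch before diving into the internals (the connect string, lock path, and retry settings are assumptions for illustration):

import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LockDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        InterProcessMutex mutex = new InterProcessMutex(client, "/locks/order"); // assumed path
        // acquire(time, unit) waits up to the timeout and returns false on failure;
        // the no-arg acquire() above waits indefinitely
        if (mutex.acquire(3, TimeUnit.SECONDS)) {
            try {
                // critical section
            } finally {
                mutex.release();
            }
        }
        client.close();
    }
}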
1) Acquiring the lock
private boolean internalLock(long time, TimeUnit unit) throws Exception {
    // Get the current thread
    Thread currentThread = Thread.currentThread();
    // threadData is a ConcurrentMap; look up this thread's LockData
    InterProcessMutex.LockData lockData = (InterProcessMutex.LockData) this.threadData.get(currentThread);
    if (lockData != null) {
        // Already holds the lock: just bump the reentrancy count
        lockData.lockCount.incrementAndGet();
        return true;
    } else {
        // Not held yet: try to create the lock node in ZooKeeper
        String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
        if (lockPath != null) {
            // Acquired: record the LockData for this thread
            InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
            this.threadData.put(currentThread, newLockData);
            return true;
        } else {
            // Failed to acquire
            return false;
        }
    }
}

private static class LockData {
    // Thread holding the lock
    final Thread owningThread;
    // Lock node path in ZooKeeper
    final String lockPath;
    // Reentrancy count for this thread
    final AtomicInteger lockCount;

    private LockData(Thread owningThread, String lockPath) {
        this.lockCount = new AtomicInteger(1);
        this.owningThread = owningThread;
        this.lockPath = lockPath;
    }
}
2) Creating the node and returning its path
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
    long startMillis = System.currentTimeMillis();
    Long millisToWait = unit != null ? unit.toMillis(time) : null;
    byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
    // Retry counter
    int retryCount = 0;
    String ourPath = null;
    boolean hasTheLock = false;
    boolean isDone = false;
    while (!isDone) {
        isDone = true;
        try {
            // Create an ephemeral sequential node
            ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
            // Check whether our node has the smallest sequence number
            hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
        } catch (NoNodeException var14) {
            // Creation failed: consult the retry policy to decide whether to retry
            if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,
                    System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                throw var14;
            }
            isDone = false;
        }
    }
    return hasTheLock ? ourPath : null;
}

public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
    String ourPath;
    // Whether to attach data to the node; in both branches an ephemeral sequential node is created
    if (lockNodeBytes != null) {
        ourPath = (String) ((ACLBackgroundPathAndBytesable) client.create()
                .creatingParentContainersIfNeeded()
                .withProtection()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL))
                .forPath(path, lockNodeBytes);
    } else {
        ourPath = (String) ((ACLBackgroundPathAndBytesable) client.create()
                .creatingParentContainersIfNeeded()
                .withProtection()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL))
                .forPath(path);
    }
    return ourPath;
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
    boolean haveTheLock = false;
    boolean doDelete = false;
    try {
        if (this.revocable.get() != null) {
            ((BackgroundPathable) this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
        }
        while (this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
            // Fetch the children sorted by sequence number
            List<String> children = this.getSortedChildren();
            // Extract our node's sequence name from its full path
            String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
            // Check whether our node has the smallest sequence number
            PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
            if (predicateResults.getsTheLock()) {
                // Smallest node: the lock is ours
                haveTheLock = true;
            } else {
                // Otherwise, compute the path of the node just before ours
                String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
                synchronized (this) {
                    try {
                        // Watch only the previous node (this avoids the herd effect)
                        ((BackgroundPathable) this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                        if (millisToWait == null) {
                            // Wait indefinitely
                            this.wait();
                        } else {
                            millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if (millisToWait > 0L) {
                                // Wait with a timeout
                                this.wait(millisToWait);
                            } else {
                                // Timed out: give up and delete our node
                                doDelete = true;
                                break;
                            }
                        }
                    } catch (NoNodeException var19) {
                        // The previous node vanished between listing and watching; loop and re-check
                    }
                }
            }
        }
    } catch (Exception var21) {
        ThreadUtils.checkInterrupted(var21);
        doDelete = true;
        throw var21;
    } finally {
        if (doDelete) {
            this.deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}
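Who wakes the wait()? The watcher registered on the previous node notifies this lock object (via notifyAll()) when that node changes or is deleted, so the loop wakes up and re-checks whether our node is now the smallest.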
3) Releasing the lock
public void release() throws Exception {
    Thread currentThread = Thread.currentThread();
    InterProcessMutex.LockData lockData = (InterProcessMutex.LockData) this.threadData.get(currentThread);
    if (lockData == null) {
        // Anything can happen in a distributed environment, so verify ownership first
        throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
    } else {
        // Decrement the reentrancy count
        int newLockCount = lockData.lockCount.decrementAndGet();
        if (newLockCount <= 0) {
            if (newLockCount < 0) {
                throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
            } else {
                try {
                    // Earlier versions had an extra > 0 check here; it was removed as of Curator 5.1.0 / ZooKeeper 3.9.0
                    this.internals.releaseLock(lockData.lockPath);
                } finally {
                    this.threadData.remove(currentThread);
                }
            }
        }
    }
}
5. Summary
Pros: a ZooKeeper distributed lock (such as InterProcessMutex) is highly available, reentrant, and blocking, and because the lock nodes are ephemeral it avoids the deadlock that would otherwise occur when a lock holder crashes without releasing.
Cons: every acquire and release creates and deletes znodes, so performance is lower than Redis.
In high-performance, high-concurrency scenarios, ZooKeeper's distributed lock is not recommended. But thanks to ZooKeeper's high reliability, it remains a good choice when concurrency is moderate.
II. Service Registration and Discovery
1. Design Approach
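The basic design: each service provider registers an ephemeral node for every running instance under a per-service path, and each consumer lists the children of that path and sets a watch, so it is notified immediately when instances come or go. A minimal sketch with the raw ZooKeeper client; the /services base path, node naming, and data format are assumptions for illustration:

import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class SimpleRegistry {
    private static final String BASE = "/services"; // assumed base path
    private final ZooKeeper zk;

    public SimpleRegistry(ZooKeeper zk) {
        this.zk = zk;
    }

    /** Provider side: register an instance as an ephemeral sequential node. */
    public void register(String serviceName, String hostPort) throws Exception {
        String servicePath = BASE + "/" + serviceName;
        // Persistent parent nodes per service (another instance may win the race to create them)
        try {
            zk.create(BASE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException ignored) {}
        try {
            zk.create(servicePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException ignored) {}
        // Ephemeral: the instance disappears from the registry when its session dies
        zk.create(servicePath + "/instance-", hostPort.getBytes(StandardCharsets.UTF_8),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    /** Consumer side: list instances and set a watch so changes trigger a refresh. */
    public List<String> discover(String serviceName, Runnable onChange) throws Exception {
        return zk.getChildren(BASE + "/" + serviceName, event -> onChange.run());
    }
}

Because the instance nodes are ephemeral, a crashed provider disappears from the registry automatically once its session expires.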
2. Pros and Cons of Implementing the Registry with ZooKeeper
Pros:
- High availability: ZooKeeper is a highly available distributed system; fault tolerance comes from running multiple server instances. If one instance fails, the others keep serving.
- Strong consistency: ZooKeeper guarantees strong consistency of its data. Once an update completes, every server has the same view of the data. This makes ZooKeeper well suited as a service registry, because all clients see a consistent picture of service state.
- Real-time notification: ZooKeeper's watcher mechanism lets clients subscribe to node changes. When a provider's state changes (e.g. it comes online or goes offline), clients are notified promptly, so consumers can react quickly and achieve dynamic service discovery.
Cons:
- Performance limits: ZooKeeper may not match solutions purpose-built as service registries, such as Nacos or Consul. Under heavy read/write load or in very large clusters, ZooKeeper can become a performance bottleneck.
3. Integrating Spring Cloud Zookeeper as the Microservice Registry
Step 1: pin the Spring Cloud version in the parent pom
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Note: mind the version compatibility between Spring Boot and Spring Cloud.
Step 2: add the Spring Cloud Zookeeper discovery dependency to each microservice's pom
<!-- zookeeper service registration and discovery -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- zookeeper client -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>
Note: mind the version compatibility between the ZooKeeper client dependency and the ZooKeeper server.
The core source-code entry point for Spring Cloud's Zookeeper registry integration is ZookeeperDiscoveryClientConfiguration.
Step 3: configure the ZooKeeper registry address in each microservice's application.yml
spring:
  cloud:
    zookeeper:
      connect-string: localhost:2181
      discovery:
        instance-host: 127.0.0.1
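By default Spring Cloud Zookeeper registers each instance under /services/<spring.application.name>, so the service also needs a name; an assumed example:

spring:
  application:
    name: mall-order  # assumed service name; consumers and Feign clients look it up by this name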
The service instance metadata registered in ZooKeeper looks like the following.
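A representative example with illustrative values (the shape follows Curator's ServiceInstance serialization, which Spring Cloud Zookeeper uses):

{
  "name": "mall-order",
  "id": "a-generated-uuid",
  "address": "127.0.0.1",
  "port": 8080,
  "sslPort": null,
  "payload": {
    "@class": "org.springframework.cloud.zookeeper.discovery.ZookeeperInstance",
    "id": "mall-order-1",
    "name": "mall-order",
    "metadata": {}
  },
  "registrationTimeUTC": 1692000000000,
  "serviceType": "DYNAMIC",
  "uriSpec": {
    "parts": [
      { "value": "scheme", "variable": true },
      { "value": "://", "variable": false },
      { "value": "address", "variable": true },
      { "value": ":", "variable": false },
      { "value": "port", "variable": true }
    ]
  }
}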
Note: if the registered address is wrong, consumers will fail to find the service; you can set it explicitly via the instance-host property.
Step 4: call the service through Feign
@RequestMapping(value = "/findOrderByUserId/{id}")
public R findOrderByUserId(@PathVariable("id") Integer id) {
    log.info("Querying order info for userId: " + id);
    // Call the order service through Feign
    R result = orderFeignService.findOrderByUserId(id);
    return result;
}
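The orderFeignService above is a Feign client. A hypothetical sketch of its interface; the service name mall-order and the request path are assumptions, and the name must match the provider's spring.application.name in the registry:

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;

// Hypothetical Feign client; "mall-order" is an assumed service name
@FeignClient("mall-order")
public interface OrderFeignService {

    // Assumed provider endpoint; R is the response wrapper type used by the controller above
    @RequestMapping("/order/findOrderByUserId/{userId}")
    R findOrderByUserId(@PathVariable("userId") Integer userId);
}

Remember to add @EnableFeignClients on the Spring Boot application class so the interface is scanned and a proxy is generated.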