五、分布式锁-redission

源码仓库地址:git@gitee.com:chuangchuang-liu/hm-dingping.git

1、redission介绍

目前基于redis的setnx特性实现的自定义分布式锁仍存在的问题:

问题描述
重入问题同一个线程无法多次获取统一把锁。当方法A成功获取锁后,调用方法B,方法B也要获取锁,此时由于锁是不可重入的,也就是被方法A占用着,此时就产生了死锁的问题
不可重试自定义分布式锁无失败重试机制
超时释放锁的超时释放虽然可以避免死锁问题,但确实也可能存在业务执行时间比较长的情况,那这种情况下就仍存在安全隐患问题
主从一致性如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

什么是Redission?
Redission是一个用于Java的Redis客户端,它提供了丰富的特性,包括内存数据网格的功能。它支持同步/异步/RxJava/Reactive API,拥有超过50种基于Redis的Java对象和服务。Redission的使用非常简单,没有学习曲线,您不需要了解任何Redis命令就可以开始使用。(GitHub - redisson/redisson, Redisson官网)
Redission可以让Java应用更方便地访问和操作Redis数据存储,适合于需要高性能和高并发的应用场景。

2、快速开始

  1. 导入依赖
<!--redission-->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
  1. Redission配置客户端
@Configuration
public class RedisConfig {@Beanpublic RedissonClient redisClient(){Config config = new Config();// 可以用"rediss://"来启用SSL连接config.useSingleServer().setAddress("redis://192.168.224.128:6379").setPassword("123456");return Redisson.create(config);}
}
  1. 使用Redission分布式锁
@Resource
private RedissionClient redissonClient;@Test
void testRedisson() throws Exception{//获取锁(可重入),指定锁的名称RLock lock = redissonClient.getLock("anyLock");//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);//判断获取锁成功if(isLock){try{System.out.println("执行业务");          }finally{//释放锁lock.unlock();}}
}

3、redission可重入锁原理

3.1、原理介绍

在Lock锁中,他是借助于底层的一个voaltile的一个state变量来记录重入的状态的,比如当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1 ,如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。 、

而在redission中,也支持这种可重入锁原理,是通过redis的hash数据结构实现的。其中key表示这把锁是否存在,field判断锁是被哪个线程持有,value则记录锁被持有次数。
image.png

3.2、源码剖析

  • 获取锁

其中各参数解释:
KEYS[1]:锁的名称
ARGV[1]:锁过期时间
ARGV[2]:id + “:” + threadId

"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);"

判断锁是否存在
如果不存在,则设置当前线程标识,计数器+1;设置过期时间;
如果存在。做二次判断,判断锁的持有线程是不是自己?
如果是,计数器+1,重置锁的过期时间;
如果不是,获取锁失败,返回锁的剩余过期时间

  • 释放锁

其中各参数解释:
KEYS[1]:锁的名称
KEYS[2]:订阅频道
ARGV[1]:是要发布的消息内容
ARGV[2]:锁过期时间
ARGV[3]:id + “:” + threadId

"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;"

判断锁是不是当前线程?
不是==>直接返回
是==>计数器–
二次判断,判断计数器是否大于0
大于0==>重置锁过期时间
否则==>真正释放锁

4、redission锁重试和WatchDog机制

4.1、redission是如何解决不可重试的?

源码剖析:
用户调用tryLock方法时,指定waitTime最大等待时间

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}//        return get(tryLockAsync(waitTime, leaseTime, unit));
}
  1. 计算等待时间和获取当前时间:将用户指定的等待时间转换为毫秒,并记录方法调用时的当前时间。
  2. 尝试获取锁。如果ttl为空,则获取锁成功;否则,返回的是其他线程占用锁的剩余有效时间
  3. 检查剩余等待时间。如果time小于等于0,调用acquireFailed方法返回false
  4. 订阅锁。通过subscribe方法订阅相关的锁。如果在剩余时间内未能订阅成功,处理取消订阅并调用acquireFailed方法,返回false。
  5. 循环等待锁释放消息。等待过程中会调用tryAcquire方法获取锁,如果获取成功返回true
  6. 处理锁的ttl。如果ttl大于0,返回锁被其他线程占用的剩余过期时间(ttl)。更新剩余等待时间(time)。以time和ttl中较小的值继续等待再次尝试。
  7. 再次检查等于剩余等待时间。如果小于0,调用acquireFailed方法返回false
  8. 循环结束后(要么获取锁成功,要么超过最大等待时间了),最终调用unsubscribe方法取消订阅

结论1:redission不是获取锁失败后立即进行重试,而是等待“一定时间”后再进行重试,节省了一定的CPU资源,对服务器性能有一定提升;
结论2:一定要采取调用tryLock方法携带参数waitTime的重载方法,其他重载的tryLock方法底层是不具备重试机制的。

4.2、redission是如何解决锁超时释放的-看门狗机制?

自定义分布式锁仍存在的一个问题是:锁的超时释放虽然可以避免死锁问题,但确实也可能存在业务执行时间比较长的情况,那这种情况下业务还未执行完毕,锁就被释放了,存在一定的安全隐患。
源码剖析:

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);}RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining) {// 开启任务更新过期时间scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;
}
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));}
  1. 如果没有指定leaseTime,那么底层会默认传入commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()看门狗时间
  2. 在leasetime的1/3处时间,会创建一个任务renewExpirationAsync方法来异步地更新重置锁过期时间
  3. 递归地调用自身来更新锁过期时间,直到业务处理完毕。

至此redission解决了因业务阻塞而导致锁提前释放的问题

业务执行完毕,释放锁源码剖析:

public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<Void>();// 释放锁RFuture<Boolean> future = unlockInnerAsync(threadId);future.onComplete((opStatus, e) -> {// 取消锁失效时间更新重置任务cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);return;}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}result.trySuccess(null);});return result;
}void cancelExpirationRenewal(Long threadId) {ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (task == null) {return;}if (threadId != null) {task.removeThreadId(threadId);}if (threadId == null || task.hasNoThreads()) {Timeout timeout = task.getTimeout();if (timeout != null) {timeout.cancel();}// 删除递归更新锁时间任务EXPIRATION_RENEWAL_MAP.remove(getEntryName());}
}

当业务执行完毕且锁正常释放后,删除递归更新锁时间任务,避免redission一直递归创建任务更新锁过期时间

5、redission锁的MultiLock原理

为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例
此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。
1653553998403.png
为了解决该问题,redission的方案是去掉redis集群主从关系,每一个节点都是平等的。加锁逻辑是需要写入到每一个节点上才算加锁成功。这样,当某一台机器宕机了,这台机器的slave节点变为master节点,此时另一个线程趁虚而入,虽然可以正常写入,但其它机器仍会写入失败,最终结果仍是获取锁失败,从而保证了获取锁的可靠性。
1653554055048.png
MulitLock源码剖析:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {//        try {//            return tryLockAsync(waitTime, leaseTime, unit).get();//        } catch (ExecutionException e) {//            throw new IllegalStateException(e);//        }long newLeaseTime = -1;if (leaseTime != -1) {if (waitTime == -1) {newLeaseTime = unit.toMillis(leaseTime);} else {newLeaseTime = unit.toMillis(waitTime)*2;}}long time = System.currentTimeMillis();long remainTime = -1;if (waitTime != -1) {remainTime = unit.toMillis(waitTime);}long lockWaitTime = calcLockWaitTime(remainTime);int failedLocksLimit = failedLocksLimit();List<RLock> acquiredLocks = new ArrayList<>(locks.size());for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {lockAcquired = false;}if (lockAcquired) {acquiredLocks.add(lock);} else {if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// reset iteratorwhile (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}if (remainTime != -1) {remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());for (RLock rLock : acquiredLocks) {RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();}}return true;}
  1. 遍历锁集合,调用lock.tryLock尝试获取锁。将获取结果传给变量lockAcquired
  2. 如果获取成功,将当前锁存放到acquiredLocks集合中
  3. 获取成功后,如果此时的剩余等待时间小于等于0,释放自己已获取的锁,返回false
  4. 如果获取失败,判断是否具备重试机制
    1. 没有重试,则直接返回false
    2. 有重试机制,将acquiredLocks集合清空,将iterator指针前移,重新遍历尝试。

6、结论

目前已接触的分布式锁有:

  • 可不重入锁/自定义分布式锁:

原理: 利用setnx特性、expire避免死锁、添加线程标识避免锁误删
缺点: 仍存在不可重入、失败不可重试、锁超时失效等问题

  • 可重入锁:

原理: 利用hash数据结构存储线程标识和重入次数、利用看门狗机制延续锁失效时间、利用信号量机制控制等待重试时间
缺点: 仍存在集群模式下redis宕机导致的锁失效问题

  • MulitLock

原理: 利用多个平等的redis节点,所有redis都写入才算获取锁成功
缺点: 维护成本高,实现相对复杂

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/768041.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

腾讯云服务器价格查询系统,2024年1年、3年和5年活动价格表

腾讯云服务器多少钱一年&#xff1f;61元一年起。2024年最新腾讯云服务器优惠价格表&#xff0c;腾讯云轻量2核2G3M服务器61元一年、2核2G4M服务器99元一年可买三年、2核4G5M服务器165元一年、3年756元、轻量4核8M12M服务器646元15个月、4核16G10M配置32元1个月、312元一年、8核…

Java生成动态图形验证码

dome /*** ClassName : VerifyCodeController* Description : 图片验证码* Author : llh* Date: 2024-03-22 10:48*/ Controller RequestMapping("/verifycode") public class VerifyCodeController {Resourceprivate StringRedisTemplate stringRedisTemplate;Get…

YOLOv8:Roboflow公开数据集训练模型

Roboflow公开数据集 Roboflow是一个提供计算机视觉数据集管理和处理工具的平台。虽然Roboflow本身并不创建或策划公开数据集&#xff0c;但它提供了一系列功能&#xff0c;帮助用户组织、预处理、增强和导出计算机视觉数据集。 官方网站&#xff1a;https://universe.roboflow…

求解完全背包问题

10.求解完全背包问题 - 蓝桥云课 (lanqiao.cn) import os import sys# 请在此输入您的代码 taotal_w,nmap(int,input().split()) w[] v[] dp[0]*(taotal_w1) #物品无限使用不用考虑 for i in range(n):wi,vimap(int,input().split())w.append(wi)v.append(vi)for i in range(n…

Java学习笔记01

1.1 Java简介 Java的前身是Oak&#xff0c;詹姆斯高斯林是java之父。 1.2 Java体系 Java是一种与平台无关的语言&#xff0c;其源代码可以被编译成一种结构中立的中间文件&#xff08;.class&#xff0c;字节码文件&#xff09;于Java虚拟机上运行。 1.2.3 专有名词 JDK提…

【课程】MyBatisPlus视频教程

MyBatis-Plus是一款非常强大的MyBatis增强工具包,只做增强不做改变. 在不用编写任何SQL语句的情况下即可以极其方便的实现单一、批量、分页等操作。 本套教程基于MyBatis-Plus新2.3版本,详细讲授&#xff1a;集成Mybatis-Plus、 通用CRUD、EntityWrapper条件构造器、ActiveRec…

C++String类

1. 前言 String是C中操作字符串的类&#xff0c;它是在比较早的时候设计的STL模板&#xff0c;因此在某些地方设计的有些冗余 对于String类&#xff0c;不仅仅是学会使用它&#xff0c;更重要的是要从底层去理解它&#xff1b;本篇文章将从底层出发&#xff0c;模拟实现常用的S…

2024年阿里云服务器价格查询系统,最新报价

2024年腾讯云服务器优惠价格表&#xff0c;一张表整理阿里云服务器最新报价&#xff0c;阿里云服务器网整理云服务器ECS和轻量应用服务器详细CPU内存、公网带宽和系统盘详细配置报价单&#xff0c;大家也可以直接移步到阿里云CLUB中心查看 aliyun.club 当前最新的云服务器优惠券…

反激电源进阶及充电器基础认知

前言 反激开关电源核心工作原理&#xff0c;学会了这个原理&#xff0c;就代表着你的双脚已经全部跨入了开关电源世界的大门了。_哔哩哔哩_bilibili 最近不小心看了上面这个视频&#xff0c;有点感觉。 本文是 从开关电源&#xff08;BMS充电器&#xff09;入门硬件之——开…

代码随想录|Day26|贪心01|455.分发饼干、376.摆动序列、53.最大子数组和

455.分发饼干 大尺寸的饼干既可以满足胃口大的孩子也可以满足胃口小的孩子。 局部最优&#xff1a;尽量确保每块饼干被充分利用 全局最优&#xff1a;手上的饼干可以满足尽可能多的孩子 思路&#xff1a;大饼干 尽量分给 大胃口孩子 将小孩和饼干数组排序&#xff0c;我们从大到…

洛谷day3

B2053 求一元二次方程 - 洛谷 掌握printf用法&#xff1b; #include <iostream> #include <cmath> using namespace std; double a,b,c; double delta; double x1,x2;int main() {cin>>a>>b>>c;delta b*b-4*a*c;if(delta>0){x1 (-bsqrt…

Windows 系统 隐藏C++ 控制台输入的字符

在控制台写了一个小程序,一个简单的登录账户的代码,发现用户名显示很正常,但是在输入密码的时候也显示出来就比较尴尬,所以需要在输入密码的时候把字符隐藏掉. windows和linux实现方式不一样,我在windows下实现的,使用了windows的函数. #include <iostream> #include &…

ensp ppp验证实验(二)

实验拓扑&#xff1a; 1、R1和R2使用PPP链路直连&#xff0c;R2和R3把2条PPP链路捆绑为PPP MP直连 2、按照图示配置IP地址 3、R2对R1的PPP进行单向chap验证 4、R2和R3的PPP进行双向chap验证 实验内容&#xff1a; R1配置&#xff1a; #修改名称 <Huawei>sys Enter …

一些规律、现象

图文部分由COPILOT生成。 规律详情 墨菲定律 墨菲定律(Murphys Law) 一件事可能出错时就一定会出错。 图&#xff1a;AI生成 破窗效应 破窗效应(Broken windows theory&#xff09;是犯罪心理学理论。以一幢有少许破窗的建筑为例&#xff0c;如果那些窗没修理好&#xff0…

vue基础——java程序员版(vuex)

​ vuex可以定义共享数据。 1、主要结构 src/store/index.js 是使用vuex的核心js文件。 定义数据&#xff1a;state 修改数据(同步)&#xff1a;mutations 修改数据(异步)&#xff1a;action调用>mutations 下面定义了一个公共数据msg &#xff0c;mutations方法setName…

ShardingSphere水平分表——开发经验(2)

1. 什么场景下分表&#xff1f; 数据量过大或者数据库表对应的磁盘文件过大。 Q&#xff1a;多少数据分表&#xff1f; A&#xff1a;网上有人说1kw&#xff0c;2kw&#xff1f;不准确。 1、一般看字段的数量&#xff0c;有没有包含text类型的字段。我们的主表里面是不允许有t…

【晴问算法】入门篇—日期处理—日期先后

题目描述 给定两个日期DAY1和DAY2&#xff0c;判断DAY1是否在DAY2之前。输入描述 前两行分别为日期DAY1和DAY2(格式为YYYY-MM-DD&#xff0c;范围为1900-01-01≤DAY≤2199-12-31)&#xff0c;数据保证一定合法。输出描述 如果DAY1在DAY2之前&#xff0c;那么输出YES&#xff0c…

从零开始学HCIA之网络基础知识02

1、TCP/IP&#xff08;Transmission Control Protocol/Internet Protocol&#xff09;参考模型&#xff0c;它是当下实际的业界标准。 2、TCP/IP这个名字来自该协议簇中两个非常重要的协议&#xff0c;一个是IP&#xff08;Internet Protocol&#xff09;&#xff0c;另一个是T…

大厂面试--列举并解释一下 http的所有请求方法?

HTTP请求方法 HTTP/1.1定义的请求方法有8种: GET、POST、 PUT、 DELETE、PATCH、 HEAD、OPTIONS、TRACE。 最常的两种GET和POST&#xff0c;如果是RESTful接口的话- -般会 用到GET、POST、 DELETE、 PUT。 OPTIONS 返回服务器针对特定资源所支持的HTTP请求方法&#xff0c;也…

IOS面试题编程机制 51-55

51. 在iPhone应用中如何保存数据?有以下几种保存机制: 1).通过web服务,保存在服务器上 2).通过NSCoder固化机制,将对象保存在文件中 3).通过SQlite或CoreData保存在文件数据库中52. 阐述Block 的理解?并写出一个使用Block执行UIVew动画?Block是可以获取其他函数局部变量的…