AQS介绍
AQS是一个抽象类,主要用来构建锁和同步器。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
}
AQS为构建锁和同步器提供了一些通用功能的实现,因此,使用AQS能简单且高效的构造出应用广泛的大量的同步器,比如常用的ReentrantLock,Seamphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue等都是基于AQS的。
AQS原理
AQS核心思想
AQS核心思想是,如果被请求的共享资源空闲, 则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要阻塞线程并且在资源释放时唤醒并未等待的线程分配锁。这个机制AQS是基于CLH锁实现的。
CLH锁是对自旋锁的一种改进,是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中,AQS将每个请求共享资源的线程封装成一个CLH队列锁的一个结点(Node)来实现锁的分配。在CLH队列锁中,一个节点表示一个线程,他保存着线程的引用、当前节点在队列中的状态、前驱节点、后继节点。
CLH队列结构如图所示
AQS核心原理图
AQS使用int成员变量state表示同步状态,通过内置的FIFO线程等待队列来完成获取资源线程的排队工作。
state变量由volatile修饰,用于展示当前临界资源的获锁情况。
// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;
另外,状态信息state可以通过protected类型的getState()、setState()、和compareAndSetState()进行操作。并且,这几个方法都是final修饰的,在子类中无法被重写。
//返回同步状态的当前值
protected final int getState() {return state;
}// 设置同步状态的值
protected final void setState(int newState) {state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
以可重入的互斥锁ReentrantLock为例,他的内部维护了一个state变量,用来表示锁的占用状态,state的初始值为0,表示锁处于未锁定状态。当线程A调用lock()方法时,会尝试通过tryAcquire()方法独占该锁,并让state的值加1.如果成功,那么线程A就获取到了锁,如果失败,那么线程A就会被加入到一个等待队列(CLH)中,直到其他线程释放该锁。假设线程A获取锁成功了,释放锁之前,A线程自己可以重复获取此锁(state的值会累加)。这就是可重入性的表现。
一个线程可以多次获取同一个锁而不会被阻塞。但是,这也意味着,一个线程必须释放与获取的次数相同的锁,才能让state的值回到0,也就是让锁恢复到未锁定状态。只有这样,其他等待的线程才能有机会获取该锁。
示意图如下:
再以CountDownLatch为例,任务分为N个子线程去执行,state初始化为N(N要与线程个数一致)。这N个子线程开始执行任务,每执行完一个子线程,就调用一次countDown()方法,该方法会尝试使用CAS操作,让state的值减1,当所有的子线程都执行完毕(即state的值为0),CountDownLatch会调用unpack()方法,唤醒主线程,这时,主线程就可以从await()方法返回,继续执行后续操作。
AQS资源共享方式
AQS定义两种资源共享方式:Exclusive(独占式,只有一个线程能执行,如ReentrantLock)和Share(共享式,多个线程同时执行 如Seamphore,CountDownLatch)。
一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式ReentrantReadWriteLock。
自定义同步器
同步器的设计是基于模版方法模式的,如果需要自定义同步器的一般方式如下(模版方法模式的经典应用):
- 使用者继承AbstractQueuedSychronized并重写指定的方法。
- 将AQS组合在自定义同步组件的实现中,并调用其模版方法,而这些模版方法会调用使用者重写的方法。
AQS使用了模版方法模式,自定义同步器时需要重写下面几个AQS提供的钩子方法:
//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int)
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryReleaseShared(int)
//该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively()
什么是钩子方法:钩子方法是一种被声明在抽象类中的方法,一般使用protected关键字修饰,它可以是空方法(由子类实现),也可以是默认实现方法,模版设计模式通过钩子方法控制固定步骤的实现。
常见的同步工具类(AQS子类)
Seamphore(信号量)
介绍
Sychrnoized和ReentrantLock都是一次只允许一个线程访问某个资源,而Seamphore(信号量)可以用来控制同时访问特定资源的线程数量
。
Seamphore的使用很简单,假如有N(N>5)个线程来获取Seamphore中的共享资源,下面代码表示同一时刻N个线程中只有5个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行,等到有线程释放了共享资源,其他阻塞的线程才能获取到。
// 初始共享资源数量
final Semaphore semaphore = new Semaphore(5);
// 获取1个许可
semaphore.acquire();
// 释放1个许可
semaphore.release();
当初始的资源个数为1的时候,Seamphore退化为排他锁
。
Seamphore有两种模式
- 公平模式:调用acquire()方法的顺序就是获取许可证的顺序,遵循FIFO。
- 非公平模式:抢占式的。
对应的两个构造方法
public Semaphore(int permits) {sync = new NonfairSync(permits);
}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
这两个构造方法,都必须提供许可(permits)的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。
Seamphore通常用于那些资源有明确访问数量限制的场景比如限流(仅限于单机模式,实际项目中推荐使用Redis+Lua脚本实现)。
原理
Seamphore是共享锁的一种实现,默认构造AQS的state值为permits
,其中permits为许可证数量,拿到许可证的线程才能继续执行。
以无参acquire为例,调用seamphore.acquire(),线程尝试获取许可证,如果state>0.则表示可以获取成功,如果state<=0的话,则表示许可证数量不足,获取失败。
如果可以获取成功的话(state>0),会尝试使用CAS操作去修改state的值,如果获取失败则会创建一个Node节点加入等待队列,挂起当前线程。
// 获取1个许可证
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
// 获取一个或者多个许可证
public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);
}
acquireSharedInterruptibly方法是AbstractQueuedSychronizer的默认实现。
// 共享模式下获取许可证,获取成功则返回,失败则加入等待队列,挂起线程
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 尝试获取许可证,arg为获取许可证个数,当获取失败时,则创建一个节点加入等待队列,挂起当前线程。if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}
// 共享模式下尝试获取资源(在Semaphore中的资源即许可证):
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
}
// 非公平的共享模式获取许可证
final int nonfairTryAcquireShared(int acquires) {for (;;) {// 当前可用许可证数量int available = getState();/** 尝试获取许可证,当前可用许可证数量小于等于0时,返回负值,表示获取失败,* *//重要!!!:当前可用许可证大于0时才可能获取成功,CAS失败了会循环重新获取最新的值尝试获取 int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}
以无参release方法为例,调用seamphore.release(); 线程尝试释放许可证,并使用CAS操作去修改state的值state=state+1;释放许可证成功之后,同时会唤醒等待队列中的一个线程。被唤醒的线程会重新尝试去修改state的值state=state-1;如果state>0则令牌获取成功,否则重新进入等待队列,挂起线程。
// 释放一个许可证
public void release() {sync.releaseShared(1);
}
// 释放一个或者多个许可证
public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);
}
releaseShared方法是AbstractQueuedSychronized中的默认实现
// 释放共享锁
// 如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。
public final boolean releaseShared(int arg) {//释放共享锁if (tryReleaseShared(arg)) {//释放当前节点的后置等待节点doReleaseShared();return true;}return false;
}
tryReleaseShared方法是Seamphore内部类Sync重写的一个方法。
// 内部类 Sync 中重写的一个方法
// 尝试释放资源
protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();// 可用许可证+1int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");// CAS修改state的值if (compareAndSetState(current, next))return true;}
}
代码示例:
public class SemaphoreExample {// 请求的数量private static final int threadCount = 550;public static void main(String[] args) throws InterruptedException {// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)ExecutorService threadPool = Executors.newFixedThreadPool(300);// 初始许可证数量final Semaphore semaphore = new Semaphore(20);for (int i = 0; i < threadCount; i++) {final int threadnum = i;threadPool.execute(() -> {// Lambda 表达式的运用try {semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20test(threadnum);semaphore.release();// 释放一个许可} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}});}threadPool.shutdown();System.out.println("finish");}public static void test(int threadnum) throws InterruptedException {Thread.sleep(1000);// 模拟请求的耗时操作System.out.println("threadnum:" + threadnum);Thread.sleep(1000);// 模拟请求的耗时操作}
}
Seamphore获取凭证的两个核心方法acquire()、tryAcquire();其中acquire()方法是阻塞方法,获取不到凭证会加入CLH等待队列,tryAcquire()方法,如果获取不到凭证会立即返回false。
Seamphore与CountDownLatch一样,也是共享锁的一种实现。它默认构造AQS的state为permits,当执行任务的线程数量超出permits,那么多余的线程将会被放入等待队列park,并自旋判断state是否大于0,只有当state大于0的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行release()方法,release()方法使得state的变量加1,那么自旋的线程便会判断成功,如此,每次只有最多不超过permits数量的线程能自旋成功,便限制了执行任务线程的数量。
CountDownLatch(倒计数器)
介绍
CountDownLatch允许count个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,他不能再次被使用。
原理
CountDownLatch是共享锁的一种实现,他默认构造AQS的state值为count,这个通过构造方法可以看出。
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}private static final class Sync extends AbstractQueuedSynchronizer {Sync(int count) {setState(count);}//...
}
当线程调用countDown()时,其实使用了tryReleaseShared方法以CAS的操作来减少state,直至state为0.当state为0时,表示所有线程都调用了countDown方法,那么在CountDownLatch上等待的线程就会被唤醒并继续执行。
public void countDown() {// Sync 是 CountDownLatch 的内部类 , 继承了 AbstractQueuedSynchronizersync.releaseShared(1);
}
releaseShared方法是AbstractQueuedSychronized中的默认实现
// 释放共享锁
// 如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。
public final boolean releaseShared(int arg) {//释放共享锁if (tryReleaseShared(arg)) {//释放当前节点的后置等待节点doReleaseShared();return true;}return false;
}
tryReleaseShared方法是CountDownLatch内部类Sync重写的一个方法,AbstractQueuedSychronizer中的默认实现仅仅抛出UnsupportedOperationException异常。
// 对 state 进行递减,直到 state 变成 0;
// 只有 count 递减到 0 时,countDown 才会返回 true
protected boolean tryReleaseShared(int releases) {// 自选检查 state 是否为 0for (;;) {int c = getState();// 如果 state 已经是 0 了,直接返回 falseif (c == 0)return false;// 对 state 进行递减int nextc = c-1;// CAS 操作更新 state 的值if (compareAndSetState(c, nextc))return nextc == 0;}
}
当主线程调用await()的时候【实际上就是尝试获取state】,如果state不为0,那就证明任务还没有执行完毕,await()就会一直阻塞,也就是说await()之后的语句不会被执行(主线程被加入到等待队列即CLH中了)。然后CountDownLatch会自旋CAS判断state==0,如果state==0的话,就会释放所有等待的线程,await()方法之后的语句就可以得到执行。
// 等待(也可以叫做加锁)
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
// 带有超时时间的等待
public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
CountDownLatch典型用法
- 某个线程在开始运行之前等待n个线程执行完毕:将CountDownLatch的计数器初始化为n【new CountDownLatch(n)】;每当一个线程执行完毕,将计数器减1【countDown()】; 当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
- 多个线程开始执行任务的最大并行性:注意是并行性、不是并发,强调的是多个线程在某一个时刻同时开始执行,类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch对象,将其计数器初始化为1,多个线程在开始执行任务之前首先countdownLatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。
!!! 注意
:不管任务有没有抛出异常,都需要调用countDown()将计数器最终归0,否则很容易造成死锁。当然也可以等待一定的时间如await(30,TimeUnit.Seconds).
CyclicBarrier(循环栅栏)
介绍
先通俗理解:N个运动一起赛跑。但出发时间随机,在终点设置一个裁判负责计时,只有当最后一个运动员也到了终点,裁判才会公布所有人的成绩。
CyclicBarrier和CountDownLatch非常类似,他也可以实现线程间的技术等待,但是他的功能比CountDownLatch更加复杂和强 大,主要应用场景和CountDownLatch类似。
CountDownLatch是基于AQS实现的,而CyclicBarrier是基于ReentrantLock(ReentrantLock也属于AQS同步器)和Condition的。
CyclicBarrier字面意思是可循环使用(Cyclic)和屏障(Barrier)。他要做的事情就是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
原理
CyclicBarrier内部通过一个count变量作为计数器,count的初始值为parties属性的初始化值,每当一个线程到了栅栏这里,那么就将计数器减1.如果count值为0了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。
//每次拦截的线程数
private final int parties;
//计数器
private int count;
结合源码来看
- CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
public CyclicBarrier(int parties) {this(parties, null);
}public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;//这个Runnble类型参数,表示所有的任务都到了屏障以后,要做的一个操作。this.barrierCommand = barrierAction;
}
- 当调用CyclicBarrier对象的await方法时,实际上调用的是dowait(false,0L)方法。await()方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到parties的值时,栅栏才会打开,线程得以通行。
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}
}
// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。private int count;/*** Main barrier code, covering the various policies.*/private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;// 锁住lock.lock();try {//。。。。。。。缩略// count 减1int index = --count;// 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行await 方法之后的条件if (index == 0) { // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)// 所有线程都到了,栅栏打开,执行构造函数中的方法command.run();ranAction = true;// 将 count 重置为 parties 属性的初始化值// 唤醒之前等待的线程// 下一波执行开始nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}for (;;) {try {//线程还未全部到达,就开始循环,除非发生线程阻塞、超时等待时间到、打断等行为,否则始终循环等待index==0}} finally {lock.unlock();}}
示例
public class CyclicBarrierExample1 {// 请求的数量private static final int threadCount = 550;// 需要同步的线程数量private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);public static void main(String[] args) throws InterruptedException {// 创建线程池ExecutorService threadPool = Executors.newFixedThreadPool(10);for (int i = 0; i < threadCount; i++) {final int threadNum = i;Thread.sleep(1000);threadPool.execute(() -> {try {test(threadNum);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (BrokenBarrierException e) {// TODO Auto-generated catch blocke.printStackTrace();}});}threadPool.shutdown();}public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {System.out.println("threadnum:" + threadnum + "is ready");try {/**等待60秒,保证子线程完全执行结束*/cyclicBarrier.await(60, TimeUnit.SECONDS);} catch (Exception e) {System.out.println("-----CyclicBarrierException------");}System.out.println("threadnum:" + threadnum + "is finish");}}
运行结果:
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
//cyclicBarrier为5,计数器到
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
//开始新的一轮
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
//cyclicBarrier为5,计数器到
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......