前言
相关系列
- 《Java & AQS & 目录》
- 《Java & AQS & CountDownLatch & 源码》
- 《Java & AQS & CountDownLatch & 总结》
- 《Java & AQS & CountDownLatch & 问题》
涉及内容
- 《Java & AQS & 总结》
- 《Java & AQS & CyclicBarrier & 总结》
源码
/** ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.*********************//******* Written by Doug Lea with assistance from members of JCP JSR-166* Expert Group and released to the public domain, as explained at* http://creativecommons.org/publicdomain/zero/1.0/*/package juc;import juc.locks.AbstractQueuedSynchronizer;import java.util.concurrent.TimeUnit;/*** A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads* completes.* 一个允许一个或更多线程等待直至一套操作在其它线程中执行完成的同步辅助工具。* <p>* A {@code CountDownLatch} is initialized with a given <em>count</em>. The {@link #await await} methods block until the* current count reaches zero due to invocations of the {@link #countDown} method, after which all waiting threads are released* and any subsequent invocations of {@link #await await} return immediately. This is a one-shot phenomenon -- the count* cannot be reset. If you need a version that resets the count, consider using a {@link CyclicBarrier}.* 倒数闭锁随指定[总数]初始化。await()方法会阻塞知道当前[总数]由于调用countDown()方法达到0,在这之后所有等待的线程都被* 释放并且随后调用await()方法会立即返回。这是一个单次现象,[总数]无法重置。如果需要一个重置[总数]的版本,考虑使用循环* 栅栏类。* <p>* A {@code CountDownLatch} is a versatile synchronization tool and can be used for a number of purposes. A* {@code CountDownLatch} initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking* {@link #await await} wait at the gate until it is opened by a thread invoking {@link #countDown}. A {@code CountDownLatch}* initialized to <em>N</em> can be used to make one thread wait until <em>N</em> threads have completed some action, or some* action has been completed N times.* 倒数闭锁是一个多功能同步工具,并且可以使用目标数字(即人为设置[总数])。一个倒数闭锁随着[总数]初始化充当一个简单的开/* 关闭锁或阀门:所有线程调用await()方法在阀门处等待直至通过一个线程调用countDown()方法打开。倒数闭锁随着N初始化可以令一* 个线程等待直至N条线程完成一些行为,或一个行为已经完成N次。* <p>* A useful property of a {@code CountDownLatch} is that it doesn't require that threads calling {@code countDown} wait for the* count to reach zero before proceeding, it simply prevents any thread from proceeding past an {@link #await await} until all* threads could pass.* 倒数闭锁的有用特性是它不要求调用countDown()方法的线程在开始行动之前为了[总数]达到0而等待,它只是防止任何线程执行await()* 后(的代码),直到所有线程都可以通过(即不阻止任务本身的执行,只是阻止任务结束后继续向下运行,直到所有的线程都可以向下* 通过)。* <p>* <b>Sample usage:</b> Here is a pair of classes in which a group of worker threads use two countdown latches:* 用法样例:这是一对类在一组工作者中线程使用倒数闭锁:* <ul>* <li>The first is a start signal that prevents any worker from proceeding until the driver is ready for them to proceed;* 第一个是一个防止任意工作者执行直到驱动已准备好执行的开始信号;* <li>The second is a completion signal that allows the driver to wait until all workers have completed.* 第二是允许驱动等待直至所有工作已完成的完成信号。* </ul>** <pre> {@code* class Driver { // ...* void main() throws InterruptedException {* // 实例化开始信号和结束信号。* CountDownLatch startSignal = new CountDownLatch(1);* CountDownLatch doneSignal = new CountDownLatch(N);* // create and start threads* // 创建和开始线程。* for (int i = 0; i < N; ++i)* new Thread(new Worker(startSignal, doneSignal)).start();* // don't let run yet* // 好不让运行。* doSomethingElse();* // let all threads proceed* // 让所有线程开始执行* startSignal.countDown();* doSomethingElse();* // wait for all to finish* // 等到所有线程结束* doneSignal.await();* }* }** class Worker implements Runnable {* private final CountDownLatch startSignal;* private final CountDownLatch doneSignal;** Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {* this.startSignal = startSignal;* this.doneSignal = doneSignal;* }** public void run() {* try {* // 等待开始。* startSignal.await();* doWork();* // 倒数。* doneSignal.countDown();* } catch (InterruptedException ex) {} // return;* }** void doWork() { ... }* }}</pre>** <p>* Another typical usage would be to divide a problem into N parts, describe each part with a Runnable that executes that portion* and counts down on the latch, and queue all the Runnables to an Executor. When all sub-parts are complete, the coordinating* thread will be able to pass through await. (When threads must repeatedly count down in this way, instead use a* {@link CyclicBarrier}.)* 其它典型的用法会分割一个问题为N个比分,设计每个部分随一个可运行部分执行并在闭锁中倒数,并且将所有的可运行入队至一个执* 行器(即把一个任务拆分为多个部分,每个部分共用一个倒数闭锁,并且通过执行器执行,每完成一个就倒数)。当所有子部分完成,* 协调线程将可以通过等待。(当线程必须在这个方法中重复倒数,使用一个循环栅栏替换)* <pre> {@code* class Driver2 { // ...* void main() throws InterruptedException {* CountDownLatch doneSignal = new CountDownLatch(N);* Executor e = ...* // create and start threads* // 开始线程并启动。* for (int i = 0; i < N; ++i)* e.execute(new WorkerRunnable(doneSignal, i));* // wait for all to finish* // 等待所有线程执行结束。* doneSignal.await();* }* }** class WorkerRunnable implements Runnable {* private final CountDownLatch doneSignal;* private final int i;* WorkerRunnable(CountDownLatch doneSignal, int i) {* this.doneSignal = doneSignal;* this.i = i;* }* public void run() {* try {* doWork(i);* doneSignal.countDown();* } catch (InterruptedException ex) {} // return;* }** void doWork() { ... }* }}</pre>** <p>* Memory consistency effects: Until the count reaches zero, actions in a thread prior to calling {@code countDown()}* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> actions following a successful return from a* corresponding {@code await()} in another thread.* 内存一致性影响:直到[总数]达到0,线程中优先于countDown()方法调用的行为先行发生于另一个线程中随着一次从相关await()* 方法中返回的行为。** @author Doug Lea* @Description: 闭锁(计数器)* @since 1.5*/
public class CountDownLatch {/*** Synchronization control For CountDownLatch. Uses AQS state to represent count.* 倒数的同步控制,使用AQS状态表示总数。** @Description: --------------------------------------------------------------- 名称 ---------------------------------------------------------------* @Description: 同步类* @Description: --------------------------------------------------------------- 作用 ---------------------------------------------------------------* @Description: 作为倒数闭锁的底层同步机制。* @Description: --------------------------------------------------------------- 逻辑 ---------------------------------------------------------------* @Description: ~* @Description: --------------------------------------------------------------- 注意 ---------------------------------------------------------------* @Description: ~* @Description: --------------------------------------------------------------- 疑问 ---------------------------------------------------------------* @Description: ~*/private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;/*** Synchronization control For CountDownLatch. Uses AQS state to represent count.* 倒数的同步控制,使用AQS状态表示总数。** @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------* @Description: ~* @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------* @Description: 通过指定总数创建同步。* @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------* @Description: 方法直接使用父类AQS的[状态]保存制定总数。* @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------* @Description: ~* @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------* @Description: ~*/Sync(int count) {setState(count);}/*** @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------* @Description: 获取总数* @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------* @Description: 获取当前同步的总数。* @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------* @Description: 方法直接返回[状态]。* @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------* @Description: ~* @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------* @Description: ~*/int getCount() {return getState();}/*** @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------* @Description: 尝试获取共享* @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------* @Description: 判断当前同步的总数是否为0,是则返回1;否则返回-1。* @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------* @Description: 方法直接将[状态]与0进行判断。* @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------* @Description: ~* @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------* @Description: ~*/@Overrideprotected int tryAcquireShared(int acquires) {// 判断可用许可是否为0。return (getState() == 0) ? 1 : -1;}/*** @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------* @Description: 尝试释放共享* @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------* @Description: 递减当前同步的总数,如果递减后总数为0则返回true;否则返回false。但如果总数已经为0无法递减则返* @Description: 回false。* @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------* @Description: 方法首先会判断[状态]是否为0,是则直接返回false;否则对[总数]进行CAS递减。由于并发可能会导致递* @Description: 减的CAS操作失败,因此整个需要以循环的方式进行。当递减成功后判断[总数]是否为0,是则返回true;* @Description: 否则返回false。* @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------* @Description: ~* @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------* @Description: ~*/@Overrideprotected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zero// 递减总数,当转变为0时发送信号,for (; ; ) {// 获取可用许可(快照),如果等于0,则直接返回失败。int c = getState();if (c == 0) {return false;}// 释放一个共享许可,即通过CAS减少一个计数。如果失败,说明有其它线程一同参与了竞争,自旋重试,直至成功为止。int nextc = c - 1;if (compareAndSetState(c, nextc)) {return nextc == 0;}}}}/*** @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------* @Description: 同步* @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------* @Description: 持有当前倒数闭锁同步的引用。* @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------* @Description: ~* @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------* @Description: ~* @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------* @Description: ~*/private final Sync sync;/*** Constructs a {@code CountDownLatch} initialized with the given count.* 通过指定总数初始化构造导数闭锁。** @param count the number of times {@link #countDown} must be invoked before threads can pass through* {@link #await}* countDown()方法在线程可以通过await()方法之前必须调用的次数。* @throws IllegalArgumentException if {@code count} is negative* 非法参数异常:如果总数为负* @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------* @Description: ~* @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------* @Description: 通过指定总数创建倒数闭锁。* @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------* @Description: 方法首先会判断指定总数是否为小于0,是则世界抛出非法参数异常;否则将之作为构造参数创建同步。* @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------* @Description: ~* @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------* @Description: ~*/public CountDownLatch(int count) {// 如果总数的值小于0,则直接抛出异常。if (count < 0) throw new IllegalArgumentException("count < 0");// 声明一个同步机制实例。this.sync = new Sync(count);}/*** Causes the current thread to wait until the latch has counted down to zero, unless the thread is* {@linkplain Thread#interrupt interrupted}.* 令当前线程等待直至闭锁已经倒数为0,除非线程被中断。* <p>* If the current count is zero then this method returns immediately.* 如果当前总数为0那么该方法立即返回。* <p>* If the current count is greater than zero then the current thread becomes disabled for thread scheduling purposes and* lies dormant until one of two things happen:* 如果当前总数大于0那么当前线程的预期目的将无效并休眠直至两件事情中的意见发生:* <ul>* <li>The count reaches zero due to invocations of the {@link #countDown} method; or* <li>Some other thread {@linkplain Thread#interrupt interrupts} the current thread.* 总数由于countDown()方法的调用而达到0;或者某些其它线程中断当前线程。* </ul>* <p>* If the current thread:* 如果当前线程:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting,* 在进入当前方法时(前)被设置中断状态;或者在等待期间被中断,* </ul>* then {@link InterruptedException} is thrown and the current thread's interrupted status is cleared.* 那么会抛出中断异常并且当前线程的中断状态会被清理。** @throws InterruptedException if the current thread is interrupted while waiting* 中断异常:如果当前线程在等待期间被中断* @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------* @Description: 等待* @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------* @Description: 通过当前倒数闭锁令当前线程等待。当总数不为0时等待,否则没有实际作用。当总数由不为0变为0时当前* @Description: 线程将被唤醒。* @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------* @Description: 方法直接通过同步的acquireSharedInterruptibly(int arg)方法实现。内部逻辑是判断当前同步[总数]是否为* @Description: 0,是则什么也不错;否则令当前线程进入等待状态,直至[总数]归0被相关线程唤醒。* @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------* @Description: ~* @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------* @Description: ~*/public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}/*** Causes the current thread to wait until the latch has counted down to zero, unless the thread is* {@linkplain Thread#interrupt interrupted}, or the specified waiting time elapses.* 令当前线程等待直至闭锁倒数为0,除非线程被中断,或者指定等待时间消逝。* <p>* If the current count is zero then this method returns immediately with the value {@code true}.* 如果当前总数为0那么当前方法立即返回true。* <p>* If the current count is greater than zero then the current thread becomes disabled for thread scheduling purposes and* lies dormant until one of three things happen:* 如果当前总数大于0那么当前线程的预期目的变的无效并且休眠直至三种情况的其中一种发生:* <ul>* <li>The count reaches zero due to invocations of the {@link #countDown} method; or* <li>Some other thread {@linkplain Thread#interrupt interrupts} the current thread; or* <li>The specified waiting time elapses.* </ul>* 总数由于countDown()方法调用达到0;或者某些其它线程中断当前线程;或者指定等待时间消逝。* <p>* If the count reaches zero then the method returns with the value {@code true}.* 如果总数达到0那么方法返回true。* <p>* If the current thread:* 如果当前线程:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting,* </ul>* then {@link InterruptedException} is thrown and the current thread's interrupted status is cleared.* 在进入当前方法是/前被设置中断状态;或者在等待期间被中断则抛出中断异常,并且当前线程的中断状态会被清理。* <p>* If the specified waiting time elapses then the value {@code false} is returned. If the time is less than or equal to zero,* the method will not wait at all.* 如果指定等待时间销毁则值将返回false。如果时间小于或等于0则方法将不会等待。** @param timeout the maximum time to wait 用于等待的最大时间* @param unit the time unit of the {@code timeout} argument 超时参数的时间单位* @return {@code true} if the count reached zero and {@code false} if the waiting time elapsed before the count* reached zero* 如果总数达到0则返回true;否则如果在总数达到0之前等到时间消逝则返回false。* @throws InterruptedException if the current thread is interrupted while waiting* 中断异常:如果当前线程在等待期间被中断* @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------* @Description: 等待* @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------* @Description: 通过当前倒数闭锁令当前线程有限等待。当总数不为0时等待,否则没有实际作用。当总数由不为0变为0时* @Description: 当前线程将被唤醒并返回true;如果因为超时而唤醒则返回false;如果因为中断而唤醒则抛出中断异常。* @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------* @Description: 方法直接通过同步的tryAcquireSharedNanos(int arg, long nanosTimeout)方法实现。* @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------* @Description: ~* @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------* @Description: ~*/public boolean await(long timeout, TimeUnit unit) throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}/*** Decrements the count of the latch, releasing all waiting threads if the count reaches zero.* 递减闭锁的总数,如果总数达到0则释放所有的等待线程。* <p>* If the current count is greater than zero then it is decremented. If the new count is zero then all waiting threads are* re-enabled for thread scheduling purposes.* 如果当前总数大于0则递减总数,如果新总数为0则所有等待线程都重新可用于线程预期的目的。* <p>* If the current count equals zero then nothing happens.* 如果当前总数等于0则什么都不会发生。** @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------* @Description: 倒数* @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------* @Description: 递减当前倒数闭锁的总数,如果总数为0则什么都不会发生。* @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------* @Description: 方法直接通过同步的releaseShared(int arg)方法实现。内部逻辑是,方法首先会判断当前[状态]是否为0,* @Description: 是则什么也不做;否则递减[总数]。如递减后[总数]为0则唤醒所有的等待线程。* @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------* @Description: ~* @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------* @Description: ~*/public void countDown() {// 该操作的本质是对计数进行递减,当计数为0时会触发对同步队列中节点的唤醒,以达到对拦截的线程放行的目的。sync.releaseShared(1);}/*** Returns the current count.* 返回当前总数* <p>* This method is typically used for debugging and testing purposes.* 该方法通常用于调试或测试目的。** @return the current count 当前总数* @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------* @Description: 获取总数* @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------* @Description: 获取当前倒数闭锁的总数。* @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------* @Description: 方法直接通过同步的getCount()实现,即是直接返回同步的[状态]。* @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------* @Description: ~* @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------* @Description: ~*/public long getCount() {return sync.getCount();}/*** Returns a string identifying this latch, as well as its state.* The state, in brackets, includes the String {@code "Count ="}* followed by the current count.*/public String toString() {return super.toString() + "[Count = " + sync.getCount() + "]";}
}