先放结论:主要是实现AbstractQueuedSynchronizer中进入和退出函数,控制不同的进入和退出条件,实现适用于各种场景下的锁。
JAVA中对于线程的同步提供了多种锁机制,比较著名的有可重入锁ReentrantLock,信号量机制Semaphore,队列等待机制CountDownLatch,
通过查看源代码可以,他们都是基于AbstractQueuedSynchronizer实现了自身的功能。
对于AbstractQueuedSynchronizer的讲解,可以看上一篇文章,这里就讲解下,如何通过集成AbstractQueuedSynchronizer实现上述的锁机制。
所谓的锁实现,就是对AbstractQueuedSynchronizer中state变量的抢占,谁先抢占并且修改了这个变量值不为0,谁就获得了锁。
AbstractQueuedSynchronizer保留了几个未实现的接口供子类实现。分别是
protected boolean tryAcquire(intarg) {throw newUnsupportedOperationException();
}protected boolean tryRelease(intarg) {throw newUnsupportedOperationException();
}protected int tryAcquireShared(intarg) {throw newUnsupportedOperationException();
}protected boolean tryReleaseShared(intarg) {throw newUnsupportedOperationException();
}
tryAcquire和tryRelease是用于独占锁的获取和释放,tryAcquireShared和tryReleaseShared是共享锁的获取和释放,下面看下他们分别是什么地方被调用。
/*** Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link#tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success. This method can be used
* to implement method {@linkLock#lock}.
*
*@paramarg the acquire argument. This value is conveyed to
* {@link#tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.*/
public final void acquire(intarg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}/*** Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link#tryRelease} returns true.
* This method can be used to implement method {@linkLock#unlock}.
*
*@paramarg the release argument. This value is conveyed to
* {@link#tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
*@returnthe value returned from {@link#tryRelease}*/
public final boolean release(intarg) {if(tryRelease(arg)) {
Node h=head;if (h != null && h.waitStatus != 0)
unparkSuccessor(h);return true;
}return false;
}/*** Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link#tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link* #tryAcquireShared} until success.
*
*@paramarg the acquire argument. This value is conveyed to
* {@link#tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.*/
public final void acquireShared(intarg) {if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}/*** Releases in shared mode. Implemented by unblocking one or more
* threads if {@link#tryReleaseShared} returns true.
*
*@paramarg the release argument. This value is conveyed to
* {@link#tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
*@returnthe value returned from {@link#tryReleaseShared}*/
public final boolean releaseShared(intarg) {if(tryReleaseShared(arg)) {
doReleaseShared();return true;
}return false;
}
在acquire中,如果tryAcquire失败,那么就去等待队列中排队,release中如果tryRelease成功,那么就唤醒下一个等待队列中的线程。
acquireShared中,如果tryAcquireShared失败,那么再次进入循环获取过程,releaseShared中,如果tryReleaseShared成功,那么就唤醒下一个等待队列中的线程。
现在的主要问题是,如何判定上述的tryAcquire、tryRelease、tryAcquireShared和tryReleaseShared的成功和失败,
通过阅读源代码可知:
ReentrantLock是一个线程执行,其他线程等待。
ReentrantLock的实现机制就是:线程lock()时,获取时state变量的值不为0,那么tryAcquire就失败,tryRelease执行完state变量的值==0,表示成功,唤醒等待的线程,否则就是失败。
Semaphore是先分配一定数量的许可证,然后多个线程来抢许可证,抢到就可以执行。
Semaphore的实现机制就是:如果获取时当前state减去申请的信号量数目acquires>0,那么就表示成功,此时 state=state-acquires, 否则失败,释放时,当前释放的信号量不为负数,那么就表示成功,唤醒等待的线程,释放后state=state+acquires.
CountDownLatch是一个线程执行,其他线程等待。
CountDownLatch的实现机制是: 线程如果lock()时,获取时state变量的值不为0,那么tryAcquire就失败,tryRelease执行完state变量的值等于0或者state-1后值等于0,表示成功,唤醒等待的线程,否则就是失败。
ReentrantLock的代码如下:
final boolean nonfairTryAcquire(intacquires) {final Thread current =Thread.currentThread();int c =getState();if (c == 0) {if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);return true;
}
}else if (current ==getExclusiveOwnerThread()) {int nextc = c +acquires;if (nextc < 0) //overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);return true;
}return false;
}protected final boolean tryRelease(intreleases) {int c = getState() -releases;if (Thread.currentThread() !=getExclusiveOwnerThread())throw newIllegalMonitorStateException();boolean free = false;if (c == 0) {
free= true;
setExclusiveOwnerThread(null);
}
setState(c);returnfree;
}
Semaphore代码如下:
final int nonfairTryAcquireShared(intacquires) {for(;;) {int available =getState();int remaining = available -acquires;if (remaining < 0 ||compareAndSetState(available, remaining))returnremaining;
}
}protected final boolean tryReleaseShared(intreleases) {for(;;) {int current =getState();int next = current +releases;if (next < current) //overflow
throw new Error("Maximum permit count exceeded");if(compareAndSetState(current, next))return true;
}
}
CountDownLatch代码如下:
protected int tryAcquireShared(intacquires) {return (getState() == 0) ? 1 : -1;
}protected boolean tryReleaseShared(intreleases) {//Decrement count; signal when transition to zero
for(;;) {int c =getState();if (c == 0)return false;int nextc = c-1;if(compareAndSetState(c, nextc))return nextc == 0;
}
}