CAS
全称(Compare And Swap),比较交换
Unsafe类是CAS的核心类,提供硬件级别的原子操作。
// 对象、对象的地址、预期值、修改值
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
缺点:
- 开销大:在并发量比较高的情况下,如果反复尝试更新某个变量,却又一直更新不成功,会给CPU带来较大的压力
- ABA问题:当变量从A修改为B在修改回A时,变量值等于期望值A,但是无法判断是否修改,CAS操作在ABA修改后依然成功。
- 如何避免:Java提供了AtomicStampedReference和AtomicMarkableReference来解决。AtomicStampedReference通过包装[E,Integer]的元组来对对象标记版本戳stamp,对于ABA问题其解决方案是加上版本号,即在每个变量都加上一个版本号,每次改变时加1,即A —> B —> A,变成1A —> 2B —> 3A。
- 不能保证代码块的原子性:CAS机制所保证的只是一个变量的原子性操作,而不能保证整个代码块的原子性。
public class Test {private static AtomicInteger atomicInteger = new AtomicInteger(100);private static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100,1);public static void main(String[] args) throws InterruptedException {//AtomicIntegerThread at1 = new Thread(new Runnable() {@Overridepublic void run() {atomicInteger.compareAndSet(100,110);atomicInteger.compareAndSet(110,100);}});Thread at2 = new Thread(new Runnable() {@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(2); // at1,执行完} catch (InterruptedException e) {e.printStackTrace();}System.out.println("AtomicInteger:" + atomicInteger.compareAndSet(100,120));}});at1.start();at2.start();at1.join();at2.join();//AtomicStampedReferenceThread tsf1 = new Thread(new Runnable() {@Overridepublic void run() {try {//让 tsf2先获取stamp,导致预期时间戳不一致TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}// 预期引用:100,更新后的引用:110,预期标识getStamp() 更新后的标识getStamp() + 1atomicStampedReference.compareAndSet(100,110,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);atomicStampedReference.compareAndSet(110,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);}});Thread tsf2 = new Thread(new Runnable() {@Overridepublic void run() {int stamp = atomicStampedReference.getStamp();try {TimeUnit.SECONDS.sleep(2); //线程tsf1执行完} catch (InterruptedException e) {e.printStackTrace();}System.out.println("AtomicStampedReference:" +atomicStampedReference.compareAndSet(100,120,stamp,stamp + 1));}});tsf1.start();tsf2.start();}}
AQS(AbstractQueuedSynchronizer)
维护一个volatile int state(代表共享资源状态)和一个FIFO线程等待队列。
模板方法基本分为三类:
- 独占锁
- 共享锁
- 释放锁
资源共享的方式
- Exclusive(独占,只有一个线程能执行,如ReentrantLock)
- Share(共享,多个线程可以同时执行,如Semaphore/CountDownLatch)
同步队列
AQS依靠同步队列(一个FIFO的双向队列)来完成同步状态的管理。当当前线程获取状态失败后,同步器会将当前线程以及等待信息构造成一个节点(Node),并尝试将他加入到同步队列。Head节点不保存等待的线程信息,仅通过next指向队列中第一个保存等待线程信息的Node。
双向同步队列
Node类
源码(中字注释)
static final class Node {/** 代表共享模式 */static final Node SHARED = new Node();/** 代表独占模式 */static final Node EXCLUSIVE = null;/** 以下四个状态解释见下文等待状态 */static final int CANCELLED = 1;static final int SIGNAL = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;/** 标识等待状态,通过CAS操作更新,原子操作不会被打断*/volatile int waitStatus;/** 当前节点的前置节点 */volatile Node prev;/** 当前节点的后置节点 */volatile Node next;/** 该节点关联的线程(未能获取锁,进入等待的线程) */volatile Thread thread;/** 指向下一个在某个条件上等待的节点,或者指向 SHARE 节点,表明当前处于共享模式*/Node nextWaiter;/*** 判断是否处于共享模式*/final boolean isShared() {return nextWaiter == SHARED;}/** * 返回当前节点的前置节点 * 会做对前置节点空值判断 */final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() { // Used to establish initial head or SHARED marker}Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}
}
等待状态:
等待状态的修改是CAS原子操作
- CANCELED: 1,因为等待超时 (timeout)或者中断(interrupt),节点会被置为取消状态。处于取消状态的节点不会再去竞争锁,也就是说不会再被阻塞。节点会一直保持取消状态,而不会转换为其他状态。处于 CANCELED 的节点会被移出队列,被 GC 回收。
- SIGNAL: -1,表明当前的后继结点正在或者将要被阻塞(通过使用 LockSupport.pack 方法),因此当前的节点被释放(release)或者被取消时(cancel)时,要唤醒它的后继结点(通过 LockSupport.unpark 方法)。
- CONDITION: -2,表明当前节点在条件队列中,因为等待某个条件而被阻塞。
- PROPAGATE: -3,在共享模式下,可以认为资源有多个,因此当前线程被唤醒之后,可能还有剩余的资源可以唤醒其他线程。该状态用来表明后续节点会传播唤醒的操作。需要注意的是只有头节点才可以设置为该状态(This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.)。
- 0:新创建的节点会处于这种状态
锁的获取与释放:
获取独占锁
获取独占锁
- acquire方法
public final void acquire(int arg) {// 首先尝试获取锁,如果获取失败,会先调用 addWaiter 方法创建节点并追加到队列尾部// 然后调用 acquireQueued 阻塞或者循环尝试获取锁if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){// 在 acquireQueued 中,如果线程是因为中断而退出的阻塞状态会返回 true// 这里的 selfInterrupt 主要是为了恢复线程的中断状态selfInterrupt();}
}
释放独占锁
释放独占锁
在独占模式中,锁的释放由于没有其他线程竞争,相对简单。锁释放失败的原因是由于该线程本身不拥有锁,而非多线程竞争。锁释放成功后会检查后置节点的状态,找到合适的节点,调用unparkSuccessor方法唤醒该节点所关联的线程。
- release方法
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;// waitStatus 为 0,证明是初始化的空队列或者后继结点已经被唤醒了if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}
获取共享锁
获取共享锁
-
acquireShared方法
通过该方法可以申请锁
public final void acquireShared(int arg) {// 如果返回结果小于0,证明没有获取到共享资源if (tryAcquireShared(arg) < 0)doAcquireShared(arg);
}
- doAcquireShared
private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
释放共享锁
作者:RealityVibe
链接:https://www.jianshu.com/p/2a48778871a9
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。