一、 同步框架AbstractQueuedSynchronizer
Java并发编程核心在于java.concurrent.util包
而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器,主要结构是双向链表的FIFO队列(尾插法),如果线程抢不到,就进这个链表排队,之后再等待被唤醒;
AQS具备特性
1、阻塞等待队列
2、共享/独占
3、公平/非公平
4、可重入
5、允许中断
二、并发编程包依赖于AQS的内部实现
Java.concurrent.util当中同步器的实现如Lock,Latch,Barrier等,都是基于AQS框架实现
一般通过定义内部类Sync继承AQS,将同步器所有调用都映射到Sync对应的方法
三、AQS框架-管理状态
1、AQS内部维护属性volatile int state (32位)
state表示资源的可用状态
2、State三种访问方式
getState()、setState()、compareAndSetState()
3、AQS定义两种资源共享方式
Exclusive-独占,只有一个线程能执行,如ReentrantLock
Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
4、AQS定义两种队列
同步等待队列
条件等待队列
四、ReentrantLock
这里 state 代表 :同一线程占用的次数
应用场景:
虽然允许多个线程进行访问,但是通过state机制保证同一时刻只有一个线程能获取资源。
可以替代synchronized关键字,提供更灵活的加锁和解锁操作,支持公平性和非公平性,可重入。
底层原理:
支持可重入性:reentrantLock.lock(); 同一个线程可用多次,每一次state就加1;同理reentrantLock.unlock(); 每次给state 减1,减到0就允许其他线程来抢。
state>0 代表有锁状态,state=0代表无锁状态;
任务开始前调用 reentrantLock.lock(); state就加1,表示同个线程占用的次数
任务结束后调用 reentrantLock.unlock(); state就减1
ReentrantLock reentrantLock = new ReentrantLock(false);public static void print() {reentrantLock.lock(); //给state 加1System.out.println(Thread.currentThread().getName());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}finally {reentrantLock.unlock(); //给state 减1}}
源码中这一段取决了锁是不是 可重入的
源码中,这里会判断是不是当前线程在获取资源,是的话就加起来,再set;
而其他不可重入的锁没有这个逻辑,甚至不用去记录是不是当前线程获取的,所以不可重入
//源码中,这里会判断是不是当前线程在获取资源,是的话就加起来,再set;
//而其他不可重入的锁没有这个逻辑,甚至不用去记录是不是当前线程获取的,所以不可重入
else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) throw new Error("Maximum lock count exceeded");setState(nextc);return true;
}
公平与非公平的区别
1)公平锁的实现:调用acquire(1)函数,其实公平锁也没有那么老实,这里一上来还是会去尝试抢一下锁的 tryAcquire(arg),抢不到了才入队列排队acquireQueued(),再中断自己的运行selfInterrupt();
static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}
//其实公平锁也没有那么老实,这里一上来还是会去尝试抢一下锁的 tryAcquire(arg),抢不到了才入队列排队acquireQueued(),中断自己的运行selfInterrupt();
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
2)非公平锁的实现:
先尝试抢占锁,就是把state从0改成1, compareAndSetState(0, 1),然后将当前的执行线程改成自己 setExclusiveOwnerThread(Thread.currentThread())
失败的话就调用公平锁的流程acquire(1); 但还是一样,要先尝试抢一下锁,抢不到才乖乖去排队,所以它其实前后尝试了两次抢锁,这个刺头
为什么要插队呢?因为插队可以减少cpu去等待队列中唤醒线程的时间代价
只有在别人刚好完成任务退出后,cup要去队列中唤醒线程的那一瞬间有机会被插队成功
static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;final void lock() {//先尝试抢占锁把state从0改成1,然后将当前的执行线程改成自己if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}
唤醒规则:
不管是不是公平锁,唤醒方式是用的同一套方法;
一般是唤醒队列中排队的头节点线程,但如果该线程任务状态异常,就是waitStatus >0,那它就会从尾部开始找一个正常状态的线程任务去唤醒;
这里不清楚它为什么不直接清理掉头节点,然后再顺序找下去,居然去尾部找
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev) //从尾巴倒着找到第一个正常的if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
五、 CountDownLatch
这里 state 代表 :还有多少线程能拿到资源,资源被拿走后不还回去,一次性
static CountDownLatch countDownLatch = new CountDownLatch(5);
新建时,初始化state = 某值 ;
每个线程完成任务后,调用 CountDownLatch.countDown() 将state减1;
调用 CountDownLatch.await() 的主线程会被阻塞,直到state 减到0,也就是所有任务完成了,主线程才往下走;
六、Semaphore
这里 state 代表 :最多有多少线程同时能拿到资源,且资源被拿走后可以还回去给别的线程拿,动态变化
static Semaphore semaphore = new Semaphore(6);
新建时,初始化state = 某值 ;
任务开始前先调用semaphore.acquire() 给state 减1,表示占用了一个资源
任务完成后调用semaphore.release(); 给state 加1,表示把资源还回去
减到state = 0 时,则表示资源不够了,要抢的线程进队列排队
七、CyclicBarrier
这里 state 代表 :还有多少线程能拿到资源,但资源被拿走后统一在最后结束还回去,一波流
CyclicBarrier cyclicBarrier = new CyclicBarrier(7)
新建时,初始化state = 某值;
每次任务执行完调用cyclicBarrier.await(); state减1;
减到state = 0 时,表示所有线程都干完活了,所有等待的线程继续执行,包括主线程。然后重新赋值 state=初始值;
允许一组线程互相等待,直到到达某个公共屏障点。