ReentrantLock与Synchronized区别在于后者是JVM实现,前者是JDK实现,属于Java对象,使用的时候必须有明确的加锁(Lock)和解锁(Release)方法,否则可能会造成死锁。
先来查看ReentrantLock的继承关系(下图),实现了Lock和Serializable接口,表明ReentrantLock对象是可序列化的。
同时在ReentrantLock内部还定义了三个重要的内部类,Sync继承自抽象类AbstractQueuedSynchronizer(队列同步器)。其后又分别定义了它的两个子类公平锁FairSync和非公平锁NonfairSync。
/*** Base of synchronization control for this lock. Subclassed* into fair and nonfair versions below. Uses AQS state to* represent the number of holds on the lock.*/abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;/*** Performs {@link Lock#lock}. The main reason for subclassing* is to allow fast path for nonfair version.*/abstract void lock();/*** Performs non-fair tryLock. tryAcquire is implemented in* subclasses, but both need nonfair try for trylock method.*/final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}protected final boolean isHeldExclusively() {// While we must in general read state before owner,// we don't need to do so to check if current thread is ownerreturn getExclusiveOwnerThread() == Thread.currentThread();}final ConditionObject newCondition() {return new ConditionObject();}// Methods relayed from outer class final Thread getOwner() {return getState() == 0 ? null : getExclusiveOwnerThread();}final int getHoldCount() {return isHeldExclusively() ? getState() : 0;}final boolean isLocked() {return getState() != 0;}/*** Reconstitutes the instance from a stream (that is, deserializes it).*/private void readObject(java.io.ObjectInputStream s)throws java.io.IOException, ClassNotFoundException {s.defaultReadObject();setState(0); // reset to unlocked state }}/*** Sync object for non-fair locks*/static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}/*** Sync object for fair locks*/static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
然后先看一下ReentrantLock的构造函数:
public ReentrantLock() {sync = new NonfairSync();}/*** Creates an instance of {@code ReentrantLock} with the* given fairness policy.** @param fair {@code true} if this lock should use a fair ordering policy*/public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
ReentrantLock():无参构造器,默认的是非公平锁。
ReentrantLock(boolean):有参构造器,根据参数指定公平锁还是非公平锁。
从这里可以看出,ReentrantLock其实既可以是公平锁也可以是非公平锁,通过参数来进行自定义。
然后我们看一下加锁方法Lock:
public void lock() {sync.lock();}
内部是调用了构造器中创建的Sync对象,由于默认的是非公平锁,因此我们先来看一下非公平锁的实现。
final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}
从方法名compareAndSetState可以看出这是一个CAS操作,我们点进去查看源码,这是在AbstractQueuedSynchronized里面定义的一个方法
protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
通过Unsafe对象来进行CAS操作。由于Unsafe里面定义的是Native方法,通过其他语言实现了对内存的直接操作,因此是保证了线程安全的。
然后我们再来看操作成功后的代码:setExclusiveOwnerThread(Thread.currentThread());
protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}
这个方法的实现是在AbstractQueuedSynchronized的父类AbstractOwnableSynchronized中进行的,只是记录了当前拥有锁的线程。由于我们在if判断中已经获取到了锁,因此这一步也是线程安全的。由此,非公平锁获取结束。
然后我们再看看如果获取锁失败后的执行方法:acquire(1);获取锁失败,则说明现在已经有其他线程获取到了锁,并且正在执行代码块里面的内容。我们假设这个线程为B。
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
这个方法是被定义在AbstractQueuedSynchronized中,里面只有三行代码,但是需要注意,在这里的时候就可能会发生同步执行。
首先看最下面的如果条件成立执行的方法:selfInterrupt()
static void selfInterrupt() {Thread.currentThread().interrupt();}
这个方法很简单,令当前线程中断。但需要注意的是,这个中断只是把线程里面的中断标志位改为true,并没有实际的对线程进行阻塞。线程阻塞已经在上面的两个判断条件里面完成了。
然后我们再来看下上面的判断条件:
首先是tryAcquire(arg),调用非公平锁的tryAcquire(int),里面又调用了Sync的nonfairTryAcquire(int)方法,通过判断当前的锁状态是否等于0,等于则表示没有线程获取锁(实际有可能是线程B已经执行完成并已经释放锁),再次尝试用CAS操作获取锁,获取成功则返回true,并且记录当前线程。如果获取失败,或者锁状态不等于0,则表示已经有线程获取到锁,此时会比较记录的线程是否为当前线程,如果是,则表示是当前线程重入(这里可以看出ReentrantLock是可重入锁),再令锁状态state加1,返回true,否则没有获取到锁返回false。
在这里我们可以看到tryAcquire()目的是再次判断当前锁是否是可获取状态(线程B已经执行完成并释放锁)以及是否是同一个线程的重入操作。获取锁成功或者是线程重入则返回true,lock方法就此结束。否则继续执行第二个条件判断。
static final class NonfairSync extends Sync {protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}abstract static class Sync extends AbstractQueuedSynchronizer {final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
再次获取锁失败后,会通过addWaiter()将当前线程添加到FIFO队列中。
在AQS(队列同步器)中通过Node内部类来制定一个双向链表,此链表采取的是先进先出(FIFO)策略。同时定义了一个头结点head和尾节点tail,都使用关键字volatile来保证多线程的可见性。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode); //创建新的节点// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) { //判断当前尾节点是否等于nullnode.prev = pred;if (compareAndSetTail(pred, node)) { //尾节点不等于null,通过CAS操作将当前节点替换为链表尾节点。替换成功令当前节点作为前一个节点的next节点。替换失败则说明有其他线程正在操作,进入enq进行操作。pred.next = node; return node; //操作成功,返回当前节点。}}enq(node); //自旋获取锁return node;}private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update); //这里是通过偏移量上值比较来进行值替换。}
//进入这个方法有两种可能,一是当前链表没有初始化,等于null,二是当前线程与其他线程竞争添加线程到尾节点失败。private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node())) //当前链表没有初始化,先进行初始化。添加一个新节点作为头节点(代表的是当前正在执行的线程),初始化成功,则令首尾节点都等于该节点。初始化失败,说明已经有其他线程进行了初始化。进入下一个循环。tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) { //当前链表中已经初始化过,将新节点添加在链表末尾,添加成功则返回新节点,添加失败说明有其他线程在竞争添加,进入下一个循环,直到操作成功,当前线程被添加进队列中。t.next = node;return t;}}}}
新的线程被添加到队列里面后,再调用方法acquireQueue(Node,int);
这里可以简单的理解,线程自旋,如果当前线程的前一个节点是头节点(头结点代表获取到锁且正在执行的线程),说明下一个移出队列参与竞争锁的线程是当前线程,再次尝试获取锁,获取到了说明前一个节点已经执行完,令当前节点替换头节点,并返回中断标志位false。
获取锁失败说明上一个线程仍未执行完,或者锁被其他线程竞争到(新建的线程尚未添加到队列中,可以参与锁竞争),同时如果当前线程的上一个节点不是头节点(说明下一个移出队列竞争锁的线程不是当前线程),都会将线程节点的前一个节点的标志位设置为SIGNAL(表示下一个节点需要被unparking),然后令当前线程中断,暂停循环,等待唤醒。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) { //自旋final Node p = node.predecessor(); //获取当前节点的前一个节点if (p == head && tryAcquire(arg)) { //如果前一个节点是头节点,说明当前线程是下一个执行的线程,再次尝试获取锁,获取成功则将当前节点作为头节点,去掉后面的所有节点。setHead(node);p.next = null; // help GCfailed = false;return interrupted; //获取锁成功返回false;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) //不会一直循环下去,因为会不断地消耗资源,适时会进入中断,等待被唤醒后才继续自旋。interrupted = true;}} finally {if (failed)cancelAcquire(node);}}final Node predecessor() throws NullPointerException { //获取当前节点的前节点Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus; //在这里Node的线程状态一共有5种情况:SIGNAL=-1,CANCELLED=1,CONDITION=-2,PROPAGATE=-3,以及默认值0if (ws == Node.SIGNAL) //SIGNAL表示唤醒状态/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) { //大于0的只有CANCELLED情况,当前线程被取消执行,因此从队列中剔除/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else { //剩下的不论什么情况,都会利用CAS操作尝试将节点的waitStatus改为SINGAL,不论操作成功还是失败,都会返回false/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}private static final boolean compareAndSetWaitStatus(Node node,int expect,int update) { //利用CAS操作修改节点的waitStatus值return unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update);}private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}private void cancelAcquire(Node node) {// Ignore if node doesn't existif (node == null)return;node.thread = null;// Skip cancelled predecessorsNode pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;// predNext is the apparent node to unsplice. CASes below will// fail if not, in which case, we lost race vs another cancel// or signal, so no further action is necessary.Node predNext = pred.next;// Can use unconditional write instead of CAS here.// After this atomic step, other Nodes can skip past us.// Before, we are free of interference from other threads.node.waitStatus = Node.CANCELLED;// If we are the tail, remove ourselves.if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node);}node.next = node; // help GC }}
由此,整个非公平锁的加锁过程结束,总结一下:
1.如果state位是0,则表示没有线程获取对象锁,通过CAS操作设置state位从0到1,尝试获取锁
2.获取锁成功,记录当前获取锁的线程。流程结束。
3.获取失败,判断是否是已经获取了锁的线程再次获取(通过第二步里面记录的线程与当前线程判断是否相等),如果是,令state再加1,流程结束。
4.如果不是,将线程添加到FIFO链表队列中,然后进行自旋。
5.自旋时会判断当前线程是否是head节点的next,如果是则再次尝试获取锁,获取到了后将头节点替换为当前节点,返回false。流程结束。
6.自旋一定次数后仍未获取到锁,或当前线程节点不是下一个参与竞争锁的线程,则进入中断。等待被唤醒后继续自旋。
公平锁的Lock()方法:
final void lock() {acquire(1);}
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() && //公平锁与非公平锁的加锁区别compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
公平锁与非公平锁的加锁方法区别在于,tryAcquire(int)方法的不同。公平锁中要判断队列里第一个线程是否是当前线程,如果是,则允许它获取锁,如果不是,则不能获取。
下面看一下解锁方法:unlock()
public void unlock() {sync.release(1);}
内部不分公平锁与非公平锁,一律调用AbstractQueuedSynchronized方法的release(int)。
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
先看第一行的判断:if(tryRelease(arg))
里面先判断了锁指向的线程与当前线程相等,不相等则抛出异常。
再令status减1,判断结果是否等于0。等于0说明可以释放锁,将锁指向的线程改为null,status改为0,返回true。
不等于0则说明仍未全部执行完重入的操作,令status自减一,返回false。
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}
判断为false则释放锁失败,判断为true则继续执行if里面内容。
if里面主要是判断了链表队列head里面有等待唤醒的其他线程节点,对他们进行一个唤醒。
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0); //将waitStatus赋值为初始状态0/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) { //下一个节点等于null或者被取消执行,从尾节点开始向前遍历,找到最头位置上的节点s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null) //唤醒队列中的下一个可执行的节点。LockSupport.unpark(s.thread);}
解锁流程总结:
1.锁指向的线程与当前线程必须是同一个线程。
2.锁标志位status必须已经减到0。
3.判断链表队列不等于null,并且头节点的waitStatus标志位不等于0,需要唤醒下一个节点。否则返回true,业务结束。
4.唤醒下一个节点首先利用CAS操作将waitStatus的标志位改为0,然后再按队列顺序获取下一个节点。
5.如果获取的新节点等于null,或者waitStatus位等于1(表示已经被取消执行),则从尾节点向前遍历,直到遇见最前面的非null非当前线程节点的节点。
6.唤醒获取的新节点。业务结束。