分析下SemaPhore吧,也是基于AQS实现的,对并发进行控制的工具类,看下其怎么实现的,
Semaphore semaphore = new Semaphore(3);semaphore.acquire();semaphore.release();
Semaphore 常用于控制并发量,比如这里设置为3,就可以只有三个线程可以acquire拿到资源,后续来的线程需要排队,等原有线程release释放之后,才可以接入新的请求,用于控制最大并发。
acquire 实现
// 默认非公平的
public Semaphore(int permits) {sync = new NonfairSync(permits);
}
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 如果获取不到,走的下面阻塞进行入等待队列if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}
// 执行的AQS的获取资源
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 添加共享节点final Node node = addWaiter(Node.SHARED);try {for (;;) {// 死循环判断,park之后唤醒,还是走这里final Node p = node.predecessor();// 如果前面是头节点的话if (p == head) {// 执行的子类实现的尝试方法int r = tryAcquireShared(arg);// 获取成功的话if (r >= 0) {// 对其进行唤醒setHeadAndPropagate(node, r);p.next = null; // help GCreturn;}}// 如果不是头节点,判断需要park不,前节点是signal就进行park// park之前检查是不是被打断// 如果第一次不是,会给前节点设置signal,然后下一次再循环到,就park了if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} catch (Throwable t) {cancelAcquire(node);throw t;}
}// 实际获取到锁之后,改头,然后传播,这里是不是传播根据子类返回的是0还是大于0private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);// 大于0,头节点为空(执行完了),状态小于0,// 新的头节点(当前节点)为空,或者状态小于0if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;//如果有后节点为空或者是共享的,释放if (s == null || s.isShared())doReleaseShared();}}private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 这里会先把状态改为0,改成功了会是释放,成功释放之后if (ws == Node.SIGNAL) {if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}// 如果为0 改为传播else if (ws == 0 &&!h.compareAndSetWaitStatus(0, Node.PROPAGATE))continue; // loop on failed CAS}// 判断等于头,就是没改变头就breakif (h == head) // loop if head changedbreak;}}
可以看到这是在获取资源,获取不到的时候进入队列等待,默认的是非公平的,去看下怎么实现的
Sync
Semaphore 内部类Sync实现了AQS,看下怎么实现的
abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;// 初始设置的资源数,也是通过stateSync(int permits) {setState(permits);}final int getPermits() {return getState();}// 非公平的获取资源final int nonfairTryAcquireShared(int acquires) {for (;;) {// 获取可用的资源int available = getState();// 如果可用的小于需要获取的int remaining = available - acquires;// 小于0直接返回了,如果不小于0,就cas设置,设置成功就返回对应的值了大于等于0的if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}// 释放资源protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();// 给对应的数量加上释放的数量int next = current + releases;// 释放的不能为负数,也不能超过限制if (next < current) // overflowthrow new Error("Maximum permit count exceeded");// cas设置,成功返回释放完成if (compareAndSetState(current, next))return true;}}// 减去对应的statefinal void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;}}final int drainPermits() {for (;;) {// 是不是为0,不为0的时候尝试设置为0int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}
}
// 看下对应的公平锁实现,非公平直接使用Sync的方法获取
static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {// 是不是有在等待的,有就返回-1了,差的就是这个判断if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
}
可以看到Sync继承AQS之后实现的获取资源方法就是对对应的state进行减,确保其大于等于0,有就可以获取,公平非公平的实现就是判断喜爱是不是有在等待的,有的话直接返回-1,不进行尝试。
release
public void release() {sync.releaseShared(1);
}public final boolean releaseShared(int arg) {// 先改,成功就实际释放if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}
protected final boolean tryReleaseShared(int releases) {for (;;) {// 改了state的值int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}
}
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 唤醒后面if (ws == Node.SIGNAL) {if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))continue; // loop to recheck cases// 实际唤醒线程unparkSuccessor(h);}else if (ws == 0 &&!h.compareAndSetWaitStatus(0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}
}// 唤醒线程
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)node.compareAndSetWaitStatus(ws, 0);// 获取下一个节点,不为空的时候直接唤醒Node s = node.next;// 如果是空或者取消状态的话if (s == null || s.waitStatus > 0) {s = null;// 从后向前遍历,然后唤醒,这里唤醒之后应该去继续拿资源for (Node p = tail; p != node && p != null; p = p.prev)if (p.waitStatus <= 0)s = p;}if (s != null)LockSupport.unpark(s.thread);
}
总结
简单总结下吧,Semaphore 通过AQS的state来控制并发数量,也分为公平和非公平,但是使用的是共享锁,这样就能根据数量进行唤醒,AQS提供的方法tryAcquire 让子类实现的,返回正数代表可以继续向后唤醒,返回0自己得到资源可以执行,就通过这样的形式来控制并发