案例
Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于 AbstractQueuedSynchronizer实现的
private static ExecutorService threadPool = Executors.newFixedThreadPool(4);private static Semaphore semaphore = new Semaphore(2);public static void main(String[] args) {for ( ; ; ) {threadPool.execute(()-> exec());}}public static void exec() {try {semaphore.acquire();System.out.println(Thread.currentThread().getName() + " before");TimeUnit.SECONDS.sleep(1);System.out.println("执行任务");System.out.println(Thread.currentThread().getName() + " after");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();}}
从执行结果来看的话,先是2个线程获取到凭证,然后执行完毕。后续两个线程才开始获取凭证。
pool-1-thread-1 before
pool-1-thread-2 before
执行任务
执行任务
pool-1-thread-2 after
pool-1-thread-1 after
pool-1-thread-3 before
pool-1-thread-4 before
执行任务
执行任务
pool-1-thread-3 after
pool-1-thread-4 after
应用场景
Semaphore的使用场景主要用于一些中间件的时候,进行限流使用。
源码解析
构造方法
默认是非公平锁,可以通过构造参数进行设置。本篇主要介绍非公平锁的实现方式。
public Semaphore(int permits) {sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
// 非公平锁static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}
Sync(int permits) {setState(permits);}// 设置state为构造方法的数值protected final void setState(int newState) {state = newState;}
获取凭证
semaphore.acquire();
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//是否中断if (Thread.interrupted())throw new InterruptedException();// 线程等待if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
T1线程直接获取锁,返回。T2线程也可以获取,但是T3线程进入的时候 state=0,获取不到锁。就会进入到 doAcquireSharedInterruptibly 这个逻辑中
final int nonfairTryAcquireShared(int acquires) {for (;;) {获取当前state的值int available = getState();int remaining = available - acquires;if (remaining < 0 ||// //cas操作compareAndSetState(available, remaining))return remaining;}}
doAcquireSharedInterruptibly 其实就是将当前线程封装成一个Node节点,添加到AQS队列中。 shouldParkAfterFailedAcquire 会进行阻塞。
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 封装成一个node 加入AQS队列中 共享模式final Node node = addWaiter(Node.SHARED);boolean failed = true;try {//自选锁for (;;) {final Node p = node.predecessor();if (p == head) {// state 不等于0 返回-1int r = tryAcquireShared(arg);// 第一次不会进入if (r >= 0) {// // 2. 这里将唤醒t3的后续节点t4,以此类推,t4被唤醒后,会在t4的await中唤醒t4的后续节点setHeadAndPropagate(node, r);// t3节点删除p.next = null; // help GCfailed = false;return;}}// 修改前驱节点waitstate = -1 挂起当前线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
释放凭证
public void release() {sync.releaseShared(1);}
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
获取当前state的值,然后将state+=1 操作。
protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}
释放资源。 unparkSuccessor(h); 会将T3线程进行唤醒。然后T3线程会尝试唤醒T4 (共享模式)。如果有资源的话,就获取锁,没有的话就会阻塞。
private void doReleaseShared() {// 自选锁for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 前面已经将pre节点 设置为-1if (ws == Node.SIGNAL) {// 设置为0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck cases// 唤醒head的后继节点unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}
小结
通过代码进一步分析 可以更加了解Semaphore的原理。