Java 中 的 Semaphore (信号量) 是多线程编程中一种重要的同步工具, 用于控制对共享资源的访问。
通过 Semaphore, 我们可以限制同时访问共享资源的线程数量, 有效地管理并发访问, 确保程序在多线程环境下的稳定性和效率。
在一些资源有限制场景下, Semaphore 是特别合适的, 比如流量控制, 数据库连接池等。
1 Semaphore 的构造方法
public class Semaphore implements java.io.Serializable {public Semaphore(int permits) {// permits 同时可以有多少资源可以获取, 默认的实现为非公平实现sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {// 参数 2 可以配置是否为公平实现sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
}
通过上面的构造函数可以看出, Semaphore 最终还是依靠 AQS 实现的, 关于 AQS 就不展开了。
核心的思想就是传入一个整数, 表示资源的数量, 同时提供了一个布尔值, 来决定资源的竞争是否按照公平原则。
2 Semaphore 的主要方法
方法名 | 说明 |
---|---|
void acquire() throws InterruptedException | 获取一个资源,如果无法获取到, 则阻塞等待直至能够获取为止 |
void acquire(int permits) throws InterruptedException | 同 acquire 方法功能基本一样, 只不过该方法可以一次性获取多个资源 |
void release() | 释放一个资源 |
void release(int permits) | 同 release 方法功能基本一样, 只不过该方法可以一次性释放多个资源 |
boolean tryAcquire() | 尝试获取一个资源, 如果能够获取成功则立即返回 true, 否则, 则返回 false |
boolean tryAcquire(int permits) | 与 tryAcquire 方法一致, 只不过这里可以指定获取多个资源 |
boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException | 尝试获取一个资源, 如果能够立即获取到或者在指定时间内能够获取到, 则返回 true, 否则返回 false |
boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException | 与上一个方法一致, 只不过这里能够获取多个资源 |
int availablePermits() | 返回当前可用的资源个数 |
int getQueueLength() | 返回正在等待获取资源的线程数 |
boolean hasQueuedThreads() | 是否有线程正在等待获取资源 |
Collection<Thread> getQueuedThreads() | 获取所有正在等待资源的线程集合 |
下面用一个简单的例子来说明 Semaphore 的具体使用。 我们来模拟这样一样场景。
有一天, 班主任需要班上 10 个同学到讲台上来填写一个表格, 但是老师只准备了 5 支笔, 因此, 只能保证同时只有 5
个同学能够拿到笔并填写表格,
没有获取到笔的同学只能够等前面的同学用完之后, 才能拿到笔去填写表格。该示例代码如下:
public class SemaphoreDemo {// 表示老师只有 10 支笔private static Semaphore semaphore = new Semaphore(5);public static void main(String[] args) {// 表示 10 个学生ExecutorService service = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {service.execute(() -> {try {System.out.println(Thread.currentThread().getName() + " 同学准备获取笔......");semaphore.acquire();System.out.println(Thread.currentThread().getName() + " 同学获取到笔");System.out.println(Thread.currentThread().getName() + " 填写表格ing.....");TimeUnit.SECONDS.sleep(3);semaphore.release();System.out.println(Thread.currentThread().getName() + " 填写完表格, 归还了笔!!!!!!");} catch (InterruptedException e) {e.printStackTrace();}});}service.shutdown();}
}
输出结果
pool-1-thread-1 同学准备获取笔......
pool-1-thread-1 同学获取到笔
pool-1-thread-1 填写表格ing.....
pool-1-thread-2 同学准备获取笔......
pool-1-thread-2 同学获取到笔
pool-1-thread-2 填写表格ing.....
pool-1-thread-3 同学准备获取笔......
pool-1-thread-4 同学准备获取笔......
pool-1-thread-3 同学获取到笔
pool-1-thread-4 同学获取到笔
pool-1-thread-4 填写表格ing.....
pool-1-thread-3 填写表格ing.....
pool-1-thread-5 同学准备获取笔......
pool-1-thread-5 同学获取到笔
pool-1-thread-5 填写表格ing.....
pool-1-thread-6 同学准备获取笔......
pool-1-thread-7 同学准备获取笔......
pool-1-thread-8 同学准备获取笔......
pool-1-thread-9 同学准备获取笔......
pool-1-thread-10 同学准备获取笔......pool-1-thread-4 填写完表格, 归还了笔!!!!!!
pool-1-thread-9 同学获取到笔
pool-1-thread-9 填写表格ing.....
pool-1-thread-5 填写完表格, 归还了笔!!!!!!
pool-1-thread-7 同学获取到笔
pool-1-thread-7 填写表格ing.....
pool-1-thread-8 同学获取到笔
pool-1-thread-8 填写表格ing.....
pool-1-thread-1 填写完表格, 归还了笔!!!!!!
pool-1-thread-6 同学获取到笔
pool-1-thread-6 填写表格ing.....
pool-1-thread-3 填写完表格, 归还了笔!!!!!!
pool-1-thread-2 填写完表格, 归还了笔!!!!!!
pool-1-thread-10 同学获取到笔
pool-1-thread-10 填写表格ing.....
pool-1-thread-7 填写完表格, 归还了笔!!!!!!
pool-1-thread-9 填写完表格, 归还了笔!!!!!!
pool-1-thread-8 填写完表格, 归还了笔!!!!!!
pool-1-thread-6 填写完表格, 归还了笔!!!!!!
pool-1-thread-10 填写完表格, 归还了笔!!!!!!
根据输出结果进行分析, Semaphore 允许的最大资源为 5, 也就是允许的最大并发执行的线程个数为 5, 可以看出, 前 5 个线程(前 5 个学生)先获取到笔, 然后填写表格。
而 6-10 这 5 个线程, 由于获取不到资源, 只能阻塞等待。当线程 pool-1-thread-4 释放了资源后, pool-1-thread-9 就可以获取到许可, 继续往下执行,
对其他线程的执行过程, 也是同样的道理。 从这个例子就可以看出, Semaphore 用来做特殊资源的并发访问控制是相当合适的, 如果有业务场景需要进行流量控制, 可以优先考虑 Semaphore。
3 Semaphore 的源码实现
Semaphore 内部是通过 AQS 的共享锁实现的, 所以只要理解了 Semaphore 的同步器, 基本就能了解大体的实现了。
3.1 Semaphore 中的 同步器
public class Semaphore implements java.io.Serializable {/*** 内部定义的同步器*/abstract static class Sync extends AbstractQueuedSynchronizer {Sync(int permits) {setState(permits);}final int nonfairTryAcquireShared(int acquires) {// 非公平的获取锁for (;;) {// 当前的状态int available = getState();// 当前的状态 - 需要的状态, 得到剩下的状态int remaining = available - acquires;// remaining 小于 0 或者通 cas 设置为新的状态if (remaining < 0 || compareAndSetState(available, remaining))return remaining;}}protected final boolean tryReleaseShared(int releases) {// 共享锁的释放for (;;) {// 当前的状态int current = getState();// 增加释放的值int next = current + releases;// 增加后的值还是小于当前的状态值, 抛出异常if (next < current) throw new Error("Maximum permit count exceeded");// 通过 cas 设置, 成功了释放锁成功 if (compareAndSetState(current, next))return true;}}final void reducePermits(int reductions) {// 减少许可证, 即减少状态值for (;;) {// 获取当前的状态int current = getState();// 计算出新的状态值int next = current - reductions;// 新的状态值大于当前的状态值if (next > current)throw new Error("Permit count underflow");// cas 交换 if (compareAndSetState(current, next))return;}}final int drainPermits() {// 将当前的状态值设置为 0 for (;;) {// 获取当前的状态值int current = getState();// 当前的状态值等于 0 了// 或者通过 cas 将当前的状态值设置为 0if (current == 0 || compareAndSetState(current, 0))return current;}}}/*** 非公平锁的实现* 很简单, 全部都是直接复用 Sync 的方法*/static final class NonfairSync extends Sync {NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}/*** 公平锁的实现*/static final class FairSync extends Sync {// 尝试获取共享锁// 这里是公平锁的实现, 而非公平锁的实现, 就是直接调用 nonfairTryAcquireShared 方法// FairSync 和 NonfairSync 只是在 Sync 的基础上重写了这个方法, 没有其他的改变了protected int tryAcquireShared(int acquires) {for (;;) {// 调用 AbstractQueuedSynchronizer 的 hasQueuedPredecessors 方法, // 判断当前同步队列中是否有符合条件的候选节点, 即同步队列中有没有状态不是取消状态的节点, // 有的话, 返回 -1, 尝试获取锁失败if (hasQueuedPredecessors())return -1;// 获取可用的状态 int available = getState();// 可用的状态 - 需要的状态, 得到剩下的状态int remaining = available - acquires;// 如果剩余的状态小于 0 了 获取通过 cas 设置新的状态失败if (remaining < 0 || compareAndSetState(available, remaining))// 返回剩余的状态return remaining;}}}}
3.2 Semaphore 的 acquire 方法
public class Semaphore implements java.io.Serializable {public void acquire() throws InterruptedException {// 1. 先调用到 AbstractQueuedSynchronizer 的 acquireSharedInterruptibly// 2. 在 AQS 的 acquireSharedInterruptibly 中先通过 Semaphore 自定义的 Sync 的 tryAcquireShared() 方法判断是否可以获取锁// 在 tryAcquireShared 方法获取当前的状态值, 通过当前的状态值 - 需要获取的状态值, 得到剩余的状态值// 如果剩余的状态值小于 0, 否则通过 cas 交换当前的状态值为剩余值// 最后返回剩余值// 3. 获取锁失败后, 会加入同步队列, 等待唤醒sync.acquireSharedInterruptibly(1);}
}
3.3 Semaphore 的 release 方法
public class Semaphore implements java.io.Serializable {public void release() {// 1. 先调用到 AbstractQueuedSynchronizer 的 releaseShared// 2. 在 AQS 的 releaseShared 中先通过 CountDownLatch 自定义的 Sync 的 tryReleaseShared() 方法判断是否可以释放锁// 在 tryReleaseShared 方法中, 获取到当前的状态值, 当前的状态值 + 释放的状态值, 得到最新的状态值// 通过 cas 设置当前的状态值为最新的状态值, 释放锁成功sync.releaseShared(1);}
}
几乎所有的方法都是基于同步器 AQS 实现的, 所有理解了 AQS 的实现, Semaphore 的实现也就不难理解了。
至于其他的方法, 比如 tryAcquire, tryRelease, availablePermits 等方法也都是同样的思想, 这里就不再赘述了。
4 参考
大白话说Java并发工具类-Semaphore, Exchanger