一、定义
是JUC包下的一个工具类,我们可以通过其限制执行的线程数量,达到限流的效果。
当一个线程执行时先通过其方法进行获取许可操作,获取到许可的线程继续执行业务逻辑,当线程执行完成后进行释放许可操作,未获取达到许可的线程进行等待或者直接结束。
可以把它简单的理解成我们停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止。
二、方法
构造方法
/*** Creates a {@code Semaphore} with the given number of* permits and nonfair fairness setting.** @param permits the initial number of permits available.* This value may be negative, in which case releases* must occur before any acquires will be granted.*/public Semaphore(int permits) {sync = new NonfairSync(permits);}
表示许可线程的数量
/*** Creates a {@code Semaphore} with the given number of* permits and the given fairness setting.** @param permits the initial number of permits available.* This value may be negative, in which case releases* must occur before any acquires will be granted.* @param fair {@code true} if this semaphore will guarantee* first-in first-out granting of permits under contention,* else {@code false}*/public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
fair表示公平性,如果设置为true,表示公平,那么等待最久的线程先执行
/*** Acquires a permit from this semaphore, blocking until one is* available, or the thread is {@linkplain Thread#interrupt interrupted}.** <p>Acquires a permit, if one is available and returns immediately,* reducing the number of available permits by one.** <p>If no permit is available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* one of two things happens:* <ul>* <li>Some other thread invokes the {@link #release} method for this* semaphore and the current thread is next to be assigned a permit; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread.* </ul>** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* for a permit,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** @throws InterruptedException if the current thread is interrupted*/public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}
表示线程获取1个许可,那么线程许可数量相应减少一个,获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。
/*** Releases a permit, returning it to the semaphore.** <p>Releases a permit, increasing the number of available permits by* one. If any threads are trying to acquire a permit, then one is* selected and given the permit that was just released. That thread* is (re)enabled for thread scheduling purposes.** <p>There is no requirement that a thread that releases a permit must* have acquired that permit by calling {@link #acquire}.* Correct usage of a semaphore is established by programming convention* in the application.*/public void release() {sync.releaseShared(1);}
表示释放一个许可,那么线程许可数量相应增加,释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。
/*** Releases the given number of permits, returning them to the semaphore.** <p>Releases the given number of permits, increasing the number of* available permits by that amount.* If any threads are trying to acquire permits, then one* is selected and given the permits that were just released.* If the number of available permits satisfies that thread's request* then that thread is (re)enabled for thread scheduling purposes;* otherwise the thread will wait until sufficient permits are available.* If there are still permits available* after this thread's request has been satisfied, then those permits* are assigned in turn to other threads trying to acquire permits.** <p>There is no requirement that a thread that releases a permit must* have acquired that permit by calling {@link Semaphore#acquire acquire}.* Correct usage of a semaphore is established by programming convention* in the application.** @param permits the number of permits to release* @throws IllegalArgumentException if {@code permits} is negative*/public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);}
表示一个线程释放n个许可,这个数量有参数permits决定,获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。
/*** Acquires the given number of permits from this semaphore,* blocking until all are available,* or the thread is {@linkplain Thread#interrupt interrupted}.** <p>Acquires the given number of permits, if they are available,* and returns immediately, reducing the number of available permits* by the given amount.** <p>If insufficient permits are available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* one of two things happens:* <ul>* <li>Some other thread invokes one of the {@link #release() release}* methods for this semaphore, the current thread is next to be assigned* permits and the number of available permits satisfies this request; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread.* </ul>** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* for a permit,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.* Any permits that were to be assigned to this thread are instead* assigned to other threads trying to acquire permits, as if* permits had been made available by a call to {@link #release()}.** @param permits the number of permits to acquire* @throws InterruptedException if the current thread is interrupted* @throws IllegalArgumentException if {@code permits} is negative*/public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}
表示一个线程获取n个许可,这个数量有参数permits决定
/*** Returns the current number of permits available in this semaphore.** <p>This method is typically used for debugging and testing purposes.** @return the number of permits available in this semaphore*/public int availablePermits() {return sync.getPermits();}
返回当前信号量线程许可数量
/*** Returns an estimate of the number of threads waiting to acquire.* The value is only an estimate because the number of threads may* change dynamically while this method traverses internal data* structures. This method is designed for use in monitoring of the* system state, not for synchronization control.** @return the estimated number of threads waiting for this lock*/public final int getQueueLength() {return sync.getQueueLength();}
返回等待获取许可的线程数的预估值
/*** Acquires a permit from this semaphore, blocking until one is* available.** <p>Acquires a permit, if one is available and returns immediately,* reducing the number of available permits by one.** <p>If no permit is available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* some other thread invokes the {@link #release} method for this* semaphore and the current thread is next to be assigned a permit.** <p>If the current thread is {@linkplain Thread#interrupt interrupted}* while waiting for a permit then it will continue to wait, but the* time at which the thread is assigned a permit may change compared to* the time it would have received the permit had no interruption* occurred. When the thread does return from this method its interrupt* status will be set.*/public void acquireUninterruptibly() {sync.acquireShared(1);}
获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)
/*** Acquires a permit from this semaphore, only if one is available at the* time of invocation.** <p>Acquires a permit, if one is available and returns immediately,* with the value {@code true},* reducing the number of available permits by one.** <p>If no permit is available then this method will return* immediately with the value {@code false}.** <p>Even when this semaphore has been set to use a* fair ordering policy, a call to {@code tryAcquire()} <em>will</em>* immediately acquire a permit if one is available, whether or not* other threads are currently waiting.* This "barging" behavior can be useful in certain* circumstances, even though it breaks fairness. If you want to honor* the fairness setting, then use* {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }* which is almost equivalent (it also detects interruption).** @return {@code true} if a permit was acquired and {@code false}* otherwise*/public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;}
尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。
/*** Acquires a permit from this semaphore, if one becomes available* within the given waiting time and the current thread has not* been {@linkplain Thread#interrupt interrupted}.** <p>Acquires a permit, if one is available and returns immediately,* with the value {@code true},* reducing the number of available permits by one.** <p>If no permit is available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* one of three things happens:* <ul>* <li>Some other thread invokes the {@link #release} method for this* semaphore and the current thread is next to be assigned a permit; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread; or* <li>The specified waiting time elapses.* </ul>** <p>If a permit is acquired then the value {@code true} is returned.** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* to acquire a permit,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** <p>If the specified waiting time elapses then the value {@code false}* is returned. If the time is less than or equal to zero, the method* will not wait at all.** @param timeout the maximum time to wait for a permit* @param unit the time unit of the {@code timeout} argument* @return {@code true} if a permit was acquired and {@code false}* if the waiting time elapsed before a permit was acquired* @throws InterruptedException if the current thread is interrupted*/public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}
尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。
/*** Queries whether any threads are waiting to acquire. Note that* because cancellations may occur at any time, a {@code true}* return does not guarantee that any other thread will ever* acquire. This method is designed primarily for use in* monitoring of the system state.** @return {@code true} if there may be other threads waiting to* acquire the lock*/public final boolean hasQueuedThreads() {return sync.hasQueuedThreads();}
等待队列里是否还存在等待线程。
/*** Acquires and returns all permits that are immediately available.** @return the number of permits acquired*/public int drainPermits() {return sync.drainPermits();}
清空令牌把可用令牌数置为0,返回清空令牌的数量。
三、使用Semaphore实现停车场指示牌功能
每个停车场入口都有一个提示牌,上面显示着停车场的剩余车位还有多少,当剩余车位为0时,不允许车辆进入停车场,直到停车场里面有车离开停车场,这时提示牌上会显示新的剩余车位数。
业务场景 :
1、停车场容纳总停车量20。
2、当一辆车进入停车场后,显示牌的剩余车位数响应的减1.
3、每有一辆车驶出停车场后,显示牌的剩余车位数响应的加1。
4、停车场剩余车位不足时,车辆只能在外面等待。
实现代码
package com.util;import lombok.extern.slf4j.Slf4j;import java.util.Random;
import java.util.concurrent.Semaphore;
import static java.lang.Thread.sleep;/*** @author : lssffy* @Description : 线程的信号量控制* @date : 2023/12/17 21:32*/
@Slf4j
public class SemaphoreTest {public static void main(String[] args) {//创建Semaphore对象数量20Semaphore sp = new Semaphore(20);//100个线程同时运行for (int i = 0; i < 8; i++) {new Thread(new Runnable() {@Overridepublic void run() {try{System.out.println("===" + Thread.currentThread().getName()+"来到停车场");if(sp.availablePermits() == 0){System.out.println("车位不足,请耐心等待");}sp.acquire();//获取令牌尝试进入停车场System.out.println(Thread.currentThread().getName() + "成功进入停车场");Thread.sleep(new Random().nextInt(10000));//模拟车辆在停车场停留的时间System.out.println(Thread.currentThread().getName() + "驶出停车场");sp.release();//释放令牌,腾出停车场车位}catch (InterruptedException e){e.printStackTrace();}}},i+"号车").start();}}
}
执行结果
Semaphore实现原理
Semaphore初始化
Semaphore sp = new Semaphore(20);
1、当调用new Semaphore(20)方式时,默认会创建一个非公平的锁的同步阻塞队列
2、把初始化令牌数量赋值给同步队列的state状态,state的值代表当前所剩余的令牌数量
获取令牌
sp.acquire();
1、当前线程会尝试去同步队列获取一个令牌,获取令牌的过程就是使用原子的操作去修改同步队列的state,获取一个令牌则修改为state=state-1
2、当计算出来的state<0,则代表令牌数量不足,此时会创建一个Node节点加入阻塞队列,挂起当前线程
3、当计算出来的state>=0,则表示获取令牌成功
/*** Acquires in shared mode, aborting if interrupted. Implemented* by first checking interrupt status, then invoking at least once* {@link #tryAcquireShared}, returning on success. Otherwise the* thread is queued, possibly repeatedly blocking and unblocking,* invoking {@link #tryAcquireShared} until success or the thread* is interrupted.* @param arg the acquire argument.* This value is conveyed to {@link #tryAcquireShared} but is* otherwise uninterpreted and can represent anything* you like.* @throws InterruptedException if the current thread is interrupted*/public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//尝试获取令牌,arg为获取令牌个数,当可用令牌数量减去当前令牌数量结果小于0,则创建一个节点加入阻塞队列,挂起当前线程if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
/*** 1、创建节点,加入阻塞队列* 2、重双向链表的head、tail节点关系,清空无效节点* 3、挂起当前节点线程* Acquires in shared interruptible mode.* @param arg the acquire argument*/private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//创建节点加入阻塞队列final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {//获得当前节点pre节点final Node p = node.predecessor();if (p == head) {//返回锁的stateint r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//重组双向链表,清空无效节点,挂起当前线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
释放令牌
sp.release();
当调用sp.release();方法时
1、线程会尝试释放一个令牌,释放令牌的过程就是把同步队列的state修改为state=state+1的过程
2、释放令牌成功之后,同时唤醒同步队列的一个线程
3、被唤醒的节点会重新尝试去修改state=state-1的操作,如果state>=0则获取令牌成功,否则重新进入阻塞队列,挂起线程
/*** Releases in shared mode. Implemented by unblocking one or more* threads if {@link #tryReleaseShared} returns true.* 释放共享锁,同时会唤醒同步队列的一个线程* @param arg the release argument. This value is conveyed to* {@link #tryReleaseShared} but is otherwise uninterpreted* and can represent anything you like.* @return the value returned from {@link #tryReleaseShared}*/public final boolean releaseShared(int arg) {//释放共享锁if (tryReleaseShared(arg)) {//唤醒所有共享节点线程doReleaseShared();return true;}return false;}
/*** Release action for shared mode -- signals successor and ensures* propagation. (Note: For exclusive mode, release just amounts* to calling unparkSuccessor of head if it needs signal.)* 唤醒同步队列中的一个线程*/private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;//是否需要唤醒后续节点if (ws == Node.SIGNAL) {//修改状态为初始0if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);//唤醒h.nex节点线程}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}