信号量Semaphore初探
1.信号量(Semaphore)简述
信号量Semaphore是java.util.concurrent包下一个常用的同步工具类,他维护了一个许可集,可以理解成资源数,可以通过aquire操作来获取一个资源,
并通过release来释放一个资源,但需要注意的是,release来释放资源前不一定要先通过acquire来获取一个资源,如果不断的release而不进行aquire将导致
资源数虚增,所以一定得我们自己来保证使用的正确性.
2.信号量(Semaphore)的使用场景
我们经常用信号量来管理可以重复使用的资源,比如数据库连接池、线程等,因为这些资源都有着可预估的上限,所以我们在初始化Semaphore时设定的许可数和我们需要管理
的资源数一致,获取一个资源时就通过aquire来获取一个许可,如果没有可用资源,则aquire将阻塞,释放一个资源时,通过release来释放一个许可.
3.为什么Semaphore没有单独重设信号量数量的方法呢(直接把AQS的setState方法暴露)?
说明:Semaphore没有直接提供更新许可总数方法的,虽然你可以单独通过acquire或release来特意减少或增加许可的总量.这是因为setState操作如果发生在某些使用该Semaphore的线程还没走完整个信号量的获取和释放流程时,将会直接导致state的值不准确.
解决:重新new Semaphoreprivate volatile Semaphore jdbcConnection = new Semaphore(10);public void resetJDBCConnection(int jdbcConnectionAmount){jdbcConnection=new Semaphore(jdbcConnectionAmount); }
注意:这里的jdbcConnection必须是volatile,这样暴力的修改时有风险的,因为你在修改时,jdbcConnection很可能正在被使用,比如进行了jdbcConnection.aquire()操作后,你把jdbcConnection给换成另一个新的Semaphore,所以在你使用jdbcConnection.release释放一个许可时,是在新的Semaphore进行release操作,所以新的Semaphore的许可数量被莫名的+1.有一种解决方案是可以使用局部变量记录下操作aquire时的Semaphore,并在进行release时使用该局部变量来进行release,这样保证了aquire和release是在同一个Semaphore上操作,这种方法简单有效,适合大多数的场景.
4.Semaphore源码解析
1、永久减少许可总数方法protected void reducePermits(int reduction){if(reduction<0) throw new IllegalArgumentException();sync.reducePermits(reduction);}说明:他需要子类来继承使用,不能传入负数来使许可数增加,因为sync.reducePermits(reduction);中对reduction值做了限制final void reducePermits(int reductions){for(;;){int current = getState();int next = current-reductions;if(next>current){throw new Error("Permit count underflow");}if(compareAndSetState(current, next)){return;}}}
2、初始化许可总数就是保存在AQS的state属性中Sync(int permits){setState(permits);}final int getPermits(){return getState();}
说明:Sync是Semaphore中实现的AQS的内部类
3、Sync中关于非公平获取信号量和获取信号量两个方法的实现final int nonfairTryAcquireShared(int acquires){for(;;){int available = getState();int remaining = available - acquires;if(remaining<0||compareAndSetState(available, remaining)){return remaining;}}}protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}
说明:从nonfairTryAcquireShared方法中可以看出,当需要获取acquires许可时,先看看可用的许可够不够,如果不够(remaining<0),则直接返回还差的许可数的负数,如果够,则从可用许可数中减去acquires,并返回剩余的可用的许可数.
3、nonfairTryAcquireShared在Semaphore中的tryAcquire方法使用public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;}public boolean tryAcquire(int permits) {if (permits < 0) throw new IllegalArgumentException();return sync.nonfairTryAcquireShared(permits) >= 0;}
说明:对于tryAcquire的非阻塞方法,Semaphore的公平模式和非公平模式下的实现是一样的,它在能够获取足够许可时不需要进入队列而是直接拿到走人,不能获取足够许可时就直接返回,是非公平的的.注意对于tryAcquire的阻塞方法还是有公平和非公平之分的.tryReleaseShared是用来释放许可数,通过CAS操作指导成功将release数目的许可数加回到当前可用许可数中.Semaphore无法校验你获取的许可和释放的许可是否一一对应,因为获取和释放都是直接在AQS的state上操作,所以操作一段时间后,连AQS自己都忘记最初的state是什么值了,所以无法在中途来校验获取和释放是否正确,即使知道state的初始值,也很难在交错的获取和释放许可的操作过程中做一致性检查.
4、Semaphore的构造函数public Semaphore(int permits,boolean fair){sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
说明:FairSync代表公平的Sync实现,而NonfairSync代表非公平的Sync实现.只要线程由于获取资源数失败而进入队列中后,就一定得等前面的节点获取完锁才有机会尝试获取锁,也就是说在AQS队列中的线程是绝对的公平的,因为队列本来就是先进先出,即先到先得.但如果你想获取的资源数现在就有,那么即使现在队列中有线程排队在等,你也可以不用进入队列而直接拿到你想要的资源,这就是非公平.公平的做法就是不管此时有没有可用的许可,只要线程队列中有线程在等,那么你就得去排队.
5、NonfairSync实现static final class NonfairSync extends Sync{private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires){return nonfairTryAcquireShared(acquires);}}
6、FairSync实现static final class FairSync extends Sync{private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for(;;){if(hasQueuedPredecessors()){return -1;}int available =getState();int remaining=available-acquires;if(remaining<0||compareAndSetState(available,remaining)){return remaining;}}}}