使用信号量解决并发问题
本文主要讨论的是java的信号量在并发过程中的应用。信号量
Semaphore
的具体实现如下:
package java.util.concurrent;
import java.util.Collection;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;public class Semaphore implements java.io.Serializable {private static final long serialVersionUID = -3222578661600680210L;/** 同步机制基于AQS实现 **/private final Sync sync;/*** 信号量的同步器实现,使用AQS的状态作为许可,子类提供了公平和非公平版本。*/abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;Sync(int permits) {setState(permits);}final int getPermits() {return getState();}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;}}final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}}/*** 非公平版本*/static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}/*** 公平版本*/static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}/*** 使用给定数量的许可创建一个非公平的信号量对象*/public Semaphore(int permits) {sync = new NonfairSync(permits);}/*** 使用指定的许可创建一个可选公平或非公平版本的信号量对象*/public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}/** * 从信号量对象中获取一个许可,会阻塞等待许可可用或者线程被中断,同时可用的许可数量减一。*/public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}/*** 从信号量对象中获取一个许可,会阻塞等待许可可用,同时可用的许可数量减一。 */public void acquireUninterruptibly() {sync.acquireShared(1);}/*** 从信号量对象中获取一个许可,当且仅当调用时可用则立即返回true,否则直接给返回false。*/public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;}/*** 从信号量对象中获取一个许可,如果许可在给定等待时间内变得可用并且当前线程未被中断。 */public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}/*** 释放一个许可,返回给信号量。 */public void release() {sync.releaseShared(1);}/*** 从信号量中获取给定数量的许可,阻塞等待许可可用。*/public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}/*** 从信号量中获取给定数量的许可,阻塞等待所有的许可可用。*/public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.acquireShared(permits);}/*** 从信号量中获取给定数量的许可,当且仅当本次访问时所有许可可用。*/public boolean tryAcquire(int permits) {if (permits < 0) throw new IllegalArgumentException();return sync.nonfairTryAcquireShared(permits) >= 0;}/*** 从信号量中获取给定数量的许可,如果许可在给定等待时间内变得可用并且当前线程未被中断。 */public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));}/*** 释放给定数量的许可,返回信号量。 */public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);}/*** 获取当前信号量中可用的许可*/public int availablePermits() {return sync.getPermits();}/*** 获取立即可用的许可*/public int drainPermits() {return sync.drainPermits();}/*** 通过指示的减少减少可用许可证的数量。*/protected void reducePermits(int reduction) {if (reduction < 0) throw new IllegalArgumentException();sync.reducePermits(reduction);}/*** 如果此信号量的公平性设置为true,则返回true*/public boolean isFair() {return sync instanceof FairSync;}/*** 查询是否有线程在等待获取*/public final boolean hasQueuedThreads() {return sync.hasQueuedThreads();}/*** 获取等待此锁的估计线程数*/public final int getQueueLength() {return sync.getQueueLength();}/*** 获取包含等待获取许可的线程集合*/protected Collection<Thread> getQueuedThreads() {return sync.getQueuedThreads();}/*** 获取标识信号量的字符串*/public String toString() {return super.toString() + "[Permits = " + sync.getPermits() + "]";}
}
semaphore示例:
package org.dsg.jdk.locks.aqs;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.Semaphore;@Slf4j(topic = "c.semaphoreInstance")
public class SemaphoreInstance {public static void main(String[] args) {//1.创建信号量Semaphore semaphore = new Semaphore(3);//2.创建线程使用信号量for(int i=0;i<10;i++){new Thread(()->{try {//运行前需要获得许可semaphore.acquire();log.info("running...");Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {//使用完毕释放许可semaphore.release();}log.info("end...");}).start();}}}