JDK的java.util.concurrent.ThreadPoolExecutor
允许您将任务提交到线程池,并使用BlockingQueue
来保存提交的任务。 如果您要提交数千个任务,请指定一个“绑定”队列(即最大容量的队列),否则JVM可能会耗尽内存。 您可以设置RejectedExecutionHandler
来处理队列已满时发生的情况,但是仍然有待提交的任务。
这里是你展示如何使用一个简单的例子ThreadPoolExecutor
具有BlockingQueue
容量1000 CallerRunsPolicy
确保,当队列已满时,其他任务将由提交线程处理。
int numThreads = 5;
ExecutorService exec = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(1000),new ThreadPoolExecutor.CallerRunsPolicy());
这种方法的问题在于,当队列已满时,向池提交任务的线程会变得忙于执行任务本身,在此期间,队列可能会变空并且池中的线程可能会变得空闲。 这不是很有效。 我们希望一直保持线程池繁忙,并且工作队列始终处于饱和状态。
有多种解决方案。 其中之一是使用自定义的Executor
,该Executor
在队列已满时会阻塞(从而阻止其他任务提交到池)。 BlockingExecutor
的代码如下所示。 它基于Brian Goetz,2006年的BoundedExecutor
示例。Java Concurrency in Practice。 1版。 Addison-Wesley专业。 (第8.3.3节) 。
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** An executor which blocks and prevents further tasks from* being submitted to the pool when the queue is full.* <p>* Based on the BoundedExecutor example in:* Brian Goetz, 2006. Java Concurrency in Practice. (Listing 8.4)*/
public class BlockingExecutor extends ThreadPoolExecutor {private static final Logger LOGGER = LoggerFactory.getLogger(BlockingExecutor.class);private final Semaphore semaphore;/*** Creates a BlockingExecutor which will block and prevent further* submission to the pool when the specified queue size has been reached.** @param poolSize the number of the threads in the pool* @param queueSize the size of the queue*/public BlockingExecutor(final int poolSize, final int queueSize) {super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());// the semaphore is bounding both the number of tasks currently executing// and those queued upsemaphore = new Semaphore(poolSize + queueSize);}/*** Executes the given task.* This method will block when the semaphore has no permits* i.e. when the queue has reached its capacity.*/@Overridepublic void execute(final Runnable task) {boolean acquired = false;do {try {semaphore.acquire();acquired = true;} catch (final InterruptedException e) {LOGGER.warn("InterruptedException whilst aquiring semaphore", e);}} while (!acquired);try {super.execute(task);} catch (final RejectedExecutionException e) {semaphore.release();throw e;}}/*** Method invoked upon completion of execution of the given Runnable,* by the thread that executed the task.* Releases a semaphore permit.*/@Overrideprotected void afterExecute(final Runnable r, final Throwable t) {super.afterExecute(r, t);semaphore.release();}
}
翻译自: https://www.javacodegeeks.com/2013/11/throttling-task-submission-with-a-blockingexecutor-2.html