线程池管理工作线程池,它包含一个队列,使任务等待执行。 线程池管理可运行线程的集合,工作线程从队列中执行可运行线程。 java.util.concurrent.Executors提供java.util.concurrent.Executor接口的实现,以在Java中创建线程池。
让我们编写一个简单的程序来说明它的工作原理。
首先,我们需要有一个Runnable类。
package com.journaldev.threadpool;public class WorkerThread implements Runnable {private String command;public WorkerThread(String s){this.command=s;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+' Start. Command = '+command);processCommand();System.out.println(Thread.currentThread().getName()+' End.');}private void processCommand() {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic String toString(){return this.command;}
}
这是我们从Executors框架创建固定线程池的测试程序。
package com.journaldev.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SimpleThreadPool {public static void main(String[] args) {ExecutorService executor = Executors.newFixedThreadPool(5);for (int i = 0; i < 10; i++) {Runnable worker = new WorkerThread('' + i);executor.execute(worker);}executor.shutdown();while (!executor.isTerminated()) {}System.out.println('Finished all threads');}}
在上面的程序中,我们正在创建5个工作线程的固定大小的线程池。 然后,我们将10个作业提交到该池中,因为该池的大小为5,它将开始处理5个作业,其他作业将处于等待状态,一旦其中一个作业完成,来自等待队列的另一个作业将被工作线程拾取并执行。
这是上面程序的输出。
pool-1-thread-2 Start. Command = 1
pool-1-thread-4 Start. Command = 3
pool-1-thread-1 Start. Command = 0
pool-1-thread-3 Start. Command = 2
pool-1-thread-5 Start. Command = 4
pool-1-thread-4 End.
pool-1-thread-5 End.
pool-1-thread-1 End.
pool-1-thread-3 End.
pool-1-thread-3 Start. Command = 8
pool-1-thread-2 End.
pool-1-thread-2 Start. Command = 9
pool-1-thread-1 Start. Command = 7
pool-1-thread-5 Start. Command = 6
pool-1-thread-4 Start. Command = 5
pool-1-thread-2 End.
pool-1-thread-4 End.
pool-1-thread-3 End.
pool-1-thread-5 End.
pool-1-thread-1 End.
Finished all threads
输出确认池中有五个线程来自“ pool-1-thread-1? 到“ pool-1-thread-5”? 他们负责执行提交给池的任务。
Executors类提供简单实现的ExecutorService的使用的ThreadPoolExecutor但ThreadPoolExecutor的提供了更多的功能不止于此。 我们可以指定创建ThreadPoolExecutor实例时仍处于活动状态的线程数,并且可以限制线程池的大小,并创建自己的RejectedExecutionHandler实现以处理无法容纳在工作队列中的作业。
这是我们对RejectedExecutionHandler接口的自定义实现。
package com.journaldev.threadpool;import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println(r.toString() + ' is rejected');}}
ThreadPoolExecutor提供了几种方法,通过这些方法我们可以找到执行器的当前状态,池大小,活动线程数和任务数。 因此,我有一个监视线程,它将在特定时间间隔打印执行程序信息。
package com.journaldev.threadpool;import java.util.concurrent.ThreadPoolExecutor;public class MyMonitorThread implements Runnable
{private ThreadPoolExecutor executor;private int seconds;private boolean run=true;public MyMonitorThread(ThreadPoolExecutor executor, int delay){this.executor = executor;this.seconds=delay;}public void shutdown(){this.run=false;}@Overridepublic void run(){while(run){System.out.println(String.format('[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s',this.executor.getPoolSize(),this.executor.getCorePoolSize(),this.executor.getActiveCount(),this.executor.getCompletedTaskCount(),this.executor.getTaskCount(),this.executor.isShutdown(),this.executor.isTerminated()));try {Thread.sleep(seconds*1000);} catch (InterruptedException e) {e.printStackTrace();}}}
}
这是使用ThreadPoolExecutor的线程池实现示例。
package com.journaldev.threadpool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class WorkerPool {public static void main(String args[]) throws InterruptedException{//RejectedExecutionHandler implementationRejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();//Get the ThreadFactory implementation to useThreadFactory threadFactory = Executors.defaultThreadFactory();//creating the ThreadPoolExecutorThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);//start the monitoring threadMyMonitorThread monitor = new MyMonitorThread(executorPool, 3);Thread monitorThread = new Thread(monitor);monitorThread.start();//submit work to the thread poolfor(int i=0; i<10; i++){executorPool.execute(new WorkerThread('cmd'+i));}Thread.sleep(30000);//shut down the poolexecutorPool.shutdown();//shut down the monitor threadThread.sleep(5000);monitor.shutdown();}
}
请注意,在初始化ThreadPoolExecutor时,我们将初始池大小保持为2,最大池大小保持为4,工作队列大小保持为2。因此,如果有4个正在运行的任务并且提交了更多任务,则工作队列将仅容纳其中2个其余的将由RejectedExecutionHandlerImpl处理。
这是上述程序的输出,确认上述声明。
pool-1-thread-1 Start. Command = cmd0
pool-1-thread-4 Start. Command = cmd5
cmd6 is rejected
pool-1-thread-3 Start. Command = cmd4
pool-1-thread-2 Start. Command = cmd1
cmd7 is rejected
cmd8 is rejected
cmd9 is rejected
[monitor] [0/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false
[monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false
pool-1-thread-4 End.
pool-1-thread-1 End.
pool-1-thread-2 End.
pool-1-thread-3 End.
pool-1-thread-1 Start. Command = cmd3
pool-1-thread-4 Start. Command = cmd2
[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
pool-1-thread-1 End.
pool-1-thread-4 End.
[monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
注意执行程序的活动,已完成和已完成任务总数的变化。 我们可以调用shutdown()方法来完成所有提交的任务的执行并终止线程池。
参考: Java线程池示例,使用我们的JCG合作伙伴 Pankaj Kumar的Executors和ThreadPoolExecutor ,位于Developer Recipes博客上。
翻译自: https://www.javacodegeeks.com/2013/01/java-thread-pool-example-using-executors-and-threadpoolexecutor.html