Java的自定义线程池
自定义线程池的原因
在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面使线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活,而且有资源耗尽的风险(OOM - Out Of Memory )。
一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但这种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况
自定义线程池
在Java中,可以通过ThreadPoolExecutor
类来自定义线程池。
import java.util.concurrent.*;public class CustomThreadPoolExample {public static void main(String[] args) {int corePoolSize = 2;int maxPoolSize = 4;long keepAliveTime = 10;int queueSize = 2;ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,TimeUnit.SECONDS,new ArrayBlockingQueue<>(queueSize));for (int i = 1; i <= 6; i++) {final int task = i;threadPool.execute(() -> {System.out.println("Task " + task + " is running on thread: " + Thread.currentThread().getName());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Task " + task + " is completed");});}threadPool.shutdown();}
}
分析:在这个示例中,我们创建了一个ThreadPoolExecutor
实例,指定了核心线程数、最大线程数、线程空闲时间、工作队列等参数。然后通过execute
方法提交任务给线程池执行。任务会在空闲线程中执行,如果线程数达到最大线程数,多余的任务会被放入工作队列中等待执行。最后记得调用shutdown
方法关闭线程池。
注意:你可以根据自己的需求调整线程池的参数,以满足不同的场景要求。
自定义拒绝策略的原因
自定义拒绝策略是为了更好地控制线程池在无法接受新任务时的行为,根据实际需求来定义适合当前场景的处理方式。以下是一些常见的情况,说明为什么需要自定义拒绝策略:
- 避免任务丢失: 默认的拒绝策略是直接抛出
RejectedExecutionException
异常,导致提交的任务丢失。如果希望在任务无法执行时进行处理,比如记录日志、重新提交任务等,就需要自定义拒绝策略来避免任务丢失。- 优雅降级: 在系统负载过高或资源不足时,可以通过自定义拒绝策略来实现任务的优雅降级,比如拒绝新任务、延迟处理、限流等,以保护系统稳定性。
- 灵活适配业务需求: 不同的业务场景可能需要不同的处理方式,通过自定义拒绝策略可以灵活适配业务需求,实现定制化的处理逻辑。
- 监控和报警: 自定义拒绝策略可以用于监控线程池的状态,比如当线程池达到一定负载时触发报警机制,及时发现问题并进行处理。
总之,自定义拒绝策略可以帮助开发人员更好地控制线程池的行为,提高系统的稳定性和可维护性,同时根据具体业务需求来定制适合的处理方式。
自定义拒绝策略
要自定义拒绝策略,你可以实现RejectedExecutionHandler
接口,并重写rejectedExecution
方法来定义自己的拒绝逻辑。
import java.util.concurrent.*;public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("Task rejected. Custom handling logic here.");// 可以根据自己的需求实现不同的拒绝逻辑,比如抛出异常、记录日志等}public static void main(String[] args) {int corePoolSize = 2;int maxPoolSize = 4;long keepAliveTime = 10;int queueSize = 2;ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,TimeUnit.SECONDS,new ArrayBlockingQueue<>(queueSize),new CustomRejectedExecutionHandler());for (int i = 1; i <= 6; i++) {final int task = i;threadPool.execute(() -> {System.out.println("Task " + task + " is running on thread: " + Thread.currentThread().getName());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Task " + task + " is completed");});}threadPool.shutdown();}
}
分析:在这个示例中,我们创建了一个CustomRejectedExecutionHandler
类来实现自定义的拒绝策略,重写了rejectedExecution
方法。然后在创建ThreadPoolExecutor
实例时,将这个自定义的拒绝策略传入构造函数中。
注意:当线程池无法接受新任务时,就会调用自定义的拒绝策略中的rejectedExecution
方法来处理。你可以根据具体需求,在这个方法中实现自定义的拒绝逻辑,比如抛出异常、记录日志、丢弃任务等。
示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Test01 {/*** 知识点:自定义线程池*/public static void main(String[] args) {FastThreadPool pool = new FastThreadPool(5, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));for (int i = 1; i <= 100; i++) {Task task = new Task(i);pool.execute(task);}//设置为true后,闲置时间一旦到达,核心线程也会被销毁//经验:我们一般不会回收核心线程,因为设置回收后线程池中的线程有可能为0,这样就没有线程复用率的说法了//pool.allowCoreThreadTimeOut(true);pool.shutdown();}
}//自定义线程池
public class FastThreadPool extends ThreadPoolExecutor{public FastThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new FastThreadFactory(), new FastRejectedExecutionHandler());}public FastThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}@Overrideprotected void beforeExecute(Thread t, Runnable r) {System.out.println("线程执行任务之前调用 -- " + r);}@Overrideprotected void afterExecute(Runnable r, Throwable t) {System.out.println("线程执行任务之后调用 -- " + r);}@Overrideprotected void terminated() {System.out.println("线程池关闭时调用");}//自定义线程工厂private static class FastThreadFactory implements ThreadFactory{private int num;//线程编号@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("自定义线程池中的线程"+num);t.setPriority(Thread.MAX_PRIORITY);num++;return t;}}//自定义拒绝策略private static class FastRejectedExecutionHandler implements RejectedExecutionHandler{@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("任务被拒绝了~~~");}}}
public class Task implements Runnable{private int num;public Task(int num) {this.num = num;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "处理了第" + num + "个任务");}@Overridepublic String toString() {return "任务" + num;}
}