在日常开发中,如果需要使用到多线程,最简单的方式是 new Thread,但是这种方式有很大弊端:
- 首先new Thread 是比较消耗系统性能的,性能比较差;
- 线程缺乏统一的管理,会无限制的创建新线程,相互之间竞争资源或者锁,可能占用过多的资源导致系统党纪或者OOM;
- 缺少必要的功能,比如定时执行,线程中断,设定最长时间等功能。
相比于直接 new Thread,java提供了四种线程池,它的好处是:
- 可以重用存在的线程,见面少线程对象的创建、销毁等资源的开销,性能较好;
- 可以统一管理线程,提高系统资源利用率,避免线程之间竞争系统资源,避免阻塞;
- 可以提供定时执行、并发数控制,线程中断等能力。
Java通过Executors提供了四种线程池,分别是:
- newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
- newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
- newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
但实际上,实际开发中是不允许使用上面4类线程池的,而是要求我们自定义线程池。下面我们就看下Java自带线程池的缺点。
- 像 newFixedThreadPool 和 SingleThreadPool 都是固定长度的线程池,一旦请求量增加,就会堆积大量的等待线程在阻塞队列,而阻塞队列的长度允许是 Integer.MAX_VALUE,很容易会造成内存溢出的问题。
- 像 newCachedThreadPool 和 newScheduledThreadPool 这2个线程池中允许创建的最大线程数量是 Integer.MAX_VALUE,一但请求数量增加,势必会创建大量线程,系统的性能降低,很容易出现宕机的情况。
下面这个代码案例,是开发当中比较常用常用的一个线程池,其中核心线程数是服务器内核数的3倍,最大线程数是服务器内核数的4倍,默认的任务队列数是10000个。
public class ThreadPoolUtil {/*** 获取当前系统可用的处理器核的数量*/private static final int CORE_NUM = Runtime.getRuntime().availableProcessors();/*** 默认任务队列长度*/private static final int DEFAULT_TASK_QUEUE_SIZE = 10000;private ThreadPoolUtil() {}/*** 核心线程内核3倍,最大线程内核4倍,60等待超时销毁线程* 如果使用,线程数量需要考虑任务数量场景*/private static class ThreadPoolHolder {private static final ThreadPoolExecutor INSTANCE = new ThreadPoolExecutor(CORE_NUM * 3, CORE_NUM * 4,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(DEFAULT_TASK_QUEUE_SIZE),new ThreadFactoryBuilder().setNameFormat("OrderDispatchThreadPool-%s").build(),new ThreadPoolExecutor.CallerRunsPolicy());}public static ThreadPoolExecutor getInstance() {return ThreadPoolHolder.INSTANCE;}public static Future async(Runnable task) {return getInstance().submit(task);}public static Future async(Callable task) {return getInstance().submit(task);}public static <P> Future async(Consumer<P> method, P param) {return getInstance().submit(() -> method.accept(param));}/*** 有两个参数但是无返回值的异步执行方法, 如void noStaticFoo(Long id,Entity entity)** @param method 要执行的方法,如 , user::noStaticFoo* @param param1 第一个入参值,如id* @param param2 第二个入参值,如entity* @param <P1> 第一个入参类型* @param <P2> 第二个入参类型* @return Future对象,用以判断是否执行结束*/public static <P1, P2> Future async(BiConsumer<P1, P2> method, P1 param1, P2 param2) {return getInstance().submit(() -> method.accept(param1, param2));}/*** 单个入参,有返回值的异步执行方法 , Entity noStaticFoo(Long id)** @param method 要执行的方法,如 , user::noStaticFoo* @param param 入参值,如 id* @param <P> 入参类型,如Long* @param <R> 返回值类型,如 Entity* @return Future对象,用以判断是否执行结束、获取返回结果*/public static <P, R> Future<R> async(Function<P, R> method, P param) {return getInstance().submit(() -> method.apply(param));}/*** 两个入参,有返回值的异步执行方法 , Entity noStaticFoo(Long id)** @param method 要执行的方法,如 , user::noStaticFoo* @param param1 第一个入参值,如id* @param param2 二个入参值,如entity* @param <P1> 第一个入参类型* @param <P2> 第二个入参类型* @param <R> 返回值类型,如 Entity* @return Future对象,用以判断是否执行结束、获取返回结果*/public static <P1, P2, R> Future<R> async(BiFunction<P1, P2, R> method, P1 param1, P2 param2) {return getInstance().submit(() -> method.apply(param1, param2));}
}